/* * Copyright 2021 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ #include "media/sctp/dcsctp_transport.h" #include #include #include #include #include #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include "api/array_view.h" #include "media/base/media_channel.h" #include "net/dcsctp/public/dcsctp_socket_factory.h" #include "net/dcsctp/public/packet_observer.h" #include "net/dcsctp/public/text_pcap_packet_observer.h" #include "net/dcsctp/public/types.h" #include "p2p/base/packet_transport_internal.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" #include "rtc_base/socket.h" #include "rtc_base/strings/string_builder.h" #include "rtc_base/thread.h" #include "rtc_base/trace_event.h" #include "system_wrappers/include/clock.h" namespace webrtc { namespace { using ::dcsctp::SendPacketStatus; // When there is packet loss for a long time, the SCTP retry timers will use // exponential backoff, which can grow to very long durations and when the // connection recovers, it may take a long time to reach the new backoff // duration. By limiting it to a reasonable limit, the time to recover reduces. constexpr dcsctp::DurationMs kMaxTimerBackoffDuration = dcsctp::DurationMs(3000); enum class WebrtcPPID : dcsctp::PPID::UnderlyingType { // https://www.rfc-editor.org/rfc/rfc8832.html#section-8.1 kDCEP = 50, // https://www.rfc-editor.org/rfc/rfc8831.html#section-8 kString = 51, kBinaryPartial = 52, // Deprecated kBinary = 53, kStringPartial = 54, // Deprecated kStringEmpty = 56, kBinaryEmpty = 57, }; WebrtcPPID ToPPID(DataMessageType message_type, size_t size) { switch (message_type) { case webrtc::DataMessageType::kControl: return WebrtcPPID::kDCEP; case webrtc::DataMessageType::kText: return size > 0 ? WebrtcPPID::kString : WebrtcPPID::kStringEmpty; case webrtc::DataMessageType::kBinary: return size > 0 ? WebrtcPPID::kBinary : WebrtcPPID::kBinaryEmpty; } } absl::optional ToDataMessageType(dcsctp::PPID ppid) { switch (static_cast(ppid.value())) { case WebrtcPPID::kDCEP: return webrtc::DataMessageType::kControl; case WebrtcPPID::kString: case WebrtcPPID::kStringPartial: case WebrtcPPID::kStringEmpty: return webrtc::DataMessageType::kText; case WebrtcPPID::kBinary: case WebrtcPPID::kBinaryPartial: case WebrtcPPID::kBinaryEmpty: return webrtc::DataMessageType::kBinary; } return absl::nullopt; } absl::optional ToErrorCauseCode( dcsctp::ErrorKind error) { switch (error) { case dcsctp::ErrorKind::kParseFailed: return cricket::SctpErrorCauseCode::kUnrecognizedParameters; case dcsctp::ErrorKind::kPeerReported: return cricket::SctpErrorCauseCode::kUserInitiatedAbort; case dcsctp::ErrorKind::kWrongSequence: case dcsctp::ErrorKind::kProtocolViolation: return cricket::SctpErrorCauseCode::kProtocolViolation; case dcsctp::ErrorKind::kResourceExhaustion: return cricket::SctpErrorCauseCode::kOutOfResource; case dcsctp::ErrorKind::kTooManyRetries: case dcsctp::ErrorKind::kUnsupportedOperation: case dcsctp::ErrorKind::kNoError: case dcsctp::ErrorKind::kNotConnected: // No SCTP error cause code matches those break; } return absl::nullopt; } bool IsEmptyPPID(dcsctp::PPID ppid) { WebrtcPPID webrtc_ppid = static_cast(ppid.value()); return webrtc_ppid == WebrtcPPID::kStringEmpty || webrtc_ppid == WebrtcPPID::kBinaryEmpty; } } // namespace DcSctpTransport::DcSctpTransport(rtc::Thread* network_thread, rtc::PacketTransportInternal* transport, Clock* clock) : DcSctpTransport(network_thread, transport, clock, std::make_unique()) {} DcSctpTransport::DcSctpTransport( rtc::Thread* network_thread, rtc::PacketTransportInternal* transport, Clock* clock, std::unique_ptr socket_factory) : network_thread_(network_thread), transport_(transport), clock_(clock), random_(clock_->TimeInMicroseconds()), socket_factory_(std::move(socket_factory)), task_queue_timeout_factory_( *network_thread, [this]() { return TimeMillis(); }, [this](dcsctp::TimeoutID timeout_id) { socket_->HandleTimeout(timeout_id); }) { RTC_DCHECK_RUN_ON(network_thread_); static std::atomic instance_count = 0; rtc::StringBuilder sb; sb << debug_name_ << instance_count++; debug_name_ = sb.Release(); ConnectTransportSignals(); } DcSctpTransport::~DcSctpTransport() { if (socket_) { socket_->Close(); } } void DcSctpTransport::SetOnConnectedCallback(std::function callback) { RTC_DCHECK_RUN_ON(network_thread_); on_connected_callback_ = std::move(callback); } void DcSctpTransport::SetDataChannelSink(DataChannelSink* sink) { RTC_DCHECK_RUN_ON(network_thread_); data_channel_sink_ = sink; if (data_channel_sink_ && ready_to_send_data_) { data_channel_sink_->OnReadyToSend(); } } void DcSctpTransport::SetDtlsTransport( rtc::PacketTransportInternal* transport) { RTC_DCHECK_RUN_ON(network_thread_); DisconnectTransportSignals(); transport_ = transport; ConnectTransportSignals(); MaybeConnectSocket(); } bool DcSctpTransport::Start(int local_sctp_port, int remote_sctp_port, int max_message_size) { RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK(max_message_size > 0); RTC_DLOG(LS_INFO) << debug_name_ << "->Start(local=" << local_sctp_port << ", remote=" << remote_sctp_port << ", max_message_size=" << max_message_size << ")"; if (!socket_) { dcsctp::DcSctpOptions options; options.local_port = local_sctp_port; options.remote_port = remote_sctp_port; options.max_message_size = max_message_size; options.max_timer_backoff_duration = kMaxTimerBackoffDuration; // Don't close the connection automatically on too many retransmissions. options.max_retransmissions = absl::nullopt; options.max_init_retransmits = absl::nullopt; std::unique_ptr packet_observer; if (RTC_LOG_CHECK_LEVEL(LS_VERBOSE)) { packet_observer = std::make_unique(debug_name_); } socket_ = socket_factory_->Create(debug_name_, *this, std::move(packet_observer), options); } else { if (local_sctp_port != socket_->options().local_port || remote_sctp_port != socket_->options().remote_port) { RTC_LOG(LS_ERROR) << debug_name_ << "->Start(local=" << local_sctp_port << ", remote=" << remote_sctp_port << "): Can't change ports on already started transport."; return false; } socket_->SetMaxMessageSize(max_message_size); } MaybeConnectSocket(); return true; } bool DcSctpTransport::OpenStream(int sid) { RTC_DCHECK_RUN_ON(network_thread_); RTC_DLOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ")."; StreamState stream_state; stream_states_.insert_or_assign(dcsctp::StreamID(static_cast(sid)), stream_state); return true; } bool DcSctpTransport::ResetStream(int sid) { RTC_DCHECK_RUN_ON(network_thread_); RTC_DLOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ")."; if (!socket_) { RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid << "): Transport is not started."; return false; } dcsctp::StreamID streams[1] = {dcsctp::StreamID(static_cast(sid))}; auto it = stream_states_.find(streams[0]); if (it == stream_states_.end()) { RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid << "): Stream is not open."; return false; } StreamState& stream_state = it->second; if (stream_state.closure_initiated || stream_state.incoming_reset_done || stream_state.outgoing_reset_done) { // The closing procedure was already initiated by the remote, don't do // anything. return false; } stream_state.closure_initiated = true; socket_->ResetStreams(streams); return true; } bool DcSctpTransport::SendData(int sid, const SendDataParams& params, const rtc::CopyOnWriteBuffer& payload, cricket::SendDataResult* result) { RTC_DCHECK_RUN_ON(network_thread_); RTC_DLOG(LS_VERBOSE) << debug_name_ << "->SendData(sid=" << sid << ", type=" << static_cast(params.type) << ", length=" << payload.size() << ")."; if (!socket_) { RTC_LOG(LS_ERROR) << debug_name_ << "->SendData(...): Transport is not started."; *result = cricket::SDR_ERROR; return false; } // It is possible for a message to be sent from the signaling thread at the // same time a data-channel is closing, but before the signaling thread is // aware of it. So we need to keep track of currently active data channels and // skip sending messages for the ones that are not open or closing. // The sending errors are not impacting the data channel API contract as // it is allowed to discard queued messages when the channel is closing. auto stream_state = stream_states_.find(dcsctp::StreamID(static_cast(sid))); if (stream_state == stream_states_.end()) { RTC_LOG(LS_VERBOSE) << "Skipping message on non-open stream with sid: " << sid; *result = cricket::SDR_ERROR; return false; } if (stream_state->second.closure_initiated || stream_state->second.incoming_reset_done || stream_state->second.outgoing_reset_done) { RTC_LOG(LS_VERBOSE) << "Skipping message on closing stream with sid: " << sid; *result = cricket::SDR_ERROR; return false; } auto max_message_size = socket_->options().max_message_size; if (max_message_size > 0 && payload.size() > max_message_size) { RTC_LOG(LS_WARNING) << debug_name_ << "->SendData(...): " "Trying to send packet bigger " "than the max message size: " << payload.size() << " vs max of " << max_message_size; *result = cricket::SDR_ERROR; return false; } std::vector message_payload(payload.cdata(), payload.cdata() + payload.size()); if (message_payload.empty()) { // https://www.rfc-editor.org/rfc/rfc8831.html#section-6.6 // SCTP does not support the sending of empty user messages. Therefore, if // an empty message has to be sent, the appropriate PPID (WebRTC String // Empty or WebRTC Binary Empty) is used, and the SCTP user message of one // zero byte is sent. message_payload.push_back('\0'); } dcsctp::DcSctpMessage message( dcsctp::StreamID(static_cast(sid)), dcsctp::PPID(static_cast(ToPPID(params.type, payload.size()))), std::move(message_payload)); dcsctp::SendOptions send_options; send_options.unordered = dcsctp::IsUnordered(!params.ordered); if (params.max_rtx_ms.has_value()) { RTC_DCHECK(*params.max_rtx_ms >= 0 && *params.max_rtx_ms <= std::numeric_limits::max()); send_options.lifetime = dcsctp::DurationMs(*params.max_rtx_ms); } if (params.max_rtx_count.has_value()) { RTC_DCHECK(*params.max_rtx_count >= 0 && *params.max_rtx_count <= std::numeric_limits::max()); send_options.max_retransmissions = *params.max_rtx_count; } auto error = socket_->Send(std::move(message), send_options); switch (error) { case dcsctp::SendStatus::kSuccess: *result = cricket::SDR_SUCCESS; break; case dcsctp::SendStatus::kErrorResourceExhaustion: *result = cricket::SDR_BLOCK; ready_to_send_data_ = false; break; default: RTC_LOG(LS_ERROR) << debug_name_ << "->SendData(...): send() failed with error " << dcsctp::ToString(error) << "."; *result = cricket::SDR_ERROR; break; } return *result == cricket::SDR_SUCCESS; } bool DcSctpTransport::ReadyToSendData() { return ready_to_send_data_; } int DcSctpTransport::max_message_size() const { if (!socket_) { RTC_LOG(LS_ERROR) << debug_name_ << "->max_message_size(...): Transport is not started."; return 0; } return socket_->options().max_message_size; } absl::optional DcSctpTransport::max_outbound_streams() const { if (!socket_) return absl::nullopt; return socket_->options().announced_maximum_outgoing_streams; } absl::optional DcSctpTransport::max_inbound_streams() const { if (!socket_) return absl::nullopt; return socket_->options().announced_maximum_incoming_streams; } void DcSctpTransport::set_debug_name_for_testing(const char* debug_name) { debug_name_ = debug_name; } SendPacketStatus DcSctpTransport::SendPacketWithStatus( rtc::ArrayView data) { RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK(socket_); if (data.size() > (socket_->options().mtu)) { RTC_LOG(LS_ERROR) << debug_name_ << "->SendPacket(...): " "SCTP seems to have made a packet that is bigger " "than its official MTU: " << data.size() << " vs max of " << socket_->options().mtu; return SendPacketStatus::kError; } TRACE_EVENT0("webrtc", "DcSctpTransport::SendPacket"); if (!transport_ || !transport_->writable()) return SendPacketStatus::kError; RTC_DLOG(LS_VERBOSE) << debug_name_ << "->SendPacket(length=" << data.size() << ")"; auto result = transport_->SendPacket(reinterpret_cast(data.data()), data.size(), rtc::PacketOptions(), 0); if (result < 0) { RTC_LOG(LS_WARNING) << debug_name_ << "->SendPacket(length=" << data.size() << ") failed with error: " << transport_->GetError() << "."; if (rtc::IsBlockingError(transport_->GetError())) { return SendPacketStatus::kTemporaryFailure; } return SendPacketStatus::kError; } return SendPacketStatus::kSuccess; } std::unique_ptr DcSctpTransport::CreateTimeout( webrtc::TaskQueueBase::DelayPrecision precision) { return task_queue_timeout_factory_.CreateTimeout(precision); } dcsctp::TimeMs DcSctpTransport::TimeMillis() { return dcsctp::TimeMs(clock_->TimeInMilliseconds()); } uint32_t DcSctpTransport::GetRandomInt(uint32_t low, uint32_t high) { return random_.Rand(low, high); } void DcSctpTransport::OnTotalBufferedAmountLow() { RTC_DCHECK_RUN_ON(network_thread_); if (!ready_to_send_data_) { ready_to_send_data_ = true; if (data_channel_sink_) { data_channel_sink_->OnReadyToSend(); } } } void DcSctpTransport::OnMessageReceived(dcsctp::DcSctpMessage message) { RTC_DCHECK_RUN_ON(network_thread_); RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnMessageReceived(sid=" << message.stream_id().value() << ", ppid=" << message.ppid().value() << ", length=" << message.payload().size() << ")."; cricket::ReceiveDataParams receive_data_params; receive_data_params.sid = message.stream_id().value(); auto type = ToDataMessageType(message.ppid()); if (!type.has_value()) { RTC_LOG(LS_VERBOSE) << debug_name_ << "->OnMessageReceived(): Received an unknown PPID " << message.ppid().value() << " on an SCTP packet. Dropping."; } receive_data_params.type = *type; // No seq_num available from dcSCTP receive_data_params.seq_num = 0; receive_buffer_.Clear(); if (!IsEmptyPPID(message.ppid())) receive_buffer_.AppendData(message.payload().data(), message.payload().size()); if (data_channel_sink_) { data_channel_sink_->OnDataReceived( receive_data_params.sid, receive_data_params.type, receive_buffer_); } } void DcSctpTransport::OnError(dcsctp::ErrorKind error, absl::string_view message) { if (error == dcsctp::ErrorKind::kResourceExhaustion) { // Indicates that a message failed to be enqueued, because the send buffer // is full, which is a very common (and wanted) state for high throughput // sending/benchmarks. RTC_LOG(LS_VERBOSE) << debug_name_ << "->OnError(error=" << dcsctp::ToString(error) << ", message=" << message << ")."; } else { RTC_LOG(LS_ERROR) << debug_name_ << "->OnError(error=" << dcsctp::ToString(error) << ", message=" << message << ")."; } } void DcSctpTransport::OnAborted(dcsctp::ErrorKind error, absl::string_view message) { RTC_DCHECK_RUN_ON(network_thread_); RTC_LOG(LS_ERROR) << debug_name_ << "->OnAborted(error=" << dcsctp::ToString(error) << ", message=" << message << ")."; ready_to_send_data_ = false; RTCError rtc_error(RTCErrorType::OPERATION_ERROR_WITH_DATA, std::string(message)); rtc_error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE); auto code = ToErrorCauseCode(error); if (code.has_value()) { rtc_error.set_sctp_cause_code(static_cast(*code)); } if (data_channel_sink_) { data_channel_sink_->OnTransportClosed(rtc_error); } } void DcSctpTransport::OnConnected() { RTC_DCHECK_RUN_ON(network_thread_); RTC_DLOG(LS_INFO) << debug_name_ << "->OnConnected()."; ready_to_send_data_ = true; if (data_channel_sink_) { data_channel_sink_->OnReadyToSend(); } if (on_connected_callback_) { on_connected_callback_(); } } void DcSctpTransport::OnClosed() { RTC_DLOG(LS_INFO) << debug_name_ << "->OnClosed()."; ready_to_send_data_ = false; } void DcSctpTransport::OnConnectionRestarted() { RTC_DLOG(LS_INFO) << debug_name_ << "->OnConnectionRestarted()."; } void DcSctpTransport::OnStreamsResetFailed( rtc::ArrayView outgoing_streams, absl::string_view reason) { // TODO(orphis): Need a test to check for correct behavior for (auto& stream_id : outgoing_streams) { RTC_LOG(LS_WARNING) << debug_name_ << "->OnStreamsResetFailed(...): Outgoing stream reset failed" << ", sid=" << stream_id.value() << ", reason: " << reason << "."; } } void DcSctpTransport::OnStreamsResetPerformed( rtc::ArrayView outgoing_streams) { RTC_DCHECK_RUN_ON(network_thread_); for (auto& stream_id : outgoing_streams) { RTC_LOG(LS_INFO) << debug_name_ << "->OnStreamsResetPerformed(...): Outgoing stream reset" << ", sid=" << stream_id.value(); auto it = stream_states_.find(stream_id); if (it == stream_states_.end()) { // Ignoring an outgoing stream reset for a closed stream return; } StreamState& stream_state = it->second; stream_state.outgoing_reset_done = true; if (stream_state.incoming_reset_done) { // When the close was not initiated locally, we can signal the end of the // data channel close procedure when the remote ACKs the reset. if (data_channel_sink_) { data_channel_sink_->OnChannelClosed(stream_id.value()); } stream_states_.erase(stream_id); } } } void DcSctpTransport::OnIncomingStreamsReset( rtc::ArrayView incoming_streams) { RTC_DCHECK_RUN_ON(network_thread_); for (auto& stream_id : incoming_streams) { RTC_LOG(LS_INFO) << debug_name_ << "->OnIncomingStreamsReset(...): Incoming stream reset" << ", sid=" << stream_id.value(); auto it = stream_states_.find(stream_id); if (it == stream_states_.end()) return; StreamState& stream_state = it->second; stream_state.incoming_reset_done = true; if (!stream_state.closure_initiated) { // When receiving an incoming stream reset event for a non local close // procedure, the transport needs to reset the stream in the other // direction too. dcsctp::StreamID streams[1] = {stream_id}; socket_->ResetStreams(streams); if (data_channel_sink_) { data_channel_sink_->OnChannelClosing(stream_id.value()); } } if (stream_state.outgoing_reset_done) { // The close procedure that was initiated locally is complete when we // receive and incoming reset event. if (data_channel_sink_) { data_channel_sink_->OnChannelClosed(stream_id.value()); } stream_states_.erase(stream_id); } } } void DcSctpTransport::ConnectTransportSignals() { RTC_DCHECK_RUN_ON(network_thread_); if (!transport_) { return; } transport_->SignalWritableState.connect( this, &DcSctpTransport::OnTransportWritableState); transport_->SignalReadPacket.connect(this, &DcSctpTransport::OnTransportReadPacket); transport_->SignalClosed.connect(this, &DcSctpTransport::OnTransportClosed); } void DcSctpTransport::DisconnectTransportSignals() { RTC_DCHECK_RUN_ON(network_thread_); if (!transport_) { return; } transport_->SignalWritableState.disconnect(this); transport_->SignalReadPacket.disconnect(this); transport_->SignalClosed.disconnect(this); } void DcSctpTransport::OnTransportWritableState( rtc::PacketTransportInternal* transport) { RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_EQ(transport_, transport); RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnTransportWritableState(), writable=" << transport->writable(); MaybeConnectSocket(); } void DcSctpTransport::OnTransportReadPacket( rtc::PacketTransportInternal* transport, const char* data, size_t length, const int64_t& /* packet_time_us */, int flags) { RTC_DCHECK_RUN_ON(network_thread_); if (flags) { // We are only interested in SCTP packets. return; } RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnTransportReadPacket(), length=" << length; if (socket_) { socket_->ReceivePacket(rtc::ArrayView( reinterpret_cast(data), length)); } } void DcSctpTransport::OnTransportClosed( rtc::PacketTransportInternal* transport) { RTC_DCHECK_RUN_ON(network_thread_); RTC_DLOG(LS_VERBOSE) << debug_name_ << "->OnTransportClosed()."; if (data_channel_sink_) { data_channel_sink_->OnTransportClosed({}); } } void DcSctpTransport::MaybeConnectSocket() { if (transport_ && transport_->writable() && socket_ && socket_->state() == dcsctp::SocketState::kClosed) { socket_->Connect(); } } } // namespace webrtc