/* * Copyright 2019 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ #include "pc/data_channel_controller.h" #include #include "absl/algorithm/container.h" #include "api/peer_connection_interface.h" #include "api/rtc_error.h" #include "pc/peer_connection_internal.h" #include "pc/sctp_utils.h" #include "rtc_base/logging.h" namespace webrtc { DataChannelController::~DataChannelController() { RTC_DCHECK(sctp_data_channels_n_.empty()) << "Missing call to TeardownDataChannelTransport_n?"; RTC_DCHECK(!signaling_safety_.flag()->alive()) << "Missing call to PrepareForShutdown?"; } bool DataChannelController::HasDataChannels() const { RTC_DCHECK_RUN_ON(signaling_thread()); return channel_usage_ == DataChannelUsage::kInUse; } bool DataChannelController::HasUsedDataChannels() const { RTC_DCHECK_RUN_ON(signaling_thread()); return channel_usage_ != DataChannelUsage::kNeverUsed; } RTCError DataChannelController::SendData( StreamId sid, const SendDataParams& params, const rtc::CopyOnWriteBuffer& payload) { RTC_DCHECK_RUN_ON(network_thread()); if (!data_channel_transport_) { RTC_LOG(LS_ERROR) << "SendData called before transport is ready"; return RTCError(RTCErrorType::INVALID_STATE); } return data_channel_transport_->SendData(sid.stream_id_int(), params, payload); } void DataChannelController::AddSctpDataStream(StreamId sid) { RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK(sid.HasValue()); if (data_channel_transport_) { data_channel_transport_->OpenChannel(sid.stream_id_int()); } } void DataChannelController::RemoveSctpDataStream(StreamId sid) { RTC_DCHECK_RUN_ON(network_thread()); if (data_channel_transport_) { data_channel_transport_->CloseChannel(sid.stream_id_int()); } } void DataChannelController::OnChannelStateChanged( SctpDataChannel* channel, DataChannelInterface::DataState state) { RTC_DCHECK_RUN_ON(network_thread()); // Stash away the internal id here in case `OnSctpDataChannelClosed` ends up // releasing the last reference to the channel. const int channel_id = channel->internal_id(); if (state == DataChannelInterface::DataState::kClosed) OnSctpDataChannelClosed(channel); DataChannelUsage channel_usage = sctp_data_channels_n_.empty() ? DataChannelUsage::kHaveBeenUsed : DataChannelUsage::kInUse; signaling_thread()->PostTask(SafeTask( signaling_safety_.flag(), [this, channel_id, state, channel_usage] { RTC_DCHECK_RUN_ON(signaling_thread()); channel_usage_ = channel_usage; pc_->OnSctpDataChannelStateChanged(channel_id, state); })); } void DataChannelController::OnDataReceived( int channel_id, DataMessageType type, const rtc::CopyOnWriteBuffer& buffer) { RTC_DCHECK_RUN_ON(network_thread()); if (HandleOpenMessage_n(channel_id, type, buffer)) return; auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) { return c->sid_n().stream_id_int() == channel_id; }); if (it != sctp_data_channels_n_.end()) (*it)->OnDataReceived(type, buffer); } void DataChannelController::OnChannelClosing(int channel_id) { RTC_DCHECK_RUN_ON(network_thread()); auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) { return c->sid_n().stream_id_int() == channel_id; }); if (it != sctp_data_channels_n_.end()) (*it)->OnClosingProcedureStartedRemotely(); } void DataChannelController::OnChannelClosed(int channel_id) { RTC_DCHECK_RUN_ON(network_thread()); StreamId sid(channel_id); sid_allocator_.ReleaseSid(sid); auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) { return c->sid_n() == sid; }); if (it != sctp_data_channels_n_.end()) { rtc::scoped_refptr channel = std::move(*it); sctp_data_channels_n_.erase(it); channel->OnClosingProcedureComplete(); } } void DataChannelController::OnReadyToSend() { RTC_DCHECK_RUN_ON(network_thread()); auto copy = sctp_data_channels_n_; for (const auto& channel : copy) { if (channel->sid_n().HasValue()) { channel->OnTransportReady(); } else { // This happens for role==SSL_SERVER channels when we get notified by // the transport *before* the SDP code calls `AllocateSctpSids` to // trigger assignment of sids. In this case OnTransportReady() will be // called from within `AllocateSctpSids` below. RTC_LOG(LS_INFO) << "OnReadyToSend: Still waiting for an id for channel."; } } } void DataChannelController::OnTransportClosed(RTCError error) { RTC_DCHECK_RUN_ON(network_thread()); // This loop will close all data channels and trigger a callback to // `OnSctpDataChannelClosed`. We'll empty `sctp_data_channels_n_`, first // and `OnSctpDataChannelClosed` will become a noop but we'll release the // StreamId here. std::vector> temp_sctp_dcs; temp_sctp_dcs.swap(sctp_data_channels_n_); for (const auto& channel : temp_sctp_dcs) { channel->OnTransportChannelClosed(error); sid_allocator_.ReleaseSid(channel->sid_n()); } } void DataChannelController::SetupDataChannelTransport_n( DataChannelTransportInterface* transport) { RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK(transport); set_data_channel_transport(transport); } void DataChannelController::PrepareForShutdown() { RTC_DCHECK_RUN_ON(signaling_thread()); signaling_safety_.reset(PendingTaskSafetyFlag::CreateDetachedInactive()); if (channel_usage_ != DataChannelUsage::kNeverUsed) channel_usage_ = DataChannelUsage::kHaveBeenUsed; } void DataChannelController::TeardownDataChannelTransport_n(RTCError error) { RTC_DCHECK_RUN_ON(network_thread()); OnTransportClosed(error); set_data_channel_transport(nullptr); RTC_DCHECK(sctp_data_channels_n_.empty()); weak_factory_.InvalidateWeakPtrs(); } void DataChannelController::OnTransportChanged( DataChannelTransportInterface* new_data_channel_transport) { RTC_DCHECK_RUN_ON(network_thread()); if (data_channel_transport_ && data_channel_transport_ != new_data_channel_transport) { // Changed which data channel transport is used for `sctp_mid_` (eg. now // it's bundled). set_data_channel_transport(new_data_channel_transport); } } std::vector DataChannelController::GetDataChannelStats() const { RTC_DCHECK_RUN_ON(network_thread()); std::vector stats; stats.reserve(sctp_data_channels_n_.size()); for (const auto& channel : sctp_data_channels_n_) stats.push_back(channel->GetStats()); return stats; } bool DataChannelController::HandleOpenMessage_n( int channel_id, DataMessageType type, const rtc::CopyOnWriteBuffer& buffer) { if (type != DataMessageType::kControl || !IsOpenMessage(buffer)) return false; // Received OPEN message; parse and signal that a new data channel should // be created. std::string label; InternalDataChannelInit config; config.id = channel_id; if (!ParseDataChannelOpenMessage(buffer, &label, &config)) { RTC_LOG(LS_WARNING) << "Failed to parse the OPEN message for sid " << channel_id; } else { config.open_handshake_role = InternalDataChannelInit::kAcker; auto channel_or_error = CreateDataChannel(label, config); if (channel_or_error.ok()) { signaling_thread()->PostTask(SafeTask( signaling_safety_.flag(), [this, channel = channel_or_error.MoveValue(), ready_to_send = data_channel_transport_->IsReadyToSend()] { RTC_DCHECK_RUN_ON(signaling_thread()); OnDataChannelOpenMessage(std::move(channel), ready_to_send); })); } else { RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message." << ToString(channel_or_error.error().type()); } } return true; } void DataChannelController::OnDataChannelOpenMessage( rtc::scoped_refptr channel, bool ready_to_send) { channel_usage_ = DataChannelUsage::kInUse; auto proxy = SctpDataChannel::CreateProxy(channel, signaling_safety_.flag()); pc_->Observer()->OnDataChannel(proxy); pc_->NoteDataAddedEvent(); if (ready_to_send) { network_thread()->PostTask([channel = std::move(channel)] { if (channel->state() != DataChannelInterface::DataState::kClosed) channel->OnTransportReady(); }); } } // RTC_RUN_ON(network_thread()) RTCError DataChannelController::ReserveOrAllocateSid( StreamId& sid, absl::optional fallback_ssl_role) { if (sid.HasValue()) { return sid_allocator_.ReserveSid(sid) ? RTCError::OK() : RTCError(RTCErrorType::INVALID_RANGE, "StreamId out of range or reserved."); } // Attempt to allocate an ID based on the negotiated role. absl::optional role = pc_->GetSctpSslRole_n(); if (!role) role = fallback_ssl_role; if (role) { sid = sid_allocator_.AllocateSid(*role); if (!sid.HasValue()) return RTCError(RTCErrorType::RESOURCE_EXHAUSTED); } // When we get here, we may still not have an ID, but that's a supported case // whereby an id will be assigned later. RTC_DCHECK(sid.HasValue() || !role); return RTCError::OK(); } // RTC_RUN_ON(network_thread()) RTCErrorOr> DataChannelController::CreateDataChannel(const std::string& label, InternalDataChannelInit& config) { StreamId sid(config.id); RTCError err = ReserveOrAllocateSid(sid, config.fallback_ssl_role); if (!err.ok()) return err; // In case `sid` has changed. Update `config` accordingly. config.id = sid.stream_id_int(); rtc::scoped_refptr channel = SctpDataChannel::Create( weak_factory_.GetWeakPtr(), label, data_channel_transport_ != nullptr, config, signaling_thread(), network_thread()); RTC_DCHECK(channel); sctp_data_channels_n_.push_back(channel); // If we have an id already, notify the transport. if (sid.HasValue()) AddSctpDataStream(sid); return channel; } RTCErrorOr> DataChannelController::InternalCreateDataChannelWithProxy( const std::string& label, const InternalDataChannelInit& config) { RTC_DCHECK_RUN_ON(signaling_thread()); RTC_DCHECK(!pc_->IsClosed()); if (!config.IsValid()) { LOG_AND_RETURN_ERROR(RTCErrorType::INVALID_PARAMETER, "Invalid DataChannelInit"); } bool ready_to_send = false; InternalDataChannelInit new_config = config; StreamId sid(new_config.id); auto ret = network_thread()->BlockingCall( [&]() -> RTCErrorOr> { RTC_DCHECK_RUN_ON(network_thread()); auto channel = CreateDataChannel(label, new_config); if (!channel.ok()) return channel; ready_to_send = data_channel_transport_ && data_channel_transport_->IsReadyToSend(); if (ready_to_send) { // If the transport is ready to send because the initial channel // ready signal may have been sent before the DataChannel creation. // This has to be done async because the upper layer objects (e.g. // Chrome glue and WebKit) are not wired up properly until after // `InternalCreateDataChannelWithProxy` returns. network_thread()->PostTask([channel = channel.value()] { if (channel->state() != DataChannelInterface::DataState::kClosed) channel->OnTransportReady(); }); } return channel; }); if (!ret.ok()) return ret.MoveError(); channel_usage_ = DataChannelUsage::kInUse; return SctpDataChannel::CreateProxy(ret.MoveValue(), signaling_safety_.flag()); } void DataChannelController::AllocateSctpSids(rtc::SSLRole role) { RTC_DCHECK_RUN_ON(network_thread()); const bool ready_to_send = data_channel_transport_ && data_channel_transport_->IsReadyToSend(); std::vector> channels_to_update; std::vector> channels_to_close; for (auto it = sctp_data_channels_n_.begin(); it != sctp_data_channels_n_.end();) { if (!(*it)->sid_n().HasValue()) { StreamId sid = sid_allocator_.AllocateSid(role); if (sid.HasValue()) { (*it)->SetSctpSid_n(sid); AddSctpDataStream(sid); if (ready_to_send) { RTC_LOG(LS_INFO) << "AllocateSctpSids: Id assigned, ready to send."; (*it)->OnTransportReady(); } channels_to_update.push_back(std::make_pair((*it).get(), sid)); } else { channels_to_close.push_back(std::move(*it)); it = sctp_data_channels_n_.erase(it); continue; } } ++it; } // Since closing modifies the list of channels, we have to do the actual // closing outside the loop. for (const auto& channel : channels_to_close) { channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID"); } } void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) { RTC_DCHECK_RUN_ON(network_thread()); // After the closing procedure is done, it's safe to use this ID for // another data channel. if (channel->sid_n().HasValue()) { sid_allocator_.ReleaseSid(channel->sid_n()); } auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) { return c.get() == channel; }); if (it != sctp_data_channels_n_.end()) sctp_data_channels_n_.erase(it); } void DataChannelController::set_data_channel_transport( DataChannelTransportInterface* transport) { RTC_DCHECK_RUN_ON(network_thread()); if (data_channel_transport_) data_channel_transport_->SetDataSink(nullptr); data_channel_transport_ = transport; if (data_channel_transport_) { // There's a new data channel transport. This needs to be signaled to the // `sctp_data_channels_n_` so that they can reopen and reconnect. This is // necessary when bundling is applied. NotifyDataChannelsOfTransportCreated(); data_channel_transport_->SetDataSink(this); } } void DataChannelController::NotifyDataChannelsOfTransportCreated() { RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK(data_channel_transport_); for (const auto& channel : sctp_data_channels_n_) { if (channel->sid_n().HasValue()) AddSctpDataStream(channel->sid_n()); channel->OnTransportChannelCreated(); } } rtc::Thread* DataChannelController::network_thread() const { return pc_->network_thread(); } rtc::Thread* DataChannelController::signaling_thread() const { return pc_->signaling_thread(); } } // namespace webrtc