diff options
Diffstat (limited to 'third_party/libwebrtc/pc/data_channel_controller.cc')
-rw-r--r-- | third_party/libwebrtc/pc/data_channel_controller.cc | 440 |
1 files changed, 440 insertions, 0 deletions
diff --git a/third_party/libwebrtc/pc/data_channel_controller.cc b/third_party/libwebrtc/pc/data_channel_controller.cc new file mode 100644 index 0000000000..93599fdba9 --- /dev/null +++ b/third_party/libwebrtc/pc/data_channel_controller.cc @@ -0,0 +1,440 @@ +/* + * Copyright 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "pc/data_channel_controller.h" + +#include <utility> + +#include "absl/algorithm/container.h" +#include "api/peer_connection_interface.h" +#include "api/rtc_error.h" +#include "pc/peer_connection_internal.h" +#include "pc/sctp_utils.h" +#include "rtc_base/logging.h" + +namespace webrtc { + +DataChannelController::~DataChannelController() { + RTC_DCHECK(sctp_data_channels_n_.empty()) + << "Missing call to TeardownDataChannelTransport_n?"; + RTC_DCHECK(!signaling_safety_.flag()->alive()) + << "Missing call to PrepareForShutdown?"; +} + +bool DataChannelController::HasDataChannels() const { + RTC_DCHECK_RUN_ON(signaling_thread()); + return channel_usage_ == DataChannelUsage::kInUse; +} + +bool DataChannelController::HasUsedDataChannels() const { + RTC_DCHECK_RUN_ON(signaling_thread()); + return channel_usage_ != DataChannelUsage::kNeverUsed; +} + +RTCError DataChannelController::SendData( + StreamId sid, + const SendDataParams& params, + const rtc::CopyOnWriteBuffer& payload) { + RTC_DCHECK_RUN_ON(network_thread()); + if (!data_channel_transport_) { + RTC_LOG(LS_ERROR) << "SendData called before transport is ready"; + return RTCError(RTCErrorType::INVALID_STATE); + } + return data_channel_transport_->SendData(sid.stream_id_int(), params, + payload); +} + +void DataChannelController::AddSctpDataStream(StreamId sid) { + RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK(sid.HasValue()); + if (data_channel_transport_) { + data_channel_transport_->OpenChannel(sid.stream_id_int()); + } +} + +void DataChannelController::RemoveSctpDataStream(StreamId sid) { + RTC_DCHECK_RUN_ON(network_thread()); + if (data_channel_transport_) { + data_channel_transport_->CloseChannel(sid.stream_id_int()); + } +} + +void DataChannelController::OnChannelStateChanged( + SctpDataChannel* channel, + DataChannelInterface::DataState state) { + RTC_DCHECK_RUN_ON(network_thread()); + + // Stash away the internal id here in case `OnSctpDataChannelClosed` ends up + // releasing the last reference to the channel. + const int channel_id = channel->internal_id(); + + if (state == DataChannelInterface::DataState::kClosed) + OnSctpDataChannelClosed(channel); + + DataChannelUsage channel_usage = sctp_data_channels_n_.empty() + ? DataChannelUsage::kHaveBeenUsed + : DataChannelUsage::kInUse; + signaling_thread()->PostTask(SafeTask( + signaling_safety_.flag(), [this, channel_id, state, channel_usage] { + RTC_DCHECK_RUN_ON(signaling_thread()); + channel_usage_ = channel_usage; + pc_->OnSctpDataChannelStateChanged(channel_id, state); + })); +} + +void DataChannelController::OnDataReceived( + int channel_id, + DataMessageType type, + const rtc::CopyOnWriteBuffer& buffer) { + RTC_DCHECK_RUN_ON(network_thread()); + + if (HandleOpenMessage_n(channel_id, type, buffer)) + return; + + auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) { + return c->sid_n().stream_id_int() == channel_id; + }); + + if (it != sctp_data_channels_n_.end()) + (*it)->OnDataReceived(type, buffer); +} + +void DataChannelController::OnChannelClosing(int channel_id) { + RTC_DCHECK_RUN_ON(network_thread()); + auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) { + return c->sid_n().stream_id_int() == channel_id; + }); + + if (it != sctp_data_channels_n_.end()) + (*it)->OnClosingProcedureStartedRemotely(); +} + +void DataChannelController::OnChannelClosed(int channel_id) { + RTC_DCHECK_RUN_ON(network_thread()); + StreamId sid(channel_id); + sid_allocator_.ReleaseSid(sid); + auto it = absl::c_find_if(sctp_data_channels_n_, + [&](const auto& c) { return c->sid_n() == sid; }); + + if (it != sctp_data_channels_n_.end()) { + rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it); + sctp_data_channels_n_.erase(it); + channel->OnClosingProcedureComplete(); + } +} + +void DataChannelController::OnReadyToSend() { + RTC_DCHECK_RUN_ON(network_thread()); + auto copy = sctp_data_channels_n_; + for (const auto& channel : copy) { + if (channel->sid_n().HasValue()) { + channel->OnTransportReady(); + } else { + // This happens for role==SSL_SERVER channels when we get notified by + // the transport *before* the SDP code calls `AllocateSctpSids` to + // trigger assignment of sids. In this case OnTransportReady() will be + // called from within `AllocateSctpSids` below. + RTC_LOG(LS_INFO) << "OnReadyToSend: Still waiting for an id for channel."; + } + } +} + +void DataChannelController::OnTransportClosed(RTCError error) { + RTC_DCHECK_RUN_ON(network_thread()); + + // This loop will close all data channels and trigger a callback to + // `OnSctpDataChannelClosed`. We'll empty `sctp_data_channels_n_`, first + // and `OnSctpDataChannelClosed` will become a noop but we'll release the + // StreamId here. + std::vector<rtc::scoped_refptr<SctpDataChannel>> temp_sctp_dcs; + temp_sctp_dcs.swap(sctp_data_channels_n_); + for (const auto& channel : temp_sctp_dcs) { + channel->OnTransportChannelClosed(error); + sid_allocator_.ReleaseSid(channel->sid_n()); + } +} + +void DataChannelController::SetupDataChannelTransport_n( + DataChannelTransportInterface* transport) { + RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK(transport); + set_data_channel_transport(transport); +} + +void DataChannelController::PrepareForShutdown() { + RTC_DCHECK_RUN_ON(signaling_thread()); + signaling_safety_.reset(PendingTaskSafetyFlag::CreateDetachedInactive()); + if (channel_usage_ != DataChannelUsage::kNeverUsed) + channel_usage_ = DataChannelUsage::kHaveBeenUsed; +} + +void DataChannelController::TeardownDataChannelTransport_n(RTCError error) { + RTC_DCHECK_RUN_ON(network_thread()); + OnTransportClosed(error); + set_data_channel_transport(nullptr); + RTC_DCHECK(sctp_data_channels_n_.empty()); + weak_factory_.InvalidateWeakPtrs(); +} + +void DataChannelController::OnTransportChanged( + DataChannelTransportInterface* new_data_channel_transport) { + RTC_DCHECK_RUN_ON(network_thread()); + if (data_channel_transport_ && + data_channel_transport_ != new_data_channel_transport) { + // Changed which data channel transport is used for `sctp_mid_` (eg. now + // it's bundled). + set_data_channel_transport(new_data_channel_transport); + } +} + +std::vector<DataChannelStats> DataChannelController::GetDataChannelStats() + const { + RTC_DCHECK_RUN_ON(network_thread()); + std::vector<DataChannelStats> stats; + stats.reserve(sctp_data_channels_n_.size()); + for (const auto& channel : sctp_data_channels_n_) + stats.push_back(channel->GetStats()); + return stats; +} + +bool DataChannelController::HandleOpenMessage_n( + int channel_id, + DataMessageType type, + const rtc::CopyOnWriteBuffer& buffer) { + if (type != DataMessageType::kControl || !IsOpenMessage(buffer)) + return false; + + // Received OPEN message; parse and signal that a new data channel should + // be created. + std::string label; + InternalDataChannelInit config; + config.id = channel_id; + if (!ParseDataChannelOpenMessage(buffer, &label, &config)) { + RTC_LOG(LS_WARNING) << "Failed to parse the OPEN message for sid " + << channel_id; + } else { + config.open_handshake_role = InternalDataChannelInit::kAcker; + auto channel_or_error = CreateDataChannel(label, config); + if (channel_or_error.ok()) { + signaling_thread()->PostTask(SafeTask( + signaling_safety_.flag(), + [this, channel = channel_or_error.MoveValue(), + ready_to_send = data_channel_transport_->IsReadyToSend()] { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnDataChannelOpenMessage(std::move(channel), ready_to_send); + })); + } else { + RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message." + << ToString(channel_or_error.error().type()); + } + } + return true; +} + +void DataChannelController::OnDataChannelOpenMessage( + rtc::scoped_refptr<SctpDataChannel> channel, + bool ready_to_send) { + channel_usage_ = DataChannelUsage::kInUse; + auto proxy = SctpDataChannel::CreateProxy(channel, signaling_safety_.flag()); + + pc_->Observer()->OnDataChannel(proxy); + pc_->NoteDataAddedEvent(); + + if (ready_to_send) { + network_thread()->PostTask([channel = std::move(channel)] { + if (channel->state() != DataChannelInterface::DataState::kClosed) + channel->OnTransportReady(); + }); + } +} + +// RTC_RUN_ON(network_thread()) +RTCError DataChannelController::ReserveOrAllocateSid( + StreamId& sid, + absl::optional<rtc::SSLRole> fallback_ssl_role) { + if (sid.HasValue()) { + return sid_allocator_.ReserveSid(sid) + ? RTCError::OK() + : RTCError(RTCErrorType::INVALID_RANGE, + "StreamId out of range or reserved."); + } + + // Attempt to allocate an ID based on the negotiated role. + absl::optional<rtc::SSLRole> role = pc_->GetSctpSslRole_n(); + if (!role) + role = fallback_ssl_role; + if (role) { + sid = sid_allocator_.AllocateSid(*role); + if (!sid.HasValue()) + return RTCError(RTCErrorType::RESOURCE_EXHAUSTED); + } + // When we get here, we may still not have an ID, but that's a supported case + // whereby an id will be assigned later. + RTC_DCHECK(sid.HasValue() || !role); + return RTCError::OK(); +} + +// RTC_RUN_ON(network_thread()) +RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> +DataChannelController::CreateDataChannel(const std::string& label, + InternalDataChannelInit& config) { + StreamId sid(config.id); + RTCError err = ReserveOrAllocateSid(sid, config.fallback_ssl_role); + if (!err.ok()) + return err; + + // In case `sid` has changed. Update `config` accordingly. + config.id = sid.stream_id_int(); + + rtc::scoped_refptr<SctpDataChannel> channel = SctpDataChannel::Create( + weak_factory_.GetWeakPtr(), label, data_channel_transport_ != nullptr, + config, signaling_thread(), network_thread()); + RTC_DCHECK(channel); + sctp_data_channels_n_.push_back(channel); + + // If we have an id already, notify the transport. + if (sid.HasValue()) + AddSctpDataStream(sid); + + return channel; +} + +RTCErrorOr<rtc::scoped_refptr<DataChannelInterface>> +DataChannelController::InternalCreateDataChannelWithProxy( + const std::string& label, + const InternalDataChannelInit& config) { + RTC_DCHECK_RUN_ON(signaling_thread()); + RTC_DCHECK(!pc_->IsClosed()); + if (!config.IsValid()) { + LOG_AND_RETURN_ERROR(RTCErrorType::INVALID_PARAMETER, + "Invalid DataChannelInit"); + } + + bool ready_to_send = false; + InternalDataChannelInit new_config = config; + StreamId sid(new_config.id); + auto ret = network_thread()->BlockingCall( + [&]() -> RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> { + RTC_DCHECK_RUN_ON(network_thread()); + auto channel = CreateDataChannel(label, new_config); + if (!channel.ok()) + return channel; + ready_to_send = + data_channel_transport_ && data_channel_transport_->IsReadyToSend(); + if (ready_to_send) { + // If the transport is ready to send because the initial channel + // ready signal may have been sent before the DataChannel creation. + // This has to be done async because the upper layer objects (e.g. + // Chrome glue and WebKit) are not wired up properly until after + // `InternalCreateDataChannelWithProxy` returns. + network_thread()->PostTask([channel = channel.value()] { + if (channel->state() != DataChannelInterface::DataState::kClosed) + channel->OnTransportReady(); + }); + } + + return channel; + }); + + if (!ret.ok()) + return ret.MoveError(); + + channel_usage_ = DataChannelUsage::kInUse; + return SctpDataChannel::CreateProxy(ret.MoveValue(), + signaling_safety_.flag()); +} + +void DataChannelController::AllocateSctpSids(rtc::SSLRole role) { + RTC_DCHECK_RUN_ON(network_thread()); + + const bool ready_to_send = + data_channel_transport_ && data_channel_transport_->IsReadyToSend(); + + std::vector<std::pair<SctpDataChannel*, StreamId>> channels_to_update; + std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close; + for (auto it = sctp_data_channels_n_.begin(); + it != sctp_data_channels_n_.end();) { + if (!(*it)->sid_n().HasValue()) { + StreamId sid = sid_allocator_.AllocateSid(role); + if (sid.HasValue()) { + (*it)->SetSctpSid_n(sid); + AddSctpDataStream(sid); + if (ready_to_send) { + RTC_LOG(LS_INFO) << "AllocateSctpSids: Id assigned, ready to send."; + (*it)->OnTransportReady(); + } + channels_to_update.push_back(std::make_pair((*it).get(), sid)); + } else { + channels_to_close.push_back(std::move(*it)); + it = sctp_data_channels_n_.erase(it); + continue; + } + } + ++it; + } + + // Since closing modifies the list of channels, we have to do the actual + // closing outside the loop. + for (const auto& channel : channels_to_close) { + channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID"); + } +} + +void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) { + RTC_DCHECK_RUN_ON(network_thread()); + // After the closing procedure is done, it's safe to use this ID for + // another data channel. + if (channel->sid_n().HasValue()) { + sid_allocator_.ReleaseSid(channel->sid_n()); + } + auto it = absl::c_find_if(sctp_data_channels_n_, + [&](const auto& c) { return c.get() == channel; }); + if (it != sctp_data_channels_n_.end()) + sctp_data_channels_n_.erase(it); +} + +void DataChannelController::set_data_channel_transport( + DataChannelTransportInterface* transport) { + RTC_DCHECK_RUN_ON(network_thread()); + + if (data_channel_transport_) + data_channel_transport_->SetDataSink(nullptr); + + data_channel_transport_ = transport; + + if (data_channel_transport_) { + // There's a new data channel transport. This needs to be signaled to the + // `sctp_data_channels_n_` so that they can reopen and reconnect. This is + // necessary when bundling is applied. + NotifyDataChannelsOfTransportCreated(); + data_channel_transport_->SetDataSink(this); + } +} + +void DataChannelController::NotifyDataChannelsOfTransportCreated() { + RTC_DCHECK_RUN_ON(network_thread()); + RTC_DCHECK(data_channel_transport_); + + for (const auto& channel : sctp_data_channels_n_) { + if (channel->sid_n().HasValue()) + AddSctpDataStream(channel->sid_n()); + channel->OnTransportChannelCreated(); + } +} + +rtc::Thread* DataChannelController::network_thread() const { + return pc_->network_thread(); +} + +rtc::Thread* DataChannelController::signaling_thread() const { + return pc_->signaling_thread(); +} + +} // namespace webrtc |