summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/pc/data_channel_controller.cc
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/libwebrtc/pc/data_channel_controller.cc')
-rw-r--r--third_party/libwebrtc/pc/data_channel_controller.cc425
1 files changed, 425 insertions, 0 deletions
diff --git a/third_party/libwebrtc/pc/data_channel_controller.cc b/third_party/libwebrtc/pc/data_channel_controller.cc
new file mode 100644
index 0000000000..36f11fe12e
--- /dev/null
+++ b/third_party/libwebrtc/pc/data_channel_controller.cc
@@ -0,0 +1,425 @@
+/*
+ * Copyright 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "pc/data_channel_controller.h"
+
+#include <utility>
+
+#include "api/peer_connection_interface.h"
+#include "api/rtc_error.h"
+#include "pc/peer_connection_internal.h"
+#include "pc/sctp_utils.h"
+#include "rtc_base/logging.h"
+
+namespace webrtc {
+
+DataChannelController::~DataChannelController() {
+ // Since channels may have multiple owners, we cannot guarantee that
+ // they will be deallocated before destroying the controller.
+ // Therefore, detach them from the controller.
+ for (auto channel : sctp_data_channels_) {
+ channel->DetachFromController();
+ }
+}
+
+bool DataChannelController::HasDataChannels() const {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ return !sctp_data_channels_.empty();
+}
+
+bool DataChannelController::SendData(int sid,
+ const SendDataParams& params,
+ const rtc::CopyOnWriteBuffer& payload,
+ cricket::SendDataResult* result) {
+ if (data_channel_transport())
+ return DataChannelSendData(sid, params, payload, result);
+ RTC_LOG(LS_ERROR) << "SendData called before transport is ready";
+ return false;
+}
+
+bool DataChannelController::ConnectDataChannel(
+ SctpDataChannel* webrtc_data_channel) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ if (!data_channel_transport()) {
+ // Don't log an error here, because DataChannels are expected to call
+ // ConnectDataChannel in this state. It's the only way to initially tell
+ // whether or not the underlying transport is ready.
+ return false;
+ }
+ SignalDataChannelTransportWritable_s.connect(
+ webrtc_data_channel, &SctpDataChannel::OnTransportReady);
+ SignalDataChannelTransportReceivedData_s.connect(
+ webrtc_data_channel, &SctpDataChannel::OnDataReceived);
+ SignalDataChannelTransportChannelClosing_s.connect(
+ webrtc_data_channel, &SctpDataChannel::OnClosingProcedureStartedRemotely);
+ SignalDataChannelTransportChannelClosed_s.connect(
+ webrtc_data_channel, &SctpDataChannel::OnClosingProcedureComplete);
+ return true;
+}
+
+void DataChannelController::DisconnectDataChannel(
+ SctpDataChannel* webrtc_data_channel) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ if (!data_channel_transport()) {
+ RTC_LOG(LS_ERROR)
+ << "DisconnectDataChannel called when sctp_transport_ is NULL.";
+ return;
+ }
+ SignalDataChannelTransportWritable_s.disconnect(webrtc_data_channel);
+ SignalDataChannelTransportReceivedData_s.disconnect(webrtc_data_channel);
+ SignalDataChannelTransportChannelClosing_s.disconnect(webrtc_data_channel);
+ SignalDataChannelTransportChannelClosed_s.disconnect(webrtc_data_channel);
+}
+
+void DataChannelController::AddSctpDataStream(int sid) {
+ if (data_channel_transport()) {
+ network_thread()->BlockingCall([this, sid] {
+ if (data_channel_transport()) {
+ data_channel_transport()->OpenChannel(sid);
+ }
+ });
+ }
+}
+
+void DataChannelController::RemoveSctpDataStream(int sid) {
+ if (data_channel_transport()) {
+ network_thread()->BlockingCall([this, sid] {
+ if (data_channel_transport()) {
+ data_channel_transport()->CloseChannel(sid);
+ }
+ });
+ }
+}
+
+bool DataChannelController::ReadyToSendData() const {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ return (data_channel_transport() && data_channel_transport_ready_to_send_);
+}
+
+void DataChannelController::OnDataReceived(
+ int channel_id,
+ DataMessageType type,
+ const rtc::CopyOnWriteBuffer& buffer) {
+ RTC_DCHECK_RUN_ON(network_thread());
+ cricket::ReceiveDataParams params;
+ params.sid = channel_id;
+ params.type = type;
+ signaling_thread()->PostTask(
+ [self = weak_factory_.GetWeakPtr(), params, buffer] {
+ if (self) {
+ RTC_DCHECK_RUN_ON(self->signaling_thread());
+ // TODO(bugs.webrtc.org/11547): The data being received should be
+ // delivered on the network thread. The way HandleOpenMessage_s works
+ // right now is that it's called for all types of buffers and operates
+ // as a selector function. Change this so that it's only called for
+ // buffers that it should be able to handle. Once we do that, we can
+ // deliver all other buffers on the network thread (change
+ // SignalDataChannelTransportReceivedData_s to
+ // SignalDataChannelTransportReceivedData_n).
+ if (!self->HandleOpenMessage_s(params, buffer)) {
+ self->SignalDataChannelTransportReceivedData_s(params, buffer);
+ }
+ }
+ });
+}
+
+void DataChannelController::OnChannelClosing(int channel_id) {
+ RTC_DCHECK_RUN_ON(network_thread());
+ signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), channel_id] {
+ if (self) {
+ RTC_DCHECK_RUN_ON(self->signaling_thread());
+ self->SignalDataChannelTransportChannelClosing_s(channel_id);
+ }
+ });
+}
+
+void DataChannelController::OnChannelClosed(int channel_id) {
+ RTC_DCHECK_RUN_ON(network_thread());
+ signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), channel_id] {
+ if (self) {
+ RTC_DCHECK_RUN_ON(self->signaling_thread());
+ self->SignalDataChannelTransportChannelClosed_s(channel_id);
+ }
+ });
+}
+
+void DataChannelController::OnReadyToSend() {
+ RTC_DCHECK_RUN_ON(network_thread());
+ signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] {
+ if (self) {
+ RTC_DCHECK_RUN_ON(self->signaling_thread());
+ self->data_channel_transport_ready_to_send_ = true;
+ self->SignalDataChannelTransportWritable_s(
+ self->data_channel_transport_ready_to_send_);
+ }
+ });
+}
+
+void DataChannelController::OnTransportClosed(RTCError error) {
+ RTC_DCHECK_RUN_ON(network_thread());
+ signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), error] {
+ if (self) {
+ RTC_DCHECK_RUN_ON(self->signaling_thread());
+ self->OnTransportChannelClosed(error);
+ }
+ });
+}
+
+void DataChannelController::SetupDataChannelTransport_n() {
+ RTC_DCHECK_RUN_ON(network_thread());
+
+ // There's a new data channel transport. This needs to be signaled to the
+ // `sctp_data_channels_` so that they can reopen and reconnect. This is
+ // necessary when bundling is applied.
+ NotifyDataChannelsOfTransportCreated();
+}
+
+void DataChannelController::TeardownDataChannelTransport_n() {
+ RTC_DCHECK_RUN_ON(network_thread());
+ if (data_channel_transport()) {
+ data_channel_transport()->SetDataSink(nullptr);
+ }
+ set_data_channel_transport(nullptr);
+}
+
+void DataChannelController::OnTransportChanged(
+ DataChannelTransportInterface* new_data_channel_transport) {
+ RTC_DCHECK_RUN_ON(network_thread());
+ if (data_channel_transport() &&
+ data_channel_transport() != new_data_channel_transport) {
+ // Changed which data channel transport is used for `sctp_mid_` (eg. now
+ // it's bundled).
+ data_channel_transport()->SetDataSink(nullptr);
+ set_data_channel_transport(new_data_channel_transport);
+ if (new_data_channel_transport) {
+ new_data_channel_transport->SetDataSink(this);
+
+ // There's a new data channel transport. This needs to be signaled to the
+ // `sctp_data_channels_` so that they can reopen and reconnect. This is
+ // necessary when bundling is applied.
+ NotifyDataChannelsOfTransportCreated();
+ }
+ }
+}
+
+std::vector<DataChannelStats> DataChannelController::GetDataChannelStats()
+ const {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ std::vector<DataChannelStats> stats;
+ stats.reserve(sctp_data_channels_.size());
+ for (const auto& channel : sctp_data_channels_)
+ stats.push_back(channel->GetStats());
+ return stats;
+}
+
+bool DataChannelController::HandleOpenMessage_s(
+ const cricket::ReceiveDataParams& params,
+ const rtc::CopyOnWriteBuffer& buffer) {
+ if (params.type == DataMessageType::kControl && IsOpenMessage(buffer)) {
+ // Received OPEN message; parse and signal that a new data channel should
+ // be created.
+ std::string label;
+ InternalDataChannelInit config;
+ config.id = params.sid;
+ if (!ParseDataChannelOpenMessage(buffer, &label, &config)) {
+ RTC_LOG(LS_WARNING) << "Failed to parse the OPEN message for sid "
+ << params.sid;
+ return true;
+ }
+ config.open_handshake_role = InternalDataChannelInit::kAcker;
+ OnDataChannelOpenMessage(label, config);
+ return true;
+ }
+ return false;
+}
+
+void DataChannelController::OnDataChannelOpenMessage(
+ const std::string& label,
+ const InternalDataChannelInit& config) {
+ rtc::scoped_refptr<DataChannelInterface> channel(
+ InternalCreateDataChannelWithProxy(label, &config));
+ if (!channel.get()) {
+ RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message.";
+ return;
+ }
+
+ pc_->Observer()->OnDataChannel(std::move(channel));
+ pc_->NoteDataAddedEvent();
+}
+
+rtc::scoped_refptr<DataChannelInterface>
+DataChannelController::InternalCreateDataChannelWithProxy(
+ const std::string& label,
+ const InternalDataChannelInit* config) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ if (pc_->IsClosed()) {
+ return nullptr;
+ }
+
+ rtc::scoped_refptr<SctpDataChannel> channel =
+ InternalCreateSctpDataChannel(label, config);
+ if (channel) {
+ return SctpDataChannel::CreateProxy(channel);
+ }
+
+ return nullptr;
+}
+
+rtc::scoped_refptr<SctpDataChannel>
+DataChannelController::InternalCreateSctpDataChannel(
+ const std::string& label,
+ const InternalDataChannelInit* config) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ InternalDataChannelInit new_config =
+ config ? (*config) : InternalDataChannelInit();
+ if (new_config.id < 0) {
+ rtc::SSLRole role;
+ if ((pc_->GetSctpSslRole(&role)) &&
+ !sid_allocator_.AllocateSid(role, &new_config.id)) {
+ RTC_LOG(LS_ERROR) << "No id can be allocated for the SCTP data channel.";
+ return nullptr;
+ }
+ } else if (!sid_allocator_.ReserveSid(new_config.id)) {
+ RTC_LOG(LS_ERROR) << "Failed to create a SCTP data channel "
+ "because the id is already in use or out of range.";
+ return nullptr;
+ }
+ rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
+ this, label, new_config, signaling_thread(), network_thread()));
+ if (!channel) {
+ sid_allocator_.ReleaseSid(new_config.id);
+ return nullptr;
+ }
+ sctp_data_channels_.push_back(channel);
+ channel->SignalClosed.connect(
+ pc_, &PeerConnectionInternal::OnSctpDataChannelClosed);
+ SignalSctpDataChannelCreated_(channel.get());
+ return channel;
+}
+
+void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close;
+ for (const auto& channel : sctp_data_channels_) {
+ if (channel->id() < 0) {
+ int sid;
+ if (!sid_allocator_.AllocateSid(role, &sid)) {
+ RTC_LOG(LS_ERROR) << "Failed to allocate SCTP sid, closing channel.";
+ channels_to_close.push_back(channel);
+ continue;
+ }
+ channel->SetSctpSid(sid);
+ }
+ }
+ // Since closing modifies the list of channels, we have to do the actual
+ // closing outside the loop.
+ for (const auto& channel : channels_to_close) {
+ channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID");
+ }
+}
+
+void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
+ ++it) {
+ if (it->get() == channel) {
+ if (channel->id() >= 0) {
+ // After the closing procedure is done, it's safe to use this ID for
+ // another data channel.
+ sid_allocator_.ReleaseSid(channel->id());
+ }
+ // Since this method is triggered by a signal from the DataChannel,
+ // we can't free it directly here; we need to free it asynchronously.
+ sctp_data_channels_to_free_.push_back(*it);
+ sctp_data_channels_.erase(it);
+ signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] {
+ if (self) {
+ RTC_DCHECK_RUN_ON(self->signaling_thread());
+ self->sctp_data_channels_to_free_.clear();
+ }
+ });
+ return;
+ }
+ }
+}
+
+void DataChannelController::OnTransportChannelClosed(RTCError error) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ // Use a temporary copy of the SCTP DataChannel list because the
+ // DataChannel may callback to us and try to modify the list.
+ std::vector<rtc::scoped_refptr<SctpDataChannel>> temp_sctp_dcs;
+ temp_sctp_dcs.swap(sctp_data_channels_);
+ for (const auto& channel : temp_sctp_dcs) {
+ channel->OnTransportChannelClosed(error);
+ }
+}
+
+DataChannelTransportInterface* DataChannelController::data_channel_transport()
+ const {
+ // TODO(bugs.webrtc.org/11547): Only allow this accessor to be called on the
+ // network thread.
+ // RTC_DCHECK_RUN_ON(network_thread());
+ return data_channel_transport_;
+}
+
+void DataChannelController::set_data_channel_transport(
+ DataChannelTransportInterface* transport) {
+ RTC_DCHECK_RUN_ON(network_thread());
+ data_channel_transport_ = transport;
+}
+
+bool DataChannelController::DataChannelSendData(
+ int sid,
+ const SendDataParams& params,
+ const rtc::CopyOnWriteBuffer& payload,
+ cricket::SendDataResult* result) {
+ // TODO(bugs.webrtc.org/11547): Expect method to be called on the network
+ // thread instead. Remove the BlockingCall() below and move assocated state to
+ // the network thread.
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ RTC_DCHECK(data_channel_transport());
+
+ RTCError error = network_thread()->BlockingCall([this, sid, params, payload] {
+ return data_channel_transport()->SendData(sid, params, payload);
+ });
+
+ if (error.ok()) {
+ *result = cricket::SendDataResult::SDR_SUCCESS;
+ return true;
+ } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
+ // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
+ // TODO(mellem): Stop using RTCError here and get rid of the mapping.
+ *result = cricket::SendDataResult::SDR_BLOCK;
+ return false;
+ }
+ *result = cricket::SendDataResult::SDR_ERROR;
+ return false;
+}
+
+void DataChannelController::NotifyDataChannelsOfTransportCreated() {
+ RTC_DCHECK_RUN_ON(network_thread());
+ signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] {
+ if (self) {
+ RTC_DCHECK_RUN_ON(self->signaling_thread());
+ for (const auto& channel : self->sctp_data_channels_) {
+ channel->OnTransportChannelCreated();
+ }
+ }
+ });
+}
+
+rtc::Thread* DataChannelController::network_thread() const {
+ return pc_->network_thread();
+}
+rtc::Thread* DataChannelController::signaling_thread() const {
+ return pc_->signaling_thread();
+}
+
+} // namespace webrtc