diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 00:47:55 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 00:47:55 +0000 |
commit | 26a029d407be480d791972afb5975cf62c9360a6 (patch) | |
tree | f435a8308119effd964b339f76abb83a57c29483 /netwerk/sctp/datachannel/DataChannel.cpp | |
parent | Initial commit. (diff) | |
download | firefox-upstream/124.0.1.tar.xz firefox-upstream/124.0.1.zip |
Adding upstream version 124.0.1.upstream/124.0.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'netwerk/sctp/datachannel/DataChannel.cpp')
-rw-r--r-- | netwerk/sctp/datachannel/DataChannel.cpp | 3548 |
1 files changed, 3548 insertions, 0 deletions
diff --git a/netwerk/sctp/datachannel/DataChannel.cpp b/netwerk/sctp/datachannel/DataChannel.cpp new file mode 100644 index 0000000000..8af9a558b8 --- /dev/null +++ b/netwerk/sctp/datachannel/DataChannel.cpp @@ -0,0 +1,3548 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include <algorithm> +#include <stdio.h> +#include <stdlib.h> +#if !defined(__Userspace_os_Windows) +# include <arpa/inet.h> +#endif +// usrsctp.h expects to have errno definitions prior to its inclusion. +#include <errno.h> + +#define SCTP_DEBUG 1 +#define SCTP_STDINT_INCLUDE <stdint.h> + +#ifdef _MSC_VER +// Disable "warning C4200: nonstandard extension used : zero-sized array in +// struct/union" +// ...which the third-party file usrsctp.h runs afoul of. +# pragma warning(push) +# pragma warning(disable : 4200) +#endif + +#include "usrsctp.h" + +#ifdef _MSC_VER +# pragma warning(pop) +#endif + +#include "nsServiceManagerUtils.h" +#include "nsIInputStream.h" +#include "nsIPrefBranch.h" +#include "nsIPrefService.h" +#include "mozilla/Services.h" +#include "mozilla/Sprintf.h" +#include "nsProxyRelease.h" +#include "nsThread.h" +#include "nsThreadUtils.h" +#include "nsNetUtil.h" +#include "nsNetCID.h" +#include "mozilla/RandomNum.h" +#include "mozilla/StaticMutex.h" +#include "mozilla/UniquePtrExtensions.h" +#include "mozilla/Unused.h" +#include "mozilla/dom/RTCDataChannelBinding.h" +#include "mozilla/dom/RTCStatsReportBinding.h" +#include "mozilla/media/MediaUtils.h" +#ifdef MOZ_PEERCONNECTION +# include "transport/runnable_utils.h" +# include "jsapi/MediaTransportHandler.h" +# include "mediapacket.h" +#endif + +#include "DataChannel.h" +#include "DataChannelProtocol.h" + +// Let us turn on and off important assertions in non-debug builds +#ifdef DEBUG +# define ASSERT_WEBRTC(x) MOZ_ASSERT((x)) +#elif defined(MOZ_WEBRTC_ASSERT_ALWAYS) +# define ASSERT_WEBRTC(x) \ + do { \ + if (!(x)) { \ + MOZ_CRASH(); \ + } \ + } while (0) +#endif + +namespace mozilla { + +LazyLogModule gDataChannelLog("DataChannel"); +static LazyLogModule gSCTPLog("SCTP"); + +#define SCTP_LOG(args) \ + MOZ_LOG(mozilla::gSCTPLog, mozilla::LogLevel::Debug, args) + +static void debug_printf(const char* format, ...) { + va_list ap; + char buffer[1024]; + + if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) { + va_start(ap, format); +#ifdef _WIN32 + if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) { +#else + if (VsprintfLiteral(buffer, format, ap) > 0) { +#endif + SCTP_LOG(("%s", buffer)); + } + va_end(ap); + } +} + +static constexpr const char* ToString(DataChannelState state) { + switch (state) { + case DataChannelState::Connecting: + return "CONNECTING"; + case DataChannelState::Open: + return "OPEN"; + case DataChannelState::Closing: + return "CLOSING"; + case DataChannelState::Closed: + return "CLOSED"; + } + return ""; +}; + +static constexpr const char* ToString(DataChannelConnectionState state) { + switch (state) { + case DataChannelConnectionState::Connecting: + return "CONNECTING"; + case DataChannelConnectionState::Open: + return "OPEN"; + case DataChannelConnectionState::Closed: + return "CLOSED"; + } + return ""; +}; + +static constexpr const char* ToString( + DataChannelOnMessageAvailable::EventType type) { + switch (type) { + case DataChannelOnMessageAvailable::EventType::OnConnection: + return "ON_CONNECTION"; + case DataChannelOnMessageAvailable::EventType::OnDisconnected: + return "ON_DISCONNECTED"; + case DataChannelOnMessageAvailable::EventType::OnChannelCreated: + return "ON_CHANNEL_CREATED"; + case DataChannelOnMessageAvailable::EventType::OnDataString: + return "ON_DATA_STRING"; + case DataChannelOnMessageAvailable::EventType::OnDataBinary: + return "ON_DATA_BINARY"; + } + return ""; +}; + +static constexpr const char* ToString(DataChannelConnection::PendingType type) { + switch (type) { + case DataChannelConnection::PendingType::None: + return "NONE"; + case DataChannelConnection::PendingType::Dcep: + return "DCEP"; + case DataChannelConnection::PendingType::Data: + return "DATA"; + } + return ""; +}; + +static constexpr const char* ToString(DataChannelReliabilityPolicy type) { + switch (type) { + case DataChannelReliabilityPolicy::Reliable: + return "RELIABLE"; + case DataChannelReliabilityPolicy::LimitedRetransmissions: + return "LIMITED_RETRANSMISSIONS"; + case DataChannelReliabilityPolicy::LimitedLifetime: + return "LIMITED_LIFETIME"; + } + return ""; +}; + +static constexpr uint16_t ToUsrsctpValue(DataChannelReliabilityPolicy type) { + switch (type) { + case DataChannelReliabilityPolicy::Reliable: + return SCTP_PR_SCTP_NONE; + case DataChannelReliabilityPolicy::LimitedRetransmissions: + return SCTP_PR_SCTP_RTX; + case DataChannelReliabilityPolicy::LimitedLifetime: + return SCTP_PR_SCTP_TTL; + } + return SCTP_PR_SCTP_NONE; +}; + +class DataChannelRegistry { + public: + NS_INLINE_DECL_THREADSAFE_REFCOUNTING(DataChannelRegistry) + + static uintptr_t Register(DataChannelConnection* aConnection) { + StaticMutexAutoLock lock(sInstanceMutex); + uintptr_t result = EnsureInstance()->RegisterImpl(aConnection); + DC_DEBUG( + ("Registering connection %p as ulp %p", aConnection, (void*)result)); + return result; + } + + static void Deregister(uintptr_t aId) { + RefPtr<DataChannelRegistry> maybeTrash; + + { + StaticMutexAutoLock lock(sInstanceMutex); + DC_DEBUG(("Deregistering connection ulp = %p", (void*)aId)); + if (NS_WARN_IF(!Instance())) { + return; + } + Instance()->DeregisterImpl(aId); + if (Instance()->Empty()) { + // Unset singleton inside mutex lock, but don't call Shutdown until we + // unlock, since that involves calling into libusrsctp, which invites + // deadlock. + maybeTrash = Instance().forget(); + } + } + } + + static RefPtr<DataChannelConnection> Lookup(uintptr_t aId) { + StaticMutexAutoLock lock(sInstanceMutex); + if (NS_WARN_IF(!Instance())) { + return nullptr; + } + return Instance()->LookupImpl(aId); + } + + private: + // This is a singleton class, so don't let just anyone create one of these + DataChannelRegistry() { + ASSERT_WEBRTC(NS_IsMainThread()); + mShutdownBlocker = media::ShutdownBlockingTicket::Create( + u"DataChannelRegistry::mShutdownBlocker"_ns, + NS_LITERAL_STRING_FROM_CSTRING(__FILE__), __LINE__); + InitUsrSctp(); + } + + static RefPtr<DataChannelRegistry>& Instance() { + static RefPtr<DataChannelRegistry> sRegistry; + return sRegistry; + } + + static RefPtr<DataChannelRegistry>& EnsureInstance() { + ASSERT_WEBRTC(NS_IsMainThread()); + if (!Instance()) { + Instance() = new DataChannelRegistry(); + } + return Instance(); + } + + uintptr_t RegisterImpl(DataChannelConnection* aConnection) { + ASSERT_WEBRTC(NS_IsMainThread()); + mConnections.emplace(mNextId, aConnection); + return mNextId++; + } + + void DeregisterImpl(uintptr_t aId) { + ASSERT_WEBRTC(NS_IsMainThread()); + mConnections.erase(aId); + } + + bool Empty() const { return mConnections.empty(); } + + RefPtr<DataChannelConnection> LookupImpl(uintptr_t aId) { + auto it = mConnections.find(aId); + if (NS_WARN_IF(it == mConnections.end())) { + DC_DEBUG(("Can't find connection ulp %p", (void*)aId)); + return nullptr; + } + return it->second; + } + + virtual ~DataChannelRegistry() { + ASSERT_WEBRTC(NS_IsMainThread()); + + if (NS_WARN_IF(!mConnections.empty())) { + MOZ_ASSERT(false); + mConnections.clear(); + } + + DeinitUsrSctp(); + } + +#ifdef SCTP_DTLS_SUPPORTED + static int SctpDtlsOutput(void* addr, void* buffer, size_t length, + uint8_t tos, uint8_t set_df) { + uintptr_t id = reinterpret_cast<uintptr_t>(addr); + RefPtr<DataChannelConnection> connection = DataChannelRegistry::Lookup(id); + if (NS_WARN_IF(!connection) || connection->InShutdown()) { + return 0; + } + return connection->SctpDtlsOutput(addr, buffer, length, tos, set_df); + } +#endif + + void InitUsrSctp() { +#ifndef MOZ_PEERCONNECTION + MOZ_CRASH("Trying to use SCTP/DTLS without dom/media/webrtc/transport"); +#endif + + DC_DEBUG(("Calling usrsctp_init %p", this)); + + usrsctp_init(0, DataChannelRegistry::SctpDtlsOutput, debug_printf); + + // Set logging to SCTP:LogLevel::Debug to get SCTP debugs + if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) { + usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL); + } + + // Do not send ABORTs in response to INITs (1). + // Do not send ABORTs for received Out of the Blue packets (2). + usrsctp_sysctl_set_sctp_blackhole(2); + + // Disable the Explicit Congestion Notification extension (currently not + // supported by the Firefox code) + usrsctp_sysctl_set_sctp_ecn_enable(0); + + // Enable interleaving messages for different streams (incoming) + // See: https://tools.ietf.org/html/rfc6458#section-8.1.20 + usrsctp_sysctl_set_sctp_default_frag_interleave(2); + + // Disabling authentication and dynamic address reconfiguration as neither + // of them are used for data channel and only result in additional code + // paths being used. + usrsctp_sysctl_set_sctp_asconf_enable(0); + usrsctp_sysctl_set_sctp_auth_enable(0); + } + + void DeinitUsrSctp() { + DC_DEBUG(("Calling usrsctp_finish %p", this)); + usrsctp_finish(); + } + + uintptr_t mNextId = 1; + std::map<uintptr_t, RefPtr<DataChannelConnection>> mConnections; + UniquePtr<media::ShutdownBlockingTicket> mShutdownBlocker; + static StaticMutex sInstanceMutex MOZ_UNANNOTATED; +}; + +StaticMutex DataChannelRegistry::sInstanceMutex; + +OutgoingMsg::OutgoingMsg(struct sctp_sendv_spa& info, const uint8_t* data, + size_t length) + : mLength(length), mData(data) { + mInfo = &info; + mPos = 0; +} + +void OutgoingMsg::Advance(size_t offset) { + mPos += offset; + if (mPos > mLength) { + mPos = mLength; + } +} + +BufferedOutgoingMsg::BufferedOutgoingMsg(OutgoingMsg& msg) { + size_t length = msg.GetLeft(); + auto* tmp = new uint8_t[length]; // infallible malloc! + memcpy(tmp, msg.GetData(), length); + mLength = length; + mData = tmp; + mInfo = new sctp_sendv_spa; + *mInfo = msg.GetInfo(); + mPos = 0; +} + +BufferedOutgoingMsg::~BufferedOutgoingMsg() { + delete mInfo; + delete[] mData; +} + +static int receive_cb(struct socket* sock, union sctp_sockstore addr, + void* data, size_t datalen, struct sctp_rcvinfo rcv, + int flags, void* ulp_info) { + DC_DEBUG(("In receive_cb, ulp_info=%p", ulp_info)); + uintptr_t id = reinterpret_cast<uintptr_t>(ulp_info); + RefPtr<DataChannelConnection> connection = DataChannelRegistry::Lookup(id); + if (!connection) { + // Unfortunately, we can get callbacks after calling + // usrsctp_close(socket), so we need to simply ignore them if we've + // already killed the DataChannelConnection object + DC_DEBUG(( + "Ignoring receive callback for terminated Connection ulp=%p, %zu bytes", + ulp_info, datalen)); + return 0; + } + return connection->ReceiveCallback(sock, data, datalen, rcv, flags); +} + +static RefPtr<DataChannelConnection> GetConnectionFromSocket( + struct socket* sock) { + struct sockaddr* addrs = nullptr; + int naddrs = usrsctp_getladdrs(sock, 0, &addrs); + if (naddrs <= 0 || addrs[0].sa_family != AF_CONN) { + return nullptr; + } + // usrsctp_getladdrs() returns the addresses bound to this socket, which + // contains the SctpDataMediaChannel* as sconn_addr. Read the pointer, + // then free the list of addresses once we have the pointer. We only open + // AF_CONN sockets, and they should all have the sconn_addr set to the + // pointer that created them, so [0] is as good as any other. + struct sockaddr_conn* sconn = + reinterpret_cast<struct sockaddr_conn*>(&addrs[0]); + uintptr_t id = reinterpret_cast<uintptr_t>(sconn->sconn_addr); + RefPtr<DataChannelConnection> connection = DataChannelRegistry::Lookup(id); + usrsctp_freeladdrs(addrs); + + return connection; +} + +// Called when the buffer empties to the threshold value. This is called +// from SctpDtlsInput() through the sctp stack. SctpDtlsInput() calls +// usrsctp_conninput() under lock +int DataChannelConnection::OnThresholdEvent(struct socket* sock, + uint32_t sb_free, void* ulp_info) { + RefPtr<DataChannelConnection> connection = GetConnectionFromSocket(sock); + connection->mLock.AssertCurrentThreadOwns(); + if (connection) { + connection->SendDeferredMessages(); + } else { + DC_ERROR(("Can't find connection for socket %p", sock)); + } + return 0; +} + +DataChannelConnection::~DataChannelConnection() { + DC_DEBUG(("Deleting DataChannelConnection %p", (void*)this)); + // This may die on the MainThread, or on the STS thread, or on an + // sctp thread if we were in a callback when the DOM side shut things down. + ASSERT_WEBRTC(mState == DataChannelConnectionState::Closed); + MOZ_ASSERT(!mMasterSocket); + MOZ_ASSERT(mPending.GetSize() == 0); + + if (!IsSTSThread()) { + // We may be on MainThread *or* on an sctp thread (being called from + // receive_cb() or SctpDtlsOutput()) + if (mInternalIOThread) { + // Avoid spinning the event thread from here (which if we're mainthread + // is in the event loop already) + nsCOMPtr<nsIRunnable> r = WrapRunnable( + nsCOMPtr<nsIThread>(mInternalIOThread), &nsIThread::AsyncShutdown); + Dispatch(r.forget()); + } + } else { + // on STS, safe to call shutdown + if (mInternalIOThread) { + mInternalIOThread->Shutdown(); + } + } +} + +void DataChannelConnection::Destroy() { + // Though it's probably ok to do this and close the sockets; + // if we really want it to do true clean shutdowns it can + // create a dependant Internal object that would remain around + // until the network shut down the association or timed out. + DC_DEBUG(("Destroying DataChannelConnection %p", (void*)this)); + ASSERT_WEBRTC(NS_IsMainThread()); + CloseAll(); + + MutexAutoLock lock(mLock); + // If we had a pending reset, we aren't waiting for it - clear the list so + // we can deregister this DataChannelConnection without leaking. + ClearResets(); + + MOZ_ASSERT(mSTS); + ASSERT_WEBRTC(NS_IsMainThread()); + mListener = nullptr; + // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed, + // the usrsctp_close() calls can move back here (and just proxy the + // disconnect_all()) + RUN_ON_THREAD(mSTS, + WrapRunnable(RefPtr<DataChannelConnection>(this), + &DataChannelConnection::DestroyOnSTS, mSocket, + mMasterSocket), + NS_DISPATCH_NORMAL); + + // These will be released on STS + mSocket = nullptr; + mMasterSocket = nullptr; // also a flag that we've Destroyed this connection + + // We can't get any more *new* callbacks from the SCTP library + + // All existing callbacks have refs to DataChannelConnection - however, + // we need to handle their destroying the object off mainthread/STS + + // nsDOMDataChannel objects have refs to DataChannels that have refs to us +} + +void DataChannelConnection::DestroyOnSTS(struct socket* aMasterSocket, + struct socket* aSocket) { + if (aSocket && aSocket != aMasterSocket) usrsctp_close(aSocket); + if (aMasterSocket) usrsctp_close(aMasterSocket); + + usrsctp_deregister_address(reinterpret_cast<void*>(mId)); + DC_DEBUG( + ("Deregistered %p from the SCTP stack.", reinterpret_cast<void*>(mId))); +#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED + mShutdown = true; + DC_DEBUG(("Shutting down connection %p, id %p", this, (void*)mId)); +#endif + + disconnect_all(); + mTransportHandler = nullptr; + GetMainThreadSerialEventTarget()->Dispatch(NS_NewRunnableFunction( + "DataChannelConnection::Destroy", + [id = mId]() { DataChannelRegistry::Deregister(id); })); +} + +Maybe<RefPtr<DataChannelConnection>> DataChannelConnection::Create( + DataChannelConnection::DataConnectionListener* aListener, + nsISerialEventTarget* aTarget, MediaTransportHandler* aHandler, + const uint16_t aLocalPort, const uint16_t aNumStreams, + const Maybe<uint64_t>& aMaxMessageSize) { + ASSERT_WEBRTC(NS_IsMainThread()); + + RefPtr<DataChannelConnection> connection = new DataChannelConnection( + aListener, aTarget, aHandler); // Walks into a bar + return connection->Init(aLocalPort, aNumStreams, aMaxMessageSize) + ? Some(connection) + : Nothing(); +} + +DataChannelConnection::DataChannelConnection( + DataChannelConnection::DataConnectionListener* aListener, + nsISerialEventTarget* aTarget, MediaTransportHandler* aHandler) + : NeckoTargetHolder(aTarget), + mLock("netwerk::sctp::DataChannelConnection"), + mListener(aListener), + mTransportHandler(aHandler) { + DC_VERBOSE(("Constructor DataChannelConnection=%p, listener=%p", this, + mListener.get())); +#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED + mShutdown = false; +#endif +} + +bool DataChannelConnection::Init(const uint16_t aLocalPort, + const uint16_t aNumStreams, + const Maybe<uint64_t>& aMaxMessageSize) { + ASSERT_WEBRTC(NS_IsMainThread()); + + struct sctp_initmsg initmsg = {}; + struct sctp_assoc_value av = {}; + struct sctp_event event = {}; + socklen_t len; + + uint16_t event_types[] = { + SCTP_ASSOC_CHANGE, SCTP_PEER_ADDR_CHANGE, + SCTP_REMOTE_ERROR, SCTP_SHUTDOWN_EVENT, + SCTP_ADAPTATION_INDICATION, SCTP_PARTIAL_DELIVERY_EVENT, + SCTP_SEND_FAILED_EVENT, SCTP_STREAM_RESET_EVENT, + SCTP_STREAM_CHANGE_EVENT}; + { + // MutexAutoLock lock(mLock); Not needed since we're on mainthread always + mLocalPort = aLocalPort; + SetMaxMessageSize(aMaxMessageSize.isSome(), aMaxMessageSize.valueOr(0)); + } + + mId = DataChannelRegistry::Register(this); + + // XXX FIX! make this a global we get once + // Find the STS thread + nsresult rv; + mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv); + MOZ_ASSERT(NS_SUCCEEDED(rv)); + + // Open sctp with a callback + if ((mMasterSocket = + usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, receive_cb, + &DataChannelConnection::OnThresholdEvent, + usrsctp_sysctl_get_sctp_sendspace() / 2, + reinterpret_cast<void*>(mId))) == nullptr) { + return false; + } + + int buf_size = 1024 * 1024; + + if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_RCVBUF, + (const void*)&buf_size, sizeof(buf_size)) < 0) { + DC_ERROR(("Couldn't change receive buffer size on SCTP socket")); + goto error_cleanup; + } + if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_SNDBUF, + (const void*)&buf_size, sizeof(buf_size)) < 0) { + DC_ERROR(("Couldn't change send buffer size on SCTP socket")); + goto error_cleanup; + } + + // Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking + // in associations for normal IO + if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) { + DC_ERROR(("Couldn't set non_blocking on SCTP socket")); + // We can't handle connect() safely if it will block, not that this will + // even happen. + goto error_cleanup; + } + + // Make sure when we close the socket, make sure it doesn't call us back + // again! This would cause it try to use an invalid DataChannelConnection + // pointer + struct linger l; + l.l_onoff = 1; + l.l_linger = 0; + if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER, (const void*)&l, + (socklen_t)sizeof(struct linger)) < 0) { + DC_ERROR(("Couldn't set SO_LINGER on SCTP socket")); + // unsafe to allow it to continue if this fails + goto error_cleanup; + } + + // XXX Consider disabling this when we add proper SDP negotiation. + // We may want to leave enabled for supporting 'cloning' of SDP offers, which + // implies re-use of the same pseudo-port number, or forcing a renegotiation. + { + const int option_value = 1; + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT, + (const void*)&option_value, + (socklen_t)sizeof(option_value)) < 0) { + DC_WARN(("Couldn't set SCTP_REUSE_PORT on SCTP socket")); + } + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY, + (const void*)&option_value, + (socklen_t)sizeof(option_value)) < 0) { + DC_WARN(("Couldn't set SCTP_NODELAY on SCTP socket")); + } + } + + // Set explicit EOR + { + const int option_value = 1; + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EXPLICIT_EOR, + (const void*)&option_value, + (socklen_t)sizeof(option_value)) < 0) { + DC_ERROR(("*** failed to enable explicit EOR mode %d", errno)); + goto error_cleanup; + } + } + + // Enable ndata + // TODO: Bug 1381145, enable this once ndata has been deployed +#if 0 + av.assoc_id = SCTP_FUTURE_ASSOC; + av.assoc_value = 1; + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INTERLEAVING_SUPPORTED, &av, + (socklen_t)sizeof(struct sctp_assoc_value)) < 0) { + DC_ERROR(("*** failed enable ndata errno %d", errno)); + goto error_cleanup; + } +#endif + + av.assoc_id = SCTP_ALL_ASSOC; + av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ; + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, + &av, (socklen_t)sizeof(struct sctp_assoc_value)) < 0) { + DC_ERROR(("*** failed enable stream reset errno %d", errno)); + goto error_cleanup; + } + + /* Enable the events of interest. */ + event.se_assoc_id = SCTP_ALL_ASSOC; + event.se_on = 1; + for (unsigned short event_type : event_types) { + event.se_type = event_type; + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, + sizeof(event)) < 0) { + DC_ERROR(("*** failed setsockopt SCTP_EVENT errno %d", errno)); + goto error_cleanup; + } + } + + len = sizeof(initmsg); + if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, + &len) < 0) { + DC_ERROR(("*** failed getsockopt SCTP_INITMSG")); + goto error_cleanup; + } + DC_DEBUG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams, + initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams)); + initmsg.sinit_num_ostreams = aNumStreams; + initmsg.sinit_max_instreams = MAX_NUM_STREAMS; + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, + (socklen_t)sizeof(initmsg)) < 0) { + DC_ERROR(("*** failed setsockopt SCTP_INITMSG, errno %d", errno)); + goto error_cleanup; + } + + mSocket = nullptr; + mSTS->Dispatch( + NS_NewRunnableFunction("DataChannelConnection::Init", [id = mId]() { + usrsctp_register_address(reinterpret_cast<void*>(id)); + DC_DEBUG(("Registered %p within the SCTP stack.", + reinterpret_cast<void*>(id))); + })); + + return true; + +error_cleanup: + usrsctp_close(mMasterSocket); + mMasterSocket = nullptr; + return false; +} + +// Only called on MainThread, mMaxMessageSize is read on other threads +void DataChannelConnection::SetMaxMessageSize(bool aMaxMessageSizeSet, + uint64_t aMaxMessageSize) { + ASSERT_WEBRTC(NS_IsMainThread()); + MutexAutoLock lock(mLock); + + if (mMaxMessageSizeSet && !aMaxMessageSizeSet) { + // Don't overwrite already set MMS with default values + return; + } + + mMaxMessageSizeSet = aMaxMessageSizeSet; + mMaxMessageSize = aMaxMessageSize; + + nsresult rv; + nsCOMPtr<nsIPrefService> prefs = + do_GetService("@mozilla.org/preferences-service;1", &rv); + if (!NS_WARN_IF(NS_FAILED(rv))) { + nsCOMPtr<nsIPrefBranch> branch = do_QueryInterface(prefs); + + if (branch) { + int32_t temp; + if (!NS_FAILED(branch->GetIntPref( + "media.peerconnection.sctp.force_maximum_message_size", &temp))) { + if (temp >= 0) { + mMaxMessageSize = (uint64_t)temp; + } + } + } + } + + // Fix remote MMS. This code exists, so future implementations of + // RTCSctpTransport.maxMessageSize can simply provide that value from + // GetMaxMessageSize. + + // TODO: Bug 1382779, once resolved, can be increased to + // min(Uint8ArrayMaxSize, UINT32_MAX) + // TODO: Bug 1381146, once resolved, can be increased to whatever we support + // then (hopefully + // SIZE_MAX) + if (mMaxMessageSize == 0 || + mMaxMessageSize > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE) { + mMaxMessageSize = WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE; + } + + DC_DEBUG(("Maximum message size (outgoing data): %" PRIu64 + " (set=%s, enforced=%s)", + mMaxMessageSize, mMaxMessageSizeSet ? "yes" : "no", + aMaxMessageSize != mMaxMessageSize ? "yes" : "no")); +} + +uint64_t DataChannelConnection::GetMaxMessageSize() { + MutexAutoLock lock(mLock); + return mMaxMessageSize; +} + +void DataChannelConnection::AppendStatsToReport( + const UniquePtr<dom::RTCStatsCollection>& aReport, + const DOMHighResTimeStamp aTimestamp) const { + ASSERT_WEBRTC(NS_IsMainThread()); + nsString temp; + for (const RefPtr<DataChannel>& chan : mChannels.GetAll()) { + // If channel is empty, ignore + if (!chan) { + continue; + } + mozilla::dom::RTCDataChannelStats stats; + nsString id = u"dc"_ns; + id.AppendInt(chan->GetStream()); + stats.mId.Construct(id); + chan->GetLabel(temp); + stats.mTimestamp.Construct(aTimestamp); + stats.mType.Construct(mozilla::dom::RTCStatsType::Data_channel); + stats.mLabel.Construct(temp); + chan->GetProtocol(temp); + stats.mProtocol.Construct(temp); + stats.mDataChannelIdentifier.Construct(chan->GetStream()); + { + using State = mozilla::dom::RTCDataChannelState; + State state; + switch (chan->GetReadyState()) { + case DataChannelState::Connecting: + state = State::Connecting; + break; + case DataChannelState::Open: + state = State::Open; + break; + case DataChannelState::Closing: + state = State::Closing; + break; + case DataChannelState::Closed: + state = State::Closed; + break; + }; + stats.mState.Construct(state); + } + auto counters = chan->GetTrafficCounters(); + stats.mMessagesSent.Construct(counters.mMessagesSent); + stats.mBytesSent.Construct(counters.mBytesSent); + stats.mMessagesReceived.Construct(counters.mMessagesReceived); + stats.mBytesReceived.Construct(counters.mBytesReceived); + if (!aReport->mDataChannelStats.AppendElement(stats, fallible)) { + mozalloc_handle_oom(0); + } + } +} + +#ifdef MOZ_PEERCONNECTION + +bool DataChannelConnection::ConnectToTransport(const std::string& aTransportId, + const bool aClient, + const uint16_t aLocalPort, + const uint16_t aRemotePort) { + MutexAutoLock lock(mLock); + + MOZ_ASSERT(mMasterSocket, + "SCTP wasn't initialized before ConnectToTransport!"); + static const auto paramString = + [](const std::string& tId, const Maybe<bool>& client, + const uint16_t localPort, const uint16_t remotePort) -> std::string { + std::ostringstream stream; + stream << "Transport ID: '" << tId << "', Role: '" + << (client ? (client.value() ? "client" : "server") : "") + << "', Local Port: '" << localPort << "', Remote Port: '" + << remotePort << "'"; + return stream.str(); + }; + + const auto params = + paramString(aTransportId, Some(aClient), aLocalPort, aRemotePort); + DC_DEBUG(("ConnectToTransport connecting DTLS transport with parameters: %s", + params.c_str())); + + DataChannelConnectionState state = GetState(); + if (state == DataChannelConnectionState::Open) { + if (aTransportId == mTransportId && mAllocateEven.isSome() && + mAllocateEven.value() == aClient && mLocalPort == aLocalPort && + mRemotePort == aRemotePort) { + DC_WARN( + ("Skipping attempt to connect to an already OPEN transport with " + "identical parameters.")); + return true; + } + DC_WARN( + ("Attempting to connect to an already OPEN transport, because " + "different parameters were provided.")); + DC_WARN(("Original transport parameters: %s", + paramString(mTransportId, mAllocateEven, mLocalPort, aRemotePort) + .c_str())); + DC_WARN(("New transport parameters: %s", params.c_str())); + } + if (NS_WARN_IF(aTransportId.empty())) { + return false; + } + + mLocalPort = aLocalPort; + mRemotePort = aRemotePort; + SetState(DataChannelConnectionState::Connecting); + mAllocateEven = Some(aClient); + + // Could be faster. Probably doesn't matter. + while (auto channel = mChannels.Get(INVALID_STREAM)) { + mChannels.Remove(channel); + channel->mStream = FindFreeStream(); + if (channel->mStream != INVALID_STREAM) { + mChannels.Insert(channel); + } + } + RUN_ON_THREAD(mSTS, + WrapRunnable(RefPtr<DataChannelConnection>(this), + &DataChannelConnection::SetSignals, aTransportId), + NS_DISPATCH_NORMAL); + return true; +} + +void DataChannelConnection::SetSignals(const std::string& aTransportId) { + ASSERT_WEBRTC(IsSTSThread()); + { + MutexAutoLock lock(mLock); + mTransportId = aTransportId; + } + if (!mConnectedToTransportHandler) { + mTransportHandler->SignalPacketReceived.connect( + this, &DataChannelConnection::SctpDtlsInput); + mTransportHandler->SignalStateChange.connect( + this, &DataChannelConnection::TransportStateChange); + mConnectedToTransportHandler = true; + } + // SignalStateChange() doesn't call you with the initial state + if (mTransportHandler->GetState(mTransportId, false) == + TransportLayer::TS_OPEN) { + DC_DEBUG(("Setting transport signals, dtls already open")); + CompleteConnect(); + } else { + DC_DEBUG(("Setting transport signals, dtls not open yet")); + } +} + +void DataChannelConnection::TransportStateChange( + const std::string& aTransportId, TransportLayer::State aState) { + ASSERT_WEBRTC(IsSTSThread()); + if (aTransportId == mTransportId) { + if (aState == TransportLayer::TS_OPEN) { + DC_DEBUG(("Transport is open!")); + CompleteConnect(); + } else if (aState == TransportLayer::TS_CLOSED || + aState == TransportLayer::TS_NONE || + aState == TransportLayer::TS_ERROR) { + DC_DEBUG(("Transport is closed!")); + Stop(); + } + } +} + +void DataChannelConnection::CompleteConnect() { + MutexAutoLock lock(mLock); + + DC_DEBUG(("dtls open")); + ASSERT_WEBRTC(IsSTSThread()); + if (!mMasterSocket) { + return; + } + + struct sockaddr_conn addr = {}; + addr.sconn_family = AF_CONN; +# if defined(__Userspace_os_Darwin) + addr.sconn_len = sizeof(addr); +# endif + addr.sconn_port = htons(mLocalPort); + addr.sconn_addr = reinterpret_cast<void*>(mId); + + DC_DEBUG(("Calling usrsctp_bind")); + int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr*>(&addr), + sizeof(addr)); + if (r < 0) { + DC_ERROR(("usrsctp_bind failed: %d", r)); + } else { + // This is the remote addr + addr.sconn_port = htons(mRemotePort); + DC_DEBUG(("Calling usrsctp_connect")); + r = usrsctp_connect( + mMasterSocket, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)); + if (r >= 0 || errno == EINPROGRESS) { + struct sctp_paddrparams paddrparams = {}; + socklen_t opt_len; + + memcpy(&paddrparams.spp_address, &addr, sizeof(struct sockaddr_conn)); + opt_len = (socklen_t)sizeof(struct sctp_paddrparams); + r = usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, + &paddrparams, &opt_len); + if (r < 0) { + DC_ERROR(("usrsctp_getsockopt failed: %d", r)); + } else { + // This field is misnamed. |spp_pathmtu| represents the maximum + // _payload_ size in libusrsctp. So: + // 1280 (a reasonable IPV6 MTU according to RFC 8831) + // -12 (sctp header) + // -24 (GCM sipher) + // -13 (DTLS record header) + // -8 (UDP header) + // -4 (TURN ChannelData) + // -40 (IPV6 header) + // = 1179 + // We could further restrict this, because RFC 8831 suggests a starting + // IPV4 path MTU of 1200, which would lead to a value of 1115. + // I suspect that in practice the path MTU for IPV4 is substantially + // larger than 1200. + paddrparams.spp_pathmtu = 1179; + paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE; + paddrparams.spp_flags |= SPP_PMTUD_DISABLE; + opt_len = (socklen_t)sizeof(struct sctp_paddrparams); + r = usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, + SCTP_PEER_ADDR_PARAMS, &paddrparams, opt_len); + if (r < 0) { + DC_ERROR(("usrsctp_getsockopt failed: %d", r)); + } else { + DC_ERROR(("usrsctp: PMTUD disabled, MTU set to %u", + paddrparams.spp_pathmtu)); + } + } + } + if (r < 0) { + if (errno == EINPROGRESS) { + // non-blocking + return; + } + DC_ERROR(("usrsctp_connect failed: %d", errno)); + SetState(DataChannelConnectionState::Closed); + } else { + // We fire ON_CONNECTION via SCTP_COMM_UP when we get that + return; + } + } + // Note: currently this doesn't actually notify the application + Dispatch(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::EventType::OnConnection, this))); +} + +// Process any pending Opens +void DataChannelConnection::ProcessQueuedOpens() { + // The nsDeque holds channels with an AddRef applied. Another reference + // (may) be held by the DOMDataChannel, unless it's been GC'd. No other + // references should exist. + + // Can't copy nsDeque's. Move into temp array since any that fail will + // go back to mPending + nsRefPtrDeque<DataChannel> temp; + RefPtr<DataChannel> temp_channel; + while (nullptr != (temp_channel = mPending.PopFront())) { + temp.Push(temp_channel.forget()); + } + + RefPtr<DataChannel> channel; + + while (nullptr != (channel = temp.PopFront())) { + if (channel->mHasFinishedOpen) { + DC_DEBUG(("Processing queued open for %p (%u)", channel.get(), + channel->mStream)); + channel->mHasFinishedOpen = false; + // OpenFinish returns a reference itself, so we need to take it can + // Release it + channel = OpenFinish(channel.forget()); // may reset the flag and re-push + } else { + NS_ASSERTION(false, + "How did a DataChannel get queued without the " + "mHasFinishedOpen flag?"); + } + } +} + +void DataChannelConnection::SctpDtlsInput(const std::string& aTransportId, + const MediaPacket& packet) { + MutexAutoLock lock(mLock); + if ((packet.type() != MediaPacket::SCTP) || (mTransportId != aTransportId)) { + return; + } + + if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) { + char* buf; + + if ((buf = usrsctp_dumppacket((void*)packet.data(), packet.len(), + SCTP_DUMP_INBOUND)) != nullptr) { + SCTP_LOG(("%s", buf)); + usrsctp_freedumpbuffer(buf); + } + } + // Pass the data to SCTP + usrsctp_conninput(reinterpret_cast<void*>(mId), packet.data(), packet.len(), + 0); +} + +void DataChannelConnection::SendPacket(std::unique_ptr<MediaPacket>&& packet) { + mSTS->Dispatch(NS_NewRunnableFunction( + "DataChannelConnection::SendPacket", + [this, self = RefPtr<DataChannelConnection>(this), + packet = std::move(packet)]() mutable { + // DC_DEBUG(("%p: SCTP/DTLS sent %ld bytes", this, len)); + if (!mTransportId.empty() && mTransportHandler) { + mTransportHandler->SendPacket(mTransportId, std::move(*packet)); + } + })); +} + +int DataChannelConnection::SctpDtlsOutput(void* addr, void* buffer, + size_t length, uint8_t tos, + uint8_t set_df) { + if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) { + char* buf; + + if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != + nullptr) { + SCTP_LOG(("%s", buf)); + usrsctp_freedumpbuffer(buf); + } + } + + // We're async proxying even if on the STSThread because this is called + // with internal SCTP locks held in some cases (such as in usrsctp_connect()). + // SCTP has an option for Apple, on IP connections only, to release at least + // one of the locks before calling a packet output routine; with changes to + // the underlying SCTP stack this might remove the need to use an async proxy. + std::unique_ptr<MediaPacket> packet(new MediaPacket); + packet->SetType(MediaPacket::SCTP); + packet->Copy(static_cast<const uint8_t*>(buffer), length); + + if (NS_IsMainThread() && mDeferSend) { + mDeferredSend.emplace_back(std::move(packet)); + return 0; + } + + SendPacket(std::move(packet)); + return 0; // cheat! Packets can always be dropped later anyways +} +#endif + +#ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT +// listen for incoming associations +// Blocks! - Don't call this from main thread! + +bool DataChannelConnection::Listen(unsigned short port) { + struct sockaddr_in addr = {}; + socklen_t addr_len; + + NS_WARNING_ASSERTION(!NS_IsMainThread(), + "Blocks, do not call from main thread!!!"); + + /* Acting as the 'server' */ +# ifdef HAVE_SIN_LEN + addr.sin_len = sizeof(struct sockaddr_in); +# endif + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = htonl(INADDR_ANY); + DC_DEBUG(("Waiting for connections on port %u", ntohs(addr.sin_port))); + { + MutexAutoLock lock(mLock); + SetState(DataChannelConnectionState::Connecting); + } + if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr*>(&addr), + sizeof(struct sockaddr_in)) < 0) { + DC_ERROR(("***Failed userspace_bind")); + return false; + } + if (usrsctp_listen(mMasterSocket, 1) < 0) { + DC_ERROR(("***Failed userspace_listen")); + return false; + } + + DC_DEBUG(("Accepting connection")); + addr_len = 0; + if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == + nullptr) { + DC_ERROR(("***Failed accept")); + return false; + } + + { + MutexAutoLock lock(mLock); + SetState(DataChannelConnectionState::Open); + } + + struct linger l; + l.l_onoff = 1; + l.l_linger = 0; + if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER, (const void*)&l, + (socklen_t)sizeof(struct linger)) < 0) { + DC_WARN(("Couldn't set SO_LINGER on SCTP socket")); + } + + // Notify Connection open + // XXX We need to make sure connection sticks around until the message is + // delivered + DC_DEBUG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this)); + Dispatch(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::EventType::OnConnection, this, + (DataChannel*)nullptr))); + return true; +} + +// Blocks! - Don't call this from main thread! +bool DataChannelConnection::Connect(const char* addr, unsigned short port) { + struct sockaddr_in addr4 = {}; + struct sockaddr_in6 addr6 = {}; + + NS_WARNING_ASSERTION(!NS_IsMainThread(), + "Blocks, do not call from main thread!!!"); + + /* Acting as the connector */ + DC_DEBUG(("Connecting to %s, port %u", addr, port)); +# ifdef HAVE_SIN_LEN + addr4.sin_len = sizeof(struct sockaddr_in); +# endif +# ifdef HAVE_SIN6_LEN + addr6.sin6_len = sizeof(struct sockaddr_in6); +# endif + addr4.sin_family = AF_INET; + addr6.sin6_family = AF_INET6; + addr4.sin_port = htons(port); + addr6.sin6_port = htons(port); + { + MutexAutoLock lock(mLock); + SetState(DataChannelConnectionState::Connecting); + } +# if !defined(__Userspace_os_Windows) + if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) { + if (usrsctp_connect(mMasterSocket, + reinterpret_cast<struct sockaddr*>(&addr6), + sizeof(struct sockaddr_in6)) < 0) { + DC_ERROR(("*** Failed userspace_connect")); + return false; + } + } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) { + if (usrsctp_connect(mMasterSocket, + reinterpret_cast<struct sockaddr*>(&addr4), + sizeof(struct sockaddr_in)) < 0) { + DC_ERROR(("*** Failed userspace_connect")); + return false; + } + } else { + DC_ERROR(("*** Illegal destination address.")); + } +# else + { + struct sockaddr_storage ss; + int sslen = sizeof(ss); + + if (!WSAStringToAddressA(const_cast<char*>(addr), AF_INET6, nullptr, + (struct sockaddr*)&ss, &sslen)) { + addr6.sin6_addr = + (reinterpret_cast<struct sockaddr_in6*>(&ss))->sin6_addr; + if (usrsctp_connect(mMasterSocket, + reinterpret_cast<struct sockaddr*>(&addr6), + sizeof(struct sockaddr_in6)) < 0) { + DC_ERROR(("*** Failed userspace_connect")); + return false; + } + } else if (!WSAStringToAddressA(const_cast<char*>(addr), AF_INET, nullptr, + (struct sockaddr*)&ss, &sslen)) { + addr4.sin_addr = (reinterpret_cast<struct sockaddr_in*>(&ss))->sin_addr; + if (usrsctp_connect(mMasterSocket, + reinterpret_cast<struct sockaddr*>(&addr4), + sizeof(struct sockaddr_in)) < 0) { + DC_ERROR(("*** Failed userspace_connect")); + return false; + } + } else { + DC_ERROR(("*** Illegal destination address.")); + } + } +# endif + + mSocket = mMasterSocket; + + DC_DEBUG(("connect() succeeded! Entering connected mode")); + { + MutexAutoLock lock(mLock); + SetState(DataChannelConnectionState::Open); + } + // Notify Connection open + // XXX We need to make sure connection sticks around until the message is + // delivered + DC_DEBUG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this)); + Dispatch(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::EventType::OnConnection, this, + (DataChannel*)nullptr))); + return true; +} +#endif + +DataChannel* DataChannelConnection::FindChannelByStream(uint16_t stream) { + return mChannels.Get(stream).get(); +} + +uint16_t DataChannelConnection::FindFreeStream() const { + ASSERT_WEBRTC(NS_IsMainThread()); + uint16_t i, limit; + + limit = MAX_NUM_STREAMS; + + MOZ_ASSERT(mAllocateEven.isSome()); + for (i = (*mAllocateEven ? 0 : 1); i < limit; i += 2) { + if (mChannels.Get(i)) { + continue; + } + + // Verify it's not still in the process of closing + size_t j; + for (j = 0; j < mStreamsResetting.Length(); ++j) { + if (mStreamsResetting[j] == i) { + break; + } + } + + if (j == mStreamsResetting.Length()) { + return i; + } + } + return INVALID_STREAM; +} + +uint32_t DataChannelConnection::UpdateCurrentStreamIndex() { + RefPtr<DataChannel> channel = mChannels.GetNextChannel(mCurrentStream); + if (!channel) { + mCurrentStream = 0; + } else { + mCurrentStream = channel->mStream; + } + return mCurrentStream; +} + +uint32_t DataChannelConnection::GetCurrentStreamIndex() { + if (!mChannels.Get(mCurrentStream)) { + // The stream muse have been removed, reset + DC_DEBUG(("Reset mCurrentChannel")); + mCurrentStream = 0; + } + return mCurrentStream; +} + +bool DataChannelConnection::RequestMoreStreams(int32_t aNeeded) { + struct sctp_status status = {}; + struct sctp_add_streams sas = {}; + uint32_t outStreamsNeeded; + socklen_t len; + + if (aNeeded + mNegotiatedIdLimit > MAX_NUM_STREAMS) { + aNeeded = MAX_NUM_STREAMS - mNegotiatedIdLimit; + } + if (aNeeded <= 0) { + return false; + } + + len = (socklen_t)sizeof(struct sctp_status); + if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, + &len) < 0) { + DC_ERROR(("***failed: getsockopt SCTP_STATUS")); + return false; + } + outStreamsNeeded = aNeeded; // number to add + + // Note: if multiple channel opens happen when we don't have enough space, + // we'll call RequestMoreStreams() multiple times + sas.sas_instrms = 0; + sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */ + // Doesn't block, we get an event when it succeeds or fails + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas, + (socklen_t)sizeof(struct sctp_add_streams)) < 0) { + if (errno == EALREADY) { + DC_DEBUG(("Already have %u output streams", outStreamsNeeded)); + return true; + } + + DC_ERROR(("***failed: setsockopt ADD errno=%d", errno)); + return false; + } + DC_DEBUG(("Requested %u more streams", outStreamsNeeded)); + // We add to mNegotiatedIdLimit when we get a SCTP_STREAM_CHANGE_EVENT and the + // values are larger than mNegotiatedIdLimit + return true; +} + +// Returns a POSIX error code. +int DataChannelConnection::SendControlMessage(const uint8_t* data, uint32_t len, + uint16_t stream) { + struct sctp_sendv_spa info = {}; + + // General flags + info.sendv_flags = SCTP_SEND_SNDINFO_VALID; + + // Set stream identifier, protocol identifier and flags + info.sendv_sndinfo.snd_sid = stream; + info.sendv_sndinfo.snd_flags = SCTP_EOR; + info.sendv_sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL); + + // Create message instance and send + // Note: Main-thread IO, but doesn't block +#if (UINT32_MAX > SIZE_MAX) + if (len > SIZE_MAX) { + return EMSGSIZE; + } +#endif + OutgoingMsg msg(info, data, (size_t)len); + bool buffered; + int error = SendMsgInternalOrBuffer(mBufferedControl, msg, buffered, nullptr); + + // Set pending type (if buffered) + if (!error && buffered && mPendingType == PendingType::None) { + mPendingType = PendingType::Dcep; + } + return error; +} + +// Returns a POSIX error code. +int DataChannelConnection::SendOpenAckMessage(uint16_t stream) { + struct rtcweb_datachannel_ack ack = {}; + ack.msg_type = DATA_CHANNEL_ACK; + + return SendControlMessage((const uint8_t*)&ack, sizeof(ack), stream); +} + +// Returns a POSIX error code. +int DataChannelConnection::SendOpenRequestMessage( + const nsACString& label, const nsACString& protocol, uint16_t stream, + bool unordered, DataChannelReliabilityPolicy prPolicy, uint32_t prValue) { + const size_t label_len = label.Length(); // not including nul + const size_t proto_len = protocol.Length(); // not including nul + // careful - request struct include one char for the label + const size_t req_size = sizeof(struct rtcweb_datachannel_open_request) - 1 + + label_len + proto_len; + UniqueFreePtr<struct rtcweb_datachannel_open_request> req( + (struct rtcweb_datachannel_open_request*)moz_xmalloc(req_size)); + + memset(req.get(), 0, req_size); + req->msg_type = DATA_CHANNEL_OPEN_REQUEST; + switch (prPolicy) { + case DataChannelReliabilityPolicy::Reliable: + req->channel_type = DATA_CHANNEL_RELIABLE; + break; + case DataChannelReliabilityPolicy::LimitedLifetime: + req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED; + break; + case DataChannelReliabilityPolicy::LimitedRetransmissions: + req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT; + break; + default: + return EINVAL; + } + if (unordered) { + // Per the current types, all differ by 0x80 between ordered and unordered + req->channel_type |= + 0x80; // NOTE: be careful if new types are added in the future + } + + req->reliability_param = htonl(prValue); + req->priority = htons(0); /* XXX: add support */ + req->label_length = htons(label_len); + req->protocol_length = htons(proto_len); + memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len); + memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len); + + // TODO: req_size is an int... that looks hairy + int error = SendControlMessage((const uint8_t*)req.get(), req_size, stream); + return error; +} + +// XXX This should use a separate thread (outbound queue) which should +// select() to know when to *try* to send data to the socket again. +// Alternatively, it can use a timeout, but that's guaranteed to be wrong +// (just not sure in what direction). We could re-implement NSPR's +// PR_POLL_WRITE/etc handling... with a lot of work. + +// Better yet, use the SCTP stack's notifications on buffer state to avoid +// filling the SCTP's buffers. + +// returns if we're still blocked (true) +bool DataChannelConnection::SendDeferredMessages() { + RefPtr<DataChannel> channel; // we may null out the refs to this + + // This may block while something is modifying channels, but should not block + // for IO + ASSERT_WEBRTC(!NS_IsMainThread()); + mLock.AssertCurrentThreadOwns(); + + DC_DEBUG(("SendDeferredMessages called, pending type: %s", + ToString(mPendingType))); + if (mPendingType == PendingType::None) { + return false; + } + + // Send pending control messages + // Note: If ndata is not active, check if DCEP messages are currently + // outstanding. These need to + // be sent first before other streams can be used for sending. + if (!mBufferedControl.IsEmpty() && + (mSendInterleaved || mPendingType == PendingType::Dcep)) { + if (SendBufferedMessages(mBufferedControl, nullptr)) { + return true; + } + + // Note: There may or may not be pending data messages + mPendingType = PendingType::Data; + } + + bool blocked = false; + uint32_t i = GetCurrentStreamIndex(); + uint32_t end = i; + do { + channel = mChannels.Get(i); + // Should already be cleared if closing/closed + if (!channel || channel->mBufferedData.IsEmpty()) { + i = UpdateCurrentStreamIndex(); + continue; + } + + // Send buffered data messages + // Warning: This will fail in case ndata is inactive and a previously + // deallocated data channel has not been closed properly. If you + // ever see that no messages can be sent on any channel, this is + // likely the cause (an explicit EOR message partially sent whose + // remaining chunks are still being waited for). + size_t written = 0; + mDeferSend = true; + blocked = SendBufferedMessages(channel->mBufferedData, &written); + mDeferSend = false; + if (written) { + channel->DecrementBufferedAmount(written); + } + + for (auto&& packet : mDeferredSend) { + MOZ_ASSERT(written); + SendPacket(std::move(packet)); + } + mDeferredSend.clear(); + + // Update current stream index + // Note: If ndata is not active, the outstanding data messages on this + // stream need to be sent first before other streams can be used for + // sending. + if (mSendInterleaved || !blocked) { + i = UpdateCurrentStreamIndex(); + } + } while (!blocked && i != end); + + if (!blocked) { + mPendingType = + mBufferedControl.IsEmpty() ? PendingType::None : PendingType::Dcep; + } + return blocked; +} + +// Called with mLock locked! +// buffer MUST have at least one item! +// returns if we're still blocked (true) +bool DataChannelConnection::SendBufferedMessages( + nsTArray<UniquePtr<BufferedOutgoingMsg>>& buffer, size_t* aWritten) { + do { + // Re-send message + int error = SendMsgInternal(*buffer[0], aWritten); + switch (error) { + case 0: + buffer.RemoveElementAt(0); + break; + case EAGAIN: +#if (EAGAIN != EWOULDBLOCK) + case EWOULDBLOCK: +#endif + return true; + default: + buffer.RemoveElementAt(0); + DC_ERROR(("error on sending: %d", error)); + break; + } + } while (!buffer.IsEmpty()); + + return false; +} + +// Caller must ensure that length <= SIZE_MAX +void DataChannelConnection::HandleOpenRequestMessage( + const struct rtcweb_datachannel_open_request* req, uint32_t length, + uint16_t stream) { + RefPtr<DataChannel> channel; + uint32_t prValue; + DataChannelReliabilityPolicy prPolicy; + + ASSERT_WEBRTC(!NS_IsMainThread()); + mLock.AssertCurrentThreadOwns(); + + const size_t requiredLength = (sizeof(*req) - 1) + ntohs(req->label_length) + + ntohs(req->protocol_length); + if (((size_t)length) != requiredLength) { + if (((size_t)length) < requiredLength) { + DC_ERROR( + ("%s: insufficient length: %u, should be %zu. Unable to continue.", + __FUNCTION__, length, requiredLength)); + return; + } + DC_WARN(("%s: Inconsistent length: %u, should be %zu", __FUNCTION__, length, + requiredLength)); + } + + DC_DEBUG(("%s: length %u, sizeof(*req) = %zu", __FUNCTION__, length, + sizeof(*req))); + + switch (req->channel_type) { + case DATA_CHANNEL_RELIABLE: + case DATA_CHANNEL_RELIABLE_UNORDERED: + prPolicy = DataChannelReliabilityPolicy::Reliable; + break; + case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT: + case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED: + prPolicy = DataChannelReliabilityPolicy::LimitedRetransmissions; + break; + case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED: + case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED: + prPolicy = DataChannelReliabilityPolicy::LimitedLifetime; + break; + default: + DC_ERROR(("Unknown channel type %d", req->channel_type)); + /* XXX error handling */ + return; + } + prValue = ntohl(req->reliability_param); + bool ordered = !(req->channel_type & 0x80); + + if ((channel = FindChannelByStream(stream))) { + if (!channel->mNegotiated) { + DC_ERROR( + ("HandleOpenRequestMessage: channel for pre-existing stream " + "%u that was not externally negotiated. JS is lying to us, or " + "there's an id collision.", + stream)); + /* XXX: some error handling */ + } else { + DC_DEBUG(("Open for externally negotiated channel %u", stream)); + // XXX should also check protocol, maybe label + if (prPolicy != channel->mPrPolicy || prValue != channel->mPrValue || + ordered != channel->mOrdered) { + DC_WARN( + ("external negotiation mismatch with OpenRequest:" + "channel %u, policy %s/%s, value %u/%u, ordered %d/%d", + stream, ToString(prPolicy), ToString(channel->mPrPolicy), prValue, + channel->mPrValue, static_cast<int>(ordered), + static_cast<int>(channel->mOrdered))); + } + } + return; + } + if (stream >= mNegotiatedIdLimit) { + DC_ERROR(("%s: stream %u out of bounds (%zu)", __FUNCTION__, stream, + mNegotiatedIdLimit)); + return; + } + + nsCString label( + nsDependentCSubstring(&req->label[0], ntohs(req->label_length))); + nsCString protocol(nsDependentCSubstring( + &req->label[ntohs(req->label_length)], ntohs(req->protocol_length))); + + channel = + new DataChannel(this, stream, DataChannelState::Open, label, protocol, + prPolicy, prValue, ordered, false, nullptr, nullptr); + mChannels.Insert(channel); + + DC_DEBUG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u", __FUNCTION__, + channel->mLabel.get(), channel->mProtocol.get(), stream)); + Dispatch(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::EventType::OnChannelCreated, this, + channel))); + + DC_DEBUG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, + channel.get())); + channel->AnnounceOpen(); + + // Note that any message can be buffered; SendOpenAckMessage may + // error later than this check. + const auto error = SendOpenAckMessage(channel->mStream); + if (error) { + DC_ERROR(("SendOpenRequest failed, error = %d", error)); + Dispatch(NS_NewRunnableFunction( + "DataChannelConnection::HandleOpenRequestMessage", + [channel, connection = RefPtr<DataChannelConnection>(this)]() { + // Close the channel on failure + connection->Close(channel); + })); + return; + } + DeliverQueuedData(channel->mStream); +} + +// NOTE: the updated spec from the IETF says we should set in-order until we +// receive an ACK. That would make this code moot. Keep it for now for +// backwards compatibility. +void DataChannelConnection::DeliverQueuedData(uint16_t stream) { + mLock.AssertCurrentThreadOwns(); + + mQueuedData.RemoveElementsBy([stream, this](const auto& dataItem) { + mLock.AssertCurrentThreadOwns(); + const bool match = dataItem->mStream == stream; + if (match) { + DC_DEBUG(("Delivering queued data for stream %u, length %u", stream, + dataItem->mLength)); + // Deliver the queued data + HandleDataMessage(dataItem->mData, dataItem->mLength, dataItem->mPpid, + dataItem->mStream, dataItem->mFlags); + } + return match; + }); +} + +// Caller must ensure that length <= SIZE_MAX +void DataChannelConnection::HandleOpenAckMessage( + const struct rtcweb_datachannel_ack* ack, uint32_t length, + uint16_t stream) { + DataChannel* channel; + + mLock.AssertCurrentThreadOwns(); + + channel = FindChannelByStream(stream); + if (NS_WARN_IF(!channel)) { + return; + } + + DC_DEBUG(("OpenAck received for stream %u, waiting=%d", stream, + channel->mWaitingForAck ? 1 : 0)); + + channel->mWaitingForAck = false; +} + +// Caller must ensure that length <= SIZE_MAX +void DataChannelConnection::HandleUnknownMessage(uint32_t ppid, uint32_t length, + uint16_t stream) { + /* XXX: Send an error message? */ + DC_ERROR(("unknown DataChannel message received: %u, len %u on stream %d", + ppid, length, stream)); + // XXX Log to JS error console if possible +} + +uint8_t DataChannelConnection::BufferMessage(nsACString& recvBuffer, + const void* data, uint32_t length, + uint32_t ppid, int flags) { + const char* buffer = (const char*)data; + uint8_t bufferFlags = 0; + + if ((flags & MSG_EOR) && ppid != DATA_CHANNEL_PPID_BINARY_PARTIAL && + ppid != DATA_CHANNEL_PPID_DOMSTRING_PARTIAL) { + bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE; + + // Return directly if nothing has been buffered + if (recvBuffer.IsEmpty()) { + return bufferFlags; + } + } + + // Ensure it doesn't blow up our buffer + // TODO: Change 'WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL' to whatever the + // new buffer is capable of holding. + if (((uint64_t)recvBuffer.Length()) + ((uint64_t)length) > + WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) { + bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE; + return bufferFlags; + } + + // Copy & add to receive buffer + recvBuffer.Append(buffer, length); + bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED; + return bufferFlags; +} + +void DataChannelConnection::HandleDataMessage(const void* data, size_t length, + uint32_t ppid, uint16_t stream, + int flags) { + DataChannel* channel; + const char* buffer = (const char*)data; + + mLock.AssertCurrentThreadOwns(); + channel = FindChannelByStream(stream); + + // Note: Until we support SIZE_MAX sized messages, we need this check +#if (SIZE_MAX > UINT32_MAX) + if (length > UINT32_MAX) { + DC_ERROR(("DataChannel: Cannot handle message of size %zu (max=%" PRIu32 + ")", + length, UINT32_MAX)); + CloseLocked(channel); + return; + } +#endif + uint32_t data_length = (uint32_t)length; + + // XXX A closed channel may trip this... check + // NOTE: the updated spec from the IETF says we should set in-order until we + // receive an ACK. That would make this code moot. Keep it for now for + // backwards compatibility. + if (!channel) { + // In the updated 0-RTT open case, the sender can send data immediately + // after Open, and doesn't set the in-order bit (since we don't have a + // response or ack). Also, with external negotiation, data can come in + // before we're told about the external negotiation. We need to buffer + // data until either a) Open comes in, if the ordering get messed up, + // or b) the app tells us this channel was externally negotiated. When + // these occur, we deliver the data. + + // Since this is rare and non-performance, keep a single list of queued + // data messages to deliver once the channel opens. + DC_DEBUG(("Queuing data for stream %u, length %u", stream, data_length)); + // Copies data + mQueuedData.AppendElement( + new QueuedDataMessage(stream, ppid, flags, data, data_length)); + return; + } + + // RFC8832: "MUST be sent ordered, ... After the DATA_CHANNEL_ACK **or any + // other message** has been received on the data channel". + // If the channel was opened on this side, and a message is received, this + // indicates that the peer has already received the DATA_CHANNEL_ACK, as the + // channel is ordered initially. + channel->mWaitingForAck = false; + + bool is_binary = true; + uint8_t bufferFlags; + DataChannelOnMessageAvailable::EventType type; + const char* info = ""; + + if (ppid == DATA_CHANNEL_PPID_DOMSTRING_PARTIAL || + ppid == DATA_CHANNEL_PPID_DOMSTRING || + ppid == DATA_CHANNEL_PPID_DOMSTRING_EMPTY) { + is_binary = false; + } + if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) { + NS_WARNING("DataChannel message aborted by fragment type change!"); + // TODO: Maybe closing would be better as this is a hard to detect protocol + // violation? + channel->mRecvBuffer.Truncate(0); + } + channel->mIsRecvBinary = is_binary; + + // Remaining chunks of previously truncated message (due to the buffer being + // full)? + if (channel->mClosingTooLarge) { + DC_ERROR( + ("DataChannel: Ignoring partial message of length %u, buffer full and " + "closing", + data_length)); + // Only unblock if unordered + if (!channel->mOrdered && (flags & MSG_EOR)) { + channel->mClosingTooLarge = false; + } + } + + // Buffer message until complete + bufferFlags = + BufferMessage(channel->mRecvBuffer, buffer, data_length, ppid, flags); + if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) { + DC_ERROR( + ("DataChannel: Buffered message would become too large to handle, " + "closing channel")); + channel->mRecvBuffer.Truncate(0); + channel->mClosingTooLarge = true; + CloseLocked(channel); + return; + } + if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) { + DC_DEBUG( + ("DataChannel: Partial %s message of length %u (total %zu) on channel " + "id %u", + is_binary ? "binary" : "string", data_length, + channel->mRecvBuffer.Length(), channel->mStream)); + return; // Not ready to notify application + } + if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) { + data_length = channel->mRecvBuffer.Length(); + } + + // Complain about large messages (only complain - we can handle it) + if (data_length > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) { + DC_WARN( + ("DataChannel: Received message of length %u is > announced maximum " + "message size (%u)", + data_length, WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL)); + } + + bool is_empty = false; + + switch (ppid) { + case DATA_CHANNEL_PPID_DOMSTRING: + DC_DEBUG( + ("DataChannel: Received string message of length %u on channel %u", + data_length, channel->mStream)); + type = DataChannelOnMessageAvailable::EventType::OnDataString; + if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) { + info = " (string fragmented)"; + } + // else send using recvData normally + + // WebSockets checks IsUTF8() here; we can try to deliver it + break; + + case DATA_CHANNEL_PPID_DOMSTRING_EMPTY: + DC_DEBUG( + ("DataChannel: Received empty string message of length %u on channel " + "%u", + data_length, channel->mStream)); + type = DataChannelOnMessageAvailable::EventType::OnDataString; + if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) { + info = " (string fragmented)"; + } + is_empty = true; + break; + + case DATA_CHANNEL_PPID_BINARY: + DC_DEBUG( + ("DataChannel: Received binary message of length %u on channel id %u", + data_length, channel->mStream)); + type = DataChannelOnMessageAvailable::EventType::OnDataBinary; + if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) { + info = " (binary fragmented)"; + } + + // else send using recvData normally + break; + + case DATA_CHANNEL_PPID_BINARY_EMPTY: + DC_DEBUG( + ("DataChannel: Received empty binary message of length %u on channel " + "id %u", + data_length, channel->mStream)); + type = DataChannelOnMessageAvailable::EventType::OnDataBinary; + if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) { + info = " (binary fragmented)"; + } + is_empty = true; + break; + + default: + NS_ERROR("Unknown data PPID"); + DC_ERROR(("Unknown data PPID %" PRIu32, ppid)); + return; + } + + channel->WithTrafficCounters( + [&data_length](DataChannel::TrafficCounters& counters) { + counters.mMessagesReceived++; + counters.mBytesReceived += data_length; + }); + + // Notify onmessage + DC_DEBUG( + ("%s: sending %s%s for %p", __FUNCTION__, ToString(type), info, channel)); + if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) { + channel->SendOrQueue(new DataChannelOnMessageAvailable( + type, this, channel, channel->mRecvBuffer)); + channel->mRecvBuffer.Truncate(0); + } else { + nsAutoCString recvData(is_empty ? "" : buffer, + is_empty ? 0 : data_length); // allocates >64 + channel->SendOrQueue( + new DataChannelOnMessageAvailable(type, this, channel, recvData)); + } +} + +void DataChannelConnection::HandleDCEPMessage(const void* buffer, size_t length, + uint32_t ppid, uint16_t stream, + int flags) { + const struct rtcweb_datachannel_open_request* req; + const struct rtcweb_datachannel_ack* ack; + + // Note: Until we support SIZE_MAX sized messages, we need this check +#if (SIZE_MAX > UINT32_MAX) + if (length > UINT32_MAX) { + DC_ERROR(("DataChannel: Cannot handle message of size %zu (max=%u)", length, + UINT32_MAX)); + Stop(); + return; + } +#endif + uint32_t data_length = (uint32_t)length; + + mLock.AssertCurrentThreadOwns(); + + // Buffer message until complete + const uint8_t bufferFlags = + BufferMessage(mRecvBuffer, buffer, data_length, ppid, flags); + if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) { + DC_ERROR( + ("DataChannel: Buffered message would become too large to handle, " + "closing connection")); + mRecvBuffer.Truncate(0); + Stop(); + return; + } + if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) { + DC_DEBUG(("Buffered partial DCEP message of length %u", data_length)); + return; + } + if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) { + buffer = reinterpret_cast<const void*>(mRecvBuffer.BeginReading()); + data_length = mRecvBuffer.Length(); + } + + req = static_cast<const struct rtcweb_datachannel_open_request*>(buffer); + DC_DEBUG(("Handling DCEP message of length %u", data_length)); + + // Ensure minimum message size (ack is the smallest DCEP message) + if ((size_t)data_length < sizeof(*ack)) { + DC_WARN(("Ignored invalid DCEP message (too short)")); + return; + } + + switch (req->msg_type) { + case DATA_CHANNEL_OPEN_REQUEST: + // structure includes a possibly-unused char label[1] (in a packed + // structure) + if (NS_WARN_IF((size_t)data_length < sizeof(*req) - 1)) { + return; + } + + HandleOpenRequestMessage(req, data_length, stream); + break; + case DATA_CHANNEL_ACK: + // >= sizeof(*ack) checked above + + ack = static_cast<const struct rtcweb_datachannel_ack*>(buffer); + HandleOpenAckMessage(ack, data_length, stream); + break; + default: + HandleUnknownMessage(ppid, data_length, stream); + break; + } + + // Reset buffer + mRecvBuffer.Truncate(0); +} + +// Called with mLock locked! +void DataChannelConnection::HandleMessage(const void* buffer, size_t length, + uint32_t ppid, uint16_t stream, + int flags) { + mLock.AssertCurrentThreadOwns(); + + switch (ppid) { + case DATA_CHANNEL_PPID_CONTROL: + HandleDCEPMessage(buffer, length, ppid, stream, flags); + break; + case DATA_CHANNEL_PPID_DOMSTRING_PARTIAL: + case DATA_CHANNEL_PPID_DOMSTRING: + case DATA_CHANNEL_PPID_DOMSTRING_EMPTY: + case DATA_CHANNEL_PPID_BINARY_PARTIAL: + case DATA_CHANNEL_PPID_BINARY: + case DATA_CHANNEL_PPID_BINARY_EMPTY: + HandleDataMessage(buffer, length, ppid, stream, flags); + break; + default: + DC_ERROR(( + "Unhandled message of length %zu PPID %u on stream %u received (%s).", + length, ppid, stream, (flags & MSG_EOR) ? "complete" : "partial")); + break; + } +} + +void DataChannelConnection::HandleAssociationChangeEvent( + const struct sctp_assoc_change* sac) { + mLock.AssertCurrentThreadOwns(); + + uint32_t i, n; + DataChannelConnectionState state = GetState(); + switch (sac->sac_state) { + case SCTP_COMM_UP: + DC_DEBUG(("Association change: SCTP_COMM_UP")); + if (state == DataChannelConnectionState::Connecting) { + mSocket = mMasterSocket; + SetState(DataChannelConnectionState::Open); + + DC_DEBUG(("Negotiated number of incoming streams: %" PRIu16, + sac->sac_inbound_streams)); + DC_DEBUG(("Negotiated number of outgoing streams: %" PRIu16, + sac->sac_outbound_streams)); + mNegotiatedIdLimit = + std::max(mNegotiatedIdLimit, + static_cast<size_t>(std::max(sac->sac_outbound_streams, + sac->sac_inbound_streams))); + + Dispatch(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::EventType::OnConnection, this))); + DC_DEBUG(("DTLS connect() succeeded! Entering connected mode")); + + // Open any streams pending... + ProcessQueuedOpens(); + + } else if (state == DataChannelConnectionState::Open) { + DC_DEBUG(("DataConnection Already OPEN")); + } else { + DC_ERROR(("Unexpected state: %s", ToString(state))); + } + break; + case SCTP_COMM_LOST: + DC_DEBUG(("Association change: SCTP_COMM_LOST")); + // This association is toast, so also close all the channels -- from + // mainthread! + Stop(); + break; + case SCTP_RESTART: + DC_DEBUG(("Association change: SCTP_RESTART")); + break; + case SCTP_SHUTDOWN_COMP: + DC_DEBUG(("Association change: SCTP_SHUTDOWN_COMP")); + Stop(); + break; + case SCTP_CANT_STR_ASSOC: + DC_DEBUG(("Association change: SCTP_CANT_STR_ASSOC")); + break; + default: + DC_DEBUG(("Association change: UNKNOWN")); + break; + } + DC_DEBUG(("Association change: streams (in/out) = (%u/%u)", + sac->sac_inbound_streams, sac->sac_outbound_streams)); + + if (NS_WARN_IF(!sac)) { + return; + } + + n = sac->sac_length - sizeof(*sac); + if ((sac->sac_state == SCTP_COMM_UP) || (sac->sac_state == SCTP_RESTART)) { + if (n > 0) { + for (i = 0; i < n; ++i) { + switch (sac->sac_info[i]) { + case SCTP_ASSOC_SUPPORTS_PR: + DC_DEBUG(("Supports: PR")); + break; + case SCTP_ASSOC_SUPPORTS_AUTH: + DC_DEBUG(("Supports: AUTH")); + break; + case SCTP_ASSOC_SUPPORTS_ASCONF: + DC_DEBUG(("Supports: ASCONF")); + break; + case SCTP_ASSOC_SUPPORTS_MULTIBUF: + DC_DEBUG(("Supports: MULTIBUF")); + break; + case SCTP_ASSOC_SUPPORTS_RE_CONFIG: + DC_DEBUG(("Supports: RE-CONFIG")); + break; +#if defined(SCTP_ASSOC_SUPPORTS_INTERLEAVING) + case SCTP_ASSOC_SUPPORTS_INTERLEAVING: + DC_DEBUG(("Supports: NDATA")); + // TODO: This should probably be set earlier above in 'case + // SCTP_COMM_UP' but we also need this for 'SCTP_RESTART'. + mSendInterleaved = true; + break; +#endif + default: + DC_ERROR(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i])); + break; + } + } + } + } else if (((sac->sac_state == SCTP_COMM_LOST) || + (sac->sac_state == SCTP_CANT_STR_ASSOC)) && + (n > 0)) { + DC_DEBUG(("Association: ABORT =")); + for (i = 0; i < n; ++i) { + DC_DEBUG((" 0x%02x", sac->sac_info[i])); + } + } + if ((sac->sac_state == SCTP_CANT_STR_ASSOC) || + (sac->sac_state == SCTP_SHUTDOWN_COMP) || + (sac->sac_state == SCTP_COMM_LOST)) { + return; + } +} + +void DataChannelConnection::HandlePeerAddressChangeEvent( + const struct sctp_paddr_change* spc) { + const char* addr = ""; +#if !defined(__Userspace_os_Windows) + char addr_buf[INET6_ADDRSTRLEN]; + struct sockaddr_in* sin; + struct sockaddr_in6* sin6; +#endif + + switch (spc->spc_aaddr.ss_family) { + case AF_INET: +#if !defined(__Userspace_os_Windows) + sin = (struct sockaddr_in*)&spc->spc_aaddr; + addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN); +#endif + break; + case AF_INET6: +#if !defined(__Userspace_os_Windows) + sin6 = (struct sockaddr_in6*)&spc->spc_aaddr; + addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN); +#endif + break; + case AF_CONN: + addr = "DTLS connection"; + break; + default: + break; + } + DC_DEBUG(("Peer address %s is now ", addr)); + switch (spc->spc_state) { + case SCTP_ADDR_AVAILABLE: + DC_DEBUG(("SCTP_ADDR_AVAILABLE")); + break; + case SCTP_ADDR_UNREACHABLE: + DC_DEBUG(("SCTP_ADDR_UNREACHABLE")); + break; + case SCTP_ADDR_REMOVED: + DC_DEBUG(("SCTP_ADDR_REMOVED")); + break; + case SCTP_ADDR_ADDED: + DC_DEBUG(("SCTP_ADDR_ADDED")); + break; + case SCTP_ADDR_MADE_PRIM: + DC_DEBUG(("SCTP_ADDR_MADE_PRIM")); + break; + case SCTP_ADDR_CONFIRMED: + DC_DEBUG(("SCTP_ADDR_CONFIRMED")); + break; + default: + DC_ERROR(("UNKNOWN SCP STATE")); + break; + } + if (spc->spc_error) { + DC_ERROR((" (error = 0x%08x).\n", spc->spc_error)); + } +} + +void DataChannelConnection::HandleRemoteErrorEvent( + const struct sctp_remote_error* sre) { + size_t i, n; + + n = sre->sre_length - sizeof(struct sctp_remote_error); + DC_WARN(("Remote Error (error = 0x%04x): ", sre->sre_error)); + for (i = 0; i < n; ++i) { + DC_WARN((" 0x%02x", sre->sre_data[i])); + } +} + +void DataChannelConnection::HandleShutdownEvent( + const struct sctp_shutdown_event* sse) { + DC_DEBUG(("Shutdown event.")); + /* XXX: notify all channels. */ + // Attempts to actually send anything will fail +} + +void DataChannelConnection::HandleAdaptationIndication( + const struct sctp_adaptation_event* sai) { + DC_DEBUG(("Adaptation indication: %x.", sai->sai_adaptation_ind)); +} + +void DataChannelConnection::HandlePartialDeliveryEvent( + const struct sctp_pdapi_event* spde) { + // Note: Be aware that stream and sequence number being u32 instead of u16 is + // a bug in the SCTP API. This may change in the future. + + DC_DEBUG(("Partial delivery event: ")); + switch (spde->pdapi_indication) { + case SCTP_PARTIAL_DELIVERY_ABORTED: + DC_DEBUG(("delivery aborted ")); + break; + default: + DC_ERROR(("??? ")); + break; + } + DC_DEBUG(("(flags = %x), stream = %" PRIu32 ", sn = %" PRIu32, + spde->pdapi_flags, spde->pdapi_stream, spde->pdapi_seq)); + + // Validate stream ID + if (spde->pdapi_stream >= UINT16_MAX) { + DC_ERROR(("Invalid stream id in partial delivery event: %" PRIu32 "\n", + spde->pdapi_stream)); + return; + } + + // Find channel and reset buffer + DataChannel* channel = FindChannelByStream((uint16_t)spde->pdapi_stream); + if (channel) { + DC_WARN(("Abort partially delivered message of %zu bytes\n", + channel->mRecvBuffer.Length())); + channel->mRecvBuffer.Truncate(0); + } +} + +void DataChannelConnection::HandleSendFailedEvent( + const struct sctp_send_failed_event* ssfe) { + size_t i, n; + + if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) { + DC_DEBUG(("Unsent ")); + } + if (ssfe->ssfe_flags & SCTP_DATA_SENT) { + DC_DEBUG(("Sent ")); + } + if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) { + DC_DEBUG(("(flags = %x) ", ssfe->ssfe_flags)); + } +#ifdef XP_WIN +# define PRIPPID "lu" +#else +# define PRIPPID "u" +#endif + DC_DEBUG(("message with PPID = %" PRIPPID + ", SID = %d, flags: 0x%04x due to error = 0x%08x", + ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid, + ssfe->ssfe_info.snd_flags, ssfe->ssfe_error)); +#undef PRIPPID + n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event); + for (i = 0; i < n; ++i) { + DC_DEBUG((" 0x%02x", ssfe->ssfe_data[i])); + } +} + +void DataChannelConnection::ClearResets() { + // Clear all pending resets + if (!mStreamsResetting.IsEmpty()) { + DC_DEBUG(("Clearing resets for %zu streams", mStreamsResetting.Length())); + } + + for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) { + RefPtr<DataChannel> channel; + channel = FindChannelByStream(mStreamsResetting[i]); + if (channel) { + DC_DEBUG(("Forgetting channel %u (%p) with pending reset", + channel->mStream, channel.get())); + // TODO: Do we _really_ want to remove this? Are we allowed to reuse the + // id? + mChannels.Remove(channel); + } + } + mStreamsResetting.Clear(); +} + +void DataChannelConnection::ResetOutgoingStream(uint16_t stream) { + uint32_t i; + + mLock.AssertCurrentThreadOwns(); + DC_DEBUG( + ("Connection %p: Resetting outgoing stream %u", (void*)this, stream)); + // Rarely has more than a couple items and only for a short time + for (i = 0; i < mStreamsResetting.Length(); ++i) { + if (mStreamsResetting[i] == stream) { + return; + } + } + mStreamsResetting.AppendElement(stream); +} + +void DataChannelConnection::SendOutgoingStreamReset() { + struct sctp_reset_streams* srs; + uint32_t i; + size_t len; + + DC_DEBUG(("Connection %p: Sending outgoing stream reset for %zu streams", + (void*)this, mStreamsResetting.Length())); + mLock.AssertCurrentThreadOwns(); + if (mStreamsResetting.IsEmpty()) { + DC_DEBUG(("No streams to reset")); + return; + } + len = sizeof(sctp_assoc_t) + + (2 + mStreamsResetting.Length()) * sizeof(uint16_t); + srs = static_cast<struct sctp_reset_streams*>( + moz_xmalloc(len)); // infallible malloc + memset(srs, 0, len); + srs->srs_flags = SCTP_STREAM_RESET_OUTGOING; + srs->srs_number_streams = mStreamsResetting.Length(); + for (i = 0; i < mStreamsResetting.Length(); ++i) { + srs->srs_stream_list[i] = mStreamsResetting[i]; + } + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, + (socklen_t)len) < 0) { + DC_ERROR(("***failed: setsockopt RESET, errno %d", errno)); + // if errno == EALREADY, this is normal - we can't send another reset + // with one pending. + // When we get an incoming reset (which may be a response to our + // outstanding one), see if we have any pending outgoing resets and + // send them + } else { + mStreamsResetting.Clear(); + } + free(srs); +} + +void DataChannelConnection::HandleStreamResetEvent( + const struct sctp_stream_reset_event* strrst) { + uint32_t n, i; + RefPtr<DataChannel> channel; // since we may null out the ref to the channel + + if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) && + !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) { + n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / + sizeof(uint16_t); + for (i = 0; i < n; ++i) { + if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) { + channel = FindChannelByStream(strrst->strreset_stream_list[i]); + if (channel) { + // The other side closed the channel + // We could be in three states: + // 1. Normal state (input and output streams (OPEN) + // Notify application, send a RESET in response on our + // outbound channel. Go to CLOSED + // 2. We sent our own reset (CLOSING); either they crossed on the + // wire, or this is a response to our Reset. + // Go to CLOSED + // 3. We've sent a open but haven't gotten a response yet (CONNECTING) + // I believe this is impossible, as we don't have an input stream + // yet. + + DC_DEBUG(("Incoming: Channel %u closed", channel->mStream)); + if (mChannels.Remove(channel)) { + // Mark the stream for reset (the reset is sent below) + ResetOutgoingStream(channel->mStream); + } + + DC_DEBUG(("Disconnected DataChannel %p from connection %p", + (void*)channel.get(), (void*)channel->mConnection.get())); + channel->StreamClosedLocked(); + } else { + DC_WARN(("Can't find incoming channel %d", i)); + } + } + } + } + + // Process any pending resets now: + if (!mStreamsResetting.IsEmpty()) { + DC_DEBUG(("Sending %zu pending resets", mStreamsResetting.Length())); + SendOutgoingStreamReset(); + } +} + +void DataChannelConnection::HandleStreamChangeEvent( + const struct sctp_stream_change_event* strchg) { + ASSERT_WEBRTC(!NS_IsMainThread()); + if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) { + DC_ERROR(("*** Failed increasing number of streams from %zu (%u/%u)", + mNegotiatedIdLimit, strchg->strchange_instrms, + strchg->strchange_outstrms)); + // XXX FIX! notify pending opens of failure + return; + } + if (strchg->strchange_instrms > mNegotiatedIdLimit) { + DC_DEBUG(("Other side increased streams from %zu to %u", mNegotiatedIdLimit, + strchg->strchange_instrms)); + } + uint16_t old_limit = mNegotiatedIdLimit; + uint16_t new_limit = + std::max(strchg->strchange_outstrms, strchg->strchange_instrms); + if (new_limit > mNegotiatedIdLimit) { + DC_DEBUG(("Increasing number of streams from %u to %u - adding %u (in: %u)", + old_limit, new_limit, new_limit - old_limit, + strchg->strchange_instrms)); + // make sure both are the same length + mNegotiatedIdLimit = new_limit; + DC_DEBUG(("New length = %zu (was %d)", mNegotiatedIdLimit, old_limit)); + // Re-process any channels waiting for streams. + // Linear search, but we don't increase channels often and + // the array would only get long in case of an app error normally + + // Make sure we request enough streams if there's a big jump in streams + // Could make a more complex API for OpenXxxFinish() and avoid this loop + auto channels = mChannels.GetAll(); + size_t num_needed = + channels.Length() ? (channels.LastElement()->mStream + 1) : 0; + MOZ_ASSERT(num_needed != INVALID_STREAM); + if (num_needed > new_limit) { + int32_t more_needed = num_needed - ((int32_t)mNegotiatedIdLimit) + 16; + DC_DEBUG(("Not enough new streams, asking for %d more", more_needed)); + // TODO: parameter is an int32_t but we pass size_t + RequestMoreStreams(more_needed); + } else if (strchg->strchange_outstrms < strchg->strchange_instrms) { + DC_DEBUG(("Requesting %d output streams to match partner", + strchg->strchange_instrms - strchg->strchange_outstrms)); + RequestMoreStreams(strchg->strchange_instrms - + strchg->strchange_outstrms); + } + + ProcessQueuedOpens(); + } + // else probably not a change in # of streams + + if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) || + (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) { + // Other side denied our request. Need to AnnounceClosed some stuff. + for (auto& channel : mChannels.GetAll()) { + if (channel->mStream >= mNegotiatedIdLimit) { + /* XXX: Signal to the other end. */ + channel->AnnounceClosed(); + // maybe fire onError (bug 843625) + } + } + } +} + +// Called with mLock locked! +void DataChannelConnection::HandleNotification( + const union sctp_notification* notif, size_t n) { + mLock.AssertCurrentThreadOwns(); + if (notif->sn_header.sn_length != (uint32_t)n) { + return; + } + switch (notif->sn_header.sn_type) { + case SCTP_ASSOC_CHANGE: + HandleAssociationChangeEvent(&(notif->sn_assoc_change)); + break; + case SCTP_PEER_ADDR_CHANGE: + HandlePeerAddressChangeEvent(&(notif->sn_paddr_change)); + break; + case SCTP_REMOTE_ERROR: + HandleRemoteErrorEvent(&(notif->sn_remote_error)); + break; + case SCTP_SHUTDOWN_EVENT: + HandleShutdownEvent(&(notif->sn_shutdown_event)); + break; + case SCTP_ADAPTATION_INDICATION: + HandleAdaptationIndication(&(notif->sn_adaptation_event)); + break; + case SCTP_AUTHENTICATION_EVENT: + DC_DEBUG(("SCTP_AUTHENTICATION_EVENT")); + break; + case SCTP_SENDER_DRY_EVENT: + // DC_DEBUG(("SCTP_SENDER_DRY_EVENT")); + break; + case SCTP_NOTIFICATIONS_STOPPED_EVENT: + DC_DEBUG(("SCTP_NOTIFICATIONS_STOPPED_EVENT")); + break; + case SCTP_PARTIAL_DELIVERY_EVENT: + HandlePartialDeliveryEvent(&(notif->sn_pdapi_event)); + break; + case SCTP_SEND_FAILED_EVENT: + HandleSendFailedEvent(&(notif->sn_send_failed_event)); + break; + case SCTP_STREAM_RESET_EVENT: + HandleStreamResetEvent(&(notif->sn_strreset_event)); + break; + case SCTP_ASSOC_RESET_EVENT: + DC_DEBUG(("SCTP_ASSOC_RESET_EVENT")); + break; + case SCTP_STREAM_CHANGE_EVENT: + HandleStreamChangeEvent(&(notif->sn_strchange_event)); + break; + default: + DC_ERROR(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type)); + break; + } +} + +int DataChannelConnection::ReceiveCallback( + struct socket* sock, void* data, size_t datalen, struct sctp_rcvinfo rcv, + int flags) MOZ_NO_THREAD_SAFETY_ANALYSIS { + ASSERT_WEBRTC(!NS_IsMainThread()); + DC_DEBUG(("In ReceiveCallback")); + + // libusrsctp just went reentrant on us. Put a stop to this. + mSTS->Dispatch(NS_NewRunnableFunction( + "DataChannelConnection::ReceiveCallback", + [data, datalen, rcv, flags, this, + self = RefPtr<DataChannelConnection>(this)]() mutable { + if (!data) { + DC_DEBUG(("ReceiveCallback: SCTP has finished shutting down")); + } else { + mLock.Lock(); + if (flags & MSG_NOTIFICATION) { + HandleNotification(static_cast<union sctp_notification*>(data), + datalen); + } else { + HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid, + flags); + } + mLock.Unlock(); + // sctp allocates 'data' with malloc(), and expects the receiver to + // free it (presumably with free). + // XXX future optimization: try to deliver messages without an + // internal alloc/copy, and if so delay the free until later. + free(data); + } + })); + + // usrsctp defines the callback as returning an int, but doesn't use it + return 1; +} + +already_AddRefed<DataChannel> DataChannelConnection::Open( + const nsACString& label, const nsACString& protocol, + DataChannelReliabilityPolicy prPolicy, bool inOrder, uint32_t prValue, + DataChannelListener* aListener, nsISupports* aContext, + bool aExternalNegotiated, uint16_t aStream) { + ASSERT_WEBRTC(NS_IsMainThread()); + MutexAutoLock lock(mLock); // OpenFinish assumes this + if (!aExternalNegotiated) { + if (mAllocateEven.isSome()) { + aStream = FindFreeStream(); + if (aStream == INVALID_STREAM) { + return nullptr; + } + } else { + // We do not yet know whether we are client or server, and an id has not + // been chosen for us. We will need to choose later. + aStream = INVALID_STREAM; + } + } + + DC_DEBUG( + ("DC Open: label %s/%s, type %s, inorder %d, prValue %u, listener %p, " + "context %p, external: %s, stream %u", + PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(), + ToString(prPolicy), inOrder, prValue, aListener, aContext, + aExternalNegotiated ? "true" : "false", aStream)); + + if ((prPolicy == DataChannelReliabilityPolicy::Reliable) && (prValue != 0)) { + return nullptr; + } + + if (aStream != INVALID_STREAM && mChannels.Get(aStream)) { + DC_ERROR(("external negotiation of already-open channel %u", aStream)); + return nullptr; + } + + RefPtr<DataChannel> channel(new DataChannel( + this, aStream, DataChannelState::Connecting, label, protocol, prPolicy, + prValue, inOrder, aExternalNegotiated, aListener, aContext)); + mChannels.Insert(channel); + + return OpenFinish(channel.forget()); +} + +// Separate routine so we can also call it to finish up from pending opens +already_AddRefed<DataChannel> DataChannelConnection::OpenFinish( + already_AddRefed<DataChannel>&& aChannel) { + RefPtr<DataChannel> channel(aChannel); // takes the reference passed in + // Normally 1 reference if called from ::Open(), or 2 if called from + // ProcessQueuedOpens() unless the DOMDataChannel was gc'd + const uint16_t stream = channel->mStream; + + mLock.AssertCurrentThreadOwns(); + + // Cases we care about: + // Pre-negotiated: + // Not Open: + // Doesn't fit: + // -> change initial ask or renegotiate after open + // -> queue open + // Open: + // Doesn't fit: + // -> RequestMoreStreams && queue + // Does fit: + // -> open + // Not negotiated: + // Not Open: + // -> queue open + // Open: + // -> Try to get a stream + // Doesn't fit: + // -> RequestMoreStreams && queue + // Does fit: + // -> open + // So the Open cases are basically the same + // Not Open cases are simply queue for non-negotiated, and + // either change the initial ask or possibly renegotiate after open. + DataChannelConnectionState state = GetState(); + if (state != DataChannelConnectionState::Open || + stream >= mNegotiatedIdLimit) { + if (state == DataChannelConnectionState::Open) { + MOZ_ASSERT(stream != INVALID_STREAM); + // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra + // streams to avoid going back immediately for more if the ask to N, N+1, + // etc + int32_t more_needed = stream - ((int32_t)mNegotiatedIdLimit) + 16; + if (!RequestMoreStreams(more_needed)) { + // Something bad happened... we're done + goto request_error_cleanup; + } + } + DC_DEBUG(("Queuing channel %p (%u) to finish open", channel.get(), stream)); + // Also serves to mark we told the app + channel->mHasFinishedOpen = true; + mPending.Push(channel); + return channel.forget(); + } + + MOZ_ASSERT(stream != INVALID_STREAM); + MOZ_ASSERT(stream < mNegotiatedIdLimit); + +#ifdef TEST_QUEUED_DATA + // It's painful to write a test for this... + channel->AnnounceOpen(); + SendDataMsgInternalOrBuffer(channel, "Help me!", 8, + DATA_CHANNEL_PPID_DOMSTRING); +#endif + + if (!channel->mNegotiated) { + if (!channel->mOrdered) { + // Don't send unordered until this gets cleared. + channel->mWaitingForAck = true; + } + + int error = SendOpenRequestMessage(channel->mLabel, channel->mProtocol, + stream, !channel->mOrdered, + channel->mPrPolicy, channel->mPrValue); + if (error) { + DC_ERROR(("SendOpenRequest failed, error = %d", error)); + if (channel->mHasFinishedOpen) { + // We already returned the channel to the app. + NS_ERROR("Failed to send open request"); + channel->AnnounceClosed(); + } + // If we haven't returned the channel yet, it will get destroyed when we + // exit this function. + mChannels.Remove(channel); + // we'll be destroying the channel + return nullptr; + /* NOTREACHED */ + } + } + + // Either externally negotiated or we sent Open + // FIX? Move into DOMDataChannel? I don't think we can send it yet here + channel->AnnounceOpen(); + + return channel.forget(); + +request_error_cleanup: + if (channel->mHasFinishedOpen) { + // We already returned the channel to the app. + NS_ERROR("Failed to request more streams"); + channel->AnnounceClosed(); + return channel.forget(); + } + // we'll be destroying the channel, but it never really got set up + // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and + // Dispatch it to ourselves + return nullptr; +} + +// Requires mLock to be locked! +// Returns a POSIX error code directly instead of setting errno. +int DataChannelConnection::SendMsgInternal(OutgoingMsg& msg, size_t* aWritten) { + struct sctp_sndinfo& info = msg.GetInfo().sendv_sndinfo; + int error; + + // EOR set? + bool eor_set = (info.snd_flags & SCTP_EOR) != 0; + + // Send until buffer is empty + size_t left = msg.GetLeft(); + do { + size_t length; + + // Carefully chunk the buffer + if (left > DATA_CHANNEL_MAX_BINARY_FRAGMENT) { + length = DATA_CHANNEL_MAX_BINARY_FRAGMENT; + + // Unset EOR flag + info.snd_flags &= ~SCTP_EOR; + } else { + length = left; + + // Set EOR flag + if (eor_set) { + info.snd_flags |= SCTP_EOR; + } + } + + // Send (or try at least) + // SCTP will return EMSGSIZE if the message is bigger than the buffer + // size (or EAGAIN if there isn't space). However, we can avoid EMSGSIZE + // by carefully crafting small enough message chunks. + ssize_t written = usrsctp_sendv( + mSocket, msg.GetData(), length, nullptr, 0, (void*)&msg.GetInfo(), + (socklen_t)sizeof(struct sctp_sendv_spa), SCTP_SENDV_SPA, 0); + + if (written < 0) { + error = errno; + goto out; + } + + if (aWritten) { + *aWritten += written; + } + DC_DEBUG(("Sent buffer (written=%zu, len=%zu, left=%zu)", (size_t)written, + length, left - (size_t)written)); + + // TODO: Remove once resolved + // (https://github.com/sctplab/usrsctp/issues/132) + if (written == 0) { + DC_ERROR(("@tuexen: usrsctp_sendv returned 0")); + error = EAGAIN; + goto out; + } + + // If not all bytes have been written, this obviously means that usrsctp's + // buffer is full and we need to try again later. + if ((size_t)written < length) { + msg.Advance((size_t)written); + error = EAGAIN; + goto out; + } + + // Update buffer position + msg.Advance((size_t)written); + + // Get amount of bytes left in the buffer + left = msg.GetLeft(); + } while (left > 0); + + // Done + error = 0; + +out: + // Reset EOR flag + if (eor_set) { + info.snd_flags |= SCTP_EOR; + } + + return error; +} + +// Requires mLock to be locked! +// Returns a POSIX error code directly instead of setting errno. +// IMPORTANT: Ensure that the buffer passed is guarded by mLock! +int DataChannelConnection::SendMsgInternalOrBuffer( + nsTArray<UniquePtr<BufferedOutgoingMsg>>& buffer, OutgoingMsg& msg, + bool& buffered, size_t* aWritten) { + NS_WARNING_ASSERTION(msg.GetLength() > 0, "Length is 0?!"); + + int error = 0; + bool need_buffering = false; + + // Note: Main-thread IO, but doesn't block! + // XXX FIX! to deal with heavy overruns of JS trying to pass data in + // (more than the buffersize) queue data onto another thread to do the + // actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp + + // Avoid a race between buffer-full-failure (where we have to add the + // packet to the buffered-data queue) and the buffer-now-only-half-full + // callback, which happens on a different thread. Otherwise we might + // fail here, then before we add it to the queue get the half-full + // callback, find nothing to do, then on this thread add it to the + // queue - which would sit there. Also, if we later send more data, it + // would arrive ahead of the buffered message, but if the buffer ever + // got to 1/2 full, the message would get sent - but at a semi-random + // time, after other data it was supposed to be in front of. + + // Must lock before empty check for similar reasons! + mLock.AssertCurrentThreadOwns(); + if (buffer.IsEmpty() && + (mSendInterleaved || mPendingType == PendingType::None)) { + error = SendMsgInternal(msg, aWritten); + switch (error) { + case 0: + break; + case EAGAIN: +#if (EAGAIN != EWOULDBLOCK) + case EWOULDBLOCK: +#endif + need_buffering = true; + break; + default: + DC_ERROR(("error %d on sending", error)); + break; + } + } else { + need_buffering = true; + } + + if (need_buffering) { + // queue data for resend! And queue any further data for the stream until + // it is... + auto* bufferedMsg = new BufferedOutgoingMsg(msg); // infallible malloc + buffer.AppendElement(bufferedMsg); // owned by mBufferedData array + DC_DEBUG(("Queued %zu buffers (left=%zu, total=%zu)", buffer.Length(), + msg.GetLeft(), msg.GetLength())); + buffered = true; + return 0; + } + + buffered = false; + return error; +} + +// Caller must ensure that length <= UINT32_MAX +// Returns a POSIX error code. +int DataChannelConnection::SendDataMsgInternalOrBuffer(DataChannel& channel, + const uint8_t* data, + size_t len, + uint32_t ppid) { + if (NS_WARN_IF(channel.GetReadyState() != DataChannelState::Open)) { + return EINVAL; // TODO: Find a better error code + } + + struct sctp_sendv_spa info = {}; + + // General flags + info.sendv_flags = SCTP_SEND_SNDINFO_VALID; + + // Set stream identifier, protocol identifier and flags + info.sendv_sndinfo.snd_sid = channel.mStream; + info.sendv_sndinfo.snd_flags = SCTP_EOR; + info.sendv_sndinfo.snd_ppid = htonl(ppid); + + // Unordered? + // To avoid problems where an in-order OPEN is lost and an + // out-of-order data message "beats" it, require data to be in-order + // until we get an ACK. + if (!channel.mOrdered && !channel.mWaitingForAck) { + info.sendv_sndinfo.snd_flags |= SCTP_UNORDERED; + } + + // Partial reliability policy + if (channel.mPrPolicy != DataChannelReliabilityPolicy::Reliable) { + info.sendv_prinfo.pr_policy = ToUsrsctpValue(channel.mPrPolicy); + info.sendv_prinfo.pr_value = channel.mPrValue; + info.sendv_flags |= SCTP_SEND_PRINFO_VALID; + } + + // Create message instance and send + OutgoingMsg msg(info, data, len); + bool buffered; + size_t written = 0; + mDeferSend = true; + int error = + SendMsgInternalOrBuffer(channel.mBufferedData, msg, buffered, &written); + mDeferSend = false; + if (written && ppid != DATA_CHANNEL_PPID_DOMSTRING_EMPTY && + ppid != DATA_CHANNEL_PPID_BINARY_EMPTY) { + channel.DecrementBufferedAmount(written); + } + + for (auto&& packet : mDeferredSend) { + MOZ_ASSERT(written); + SendPacket(std::move(packet)); + } + mDeferredSend.clear(); + + // Set pending type and stream index (if buffered) + if (!error && buffered && mPendingType == PendingType::None) { + mPendingType = PendingType::Data; + mCurrentStream = channel.mStream; + } + return error; +} + +// Caller must ensure that length <= UINT32_MAX +// Returns a POSIX error code. +int DataChannelConnection::SendDataMsg(DataChannel& channel, + const uint8_t* data, size_t len, + uint32_t ppidPartial, + uint32_t ppidFinal) { + // We *really* don't want to do this from main thread! - and + // SendDataMsgInternalOrBuffer avoids blocking. + mLock.AssertCurrentThreadOwns(); + + if (mMaxMessageSize != 0 && len > mMaxMessageSize) { + DC_ERROR(("Message rejected, too large (%zu > %" PRIu64 ")", len, + mMaxMessageSize)); + return EMSGSIZE; + } + + // This will use EOR-based fragmentation if the message is too large (> 64 + // KiB) + return SendDataMsgInternalOrBuffer(channel, data, len, ppidFinal); +} + +class ReadBlobRunnable : public Runnable { + public: + ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream, + nsIInputStream* aBlob) + : Runnable("ReadBlobRunnable"), + mConnection(aConnection), + mStream(aStream), + mBlob(aBlob) {} + + NS_IMETHOD Run() override { + // ReadBlob() is responsible to releasing the reference + DataChannelConnection* self = mConnection; + self->ReadBlob(mConnection.forget(), mStream, mBlob); + return NS_OK; + } + + private: + // Make sure the Connection doesn't die while there are jobs outstanding. + // Let it die (if released by PeerConnectionImpl while we're running) + // when we send our runnable back to MainThread. Then ~DataChannelConnection + // can send the IOThread to MainThread to die in a runnable, avoiding + // unsafe event loop recursion. Evil. + RefPtr<DataChannelConnection> mConnection; + uint16_t mStream; + // Use RefCount for preventing the object is deleted when SendBlob returns. + RefPtr<nsIInputStream> mBlob; +}; + +// Returns a POSIX error code. +int DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream* aBlob) { + MutexAutoLock lock(mLock); + RefPtr<DataChannel> channel = mChannels.Get(stream); + if (NS_WARN_IF(!channel)) { + return EINVAL; // TODO: Find a better error code + } + + // Spawn a thread to send the data + if (!mInternalIOThread) { + nsresult rv = + NS_NewNamedThread("DataChannel IO", getter_AddRefs(mInternalIOThread)); + if (NS_FAILED(rv)) { + return EINVAL; // TODO: Find a better error code + } + } + + mInternalIOThread->Dispatch( + do_AddRef(new ReadBlobRunnable(this, stream, aBlob)), NS_DISPATCH_NORMAL); + return 0; +} + +class DataChannelBlobSendRunnable : public Runnable { + public: + DataChannelBlobSendRunnable( + already_AddRefed<DataChannelConnection>& aConnection, uint16_t aStream) + : Runnable("DataChannelBlobSendRunnable"), + mConnection(aConnection), + mStream(aStream) {} + + ~DataChannelBlobSendRunnable() override { + if (!NS_IsMainThread() && mConnection) { + MOZ_ASSERT(false); + // explicitly leak the connection if destroyed off mainthread + Unused << mConnection.forget().take(); + } + } + + NS_IMETHOD Run() override { + ASSERT_WEBRTC(NS_IsMainThread()); + + mConnection->SendBinaryMsg(mStream, mData); + mConnection = nullptr; + return NS_OK; + } + + // explicitly public so we can avoid allocating twice and copying + nsCString mData; + + private: + // Note: we can be destroyed off the target thread, so be careful not to let + // this get Released()ed on the temp thread! + RefPtr<DataChannelConnection> mConnection; + uint16_t mStream; +}; + +void DataChannelConnection::SetState(DataChannelConnectionState aState) { + mLock.AssertCurrentThreadOwns(); + + DC_DEBUG( + ("DataChannelConnection labeled %s (%p) switching connection state %s -> " + "%s", + mTransportId.c_str(), this, ToString(mState), ToString(aState))); + + mState = aState; +} + +void DataChannelConnection::ReadBlob( + already_AddRefed<DataChannelConnection> aThis, uint16_t aStream, + nsIInputStream* aBlob) { + // NOTE: 'aThis' has been forgotten by the caller to avoid releasing + // it off mainthread; if PeerConnectionImpl has released then we want + // ~DataChannelConnection() to run on MainThread + + // XXX to do this safely, we must enqueue these atomically onto the + // output socket. We need a sender thread(s?) to enqueue data into the + // socket and to avoid main-thread IO that might block. Even on a + // background thread, we may not want to block on one stream's data. + // I.e. run non-blocking and service multiple channels. + + // Must not let Dispatching it cause the DataChannelConnection to get + // released on the wrong thread. Using + // WrapRunnable(RefPtr<DataChannelConnection>(aThis),... will occasionally + // cause aThis to get released on this thread. Also, an explicit Runnable + // lets us avoid copying the blob data an extra time. + RefPtr<DataChannelBlobSendRunnable> runnable = + new DataChannelBlobSendRunnable(aThis, aStream); + // avoid copying the blob data by passing the mData from the runnable + if (NS_FAILED(NS_ReadInputStreamToString(aBlob, runnable->mData, -1))) { + // Bug 966602: Doesn't return an error to the caller via onerror. + // We must release DataChannelConnection on MainThread to avoid issues (bug + // 876167) aThis is now owned by the runnable; release it there + NS_ReleaseOnMainThread("DataChannelBlobSendRunnable", runnable.forget()); + return; + } + aBlob->Close(); + Dispatch(runnable.forget()); +} + +// Returns a POSIX error code. +int DataChannelConnection::SendDataMsgCommon(uint16_t stream, + const nsACString& aMsg, + bool isBinary) { + ASSERT_WEBRTC(NS_IsMainThread()); + // We really could allow this from other threads, so long as we deal with + // asynchronosity issues with channels closing, in particular access to + // mChannels, and issues with the association closing (access to mSocket). + + const uint8_t* data = (const uint8_t*)aMsg.BeginReading(); + uint32_t len = aMsg.Length(); +#if (UINT32_MAX > SIZE_MAX) + if (len > SIZE_MAX) { + return EMSGSIZE; + } +#endif + + DC_DEBUG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", + stream, len)); + // XXX if we want more efficiency, translate flags once at open time + RefPtr<DataChannel> channelPtr = mChannels.Get(stream); + if (NS_WARN_IF(!channelPtr)) { + return EINVAL; // TODO: Find a better error code + } + bool is_empty = len == 0; + const uint8_t byte = 0; + if (is_empty) { + data = &byte; + len = 1; + } + auto& channel = *channelPtr; + int err = 0; + MutexAutoLock lock(mLock); + if (isBinary) { + err = SendDataMsg( + channel, data, len, DATA_CHANNEL_PPID_BINARY_PARTIAL, + is_empty ? DATA_CHANNEL_PPID_BINARY_EMPTY : DATA_CHANNEL_PPID_BINARY); + } else { + err = SendDataMsg(channel, data, len, DATA_CHANNEL_PPID_DOMSTRING_PARTIAL, + is_empty ? DATA_CHANNEL_PPID_DOMSTRING_EMPTY + : DATA_CHANNEL_PPID_DOMSTRING); + } + if (!err) { + channel.WithTrafficCounters([&len](DataChannel::TrafficCounters& counters) { + counters.mMessagesSent++; + counters.mBytesSent += len; + }); + } + + return err; +} + +void DataChannelConnection::Stop() { + // Note: This will call 'CloseAll' from the main thread + Dispatch(do_AddRef(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::EventType::OnDisconnected, this))); +} + +void DataChannelConnection::Close(DataChannel* aChannel) { + MutexAutoLock lock(mLock); + CloseLocked(aChannel); +} + +// So we can call Close() with the lock already held +// Called from someone who holds a ref via ::Close(), or from ~DataChannel +void DataChannelConnection::CloseLocked(DataChannel* aChannel) { + MOZ_ASSERT(aChannel); + RefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us + + mLock.AssertCurrentThreadOwns(); + DC_DEBUG(("Connection %p/Channel %p: Closing stream %u", + channel->mConnection.get(), channel.get(), channel->mStream)); + + aChannel->mBufferedData.Clear(); + if (GetState() == DataChannelConnectionState::Closed) { + // If we're CLOSING, we might leave this in place until we can send a + // reset. + mChannels.Remove(channel); + } + + // This is supposed to only be accessed from Main thread, but this has + // been accessed here from the STS thread for a long time now. + // See Bug 1586475 + DataChannelState channelState = aChannel->GetReadyState(); + // re-test since it may have closed before the lock was grabbed + if (channelState == DataChannelState::Closed || + channelState == DataChannelState::Closing) { + DC_DEBUG(("Channel already closing/closed (%s)", ToString(channelState))); + return; + } + + if (channel->mStream != INVALID_STREAM) { + ResetOutgoingStream(channel->mStream); + if (GetState() != DataChannelConnectionState::Closed) { + // Individual channel is being closed, send reset now. + SendOutgoingStreamReset(); + } + } + aChannel->SetReadyState(DataChannelState::Closing); + if (GetState() == DataChannelConnectionState::Closed) { + // we're not going to hang around waiting + channel->StreamClosedLocked(); + } + // At this point when we leave here, the object is a zombie held alive only by + // the DOM object +} + +void DataChannelConnection::CloseAll() { + DC_DEBUG(("Closing all channels (connection %p)", (void*)this)); + + // Make sure no more channels will be opened + MutexAutoLock lock(mLock); + SetState(DataChannelConnectionState::Closed); + + // Close current channels + // If there are runnables, they hold a strong ref and keep the channel + // and/or connection alive (even if in a CLOSED state) + for (auto& channel : mChannels.GetAll()) { + MutexAutoUnlock lock(mLock); + channel->Close(); + } + + // Clean up any pending opens for channels + RefPtr<DataChannel> channel; + while (nullptr != (channel = mPending.PopFront())) { + DC_DEBUG(("closing pending channel %p, stream %u", channel.get(), + channel->mStream)); + MutexAutoUnlock lock(mLock); + channel->Close(); // also releases the ref on each iteration + } + // It's more efficient to let the Resets queue in shutdown and then + // SendOutgoingStreamReset() here. + SendOutgoingStreamReset(); +} + +bool DataChannelConnection::Channels::IdComparator::Equals( + const RefPtr<DataChannel>& aChannel, uint16_t aId) const { + return aChannel->mStream == aId; +} + +bool DataChannelConnection::Channels::IdComparator::LessThan( + const RefPtr<DataChannel>& aChannel, uint16_t aId) const { + return aChannel->mStream < aId; +} + +bool DataChannelConnection::Channels::IdComparator::Equals( + const RefPtr<DataChannel>& a1, const RefPtr<DataChannel>& a2) const { + return Equals(a1, a2->mStream); +} + +bool DataChannelConnection::Channels::IdComparator::LessThan( + const RefPtr<DataChannel>& a1, const RefPtr<DataChannel>& a2) const { + return LessThan(a1, a2->mStream); +} + +void DataChannelConnection::Channels::Insert( + const RefPtr<DataChannel>& aChannel) { + DC_DEBUG(("Inserting channel %u : %p", aChannel->mStream, aChannel.get())); + MutexAutoLock lock(mMutex); + if (aChannel->mStream != INVALID_STREAM) { + MOZ_ASSERT(!mChannels.ContainsSorted(aChannel, IdComparator())); + } + + MOZ_ASSERT(!mChannels.Contains(aChannel)); + + mChannels.InsertElementSorted(aChannel, IdComparator()); +} + +bool DataChannelConnection::Channels::Remove( + const RefPtr<DataChannel>& aChannel) { + DC_DEBUG(("Removing channel %u : %p", aChannel->mStream, aChannel.get())); + MutexAutoLock lock(mMutex); + if (aChannel->mStream == INVALID_STREAM) { + return mChannels.RemoveElement(aChannel); + } + + return mChannels.RemoveElementSorted(aChannel, IdComparator()); +} + +RefPtr<DataChannel> DataChannelConnection::Channels::Get(uint16_t aId) const { + MutexAutoLock lock(mMutex); + auto index = mChannels.BinaryIndexOf(aId, IdComparator()); + if (index == ChannelArray::NoIndex) { + return nullptr; + } + return mChannels[index]; +} + +RefPtr<DataChannel> DataChannelConnection::Channels::GetNextChannel( + uint16_t aCurrentId) const { + MutexAutoLock lock(mMutex); + if (mChannels.IsEmpty()) { + return nullptr; + } + + auto index = mChannels.IndexOfFirstElementGt(aCurrentId, IdComparator()); + if (index == mChannels.Length()) { + index = 0; + } + return mChannels[index]; +} + +DataChannel::~DataChannel() { + // NS_ASSERTION since this is more "I think I caught all the cases that + // can cause this" than a true kill-the-program assertion. If this is + // wrong, nothing bad happens. A worst it's a leak. + NS_ASSERTION(mReadyState == DataChannelState::Closed || + mReadyState == DataChannelState::Closing, + "unexpected state in ~DataChannel"); +} + +void DataChannel::Close() { + if (mConnection) { + // ensure we don't get deleted + RefPtr<DataChannelConnection> connection(mConnection); + connection->Close(this); + } +} + +// Used when disconnecting from the DataChannelConnection +void DataChannel::StreamClosedLocked() { + MOZ_ASSERT(mConnection); + if (!mConnection) { + return; + } + mConnection->mLock.AssertCurrentThreadOwns(); + + DC_DEBUG(("Destroying Data channel %u", mStream)); + MOZ_ASSERT_IF(mStream != INVALID_STREAM, + !mConnection->FindChannelByStream(mStream)); + AnnounceClosed(); + // We leave mConnection live until the DOM releases us, to avoid races +} + +void DataChannel::ReleaseConnection() { + ASSERT_WEBRTC(NS_IsMainThread()); + mConnection = nullptr; +} + +void DataChannel::SetListener(DataChannelListener* aListener, + nsISupports* aContext) { + ASSERT_WEBRTC(NS_IsMainThread()); + mContext = aContext; + mListener = aListener; +} + +void DataChannel::SendErrnoToErrorResult(int error, size_t aMessageSize, + ErrorResult& aRv) { + switch (error) { + case 0: + break; + case EMSGSIZE: { + nsPrintfCString err("Message size (%zu) exceeds maxMessageSize", + aMessageSize); + aRv.ThrowTypeError(err); + break; + } + default: + aRv.Throw(NS_ERROR_DOM_OPERATION_ERR); + break; + } +} + +void DataChannel::IncrementBufferedAmount(uint32_t aSize, ErrorResult& aRv) { + ASSERT_WEBRTC(NS_IsMainThread()); + if (mBufferedAmount > UINT32_MAX - aSize) { + aRv.Throw(NS_ERROR_FILE_TOO_BIG); + return; + } + + mBufferedAmount += aSize; +} + +void DataChannel::DecrementBufferedAmount(uint32_t aSize) { + mMainThreadEventTarget->Dispatch(NS_NewRunnableFunction( + "DataChannel::DecrementBufferedAmount", + [this, self = RefPtr<DataChannel>(this), aSize] { + MOZ_ASSERT(aSize <= mBufferedAmount); + bool wasLow = mBufferedAmount <= mBufferedThreshold; + mBufferedAmount -= aSize; + if (!wasLow && mBufferedAmount <= mBufferedThreshold) { + DC_DEBUG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", + __FUNCTION__, mLabel.get(), mProtocol.get(), mStream)); + mListener->OnBufferLow(mContext); + } + if (mBufferedAmount == 0) { + DC_DEBUG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", + __FUNCTION__, mLabel.get(), mProtocol.get(), mStream)); + mListener->NotBuffered(mContext); + } + })); +} + +void DataChannel::AnnounceOpen() { + mMainThreadEventTarget->Dispatch(NS_NewRunnableFunction( + "DataChannel::AnnounceOpen", [this, self = RefPtr<DataChannel>(this)] { + DataChannelState state = GetReadyState(); + // Special-case; spec says to put brand-new remote-created DataChannel + // in "open", but queue the firing of the "open" event. + if (state != DataChannelState::Closing && + state != DataChannelState::Closed) { + if (!mEverOpened && mConnection && mConnection->mListener) { + mEverOpened = true; + mConnection->mListener->NotifyDataChannelOpen(this); + } + SetReadyState(DataChannelState::Open); + DC_DEBUG(("%s: sending ON_CHANNEL_OPEN for %s/%s: %u", __FUNCTION__, + mLabel.get(), mProtocol.get(), mStream)); + if (mListener) { + mListener->OnChannelConnected(mContext); + } + } + })); +} + +void DataChannel::AnnounceClosed() { + mMainThreadEventTarget->Dispatch(NS_NewRunnableFunction( + "DataChannel::AnnounceClosed", [this, self = RefPtr<DataChannel>(this)] { + if (GetReadyState() == DataChannelState::Closed) { + return; + } + if (mEverOpened && mConnection && mConnection->mListener) { + mConnection->mListener->NotifyDataChannelClosed(this); + } + SetReadyState(DataChannelState::Closed); + mBufferedData.Clear(); + if (mListener) { + DC_DEBUG(("%s: sending ON_CHANNEL_CLOSED for %s/%s: %u", __FUNCTION__, + mLabel.get(), mProtocol.get(), mStream)); + mListener->OnChannelClosed(mContext); + } + })); +} + +// Set ready state +void DataChannel::SetReadyState(const DataChannelState aState) { + MOZ_ASSERT(NS_IsMainThread()); + + DC_DEBUG( + ("DataChannelConnection labeled %s(%p) (stream %d) changing ready state " + "%s -> %s", + mLabel.get(), this, mStream, ToString(mReadyState), ToString(aState))); + + mReadyState = aState; +} + +void DataChannel::SendMsg(const nsACString& aMsg, ErrorResult& aRv) { + if (!EnsureValidStream(aRv)) { + return; + } + + SendErrnoToErrorResult(mConnection->SendMsg(mStream, aMsg), aMsg.Length(), + aRv); + if (!aRv.Failed()) { + IncrementBufferedAmount(aMsg.Length(), aRv); + } +} + +void DataChannel::SendBinaryMsg(const nsACString& aMsg, ErrorResult& aRv) { + if (!EnsureValidStream(aRv)) { + return; + } + + SendErrnoToErrorResult(mConnection->SendBinaryMsg(mStream, aMsg), + aMsg.Length(), aRv); + if (!aRv.Failed()) { + IncrementBufferedAmount(aMsg.Length(), aRv); + } +} + +void DataChannel::SendBinaryBlob(dom::Blob& aBlob, ErrorResult& aRv) { + if (!EnsureValidStream(aRv)) { + return; + } + + uint64_t msgLength = aBlob.GetSize(aRv); + if (aRv.Failed()) { + return; + } + + if (msgLength > UINT32_MAX) { + aRv.Throw(NS_ERROR_FILE_TOO_BIG); + return; + } + + // We convert to an nsIInputStream here, because Blob is not threadsafe, and + // we don't convert it earlier because we need to know how large this is so we + // can update bufferedAmount. + nsCOMPtr<nsIInputStream> msgStream; + aBlob.CreateInputStream(getter_AddRefs(msgStream), aRv); + if (NS_WARN_IF(aRv.Failed())) { + return; + } + + SendErrnoToErrorResult(mConnection->SendBlob(mStream, msgStream), msgLength, + aRv); + if (!aRv.Failed()) { + IncrementBufferedAmount(msgLength, aRv); + } +} + +dom::Nullable<uint16_t> DataChannel::GetMaxPacketLifeTime() const { + if (mPrPolicy == DataChannelReliabilityPolicy::LimitedLifetime) { + return dom::Nullable<uint16_t>(mPrValue); + } + return dom::Nullable<uint16_t>(); +} + +dom::Nullable<uint16_t> DataChannel::GetMaxRetransmits() const { + if (mPrPolicy == DataChannelReliabilityPolicy::LimitedRetransmissions) { + return dom::Nullable<uint16_t>(mPrValue); + } + return dom::Nullable<uint16_t>(); +} + +uint32_t DataChannel::GetBufferedAmountLowThreshold() const { + return mBufferedThreshold; +} + +// Never fire immediately, as it's defined to fire on transitions, not state +void DataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold) { + mBufferedThreshold = aThreshold; +} + +// Called with mLock locked! +void DataChannel::SendOrQueue(DataChannelOnMessageAvailable* aMessage) { + nsCOMPtr<nsIRunnable> runnable = aMessage; + mMainThreadEventTarget->Dispatch(runnable.forget()); +} + +DataChannel::TrafficCounters DataChannel::GetTrafficCounters() const { + MutexAutoLock lock(mStatsLock); + return mTrafficCounters; +} + +bool DataChannel::EnsureValidStream(ErrorResult& aRv) { + MOZ_ASSERT(mConnection); + if (mConnection && mStream != INVALID_STREAM) { + return true; + } + aRv.Throw(NS_ERROR_DOM_INVALID_STATE_ERR); + return false; +} + +void DataChannel::WithTrafficCounters( + const std::function<void(TrafficCounters&)>& aFn) { + MutexAutoLock lock(mStatsLock); + aFn(mTrafficCounters); +} + +nsresult DataChannelOnMessageAvailable::Run() { + MOZ_ASSERT(NS_IsMainThread()); + + // Note: calling the listeners can indirectly cause the listeners to be + // made available for GC (by removing event listeners), especially for + // OnChannelClosed(). We hold a ref to the Channel and the listener + // while calling this. + switch (mType) { + case EventType::OnDataString: + case EventType::OnDataBinary: + if (!mChannel->mListener) { + DC_ERROR(("DataChannelOnMessageAvailable (%s) with null Listener!", + ToString(mType))); + return NS_OK; + } + + if (mChannel->GetReadyState() == DataChannelState::Closed || + mChannel->GetReadyState() == DataChannelState::Closing) { + // Closed by JS, probably + return NS_OK; + } + + if (mType == EventType::OnDataString) { + mChannel->mListener->OnMessageAvailable(mChannel->mContext, mData); + } else { + mChannel->mListener->OnBinaryMessageAvailable(mChannel->mContext, + mData); + } + break; + case EventType::OnDisconnected: + // If we've disconnected, make sure we close all the streams - from + // mainthread! + if (mConnection->mListener) { + mConnection->mListener->NotifySctpClosed(); + } + mConnection->CloseAll(); + break; + case EventType::OnChannelCreated: + if (!mConnection->mListener) { + DC_ERROR(("DataChannelOnMessageAvailable (%s) with null Listener!", + ToString(mType))); + return NS_OK; + } + + // important to give it an already_AddRefed pointer! + mConnection->mListener->NotifyDataChannel(mChannel.forget()); + break; + case EventType::OnConnection: + if (mConnection->mListener) { + mConnection->mListener->NotifySctpConnected(); + } + break; + } + return NS_OK; +} + +} // namespace mozilla |