summaryrefslogtreecommitdiffstats
path: root/netwerk/sctp/datachannel/DataChannel.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:44:51 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:44:51 +0000
commit9e3c08db40b8916968b9f30096c7be3f00ce9647 (patch)
treea68f146d7fa01f0134297619fbe7e33db084e0aa /netwerk/sctp/datachannel/DataChannel.h
parentInitial commit. (diff)
downloadthunderbird-9e3c08db40b8916968b9f30096c7be3f00ce9647.tar.xz
thunderbird-9e3c08db40b8916968b9f30096c7be3f00ce9647.zip
Adding upstream version 1:115.7.0.upstream/1%115.7.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'netwerk/sctp/datachannel/DataChannel.h')
-rw-r--r--netwerk/sctp/datachannel/DataChannel.h711
1 files changed, 711 insertions, 0 deletions
diff --git a/netwerk/sctp/datachannel/DataChannel.h b/netwerk/sctp/datachannel/DataChannel.h
new file mode 100644
index 0000000000..f985e87a5b
--- /dev/null
+++ b/netwerk/sctp/datachannel/DataChannel.h
@@ -0,0 +1,711 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#ifndef NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_
+#define NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_
+
+#ifdef MOZ_WEBRTC_SIGNALING
+# define SCTP_DTLS_SUPPORTED 1
+#endif
+
+#include <memory>
+#include <string>
+#include <vector>
+#include <errno.h>
+#include "nsISupports.h"
+#include "nsCOMPtr.h"
+#include "mozilla/UniquePtr.h"
+#include "mozilla/WeakPtr.h"
+#include "nsString.h"
+#include "nsThreadUtils.h"
+#include "nsTArray.h"
+#include "nsDeque.h"
+#include "mozilla/dom/Blob.h"
+#include "mozilla/Mutex.h"
+#include "DataChannelProtocol.h"
+#include "DataChannelListener.h"
+#include "mozilla/net/NeckoTargetHolder.h"
+#include "DataChannelLog.h"
+
+#ifdef SCTP_DTLS_SUPPORTED
+# include "transport/sigslot.h"
+# include "transport/transportlayer.h" // For TransportLayer::State
+#endif
+
+#ifndef EALREADY
+# define EALREADY WSAEALREADY
+#endif
+
+extern "C" {
+struct socket;
+struct sctp_rcvinfo;
+}
+
+namespace mozilla {
+
+class DataChannelConnection;
+class DataChannel;
+class DataChannelOnMessageAvailable;
+class MediaPacket;
+class MediaTransportHandler;
+namespace dom {
+struct RTCStatsCollection;
+};
+
+// For sending outgoing messages.
+// This class only holds a reference to the data and the info structure but does
+// not copy it.
+class OutgoingMsg {
+ public:
+ OutgoingMsg(struct sctp_sendv_spa& info, const uint8_t* data, size_t length);
+ ~OutgoingMsg() = default;
+ ;
+ void Advance(size_t offset);
+ struct sctp_sendv_spa& GetInfo() { return *mInfo; };
+ size_t GetLength() const { return mLength; };
+ size_t GetLeft() const { return mLength - mPos; };
+ const uint8_t* GetData() { return (const uint8_t*)(mData + mPos); };
+
+ protected:
+ OutgoingMsg() // Use this for inheritance only
+ : mLength(0), mData(nullptr), mInfo(nullptr), mPos(0){};
+ size_t mLength;
+ const uint8_t* mData;
+ struct sctp_sendv_spa* mInfo;
+ size_t mPos;
+};
+
+// For queuing outgoing messages
+// This class copies data of an outgoing message.
+class BufferedOutgoingMsg : public OutgoingMsg {
+ public:
+ explicit BufferedOutgoingMsg(OutgoingMsg& message);
+ ~BufferedOutgoingMsg();
+};
+
+// for queuing incoming data messages before the Open or
+// external negotiation is indicated to us
+class QueuedDataMessage {
+ public:
+ QueuedDataMessage(uint16_t stream, uint32_t ppid, int flags, const void* data,
+ uint32_t length)
+ : mStream(stream), mPpid(ppid), mFlags(flags), mLength(length) {
+ mData = static_cast<uint8_t*>(moz_xmalloc((size_t)length)); // infallible
+ memcpy(mData, data, (size_t)length);
+ }
+
+ ~QueuedDataMessage() { free(mData); }
+
+ uint16_t mStream;
+ uint32_t mPpid;
+ int mFlags;
+ uint32_t mLength;
+ uint8_t* mData;
+};
+
+// One per PeerConnection
+class DataChannelConnection final : public net::NeckoTargetHolder
+#ifdef SCTP_DTLS_SUPPORTED
+ ,
+ public sigslot::has_slots<>
+#endif
+{
+ friend class DataChannel;
+ friend class DataChannelOnMessageAvailable;
+ friend class DataChannelConnectRunnable;
+
+ virtual ~DataChannelConnection();
+
+ public:
+ enum {
+ PENDING_NONE = 0U, // No outgoing messages are pending
+ PENDING_DCEP = 1U, // Outgoing DCEP messages are pending
+ PENDING_DATA = 2U, // Outgoing data channel messages are pending
+ };
+
+ NS_INLINE_DECL_THREADSAFE_REFCOUNTING(DataChannelConnection)
+
+ class DataConnectionListener : public SupportsWeakPtr {
+ public:
+ virtual ~DataConnectionListener() = default;
+
+ // Called when a new DataChannel has been opened by the other side.
+ virtual void NotifyDataChannel(already_AddRefed<DataChannel> channel) = 0;
+
+ // Called when a DataChannel transitions to state open
+ virtual void NotifyDataChannelOpen(DataChannel* aChannel) = 0;
+
+ // Called when a DataChannel (that was open at some point in the past)
+ // transitions to state closed
+ virtual void NotifyDataChannelClosed(DataChannel* aChannel) = 0;
+
+ // Called when SCTP connects
+ virtual void NotifySctpConnected() = 0;
+
+ // Called when SCTP closes
+ virtual void NotifySctpClosed() = 0;
+ };
+
+ // Create a new DataChannel Connection
+ // Must be called on Main thread
+ static Maybe<RefPtr<DataChannelConnection>> Create(
+ DataConnectionListener* aListener, nsISerialEventTarget* aTarget,
+ MediaTransportHandler* aHandler, const uint16_t aLocalPort,
+ const uint16_t aNumStreams, const Maybe<uint64_t>& aMaxMessageSize);
+
+ void Destroy(); // So we can spawn refs tied to runnables in shutdown
+ // Finish Destroy on STS to avoid SCTP race condition with ABORT from far end
+ void DestroyOnSTS(struct socket* aMasterSocket, struct socket* aSocket);
+ void DestroyOnSTSFinal();
+
+ void SetMaxMessageSize(bool aMaxMessageSizeSet, uint64_t aMaxMessageSize);
+ uint64_t GetMaxMessageSize();
+
+ void AppendStatsToReport(const UniquePtr<dom::RTCStatsCollection>& aReport,
+ const DOMHighResTimeStamp aTimestamp) const;
+#ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
+ // These block; they require something to decide on listener/connector
+ // (though you can do simultaneous Connect()). Do not call these from
+ // the main thread!
+ bool Listen(unsigned short port);
+ bool Connect(const char* addr, unsigned short port);
+#endif
+
+#ifdef SCTP_DTLS_SUPPORTED
+ bool ConnectToTransport(const std::string& aTransportId, const bool aClient,
+ const uint16_t aLocalPort,
+ const uint16_t aRemotePort);
+ void TransportStateChange(const std::string& aTransportId,
+ TransportLayer::State aState);
+ void CompleteConnect();
+ void SetSignals(const std::string& aTransportId);
+#endif
+
+ typedef enum {
+ RELIABLE = 0,
+ PARTIAL_RELIABLE_REXMIT = 1,
+ PARTIAL_RELIABLE_TIMED = 2
+ } Type;
+
+ [[nodiscard]] already_AddRefed<DataChannel> Open(
+ const nsACString& label, const nsACString& protocol, Type type,
+ bool inOrder, uint32_t prValue, DataChannelListener* aListener,
+ nsISupports* aContext, bool aExternalNegotiated, uint16_t aStream);
+
+ void Stop();
+ void Close(DataChannel* aChannel);
+ void CloseLocked(DataChannel* aChannel) MOZ_REQUIRES(mLock);
+ void CloseAll();
+
+ // Returns a POSIX error code.
+ int SendMsg(uint16_t stream, const nsACString& aMsg) {
+ return SendDataMsgCommon(stream, aMsg, false);
+ }
+
+ // Returns a POSIX error code.
+ int SendBinaryMsg(uint16_t stream, const nsACString& aMsg) {
+ return SendDataMsgCommon(stream, aMsg, true);
+ }
+
+ // Returns a POSIX error code.
+ int SendBlob(uint16_t stream, nsIInputStream* aBlob);
+
+ // Called on data reception from the SCTP library
+ // must(?) be public so my c->c++ trampoline can call it
+ // May be called with (STS thread) or without the lock
+ int ReceiveCallback(struct socket* sock, void* data, size_t datalen,
+ struct sctp_rcvinfo rcv, int flags);
+
+ // Find out state
+ enum { CONNECTING = 0U, OPEN = 1U, CLOSING = 2U, CLOSED = 3U };
+
+ Mutex mLock;
+
+ void ReadBlob(already_AddRefed<DataChannelConnection> aThis, uint16_t aStream,
+ nsIInputStream* aBlob);
+
+ bool SendDeferredMessages() MOZ_REQUIRES(mLock);
+
+#ifdef SCTP_DTLS_SUPPORTED
+ int SctpDtlsOutput(void* addr, void* buffer, size_t length, uint8_t tos,
+ uint8_t set_df);
+#endif
+
+ bool InShutdown() const {
+#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
+ return mShutdown;
+#else
+ return false;
+#endif
+ }
+
+ protected:
+ // Avoid cycles with PeerConnectionImpl
+ // Use from main thread only as WeakPtr is not threadsafe
+ WeakPtr<DataConnectionListener> mListener;
+
+ private:
+ DataChannelConnection(DataConnectionListener* aListener,
+ nsISerialEventTarget* aTarget,
+ MediaTransportHandler* aHandler);
+
+ bool Init(const uint16_t aLocalPort, const uint16_t aNumStreams,
+ const Maybe<uint64_t>& aMaxMessageSize);
+
+ // Caller must hold mLock
+ uint16_t GetReadyState() const MOZ_REQUIRES(mLock) {
+ mLock.AssertCurrentThreadOwns();
+
+ return mState;
+ }
+
+ // Caller must hold mLock
+ void SetReadyState(const uint16_t aState) MOZ_REQUIRES(mLock);
+
+#ifdef SCTP_DTLS_SUPPORTED
+ static void DTLSConnectThread(void* data);
+ void SendPacket(std::unique_ptr<MediaPacket>&& packet);
+ void SctpDtlsInput(const std::string& aTransportId,
+ const MediaPacket& packet);
+#endif
+ DataChannel* FindChannelByStream(uint16_t stream) MOZ_REQUIRES(mLock);
+ uint16_t FindFreeStream() MOZ_REQUIRES(mLock);
+ bool RequestMoreStreams(int32_t aNeeded = 16) MOZ_REQUIRES(mLock);
+ uint32_t UpdateCurrentStreamIndex() MOZ_REQUIRES(mLock);
+ uint32_t GetCurrentStreamIndex() MOZ_REQUIRES(mLock);
+ int SendControlMessage(const uint8_t* data, uint32_t len, uint16_t stream)
+ MOZ_REQUIRES(mLock);
+ int SendOpenAckMessage(uint16_t stream) MOZ_REQUIRES(mLock);
+ int SendOpenRequestMessage(const nsACString& label,
+ const nsACString& protocol, uint16_t stream,
+ bool unordered, uint16_t prPolicy,
+ uint32_t prValue) MOZ_REQUIRES(mLock);
+ bool SendBufferedMessages(nsTArray<UniquePtr<BufferedOutgoingMsg>>& buffer,
+ size_t* aWritten);
+ int SendMsgInternal(OutgoingMsg& msg, size_t* aWritten);
+ int SendMsgInternalOrBuffer(nsTArray<UniquePtr<BufferedOutgoingMsg>>& buffer,
+ OutgoingMsg& msg, bool& buffered,
+ size_t* aWritten) MOZ_REQUIRES(mLock);
+ int SendDataMsgInternalOrBuffer(DataChannel& channel, const uint8_t* data,
+ size_t len, uint32_t ppid)
+ MOZ_REQUIRES(mLock);
+ int SendDataMsg(DataChannel& channel, const uint8_t* data, size_t len,
+ uint32_t ppidPartial, uint32_t ppidFinal) MOZ_REQUIRES(mLock);
+ int SendDataMsgCommon(uint16_t stream, const nsACString& aMsg, bool isBinary);
+
+ void DeliverQueuedData(uint16_t stream) MOZ_REQUIRES(mLock);
+
+ already_AddRefed<DataChannel> OpenFinish(
+ already_AddRefed<DataChannel>&& aChannel) MOZ_REQUIRES(mLock);
+
+ void ProcessQueuedOpens() MOZ_REQUIRES(mLock);
+ void ClearResets() MOZ_REQUIRES(mLock);
+ void SendOutgoingStreamReset() MOZ_REQUIRES(mLock);
+ void ResetOutgoingStream(uint16_t stream) MOZ_REQUIRES(mLock);
+ void HandleOpenRequestMessage(
+ const struct rtcweb_datachannel_open_request* req, uint32_t length,
+ uint16_t stream) MOZ_REQUIRES(mLock);
+ void HandleOpenAckMessage(const struct rtcweb_datachannel_ack* ack,
+ uint32_t length, uint16_t stream);
+ void HandleUnknownMessage(uint32_t ppid, uint32_t length, uint16_t stream)
+ MOZ_REQUIRES(mLock);
+ uint8_t BufferMessage(nsACString& recvBuffer, const void* data,
+ uint32_t length, uint32_t ppid, int flags);
+ void HandleDataMessage(const void* buffer, size_t length, uint32_t ppid,
+ uint16_t stream, int flags) MOZ_REQUIRES(mLock);
+ void HandleDCEPMessage(const void* buffer, size_t length, uint32_t ppid,
+ uint16_t stream, int flags) MOZ_REQUIRES(mLock);
+ void HandleMessage(const void* buffer, size_t length, uint32_t ppid,
+ uint16_t stream, int flags) MOZ_REQUIRES(mLock);
+ void HandleAssociationChangeEvent(const struct sctp_assoc_change* sac)
+ MOZ_REQUIRES(mLock);
+ void HandlePeerAddressChangeEvent(const struct sctp_paddr_change* spc)
+ MOZ_REQUIRES(mLock);
+ void HandleRemoteErrorEvent(const struct sctp_remote_error* sre)
+ MOZ_REQUIRES(mLock);
+ void HandleShutdownEvent(const struct sctp_shutdown_event* sse)
+ MOZ_REQUIRES(mLock);
+ void HandleAdaptationIndication(const struct sctp_adaptation_event* sai)
+ MOZ_REQUIRES(mLock);
+ void HandlePartialDeliveryEvent(const struct sctp_pdapi_event* spde)
+ MOZ_REQUIRES(mLock);
+ void HandleSendFailedEvent(const struct sctp_send_failed_event* ssfe)
+ MOZ_REQUIRES(mLock);
+ void HandleStreamResetEvent(const struct sctp_stream_reset_event* strrst)
+ MOZ_REQUIRES(mLock);
+ void HandleStreamChangeEvent(const struct sctp_stream_change_event* strchg)
+ MOZ_REQUIRES(mLock);
+ void HandleNotification(const union sctp_notification* notif, size_t n)
+ MOZ_REQUIRES(mLock);
+
+#ifdef SCTP_DTLS_SUPPORTED
+ bool IsSTSThread() const {
+ bool on = false;
+ if (mSTS) {
+ mSTS->IsOnCurrentThread(&on);
+ }
+ return on;
+ }
+#endif
+
+ class Channels {
+ public:
+ Channels() : mMutex("DataChannelConnection::Channels::mMutex") {}
+ void Insert(const RefPtr<DataChannel>& aChannel);
+ bool Remove(const RefPtr<DataChannel>& aChannel);
+ RefPtr<DataChannel> Get(uint16_t aId) const;
+ typedef AutoTArray<RefPtr<DataChannel>, 16> ChannelArray;
+ ChannelArray GetAll() const {
+ MutexAutoLock lock(mMutex);
+ return mChannels.Clone();
+ }
+ RefPtr<DataChannel> GetNextChannel(uint16_t aCurrentId) const;
+
+ private:
+ struct IdComparator {
+ bool Equals(const RefPtr<DataChannel>& aChannel, uint16_t aId) const;
+ bool LessThan(const RefPtr<DataChannel>& aChannel, uint16_t aId) const;
+ bool Equals(const RefPtr<DataChannel>& a1,
+ const RefPtr<DataChannel>& a2) const;
+ bool LessThan(const RefPtr<DataChannel>& a1,
+ const RefPtr<DataChannel>& a2) const;
+ };
+ mutable Mutex mMutex;
+ ChannelArray mChannels MOZ_GUARDED_BY(mMutex);
+ };
+
+ bool mSendInterleaved MOZ_GUARDED_BY(mLock) = false;
+ // MainThread only
+ bool mMaxMessageSizeSet = false;
+ // mMaxMessageSize is only set on MainThread, but read off-main-thread
+ uint64_t mMaxMessageSize MOZ_GUARDED_BY(mLock) = 0;
+ // Main thread only
+ Maybe<bool> mAllocateEven;
+ // Data:
+ // NOTE: while this container will auto-expand, increases in the number of
+ // channels available from the stack must be negotiated!
+ // Accessed from both main and sts, API is threadsafe
+ Channels mChannels;
+ // STS only
+ uint32_t mCurrentStream = 0;
+ nsRefPtrDeque<DataChannel> mPending;
+ // STS and main
+ size_t mNegotiatedIdLimit MOZ_GUARDED_BY(mLock) = 0;
+ uint8_t mPendingType MOZ_GUARDED_BY(mLock) = PENDING_NONE;
+ // holds data that's come in before a channel is open
+ nsTArray<UniquePtr<QueuedDataMessage>> mQueuedData MOZ_GUARDED_BY(mLock);
+ // holds outgoing control messages
+ nsTArray<UniquePtr<BufferedOutgoingMsg>> mBufferedControl
+ MOZ_GUARDED_BY(mLock);
+
+ // Streams pending reset. Accessed from main and STS.
+ AutoTArray<uint16_t, 4> mStreamsResetting MOZ_GUARDED_BY(mLock);
+ // accessed from STS thread
+ struct socket* mMasterSocket = nullptr;
+ // cloned from mMasterSocket on successful Connect on STS thread
+ struct socket* mSocket = nullptr;
+ uint16_t mState MOZ_GUARDED_BY(mLock) = CLOSED; // Protected with mLock
+
+#ifdef SCTP_DTLS_SUPPORTED
+ std::string mTransportId;
+ RefPtr<MediaTransportHandler> mTransportHandler;
+ nsCOMPtr<nsIEventTarget> mSTS;
+#endif
+ uint16_t mLocalPort = 0; // Accessed from connect thread
+ uint16_t mRemotePort = 0;
+
+ nsCOMPtr<nsIThread> mInternalIOThread = nullptr;
+ nsCString mRecvBuffer;
+
+ // Workaround to prevent a message from being received on main before the
+ // sender sees the decrease in bufferedAmount.
+ bool mDeferSend = false;
+ std::vector<std::unique_ptr<MediaPacket>> mDeferredSend;
+
+#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
+ bool mShutdown;
+#endif
+ uintptr_t mId = 0;
+};
+
+#define ENSURE_DATACONNECTION \
+ do { \
+ MOZ_ASSERT(mConnection); \
+ if (!mConnection) { \
+ return; \
+ } \
+ } while (0)
+
+class DataChannel {
+ friend class DataChannelOnMessageAvailable;
+ friend class DataChannelConnection;
+
+ public:
+ enum { CONNECTING = 0U, OPEN = 1U, CLOSING = 2U, CLOSED = 3U };
+
+ DataChannel(DataChannelConnection* connection, uint16_t stream,
+ uint16_t state, const nsACString& label,
+ const nsACString& protocol, uint16_t policy, uint32_t value,
+ bool ordered, bool negotiated, DataChannelListener* aListener,
+ nsISupports* aContext)
+ : mListener(aListener),
+ mContext(aContext),
+ mConnection(connection),
+ mLabel(label),
+ mProtocol(protocol),
+ mReadyState(state),
+ mStream(stream),
+ mPrPolicy(policy),
+ mPrValue(value),
+ mNegotiated(negotiated),
+ mOrdered(ordered),
+ mFlags(0),
+ mIsRecvBinary(false),
+ mBufferedThreshold(0), // default from spec
+ mBufferedAmount(0),
+ mMainThreadEventTarget(connection->GetNeckoTarget()),
+ mStatsLock("netwer::sctp::DataChannel::mStatsLock") {
+ NS_ASSERTION(mConnection, "NULL connection");
+ }
+
+ private:
+ ~DataChannel();
+
+ public:
+ NS_INLINE_DECL_THREADSAFE_REFCOUNTING(DataChannel)
+
+ // when we disconnect from the connection after stream RESET
+ void StreamClosedLocked();
+
+ // Complete dropping of the link between DataChannel and the connection.
+ // After this, except for a few methods below listed to be safe, you can't
+ // call into DataChannel.
+ void ReleaseConnection();
+
+ // Close this DataChannel. Can be called multiple times. MUST be called
+ // before destroying the DataChannel (state must be CLOSED or CLOSING).
+ void Close();
+
+ // Set the listener (especially for channels created from the other side)
+ void SetListener(DataChannelListener* aListener, nsISupports* aContext);
+
+ // Helper for send methods that converts POSIX error codes to an ErrorResult.
+ static void SendErrnoToErrorResult(int error, size_t aMessageSize,
+ ErrorResult& aRv);
+
+ // Send a string
+ void SendMsg(const nsACString& aMsg, ErrorResult& aRv);
+
+ // Send a binary message (TypedArray)
+ void SendBinaryMsg(const nsACString& aMsg, ErrorResult& aRv);
+
+ // Send a binary blob
+ void SendBinaryBlob(dom::Blob& aBlob, ErrorResult& aRv);
+
+ uint16_t GetType() const { return mPrPolicy; }
+
+ dom::Nullable<uint16_t> GetMaxPacketLifeTime() const;
+
+ dom::Nullable<uint16_t> GetMaxRetransmits() const;
+
+ bool GetNegotiated() const { return mNegotiated; }
+
+ bool GetOrdered() const { return mOrdered; }
+
+ void IncrementBufferedAmount(uint32_t aSize, ErrorResult& aRv);
+ void DecrementBufferedAmount(uint32_t aSize);
+
+ // Amount of data buffered to send
+ uint32_t GetBufferedAmount() const {
+ MOZ_ASSERT(NS_IsMainThread());
+ return mBufferedAmount;
+ }
+
+ // Trigger amount for generating BufferedAmountLow events
+ uint32_t GetBufferedAmountLowThreshold() const;
+ void SetBufferedAmountLowThreshold(uint32_t aThreshold);
+
+ void AnnounceOpen();
+ // TODO(bug 843625): Optionally pass an error here.
+ void AnnounceClosed();
+
+ // Find out state
+ uint16_t GetReadyState() const {
+ MOZ_ASSERT(NS_IsMainThread());
+ return mReadyState;
+ }
+
+ // Set ready state
+ void SetReadyState(const uint16_t aState);
+
+ void GetLabel(nsAString& aLabel) { CopyUTF8toUTF16(mLabel, aLabel); }
+ void GetProtocol(nsAString& aProtocol) {
+ CopyUTF8toUTF16(mProtocol, aProtocol);
+ }
+ uint16_t GetStream() const { return mStream; }
+
+ void SendOrQueue(DataChannelOnMessageAvailable* aMessage);
+
+ struct TrafficCounters {
+ uint32_t mMessagesSent = 0;
+ uint64_t mBytesSent = 0;
+ uint32_t mMessagesReceived = 0;
+ uint64_t mBytesReceived = 0;
+ };
+
+ TrafficCounters GetTrafficCounters() const;
+
+ protected:
+ // These are both mainthread only
+ DataChannelListener* mListener;
+ nsCOMPtr<nsISupports> mContext;
+
+ private:
+ nsresult AddDataToBinaryMsg(const char* data, uint32_t size);
+ bool EnsureValidStream(ErrorResult& aRv);
+ void WithTrafficCounters(const std::function<void(TrafficCounters&)>&);
+
+ RefPtr<DataChannelConnection> mConnection;
+ // mainthread only
+ bool mEverOpened = false;
+ nsCString mLabel;
+ nsCString mProtocol;
+ // This is mainthread only
+ uint16_t mReadyState;
+ uint16_t mStream;
+ uint16_t mPrPolicy;
+ uint32_t mPrValue;
+ // Accessed on main and STS
+ const bool mNegotiated;
+ const bool mOrdered;
+ uint32_t mFlags;
+ bool mIsRecvBinary;
+ size_t mBufferedThreshold;
+ // Read/written on main only. Decremented via message-passing, because the
+ // spec requires us to queue a task for this.
+ size_t mBufferedAmount;
+ nsCString mRecvBuffer;
+ nsTArray<UniquePtr<BufferedOutgoingMsg>>
+ mBufferedData; // MOZ_GUARDED_BY(mConnection->mLock)
+ nsCOMPtr<nsISerialEventTarget> mMainThreadEventTarget;
+ mutable Mutex mStatsLock; // protects mTrafficCounters
+ TrafficCounters mTrafficCounters MOZ_GUARDED_BY(mStatsLock);
+};
+
+// used to dispatch notifications of incoming data to the main thread
+// Patterned on CallOnMessageAvailable in WebSockets
+// Also used to proxy other items to MainThread
+class DataChannelOnMessageAvailable : public Runnable {
+ public:
+ enum {
+ ON_CONNECTION,
+ ON_DISCONNECTED,
+ ON_CHANNEL_CREATED,
+ ON_DATA_STRING,
+ ON_DATA_BINARY,
+ }; /* types */
+
+ DataChannelOnMessageAvailable(
+ int32_t aType, DataChannelConnection* aConnection, DataChannel* aChannel,
+ nsCString& aData) // XXX this causes inefficiency
+ : Runnable("DataChannelOnMessageAvailable"),
+ mType(aType),
+ mChannel(aChannel),
+ mConnection(aConnection),
+ mData(aData) {}
+
+ DataChannelOnMessageAvailable(int32_t aType, DataChannel* aChannel)
+ : Runnable("DataChannelOnMessageAvailable"),
+ mType(aType),
+ mChannel(aChannel) {}
+ // XXX is it safe to leave mData uninitialized? This should only be
+ // used for notifications that don't use them, but I'd like more
+ // bulletproof compile-time checking.
+
+ DataChannelOnMessageAvailable(int32_t aType,
+ DataChannelConnection* aConnection,
+ DataChannel* aChannel)
+ : Runnable("DataChannelOnMessageAvailable"),
+ mType(aType),
+ mChannel(aChannel),
+ mConnection(aConnection) {}
+
+ // for ON_CONNECTION/ON_DISCONNECTED
+ DataChannelOnMessageAvailable(int32_t aType,
+ DataChannelConnection* aConnection)
+ : Runnable("DataChannelOnMessageAvailable"),
+ mType(aType),
+ mConnection(aConnection) {}
+
+ NS_IMETHOD Run() override {
+ MOZ_ASSERT(NS_IsMainThread());
+
+ // Note: calling the listeners can indirectly cause the listeners to be
+ // made available for GC (by removing event listeners), especially for
+ // OnChannelClosed(). We hold a ref to the Channel and the listener
+ // while calling this.
+ switch (mType) {
+ case ON_DATA_STRING:
+ case ON_DATA_BINARY:
+ if (!mChannel->mListener) {
+ DC_ERROR(("DataChannelOnMessageAvailable (%d) with null Listener!",
+ mType));
+ return NS_OK;
+ }
+
+ if (mChannel->GetReadyState() == DataChannel::CLOSED ||
+ mChannel->GetReadyState() == DataChannel::CLOSING) {
+ // Closed by JS, probably
+ return NS_OK;
+ }
+
+ if (mType == ON_DATA_STRING) {
+ mChannel->mListener->OnMessageAvailable(mChannel->mContext, mData);
+ } else {
+ mChannel->mListener->OnBinaryMessageAvailable(mChannel->mContext,
+ mData);
+ }
+ break;
+ case ON_DISCONNECTED:
+ // If we've disconnected, make sure we close all the streams - from
+ // mainthread!
+ if (mConnection->mListener) {
+ mConnection->mListener->NotifySctpClosed();
+ }
+ mConnection->CloseAll();
+ break;
+ case ON_CHANNEL_CREATED:
+ if (!mConnection->mListener) {
+ DC_ERROR(("DataChannelOnMessageAvailable (%d) with null Listener!",
+ mType));
+ return NS_OK;
+ }
+
+ // important to give it an already_AddRefed pointer!
+ mConnection->mListener->NotifyDataChannel(mChannel.forget());
+ break;
+ case ON_CONNECTION:
+ if (mConnection->mListener) {
+ mConnection->mListener->NotifySctpConnected();
+ }
+ break;
+ }
+ return NS_OK;
+ }
+
+ private:
+ ~DataChannelOnMessageAvailable() = default;
+
+ int32_t mType;
+ // XXX should use union
+ RefPtr<DataChannel> mChannel;
+ RefPtr<DataChannelConnection> mConnection;
+ nsCString mData;
+};
+
+} // namespace mozilla
+
+#endif // NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_