summaryrefslogtreecommitdiffstats
path: root/netwerk/protocol/webtransport/WebTransportStreamProxy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'netwerk/protocol/webtransport/WebTransportStreamProxy.cpp')
-rw-r--r--netwerk/protocol/webtransport/WebTransportStreamProxy.cpp386
1 files changed, 386 insertions, 0 deletions
diff --git a/netwerk/protocol/webtransport/WebTransportStreamProxy.cpp b/netwerk/protocol/webtransport/WebTransportStreamProxy.cpp
new file mode 100644
index 0000000000..6192bda0f7
--- /dev/null
+++ b/netwerk/protocol/webtransport/WebTransportStreamProxy.cpp
@@ -0,0 +1,386 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "WebTransportStreamProxy.h"
+
+#include "WebTransportLog.h"
+#include "Http3WebTransportStream.h"
+#include "nsProxyRelease.h"
+#include "nsSocketTransportService2.h"
+
+namespace mozilla::net {
+
+NS_IMPL_ADDREF(WebTransportStreamProxy)
+NS_IMPL_RELEASE(WebTransportStreamProxy)
+
+NS_INTERFACE_MAP_BEGIN(WebTransportStreamProxy)
+ NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIWebTransportReceiveStream)
+ NS_INTERFACE_MAP_ENTRY(nsIWebTransportReceiveStream)
+ NS_INTERFACE_MAP_ENTRY(nsIWebTransportSendStream)
+ NS_INTERFACE_MAP_ENTRY(nsIWebTransportBidirectionalStream)
+NS_INTERFACE_MAP_END
+
+WebTransportStreamProxy::WebTransportStreamProxy(
+ Http3WebTransportStream* aStream)
+ : mWebTransportStream(aStream) {
+ nsCOMPtr<nsIAsyncInputStream> inputStream;
+ nsCOMPtr<nsIAsyncOutputStream> outputStream;
+ mWebTransportStream->GetWriterAndReader(getter_AddRefs(outputStream),
+ getter_AddRefs(inputStream));
+ if (outputStream) {
+ mWriter = new AsyncOutputStreamWrapper(outputStream);
+ }
+ if (inputStream) {
+ mReader = new AsyncInputStreamWrapper(inputStream, mWebTransportStream);
+ }
+}
+
+WebTransportStreamProxy::~WebTransportStreamProxy() {
+ // mWebTransportStream needs to be destroyed on the socket thread.
+ NS_ProxyRelease("WebTransportStreamProxy::~WebTransportStreamProxy",
+ gSocketTransportService, mWebTransportStream.forget());
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::SendStopSending(uint8_t aError) {
+ if (!OnSocketThread()) {
+ RefPtr<WebTransportStreamProxy> self(this);
+ return gSocketTransportService->Dispatch(
+ NS_NewRunnableFunction("WebTransportStreamProxy::SendStopSending",
+ [self{std::move(self)}, error(aError)]() {
+ self->SendStopSending(error);
+ }));
+ }
+
+ mWebTransportStream->SendStopSending(aError);
+ return NS_OK;
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::SendFin(void) {
+ if (!mWriter) {
+ return NS_ERROR_UNEXPECTED;
+ }
+
+ mWriter->Close();
+
+ if (!OnSocketThread()) {
+ RefPtr<WebTransportStreamProxy> self(this);
+ return gSocketTransportService->Dispatch(NS_NewRunnableFunction(
+ "WebTransportStreamProxy::SendFin",
+ [self{std::move(self)}]() { self->mWebTransportStream->SendFin(); }));
+ }
+
+ mWebTransportStream->SendFin();
+ return NS_OK;
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::Reset(uint8_t aErrorCode) {
+ if (!mWriter) {
+ return NS_ERROR_UNEXPECTED;
+ }
+
+ mWriter->Close();
+
+ if (!OnSocketThread()) {
+ RefPtr<WebTransportStreamProxy> self(this);
+ return gSocketTransportService->Dispatch(
+ NS_NewRunnableFunction("WebTransportStreamProxy::Reset",
+ [self{std::move(self)}, error(aErrorCode)]() {
+ self->mWebTransportStream->Reset(error);
+ }));
+ }
+
+ mWebTransportStream->Reset(aErrorCode);
+ return NS_OK;
+}
+
+namespace {
+
+class StatsCallbackWrapper : public nsIWebTransportStreamStatsCallback {
+ public:
+ NS_DECL_THREADSAFE_ISUPPORTS
+
+ explicit StatsCallbackWrapper(nsIWebTransportStreamStatsCallback* aCallback)
+ : mCallback(aCallback), mTarget(GetCurrentSerialEventTarget()) {}
+
+ NS_IMETHOD OnSendStatsAvailable(
+ nsIWebTransportSendStreamStats* aStats) override {
+ if (!mTarget->IsOnCurrentThread()) {
+ RefPtr<StatsCallbackWrapper> self(this);
+ nsCOMPtr<nsIWebTransportSendStreamStats> stats = aStats;
+ Unused << mTarget->Dispatch(NS_NewRunnableFunction(
+ "StatsCallbackWrapper::OnSendStatsAvailable",
+ [self{std::move(self)}, stats{std::move(stats)}]() {
+ self->OnSendStatsAvailable(stats);
+ }));
+ return NS_OK;
+ }
+
+ mCallback->OnSendStatsAvailable(aStats);
+ return NS_OK;
+ }
+
+ NS_IMETHOD OnReceiveStatsAvailable(
+ nsIWebTransportReceiveStreamStats* aStats) override {
+ if (!mTarget->IsOnCurrentThread()) {
+ RefPtr<StatsCallbackWrapper> self(this);
+ nsCOMPtr<nsIWebTransportReceiveStreamStats> stats = aStats;
+ Unused << mTarget->Dispatch(NS_NewRunnableFunction(
+ "StatsCallbackWrapper::OnReceiveStatsAvailable",
+ [self{std::move(self)}, stats{std::move(stats)}]() {
+ self->OnReceiveStatsAvailable(stats);
+ }));
+ return NS_OK;
+ }
+
+ mCallback->OnReceiveStatsAvailable(aStats);
+ return NS_OK;
+ }
+
+ private:
+ virtual ~StatsCallbackWrapper() {
+ NS_ProxyRelease("StatsCallbackWrapper::~StatsCallbackWrapper", mTarget,
+ mCallback.forget());
+ }
+
+ nsCOMPtr<nsIWebTransportStreamStatsCallback> mCallback;
+ nsCOMPtr<nsIEventTarget> mTarget;
+};
+
+NS_IMPL_ISUPPORTS(StatsCallbackWrapper, nsIWebTransportStreamStatsCallback)
+
+} // namespace
+
+NS_IMETHODIMP WebTransportStreamProxy::GetSendStreamStats(
+ nsIWebTransportStreamStatsCallback* aCallback) {
+ if (!OnSocketThread()) {
+ RefPtr<WebTransportStreamProxy> self(this);
+ nsCOMPtr<nsIWebTransportStreamStatsCallback> callback =
+ new StatsCallbackWrapper(aCallback);
+ return gSocketTransportService->Dispatch(NS_NewRunnableFunction(
+ "WebTransportStreamProxy::GetSendStreamStats",
+ [self{std::move(self)}, callback{std::move(callback)}]() {
+ self->GetSendStreamStats(callback);
+ }));
+ }
+
+ nsCOMPtr<nsIWebTransportSendStreamStats> stats =
+ mWebTransportStream->GetSendStreamStats();
+ aCallback->OnSendStatsAvailable(stats);
+ return NS_OK;
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::GetReceiveStreamStats(
+ nsIWebTransportStreamStatsCallback* aCallback) {
+ if (!OnSocketThread()) {
+ RefPtr<WebTransportStreamProxy> self(this);
+ nsCOMPtr<nsIWebTransportStreamStatsCallback> callback =
+ new StatsCallbackWrapper(aCallback);
+ return gSocketTransportService->Dispatch(NS_NewRunnableFunction(
+ "WebTransportStreamProxy::GetReceiveStreamStats",
+ [self{std::move(self)}, callback{std::move(callback)}]() {
+ self->GetReceiveStreamStats(callback);
+ }));
+ }
+
+ nsCOMPtr<nsIWebTransportReceiveStreamStats> stats =
+ mWebTransportStream->GetReceiveStreamStats();
+ aCallback->OnReceiveStatsAvailable(stats);
+ return NS_OK;
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::GetHasReceivedFIN(
+ bool* aHasReceivedFIN) {
+ *aHasReceivedFIN = mWebTransportStream->RecvDone();
+ return NS_OK;
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::GetInputStream(
+ nsIAsyncInputStream** aOut) {
+ if (!mReader) {
+ return NS_ERROR_NOT_AVAILABLE;
+ }
+
+ RefPtr<AsyncInputStreamWrapper> stream = mReader;
+ stream.forget(aOut);
+ return NS_OK;
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::GetOutputStream(
+ nsIAsyncOutputStream** aOut) {
+ if (!mWriter) {
+ return NS_ERROR_NOT_AVAILABLE;
+ }
+
+ RefPtr<AsyncOutputStreamWrapper> stream = mWriter;
+ stream.forget(aOut);
+ return NS_OK;
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::GetStreamId(uint64_t* aId) {
+ *aId = mWebTransportStream->StreamId();
+ return NS_OK;
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::SetSendOrder(Maybe<int64_t> aSendOrder) {
+ if (!OnSocketThread()) {
+ return gSocketTransportService->Dispatch(NS_NewRunnableFunction(
+ "SetSendOrder", [stream = mWebTransportStream, aSendOrder]() {
+ stream->SetSendOrder(aSendOrder);
+ }));
+ }
+ mWebTransportStream->SetSendOrder(aSendOrder);
+ return NS_OK;
+}
+
+//------------------------------------------------------------------------------
+// WebTransportStreamProxy::AsyncInputStreamWrapper
+//------------------------------------------------------------------------------
+
+NS_IMPL_ISUPPORTS(WebTransportStreamProxy::AsyncInputStreamWrapper,
+ nsIInputStream, nsIAsyncInputStream)
+
+WebTransportStreamProxy::AsyncInputStreamWrapper::AsyncInputStreamWrapper(
+ nsIAsyncInputStream* aStream, Http3WebTransportStream* aWebTransportStream)
+ : mStream(aStream), mWebTransportStream(aWebTransportStream) {}
+
+WebTransportStreamProxy::AsyncInputStreamWrapper::~AsyncInputStreamWrapper() =
+ default;
+
+void WebTransportStreamProxy::AsyncInputStreamWrapper::MaybeCloseStream() {
+ if (!mWebTransportStream->RecvDone()) {
+ return;
+ }
+
+ uint64_t available = 0;
+ Unused << Available(&available);
+ if (available) {
+ // Don't close the InputStream if there's unread data available, since it
+ // would be lost. We exit above unless we know no more data will be received
+ // for the stream.
+ return;
+ }
+
+ LOG(
+ ("AsyncInputStreamWrapper::MaybeCloseStream close stream due to FIN "
+ "stream=%p",
+ mWebTransportStream.get()));
+ Close();
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::Close() {
+ return mStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::Available(
+ uint64_t* aAvailable) {
+ return mStream->Available(aAvailable);
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::StreamStatus() {
+ return mStream->StreamStatus();
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::Read(
+ char* aBuf, uint32_t aCount, uint32_t* aResult) {
+ LOG(("WebTransportStreamProxy::AsyncInputStreamWrapper::Read %p", this));
+ nsresult rv = mStream->Read(aBuf, aCount, aResult);
+ MaybeCloseStream();
+ return rv;
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::ReadSegments(
+ nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
+ uint32_t* aResult) {
+ LOG(("WebTransportStreamProxy::AsyncInputStreamWrapper::ReadSegments %p",
+ this));
+ nsresult rv = mStream->ReadSegments(aWriter, aClosure, aCount, aResult);
+ if (*aResult > 0) {
+ LOG((" Read %u bytes", *aResult));
+ }
+ MaybeCloseStream();
+ return rv;
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::IsNonBlocking(
+ bool* aResult) {
+ return mStream->IsNonBlocking(aResult);
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::CloseWithStatus(
+ nsresult aStatus) {
+ return mStream->CloseWithStatus(aStatus);
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::AsyncWait(
+ nsIInputStreamCallback* aCallback, uint32_t aFlags,
+ uint32_t aRequestedCount, nsIEventTarget* aEventTarget) {
+ return mStream->AsyncWait(aCallback, aFlags, aRequestedCount, aEventTarget);
+}
+
+//------------------------------------------------------------------------------
+// WebTransportStreamProxy::AsyncOutputStreamWrapper
+//------------------------------------------------------------------------------
+
+NS_IMPL_ISUPPORTS(WebTransportStreamProxy::AsyncOutputStreamWrapper,
+ nsIOutputStream, nsIAsyncOutputStream)
+
+WebTransportStreamProxy::AsyncOutputStreamWrapper::AsyncOutputStreamWrapper(
+ nsIAsyncOutputStream* aStream)
+ : mStream(aStream) {}
+
+WebTransportStreamProxy::AsyncOutputStreamWrapper::~AsyncOutputStreamWrapper() =
+ default;
+
+NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Flush() {
+ return mStream->Flush();
+}
+
+NS_IMETHODIMP
+WebTransportStreamProxy::AsyncOutputStreamWrapper::StreamStatus() {
+ return mStream->StreamStatus();
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Write(
+ const char* aBuf, uint32_t aCount, uint32_t* aResult) {
+ LOG(
+ ("WebTransportStreamProxy::AsyncOutputStreamWrapper::Write %p %u bytes, "
+ "first byte %c",
+ this, aCount, aBuf[0]));
+ return mStream->Write(aBuf, aCount, aResult);
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::WriteFrom(
+ nsIInputStream* aFromStream, uint32_t aCount, uint32_t* aResult) {
+ return mStream->WriteFrom(aFromStream, aCount, aResult);
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::WriteSegments(
+ nsReadSegmentFun aReader, void* aClosure, uint32_t aCount,
+ uint32_t* aResult) {
+ return mStream->WriteSegments(aReader, aClosure, aCount, aResult);
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::AsyncWait(
+ nsIOutputStreamCallback* aCallback, uint32_t aFlags,
+ uint32_t aRequestedCount, nsIEventTarget* aEventTarget) {
+ return mStream->AsyncWait(aCallback, aFlags, aRequestedCount, aEventTarget);
+}
+
+NS_IMETHODIMP
+WebTransportStreamProxy::AsyncOutputStreamWrapper::CloseWithStatus(
+ nsresult aStatus) {
+ return mStream->CloseWithStatus(aStatus);
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Close() {
+ return mStream->Close();
+}
+
+NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::IsNonBlocking(
+ bool* aResult) {
+ return mStream->IsNonBlocking(aResult);
+}
+
+} // namespace mozilla::net