diff options
Diffstat (limited to '')
-rw-r--r-- | netwerk/protocol/webtransport/WebTransportStreamProxy.cpp | 368 |
1 files changed, 368 insertions, 0 deletions
diff --git a/netwerk/protocol/webtransport/WebTransportStreamProxy.cpp b/netwerk/protocol/webtransport/WebTransportStreamProxy.cpp new file mode 100644 index 0000000000..090b1cceb9 --- /dev/null +++ b/netwerk/protocol/webtransport/WebTransportStreamProxy.cpp @@ -0,0 +1,368 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "WebTransportStreamProxy.h" + +#include "WebTransportLog.h" +#include "Http3WebTransportStream.h" +#include "nsProxyRelease.h" +#include "nsSocketTransportService2.h" + +namespace mozilla::net { + +NS_IMPL_ADDREF(WebTransportStreamProxy) +NS_IMPL_RELEASE(WebTransportStreamProxy) + +NS_INTERFACE_MAP_BEGIN(WebTransportStreamProxy) + NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIWebTransportReceiveStream) + NS_INTERFACE_MAP_ENTRY(nsIWebTransportReceiveStream) + NS_INTERFACE_MAP_ENTRY(nsIWebTransportSendStream) + NS_INTERFACE_MAP_ENTRY(nsIWebTransportBidirectionalStream) +NS_INTERFACE_MAP_END + +WebTransportStreamProxy::WebTransportStreamProxy( + Http3WebTransportStream* aStream) + : mWebTransportStream(aStream) { + nsCOMPtr<nsIAsyncInputStream> inputStream; + nsCOMPtr<nsIAsyncOutputStream> outputStream; + mWebTransportStream->GetWriterAndReader(getter_AddRefs(outputStream), + getter_AddRefs(inputStream)); + if (outputStream) { + mWriter = new AsyncOutputStreamWrapper(outputStream); + } + if (inputStream) { + mReader = new AsyncInputStreamWrapper(inputStream, mWebTransportStream); + } +} + +WebTransportStreamProxy::~WebTransportStreamProxy() { + // mWebTransportStream needs to be destroyed on the socket thread. + NS_ProxyRelease("WebTransportStreamProxy::~WebTransportStreamProxy", + gSocketTransportService, mWebTransportStream.forget()); +} + +NS_IMETHODIMP WebTransportStreamProxy::SendStopSending(uint8_t aError) { + if (!OnSocketThread()) { + RefPtr<WebTransportStreamProxy> self(this); + return gSocketTransportService->Dispatch( + NS_NewRunnableFunction("WebTransportStreamProxy::SendStopSending", + [self{std::move(self)}, error(aError)]() { + self->SendStopSending(error); + })); + } + + mWebTransportStream->SendStopSending(aError); + return NS_OK; +} + +NS_IMETHODIMP WebTransportStreamProxy::SendFin(void) { + if (!mWriter) { + return NS_ERROR_UNEXPECTED; + } + + mWriter->Close(); + + if (!OnSocketThread()) { + RefPtr<WebTransportStreamProxy> self(this); + return gSocketTransportService->Dispatch(NS_NewRunnableFunction( + "WebTransportStreamProxy::SendFin", + [self{std::move(self)}]() { self->mWebTransportStream->SendFin(); })); + } + + mWebTransportStream->SendFin(); + return NS_OK; +} + +NS_IMETHODIMP WebTransportStreamProxy::Reset(uint8_t aErrorCode) { + if (!mWriter) { + return NS_ERROR_UNEXPECTED; + } + + mWriter->Close(); + + if (!OnSocketThread()) { + RefPtr<WebTransportStreamProxy> self(this); + return gSocketTransportService->Dispatch( + NS_NewRunnableFunction("WebTransportStreamProxy::Reset", + [self{std::move(self)}, error(aErrorCode)]() { + self->mWebTransportStream->Reset(error); + })); + } + + mWebTransportStream->Reset(aErrorCode); + return NS_OK; +} + +namespace { + +class StatsCallbackWrapper : public nsIWebTransportStreamStatsCallback { + public: + NS_DECL_THREADSAFE_ISUPPORTS + + explicit StatsCallbackWrapper(nsIWebTransportStreamStatsCallback* aCallback) + : mCallback(aCallback), mTarget(GetCurrentSerialEventTarget()) {} + + NS_IMETHOD OnSendStatsAvailable( + nsIWebTransportSendStreamStats* aStats) override { + if (!mTarget->IsOnCurrentThread()) { + RefPtr<StatsCallbackWrapper> self(this); + nsCOMPtr<nsIWebTransportSendStreamStats> stats = aStats; + Unused << mTarget->Dispatch(NS_NewRunnableFunction( + "StatsCallbackWrapper::OnSendStatsAvailable", + [self{std::move(self)}, stats{std::move(stats)}]() { + self->OnSendStatsAvailable(stats); + })); + return NS_OK; + } + + mCallback->OnSendStatsAvailable(aStats); + return NS_OK; + } + + NS_IMETHOD OnReceiveStatsAvailable( + nsIWebTransportReceiveStreamStats* aStats) override { + if (!mTarget->IsOnCurrentThread()) { + RefPtr<StatsCallbackWrapper> self(this); + nsCOMPtr<nsIWebTransportReceiveStreamStats> stats = aStats; + Unused << mTarget->Dispatch(NS_NewRunnableFunction( + "StatsCallbackWrapper::OnReceiveStatsAvailable", + [self{std::move(self)}, stats{std::move(stats)}]() { + self->OnReceiveStatsAvailable(stats); + })); + return NS_OK; + } + + mCallback->OnReceiveStatsAvailable(aStats); + return NS_OK; + } + + private: + virtual ~StatsCallbackWrapper() { + NS_ProxyRelease("StatsCallbackWrapper::~StatsCallbackWrapper", mTarget, + mCallback.forget()); + } + + nsCOMPtr<nsIWebTransportStreamStatsCallback> mCallback; + nsCOMPtr<nsIEventTarget> mTarget; +}; + +NS_IMPL_ISUPPORTS(StatsCallbackWrapper, nsIWebTransportStreamStatsCallback) + +} // namespace + +NS_IMETHODIMP WebTransportStreamProxy::GetSendStreamStats( + nsIWebTransportStreamStatsCallback* aCallback) { + if (!OnSocketThread()) { + RefPtr<WebTransportStreamProxy> self(this); + nsCOMPtr<nsIWebTransportStreamStatsCallback> callback = + new StatsCallbackWrapper(aCallback); + return gSocketTransportService->Dispatch(NS_NewRunnableFunction( + "WebTransportStreamProxy::GetSendStreamStats", + [self{std::move(self)}, callback{std::move(callback)}]() { + self->GetSendStreamStats(callback); + })); + } + + nsCOMPtr<nsIWebTransportSendStreamStats> stats = + mWebTransportStream->GetSendStreamStats(); + aCallback->OnSendStatsAvailable(stats); + return NS_OK; +} + +NS_IMETHODIMP WebTransportStreamProxy::GetReceiveStreamStats( + nsIWebTransportStreamStatsCallback* aCallback) { + if (!OnSocketThread()) { + RefPtr<WebTransportStreamProxy> self(this); + nsCOMPtr<nsIWebTransportStreamStatsCallback> callback = + new StatsCallbackWrapper(aCallback); + return gSocketTransportService->Dispatch(NS_NewRunnableFunction( + "WebTransportStreamProxy::GetReceiveStreamStats", + [self{std::move(self)}, callback{std::move(callback)}]() { + self->GetReceiveStreamStats(callback); + })); + } + + nsCOMPtr<nsIWebTransportReceiveStreamStats> stats = + mWebTransportStream->GetReceiveStreamStats(); + aCallback->OnReceiveStatsAvailable(stats); + return NS_OK; +} + +NS_IMETHODIMP WebTransportStreamProxy::GetHasReceivedFIN( + bool* aHasReceivedFIN) { + *aHasReceivedFIN = mWebTransportStream->RecvDone(); + return NS_OK; +} + +NS_IMETHODIMP WebTransportStreamProxy::GetInputStream( + nsIAsyncInputStream** aOut) { + if (!mReader) { + return NS_ERROR_NOT_AVAILABLE; + } + + RefPtr<AsyncInputStreamWrapper> stream = mReader; + stream.forget(aOut); + return NS_OK; +} + +NS_IMETHODIMP WebTransportStreamProxy::GetOutputStream( + nsIAsyncOutputStream** aOut) { + if (!mWriter) { + return NS_ERROR_NOT_AVAILABLE; + } + + RefPtr<AsyncOutputStreamWrapper> stream = mWriter; + stream.forget(aOut); + return NS_OK; +} + +NS_IMETHODIMP WebTransportStreamProxy::GetStreamId(uint64_t* aId) { + *aId = mWebTransportStream->StreamId(); + return NS_OK; +} + +//------------------------------------------------------------------------------ +// WebTransportStreamProxy::AsyncInputStreamWrapper +//------------------------------------------------------------------------------ + +NS_IMPL_ISUPPORTS(WebTransportStreamProxy::AsyncInputStreamWrapper, + nsIInputStream, nsIAsyncInputStream) + +WebTransportStreamProxy::AsyncInputStreamWrapper::AsyncInputStreamWrapper( + nsIAsyncInputStream* aStream, Http3WebTransportStream* aWebTransportStream) + : mStream(aStream), mWebTransportStream(aWebTransportStream) {} + +WebTransportStreamProxy::AsyncInputStreamWrapper::~AsyncInputStreamWrapper() = + default; + +void WebTransportStreamProxy::AsyncInputStreamWrapper::MaybeCloseStream() { + if (!mWebTransportStream->RecvDone()) { + return; + } + + uint64_t available = 0; + Unused << Available(&available); + if (available) { + // Don't close the InputStream if there's unread data available, since it + // would be lost. We exit above unless we know no more data will be received + // for the stream. + return; + } + + LOG( + ("AsyncInputStreamWrapper::MaybeCloseStream close stream due to FIN " + "stream=%p", + mWebTransportStream.get())); + Close(); +} + +NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::Close() { + return mStream->CloseWithStatus(NS_BASE_STREAM_CLOSED); +} + +NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::Available( + uint64_t* aAvailable) { + return mStream->Available(aAvailable); +} + +NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::StreamStatus() { + return mStream->StreamStatus(); +} + +NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::Read( + char* aBuf, uint32_t aCount, uint32_t* aResult) { + LOG(("WebTransportStreamProxy::AsyncInputStreamWrapper::Read %p", this)); + nsresult rv = mStream->Read(aBuf, aCount, aResult); + MaybeCloseStream(); + return rv; +} + +NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::ReadSegments( + nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount, + uint32_t* aResult) { + LOG(("WebTransportStreamProxy::AsyncInputStreamWrapper::ReadSegments %p", + this)); + nsresult rv = mStream->ReadSegments(aWriter, aClosure, aCount, aResult); + MaybeCloseStream(); + return rv; +} + +NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::IsNonBlocking( + bool* aResult) { + return mStream->IsNonBlocking(aResult); +} + +NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::CloseWithStatus( + nsresult aStatus) { + return mStream->CloseWithStatus(aStatus); +} + +NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::AsyncWait( + nsIInputStreamCallback* aCallback, uint32_t aFlags, + uint32_t aRequestedCount, nsIEventTarget* aEventTarget) { + return mStream->AsyncWait(aCallback, aFlags, aRequestedCount, aEventTarget); +} + +//------------------------------------------------------------------------------ +// WebTransportStreamProxy::AsyncOutputStreamWrapper +//------------------------------------------------------------------------------ + +NS_IMPL_ISUPPORTS(WebTransportStreamProxy::AsyncOutputStreamWrapper, + nsIOutputStream, nsIAsyncOutputStream) + +WebTransportStreamProxy::AsyncOutputStreamWrapper::AsyncOutputStreamWrapper( + nsIAsyncOutputStream* aStream) + : mStream(aStream) {} + +WebTransportStreamProxy::AsyncOutputStreamWrapper::~AsyncOutputStreamWrapper() = + default; + +NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Flush() { + return mStream->Flush(); +} + +NS_IMETHODIMP +WebTransportStreamProxy::AsyncOutputStreamWrapper::StreamStatus() { + return mStream->StreamStatus(); +} + +NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Write( + const char* aBuf, uint32_t aCount, uint32_t* aResult) { + return mStream->Write(aBuf, aCount, aResult); +} + +NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::WriteFrom( + nsIInputStream* aFromStream, uint32_t aCount, uint32_t* aResult) { + return mStream->WriteFrom(aFromStream, aCount, aResult); +} + +NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::WriteSegments( + nsReadSegmentFun aReader, void* aClosure, uint32_t aCount, + uint32_t* aResult) { + return mStream->WriteSegments(aReader, aClosure, aCount, aResult); +} + +NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::AsyncWait( + nsIOutputStreamCallback* aCallback, uint32_t aFlags, + uint32_t aRequestedCount, nsIEventTarget* aEventTarget) { + return mStream->AsyncWait(aCallback, aFlags, aRequestedCount, aEventTarget); +} + +NS_IMETHODIMP +WebTransportStreamProxy::AsyncOutputStreamWrapper::CloseWithStatus( + nsresult aStatus) { + return mStream->CloseWithStatus(aStatus); +} + +NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Close() { + return mStream->Close(); +} + +NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::IsNonBlocking( + bool* aResult) { + return mStream->IsNonBlocking(aResult); +} + +} // namespace mozilla::net |