/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "WebTransportLog.h" #include "Http3WebTransportSession.h" #include "Http3WebTransportStream.h" #include "WebTransportSessionProxy.h" #include "WebTransportStreamProxy.h" #include "nsIAsyncVerifyRedirectCallback.h" #include "nsIHttpChannel.h" #include "nsIHttpChannelInternal.h" #include "nsIRequest.h" #include "nsNetUtil.h" #include "nsProxyRelease.h" #include "nsSocketTransportService2.h" #include "mozilla/Logging.h" #include "mozilla/ScopeExit.h" #include "mozilla/StaticPrefs_network.h" namespace mozilla::net { LazyLogModule webTransportLog("nsWebTransport"); NS_IMPL_ISUPPORTS(WebTransportSessionProxy, WebTransportSessionEventListener, nsIWebTransport, nsIRedirectResultListener, nsIStreamListener, nsIChannelEventSink, nsIInterfaceRequestor); WebTransportSessionProxy::WebTransportSessionProxy() : mMutex("WebTransportSessionProxy::mMutex"), mTarget(GetMainThreadSerialEventTarget()) { LOG(("WebTransportSessionProxy constructor")); } WebTransportSessionProxy::~WebTransportSessionProxy() { if (OnSocketThread()) { return; } MutexAutoLock lock(mMutex); if ((mState != WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED) && (mState != WebTransportSessionProxyState::ACTIVE) && (mState != WebTransportSessionProxyState::SESSION_CLOSE_PENDING)) { return; } MOZ_ASSERT(mState != WebTransportSessionProxyState::SESSION_CLOSE_PENDING, "We can not be in the SESSION_CLOSE_PENDING state in destructor, " "because should e an runnable that holds reference to this" "object."); Unused << gSocketTransportService->Dispatch(NS_NewRunnableFunction( "WebTransportSessionProxy::ProxyHttp3WebTransportSessionRelease", [self{std::move(mWebTransportSession)}]() {})); } //----------------------------------------------------------------------------- // WebTransportSessionProxy::nsIWebTransport //----------------------------------------------------------------------------- nsresult WebTransportSessionProxy::AsyncConnect( nsIURI* aURI, nsIPrincipal* aPrincipal, uint32_t aSecurityFlags, WebTransportSessionEventListener* aListener) { return AsyncConnectWithClient(aURI, aPrincipal, aSecurityFlags, aListener, Maybe()); } nsresult WebTransportSessionProxy::AsyncConnectWithClient( nsIURI* aURI, nsIPrincipal* aPrincipal, uint32_t aSecurityFlags, WebTransportSessionEventListener* aListener, const Maybe& aClientInfo) { MOZ_ASSERT(NS_IsMainThread()); LOG(("WebTransportSessionProxy::AsyncConnect")); { MutexAutoLock lock(mMutex); mListener = aListener; } auto cleanup = MakeScopeExit([self = RefPtr(this)] { MutexAutoLock lock(self->mMutex); self->mListener->OnSessionClosed(0, ""_ns); // TODO: find a better error. self->mChannel = nullptr; self->mListener = nullptr; self->ChangeState(WebTransportSessionProxyState::DONE); }); nsSecurityFlags flags = nsILoadInfo::SEC_COOKIES_OMIT | aSecurityFlags; nsLoadFlags loadFlags = nsIRequest::LOAD_NORMAL | nsIRequest::LOAD_BYPASS_CACHE | nsIRequest::INHIBIT_CACHING; nsresult rv = NS_ERROR_FAILURE; if (aClientInfo.isSome()) { rv = NS_NewChannel(getter_AddRefs(mChannel), aURI, aPrincipal, aClientInfo.ref(), Maybe(), flags, nsContentPolicyType::TYPE_WEB_TRANSPORT, /* aCookieJarSettings */ nullptr, /* aPerformanceStorage */ nullptr, /* aLoadGroup */ nullptr, /* aCallbacks */ this, loadFlags); } else { rv = NS_NewChannel(getter_AddRefs(mChannel), aURI, aPrincipal, flags, nsContentPolicyType::TYPE_WEB_TRANSPORT, /* aCookieJarSettings */ nullptr, /* aPerformanceStorage */ nullptr, /* aLoadGroup */ nullptr, /* aCallbacks */ this, loadFlags); } NS_ENSURE_SUCCESS(rv, rv); // configure HTTP specific stuff nsCOMPtr httpChannel = do_QueryInterface(mChannel); if (!httpChannel) { mChannel = nullptr; return NS_ERROR_ABORT; } { MutexAutoLock lock(mMutex); ChangeState(WebTransportSessionProxyState::NEGOTIATING); } // https://www.ietf.org/archive/id/draft-ietf-webtrans-http3-04.html#section-6 rv = httpChannel->SetRequestHeader("Sec-Webtransport-Http3-Draft02"_ns, "1"_ns, false); if (NS_FAILED(rv)) { return rv; } // To establish a WebTransport session with an origin origin, follow // [WEB-TRANSPORT-HTTP3] section 3.3, with using origin, serialized and // isomorphic encoded, as the `Origin` header of the request. // https://www.w3.org/TR/webtransport/#protocol-concepts nsAutoCString serializedOrigin; if (NS_FAILED(aPrincipal->GetAsciiOrigin(serializedOrigin))) { // origin/URI will be missing for system principals // assign null origin serializedOrigin = "null"_ns; } rv = httpChannel->SetRequestHeader("Origin"_ns, serializedOrigin, false); if (NS_FAILED(rv)) { return rv; } nsCOMPtr internalChannel = do_QueryInterface(mChannel); if (!internalChannel) { mChannel = nullptr; return NS_ERROR_ABORT; } Unused << internalChannel->SetWebTransportSessionEventListener(this); rv = mChannel->AsyncOpen(this); if (NS_SUCCEEDED(rv)) { cleanup.release(); } return rv; } NS_IMETHODIMP WebTransportSessionProxy::RetargetTo(nsIEventTarget* aTarget) { if (!aTarget) { return NS_ERROR_INVALID_ARG; } { MutexAutoLock lock(mMutex); LOG(("WebTransportSessionProxy::RetargetTo mState=%d", mState)); // RetargetTo should be only called after the session is ready. if (mState != WebTransportSessionProxyState::ACTIVE) { return NS_ERROR_UNEXPECTED; } mTarget = aTarget; } return NS_OK; } NS_IMETHODIMP WebTransportSessionProxy::GetStats() { return NS_ERROR_NOT_IMPLEMENTED; } NS_IMETHODIMP WebTransportSessionProxy::CloseSession(uint32_t status, const nsACString& reason) { MutexAutoLock lock(mMutex); MOZ_ASSERT(mTarget->IsOnCurrentThread()); mCloseStatus = status; mReason = reason; mListener = nullptr; mPendingEvents.Clear(); switch (mState) { case WebTransportSessionProxyState::INIT: case WebTransportSessionProxyState::DONE: return NS_ERROR_NOT_INITIALIZED; case WebTransportSessionProxyState::NEGOTIATING: mChannel->Cancel(NS_ERROR_ABORT); mChannel = nullptr; ChangeState(WebTransportSessionProxyState::DONE); break; case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED: mChannel->Cancel(NS_ERROR_ABORT); mChannel = nullptr; ChangeState(WebTransportSessionProxyState::SESSION_CLOSE_PENDING); CloseSessionInternal(); break; case WebTransportSessionProxyState::ACTIVE: ChangeState(WebTransportSessionProxyState::SESSION_CLOSE_PENDING); CloseSessionInternal(); break; case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING: ChangeState(WebTransportSessionProxyState::DONE); break; case SESSION_CLOSE_PENDING: break; } return NS_OK; } void WebTransportSessionProxy::CloseSessionInternalLocked() { MutexAutoLock lock(mMutex); CloseSessionInternal(); } void WebTransportSessionProxy::CloseSessionInternal() { if (!OnSocketThread()) { mMutex.AssertCurrentThreadOwns(); RefPtr self(this); Unused << gSocketTransportService->Dispatch(NS_NewRunnableFunction( "WebTransportSessionProxy::CallCloseWebTransportSession", [self{std::move(self)}]() { self->CloseSessionInternalLocked(); })); return; } mMutex.AssertCurrentThreadOwns(); RefPtr wt; uint32_t closeStatus = 0; nsCString reason; if (mState == WebTransportSessionProxyState::SESSION_CLOSE_PENDING) { MOZ_ASSERT(mWebTransportSession); wt = mWebTransportSession; mWebTransportSession = nullptr; closeStatus = mCloseStatus; reason = mReason; ChangeState(WebTransportSessionProxyState::DONE); } else { MOZ_ASSERT(mState == WebTransportSessionProxyState::DONE); } if (wt) { MutexAutoUnlock unlock(mMutex); wt->CloseSession(closeStatus, reason); } } class WebTransportStreamCallbackWrapper final { public: NS_INLINE_DECL_THREADSAFE_REFCOUNTING(WebTransportStreamCallbackWrapper) explicit WebTransportStreamCallbackWrapper( nsIWebTransportStreamCallback* aCallback, bool aBidi) : mCallback(aCallback), mTarget(GetCurrentSerialEventTarget()), mBidi(aBidi) {} void CallOnError(nsresult aError) { if (!mTarget->IsOnCurrentThread()) { RefPtr self(this); Unused << mTarget->Dispatch(NS_NewRunnableFunction( "WebTransportStreamCallbackWrapper::CallOnError", [self{std::move(self)}, error{aError}]() { self->CallOnError(error); })); return; } LOG(("WebTransportStreamCallbackWrapper::OnError aError=0x%" PRIx32, static_cast(aError))); Unused << mCallback->OnError(nsIWebTransport::INVALID_STATE_ERROR); } void CallOnStreamReady(WebTransportStreamProxy* aStream) { if (!mTarget->IsOnCurrentThread()) { RefPtr self(this); RefPtr stream = aStream; Unused << mTarget->Dispatch(NS_NewRunnableFunction( "WebTransportStreamCallbackWrapper::CallOnStreamReady", [self{std::move(self)}, stream{std::move(stream)}]() { self->CallOnStreamReady(stream); })); return; } if (mBidi) { Unused << mCallback->OnBidirectionalStreamReady(aStream); return; } Unused << mCallback->OnUnidirectionalStreamReady(aStream); } private: virtual ~WebTransportStreamCallbackWrapper() { NS_ProxyRelease( "WebTransportStreamCallbackWrapper::~WebTransportStreamCallbackWrapper", mTarget, mCallback.forget()); } nsCOMPtr mCallback; nsCOMPtr mTarget; bool mBidi = false; }; void WebTransportSessionProxy::CreateStreamInternal( nsIWebTransportStreamCallback* callback, bool aBidi) { mMutex.AssertCurrentThreadOwns(); LOG( ("WebTransportSessionProxy::CreateStreamInternal %p " "mState=%d, bidi=%d", this, mState, aBidi)); switch (mState) { case WebTransportSessionProxyState::INIT: case WebTransportSessionProxyState::NEGOTIATING: case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED: case WebTransportSessionProxyState::ACTIVE: { RefPtr wrapper = new WebTransportStreamCallbackWrapper(callback, aBidi); if (mState == WebTransportSessionProxyState::ACTIVE && mWebTransportSession) { DoCreateStream(wrapper, mWebTransportSession, aBidi); } else { LOG( ("WebTransportSessionProxy::CreateStreamInternal %p " " queue create stream event", this)); auto task = [self = RefPtr{this}, wrapper{std::move(wrapper)}, bidi(aBidi)](nsresult aStatus) { if (NS_FAILED(aStatus)) { wrapper->CallOnError(aStatus); return; } self->DoCreateStream(wrapper, nullptr, bidi); }; // TODO: we should do this properly in bug 1830362. mPendingCreateStreamEvents.AppendElement(std::move(task)); } } break; case WebTransportSessionProxyState::SESSION_CLOSE_PENDING: case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING: case WebTransportSessionProxyState::DONE: { nsCOMPtr cb(callback); NS_DispatchToCurrentThread(NS_NewRunnableFunction( "WebTransportSessionProxy::CreateStreamInternal", [cb{std::move(cb)}]() { cb->OnError(nsIWebTransport::INVALID_STATE_ERROR); })); } break; } } void WebTransportSessionProxy::DoCreateStream( WebTransportStreamCallbackWrapper* aCallback, Http3WebTransportSession* aSession, bool aBidi) { if (!OnSocketThread()) { RefPtr self(this); RefPtr wrapper(aCallback); Unused << gSocketTransportService->Dispatch(NS_NewRunnableFunction( "WebTransportSessionProxy::DoCreateStream", [self{std::move(self)}, wrapper{std::move(wrapper)}, bidi(aBidi)]() { self->DoCreateStream(wrapper, nullptr, bidi); })); return; } LOG(("WebTransportSessionProxy::DoCreateStream %p bidi=%d", this, aBidi)); RefPtr session = aSession; // Having no session here means that this is called by dispatching tasks. // The mState may be already changed, so we need to check it again. if (!aSession) { MutexAutoLock lock(mMutex); switch (mState) { case WebTransportSessionProxyState::INIT: case WebTransportSessionProxyState::NEGOTIATING: case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED: MOZ_ASSERT(false, "DoCreateStream called with invalid state"); aCallback->CallOnError(NS_ERROR_UNEXPECTED); return; case WebTransportSessionProxyState::ACTIVE: { session = mWebTransportSession; } break; case WebTransportSessionProxyState::SESSION_CLOSE_PENDING: case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING: case WebTransportSessionProxyState::DONE: // Session is going to be closed. aCallback->CallOnError(NS_ERROR_NOT_AVAILABLE); return; } } if (!session) { MOZ_ASSERT_UNREACHABLE("This should not happen"); aCallback->CallOnError(NS_ERROR_UNEXPECTED); return; } RefPtr wrapper(aCallback); auto callback = [wrapper{std::move(wrapper)}]( Result, nsresult>&& aResult) { if (aResult.isErr()) { wrapper->CallOnError(aResult.unwrapErr()); return; } RefPtr stream = aResult.unwrap(); RefPtr streamProxy = new WebTransportStreamProxy(stream); wrapper->CallOnStreamReady(streamProxy); }; if (aBidi) { session->CreateOutgoingBidirectionalStream(std::move(callback)); } else { session->CreateOutgoingUnidirectionalStream(std::move(callback)); } } NS_IMETHODIMP WebTransportSessionProxy::CreateOutgoingUnidirectionalStream( nsIWebTransportStreamCallback* callback) { if (!callback) { return NS_ERROR_INVALID_ARG; } MutexAutoLock lock(mMutex); CreateStreamInternal(callback, false); return NS_OK; } NS_IMETHODIMP WebTransportSessionProxy::CreateOutgoingBidirectionalStream( nsIWebTransportStreamCallback* callback) { if (!callback) { return NS_ERROR_INVALID_ARG; } MutexAutoLock lock(mMutex); CreateStreamInternal(callback, true); return NS_OK; } void WebTransportSessionProxy::SendDatagramInternal( const RefPtr& aSession, nsTArray&& aData, uint64_t aTrackingId) { MOZ_ASSERT(OnSocketThread()); aSession->SendDatagram(std::move(aData), aTrackingId); } NS_IMETHODIMP WebTransportSessionProxy::SendDatagram(const nsTArray& aData, uint64_t aTrackingId) { RefPtr session; { MutexAutoLock lock(mMutex); if (mState != WebTransportSessionProxyState::ACTIVE || !mWebTransportSession) { return NS_ERROR_NOT_AVAILABLE; } session = mWebTransportSession; } nsTArray copied; copied.Assign(aData); if (!OnSocketThread()) { return gSocketTransportService->Dispatch(NS_NewRunnableFunction( "WebTransportSessionProxy::SendDatagramInternal", [self = RefPtr{this}, session{std::move(session)}, data{std::move(copied)}, trackingId(aTrackingId)]() mutable { self->SendDatagramInternal(session, std::move(data), trackingId); })); } SendDatagramInternal(session, std::move(copied), aTrackingId); return NS_OK; } void WebTransportSessionProxy::GetMaxDatagramSizeInternal( const RefPtr& aSession) { MOZ_ASSERT(OnSocketThread()); aSession->GetMaxDatagramSize(); } NS_IMETHODIMP WebTransportSessionProxy::GetMaxDatagramSize() { RefPtr session; { MutexAutoLock lock(mMutex); if (mState != WebTransportSessionProxyState::ACTIVE || !mWebTransportSession) { return NS_ERROR_NOT_AVAILABLE; } session = mWebTransportSession; } if (!OnSocketThread()) { return gSocketTransportService->Dispatch(NS_NewRunnableFunction( "WebTransportSessionProxy::GetMaxDatagramSizeInternal", [self = RefPtr{this}, session{std::move(session)}]() { self->GetMaxDatagramSizeInternal(session); })); } GetMaxDatagramSizeInternal(session); return NS_OK; } //----------------------------------------------------------------------------- // WebTransportSessionProxy::nsIStreamListener //----------------------------------------------------------------------------- NS_IMETHODIMP WebTransportSessionProxy::OnStartRequest(nsIRequest* aRequest) { MOZ_ASSERT(NS_IsMainThread()); LOG(("WebTransportSessionProxy::OnStartRequest\n")); nsCOMPtr listener; nsAutoCString reason; uint32_t closeStatus = 0; { MutexAutoLock lock(mMutex); switch (mState) { case WebTransportSessionProxyState::INIT: case WebTransportSessionProxyState::DONE: case WebTransportSessionProxyState::ACTIVE: case WebTransportSessionProxyState::SESSION_CLOSE_PENDING: MOZ_ASSERT(false, "OnStartRequest cannot be called in this state."); break; case WebTransportSessionProxyState::NEGOTIATING: case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING: listener = mListener; mListener = nullptr; mChannel = nullptr; reason = mReason; closeStatus = mCloseStatus; ChangeState(WebTransportSessionProxyState::DONE); break; case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED: { uint32_t status; nsCOMPtr httpChannel = do_QueryInterface(mChannel); if (!httpChannel || NS_FAILED(httpChannel->GetResponseStatus(&status)) || !(status >= 200 && status < 300)) { listener = mListener; mListener = nullptr; mChannel = nullptr; mReason = ""_ns; reason = ""_ns; mCloseStatus = 0; // TODO: find a better error. Currently error code 0 is used ChangeState(WebTransportSessionProxyState::SESSION_CLOSE_PENDING); CloseSessionInternal(); // TODO: find a better error. Currently error // code 0 is used. } // The success cases will be handled in OnStopRequest. } break; } } if (listener) { listener->OnSessionClosed(closeStatus, reason); } return NS_OK; } NS_IMETHODIMP WebTransportSessionProxy::OnDataAvailable(nsIRequest* aRequest, nsIInputStream* aStream, uint64_t aOffset, uint32_t aCount) { MOZ_ASSERT(NS_IsMainThread()); MOZ_RELEASE_ASSERT( false, "WebTransportSessionProxy::OnDataAvailable should not be called"); return NS_OK; } NS_IMETHODIMP WebTransportSessionProxy::OnStopRequest(nsIRequest* aRequest, nsresult aStatus) { MOZ_ASSERT(NS_IsMainThread()); mChannel = nullptr; nsCOMPtr listener; nsAutoCString reason; uint32_t closeStatus = 0; uint64_t sessionId; bool succeeded = false; nsTArray> pendingEvents; nsTArray> pendingCreateStreamEvents; { MutexAutoLock lock(mMutex); switch (mState) { case WebTransportSessionProxyState::INIT: case WebTransportSessionProxyState::ACTIVE: case WebTransportSessionProxyState::NEGOTIATING: MOZ_ASSERT(false, "OnStopRequest cannot be called in this state."); break; case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING: reason = mReason; closeStatus = mCloseStatus; listener = mListener; mListener = nullptr; ChangeState(WebTransportSessionProxyState::DONE); break; case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED: if (NS_FAILED(aStatus)) { listener = mListener; mListener = nullptr; mReason = ""_ns; reason = ""_ns; mCloseStatus = 0; ChangeState(WebTransportSessionProxyState::SESSION_CLOSE_PENDING); CloseSessionInternal(); // TODO: find a better error. Currently error // code 0 is used. } else { succeeded = true; sessionId = mSessionId; listener = mListener; ChangeState(WebTransportSessionProxyState::ACTIVE); } break; case WebTransportSessionProxyState::SESSION_CLOSE_PENDING: case WebTransportSessionProxyState::DONE: break; } pendingEvents = std::move(mPendingEvents); pendingCreateStreamEvents = std::move(mPendingCreateStreamEvents); if (!pendingCreateStreamEvents.IsEmpty()) { if (NS_SUCCEEDED(aStatus) && (mState == WebTransportSessionProxyState::DONE || mState == WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING || mState == WebTransportSessionProxyState::SESSION_CLOSE_PENDING)) { aStatus = NS_ERROR_FAILURE; } } mStopRequestCalled = true; } if (!pendingCreateStreamEvents.IsEmpty()) { Unused << gSocketTransportService->Dispatch(NS_NewRunnableFunction( "WebTransportSessionProxy::DispatchPendingCreateStreamEvents", [pendingCreateStreamEvents = std::move(pendingCreateStreamEvents), status(aStatus)]() { for (const auto& event : pendingCreateStreamEvents) { event(status); } })); } // otherwise let the CreateStreams just go away if (listener) { if (succeeded) { listener->OnSessionReady(sessionId); if (!pendingEvents.IsEmpty()) { Unused << gSocketTransportService->Dispatch(NS_NewRunnableFunction( "WebTransportSessionProxy::DispatchPendingEvents", [pendingEvents = std::move(pendingEvents)]() { for (const auto& event : pendingEvents) { event(); } })); } } else { listener->OnSessionClosed(closeStatus, reason); // TODO: find a better error. // Currently error code 0 is used. } } return NS_OK; } //----------------------------------------------------------------------------- // WebTransportSessionProxy::nsIChannelEventSink //----------------------------------------------------------------------------- NS_IMETHODIMP WebTransportSessionProxy::AsyncOnChannelRedirect( nsIChannel* aOldChannel, nsIChannel* aNewChannel, uint32_t aFlags, nsIAsyncVerifyRedirectCallback* callback) { // Currently implementation we do not reach this part of the code // as location headers are not forwarded by the http3 stack to the applicaion. // Hence, the channel is aborted due to the location header check in // nsHttpChannel::AsyncProcessRedirection This comment must be removed after // the following neqo bug is resolved // https://github.com/mozilla/neqo/issues/1364 if (!StaticPrefs::network_webtransport_redirect_enabled()) { LOG(("Channel Redirects are disabled for WebTransport sessions")); return NS_ERROR_ABORT; } nsCOMPtr newURI; nsresult rv = NS_GetFinalChannelURI(aNewChannel, getter_AddRefs(newURI)); NS_ENSURE_SUCCESS(rv, rv); rv = aNewChannel->GetURI(getter_AddRefs(newURI)); if (NS_FAILED(rv)) { callback->OnRedirectVerifyCallback(rv); return NS_OK; } // abort the request if redirecting to insecure context if (!newURI->SchemeIs("https")) { callback->OnRedirectVerifyCallback(NS_ERROR_ABORT); return NS_OK; } // Assign to mChannel after we get notification about success of the // redirect in OnRedirectResult. mRedirectChannel = aNewChannel; callback->OnRedirectVerifyCallback(NS_OK); return NS_OK; } //----------------------------------------------------------------------------- // WebTransportSessionProxy::nsIRedirectResultListener //----------------------------------------------------------------------------- NS_IMETHODIMP WebTransportSessionProxy::OnRedirectResult(nsresult aStatus) { if (NS_SUCCEEDED(aStatus) && mRedirectChannel) { mChannel = mRedirectChannel; } mRedirectChannel = nullptr; return NS_OK; } //----------------------------------------------------------------------------- // WebTransportSessionProxy::nsIInterfaceRequestor //----------------------------------------------------------------------------- NS_IMETHODIMP WebTransportSessionProxy::GetInterface(const nsIID& aIID, void** aResult) { if (aIID.Equals(NS_GET_IID(nsIChannelEventSink))) { NS_ADDREF_THIS(); *aResult = static_cast(this); return NS_OK; } if (aIID.Equals(NS_GET_IID(nsIRedirectResultListener))) { NS_ADDREF_THIS(); *aResult = static_cast(this); return NS_OK; } return NS_ERROR_NO_INTERFACE; } //----------------------------------------------------------------------------- // WebTransportSessionProxy::WebTransportSessionEventListener //----------------------------------------------------------------------------- // This function is called when the Http3WebTransportSession is ready. After // this call WebTransportSessionProxy is responsible for the // Http3WebTransportSession, i.e. it is responsible for closing it. // The listener of the WebTransportSessionProxy will be informed during // OnStopRequest call. NS_IMETHODIMP WebTransportSessionProxy::OnSessionReadyInternal( Http3WebTransportSession* aSession) { MOZ_ASSERT(OnSocketThread(), "not on socket thread"); LOG(("WebTransportSessionProxy::OnSessionReadyInternal")); MutexAutoLock lock(mMutex); switch (mState) { case WebTransportSessionProxyState::INIT: case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING: case WebTransportSessionProxyState::ACTIVE: case WebTransportSessionProxyState::SESSION_CLOSE_PENDING: case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED: MOZ_ASSERT(false, "OnSessionReadyInternal cannot be called in this state."); return NS_ERROR_ABORT; case WebTransportSessionProxyState::NEGOTIATING: mWebTransportSession = aSession; mSessionId = aSession->StreamId(); ChangeState(WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED); break; case WebTransportSessionProxyState::DONE: // The session has been canceled. We do not need to set // mWebTransportSession. break; } return NS_OK; } NS_IMETHODIMP WebTransportSessionProxy::OnIncomingStreamAvailableInternal( Http3WebTransportStream* aStream) { nsCOMPtr listener; { MutexAutoLock lock(mMutex); LOG( ("WebTransportSessionProxy::OnIncomingStreamAvailableInternal %p " "mState=%d " "mStopRequestCalled=%d", this, mState, mStopRequestCalled)); // Since OnSessionReady on the listener is called on the main thread, // OnIncomingStreamAvailableInternal and OnSessionReady can be racy. If // OnStopRequest is not called yet, OnIncomingStreamAvailableInternal needs // to wait. if (!mStopRequestCalled) { mPendingEvents.AppendElement( [self = RefPtr{this}, stream = RefPtr{aStream}]() { self->OnIncomingStreamAvailableInternal(stream); }); return NS_OK; } if (!mTarget->IsOnCurrentThread()) { RefPtr self(this); RefPtr stream = aStream; Unused << mTarget->Dispatch(NS_NewRunnableFunction( "WebTransportSessionProxy::OnIncomingStreamAvailableInternal", [self{std::move(self)}, stream{std::move(stream)}]() { self->OnIncomingStreamAvailableInternal(stream); })); return NS_OK; } LOG( ("WebTransportSessionProxy::OnIncomingStreamAvailableInternal %p " "mState=%d mListener=%p", this, mState, mListener.get())); if (mState == WebTransportSessionProxyState::ACTIVE) { listener = mListener; } } if (!listener) { // Session can be already closed. return NS_OK; } RefPtr streamProxy = new WebTransportStreamProxy(aStream); if (aStream->StreamType() == WebTransportStreamType::BiDi) { Unused << listener->OnIncomingBidirectionalStreamAvailable(streamProxy); } else { Unused << listener->OnIncomingUnidirectionalStreamAvailable(streamProxy); } return NS_OK; } NS_IMETHODIMP WebTransportSessionProxy::OnIncomingBidirectionalStreamAvailable( nsIWebTransportBidirectionalStream* aStream) { return NS_OK; } NS_IMETHODIMP WebTransportSessionProxy::OnIncomingUnidirectionalStreamAvailable( nsIWebTransportReceiveStream* aStream) { return NS_OK; } NS_IMETHODIMP WebTransportSessionProxy::OnSessionReady(uint64_t ready) { MOZ_ASSERT(false, "Should not b called"); return NS_OK; } NS_IMETHODIMP WebTransportSessionProxy::OnSessionClosed(uint32_t status, const nsACString& reason) { MOZ_ASSERT(OnSocketThread(), "not on socket thread"); MutexAutoLock lock(mMutex); LOG( ("WebTransportSessionProxy::OnSessionClosed %p mState=%d " "mStopRequestCalled=%d", this, mState, mStopRequestCalled)); // Since OnSessionReady on the listener is called on the main thread, // OnSessionClosed and OnSessionReady can be racy. If OnStopRequest is not // called yet, OnSessionClosed needs to wait. if (!mStopRequestCalled) { nsCString closeReason(reason); mPendingEvents.AppendElement( [self = RefPtr{this}, status(status), closeReason(closeReason)]() { Unused << self->OnSessionClosed(status, closeReason); }); return NS_OK; } switch (mState) { case WebTransportSessionProxyState::INIT: case WebTransportSessionProxyState::NEGOTIATING: case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING: MOZ_ASSERT(false, "OnSessionClosed cannot be called in this state."); return NS_ERROR_ABORT; case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED: case WebTransportSessionProxyState::ACTIVE: { mCloseStatus = status; mReason = reason; mWebTransportSession = nullptr; ChangeState(WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING); CallOnSessionClosed(); } break; case WebTransportSessionProxyState::SESSION_CLOSE_PENDING: ChangeState(WebTransportSessionProxyState::DONE); break; case WebTransportSessionProxyState::DONE: // The session has been canceled. We do not need to set // mWebTransportSession. break; } return NS_OK; } void WebTransportSessionProxy::CallOnSessionClosedLocked() { MutexAutoLock lock(mMutex); CallOnSessionClosed(); } void WebTransportSessionProxy::CallOnSessionClosed() { mMutex.AssertCurrentThreadOwns(); if (!mTarget->IsOnCurrentThread()) { RefPtr self(this); Unused << mTarget->Dispatch(NS_NewRunnableFunction( "WebTransportSessionProxy::CallOnSessionClosed", [self{std::move(self)}]() { self->CallOnSessionClosedLocked(); })); return; } MOZ_ASSERT(mTarget->IsOnCurrentThread()); nsCOMPtr listener; nsAutoCString reason; uint32_t closeStatus = 0; switch (mState) { case WebTransportSessionProxyState::INIT: case WebTransportSessionProxyState::NEGOTIATING: case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED: case WebTransportSessionProxyState::ACTIVE: case WebTransportSessionProxyState::SESSION_CLOSE_PENDING: MOZ_ASSERT(false, "CallOnSessionClosed cannot be called in this state."); break; case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING: listener = mListener; mListener = nullptr; reason = mReason; closeStatus = mCloseStatus; ChangeState(WebTransportSessionProxyState::DONE); break; case WebTransportSessionProxyState::DONE: break; } if (listener) { // Don't invoke the callback under the lock. MutexAutoUnlock unlock(mMutex); listener->OnSessionClosed(closeStatus, reason); } } void WebTransportSessionProxy::ChangeState( WebTransportSessionProxyState newState) { mMutex.AssertCurrentThreadOwns(); LOG(("WebTransportSessionProxy::ChangeState %d -> %d [this=%p]", mState, newState, this)); switch (newState) { case WebTransportSessionProxyState::INIT: MOZ_ASSERT(false, "Cannot change into INIT sate."); break; case WebTransportSessionProxyState::NEGOTIATING: MOZ_ASSERT(mState == WebTransportSessionProxyState::INIT, "Only from INIT can be change into NEGOTIATING"); MOZ_ASSERT(mChannel); MOZ_ASSERT(mListener); break; case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED: MOZ_ASSERT( mState == WebTransportSessionProxyState::NEGOTIATING, "Only from NEGOTIATING can be change into NEGOTIATING_SUCCEEDED"); MOZ_ASSERT(mChannel); MOZ_ASSERT(mWebTransportSession); MOZ_ASSERT(mListener); break; case WebTransportSessionProxyState::ACTIVE: MOZ_ASSERT(mState == WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED, "Only from NEGOTIATING_SUCCEEDED can be change into ACTIVE"); MOZ_ASSERT(!mChannel); MOZ_ASSERT(mWebTransportSession); MOZ_ASSERT(mListener); break; case WebTransportSessionProxyState::SESSION_CLOSE_PENDING: MOZ_ASSERT( (mState == WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED) || (mState == WebTransportSessionProxyState::ACTIVE), "Only from NEGOTIATING_SUCCEEDED and ACTIVE can be change into" " SESSION_CLOSE_PENDING"); MOZ_ASSERT(!mChannel); MOZ_ASSERT(mWebTransportSession); MOZ_ASSERT(!mListener); break; case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING: MOZ_ASSERT( (mState == WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED) || (mState == WebTransportSessionProxyState::ACTIVE), "Only from NEGOTIATING_SUCCEEDED and ACTIVE can be change into" " CLOSE_CALLBACK_PENDING"); MOZ_ASSERT(!mWebTransportSession); MOZ_ASSERT(mListener); break; case WebTransportSessionProxyState::DONE: MOZ_ASSERT( (mState == WebTransportSessionProxyState::NEGOTIATING) || (mState == WebTransportSessionProxyState::SESSION_CLOSE_PENDING) || (mState == WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING), "Only from NEGOTIATING, SESSION_CLOSE_PENDING and " "CLOSE_CALLBACK_PENDING can be change into DONE"); MOZ_ASSERT(!mChannel); MOZ_ASSERT(!mWebTransportSession); MOZ_ASSERT(!mListener); break; } mState = newState; } void WebTransportSessionProxy::NotifyDatagramReceived( nsTArray&& aData) { nsCOMPtr listener; { MutexAutoLock lock(mMutex); MOZ_ASSERT(mTarget->IsOnCurrentThread()); if (!mStopRequestCalled) { CopyableTArray copied(aData); mPendingEvents.AppendElement( [self = RefPtr{this}, data = std::move(copied)]() mutable { self->NotifyDatagramReceived(std::move(data)); }); return; } if (mState != WebTransportSessionProxyState::ACTIVE || !mListener) { return; } listener = mListener; } listener->OnDatagramReceived(aData); } NS_IMETHODIMP WebTransportSessionProxy::OnDatagramReceivedInternal( nsTArray&& aData) { MOZ_ASSERT(OnSocketThread()); { MutexAutoLock lock(mMutex); if (!mTarget->IsOnCurrentThread()) { return mTarget->Dispatch(NS_NewRunnableFunction( "WebTransportSessionProxy::OnDatagramReceived", [self = RefPtr{this}, data{std::move(aData)}]() mutable { self->NotifyDatagramReceived(std::move(data)); })); } } NotifyDatagramReceived(std::move(aData)); return NS_OK; } NS_IMETHODIMP WebTransportSessionProxy::OnDatagramReceived( const nsTArray& aData) { return NS_ERROR_NOT_IMPLEMENTED; } void WebTransportSessionProxy::OnMaxDatagramSizeInternal(uint64_t aSize) { nsCOMPtr listener; { MutexAutoLock lock(mMutex); MOZ_ASSERT(mTarget->IsOnCurrentThread()); if (!mStopRequestCalled) { mPendingEvents.AppendElement([self = RefPtr{this}, size(aSize)]() { self->OnMaxDatagramSizeInternal(size); }); return; } if (mState != WebTransportSessionProxyState::ACTIVE || !mListener) { return; } listener = mListener; } listener->OnMaxDatagramSize(aSize); } NS_IMETHODIMP WebTransportSessionProxy::OnMaxDatagramSize(uint64_t aSize) { MOZ_ASSERT(OnSocketThread()); { MutexAutoLock lock(mMutex); if (!mTarget->IsOnCurrentThread()) { return mTarget->Dispatch( NS_NewRunnableFunction("WebTransportSessionProxy::OnMaxDatagramSize", [self = RefPtr{this}, size(aSize)] { self->OnMaxDatagramSizeInternal(size); })); } } OnMaxDatagramSizeInternal(aSize); return NS_OK; } void WebTransportSessionProxy::OnOutgoingDatagramOutComeInternal( uint64_t aId, WebTransportSessionEventListener::DatagramOutcome aOutCome) { nsCOMPtr listener; { MutexAutoLock lock(mMutex); MOZ_ASSERT(mTarget->IsOnCurrentThread()); if (mState != WebTransportSessionProxyState::ACTIVE || !mListener) { return; } listener = mListener; } listener->OnOutgoingDatagramOutCome(aId, aOutCome); } NS_IMETHODIMP WebTransportSessionProxy::OnOutgoingDatagramOutCome( uint64_t aId, WebTransportSessionEventListener::DatagramOutcome aOutCome) { MOZ_ASSERT(OnSocketThread()); { MutexAutoLock lock(mMutex); if (!mTarget->IsOnCurrentThread()) { return mTarget->Dispatch(NS_NewRunnableFunction( "WebTransportSessionProxy::OnOutgoingDatagramOutCome", [self = RefPtr{this}, id(aId), outcome(aOutCome)] { self->OnOutgoingDatagramOutComeInternal(id, outcome); })); } } OnOutgoingDatagramOutComeInternal(aId, aOutCome); return NS_OK; } NS_IMETHODIMP WebTransportSessionProxy::OnStopSending(uint64_t aStreamId, nsresult aError) { MOZ_ASSERT(OnSocketThread()); nsCOMPtr listener; { MutexAutoLock lock(mMutex); MOZ_ASSERT(mTarget->IsOnCurrentThread()); if (mState != WebTransportSessionProxyState::ACTIVE || !mListener) { return NS_OK; } listener = mListener; } listener->OnStopSending(aStreamId, aError); return NS_OK; } NS_IMETHODIMP WebTransportSessionProxy::OnResetReceived(uint64_t aStreamId, nsresult aError) { MOZ_ASSERT(OnSocketThread()); nsCOMPtr listener; { MutexAutoLock lock(mMutex); MOZ_ASSERT(mTarget->IsOnCurrentThread()); if (mState != WebTransportSessionProxyState::ACTIVE || !mListener) { return NS_OK; } listener = mListener; } listener->OnResetReceived(aStreamId, aError); return NS_OK; } } // namespace mozilla::net