/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set sw=2 ts=8 et tw=80 : */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ // HttpLog.h should generally be included first #include "HttpLog.h" #include "Capsule.h" #include "CapsuleEncoder.h" #include "Http2WebTransportSession.h" #include "Http2WebTransportStream.h" #include "Http2Session.h" #include "mozilla/net/NeqoHttp3Conn.h" #include "nsIWebTransport.h" #include "nsIOService.h" #include "nsHttp.h" namespace mozilla::net { Http2WebTransportSessionImpl::CapsuleQueue::CapsuleQueue() = default; mozilla::Queue>& Http2WebTransportSessionImpl::CapsuleQueue::operator[]( CapsuleTransmissionPriority aPriority) { if (aPriority == CapsuleTransmissionPriority::Critical) { return mCritical; } if (aPriority == CapsuleTransmissionPriority::Important) { return mImportant; } if (aPriority == CapsuleTransmissionPriority::High) { return mHigh; } if (aPriority == CapsuleTransmissionPriority::Normal) { return mNormal; } return mLow; } Http2WebTransportSessionImpl::Http2WebTransportSessionImpl( CapsuleIOHandler* aHandler, Http2WebTransportInitialSettings aSettings) : mSettings(aSettings), mRemoteStreamsFlowControl(aSettings.mInitialLocalMaxStreamsBidi, aSettings.mInitialLocalMaxStreamsUnidi), mHandler(aHandler), mSessionDataFc(aSettings.mInitialMaxData), mReceiverFc(aSettings.mInitialLocalMaxData) { LOG(("Http2WebTransportSessionImpl ctor:%p", this)); mLocalStreamsFlowControl[WebTransportStreamType::UniDi].Update( mSettings.mInitialMaxStreamsUni); mLocalStreamsFlowControl[WebTransportStreamType::BiDi].Update( mSettings.mInitialMaxStreamsBidi); } Http2WebTransportSessionImpl::~Http2WebTransportSessionImpl() { LOG(("Http2WebTransportSessionImpl dtor:%p", this)); } void Http2WebTransportSessionImpl::CloseSession(uint32_t aStatus, const nsACString& aReason) { LOG(("Http2WebTransportSessionImpl::CloseSession %p aStatus=%x", this, aStatus)); mHandler->SetSentFin(); Capsule capsule = Capsule::CloseWebTransportSession(aStatus, aReason); UniquePtr encoder = MakeUnique(); encoder->EncodeCapsule(capsule); EnqueueOutCapsule(CapsuleTransmissionPriority::Important, std::move(encoder)); } uint64_t Http2WebTransportSessionImpl::GetStreamId() const { return mStreamId; } void Http2WebTransportSessionImpl::GetMaxDatagramSize() {} void Http2WebTransportSessionImpl::SendDatagram(nsTArray&& aData, uint64_t aTrackingId) { LOG(("Http2WebTransportSession::SendDatagram %p", this)); Capsule capsule = Capsule::WebTransportDatagram(std::move(aData)); UniquePtr encoder = MakeUnique(); encoder->EncodeCapsule(capsule); EnqueueOutCapsule(CapsuleTransmissionPriority::Normal, std::move(encoder)); } void Http2WebTransportSessionImpl::CreateOutgoingStreamInternal( StreamId aStreamId, std::function, nsresult>&&)>&& aCallback) { LOG( ("Http2WebTransportSessionImpl::CreateOutgoingStreamInternal %p " "id:%" PRIx64, this, (uint64_t)aStreamId)); RefPtr stream = new Http2WebTransportStream( this, aStreamId, aStreamId.IsBiDi() ? mSettings.mInitialMaxStreamDataBidi : mSettings.mInitialMaxStreamDataUni, aStreamId.IsBiDi() ? mSettings.mInitialLocalMaxStreamDataBidi : mSettings.mInitialLocalMaxStreamDataUnidi, std::move(aCallback)); if (NS_FAILED(stream->Init())) { return; } mOutgoingStreams.InsertOrUpdate(aStreamId, std::move(stream)); } void Http2WebTransportSessionImpl::ProcessPendingStreamCallbacks( mozilla::Queue>& aCallbacks, WebTransportStreamType aStreamType) { size_t size = aCallbacks.Count(); for (size_t count = 0; count < size; ++count) { auto id = mLocalStreamsFlowControl.TakeStreamId(aStreamType); if (!id) { break; } UniquePtr callback = aCallbacks.Pop(); auto cb = callback->TakeCallback(); CreateOutgoingStreamInternal(*id, std::move(cb)); } } void Http2WebTransportSessionImpl::CreateOutgoingBidirectionalStream( std::function, nsresult>&&)>&& aCallback) { auto id = mLocalStreamsFlowControl.TakeStreamId(WebTransportStreamType::BiDi); if (!id) { mBidiPendingStreamCallbacks.Push( MakeUnique(std::move(aCallback))); return; } CreateOutgoingStreamInternal(*id, std::move(aCallback)); } void Http2WebTransportSessionImpl::CreateOutgoingUnidirectionalStream( std::function, nsresult>&&)>&& aCallback) { auto id = mLocalStreamsFlowControl.TakeStreamId(WebTransportStreamType::UniDi); if (!id) { mUnidiPendingStreamCallbacks.Push( MakeUnique(std::move(aCallback))); return; } CreateOutgoingStreamInternal(*id, std::move(aCallback)); } void Http2WebTransportSessionImpl::StartReading() { LOG(("Http2WebTransportSessionImpl::StartReading %p", this)); mHandler->StartReading(); } void Http2WebTransportSessionImpl::EnqueueOutCapsule( CapsuleTransmissionPriority aPriority, UniquePtr&& aData) { mCapsuleQueue[aPriority].Push(std::move(aData)); mHandler->HasCapsuleToSend(); } void Http2WebTransportSessionImpl::SendMaintenanceCapsules( CapsuleTransmissionPriority aPriority) { auto encoder = mSessionDataFc.CreateSessionDataBlockedCapsule(); if (encoder) { mCapsuleQueue[aPriority].Push(MakeUnique(encoder.ref())); } encoder = mReceiverFc.CreateMaxDataCapsule(); if (encoder) { mCapsuleQueue[aPriority].Push(MakeUnique(encoder.ref())); } encoder = mLocalStreamsFlowControl[WebTransportStreamType::BiDi] .CreateStreamsBlockedCapsule(); if (encoder) { mCapsuleQueue[aPriority].Push(MakeUnique(encoder.ref())); } encoder = mLocalStreamsFlowControl[WebTransportStreamType::UniDi] .CreateStreamsBlockedCapsule(); if (encoder) { mCapsuleQueue[aPriority].Push(MakeUnique(encoder.ref())); } encoder = mRemoteStreamsFlowControl[WebTransportStreamType::BiDi] .FlowControl() .CreateMaxStreamsCapsule(); if (encoder) { mCapsuleQueue[aPriority].Push(MakeUnique(encoder.ref())); } encoder = mRemoteStreamsFlowControl[WebTransportStreamType::UniDi] .FlowControl() .CreateMaxStreamsCapsule(); if (encoder) { mCapsuleQueue[aPriority].Push(MakeUnique(encoder.ref())); } for (const auto& stream : mOutgoingStreams.Values()) { stream->WriteMaintenanceCapsules(mCapsuleQueue[aPriority]); } for (const auto& stream : mIncomingStreams.Values()) { stream->WriteMaintenanceCapsules(mCapsuleQueue[aPriority]); } } void Http2WebTransportSessionImpl::StreamHasCapsuleToSend() { mHandler->HasCapsuleToSend(); } void Http2WebTransportSessionImpl::PrepareCapsulesToSend( mozilla::Queue>& aOutput) { // Like neqo, flow control capsules are at level // CapsuleTransmissionPriority::Important. SendMaintenanceCapsules(CapsuleTransmissionPriority::Important); for (const auto& stream : mOutgoingStreams.Values()) { stream->TakeOutputCapsule( mCapsuleQueue[CapsuleTransmissionPriority::Normal]); } for (const auto& stream : mIncomingStreams.Values()) { stream->TakeOutputCapsule( mCapsuleQueue[CapsuleTransmissionPriority::Normal]); } static constexpr CapsuleTransmissionPriority priorities[] = { CapsuleTransmissionPriority::Critical, CapsuleTransmissionPriority::Important, CapsuleTransmissionPriority::High, CapsuleTransmissionPriority::Normal, CapsuleTransmissionPriority::Low, }; for (CapsuleTransmissionPriority priority : priorities) { auto& queue = mCapsuleQueue[priority]; while (!queue.IsEmpty()) { UniquePtr entry = queue.Pop(); aOutput.Push(std::move(entry)); } } } void Http2WebTransportSessionImpl::Close(nsresult aReason) { for (const auto& stream : mOutgoingStreams.Values()) { stream->Close(aReason); } for (const auto& stream : mIncomingStreams.Values()) { stream->Close(aReason); } mOutgoingStreams.Clear(); mIncomingStreams.Clear(); } void Http2WebTransportSessionImpl::OnStreamClosed( Http2WebTransportStream* aStream) { LOG(("Http2WebTransportSessionImpl::OnStreamClosed %p stream:%p", this, aStream)); RefPtr stream = aStream; StreamId id = stream->WebTransportStreamId(); if (id.IsClientInitiated()) { mOutgoingStreams.Remove(id); } else { mIncomingStreams.Remove(id); mRemoteStreamsFlowControl[id.StreamType()].FlowControl().AddRetired(1); } } bool Http2WebTransportSessionImpl::OnCapsule(Capsule&& aCapsule) { switch (aCapsule.Type()) { case CapsuleType::CLOSE_WEBTRANSPORT_SESSION: LOG(("Handling CLOSE_WEBTRANSPORT_SESSION\n")); break; case CapsuleType::DRAIN_WEBTRANSPORT_SESSION: LOG(("Handling DRAIN_WEBTRANSPORT_SESSION\n")); break; case CapsuleType::PADDING: LOG(("Handling PADDING\n")); break; case CapsuleType::WT_RESET_STREAM: { WebTransportResetStreamCapsule& reset = aCapsule.GetWebTransportResetStreamCapsule(); StreamId id = StreamId(reset.mID); if (!HandleStreamResetCapsule(id, std::move(aCapsule))) { return false; } } break; case CapsuleType::WT_STOP_SENDING: { WebTransportStopSendingCapsule& stopSending = aCapsule.GetWebTransportStopSendingCapsule(); StreamId id = StreamId(stopSending.mID); if (!HandleStreamStopSendingCapsule(id, std::move(aCapsule))) { return false; } } break; case CapsuleType::WT_STREAM: { WebTransportStreamDataCapsule& streamData = aCapsule.GetWebTransportStreamDataCapsule(); StreamId id = StreamId(streamData.mID); if (id.IsServerInitiated()) { return ProcessIncomingStreamCapsule(std::move(aCapsule), id, id.StreamType()); } else { RefPtr stream = mOutgoingStreams.Get(id); if (!stream) { LOG( ("Http2WebTransportSessionImpl::OnCapsule - " "stream not found " "stream_id=0x%" PRIx64 " [this=%p].", static_cast(id), this)); return false; } if (NS_FAILED(stream->OnCapsule(std::move(aCapsule)))) { return false; } } break; } case CapsuleType::WT_STREAM_FIN: LOG(("Handling WT_STREAM_FIN\n")); break; case CapsuleType::WT_MAX_DATA: { LOG(("Handling WT_MAX_DATA\n")); WebTransportMaxDataCapsule& maxData = aCapsule.GetWebTransportMaxDataCapsule(); mSessionDataFc.Update(maxData.mMaxDataSize); } break; case CapsuleType::WT_MAX_STREAM_DATA: { WebTransportMaxStreamDataCapsule& maxStreamData = aCapsule.GetWebTransportMaxStreamDataCapsule(); StreamId id = StreamId(maxStreamData.mID); if (!HandleMaxStreamDataCapsule(id, std::move(aCapsule))) { return false; } } break; case CapsuleType::WT_MAX_STREAMS_BIDI: { LOG(("Handling WT_MAX_STREAMS_BIDI\n")); WebTransportMaxStreamsCapsule& maxStreams = aCapsule.GetWebTransportMaxStreamsCapsule(); mLocalStreamsFlowControl[WebTransportStreamType::BiDi].Update( maxStreams.mLimit); ProcessPendingStreamCallbacks(mBidiPendingStreamCallbacks, WebTransportStreamType::BiDi); break; } case CapsuleType::WT_MAX_STREAMS_UNIDI: { LOG(("Handling WT_MAX_STREAMS_UNIDI\n")); WebTransportMaxStreamsCapsule& maxStreams = aCapsule.GetWebTransportMaxStreamsCapsule(); mLocalStreamsFlowControl[WebTransportStreamType::UniDi].Update( maxStreams.mLimit); ProcessPendingStreamCallbacks(mUnidiPendingStreamCallbacks, WebTransportStreamType::UniDi); break; } case CapsuleType::WT_DATA_BLOCKED: LOG(("Handling WT_DATA_BLOCKED\n")); break; case CapsuleType::WT_STREAM_DATA_BLOCKED: LOG(("Handling WT_STREAM_DATA_BLOCKED\n")); break; case CapsuleType::WT_STREAMS_BLOCKED_BIDI: LOG(("Handling WT_STREAMS_BLOCKED_BIDI\n")); break; case CapsuleType::WT_STREAMS_BLOCKED_UNIDI: LOG(("Handling WT_STREAMS_BLOCKED_UNIDI\n")); break; case CapsuleType::DATAGRAM: { LOG(("Handling DATAGRAM\n")); WebTransportDatagramCapsule& datagram = aCapsule.GetWebTransportDatagramCapsule(); if (nsCOMPtr listener = do_QueryInterface(mListener)) { listener->OnDatagramReceivedInternal(std::move(datagram.mPayload)); } break; } default: LOG(("Unhandled capsule type\n")); break; } return true; } already_AddRefed Http2WebTransportSessionImpl::GetStream(StreamId aId) { RefPtr stream; if (aId.IsClientInitiated()) { stream = mOutgoingStreams.Get(aId); } else { stream = mIncomingStreams.Get(aId); } if (!stream) { LOG( ("Http2WebTransportSessionImpl::GetStream - " "stream not found " "stream_id=0x%" PRIx64 " [this=%p].", static_cast(aId), this)); return nullptr; } return stream.forget(); } bool Http2WebTransportSessionImpl::HandleMaxStreamDataCapsule( StreamId aId, Capsule&& aCapsule) { RefPtr stream = GetStream(aId); if (!stream) { return false; } if (NS_FAILED(stream->OnCapsule(std::move(aCapsule)))) { return false; } return true; } bool Http2WebTransportSessionImpl::HandleStreamStopSendingCapsule( StreamId aId, Capsule&& aCapsule) { RefPtr stream = GetStream(aId); if (!stream) { return false; } stream->OnStopSending(); WebTransportStopSendingCapsule& stopSending = aCapsule.GetWebTransportStopSendingCapsule(); LOG( ("Http2WebTransportSessionImpl::HandleStreamStopSendingCapsule %p " "aID=%" PRIu64 " error=%" PRIu64, this, (uint64_t)aId, stopSending.mErrorCode)); uint8_t wtError = Http3ErrorToWebTransportError(stopSending.mErrorCode); nsresult rv = GetNSResultFromWebTransportError(wtError); if (mListener) { mListener->OnStopSending(aId, rv); } return true; } bool Http2WebTransportSessionImpl::HandleStreamResetCapsule( StreamId aId, Capsule&& aCapsule) { RefPtr stream = GetStream(aId); if (!stream) { return false; } WebTransportResetStreamCapsule& reset = aCapsule.GetWebTransportResetStreamCapsule(); stream->OnReset(reset.mReliableSize); uint8_t wtError = Http3ErrorToWebTransportError(reset.mErrorCode); nsresult rv = GetNSResultFromWebTransportError(wtError); if (mListener) { mListener->OnResetReceived(aId, rv); } return true; } void Http2WebTransportSessionImpl::OnStreamDataSent(StreamId aId, size_t aCount) { RefPtr stream = GetStream(aId); if (!stream) { return; } stream->OnStreamDataSent(aCount); } void Http2WebTransportSessionImpl::OnError(uint64_t aError) { LOG(("Http2WebTransportSessionImpl::OnError %p aError=%" PRIu64, this, aError)); // To be implemented. } bool Http2WebTransportSessionImpl::ProcessIncomingStreamCapsule( Capsule&& aCapsule, StreamId aID, WebTransportStreamType aStreamType) { LOG( ("Http2WebTransportSessionImpl::ProcessIncomingStreamCapsule %p " "aID=%" PRIu64 " type:%s", this, (uint64_t)aID, aStreamType == WebTransportStreamType::BiDi ? "BiDi" : "UniDi")); RefPtr stream = mIncomingStreams.Get(aID); if (stream) { return NS_SUCCEEDED(stream->OnCapsule(std::move(aCapsule))); } while (true) { auto res = mRemoteStreamsFlowControl[aStreamType].IsNewStream(aID); if (res.isErr() || !res.unwrap()) { break; } StreamId newStreamID = mRemoteStreamsFlowControl[aStreamType].TakeStreamId(); stream = new Http2WebTransportStream( this, aStreamType == WebTransportStreamType::BiDi ? mSettings.mInitialMaxStreamDataBidi : 0, aStreamType == WebTransportStreamType::BiDi ? mSettings.mInitialLocalMaxStreamDataBidi : mSettings.mInitialLocalMaxStreamDataUnidi, newStreamID); if (NS_FAILED(stream->Init())) { return false; } mIncomingStreams.InsertOrUpdate(newStreamID, stream); if (nsCOMPtr listener = do_QueryInterface(mListener)) { listener->OnIncomingStreamAvailableInternal(stream); } } stream = mIncomingStreams.Get(aID); if (stream) { return NS_SUCCEEDED(stream->OnCapsule(std::move(aCapsule))); } return true; } void Http2WebTransportSessionImpl::OnCapsuleParseFailure(nsresult aError) { mHandler->OnCapsuleParseFailure(aError); } NS_IMPL_ISUPPORTS_INHERITED(Http2WebTransportSession, Http2StreamTunnel, nsIOutputStreamCallback, nsIInputStreamCallback) Http2WebTransportSession::Http2WebTransportSession( Http2Session* aSession, int32_t aPriority, uint64_t aBcId, nsHttpConnectionInfo* aConnectionInfo, Http2WebTransportInitialSettings aSettings) : Http2StreamTunnel(aSession, aPriority, aBcId, aConnectionInfo), mImpl(MakeRefPtr(this, aSettings)), mCapsuleParser(MakeUnique(mImpl)) { LOG(("Http2WebTransportSession ctor:%p", this)); } Http2WebTransportSession::~Http2WebTransportSession() { LOG(("Http2WebTransportSession dtor:%p", this)); } void Http2WebTransportSession::CloseStream(nsresult aReason) { LOG(("Http2WebTransportSession::CloseStream this=%p aReason=%x", this, static_cast(aReason))); if (mTransaction) { mTransaction->Close(aReason); mTransaction = nullptr; } mInput->AsyncWait(nullptr, 0, 0, nullptr); mOutput->AsyncWait(nullptr, 0, 0, nullptr); Http2StreamTunnel::CloseStream(aReason); mCapsuleParser = nullptr; if (mImpl) { mImpl->Close(aReason); mImpl = nullptr; } } nsresult Http2WebTransportSession::GenerateHeaders(nsCString& aCompressedData, uint8_t& aFirstFrameFlags) { nsHttpRequestHead* head = mTransaction->RequestHead(); nsAutoCString authorityHeader; nsresult rv = head->GetHeader(nsHttp::Host, authorityHeader); NS_ENSURE_SUCCESS(rv, rv); RefPtr session = Session(); LOG3(("Http2WebTransportSession %p Stream ID 0x%X [session=%p] for %s\n", this, mStreamID, session.get(), authorityHeader.get())); nsAutoCString path; head->Path(path); rv = session->Compressor()->EncodeHeaderBlock( mFlatHttpRequestHeaders, "CONNECT"_ns, path, authorityHeader, "https"_ns, "webtransport"_ns, false, aCompressedData); NS_ENSURE_SUCCESS(rv, rv); mRequestBodyLenRemaining = 0x0fffffffffffffffULL; if (mInput) { mInput->AsyncWait(this, 0, 0, nullptr); } return NS_OK; } NS_IMETHODIMP Http2WebTransportSession::OnInputStreamReady(nsIAsyncInputStream* aIn) { char buffer[nsIOService::gDefaultSegmentSize]; uint32_t remainingCapacity = sizeof(buffer); uint32_t read = 0; while (remainingCapacity > 0) { uint32_t count = 0; nsresult rv = mInput->Read(buffer + read, remainingCapacity, &count); if (rv == NS_BASE_STREAM_WOULD_BLOCK) { break; } if (NS_FAILED(rv)) { LOG(("Http2WebTransportSession::OnInputStreamReady %p failed 0x%x\n", this, static_cast(rv))); // TODO: close connection return rv; } // base stream closed if (count == 0) { LOG(( "Http2WebTransportSession::OnInputStreamReady %p connection closed\n", this)); // Close with NS_BASE_STREAM_CLOSED return NS_OK; } remainingCapacity -= count; read += count; } if (read > 0) { Http2Session::LogIO(nullptr, this, "Http2WebTransportSession", buffer, read); mCapsuleParser->ProcessCapsuleData(reinterpret_cast(buffer), read); } mInput->AsyncWait(this, 0, 0, nullptr); return NS_OK; } NS_IMETHODIMP Http2WebTransportSession::OnOutputStreamReady(nsIAsyncOutputStream* aOut) { if (!mCurrentOutCapsule) { mImpl->PrepareCapsulesToSend(mOutgoingQueue); if (mOutgoingQueue.IsEmpty()) { return NS_OK; } mCurrentOutCapsule = mOutgoingQueue.Pop(); } while (mCurrentOutCapsule && mOutput) { auto buffer = mCurrentOutCapsule->GetBuffer(); const char* writeBuffer = reinterpret_cast(buffer.Elements()) + mWriteOffset; uint32_t toWrite = buffer.Length() - mWriteOffset; uint32_t wrote = 0; nsresult rv = mOutput->Write(writeBuffer, toWrite, &wrote); if (rv == NS_BASE_STREAM_WOULD_BLOCK) { mOutput->AsyncWait(this, 0, 0, nullptr); return NS_OK; } if (NS_FAILED(rv)) { LOG(("Http2WebTransportSession::OnOutputStreamReady %p failed %u\n", this, static_cast(rv))); // TODO: close connection return NS_OK; } mWriteOffset += wrote; Maybe metadata = mCurrentOutCapsule->GetStreamMetadata(); // This is a WT_STREAM_DATA capsule, so we need to track how many bytes of // stream data are sent. if (metadata) { if (mWriteOffset > metadata->mStartOfData) { uint64_t dataSent = mWriteOffset - metadata->mStartOfData; mImpl->OnStreamDataSent(StreamId(metadata->mID), dataSent); } } if (toWrite == wrote) { mWriteOffset = 0; mCurrentOutCapsule = mOutgoingQueue.IsEmpty() ? nullptr : mOutgoingQueue.Pop(); } } return NS_OK; } void Http2WebTransportSession::HasCapsuleToSend() { LOG(("Http2WebTransportSession::HasCapsuleToSend %p mSendClosed=%d", this, mSendClosed)); if (mSendClosed) { return; } mImpl->PrepareCapsulesToSend(mOutgoingQueue); if (mOutput) { OnOutputStreamReady(mOutput); } } void Http2WebTransportSession::SetSentFin() { Http2StreamTunnel::SetSentFin(true); } void Http2WebTransportSession::StartReading() { if (mInput) { mInput->AsyncWait(this, 0, 0, nullptr); } } void Http2WebTransportSession::OnCapsuleParseFailure(nsresult aError) { LOG(("Http2WebTransportSession::OnCapsuleParseFailure %p aError=%" PRIX32, this, static_cast(aError))); } } // namespace mozilla::net