/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ // HttpLog.h should generally be included first #include "HttpLog.h" #include #include "Http2WebTransportStream.h" #include "Http2WebTransportSession.h" #include "Capsule.h" #include "CapsuleEncoder.h" #include "nsIOService.h" namespace mozilla::net { NS_IMPL_ISUPPORTS(Http2WebTransportStream, nsIOutputStreamCallback, nsIInputStreamCallback) Http2WebTransportStream::Http2WebTransportStream( Http2WebTransportSessionImpl* aWebTransportSession, StreamId aStreamId, uint64_t aInitialMaxStreamData, uint64_t aInitialLocalMaxStreamData, std::function, nsresult>&&)>&& aCallback) : WebTransportStreamBase(aWebTransportSession->GetStreamId(), std::move(aCallback)), mWebTransportSession(aWebTransportSession), mStreamId(aStreamId), mOwnerThread(GetCurrentSerialEventTarget()), mFc(aStreamId, aInitialMaxStreamData), mReceiverFc(aStreamId, aInitialLocalMaxStreamData) { LOG(("Http2WebTransportStream outgoing ctor:%p", this)); mStreamRole = OUTGOING; mStreamType = mStreamId.StreamType(); } Http2WebTransportStream::Http2WebTransportStream( Http2WebTransportSessionImpl* aWebTransportSession, uint64_t aInitialMaxStreamData, uint64_t aInitialLocalMaxStreamData, StreamId aStreamId) : WebTransportStreamBase(aWebTransportSession->GetStreamId(), nullptr), mWebTransportSession(aWebTransportSession), mStreamId(aStreamId), mOwnerThread(GetCurrentSerialEventTarget()), mFc(aStreamId, aInitialMaxStreamData), mReceiverFc(aStreamId, aInitialLocalMaxStreamData) { LOG(("Http2WebTransportStream incoming ctor:%p", this)); mStreamRole = INCOMING; mStreamType = mStreamId.StreamType(); } Http2WebTransportStream::~Http2WebTransportStream() { LOG(("Http2WebTransportStream dtor:%p", this)); } nsresult Http2WebTransportStream::Init() { nsresult rv = NS_OK; auto resultCallback = MakeScopeExit([&] { if (NS_FAILED(rv)) { mSendState = SEND_DONE; mRecvState = RECV_DONE; if (mStreamReadyCallback) { mStreamReadyCallback(Err(rv)); } } else { mSocketInCondition = NS_OK; mSocketOutCondition = NS_OK; RefPtr stream = this; if (mStreamReadyCallback) { mStreamReadyCallback(stream); } } mStreamReadyCallback = nullptr; }); if (mStreamRole == INCOMING) { rv = InitInputPipe(); if (NS_FAILED(rv)) { return rv; } if (mStreamType == WebTransportStreamType::BiDi) { rv = InitOutputPipe(); } return rv; } MOZ_ASSERT(mStreamRole == OUTGOING); rv = InitOutputPipe(); if (NS_FAILED(rv)) { return rv; } if (mStreamType == WebTransportStreamType::BiDi) { rv = InitInputPipe(); } if (mSendStreamPipeIn) { rv = mSendStreamPipeIn->AsyncWait(this, 0, 0, mOwnerThread); } return rv; } class StreamId Http2WebTransportStream::WebTransportStreamId() const { return mStreamId; } uint64_t Http2WebTransportStream::GetStreamId() const { return mStreamId; } void Http2WebTransportStream::SendStopSending(uint8_t aErrorCode) { if (mSentStopSending || !mWebTransportSession) { // https://www.ietf.org/archive/id/draft-ietf-webtrans-http2-11.html#section-6.3 // A WT_STOP_SENDING capsule MUST NOT be sent multiple times for the same // stream. return; } mSentStopSending = true; mStopSendingCapsule.emplace( Capsule::WebTransportStopSending(aErrorCode, mStreamId)); mWebTransportSession->StreamHasCapsuleToSend(); mRecvState = RECV_DONE; } void Http2WebTransportStream::SendFin() {} void Http2WebTransportStream::Reset(uint64_t aErrorCode) { if (mSentReset || !mWebTransportSession || mSendState == SEND_DONE) { // https://www.ietf.org/archive/id/draft-ietf-webtrans-http2-11.html#section-6.2 // A WT_RESET_STREAM capsule MUST NOT be sent after a stream is closed or // reset. return; } mSentReset = true; mStreamResetCapsule.emplace(Capsule::WebTransportResetStream( aErrorCode, mTotalSent.value(), mStreamId)); mWebTransportSession->StreamHasCapsuleToSend(); mRecvState = RECV_DONE; mSendState = SEND_DONE; } already_AddRefed Http2WebTransportStream::GetSendStreamStats() { return nullptr; } already_AddRefed Http2WebTransportStream::GetReceiveStreamStats() { return nullptr; } bool Http2WebTransportStream::RecvDone() const { return false; } void Http2WebTransportStream::SetSendOrder(Maybe aSendOrder) {} NS_IMETHODIMP Http2WebTransportStream::OnInputStreamReady(nsIAsyncInputStream* aIn) { LOG1( ("Http2WebTransportStream::OnInputStreamReady [this=%p stream=%p " "state=%d]", this, aIn, mSendState)); if (mSendState == SEND_DONE) { // already closed return NS_OK; } uint32_t sendBytes = 0; return mSendStreamPipeIn->ReadSegments( ReadRequestSegment, this, nsIOService::gDefaultSegmentSize, &sendBytes); } NS_IMETHODIMP Http2WebTransportStream::OnOutputStreamReady(nsIAsyncOutputStream* aOut) { if (!mCurrentOut) { if (mOutgoingQueue.IsEmpty()) { return NS_OK; } mCurrentOut = mOutgoingQueue.Pop(); } while (mCurrentOut && mReceiveStreamPipeOut && (mRecvState != RECV_DONE)) { char* writeBuffer = reinterpret_cast(const_cast( mCurrentOut->GetData().Elements())) + mWriteOffset; uint32_t toWrite = mCurrentOut->GetData().Length() - mWriteOffset; if (mReliableSize) { if (mTotalReceived + toWrite > *mReliableSize) { toWrite = *mReliableSize - mTotalReceived; } } uint32_t wrote = 0; nsresult rv = mReceiveStreamPipeOut->Write(writeBuffer, toWrite, &wrote); LOG(("Http2WebTransportStream::Write rv=0x%" PRIx32 " wrote=%" PRIu32 " socketin=%" PRIx32 " [this=%p]", static_cast(rv), wrote, static_cast(mSocketInCondition), this)); if (rv == NS_BASE_STREAM_WOULD_BLOCK) { mSocketInCondition = mReceiveStreamPipeOut->AsyncWait(this, 0, 0, nullptr); return mSocketInCondition; } if (NS_FAILED(rv)) { LOG(("Http2WebTransportStream::OnOutputStreamReady %p failed %u\n", this, static_cast(rv))); // TODO: close this stream mSocketInCondition = rv; mCurrentOut = nullptr; mRecvState = RECV_DONE; return NS_OK; } // Retire when sending data to the consumer. mReceiverFc.AddRetired(wrote); mWebTransportSession->ReceiverFc().AddRetired(wrote); mWriteOffset += wrote; mTotalReceived += wrote; // https://www.ietf.org/archive/id/draft-ietf-webtrans-http2-11.html#section-6.2 // A receiver of a WT_RESET_STREAM capsule can discard any data in excess of // the Reliable Size indicated, even if that data was already received. if (mReliableSize && mTotalReceived == *mReliableSize) { mSocketInCondition = NS_OK; mWriteOffset = 0; mCurrentOut = nullptr; mOutgoingQueue.Clear(); mRecvState = RECV_DONE; break; } if (toWrite == wrote) { mWriteOffset = 0; mCurrentOut = mOutgoingQueue.IsEmpty() ? nullptr : mOutgoingQueue.Pop(); } } return NS_OK; } // static nsresult Http2WebTransportStream::ReadRequestSegment( nsIInputStream* stream, void* closure, const char* buf, uint32_t offset, uint32_t count, uint32_t* countRead) { Http2WebTransportStream* wtStream = (Http2WebTransportStream*)closure; LOG(("Http2WebTransportStream::ReadRequestSegment %p count=%u", wtStream, count)); *countRead = 0; if (!wtStream->mWebTransportSession) { return NS_ERROR_UNEXPECTED; } uint64_t limit = std::min(wtStream->mWebTransportSession->SessionDataFc().Available(), wtStream->mFc.Available()); if (limit < count) { if (wtStream->mWebTransportSession->SessionDataFc().Available() < count) { LOG(("blocked by session level flow control")); wtStream->mWebTransportSession->SessionDataFc().Blocked(); } if (wtStream->mFc.Available() < count) { LOG(("blocked by stream level flow control")); wtStream->mFc.Blocked(); } return NS_BASE_STREAM_WOULD_BLOCK; } nsTArray data; data.AppendElements(buf, count); Capsule capsule = Capsule::WebTransportStreamData(wtStream->mStreamId, false, std::move(data)); UniquePtr encoder = MakeUnique(); encoder->EncodeCapsule(capsule); wtStream->mCapsuleQueue.Push(std::move(encoder)); *countRead = count; return NS_OK; } void Http2WebTransportStream::TakeOutputCapsule( mozilla::Queue>& aOutput) { LOG(("Http2WebTransportStream::TakeOutputCapsule %p", this)); if (mCapsuleQueue.IsEmpty()) { mSendStreamPipeIn->AsyncWait(this, 0, 0, mOwnerThread); return; } while (!mCapsuleQueue.IsEmpty()) { UniquePtr entry = mCapsuleQueue.Pop(); aOutput.Push(std::move(entry)); } mSendStreamPipeIn->AsyncWait(this, 0, 0, mOwnerThread); } void Http2WebTransportStream::WriteMaintenanceCapsules( mozilla::Queue>& aOutput) { if (mStopSendingCapsule) { UniquePtr encoder = MakeUnique(); encoder->EncodeCapsule(*mStopSendingCapsule); mStopSendingCapsule = Nothing(); aOutput.Push(std::move(encoder)); } if (mStreamResetCapsule) { UniquePtr encoder = MakeUnique(); encoder->EncodeCapsule(*mStreamResetCapsule); mStreamResetCapsule = Nothing(); aOutput.Push(std::move(encoder)); } auto dataBlocked = mFc.CreateStreamDataBlockedCapsule(); if (dataBlocked) { aOutput.Push(MakeUnique(dataBlocked.ref())); } auto maxStreamData = mReceiverFc.CreateMaxStreamDataCapsule(); if (maxStreamData) { aOutput.Push(MakeUnique(maxStreamData.ref())); } // Keep reading data from the consumer. mSendStreamPipeIn->AsyncWait(this, 0, 0, mOwnerThread); } nsresult Http2WebTransportStream::OnCapsule(Capsule&& aCapsule) { switch (aCapsule.Type()) { case CapsuleType::WT_STREAM: { LOG(("Handling WT_STREAM\n")); WebTransportStreamDataCapsule& streamData = aCapsule.GetWebTransportStreamDataCapsule(); return HandleStreamData(false, std::move(streamData.mData)); } case CapsuleType::WT_STREAM_FIN: LOG(("Handling WT_STREAM_FIN\n")); break; case CapsuleType::WT_MAX_STREAM_DATA: { LOG(("Handling WT_MAX_STREAM_DATA\n")); WebTransportMaxStreamDataCapsule& maxStreamData = aCapsule.GetWebTransportMaxStreamDataCapsule(); return HandleMaxStreamData(maxStreamData.mLimit); } case CapsuleType::WT_STREAM_DATA_BLOCKED: LOG(("Handling WT_STREAM_DATA_BLOCKED\n")); break; default: LOG(("Unhandled capsule type\n")); break; } return NS_OK; } nsresult Http2WebTransportStream::HandleMaxStreamData(uint64_t aLimit) { mFc.Update(aLimit); return NS_OK; } void Http2WebTransportStream::OnStopSending() { mSendState = SEND_DONE; } void Http2WebTransportStream::OnReset(uint64_t aSize) { if (mReliableSize) { return; } mReliableSize.emplace(aSize); LOG(("Http2WebTransportStream::OnReset %p mReliableSize=%" PRIu64 " mTotalReceived=%" PRIu64, this, *mReliableSize, mTotalReceived)); if (*mReliableSize < mTotalReceived) { // A receiver MUST treat the receipt of a WT_RESET_STREAM with a Reliable // Size smaller than the number of bytes it has received on the stream as a // session error. // TODO: find a better error code. mWebTransportSession->OnError(0); } } void Http2WebTransportStream::OnStreamDataSent(size_t aCount) { LOG(("Http2WebTransportStream::OnStreamDataSent %p aCount=%" PRIu64 " mTotalSent=%" PRIu64, this, static_cast(aCount), mTotalSent.value())); mTotalSent += aCount; if (!mTotalSent.isValid()) { // TODO: find a better error code. mWebTransportSession->OnError(0); return; } mFc.Consume(aCount); mWebTransportSession->SessionDataFc().Consume(aCount); } void Http2WebTransportStream::Close(nsresult aResult) { if (mSendStreamPipeIn) { mSendStreamPipeIn->AsyncWait(nullptr, 0, 0, nullptr); mSendStreamPipeIn->CloseWithStatus(aResult); } if (mReceiveStreamPipeOut) { mReceiveStreamPipeOut->AsyncWait(nullptr, 0, 0, nullptr); mReceiveStreamPipeOut->CloseWithStatus(aResult); } mSendState = SEND_DONE; mRecvState = RECV_DONE; mWebTransportSession = nullptr; } nsresult Http2WebTransportStream::HandleStreamData(bool aFin, nsTArray&& aData) { LOG(("Http2WebTransportStream::HandleStreamData [this=%p, state=%d aFin=%d", this, static_cast(mRecvState), aFin)); if (NS_FAILED(mSocketInCondition)) { mRecvState = RECV_DONE; } uint32_t countWrittenSingle = 0; switch (mRecvState) { case READING: { size_t length = aData.Length(); if (length) { auto newConsumed = mReceiverFc.SetConsumed(mReceiverFc.Consumed() + length); if (newConsumed.isErr()) { mSocketInCondition = newConsumed.unwrapErr(); } else { if (!mWebTransportSession->ReceiverFc().Consume( newConsumed.unwrap())) { LOG(("Exceed session flow control limit")); mSocketInCondition = NS_ERROR_NOT_AVAILABLE; } else { mOutgoingQueue.Push(MakeUnique(std::move(aData))); mSocketInCondition = OnOutputStreamReady(mReceiveStreamPipeOut); } } } else { // https://www.ietf.org/archive/id/draft-ietf-webtrans-http2-10.html#section-6.4 // Empty WT_STREAM capsules MUST NOT be used unless they open or close a // stream // TODO: Handle empty stream capsule } LOG(( "Http2WebTransportStream::HandleStreamData " "countWrittenSingle=%" PRIu32 " socketin=%" PRIx32 " [this=%p]", countWrittenSingle, static_cast(mSocketInCondition), this)); if (NS_FAILED(mSocketInCondition)) { mReceiveStreamPipeOut->Close(); mRecvState = RECV_DONE; } else { if (aFin) { mRecvState = RECEIVED_FIN; } } } break; case RECEIVED_FIN: mRecvState = RECV_DONE; break; case RECV_DONE: mSocketInCondition = NS_ERROR_UNEXPECTED; break; default: mSocketInCondition = NS_ERROR_UNEXPECTED; break; } return mSocketInCondition; } } // namespace mozilla::net