1
0
Fork 0
firefox/netwerk/protocol/http/Http2WebTransportSession.cpp
Daniel Baumann 5e9a113729
Adding upstream version 140.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
2025-06-25 09:37:52 +02:00

720 lines
23 KiB
C++

/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set sw=2 ts=8 et tw=80 : */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
// HttpLog.h should generally be included first
#include "HttpLog.h"
#include "Capsule.h"
#include "CapsuleEncoder.h"
#include "Http2WebTransportSession.h"
#include "Http2WebTransportStream.h"
#include "Http2Session.h"
#include "mozilla/net/NeqoHttp3Conn.h"
#include "nsIWebTransport.h"
#include "nsIOService.h"
#include "nsHttp.h"
namespace mozilla::net {
Http2WebTransportSessionImpl::CapsuleQueue::CapsuleQueue() = default;
mozilla::Queue<UniquePtr<CapsuleEncoder>>&
Http2WebTransportSessionImpl::CapsuleQueue::operator[](
CapsuleTransmissionPriority aPriority) {
if (aPriority == CapsuleTransmissionPriority::Critical) {
return mCritical;
}
if (aPriority == CapsuleTransmissionPriority::Important) {
return mImportant;
}
if (aPriority == CapsuleTransmissionPriority::High) {
return mHigh;
}
if (aPriority == CapsuleTransmissionPriority::Normal) {
return mNormal;
}
return mLow;
}
Http2WebTransportSessionImpl::Http2WebTransportSessionImpl(
CapsuleIOHandler* aHandler, Http2WebTransportInitialSettings aSettings)
: mSettings(aSettings),
mRemoteStreamsFlowControl(aSettings.mInitialLocalMaxStreamsBidi,
aSettings.mInitialLocalMaxStreamsUnidi),
mHandler(aHandler),
mSessionDataFc(aSettings.mInitialMaxData),
mReceiverFc(aSettings.mInitialLocalMaxData) {
LOG(("Http2WebTransportSessionImpl ctor:%p", this));
mLocalStreamsFlowControl[WebTransportStreamType::UniDi].Update(
mSettings.mInitialMaxStreamsUni);
mLocalStreamsFlowControl[WebTransportStreamType::BiDi].Update(
mSettings.mInitialMaxStreamsBidi);
}
Http2WebTransportSessionImpl::~Http2WebTransportSessionImpl() {
LOG(("Http2WebTransportSessionImpl dtor:%p", this));
}
void Http2WebTransportSessionImpl::CloseSession(uint32_t aStatus,
const nsACString& aReason) {
LOG(("Http2WebTransportSessionImpl::CloseSession %p aStatus=%x", this,
aStatus));
mHandler->SetSentFin();
Capsule capsule = Capsule::CloseWebTransportSession(aStatus, aReason);
UniquePtr<CapsuleEncoder> encoder = MakeUnique<CapsuleEncoder>();
encoder->EncodeCapsule(capsule);
EnqueueOutCapsule(CapsuleTransmissionPriority::Important, std::move(encoder));
}
uint64_t Http2WebTransportSessionImpl::GetStreamId() const { return mStreamId; }
void Http2WebTransportSessionImpl::GetMaxDatagramSize() {}
void Http2WebTransportSessionImpl::SendDatagram(nsTArray<uint8_t>&& aData,
uint64_t aTrackingId) {
LOG(("Http2WebTransportSession::SendDatagram %p", this));
Capsule capsule = Capsule::WebTransportDatagram(std::move(aData));
UniquePtr<CapsuleEncoder> encoder = MakeUnique<CapsuleEncoder>();
encoder->EncodeCapsule(capsule);
EnqueueOutCapsule(CapsuleTransmissionPriority::Normal, std::move(encoder));
}
void Http2WebTransportSessionImpl::CreateOutgoingStreamInternal(
StreamId aStreamId,
std::function<void(Result<RefPtr<WebTransportStreamBase>, nsresult>&&)>&&
aCallback) {
LOG(
("Http2WebTransportSessionImpl::CreateOutgoingStreamInternal %p "
"id:%" PRIx64,
this, (uint64_t)aStreamId));
RefPtr<Http2WebTransportStream> stream = new Http2WebTransportStream(
this, aStreamId,
aStreamId.IsBiDi() ? mSettings.mInitialMaxStreamDataBidi
: mSettings.mInitialMaxStreamDataUni,
aStreamId.IsBiDi() ? mSettings.mInitialLocalMaxStreamDataBidi
: mSettings.mInitialLocalMaxStreamDataUnidi,
std::move(aCallback));
if (NS_FAILED(stream->Init())) {
return;
}
mOutgoingStreams.InsertOrUpdate(aStreamId, std::move(stream));
}
void Http2WebTransportSessionImpl::ProcessPendingStreamCallbacks(
mozilla::Queue<UniquePtr<PendingStreamCallback>>& aCallbacks,
WebTransportStreamType aStreamType) {
size_t size = aCallbacks.Count();
for (size_t count = 0; count < size; ++count) {
auto id = mLocalStreamsFlowControl.TakeStreamId(aStreamType);
if (!id) {
break;
}
UniquePtr<PendingStreamCallback> callback = aCallbacks.Pop();
auto cb = callback->TakeCallback();
CreateOutgoingStreamInternal(*id, std::move(cb));
}
}
void Http2WebTransportSessionImpl::CreateOutgoingBidirectionalStream(
std::function<void(Result<RefPtr<WebTransportStreamBase>, nsresult>&&)>&&
aCallback) {
auto id = mLocalStreamsFlowControl.TakeStreamId(WebTransportStreamType::BiDi);
if (!id) {
mBidiPendingStreamCallbacks.Push(
MakeUnique<PendingStreamCallback>(std::move(aCallback)));
return;
}
CreateOutgoingStreamInternal(*id, std::move(aCallback));
}
void Http2WebTransportSessionImpl::CreateOutgoingUnidirectionalStream(
std::function<void(Result<RefPtr<WebTransportStreamBase>, nsresult>&&)>&&
aCallback) {
auto id =
mLocalStreamsFlowControl.TakeStreamId(WebTransportStreamType::UniDi);
if (!id) {
mUnidiPendingStreamCallbacks.Push(
MakeUnique<PendingStreamCallback>(std::move(aCallback)));
return;
}
CreateOutgoingStreamInternal(*id, std::move(aCallback));
}
void Http2WebTransportSessionImpl::StartReading() {
LOG(("Http2WebTransportSessionImpl::StartReading %p", this));
mHandler->StartReading();
}
void Http2WebTransportSessionImpl::EnqueueOutCapsule(
CapsuleTransmissionPriority aPriority, UniquePtr<CapsuleEncoder>&& aData) {
mCapsuleQueue[aPriority].Push(std::move(aData));
mHandler->HasCapsuleToSend();
}
void Http2WebTransportSessionImpl::SendMaintenanceCapsules(
CapsuleTransmissionPriority aPriority) {
auto encoder = mSessionDataFc.CreateSessionDataBlockedCapsule();
if (encoder) {
mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref()));
}
encoder = mReceiverFc.CreateMaxDataCapsule();
if (encoder) {
mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref()));
}
encoder = mLocalStreamsFlowControl[WebTransportStreamType::BiDi]
.CreateStreamsBlockedCapsule();
if (encoder) {
mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref()));
}
encoder = mLocalStreamsFlowControl[WebTransportStreamType::UniDi]
.CreateStreamsBlockedCapsule();
if (encoder) {
mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref()));
}
encoder = mRemoteStreamsFlowControl[WebTransportStreamType::BiDi]
.FlowControl()
.CreateMaxStreamsCapsule();
if (encoder) {
mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref()));
}
encoder = mRemoteStreamsFlowControl[WebTransportStreamType::UniDi]
.FlowControl()
.CreateMaxStreamsCapsule();
if (encoder) {
mCapsuleQueue[aPriority].Push(MakeUnique<CapsuleEncoder>(encoder.ref()));
}
for (const auto& stream : mOutgoingStreams.Values()) {
stream->WriteMaintenanceCapsules(mCapsuleQueue[aPriority]);
}
for (const auto& stream : mIncomingStreams.Values()) {
stream->WriteMaintenanceCapsules(mCapsuleQueue[aPriority]);
}
}
void Http2WebTransportSessionImpl::StreamHasCapsuleToSend() {
mHandler->HasCapsuleToSend();
}
void Http2WebTransportSessionImpl::PrepareCapsulesToSend(
mozilla::Queue<UniquePtr<CapsuleEncoder>>& aOutput) {
// Like neqo, flow control capsules are at level
// CapsuleTransmissionPriority::Important.
SendMaintenanceCapsules(CapsuleTransmissionPriority::Important);
for (const auto& stream : mOutgoingStreams.Values()) {
stream->TakeOutputCapsule(
mCapsuleQueue[CapsuleTransmissionPriority::Normal]);
}
for (const auto& stream : mIncomingStreams.Values()) {
stream->TakeOutputCapsule(
mCapsuleQueue[CapsuleTransmissionPriority::Normal]);
}
static constexpr CapsuleTransmissionPriority priorities[] = {
CapsuleTransmissionPriority::Critical,
CapsuleTransmissionPriority::Important,
CapsuleTransmissionPriority::High,
CapsuleTransmissionPriority::Normal,
CapsuleTransmissionPriority::Low,
};
for (CapsuleTransmissionPriority priority : priorities) {
auto& queue = mCapsuleQueue[priority];
while (!queue.IsEmpty()) {
UniquePtr<CapsuleEncoder> entry = queue.Pop();
aOutput.Push(std::move(entry));
}
}
}
void Http2WebTransportSessionImpl::Close(nsresult aReason) {
for (const auto& stream : mOutgoingStreams.Values()) {
stream->Close(aReason);
}
for (const auto& stream : mIncomingStreams.Values()) {
stream->Close(aReason);
}
mOutgoingStreams.Clear();
mIncomingStreams.Clear();
}
void Http2WebTransportSessionImpl::OnStreamClosed(
Http2WebTransportStream* aStream) {
LOG(("Http2WebTransportSessionImpl::OnStreamClosed %p stream:%p", this,
aStream));
RefPtr<Http2WebTransportStream> stream = aStream;
StreamId id = stream->WebTransportStreamId();
if (id.IsClientInitiated()) {
mOutgoingStreams.Remove(id);
} else {
mIncomingStreams.Remove(id);
mRemoteStreamsFlowControl[id.StreamType()].FlowControl().AddRetired(1);
}
}
bool Http2WebTransportSessionImpl::OnCapsule(Capsule&& aCapsule) {
switch (aCapsule.Type()) {
case CapsuleType::CLOSE_WEBTRANSPORT_SESSION:
LOG(("Handling CLOSE_WEBTRANSPORT_SESSION\n"));
break;
case CapsuleType::DRAIN_WEBTRANSPORT_SESSION:
LOG(("Handling DRAIN_WEBTRANSPORT_SESSION\n"));
break;
case CapsuleType::PADDING:
LOG(("Handling PADDING\n"));
break;
case CapsuleType::WT_RESET_STREAM: {
WebTransportResetStreamCapsule& reset =
aCapsule.GetWebTransportResetStreamCapsule();
StreamId id = StreamId(reset.mID);
if (!HandleStreamResetCapsule(id, std::move(aCapsule))) {
return false;
}
} break;
case CapsuleType::WT_STOP_SENDING: {
WebTransportStopSendingCapsule& stopSending =
aCapsule.GetWebTransportStopSendingCapsule();
StreamId id = StreamId(stopSending.mID);
if (!HandleStreamStopSendingCapsule(id, std::move(aCapsule))) {
return false;
}
} break;
case CapsuleType::WT_STREAM: {
WebTransportStreamDataCapsule& streamData =
aCapsule.GetWebTransportStreamDataCapsule();
StreamId id = StreamId(streamData.mID);
if (id.IsServerInitiated()) {
return ProcessIncomingStreamCapsule(std::move(aCapsule), id,
id.StreamType());
} else {
RefPtr<Http2WebTransportStream> stream = mOutgoingStreams.Get(id);
if (!stream) {
LOG(
("Http2WebTransportSessionImpl::OnCapsule - "
"stream not found "
"stream_id=0x%" PRIx64 " [this=%p].",
static_cast<uint64_t>(id), this));
return false;
}
if (NS_FAILED(stream->OnCapsule(std::move(aCapsule)))) {
return false;
}
}
break;
}
case CapsuleType::WT_STREAM_FIN:
LOG(("Handling WT_STREAM_FIN\n"));
break;
case CapsuleType::WT_MAX_DATA: {
LOG(("Handling WT_MAX_DATA\n"));
WebTransportMaxDataCapsule& maxData =
aCapsule.GetWebTransportMaxDataCapsule();
mSessionDataFc.Update(maxData.mMaxDataSize);
} break;
case CapsuleType::WT_MAX_STREAM_DATA: {
WebTransportMaxStreamDataCapsule& maxStreamData =
aCapsule.GetWebTransportMaxStreamDataCapsule();
StreamId id = StreamId(maxStreamData.mID);
if (!HandleMaxStreamDataCapsule(id, std::move(aCapsule))) {
return false;
}
} break;
case CapsuleType::WT_MAX_STREAMS_BIDI: {
LOG(("Handling WT_MAX_STREAMS_BIDI\n"));
WebTransportMaxStreamsCapsule& maxStreams =
aCapsule.GetWebTransportMaxStreamsCapsule();
mLocalStreamsFlowControl[WebTransportStreamType::BiDi].Update(
maxStreams.mLimit);
ProcessPendingStreamCallbacks(mBidiPendingStreamCallbacks,
WebTransportStreamType::BiDi);
break;
}
case CapsuleType::WT_MAX_STREAMS_UNIDI: {
LOG(("Handling WT_MAX_STREAMS_UNIDI\n"));
WebTransportMaxStreamsCapsule& maxStreams =
aCapsule.GetWebTransportMaxStreamsCapsule();
mLocalStreamsFlowControl[WebTransportStreamType::UniDi].Update(
maxStreams.mLimit);
ProcessPendingStreamCallbacks(mUnidiPendingStreamCallbacks,
WebTransportStreamType::UniDi);
break;
}
case CapsuleType::WT_DATA_BLOCKED:
LOG(("Handling WT_DATA_BLOCKED\n"));
break;
case CapsuleType::WT_STREAM_DATA_BLOCKED:
LOG(("Handling WT_STREAM_DATA_BLOCKED\n"));
break;
case CapsuleType::WT_STREAMS_BLOCKED_BIDI:
LOG(("Handling WT_STREAMS_BLOCKED_BIDI\n"));
break;
case CapsuleType::WT_STREAMS_BLOCKED_UNIDI:
LOG(("Handling WT_STREAMS_BLOCKED_UNIDI\n"));
break;
case CapsuleType::DATAGRAM: {
LOG(("Handling DATAGRAM\n"));
WebTransportDatagramCapsule& datagram =
aCapsule.GetWebTransportDatagramCapsule();
if (nsCOMPtr<WebTransportSessionEventListenerInternal> listener =
do_QueryInterface(mListener)) {
listener->OnDatagramReceivedInternal(std::move(datagram.mPayload));
}
break;
}
default:
LOG(("Unhandled capsule type\n"));
break;
}
return true;
}
already_AddRefed<Http2WebTransportStream>
Http2WebTransportSessionImpl::GetStream(StreamId aId) {
RefPtr<Http2WebTransportStream> stream;
if (aId.IsClientInitiated()) {
stream = mOutgoingStreams.Get(aId);
} else {
stream = mIncomingStreams.Get(aId);
}
if (!stream) {
LOG(
("Http2WebTransportSessionImpl::GetStream - "
"stream not found "
"stream_id=0x%" PRIx64 " [this=%p].",
static_cast<uint64_t>(aId), this));
return nullptr;
}
return stream.forget();
}
bool Http2WebTransportSessionImpl::HandleMaxStreamDataCapsule(
StreamId aId, Capsule&& aCapsule) {
RefPtr<Http2WebTransportStream> stream = GetStream(aId);
if (!stream) {
return false;
}
if (NS_FAILED(stream->OnCapsule(std::move(aCapsule)))) {
return false;
}
return true;
}
bool Http2WebTransportSessionImpl::HandleStreamStopSendingCapsule(
StreamId aId, Capsule&& aCapsule) {
RefPtr<Http2WebTransportStream> stream = GetStream(aId);
if (!stream) {
return false;
}
stream->OnStopSending();
WebTransportStopSendingCapsule& stopSending =
aCapsule.GetWebTransportStopSendingCapsule();
LOG(
("Http2WebTransportSessionImpl::HandleStreamStopSendingCapsule %p "
"aID=%" PRIu64 " error=%" PRIu64,
this, (uint64_t)aId, stopSending.mErrorCode));
uint8_t wtError = Http3ErrorToWebTransportError(stopSending.mErrorCode);
nsresult rv = GetNSResultFromWebTransportError(wtError);
if (mListener) {
mListener->OnStopSending(aId, rv);
}
return true;
}
bool Http2WebTransportSessionImpl::HandleStreamResetCapsule(
StreamId aId, Capsule&& aCapsule) {
RefPtr<Http2WebTransportStream> stream = GetStream(aId);
if (!stream) {
return false;
}
WebTransportResetStreamCapsule& reset =
aCapsule.GetWebTransportResetStreamCapsule();
stream->OnReset(reset.mReliableSize);
uint8_t wtError = Http3ErrorToWebTransportError(reset.mErrorCode);
nsresult rv = GetNSResultFromWebTransportError(wtError);
if (mListener) {
mListener->OnResetReceived(aId, rv);
}
return true;
}
void Http2WebTransportSessionImpl::OnStreamDataSent(StreamId aId,
size_t aCount) {
RefPtr<Http2WebTransportStream> stream = GetStream(aId);
if (!stream) {
return;
}
stream->OnStreamDataSent(aCount);
}
void Http2WebTransportSessionImpl::OnError(uint64_t aError) {
LOG(("Http2WebTransportSessionImpl::OnError %p aError=%" PRIu64, this,
aError));
// To be implemented.
}
bool Http2WebTransportSessionImpl::ProcessIncomingStreamCapsule(
Capsule&& aCapsule, StreamId aID, WebTransportStreamType aStreamType) {
LOG(
("Http2WebTransportSessionImpl::ProcessIncomingStreamCapsule %p "
"aID=%" PRIu64 " type:%s",
this, (uint64_t)aID,
aStreamType == WebTransportStreamType::BiDi ? "BiDi" : "UniDi"));
RefPtr<Http2WebTransportStream> stream = mIncomingStreams.Get(aID);
if (stream) {
return NS_SUCCEEDED(stream->OnCapsule(std::move(aCapsule)));
}
while (true) {
auto res = mRemoteStreamsFlowControl[aStreamType].IsNewStream(aID);
if (res.isErr() || !res.unwrap()) {
break;
}
StreamId newStreamID =
mRemoteStreamsFlowControl[aStreamType].TakeStreamId();
stream = new Http2WebTransportStream(
this,
aStreamType == WebTransportStreamType::BiDi
? mSettings.mInitialMaxStreamDataBidi
: 0,
aStreamType == WebTransportStreamType::BiDi
? mSettings.mInitialLocalMaxStreamDataBidi
: mSettings.mInitialLocalMaxStreamDataUnidi,
newStreamID);
if (NS_FAILED(stream->Init())) {
return false;
}
mIncomingStreams.InsertOrUpdate(newStreamID, stream);
if (nsCOMPtr<WebTransportSessionEventListenerInternal> listener =
do_QueryInterface(mListener)) {
listener->OnIncomingStreamAvailableInternal(stream);
}
}
stream = mIncomingStreams.Get(aID);
if (stream) {
return NS_SUCCEEDED(stream->OnCapsule(std::move(aCapsule)));
}
return true;
}
void Http2WebTransportSessionImpl::OnCapsuleParseFailure(nsresult aError) {
mHandler->OnCapsuleParseFailure(aError);
}
NS_IMPL_ISUPPORTS_INHERITED(Http2WebTransportSession, Http2StreamTunnel,
nsIOutputStreamCallback, nsIInputStreamCallback)
Http2WebTransportSession::Http2WebTransportSession(
Http2Session* aSession, int32_t aPriority, uint64_t aBcId,
nsHttpConnectionInfo* aConnectionInfo,
Http2WebTransportInitialSettings aSettings)
: Http2StreamTunnel(aSession, aPriority, aBcId, aConnectionInfo),
mImpl(MakeRefPtr<Http2WebTransportSessionImpl>(this, aSettings)),
mCapsuleParser(MakeUnique<CapsuleParser>(mImpl)) {
LOG(("Http2WebTransportSession ctor:%p", this));
}
Http2WebTransportSession::~Http2WebTransportSession() {
LOG(("Http2WebTransportSession dtor:%p", this));
}
void Http2WebTransportSession::CloseStream(nsresult aReason) {
LOG(("Http2WebTransportSession::CloseStream this=%p aReason=%x", this,
static_cast<uint32_t>(aReason)));
if (mTransaction) {
mTransaction->Close(aReason);
mTransaction = nullptr;
}
mInput->AsyncWait(nullptr, 0, 0, nullptr);
mOutput->AsyncWait(nullptr, 0, 0, nullptr);
Http2StreamTunnel::CloseStream(aReason);
mCapsuleParser = nullptr;
if (mImpl) {
mImpl->Close(aReason);
mImpl = nullptr;
}
}
nsresult Http2WebTransportSession::GenerateHeaders(nsCString& aCompressedData,
uint8_t& aFirstFrameFlags) {
nsHttpRequestHead* head = mTransaction->RequestHead();
nsAutoCString authorityHeader;
nsresult rv = head->GetHeader(nsHttp::Host, authorityHeader);
NS_ENSURE_SUCCESS(rv, rv);
RefPtr<Http2Session> session = Session();
LOG3(("Http2WebTransportSession %p Stream ID 0x%X [session=%p] for %s\n",
this, mStreamID, session.get(), authorityHeader.get()));
nsAutoCString path;
head->Path(path);
rv = session->Compressor()->EncodeHeaderBlock(
mFlatHttpRequestHeaders, "CONNECT"_ns, path, authorityHeader, "https"_ns,
"webtransport"_ns, false, aCompressedData);
NS_ENSURE_SUCCESS(rv, rv);
mRequestBodyLenRemaining = 0x0fffffffffffffffULL;
if (mInput) {
mInput->AsyncWait(this, 0, 0, nullptr);
}
return NS_OK;
}
NS_IMETHODIMP
Http2WebTransportSession::OnInputStreamReady(nsIAsyncInputStream* aIn) {
char buffer[nsIOService::gDefaultSegmentSize];
uint32_t remainingCapacity = sizeof(buffer);
uint32_t read = 0;
while (remainingCapacity > 0) {
uint32_t count = 0;
nsresult rv = mInput->Read(buffer + read, remainingCapacity, &count);
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
break;
}
if (NS_FAILED(rv)) {
LOG(("Http2WebTransportSession::OnInputStreamReady %p failed 0x%x\n",
this, static_cast<uint32_t>(rv)));
// TODO: close connection
return rv;
}
// base stream closed
if (count == 0) {
LOG((
"Http2WebTransportSession::OnInputStreamReady %p connection closed\n",
this));
// Close with NS_BASE_STREAM_CLOSED
return NS_OK;
}
remainingCapacity -= count;
read += count;
}
if (read > 0) {
Http2Session::LogIO(nullptr, this, "Http2WebTransportSession", buffer,
read);
mCapsuleParser->ProcessCapsuleData(reinterpret_cast<uint8_t*>(buffer),
read);
}
mInput->AsyncWait(this, 0, 0, nullptr);
return NS_OK;
}
NS_IMETHODIMP
Http2WebTransportSession::OnOutputStreamReady(nsIAsyncOutputStream* aOut) {
if (!mCurrentOutCapsule) {
mImpl->PrepareCapsulesToSend(mOutgoingQueue);
if (mOutgoingQueue.IsEmpty()) {
return NS_OK;
}
mCurrentOutCapsule = mOutgoingQueue.Pop();
}
while (mCurrentOutCapsule && mOutput) {
auto buffer = mCurrentOutCapsule->GetBuffer();
const char* writeBuffer =
reinterpret_cast<const char*>(buffer.Elements()) + mWriteOffset;
uint32_t toWrite = buffer.Length() - mWriteOffset;
uint32_t wrote = 0;
nsresult rv = mOutput->Write(writeBuffer, toWrite, &wrote);
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
mOutput->AsyncWait(this, 0, 0, nullptr);
return NS_OK;
}
if (NS_FAILED(rv)) {
LOG(("Http2WebTransportSession::OnOutputStreamReady %p failed %u\n", this,
static_cast<uint32_t>(rv)));
// TODO: close connection
return NS_OK;
}
mWriteOffset += wrote;
Maybe<StreamMetadata> metadata = mCurrentOutCapsule->GetStreamMetadata();
// This is a WT_STREAM_DATA capsule, so we need to track how many bytes of
// stream data are sent.
if (metadata) {
if (mWriteOffset > metadata->mStartOfData) {
uint64_t dataSent = mWriteOffset - metadata->mStartOfData;
mImpl->OnStreamDataSent(StreamId(metadata->mID), dataSent);
}
}
if (toWrite == wrote) {
mWriteOffset = 0;
mCurrentOutCapsule =
mOutgoingQueue.IsEmpty() ? nullptr : mOutgoingQueue.Pop();
}
}
return NS_OK;
}
void Http2WebTransportSession::HasCapsuleToSend() {
LOG(("Http2WebTransportSession::HasCapsuleToSend %p mSendClosed=%d", this,
mSendClosed));
if (mSendClosed) {
return;
}
mImpl->PrepareCapsulesToSend(mOutgoingQueue);
if (mOutput) {
OnOutputStreamReady(mOutput);
}
}
void Http2WebTransportSession::SetSentFin() {
Http2StreamTunnel::SetSentFin(true);
}
void Http2WebTransportSession::StartReading() {
if (mInput) {
mInput->AsyncWait(this, 0, 0, nullptr);
}
}
void Http2WebTransportSession::OnCapsuleParseFailure(nsresult aError) {
LOG(("Http2WebTransportSession::OnCapsuleParseFailure %p aError=%" PRIX32,
this, static_cast<uint32_t>(aError)));
}
} // namespace mozilla::net