summaryrefslogtreecommitdiffstats
path: root/netwerk/protocol/http/Http3WebTransportStream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'netwerk/protocol/http/Http3WebTransportStream.cpp')
-rw-r--r--netwerk/protocol/http/Http3WebTransportStream.cpp613
1 files changed, 613 insertions, 0 deletions
diff --git a/netwerk/protocol/http/Http3WebTransportStream.cpp b/netwerk/protocol/http/Http3WebTransportStream.cpp
new file mode 100644
index 0000000000..8378e6b92d
--- /dev/null
+++ b/netwerk/protocol/http/Http3WebTransportStream.cpp
@@ -0,0 +1,613 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "Http3WebTransportStream.h"
+
+#include "HttpLog.h"
+#include "Http3Session.h"
+#include "Http3WebTransportSession.h"
+#include "mozilla/TimeStamp.h"
+#include "nsHttpHandler.h"
+#include "nsIOService.h"
+#include "nsIPipe.h"
+#include "nsSocketTransportService2.h"
+#include "nsIWebTransportStream.h"
+
+namespace mozilla::net {
+
+namespace {
+
+// This is an nsAHttpTransaction that does nothing.
+class DummyWebTransportStreamTransaction : public nsAHttpTransaction {
+ public:
+ NS_DECL_THREADSAFE_ISUPPORTS
+
+ DummyWebTransportStreamTransaction() = default;
+
+ void SetConnection(nsAHttpConnection*) override {}
+ nsAHttpConnection* Connection() override { return nullptr; }
+ void GetSecurityCallbacks(nsIInterfaceRequestor**) override {}
+ void OnTransportStatus(nsITransport* transport, nsresult status,
+ int64_t progress) override {}
+ bool IsDone() override { return false; }
+ nsresult Status() override { return NS_OK; }
+ uint32_t Caps() override { return 0; }
+ [[nodiscard]] nsresult ReadSegments(nsAHttpSegmentReader*, uint32_t,
+ uint32_t*) override {
+ return NS_OK;
+ }
+ [[nodiscard]] nsresult WriteSegments(nsAHttpSegmentWriter*, uint32_t,
+ uint32_t*) override {
+ return NS_OK;
+ }
+ void Close(nsresult reason) override {}
+ nsHttpConnectionInfo* ConnectionInfo() override { return nullptr; }
+ void SetProxyConnectFailed() override {}
+ nsHttpRequestHead* RequestHead() override { return nullptr; }
+ uint32_t Http1xTransactionCount() override { return 0; }
+ [[nodiscard]] nsresult TakeSubTransactions(
+ nsTArray<RefPtr<nsAHttpTransaction>>& outTransactions) override {
+ return NS_OK;
+ }
+
+ private:
+ virtual ~DummyWebTransportStreamTransaction() = default;
+};
+
+NS_IMPL_ISUPPORTS(DummyWebTransportStreamTransaction, nsISupportsWeakReference)
+
+class WebTransportSendStreamStats : public nsIWebTransportSendStreamStats {
+ public:
+ NS_DECL_THREADSAFE_ISUPPORTS
+
+ explicit WebTransportSendStreamStats(uint64_t aSent, uint64_t aAcked)
+ : mTimeStamp(TimeStamp::Now()),
+ mTotalSent(aSent),
+ mTotalAcknowledged(aAcked) {}
+
+ NS_IMETHOD GetTimestamp(mozilla::TimeStamp* aTimestamp) override {
+ *aTimestamp = mTimeStamp;
+ return NS_OK;
+ }
+ NS_IMETHOD GetBytesSent(uint64_t* aBytesSent) override {
+ *aBytesSent = mTotalSent;
+ return NS_OK;
+ }
+ NS_IMETHOD GetBytesAcknowledged(uint64_t* aBytesAcknowledged) override {
+ *aBytesAcknowledged = mTotalAcknowledged;
+ return NS_OK;
+ }
+
+ private:
+ virtual ~WebTransportSendStreamStats() = default;
+
+ TimeStamp mTimeStamp;
+ uint64_t mTotalSent;
+ uint64_t mTotalAcknowledged;
+};
+
+NS_IMPL_ISUPPORTS(WebTransportSendStreamStats, nsIWebTransportSendStreamStats)
+
+class WebTransportReceiveStreamStats
+ : public nsIWebTransportReceiveStreamStats {
+ public:
+ NS_DECL_THREADSAFE_ISUPPORTS
+
+ explicit WebTransportReceiveStreamStats(uint64_t aReceived)
+ : mTimeStamp(TimeStamp::Now()), mTotalReceived(aReceived) {}
+
+ NS_IMETHOD GetTimestamp(mozilla::TimeStamp* aTimestamp) override {
+ *aTimestamp = mTimeStamp;
+ return NS_OK;
+ }
+ NS_IMETHOD GetBytesReceived(uint64_t* aByteReceived) override {
+ *aByteReceived = mTotalReceived;
+ return NS_OK;
+ }
+
+ private:
+ virtual ~WebTransportReceiveStreamStats() = default;
+
+ TimeStamp mTimeStamp;
+ uint64_t mTotalReceived;
+};
+
+NS_IMPL_ISUPPORTS(WebTransportReceiveStreamStats,
+ nsIWebTransportReceiveStreamStats)
+
+} // namespace
+
+NS_IMPL_ISUPPORTS(Http3WebTransportStream, nsIInputStreamCallback)
+
+Http3WebTransportStream::Http3WebTransportStream(
+ Http3Session* aSession, uint64_t aSessionId, WebTransportStreamType aType,
+ std::function<void(Result<RefPtr<Http3WebTransportStream>, nsresult>&&)>&&
+ aCallback)
+ : Http3StreamBase(new DummyWebTransportStreamTransaction(), aSession),
+ mSessionId(aSessionId),
+ mStreamType(aType),
+ mStreamRole(OUTGOING),
+ mStreamReadyCallback(std::move(aCallback)) {
+ LOG(("Http3WebTransportStream outgoing ctor %p", this));
+}
+
+Http3WebTransportStream::Http3WebTransportStream(Http3Session* aSession,
+ uint64_t aSessionId,
+ WebTransportStreamType aType,
+ uint64_t aStreamId)
+ : Http3StreamBase(new DummyWebTransportStreamTransaction(), aSession),
+ mSessionId(aSessionId),
+ mStreamType(aType),
+ mStreamRole(INCOMING),
+ // WAITING_DATA indicates we are waiting
+ // Http3WebTransportStream::OnInputStreamReady to be called.
+ mSendState(WAITING_DATA),
+ mStreamReadyCallback(nullptr) {
+ LOG(("Http3WebTransportStream incoming ctor %p", this));
+ mStreamId = aStreamId;
+}
+
+Http3WebTransportStream::~Http3WebTransportStream() {
+ LOG(("Http3WebTransportStream dtor %p", this));
+}
+
+nsresult Http3WebTransportStream::TryActivating() {
+ MOZ_ASSERT(OnSocketThread(), "not on socket thread");
+ return mSession->TryActivatingWebTransportStream(&mStreamId, this);
+}
+
+NS_IMETHODIMP Http3WebTransportStream::OnInputStreamReady(
+ nsIAsyncInputStream* aStream) {
+ LOG1(
+ ("Http3WebTransportStream::OnInputStreamReady [this=%p stream=%p "
+ "state=%d]",
+ this, aStream, mSendState));
+ MOZ_ASSERT(mSendState == WAITING_DATA);
+
+ mSendState = SENDING;
+ mSession->StreamHasDataToWrite(this);
+ return NS_OK;
+}
+
+nsresult Http3WebTransportStream::InitOutputPipe() {
+ nsCOMPtr<nsIAsyncOutputStream> out;
+ nsCOMPtr<nsIAsyncInputStream> in;
+ NS_NewPipe2(getter_AddRefs(in), getter_AddRefs(out), true, true,
+ nsIOService::gDefaultSegmentSize,
+ nsIOService::gDefaultSegmentCount);
+
+ {
+ MutexAutoLock lock(mMutex);
+ mSendStreamPipeIn = std::move(in);
+ mSendStreamPipeOut = std::move(out);
+ }
+
+ nsresult rv =
+ mSendStreamPipeIn->AsyncWait(this, 0, 0, gSocketTransportService);
+ if (NS_FAILED(rv)) {
+ return rv;
+ }
+
+ mSendState = WAITING_DATA;
+ return NS_OK;
+}
+
+nsresult Http3WebTransportStream::InitInputPipe() {
+ nsCOMPtr<nsIAsyncOutputStream> out;
+ nsCOMPtr<nsIAsyncInputStream> in;
+ NS_NewPipe2(getter_AddRefs(in), getter_AddRefs(out), true, true,
+ nsIOService::gDefaultSegmentSize,
+ nsIOService::gDefaultSegmentCount);
+
+ {
+ MutexAutoLock lock(mMutex);
+ mReceiveStreamPipeIn = std::move(in);
+ mReceiveStreamPipeOut = std::move(out);
+ }
+
+ mRecvState = READING;
+ return NS_OK;
+}
+
+void Http3WebTransportStream::GetWriterAndReader(
+ nsIAsyncOutputStream** aOutOutputStream,
+ nsIAsyncInputStream** aOutInputStream) {
+ nsCOMPtr<nsIAsyncOutputStream> output;
+ nsCOMPtr<nsIAsyncInputStream> input;
+ {
+ MutexAutoLock lock(mMutex);
+ output = mSendStreamPipeOut;
+ input = mReceiveStreamPipeIn;
+ }
+
+ output.forget(aOutOutputStream);
+ input.forget(aOutInputStream);
+}
+
+already_AddRefed<nsIWebTransportSendStreamStats>
+Http3WebTransportStream::GetSendStreamStats() {
+ MOZ_ASSERT(OnSocketThread(), "not on socket thread");
+
+ nsCOMPtr<nsIWebTransportSendStreamStats> stats =
+ new WebTransportSendStreamStats(mTotalSent, mTotalAcknowledged);
+ return stats.forget();
+}
+
+already_AddRefed<nsIWebTransportReceiveStreamStats>
+Http3WebTransportStream::GetReceiveStreamStats() {
+ MOZ_ASSERT(OnSocketThread(), "not on socket thread");
+
+ nsCOMPtr<nsIWebTransportReceiveStreamStats> stats =
+ new WebTransportReceiveStreamStats(mTotalReceived);
+ return stats.forget();
+}
+
+nsresult Http3WebTransportStream::OnReadSegment(const char* buf, uint32_t count,
+ uint32_t* countRead) {
+ MOZ_ASSERT(OnSocketThread(), "not on socket thread");
+
+ LOG(("Http3WebTransportStream::OnReadSegment count=%u state=%d [this=%p]",
+ count, mSendState, this));
+
+ nsresult rv = NS_OK;
+
+ switch (mSendState) {
+ case WAITING_TO_ACTIVATE:
+ rv = TryActivating();
+ if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
+ LOG3(
+ ("Http3WebTransportStream::OnReadSegment %p cannot activate now. "
+ "queued.\n",
+ this));
+ break;
+ }
+ if (NS_FAILED(rv)) {
+ LOG3(
+ ("Http3WebTransportStream::OnReadSegment %p cannot activate "
+ "error=0x%" PRIx32 ".",
+ this, static_cast<uint32_t>(rv)));
+ mStreamReadyCallback(Err(rv));
+ break;
+ }
+
+ rv = InitOutputPipe();
+ if (NS_SUCCEEDED(rv) && mStreamType == WebTransportStreamType::BiDi) {
+ rv = InitInputPipe();
+ }
+ if (NS_FAILED(rv)) {
+ LOG3(
+ ("Http3WebTransportStream::OnReadSegment %p failed to create pipe "
+ "error=0x%" PRIx32 ".",
+ this, static_cast<uint32_t>(rv)));
+ mSendState = SEND_DONE;
+ mStreamReadyCallback(Err(rv));
+ break;
+ }
+
+ // Successfully activated.
+ mStreamReadyCallback(RefPtr{this});
+ break;
+ case SENDING: {
+ rv = mSession->SendRequestBody(mStreamId, buf, count, countRead);
+ LOG3(
+ ("Http3WebTransportStream::OnReadSegment %p sending body returns "
+ "error=0x%" PRIx32 ".",
+ this, static_cast<uint32_t>(rv)));
+ mTotalSent += *countRead;
+ } break;
+ default:
+ MOZ_ASSERT(false, "We are done sending this request!");
+ rv = NS_ERROR_UNEXPECTED;
+ break;
+ }
+
+ mSocketOutCondition = rv;
+
+ return mSocketOutCondition;
+}
+
+// static
+nsresult Http3WebTransportStream::ReadRequestSegment(
+ nsIInputStream* stream, void* closure, const char* buf, uint32_t offset,
+ uint32_t count, uint32_t* countRead) {
+ Http3WebTransportStream* wtStream = (Http3WebTransportStream*)closure;
+ nsresult rv = wtStream->OnReadSegment(buf, count, countRead);
+ LOG(("Http3WebTransportStream::ReadRequestSegment %p read=%u", wtStream,
+ *countRead));
+ return rv;
+}
+
+nsresult Http3WebTransportStream::ReadSegments() {
+ MOZ_ASSERT(OnSocketThread(), "not on socket thread");
+ LOG(("Http3WebTransportStream::ReadSegments [this=%p]", this));
+ nsresult rv = NS_OK;
+ uint32_t sendBytes = 0;
+ bool again = true;
+ do {
+ sendBytes = 0;
+ rv = mSocketOutCondition = NS_OK;
+ LOG(("Http3WebTransportStream::ReadSegments state=%d [this=%p]", mSendState,
+ this));
+ switch (mSendState) {
+ case WAITING_TO_ACTIVATE: {
+ LOG3(
+ ("Http3WebTransportStream %p ReadSegments forcing OnReadSegment "
+ "call\n",
+ this));
+ uint32_t wasted = 0;
+ nsresult rv2 = OnReadSegment("", 0, &wasted);
+ LOG3((" OnReadSegment returned 0x%08" PRIx32,
+ static_cast<uint32_t>(rv2)));
+ }
+ [[fallthrough]];
+ case WAITING_DATA:
+ [[fallthrough]];
+ case SENDING: {
+ if (mStreamRole == INCOMING &&
+ mStreamType == WebTransportStreamType::UniDi) {
+ rv = NS_OK;
+ break;
+ }
+ rv = mSendStreamPipeIn->ReadSegments(ReadRequestSegment, this,
+ nsIOService::gDefaultSegmentSize,
+ &sendBytes);
+ } break;
+ case SEND_DONE: {
+ return NS_OK;
+ }
+ default:
+ sendBytes = 0;
+ rv = NS_OK;
+ break;
+ }
+
+ LOG(("Http3WebTransportStream::ReadSegments rv=0x%" PRIx32
+ " read=%u sock-cond=%" PRIx32 " again=%d mSendFin=%d [this=%p]",
+ static_cast<uint32_t>(rv), sendBytes,
+ static_cast<uint32_t>(mSocketOutCondition), again, mSendFin, this));
+
+ // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
+ if (rv == NS_BASE_STREAM_CLOSED || !mPendingTasks.IsEmpty()) {
+ rv = NS_OK;
+ sendBytes = 0;
+ }
+
+ if (NS_FAILED(rv)) {
+ // if the writer didn't want to write any more data, then
+ // wait for the transaction to call ResumeSend.
+ if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
+ mSendState = WAITING_DATA;
+ rv = mSendStreamPipeIn->AsyncWait(this, 0, 0, gSocketTransportService);
+ }
+ again = false;
+ } else if (NS_FAILED(mSocketOutCondition)) {
+ if (mSocketOutCondition != NS_BASE_STREAM_WOULD_BLOCK) {
+ rv = mSocketOutCondition;
+ }
+ again = false;
+ } else if (!sendBytes) {
+ mSendState = SEND_DONE;
+ rv = NS_OK;
+ again = false;
+ if (!mPendingTasks.IsEmpty()) {
+ LOG(("Has pending tasks to do"));
+ nsTArray<std::function<void()>> tasks = std::move(mPendingTasks);
+ for (const auto& task : tasks) {
+ task();
+ }
+ }
+ }
+
+ // write more to the socket until error or end-of-request...
+ } while (again && gHttpHandler->Active());
+ return rv;
+}
+
+nsresult Http3WebTransportStream::OnWriteSegment(char* buf, uint32_t count,
+ uint32_t* countWritten) {
+ MOZ_ASSERT(OnSocketThread(), "not on socket thread");
+
+ LOG(("Http3WebTransportStream::OnWriteSegment [this=%p, state=%d", this,
+ mRecvState));
+ nsresult rv = NS_OK;
+ switch (mRecvState) {
+ case READING: {
+ rv = mSession->ReadResponseData(mStreamId, buf, count, countWritten,
+ &mFin);
+ if (NS_FAILED(rv)) {
+ break;
+ }
+ if (*countWritten == 0) {
+ if (mFin) {
+ mRecvState = RECV_DONE;
+ rv = NS_BASE_STREAM_CLOSED;
+ } else {
+ rv = NS_BASE_STREAM_WOULD_BLOCK;
+ }
+ } else {
+ mTotalReceived += *countWritten;
+ if (mFin) {
+ mRecvState = RECEIVED_FIN;
+ }
+ }
+ } break;
+ case RECEIVED_FIN:
+ rv = NS_BASE_STREAM_CLOSED;
+ mRecvState = RECV_DONE;
+ break;
+ case RECV_DONE:
+ rv = NS_ERROR_UNEXPECTED;
+ break;
+ default:
+ rv = NS_ERROR_UNEXPECTED;
+ break;
+ }
+
+ // Remember the error received from lower layers. A stream pipe may overwrite
+ // it.
+ // If rv == NS_OK this will reset mSocketInCondition.
+ mSocketInCondition = rv;
+
+ return rv;
+}
+
+// static
+nsresult Http3WebTransportStream::WritePipeSegment(nsIOutputStream* stream,
+ void* closure, char* buf,
+ uint32_t offset,
+ uint32_t count,
+ uint32_t* countWritten) {
+ Http3WebTransportStream* self = (Http3WebTransportStream*)closure;
+
+ nsresult rv = self->OnWriteSegment(buf, count, countWritten);
+ if (NS_FAILED(rv)) {
+ return rv;
+ }
+
+ LOG(("Http3WebTransportStream::WritePipeSegment %p written=%u", self,
+ *countWritten));
+
+ return rv;
+}
+
+nsresult Http3WebTransportStream::WriteSegments() {
+ MOZ_ASSERT(OnSocketThread(), "not on socket thread");
+ LOG(("Http3WebTransportStream::WriteSegments [this=%p]", this));
+
+ nsresult rv = NS_OK;
+ uint32_t countWrittenSingle = 0;
+ bool again = true;
+
+ do {
+ mSocketInCondition = NS_OK;
+ countWrittenSingle = 0;
+ rv = mReceiveStreamPipeOut->WriteSegments(WritePipeSegment, this,
+ nsIOService::gDefaultSegmentSize,
+ &countWrittenSingle);
+ LOG(("Http3Stream::WriteSegments rv=0x%" PRIx32
+ " countWrittenSingle=%" PRIu32 " socketin=%" PRIx32 " [this=%p]",
+ static_cast<uint32_t>(rv), countWrittenSingle,
+ static_cast<uint32_t>(mSocketInCondition), this));
+ if (NS_FAILED(rv)) {
+ if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
+ rv = NS_OK;
+ }
+ again = false;
+ } else if (NS_FAILED(mSocketInCondition)) {
+ if (mSocketInCondition != NS_BASE_STREAM_WOULD_BLOCK) {
+ rv = mSocketInCondition;
+ }
+ again = false;
+ }
+ // read more from the socket until error...
+ } while (again && gHttpHandler->Active());
+
+ return NS_OK;
+}
+
+bool Http3WebTransportStream::Done() const {
+ // To be implemented in bug 1790403.
+ return false;
+}
+
+void Http3WebTransportStream::Close(nsresult aResult) {
+ MOZ_ASSERT(OnSocketThread(), "not on socket thread");
+ LOG(("Http3WebTransportStream::Close [this=%p]", this));
+}
+
+void Http3WebTransportStream::SendFin() {
+ MOZ_ASSERT(OnSocketThread(), "not on socket thread");
+ LOG(("Http3WebTransportStream::SendFin [this=%p mSendState=%d]", this,
+ mSendState));
+
+ if (mSendFin) {
+ // Already closed.
+ return;
+ }
+
+ mSendFin = true;
+
+ switch (mSendState) {
+ case SENDING: {
+ mPendingTasks.AppendElement([self = RefPtr{this}]() {
+ self->mSession->CloseSendingSide(self->mStreamId);
+ });
+ } break;
+ case WAITING_DATA:
+ mSendState = SEND_DONE;
+ [[fallthrough]];
+ case SEND_DONE:
+ mSession->CloseSendingSide(mStreamId);
+ // StreamHasDataToWrite needs to be called to trigger ProcessOutput.
+ mSession->StreamHasDataToWrite(this);
+ break;
+ default:
+ MOZ_ASSERT_UNREACHABLE("invalid mSendState!");
+ break;
+ }
+}
+
+void Http3WebTransportStream::Reset(uint8_t aErrorCode) {
+ MOZ_ASSERT(OnSocketThread(), "not on socket thread");
+ LOG(("Http3WebTransportStream::Reset [this=%p, mSendState=%d]", this,
+ mSendState));
+
+ if (mResetError) {
+ // The stream is already reset.
+ return;
+ }
+
+ mResetError = Some(aErrorCode);
+
+ switch (mSendState) {
+ case SENDING: {
+ LOG(("Http3WebTransportStream::Reset [this=%p] reset after sending data",
+ this));
+ mPendingTasks.AppendElement([self = RefPtr{this}]() {
+ // "Reset" needs a special treatment here. If we are sending data and
+ // ResetWebTransportStream is called before Http3Session::ProcessOutput,
+ // neqo will drop the last piece of data.
+ NS_DispatchToCurrentThread(
+ NS_NewRunnableFunction("Http3WebTransportStream::Reset", [self]() {
+ self->mSession->ResetWebTransportStream(self, *self->mResetError);
+ self->mSession->StreamHasDataToWrite(self);
+ }));
+ });
+ } break;
+ case WAITING_DATA:
+ mSendState = SEND_DONE;
+ [[fallthrough]];
+ case SEND_DONE:
+ mSession->ResetWebTransportStream(this, *mResetError);
+ // StreamHasDataToWrite needs to be called to trigger ProcessOutput.
+ mSession->StreamHasDataToWrite(this);
+ break;
+ default:
+ MOZ_ASSERT_UNREACHABLE("invalid mSendState!");
+ break;
+ }
+}
+
+void Http3WebTransportStream::SendStopSending(uint8_t aErrorCode) {
+ MOZ_ASSERT(OnSocketThread(), "not on socket thread");
+ LOG(("Http3WebTransportStream::SendStopSending [this=%p, mSendState=%d]",
+ this, mSendState));
+
+ if (mSendState == WAITING_TO_ACTIVATE) {
+ return;
+ }
+
+ if (mStopSendingError) {
+ return;
+ }
+
+ mStopSendingError = Some(aErrorCode);
+
+ mSession->StreamStopSending(this, *mStopSendingError);
+ // StreamHasDataToWrite needs to be called to trigger ProcessOutput.
+ mSession->StreamHasDataToWrite(this);
+}
+
+} // namespace mozilla::net