summaryrefslogtreecommitdiffstats
path: root/dom/webtransport/api/WebTransportStreams.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'dom/webtransport/api/WebTransportStreams.cpp')
-rw-r--r--dom/webtransport/api/WebTransportStreams.cpp197
1 files changed, 197 insertions, 0 deletions
diff --git a/dom/webtransport/api/WebTransportStreams.cpp b/dom/webtransport/api/WebTransportStreams.cpp
new file mode 100644
index 0000000000..82924fa78f
--- /dev/null
+++ b/dom/webtransport/api/WebTransportStreams.cpp
@@ -0,0 +1,197 @@
+/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim:set ts=2 sw=2 sts=2 et cindent: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "mozilla/dom/WebTransportStreams.h"
+
+#include "mozilla/dom/WebTransportLog.h"
+#include "mozilla/dom/Promise-inl.h"
+#include "mozilla/dom/WebTransport.h"
+#include "mozilla/dom/WebTransportBidirectionalStream.h"
+#include "mozilla/dom/WebTransportReceiveStream.h"
+#include "mozilla/dom/WebTransportSendStream.h"
+#include "mozilla/Result.h"
+
+using namespace mozilla::ipc;
+
+namespace mozilla::dom {
+NS_IMPL_CYCLE_COLLECTION_INHERITED(WebTransportIncomingStreamsAlgorithms,
+ UnderlyingSourceAlgorithmsWrapper,
+ mTransport, mCallback)
+NS_IMPL_ADDREF_INHERITED(WebTransportIncomingStreamsAlgorithms,
+ UnderlyingSourceAlgorithmsWrapper)
+NS_IMPL_RELEASE_INHERITED(WebTransportIncomingStreamsAlgorithms,
+ UnderlyingSourceAlgorithmsWrapper)
+NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(WebTransportIncomingStreamsAlgorithms)
+NS_INTERFACE_MAP_END_INHERITING(UnderlyingSourceAlgorithmsWrapper)
+
+WebTransportIncomingStreamsAlgorithms::WebTransportIncomingStreamsAlgorithms(
+ StreamType aUnidirectional, WebTransport* aTransport)
+ : mUnidirectional(aUnidirectional), mTransport(aTransport) {}
+
+WebTransportIncomingStreamsAlgorithms::
+ ~WebTransportIncomingStreamsAlgorithms() = default;
+
+already_AddRefed<Promise>
+WebTransportIncomingStreamsAlgorithms::PullCallbackImpl(
+ JSContext* aCx, ReadableStreamController& aController, ErrorResult& aRv) {
+ // https://w3c.github.io/webtransport/#pullbidirectionalstream and
+ // https://w3c.github.io/webtransport/#pullunidirectionalstream
+
+ // Step 1: If transport.[[State]] is "connecting", then return the result
+ // of performing the following steps upon fulfillment of
+ // transport.[[Ready]]:
+ // We don't explicitly check mState here, since we'll reject
+ // mIncomingStreamPromise if we go to FAILED or CLOSED
+ //
+ // Step 2: Let session be transport.[[Session]].
+ // Step 3: Let p be a new promise.
+ RefPtr<Promise> promise =
+ Promise::CreateInfallible(mTransport->GetParentObject());
+ RefPtr<WebTransportIncomingStreamsAlgorithms> self(this);
+ // The real work of PullCallback()
+ // Step 5: Wait until there is an available incoming unidirectional stream.
+ auto length = (mUnidirectional == StreamType::Unidirectional)
+ ? mTransport->mUnidirectionalStreams.Length()
+ : mTransport->mBidirectionalStreams.Length();
+ if (length == 0) {
+ // We need to wait.
+ // Per
+ // https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pulling
+ // we can't be called again until the promise is resolved
+ MOZ_ASSERT(!mCallback);
+ mCallback = promise;
+
+ LOG(("Incoming%sDirectionalStreams Pull waiting for a stream",
+ mUnidirectional == StreamType::Unidirectional ? "Uni" : "Bi"));
+ Result<RefPtr<Promise>, nsresult> returnResult =
+ promise->ThenWithCycleCollectedArgs(
+ [](JSContext* aCx, JS::Handle<JS::Value>, ErrorResult& aRv,
+ RefPtr<WebTransportIncomingStreamsAlgorithms> self,
+ RefPtr<Promise> aPromise) -> already_AddRefed<Promise> {
+ self->BuildStream(aCx, aRv);
+ return nullptr;
+ },
+ self, promise);
+ if (returnResult.isErr()) {
+ // XXX Reject?
+ aRv.Throw(returnResult.unwrapErr());
+ return nullptr;
+ }
+ // Step 4: Return p and run the remaining steps in parallel.
+ return returnResult.unwrap().forget();
+ }
+ self->BuildStream(aCx, aRv);
+ // Step 4: Return p and run the remaining steps in parallel.
+ return promise.forget();
+}
+
+// Note: fallible
+void WebTransportIncomingStreamsAlgorithms::BuildStream(JSContext* aCx,
+ ErrorResult& aRv) {
+ // https://w3c.github.io/webtransport/#pullbidirectionalstream and
+ // https://w3c.github.io/webtransport/#pullunidirectionalstream
+ LOG(("Incoming%sDirectionalStreams Pull building a stream",
+ mUnidirectional == StreamType::Unidirectional ? "Uni" : "Bi"));
+ if (mUnidirectional == StreamType::Unidirectional) {
+ // Step 6: Let internalStream be the result of receiving an incoming
+ // unidirectional stream.
+ MOZ_ASSERT(mTransport->mUnidirectionalStreams.Length() > 0);
+ std::tuple<uint64_t, RefPtr<mozilla::ipc::DataPipeReceiver>> tuple =
+ mTransport->mUnidirectionalStreams[0];
+ mTransport->mUnidirectionalStreams.RemoveElementAt(0);
+
+ // Step 7.1: Let stream be the result of creating a
+ // WebTransportReceiveStream with internalStream and transport
+ RefPtr<WebTransportReceiveStream> readableStream =
+ WebTransportReceiveStream::Create(mTransport, mTransport->mGlobal,
+ std::get<0>(tuple),
+ std::get<1>(tuple), aRv);
+ if (MOZ_UNLIKELY(!readableStream)) {
+ aRv.ThrowUnknownError("Internal error");
+ return;
+ }
+ // Step 7.2 Enqueue stream to transport.[[IncomingUnidirectionalStreams]].
+ JS::Rooted<JS::Value> jsStream(aCx);
+ if (MOZ_UNLIKELY(!ToJSValue(aCx, readableStream, &jsStream))) {
+ aRv.ThrowUnknownError("Internal error");
+ return;
+ }
+ // EnqueueNative is CAN_RUN_SCRIPT
+ RefPtr<ReadableStream> incomingStream =
+ mTransport->mIncomingUnidirectionalStreams;
+ incomingStream->EnqueueNative(aCx, jsStream, aRv);
+ if (MOZ_UNLIKELY(aRv.Failed())) {
+ aRv.ThrowUnknownError("Internal error");
+ return;
+ }
+ } else {
+ // Step 6: Let internalStream be the result of receiving a bidirectional
+ // stream
+ MOZ_ASSERT(mTransport->mBidirectionalStreams.Length() > 0);
+ std::tuple<uint64_t, UniquePtr<BidirectionalPair>> tuple =
+ std::move(mTransport->mBidirectionalStreams.ElementAt(0));
+ mTransport->mBidirectionalStreams.RemoveElementAt(0);
+ RefPtr<DataPipeReceiver> input = std::get<1>(tuple)->first.forget();
+ RefPtr<DataPipeSender> output = std::get<1>(tuple)->second.forget();
+
+ RefPtr<WebTransportBidirectionalStream> stream =
+ WebTransportBidirectionalStream::Create(mTransport, mTransport->mGlobal,
+ std::get<0>(tuple), input,
+ output, aRv);
+
+ // Step 7.2 Enqueue stream to transport.[[IncomingBidirectionalStreams]].
+ JS::Rooted<JS::Value> jsStream(aCx);
+ if (MOZ_UNLIKELY(!ToJSValue(aCx, stream, &jsStream))) {
+ return;
+ }
+ LOG(("Enqueuing bidirectional stream\n"));
+ // EnqueueNative is CAN_RUN_SCRIPT
+ RefPtr<ReadableStream> incomingStream =
+ mTransport->mIncomingBidirectionalStreams;
+ incomingStream->EnqueueNative(aCx, jsStream, aRv);
+ if (MOZ_UNLIKELY(aRv.Failed())) {
+ return;
+ }
+ }
+ // Step 7.3: Resolve p with undefined.
+}
+
+void WebTransportIncomingStreamsAlgorithms::NotifyIncomingStream() {
+ if (mUnidirectional == StreamType::Unidirectional) {
+ LOG(("NotifyIncomingStream: %zu Unidirectional ",
+ mTransport->mUnidirectionalStreams.Length()));
+#ifdef DEBUG
+ auto number = mTransport->mUnidirectionalStreams.Length();
+ MOZ_ASSERT(number > 0);
+#endif
+ RefPtr<Promise> promise = mCallback.forget();
+ if (promise) {
+ promise->MaybeResolveWithUndefined();
+ }
+ } else {
+ LOG(("NotifyIncomingStream: %zu Bidirectional ",
+ mTransport->mBidirectionalStreams.Length()));
+#ifdef DEBUG
+ auto number = mTransport->mBidirectionalStreams.Length();
+ MOZ_ASSERT(number > 0);
+#endif
+ RefPtr<Promise> promise = mCallback.forget();
+ if (promise) {
+ promise->MaybeResolveWithUndefined();
+ }
+ }
+}
+
+void WebTransportIncomingStreamsAlgorithms::NotifyRejectAll() {
+ // cancel all pulls
+ LOG(("Cancel all WebTransport Pulls"));
+ // Ensure we clear the callback before resolving/rejecting it
+ if (RefPtr<Promise> promise = mCallback.forget()) {
+ promise->MaybeReject(NS_ERROR_FAILURE);
+ }
+}
+
+} // namespace mozilla::dom