summaryrefslogtreecommitdiffstats
path: root/js/src/builtin/streams/PipeToState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'js/src/builtin/streams/PipeToState.cpp')
-rw-r--r--js/src/builtin/streams/PipeToState.cpp1271
1 files changed, 1271 insertions, 0 deletions
diff --git a/js/src/builtin/streams/PipeToState.cpp b/js/src/builtin/streams/PipeToState.cpp
new file mode 100644
index 0000000000..f79e81d3fe
--- /dev/null
+++ b/js/src/builtin/streams/PipeToState.cpp
@@ -0,0 +1,1271 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*-
+ * vim: set ts=8 sts=2 et sw=2 tw=80:
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/* ReadableStream.prototype.pipeTo state. */
+
+#include "builtin/streams/PipeToState-inl.h"
+
+#include "mozilla/Assertions.h" // MOZ_ASSERT
+#include "mozilla/Attributes.h" // MOZ_MUST_USE
+#include "mozilla/Maybe.h" // mozilla::Maybe, mozilla::Nothing, mozilla::Some
+
+#include "jsapi.h" // JS_ReportErrorNumberASCII
+
+#include "builtin/Promise.h" // js::RejectPromiseWithPendingError
+#include "builtin/streams/ReadableStream.h" // js::ReadableStream
+#include "builtin/streams/ReadableStreamReader.h" // js::CreateReadableStreamDefaultReader, js::ForAuthorCodeBool, js::ReadableStreamDefaultReader, js::ReadableStreamReaderGenericRelease
+#include "builtin/streams/WritableStream.h" // js::WritableStream
+#include "builtin/streams/WritableStreamDefaultWriter.h" // js::CreateWritableStreamDefaultWriter, js::WritableStreamDefaultWriter
+#include "builtin/streams/WritableStreamOperations.h" // js::WritableStreamCloseQueuedOrInFlight
+#include "builtin/streams/WritableStreamWriterOperations.h" // js::WritableStreamDefaultWriter{GetDesiredSize,Release,Write}
+#include "js/CallArgs.h" // JS::CallArgsFromVp, JS::CallArgs
+#include "js/Class.h" // JSClass, JSCLASS_HAS_RESERVED_SLOTS
+#include "js/friend/ErrorMessages.h" // js::GetErrorMessage, JSMSG_*
+#include "js/Promise.h" // JS::AddPromiseReactions
+#include "js/RootingAPI.h" // JS::Handle, JS::Rooted
+#include "js/Value.h" // JS::{,Int32,Magic,Object}Value, JS::UndefinedHandleValue
+#include "vm/JSContext.h" // JSContext
+#include "vm/PromiseObject.h" // js::PromiseObject
+#include "vm/Runtime.h" // JSRuntime
+
+#include "builtin/HandlerFunction-inl.h" // js::ExtraValueFromHandler, js::NewHandler{,WithExtraValue}, js::TargetFromHandler
+#include "builtin/streams/ReadableStreamReader-inl.h" // js::UnwrapReaderFromStream, js::UnwrapStreamFromReader
+#include "builtin/streams/WritableStream-inl.h" // js::UnwrapWriterFromStream
+#include "builtin/streams/WritableStreamDefaultWriter-inl.h" // js::UnwrapStreamFromWriter
+#include "vm/JSContext-inl.h" // JSContext::check
+#include "vm/JSObject-inl.h" // js::NewBuiltinClassInstance
+#include "vm/Realm-inl.h" // js::AutoRealm
+
+using mozilla::Maybe;
+using mozilla::Nothing;
+using mozilla::Some;
+
+using JS::CallArgs;
+using JS::CallArgsFromVp;
+using JS::Handle;
+using JS::Int32Value;
+using JS::MagicValue;
+using JS::ObjectValue;
+using JS::Rooted;
+using JS::UndefinedHandleValue;
+using JS::Value;
+
+using js::ExtraValueFromHandler;
+using js::GetErrorMessage;
+using js::NewHandler;
+using js::NewHandlerWithExtraValue;
+using js::PipeToState;
+using js::PromiseObject;
+using js::ReadableStream;
+using js::ReadableStreamDefaultReader;
+using js::ReadableStreamReaderGenericRelease;
+using js::TargetFromHandler;
+using js::UnwrapReaderFromStream;
+using js::UnwrapStreamFromWriter;
+using js::UnwrapWriterFromStream;
+using js::WritableStream;
+using js::WritableStreamDefaultWriter;
+using js::WritableStreamDefaultWriterRelease;
+using js::WritableStreamDefaultWriterWrite;
+
+static ReadableStream* GetUnwrappedSource(JSContext* cx,
+ Handle<PipeToState*> state) {
+ cx->check(state);
+
+ Rooted<ReadableStreamDefaultReader*> reader(cx, state->reader());
+ cx->check(reader);
+
+ return UnwrapStreamFromReader(cx, reader);
+}
+
+static WritableStream* GetUnwrappedDest(JSContext* cx,
+ Handle<PipeToState*> state) {
+ cx->check(state);
+
+ Rooted<WritableStreamDefaultWriter*> writer(cx, state->writer());
+ cx->check(writer);
+
+ return UnwrapStreamFromWriter(cx, writer);
+}
+
+static bool WritableAndNotClosing(const WritableStream* unwrappedDest) {
+ return unwrappedDest->writable() &&
+ WritableStreamCloseQueuedOrInFlight(unwrappedDest);
+}
+
+static MOZ_MUST_USE bool Finalize(JSContext* cx, Handle<PipeToState*> state,
+ Handle<Maybe<Value>> error) {
+ cx->check(state);
+ cx->check(error);
+
+ // Step 1: Perform ! WritableStreamDefaultWriterRelease(writer).
+ Rooted<WritableStreamDefaultWriter*> writer(cx, state->writer());
+ cx->check(writer);
+ if (!WritableStreamDefaultWriterRelease(cx, writer)) {
+ return false;
+ }
+
+ // Step 2: Perform ! ReadableStreamReaderGenericRelease(reader).
+ Rooted<ReadableStreamDefaultReader*> reader(cx, state->reader());
+ cx->check(reader);
+ if (!ReadableStreamReaderGenericRelease(cx, reader)) {
+ return false;
+ }
+
+ // Step 3: If signal is not undefined, remove abortAlgorithm from signal.
+ // XXX
+
+ Rooted<PromiseObject*> promise(cx, state->promise());
+ cx->check(promise);
+
+ // Step 4: If error was given, reject promise with error.
+ if (error.isSome()) {
+ Rooted<Value> errorVal(cx, *error.get());
+ return PromiseObject::reject(cx, promise, errorVal);
+ }
+
+ // Step 5: Otherwise, resolve promise with undefined.
+ return PromiseObject::resolve(cx, promise, UndefinedHandleValue);
+}
+
+static MOZ_MUST_USE bool Finalize(JSContext* cx, unsigned argc, Value* vp) {
+ CallArgs args = CallArgsFromVp(argc, vp);
+
+ Rooted<PipeToState*> state(cx, TargetFromHandler<PipeToState>(args));
+ cx->check(state);
+
+ Rooted<Maybe<Value>> optionalError(cx, Nothing());
+ if (Value maybeError = ExtraValueFromHandler(args);
+ !maybeError.isMagic(JS_READABLESTREAM_PIPETO_FINALIZE_WITHOUT_ERROR)) {
+ optionalError = Some(maybeError);
+ }
+ cx->check(optionalError);
+
+ if (!Finalize(cx, state, optionalError)) {
+ return false;
+ }
+
+ args.rval().setUndefined();
+ return true;
+}
+
+// Shutdown with an action, steps d-f:
+// d. Let p be the result of performing action.
+// e. Upon fulfillment of p, finalize, passing along originalError if it was
+// given.
+// f. Upon rejection of p with reason newError, finalize with newError.
+static MOZ_MUST_USE bool ActAndFinalize(JSContext* cx,
+ Handle<PipeToState*> state,
+ Handle<Maybe<Value>> error) {
+ // Step d: Let p be the result of performing action.
+ Rooted<JSObject*> p(cx);
+ switch (state->shutdownAction()) {
+ // This corresponds to the action performed by |abortAlgorithm| in
+ // ReadableStreamPipeTo step 14.1.5.
+ case PipeToState::ShutdownAction::AbortAlgorithm: {
+ MOZ_ASSERT(error.get().isSome());
+
+ // From ReadableStreamPipeTo:
+ // Step 14.1.2: Let actions be an empty ordered set.
+ // Step 14.1.3: If preventAbort is false, append the following action to
+ // actions:
+ // Step 14.1.3.1: If dest.[[state]] is "writable", return
+ // ! WritableStreamAbort(dest, error).
+ // Step 14.1.3.2: Otherwise, return a promise resolved with undefined.
+ // Step 14.1.4: If preventCancel is false, append the following action
+ // action to actions:
+ // Step 14.1.4.1.: If source.[[state]] is "readable", return
+ // ! ReadableStreamCancel(source, error).
+ // Step 14.1.4.2: Otherwise, return a promise resolved with undefined.
+ JS_ReportErrorNumberASCII(cx, GetErrorMessage, nullptr,
+ JSMSG_READABLESTREAM_METHOD_NOT_IMPLEMENTED,
+ "any required actions during abortAlgorithm");
+ return false;
+ }
+
+ // This corresponds to the action in "shutdown with an action of
+ // ! WritableStreamAbort(dest, source.[[storedError]]) and with
+ // source.[[storedError]]."
+ case PipeToState::ShutdownAction::AbortDestStream: {
+ MOZ_ASSERT(error.get().isSome());
+
+ Rooted<WritableStream*> unwrappedDest(cx, GetUnwrappedDest(cx, state));
+ if (!unwrappedDest) {
+ return false;
+ }
+
+ Rooted<Value> sourceStoredError(cx, *error.get());
+ cx->check(sourceStoredError);
+
+ p = WritableStreamAbort(cx, unwrappedDest, sourceStoredError);
+ break;
+ }
+
+ // This corresponds to two actions:
+ //
+ // * The action in "shutdown with an action of
+ // ! ReadableStreamCancel(source, dest.[[storedError]]) and with
+ // dest.[[storedError]]" as used in "Errors must be propagated backward:
+ // if dest.[[state]] is or becomes 'errored'".
+ // * The action in "shutdown with an action of
+ // ! ReadableStreamCancel(source, destClosed) and with destClosed" as used
+ // in "Closing must be propagated backward: if
+ // ! WritableStreamCloseQueuedOrInFlight(dest) is true or dest.[[state]]
+ // is 'closed'".
+ //
+ // The different reason-values are passed as |error|.
+ case PipeToState::ShutdownAction::CancelSource: {
+ MOZ_ASSERT(error.get().isSome());
+
+ Rooted<ReadableStream*> unwrappedSource(cx,
+ GetUnwrappedSource(cx, state));
+ if (!unwrappedSource) {
+ return false;
+ }
+
+ Rooted<Value> reason(cx, *error.get());
+ cx->check(reason);
+
+ p = ReadableStreamCancel(cx, unwrappedSource, reason);
+ break;
+ }
+
+ // This corresponds to the action in "shutdown with an action of
+ // ! WritableStreamDefaultWriterCloseWithErrorPropagation(writer)" as done
+ // in "Closing must be propagated forward: if source.[[state]] is or becomes
+ // 'closed'".
+ case PipeToState::ShutdownAction::CloseWriterWithErrorPropagation: {
+ MOZ_ASSERT(error.get().isNothing());
+
+ Rooted<WritableStreamDefaultWriter*> writer(cx, state->writer());
+ cx->check(writer); // just for good measure: we don't depend on this
+
+ p = WritableStreamDefaultWriterCloseWithErrorPropagation(cx, writer);
+ break;
+ }
+ }
+ if (!p) {
+ return false;
+ }
+
+ // Step e: Upon fulfillment of p, finalize, passing along originalError if it
+ // was given.
+ Rooted<JSFunction*> onFulfilled(cx);
+ {
+ Rooted<Value> optionalError(
+ cx, error.isSome()
+ ? *error.get()
+ : MagicValue(JS_READABLESTREAM_PIPETO_FINALIZE_WITHOUT_ERROR));
+ onFulfilled = NewHandlerWithExtraValue(cx, Finalize, state, optionalError);
+ if (!onFulfilled) {
+ return false;
+ }
+ }
+
+ // Step f: Upon rejection of p with reason newError, finalize with newError.
+ auto OnRejected = [](JSContext* cx, unsigned argc, Value* vp) {
+ CallArgs args = CallArgsFromVp(argc, vp);
+
+ Rooted<PipeToState*> state(cx, TargetFromHandler<PipeToState>(args));
+ cx->check(state);
+
+ Rooted<Maybe<Value>> newError(cx, Some(args[0]));
+ cx->check(newError);
+ if (!Finalize(cx, state, newError)) {
+ return false;
+ }
+
+ args.rval().setUndefined();
+ return true;
+ };
+
+ Rooted<JSFunction*> onRejected(cx, NewHandler(cx, OnRejected, state));
+ if (!onRejected) {
+ return false;
+ }
+
+ return JS::AddPromiseReactions(cx, p, onFulfilled, onRejected);
+}
+
+static MOZ_MUST_USE bool ActAndFinalize(JSContext* cx, unsigned argc,
+ Value* vp) {
+ CallArgs args = CallArgsFromVp(argc, vp);
+
+ Rooted<PipeToState*> state(cx, TargetFromHandler<PipeToState>(args));
+ cx->check(state);
+
+ Rooted<Maybe<Value>> optionalError(cx, Nothing());
+ if (Value maybeError = ExtraValueFromHandler(args);
+ !maybeError.isMagic(JS_READABLESTREAM_PIPETO_FINALIZE_WITHOUT_ERROR)) {
+ optionalError = Some(maybeError);
+ }
+ cx->check(optionalError);
+
+ if (!ActAndFinalize(cx, state, optionalError)) {
+ return false;
+ }
+
+ args.rval().setUndefined();
+ return true;
+}
+
+// Shutdown with an action: if any of the above requirements ask to shutdown
+// with an action action, optionally with an error originalError, then:
+static MOZ_MUST_USE bool ShutdownWithAction(
+ JSContext* cx, Handle<PipeToState*> state,
+ PipeToState::ShutdownAction action, Handle<Maybe<Value>> originalError) {
+ cx->check(state);
+ cx->check(originalError);
+
+ // Step a: If shuttingDown is true, abort these substeps.
+ if (state->shuttingDown()) {
+ return true;
+ }
+
+ // Step b: Set shuttingDown to true.
+ state->setShuttingDown();
+
+ // Save the action away for later, potentially asynchronous, use.
+ state->setShutdownAction(action);
+
+ // Step c: If dest.[[state]] is "writable" and
+ // ! WritableStreamCloseQueuedOrInFlight(dest) is false,
+ WritableStream* unwrappedDest = GetUnwrappedDest(cx, state);
+ if (!unwrappedDest) {
+ return false;
+ }
+ if (WritableAndNotClosing(unwrappedDest)) {
+ // Step c.i: If any chunks have been read but not yet written, write them
+ // to dest.
+ //
+ // Any chunk that has been read, will have been processed and a pending
+ // write for it created by this point. (A pending read has not been "read".
+ // And any pending read, will not be processed into a pending write because
+ // of the |state->setShuttingDown()| above in concert with the early exit
+ // in this case in |ReadFulfilled|.)
+
+ // Step c.ii: Wait until every chunk that has been read has been written
+ // (i.e. the corresponding promises have settled).
+ if (PromiseObject* p = state->lastWriteRequest()) {
+ Rooted<PromiseObject*> lastWriteRequest(cx, p);
+
+ Rooted<Value> extra(
+ cx,
+ originalError.isSome()
+ ? *originalError.get()
+ : MagicValue(JS_READABLESTREAM_PIPETO_FINALIZE_WITHOUT_ERROR));
+
+ Rooted<JSFunction*> actAndfinalize(
+ cx, NewHandlerWithExtraValue(cx, ActAndFinalize, state, extra));
+ if (!actAndfinalize) {
+ return false;
+ }
+
+ return JS::AddPromiseReactions(cx, lastWriteRequest, actAndfinalize,
+ actAndfinalize);
+ }
+
+ // If no last write request was ever created, we can fall through and
+ // synchronously perform the remaining steps.
+ }
+
+ // Step d: Let p be the result of performing action.
+ // Step e: Upon fulfillment of p, finalize, passing along originalError if it
+ // was given.
+ // Step f: Upon rejection of p with reason newError, finalize with newError.
+ return ActAndFinalize(cx, state, originalError);
+}
+
+// Shutdown: if any of the above requirements or steps ask to shutdown,
+// optionally with an error error, then:
+static MOZ_MUST_USE bool Shutdown(JSContext* cx, Handle<PipeToState*> state,
+ Handle<Maybe<Value>> error) {
+ cx->check(state);
+ cx->check(error);
+
+ // Step a: If shuttingDown is true, abort these substeps.
+ if (state->shuttingDown()) {
+ return true;
+ }
+
+ // Step b: Set shuttingDown to true.
+ state->setShuttingDown();
+
+ // Step c: If dest.[[state]] is "writable" and
+ // ! WritableStreamCloseQueuedOrInFlight(dest) is false,
+ WritableStream* unwrappedDest = GetUnwrappedDest(cx, state);
+ if (!unwrappedDest) {
+ return false;
+ }
+ if (WritableAndNotClosing(unwrappedDest)) {
+ // Step 1: If any chunks have been read but not yet written, write them to
+ // dest.
+ //
+ // Any chunk that has been read, will have been processed and a pending
+ // write for it created by this point. (A pending read has not been "read".
+ // And any pending read, will not be processed into a pending write because
+ // of the |state->setShuttingDown()| above in concert with the early exit
+ // in this case in |ReadFulfilled|.)
+
+ // Step 2: Wait until every chunk that has been read has been written
+ // (i.e. the corresponding promises have settled).
+ if (PromiseObject* p = state->lastWriteRequest()) {
+ Rooted<PromiseObject*> lastWriteRequest(cx, p);
+
+ Rooted<Value> extra(
+ cx,
+ error.isSome()
+ ? *error.get()
+ : MagicValue(JS_READABLESTREAM_PIPETO_FINALIZE_WITHOUT_ERROR));
+
+ Rooted<JSFunction*> finalize(
+ cx, NewHandlerWithExtraValue(cx, Finalize, state, extra));
+ if (!finalize) {
+ return false;
+ }
+
+ return JS::AddPromiseReactions(cx, lastWriteRequest, finalize, finalize);
+ }
+
+ // If no last write request was ever created, we can fall through and
+ // synchronously perform the remaining steps.
+ }
+
+ // Step d: Finalize, passing along error if it was given.
+ return Finalize(cx, state, error);
+}
+
+/**
+ * Streams spec, 3.4.11. ReadableStreamPipeTo step 14:
+ * "a. Errors must be propagated forward: if source.[[state]] is or becomes
+ * 'errored', then..."
+ */
+static MOZ_MUST_USE bool OnSourceErrored(
+ JSContext* cx, Handle<PipeToState*> state,
+ Handle<ReadableStream*> unwrappedSource) {
+ cx->check(state);
+
+ Rooted<Maybe<Value>> storedError(cx, Some(unwrappedSource->storedError()));
+ if (!cx->compartment()->wrap(cx, &storedError)) {
+ return false;
+ }
+
+ // If |source| becomes errored not during a pending read, it's clear we must
+ // react immediately.
+ //
+ // But what if |source| becomes errored *during* a pending read? Should this
+ // first error, or the pending-read second error, predominate? Two semantics
+ // are possible when |source|/|dest| become closed or errored while there's a
+ // pending read:
+ //
+ // 1. Wait until the read fulfills or rejects, then respond to the
+ // closure/error without regard to the read having fulfilled or rejected.
+ // (This will simply not react to the read being rejected, or it will
+ // queue up the read chunk to be written during shutdown.)
+ // 2. React to the closure/error immediately per "Error and close states
+ // must be propagated". Then when the read fulfills or rejects later, do
+ // nothing.
+ //
+ // The spec doesn't clearly require either semantics. It requires that
+ // *already-read* chunks be written (at least if |dest| didn't become errored
+ // or closed such that no further writes can occur). But it's silent as to
+ // not-fully-read chunks. (These semantic differences may only be observable
+ // with very carefully constructed readable/writable streams.)
+ //
+ // It seems best, generally, to react to the temporally-earliest problem that
+ // arises, so we implement option #2. (Blink, in contrast, currently
+ // implements option #1.)
+ //
+ // All specified reactions to a closure/error invoke either the shutdown, or
+ // shutdown with an action, algorithms. Those algorithms each abort if either
+ // shutdown algorithm has already been invoked. So we don't need to do
+ // anything special here to deal with a pending read.
+
+ // ii. Otherwise (if preventAbort is true), shutdown with
+ // source.[[storedError]].
+ if (state->preventAbort()) {
+ if (!Shutdown(cx, state, storedError)) {
+ return false;
+ }
+ }
+ // i. (If preventAbort is false,) shutdown with an action of
+ // ! WritableStreamAbort(dest, source.[[storedError]]) and with
+ // source.[[storedError]].
+ else {
+ if (!ShutdownWithAction(cx, state,
+ PipeToState::ShutdownAction::AbortDestStream,
+ storedError)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+/**
+ * Streams spec, 3.4.11. ReadableStreamPipeTo step 14:
+ * "b. Errors must be propagated backward: if dest.[[state]] is or becomes
+ * 'errored', then..."
+ */
+static MOZ_MUST_USE bool OnDestErrored(JSContext* cx,
+ Handle<PipeToState*> state,
+ Handle<WritableStream*> unwrappedDest) {
+ cx->check(state);
+
+ Rooted<Maybe<Value>> storedError(cx, Some(unwrappedDest->storedError()));
+ if (!cx->compartment()->wrap(cx, &storedError)) {
+ return false;
+ }
+
+ // As in |OnSourceErrored| above, we must deal with the case of |dest|
+ // erroring before a pending read has fulfilled or rejected.
+ //
+ // As noted there, we handle the *first* error that arises. And because this
+ // algorithm immediately invokes a shutdown algorithm, and shutting down will
+ // inhibit future shutdown attempts, we don't need to do anything special
+ // *here*, either.
+
+ // ii. Otherwise (if preventCancel is true), shutdown with
+ // dest.[[storedError]].
+ if (state->preventCancel()) {
+ if (!Shutdown(cx, state, storedError)) {
+ return false;
+ }
+ }
+ // i. If preventCancel is false, shutdown with an action of
+ // ! ReadableStreamCancel(source, dest.[[storedError]]) and with
+ // dest.[[storedError]].
+ else {
+ if (!ShutdownWithAction(cx, state,
+ PipeToState::ShutdownAction::CancelSource,
+ storedError)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+/**
+ * Streams spec, 3.4.11. ReadableStreamPipeTo step 14:
+ * "c. Closing must be propagated forward: if source.[[state]] is or becomes
+ * 'closed', then..."
+ */
+static MOZ_MUST_USE bool OnSourceClosed(JSContext* cx,
+ Handle<PipeToState*> state) {
+ cx->check(state);
+
+ Rooted<Maybe<Value>> noError(cx, Nothing());
+
+ // It shouldn't be possible for |source| to become closed *during* a pending
+ // read: such spontaneous closure *should* be enqueued for processing *after*
+ // the settling of the pending read. (Note also that a [[closedPromise]]
+ // resolution in |ReadableStreamClose| occurs only after all pending reads are
+ // resolved.) So we need not do anything to handle a source closure while a
+ // read is in progress.
+
+ // ii. Otherwise (if preventClose is true), shutdown.
+ if (state->preventClose()) {
+ if (!Shutdown(cx, state, noError)) {
+ return false;
+ }
+ }
+ // i. If preventClose is false, shutdown with an action of
+ // ! WritableStreamDefaultWriterCloseWithErrorPropagation(writer).
+ else {
+ if (!ShutdownWithAction(
+ cx, state,
+ PipeToState::ShutdownAction::CloseWriterWithErrorPropagation,
+ noError)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+/**
+ * Streams spec, 3.4.11. ReadableStreamPipeTo step 14:
+ * "d. Closing must be propagated backward: if
+ * ! WritableStreamCloseQueuedOrInFlight(dest) is true or dest.[[state]] is
+ * 'closed', then..."
+ */
+static MOZ_MUST_USE bool OnDestClosed(JSContext* cx,
+ Handle<PipeToState*> state) {
+ cx->check(state);
+
+ // i. Assert: no chunks have been read or written.
+ //
+ // This assertion holds when this function is called by
+ // |SourceOrDestErroredOrClosed|, before any async internal piping operations
+ // happen.
+ //
+ // But it wouldn't hold for streams that can spontaneously close of their own
+ // accord, like say a hypothetical DOM TCP socket. I think?
+ //
+ // XXX Add this assertion if it really does hold (and is easily performed),
+ // else report a spec bug.
+
+ // ii. Let destClosed be a new TypeError.
+ Rooted<Maybe<Value>> destClosed(cx, Nothing());
+ {
+ JS_ReportErrorNumberASCII(cx, GetErrorMessage, nullptr,
+ JSMSG_WRITABLESTREAM_WRITE_CLOSING_OR_CLOSED);
+ Rooted<Value> v(cx);
+ if (!cx->isExceptionPending() || !GetAndClearException(cx, &v)) {
+ return false;
+ }
+
+ destClosed = Some(v.get());
+ }
+
+ // As in all the |On{Source,Dest}{Closed,Errored}| above, we must consider the
+ // possibility that we're in the middle of a pending read. |state->writer()|
+ // has a lock on |dest| here, so we know only we can be writing chunks to
+ // |dest| -- but there's no reason why |dest| couldn't become closed of its
+ // own accord here (for example, a socket might become closed on its own), and
+ // such closure may or may not be equivalent to error.
+ //
+ // For the reasons noted in |OnSourceErrored|, we process closure in the
+ // middle of a pending read immediately, without delaying for that read to
+ // fulfill or reject. We trigger a shutdown operation below, which will
+ // ensure shutdown only occurs once, so we need not do anything special here.
+
+ // iv. Otherwise (if preventCancel is true), shutdown with destClosed.
+ if (state->preventCancel()) {
+ if (!Shutdown(cx, state, destClosed)) {
+ return false;
+ }
+ }
+ // iii. If preventCancel is false, shutdown with an action of
+ // ! ReadableStreamCancel(source, destClosed) and with destClosed.
+ else {
+ if (!ShutdownWithAction(
+ cx, state, PipeToState::ShutdownAction::CancelSource, destClosed)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+/**
+ * Streams spec, 3.4.11. ReadableStreamPipeTo step 14:
+ * "Error and close states must be propagated: the following conditions must be
+ * applied in order.", as applied at the very start of piping, before any reads
+ * from source or writes to dest have been triggered.
+ */
+static MOZ_MUST_USE bool SourceOrDestErroredOrClosed(
+ JSContext* cx, Handle<PipeToState*> state,
+ Handle<ReadableStream*> unwrappedSource,
+ Handle<WritableStream*> unwrappedDest, bool* erroredOrClosed) {
+ cx->check(state);
+
+ *erroredOrClosed = true;
+
+ // a. Errors must be propagated forward: if source.[[state]] is or becomes
+ // "errored", then
+ if (unwrappedSource->errored()) {
+ return OnSourceErrored(cx, state, unwrappedSource);
+ }
+
+ // b. Errors must be propagated backward: if dest.[[state]] is or becomes
+ // "errored", then
+ if (unwrappedDest->errored()) {
+ return OnDestErrored(cx, state, unwrappedDest);
+ }
+
+ // c. Closing must be propagated forward: if source.[[state]] is or becomes
+ // "closed", then
+ if (unwrappedSource->closed()) {
+ return OnSourceClosed(cx, state);
+ }
+
+ // d. Closing must be propagated backward: if
+ // ! WritableStreamCloseQueuedOrInFlight(dest) is true or dest.[[state]] is
+ // "closed", then
+ if (WritableStreamCloseQueuedOrInFlight(unwrappedDest) ||
+ unwrappedDest->closed()) {
+ return OnDestClosed(cx, state);
+ }
+
+ *erroredOrClosed = false;
+ return true;
+}
+
+static MOZ_MUST_USE bool OnSourceClosed(JSContext* cx, unsigned argc,
+ Value* vp) {
+ CallArgs args = CallArgsFromVp(argc, vp);
+
+ Rooted<PipeToState*> state(cx, TargetFromHandler<PipeToState>(args));
+ cx->check(state);
+
+ if (!OnSourceClosed(cx, state)) {
+ return false;
+ }
+
+ args.rval().setUndefined();
+ return true;
+}
+
+static MOZ_MUST_USE bool OnSourceErrored(JSContext* cx, unsigned argc,
+ Value* vp) {
+ CallArgs args = CallArgsFromVp(argc, vp);
+
+ Rooted<PipeToState*> state(cx, TargetFromHandler<PipeToState>(args));
+ cx->check(state);
+
+ Rooted<ReadableStream*> unwrappedSource(cx, GetUnwrappedSource(cx, state));
+ if (!unwrappedSource) {
+ return false;
+ }
+
+ if (!OnSourceErrored(cx, state, unwrappedSource)) {
+ return false;
+ }
+
+ args.rval().setUndefined();
+ return true;
+}
+
+static MOZ_MUST_USE bool OnDestClosed(JSContext* cx, unsigned argc, Value* vp) {
+ CallArgs args = CallArgsFromVp(argc, vp);
+
+ Rooted<PipeToState*> state(cx, TargetFromHandler<PipeToState>(args));
+ cx->check(state);
+
+ if (!OnDestClosed(cx, state)) {
+ return false;
+ }
+
+ args.rval().setUndefined();
+ return true;
+}
+
+static MOZ_MUST_USE bool OnDestErrored(JSContext* cx, unsigned argc,
+ Value* vp) {
+ CallArgs args = CallArgsFromVp(argc, vp);
+
+ Rooted<PipeToState*> state(cx, TargetFromHandler<PipeToState>(args));
+ cx->check(state);
+
+ Rooted<WritableStream*> unwrappedDest(cx, GetUnwrappedDest(cx, state));
+ if (!unwrappedDest) {
+ return false;
+ }
+
+ if (!OnDestErrored(cx, state, unwrappedDest)) {
+ return false;
+ }
+
+ args.rval().setUndefined();
+ return true;
+}
+
+template <class StreamAccessor, class Stream>
+static inline JSObject* GetClosedPromise(
+ JSContext* cx, Handle<Stream*> unwrappedStream,
+ StreamAccessor* (&unwrapAccessorFromStream)(JSContext*, Handle<Stream*>)) {
+ StreamAccessor* unwrappedAccessor =
+ unwrapAccessorFromStream(cx, unwrappedStream);
+ if (!unwrappedAccessor) {
+ return nullptr;
+ }
+
+ return unwrappedAccessor->closedPromise();
+}
+
+static MOZ_MUST_USE bool ReadFromSource(JSContext* cx,
+ Handle<PipeToState*> state);
+
+static bool ReadFulfilled(JSContext* cx, Handle<PipeToState*> state,
+ Handle<JSObject*> result) {
+ cx->check(state);
+ cx->check(result);
+
+ state->clearPendingRead();
+
+ // "Shutdown must stop activity: if shuttingDown becomes true, the user agent
+ // must not initiate further reads from reader, and must only perform writes
+ // of already-read chunks".
+ //
+ // We may reach this point after |On{Source,Dest}{Clos,Error}ed| has responded
+ // to an out-of-band change. Per the comment in |OnSourceErrored|, we want to
+ // allow the implicated shutdown to proceed, and we don't want to interfere
+ // with or additionally alter its operation. Particularly, we don't want to
+ // queue up the successfully-read chunk (if there was one, and this isn't just
+ // reporting "done") to be written: it wasn't "already-read" when that
+ // error/closure happened.
+ //
+ // All specified reactions to a closure/error invoke either the shutdown, or
+ // shutdown with an action, algorithms. Those algorithms each abort if either
+ // shutdown algorithm has already been invoked. So we check for shutdown here
+ // in case of asynchronous closure/error and abort if shutdown has already
+ // started (and possibly finished).
+ if (state->shuttingDown()) {
+ return true;
+ }
+
+ {
+ bool done;
+ {
+ Rooted<Value> doneVal(cx);
+ if (!GetProperty(cx, result, result, cx->names().done, &doneVal)) {
+ return false;
+ }
+ done = doneVal.toBoolean();
+ }
+
+ if (done) {
+ // All chunks have been read from |reader| and written to |writer| (but
+ // not necessarily fulfilled yet, in the latter case). Proceed as if
+ // |source| is now closed. (This will asynchronously wait until any
+ // pending writes have fulfilled.)
+ return OnSourceClosed(cx, state);
+ }
+ }
+
+ // A chunk was read, and *at the time the read was requested*, |dest| was
+ // ready to accept a write. (Only one read is processed at a time per
+ // |state->hasPendingRead()|, so this condition remains true now.) Write the
+ // chunk to |dest|.
+ {
+ Rooted<Value> chunk(cx);
+ if (!GetProperty(cx, result, result, cx->names().value, &chunk)) {
+ return false;
+ }
+
+ Rooted<WritableStreamDefaultWriter*> writer(cx, state->writer());
+ cx->check(writer);
+
+ PromiseObject* writeRequest =
+ WritableStreamDefaultWriterWrite(cx, writer, chunk);
+ if (!writeRequest) {
+ return false;
+ }
+
+ // Stash away this new last write request. (The shutdown process will react
+ // to this write request to finish shutdown only once all pending writes are
+ // completed.)
+ state->updateLastWriteRequest(writeRequest);
+ }
+
+ // Read another chunk if this write didn't fill up |dest|.
+ //
+ // While we (properly) ignored |state->shuttingDown()| earlier, this call will
+ // *not* initiate a fresh read if |!state->shuttingDown()|.
+ return ReadFromSource(cx, state);
+}
+
+static bool ReadFulfilled(JSContext* cx, unsigned argc, Value* vp) {
+ CallArgs args = CallArgsFromVp(argc, vp);
+ MOZ_ASSERT(args.length() == 1);
+
+ Rooted<PipeToState*> state(cx, TargetFromHandler<PipeToState>(args));
+ cx->check(state);
+
+ Rooted<JSObject*> result(cx, &args[0].toObject());
+ cx->check(result);
+
+ if (!ReadFulfilled(cx, state, result)) {
+ return false;
+ }
+
+ args.rval().setUndefined();
+ return true;
+}
+
+static bool ReadFromSource(JSContext* cx, unsigned argc, Value* vp);
+
+static MOZ_MUST_USE bool ReadFromSource(JSContext* cx,
+ Handle<PipeToState*> state) {
+ cx->check(state);
+
+ MOZ_ASSERT(!state->hasPendingRead(),
+ "should only have one read in flight at a time, because multiple "
+ "reads could cause the latter read to ignore backpressure "
+ "signals");
+
+ // "Shutdown must stop activity: if shuttingDown becomes true, the user agent
+ // must not initiate further reads from reader..."
+ if (state->shuttingDown()) {
+ return true;
+ }
+
+ Rooted<WritableStreamDefaultWriter*> writer(cx, state->writer());
+ cx->check(writer);
+
+ // "While WritableStreamDefaultWriterGetDesiredSize(writer) is ≤ 0 or is null,
+ // the user agent must not read from reader."
+ Rooted<Value> desiredSize(cx);
+ if (!WritableStreamDefaultWriterGetDesiredSize(cx, writer, &desiredSize)) {
+ return false;
+ }
+
+ // If we're in the middle of erroring or are fully errored, either way the
+ // |dest|-closed reaction queued up in |StartPiping| will do the right
+ // thing, so do nothing here.
+ if (desiredSize.isNull()) {
+#ifdef DEBUG
+ {
+ WritableStream* unwrappedDest = GetUnwrappedDest(cx, state);
+ if (!unwrappedDest) {
+ return false;
+ }
+
+ MOZ_ASSERT(unwrappedDest->erroring() || unwrappedDest->errored());
+ }
+#endif
+
+ return true;
+ }
+
+ // If |dest| isn't ready to receive writes yet (i.e. backpressure applies),
+ // resume when it is.
+ MOZ_ASSERT(desiredSize.isNumber());
+ if (desiredSize.toNumber() <= 0) {
+ Rooted<JSObject*> readyPromise(cx, writer->readyPromise());
+ cx->check(readyPromise);
+
+ Rooted<JSFunction*> readFromSource(cx,
+ NewHandler(cx, ReadFromSource, state));
+ if (!readFromSource) {
+ return false;
+ }
+
+ // Resume when there's writable capacity. Don't bother handling rejection:
+ // if this happens, the stream is going to be errored shortly anyway, and
+ // |StartPiping| has us ready to react to that already.
+ //
+ // XXX Double-check the claim that we need not handle rejections and that a
+ // rejection of [[readyPromise]] *necessarily* is always followed by
+ // rejection of [[closedPromise]].
+ return JS::AddPromiseReactionsIgnoringUnhandledRejection(
+ cx, readyPromise, readFromSource, nullptr);
+ }
+
+ // |dest| is ready to receive at least one write. Read one chunk from the
+ // reader now that we're not subject to backpressure.
+ Rooted<ReadableStreamDefaultReader*> reader(cx, state->reader());
+ cx->check(reader);
+
+ Rooted<PromiseObject*> readRequest(
+ cx, js::ReadableStreamDefaultReaderRead(cx, reader));
+ if (!readRequest) {
+ return false;
+ }
+
+ Rooted<JSFunction*> readFulfilled(cx, NewHandler(cx, ReadFulfilled, state));
+ if (!readFulfilled) {
+ return false;
+ }
+
+#ifdef DEBUG
+ MOZ_ASSERT(!state->pendingReadWouldBeRejected());
+
+ // The specification for ReadableStreamError ensures that rejecting a read or
+ // read-into request is immediately followed by rejecting the reader's
+ // [[closedPromise]]. Therefore, it does not appear *necessary* to handle the
+ // rejected case -- the [[closedPromise]] reaction will do so for us.
+ //
+ // However, this is all very stateful and gnarly, so we implement a rejection
+ // handler that sets a flag to indicate the read was rejected. Then if the
+ // [[closedPromise]] reaction function is invoked, we can assert that *if*
+ // a read is recorded as pending at that instant, a reject handler would have
+ // been invoked for it.
+ auto ReadRejected = [](JSContext* cx, unsigned argc, Value* vp) {
+ CallArgs args = CallArgsFromVp(argc, vp);
+ MOZ_ASSERT(args.length() == 1);
+
+ Rooted<PipeToState*> state(cx, TargetFromHandler<PipeToState>(args));
+ cx->check(state);
+
+ state->setPendingReadWouldBeRejected();
+
+ args.rval().setUndefined();
+ return true;
+ };
+
+ Rooted<JSFunction*> readRejected(cx, NewHandler(cx, ReadRejected, state));
+ if (!readRejected) {
+ return false;
+ }
+#else
+ auto readRejected = nullptr;
+#endif
+
+ // Once the chunk is read, immediately write it and attempt to read more.
+ // Don't bother handling a rejection: |source| will be closed/errored, and
+ // |StartPiping| poised us to react to that already.
+ if (!JS::AddPromiseReactionsIgnoringUnhandledRejection(
+ cx, readRequest, readFulfilled, readRejected)) {
+ return false;
+ }
+
+ // The spec is clear that a write started before an error/stream-closure is
+ // encountered must be completed before shutdown. It is *not* clear that a
+ // read that hasn't yet fulfilled should delay shutdown (or until that read's
+ // successive write is completed).
+ //
+ // It seems easiest to explain, both from a user perspective (no read is ever
+ // just dropped on the ground) and an implementer perspective (if we *don't*
+ // delay, then a read could be started, a shutdown could be started, then the
+ // read could finish but we can't write it which arguably conflicts with the
+ // requirement that chunks that have been read must be written before shutdown
+ // completes), to delay. XXX file a spec issue to require this!
+ state->setPendingRead();
+ return true;
+}
+
+static bool ReadFromSource(JSContext* cx, unsigned argc, Value* vp) {
+ CallArgs args = CallArgsFromVp(argc, vp);
+
+ Rooted<PipeToState*> state(cx, TargetFromHandler<PipeToState>(args));
+ cx->check(state);
+
+ if (!ReadFromSource(cx, state)) {
+ return false;
+ }
+
+ args.rval().setUndefined();
+ return true;
+}
+
+static MOZ_MUST_USE bool StartPiping(JSContext* cx, Handle<PipeToState*> state,
+ Handle<ReadableStream*> unwrappedSource,
+ Handle<WritableStream*> unwrappedDest) {
+ cx->check(state);
+
+ // "Shutdown must stop activity: if shuttingDown becomes true, the user agent
+ // must not initiate further reads from reader..."
+ MOZ_ASSERT(!state->shuttingDown(), "can't be shutting down when starting");
+
+ // "Error and close states must be propagated: the following conditions must
+ // be applied in order."
+ //
+ // Before piping has started, we have to check for source/dest being errored
+ // or closed manually.
+ bool erroredOrClosed;
+ if (!SourceOrDestErroredOrClosed(cx, state, unwrappedSource, unwrappedDest,
+ &erroredOrClosed)) {
+ return false;
+ }
+ if (erroredOrClosed) {
+ return true;
+ }
+
+ // *After* piping has started, add reactions to respond to source/dest
+ // becoming errored or closed.
+ {
+ Rooted<JSObject*> unwrappedClosedPromise(cx);
+ Rooted<JSObject*> onClosed(cx);
+ Rooted<JSObject*> onErrored(cx);
+
+ auto ReactWhenClosedOrErrored =
+ [&unwrappedClosedPromise, &onClosed, &onErrored, &state](
+ JSContext* cx, JSNative onClosedFunc, JSNative onErroredFunc) {
+ onClosed = NewHandler(cx, onClosedFunc, state);
+ if (!onClosed) {
+ return false;
+ }
+
+ onErrored = NewHandler(cx, onErroredFunc, state);
+ if (!onErrored) {
+ return false;
+ }
+
+ return JS::AddPromiseReactions(cx, unwrappedClosedPromise, onClosed,
+ onErrored);
+ };
+
+ unwrappedClosedPromise =
+ GetClosedPromise(cx, unwrappedSource, UnwrapReaderFromStream);
+ if (!unwrappedClosedPromise) {
+ return false;
+ }
+
+ if (!ReactWhenClosedOrErrored(cx, OnSourceClosed, OnSourceErrored)) {
+ return false;
+ }
+
+ unwrappedClosedPromise =
+ GetClosedPromise(cx, unwrappedDest, UnwrapWriterFromStream);
+ if (!unwrappedClosedPromise) {
+ return false;
+ }
+
+ if (!ReactWhenClosedOrErrored(cx, OnDestClosed, OnDestErrored)) {
+ return false;
+ }
+ }
+
+ return ReadFromSource(cx, state);
+}
+
+/**
+ * Stream spec, 4.8.1. ReadableStreamPipeTo ( source, dest,
+ * preventClose, preventAbort,
+ * preventCancel[, signal] )
+ * Step 14.1 abortAlgorithm.
+ */
+static MOZ_MUST_USE bool PerformAbortAlgorithm(JSContext* cx,
+ Handle<PipeToState*> state) {
+ cx->check(state);
+
+ // Step 14.1: Let abortAlgorithm be the following steps:
+ // Step 14.1.1: Let error be a new "AbortError" DOMException.
+ // Step 14.1.2: Let actions be an empty ordered set.
+ // Step 14.1.3: If preventAbort is false, append the following action to
+ // actions:
+ // Step 14.1.3.1: If dest.[[state]] is "writable", return
+ // ! WritableStreamAbort(dest, error).
+ // Step 14.1.3.2: Otherwise, return a promise resolved with undefined.
+ // Step 14.1.4: If preventCancel is false, append the following action action
+ // to actions:
+ // Step 14.1.4.1: If source.[[state]] is "readable", return
+ // ! ReadableStreamCancel(source, error).
+ // Step 14.1.4.2: Otherwise, return a promise resolved with undefined.
+ // Step 14.1.5: Shutdown with an action consisting of getting a promise to
+ // wait for all of the actions in actions, and with error.
+ // XXX jwalden
+ JS_ReportErrorNumberASCII(cx, GetErrorMessage, nullptr,
+ JSMSG_READABLESTREAM_METHOD_NOT_IMPLEMENTED,
+ "abortAlgorithm steps");
+ return false;
+}
+
+/**
+ * Stream spec, 3.4.11. ReadableStreamPipeTo ( source, dest,
+ * preventClose, preventAbort,
+ * preventCancel, signal )
+ * Steps 4-11, 13-14.
+ */
+/* static */ PipeToState* PipeToState::create(
+ JSContext* cx, Handle<PromiseObject*> promise,
+ Handle<ReadableStream*> unwrappedSource,
+ Handle<WritableStream*> unwrappedDest, bool preventClose, bool preventAbort,
+ bool preventCancel, Handle<JSObject*> signal) {
+ cx->check(promise);
+ cx->check(signal);
+
+ Rooted<PipeToState*> state(cx,
+ NewTenuredBuiltinClassInstance<PipeToState>(cx));
+ if (!state) {
+ return nullptr;
+ }
+
+ // Step 4. Assert: signal is undefined or signal is an instance of the
+ // AbortSignal interface.
+ MOZ_ASSERT(state->getFixedSlot(Slot_Signal).isUndefined());
+ if (signal) {
+ // |signal| is double-checked to be an |AbortSignal| further down.
+ state->initFixedSlot(Slot_Signal, ObjectValue(*signal));
+ }
+
+ // Step 5: Assert: ! IsReadableStreamLocked(source) is false.
+ MOZ_ASSERT(!unwrappedSource->locked());
+
+ // Step 6: Assert: ! IsWritableStreamLocked(dest) is false.
+ MOZ_ASSERT(!unwrappedDest->isLocked());
+
+ MOZ_ASSERT(state->getFixedSlot(Slot_Promise).isUndefined());
+ state->initFixedSlot(Slot_Promise, ObjectValue(*promise));
+
+ // Step 7: If ! IsReadableByteStreamController(
+ // source.[[readableStreamController]]) is true, let reader
+ // be either ! AcquireReadableStreamBYOBReader(source) or
+ // ! AcquireReadableStreamDefaultReader(source), at the user agent’s
+ // discretion.
+ // Step 8: Otherwise, let reader be
+ // ! AcquireReadableStreamDefaultReader(source).
+ // We don't implement byte streams, so we always acquire a default reader.
+ {
+ ReadableStreamDefaultReader* reader = CreateReadableStreamDefaultReader(
+ cx, unwrappedSource, ForAuthorCodeBool::No);
+ if (!reader) {
+ return nullptr;
+ }
+
+ MOZ_ASSERT(state->getFixedSlot(Slot_Reader).isUndefined());
+ state->initFixedSlot(Slot_Reader, ObjectValue(*reader));
+ }
+
+ // Step 9: Let writer be ! AcquireWritableStreamDefaultWriter(dest).
+ {
+ WritableStreamDefaultWriter* writer =
+ CreateWritableStreamDefaultWriter(cx, unwrappedDest);
+ if (!writer) {
+ return nullptr;
+ }
+
+ MOZ_ASSERT(state->getFixedSlot(Slot_Writer).isUndefined());
+ state->initFixedSlot(Slot_Writer, ObjectValue(*writer));
+ }
+
+ // Step 10: Set source.[[disturbed]] to true.
+ unwrappedSource->setDisturbed();
+
+ state->initFlags(preventClose, preventAbort, preventCancel);
+ MOZ_ASSERT(state->preventClose() == preventClose);
+ MOZ_ASSERT(state->preventAbort() == preventAbort);
+ MOZ_ASSERT(state->preventCancel() == preventCancel);
+
+ // Step 11: Let shuttingDown be false.
+ MOZ_ASSERT(!state->shuttingDown(), "should be set to false by initFlags");
+
+ // Step 12 ("Let promise be a new promise.") was performed by the caller and
+ // |promise| was its result.
+
+ // XXX This used to be step 13 but is now step 14, all the step-comments of
+ // the overall algorithm need renumbering.
+ // Step 13: If signal is not undefined,
+ if (signal) {
+ // Step 14.2: If signal’s aborted flag is set, perform abortAlgorithm and
+ // return promise.
+ bool aborted;
+ {
+ // Sadly, we can't assert |signal| is an |AbortSignal| here because it
+ // could have become a nuked CCW since it was type-checked.
+ JSObject* unwrappedSignal = UnwrapSignalFromPipeToState(cx, state);
+ if (!unwrappedSignal) {
+ return nullptr;
+ }
+
+ JSRuntime* rt = cx->runtime();
+ MOZ_ASSERT(unwrappedSignal->hasClass(rt->maybeAbortSignalClass()));
+
+ AutoRealm ar(cx, unwrappedSignal);
+ aborted = rt->abortSignalIsAborted(unwrappedSignal);
+ }
+ if (aborted) {
+ if (!PerformAbortAlgorithm(cx, state)) {
+ return nullptr;
+ }
+
+ // Returning |state| here will cause |promise| to be returned by the
+ // overall algorithm.
+ return state;
+ }
+
+ // Step 14.3: Add abortAlgorithm to signal.
+ // XXX jwalden need JSAPI to add an algorithm/steps to an AbortSignal
+ JS_ReportErrorNumberASCII(cx, GetErrorMessage, nullptr,
+ JSMSG_READABLESTREAM_METHOD_NOT_IMPLEMENTED,
+ "adding abortAlgorithm to signal");
+ return nullptr;
+ }
+
+ // Step 14: In parallel, using reader and writer, read all chunks from source
+ // and write them to dest.
+ if (!StartPiping(cx, state, unwrappedSource, unwrappedDest)) {
+ return nullptr;
+ }
+
+ return state;
+}
+
+const JSClass PipeToState::class_ = {"PipeToState",
+ JSCLASS_HAS_RESERVED_SLOTS(SlotCount)};