diff options
Diffstat (limited to 'dom/streams/ReadableStreamPipeTo.cpp')
-rw-r--r-- | dom/streams/ReadableStreamPipeTo.cpp | 977 |
1 files changed, 977 insertions, 0 deletions
diff --git a/dom/streams/ReadableStreamPipeTo.cpp b/dom/streams/ReadableStreamPipeTo.cpp new file mode 100644 index 0000000000..45cc79b0b2 --- /dev/null +++ b/dom/streams/ReadableStreamPipeTo.cpp @@ -0,0 +1,977 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim:set ts=2 sw=2 sts=2 et cindent: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "ReadableStreamPipeTo.h" + +#include "mozilla/dom/AbortFollower.h" +#include "mozilla/dom/AbortSignal.h" +#include "mozilla/dom/ReadableStream.h" +#include "mozilla/dom/ReadableStreamDefaultReader.h" +#include "mozilla/dom/WritableStream.h" +#include "mozilla/dom/WritableStreamDefaultWriter.h" +#include "mozilla/dom/Promise.h" +#include "mozilla/dom/Promise-inl.h" +#include "mozilla/dom/PromiseNativeHandler.h" +#include "mozilla/AlreadyAddRefed.h" +#include "mozilla/ErrorResult.h" +#include "nsCycleCollectionParticipant.h" +#include "nsISupportsImpl.h" + +#include "js/Exception.h" + +namespace mozilla::dom { + +using namespace streams_abstract; + +struct PipeToReadRequest; +class WriteFinishedPromiseHandler; +class ShutdownActionFinishedPromiseHandler; + +// https://streams.spec.whatwg.org/#readable-stream-pipe-to (Steps 14-15.) +// +// This class implements everything that is required to read all chunks from +// the reader (source) and write them to writer (destination), while +// following the constraints given in the spec using our implementation-defined +// behavior. +// +// The cycle-collected references look roughly like this: +// clang-format off +// +// Closed promise <-- ReadableStreamDefaultReader <--> ReadableStream +// | ^ | +// |(PromiseHandler) |(mReader) |(ReadRequest) +// | | | +// |-------------> PipeToPump <------- +// ^ | | +// |---------------| | | +// | | |-------(mLastWrite) --------> +// |(PromiseHandler) | |< ---- (PromiseHandler) ---- Promise +// | | ^ +// | |(mWriter) |(mWriteRequests) +// | v | +// Closed promise <-- WritableStreamDefaultWriter <--------> WritableStream +// +// clang-format on +class PipeToPump final : public AbortFollower { + NS_DECL_CYCLE_COLLECTING_ISUPPORTS + NS_DECL_CYCLE_COLLECTION_CLASS(PipeToPump) + + friend struct PipeToReadRequest; + friend class WriteFinishedPromiseHandler; + friend class ShutdownActionFinishedPromiseHandler; + + PipeToPump(Promise* aPromise, ReadableStreamDefaultReader* aReader, + WritableStreamDefaultWriter* aWriter, bool aPreventClose, + bool aPreventAbort, bool aPreventCancel) + : mPromise(aPromise), + mReader(aReader), + mWriter(aWriter), + mPreventClose(aPreventClose), + mPreventAbort(aPreventAbort), + mPreventCancel(aPreventCancel) {} + + MOZ_CAN_RUN_SCRIPT void Start(JSContext* aCx, AbortSignal* aSignal); + + MOZ_CAN_RUN_SCRIPT_BOUNDARY void RunAbortAlgorithm() override; + + private: + ~PipeToPump() override = default; + + MOZ_CAN_RUN_SCRIPT void PerformAbortAlgorithm(JSContext* aCx, + AbortSignalImpl* aSignal); + + MOZ_CAN_RUN_SCRIPT bool SourceOrDestErroredOrClosed(JSContext* aCx); + + using ShutdownAction = already_AddRefed<Promise> (*)( + JSContext*, PipeToPump*, JS::Handle<mozilla::Maybe<JS::Value>>, + ErrorResult&); + + MOZ_CAN_RUN_SCRIPT void ShutdownWithAction( + JSContext* aCx, ShutdownAction aAction, + JS::Handle<mozilla::Maybe<JS::Value>> aError); + MOZ_CAN_RUN_SCRIPT void ShutdownWithActionAfterFinishedWrite( + JSContext* aCx, ShutdownAction aAction, + JS::Handle<mozilla::Maybe<JS::Value>> aError); + + MOZ_CAN_RUN_SCRIPT void Shutdown( + JSContext* aCx, JS::Handle<mozilla::Maybe<JS::Value>> aError); + + void Finalize(JSContext* aCx, JS::Handle<mozilla::Maybe<JS::Value>> aError); + + MOZ_CAN_RUN_SCRIPT void OnReadFulfilled(JSContext* aCx, + JS::Handle<JS::Value> aChunk, + ErrorResult& aRv); + MOZ_CAN_RUN_SCRIPT void OnWriterReady(JSContext* aCx, JS::Handle<JS::Value>); + MOZ_CAN_RUN_SCRIPT void Read(JSContext* aCx); + + MOZ_CAN_RUN_SCRIPT void OnSourceClosed(JSContext* aCx, JS::Handle<JS::Value>); + MOZ_CAN_RUN_SCRIPT void OnSourceErrored( + JSContext* aCx, JS::Handle<JS::Value> aSourceStoredError); + + MOZ_CAN_RUN_SCRIPT void OnDestClosed(JSContext* aCx, JS::Handle<JS::Value>); + MOZ_CAN_RUN_SCRIPT void OnDestErrored(JSContext* aCx, + JS::Handle<JS::Value> aDestStoredError); + + RefPtr<Promise> mPromise; + RefPtr<ReadableStreamDefaultReader> mReader; + RefPtr<WritableStreamDefaultWriter> mWriter; + RefPtr<Promise> mLastWritePromise; + const bool mPreventClose; + const bool mPreventAbort; + const bool mPreventCancel; + bool mShuttingDown = false; +#ifdef DEBUG + bool mReadChunk = false; +#endif +}; + +// This is a helper class for PipeToPump that allows it to attach +// member functions as promise handlers. +class PipeToPumpHandler final : public PromiseNativeHandler { + virtual ~PipeToPumpHandler() = default; + + using FunPtr = void (PipeToPump::*)(JSContext*, JS::Handle<JS::Value>); + + RefPtr<PipeToPump> mPipeToPump; + FunPtr mResolved; + FunPtr mRejected; + + public: + NS_DECL_CYCLE_COLLECTING_ISUPPORTS + NS_DECL_CYCLE_COLLECTION_CLASS(PipeToPumpHandler) + + explicit PipeToPumpHandler(PipeToPump* aPipeToPump, FunPtr aResolved, + FunPtr aRejected) + : mPipeToPump(aPipeToPump), mResolved(aResolved), mRejected(aRejected) {} + + void ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue, + ErrorResult&) override { + if (mResolved) { + (mPipeToPump->*mResolved)(aCx, aValue); + } + } + + void RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aReason, + ErrorResult&) override { + if (mRejected) { + (mPipeToPump->*mRejected)(aCx, aReason); + } + } +}; + +NS_IMPL_CYCLE_COLLECTION(PipeToPumpHandler, mPipeToPump) +NS_IMPL_CYCLE_COLLECTING_ADDREF(PipeToPumpHandler) +NS_IMPL_CYCLE_COLLECTING_RELEASE(PipeToPumpHandler) +NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToPumpHandler) + NS_INTERFACE_MAP_ENTRY(nsISupports) +NS_INTERFACE_MAP_END + +void PipeToPump::RunAbortAlgorithm() { + AutoJSAPI jsapi; + if (!jsapi.Init(mReader->GetStream()->GetParentObject())) { + NS_WARNING( + "Failed to initialize AutoJSAPI in PipeToPump::RunAbortAlgorithm"); + return; + } + JSContext* cx = jsapi.cx(); + + RefPtr<AbortSignalImpl> signal = Signal(); + PerformAbortAlgorithm(cx, signal); +} + +void PipeToPump::PerformAbortAlgorithm(JSContext* aCx, + AbortSignalImpl* aSignal) { + MOZ_ASSERT(aSignal->Aborted()); + + // https://streams.spec.whatwg.org/#readable-stream-pipe-to + // Step 14.1. Let abortAlgorithm be the following steps: + // Note: All the following steps are 14.1.xx + + // Step 1. Let error be signal’s abort reason. + JS::Rooted<JS::Value> error(aCx); + aSignal->GetReason(aCx, &error); + + auto action = [](JSContext* aCx, PipeToPump* aPipeToPump, + JS::Handle<mozilla::Maybe<JS::Value>> aError, + ErrorResult& aRv) MOZ_CAN_RUN_SCRIPT { + JS::Rooted<JS::Value> error(aCx, *aError); + + // Step 2. Let actions be an empty ordered set. + nsTArray<RefPtr<Promise>> actions; + + // Step 3. If preventAbort is false, append the following action to actions: + if (!aPipeToPump->mPreventAbort) { + RefPtr<WritableStream> dest = aPipeToPump->mWriter->GetStream(); + + // Step 3.1. If dest.[[state]] is "writable", return ! + // WritableStreamAbort(dest, error). + if (dest->State() == WritableStream::WriterState::Writable) { + RefPtr<Promise> p = WritableStreamAbort(aCx, dest, error, aRv); + if (aRv.Failed()) { + return already_AddRefed<Promise>(); + } + actions.AppendElement(p); + } + + // Step 3.2. Otherwise, return a promise resolved with undefined. + // Note: This is basically a no-op. + } + + // Step 4. If preventCancel is false, append the following action action to + // actions: + if (!aPipeToPump->mPreventCancel) { + RefPtr<ReadableStream> source = aPipeToPump->mReader->GetStream(); + + // Step 4.1. If source.[[state]] is "readable", return ! + // ReadableStreamCancel(source, error). + if (source->State() == ReadableStream::ReaderState::Readable) { + RefPtr<Promise> p = ReadableStreamCancel(aCx, source, error, aRv); + if (aRv.Failed()) { + return already_AddRefed<Promise>(); + } + actions.AppendElement(p); + } + + // Step 4.2. Otherwise, return a promise resolved with undefined. + // No-op again. + } + + // Step 5. .. action consisting of getting a promise to wait for + // all of the actions in actions ... + return Promise::All(aCx, actions, aRv); + }; + + // Step 5. Shutdown with an action consisting of getting a promise to wait for + // all of the actions in actions, and with error. + JS::Rooted<Maybe<JS::Value>> someError(aCx, Some(error.get())); + ShutdownWithAction(aCx, action, someError); +} + +bool PipeToPump::SourceOrDestErroredOrClosed(JSContext* aCx) { + // (Constraint) Error and close states must be propagated: + // the following conditions must be applied in order. + RefPtr<ReadableStream> source = mReader->GetStream(); + RefPtr<WritableStream> dest = mWriter->GetStream(); + + // Step 1. Errors must be propagated forward: if source.[[state]] is or + // becomes "errored", then + if (source->State() == ReadableStream::ReaderState::Errored) { + JS::Rooted<JS::Value> storedError(aCx, source->StoredError()); + OnSourceErrored(aCx, storedError); + return true; + } + + // Step 2. Errors must be propagated backward: if dest.[[state]] is or becomes + // "errored", then + if (dest->State() == WritableStream::WriterState::Errored) { + JS::Rooted<JS::Value> storedError(aCx, dest->StoredError()); + OnDestErrored(aCx, storedError); + return true; + } + + // Step 3. Closing must be propagated forward: if source.[[state]] is or + // becomes "closed", then + if (source->State() == ReadableStream::ReaderState::Closed) { + OnSourceClosed(aCx, JS::UndefinedHandleValue); + return true; + } + + // Step 4. Closing must be propagated backward: + // if ! WritableStreamCloseQueuedOrInFlight(dest) is true + // or dest.[[state]] is "closed", then + if (dest->CloseQueuedOrInFlight() || + dest->State() == WritableStream::WriterState::Closed) { + OnDestClosed(aCx, JS::UndefinedHandleValue); + return true; + } + + return false; +} + +// https://streams.spec.whatwg.org/#readable-stream-pipe-to +// Steps 14-15. +void PipeToPump::Start(JSContext* aCx, AbortSignal* aSignal) { + // Step 14. If signal is not undefined, + if (aSignal) { + // Step 14.1. Let abortAlgorithm be the following steps: + // ... This is implemented by RunAbortAlgorithm. + + // Step 14.2. If signal is aborted, perform abortAlgorithm and + // return promise. + if (aSignal->Aborted()) { + PerformAbortAlgorithm(aCx, aSignal); + return; + } + + // Step 14.3. Add abortAlgorithm to signal. + Follow(aSignal); + } + + // Step 15. In parallel but not really; see #905, using reader and writer, + // read all chunks from source and write them to dest. + // Due to the locking provided by the reader and writer, + // the exact manner in which this happens is not observable to author code, + // and so there is flexibility in how this is done. + + // (Constraint) Error and close states must be propagated + + // Before piping has started, we have to check for source/destination being + // errored/closed manually. + if (SourceOrDestErroredOrClosed(aCx)) { + return; + } + + // We use the following two promises to propagate error and close states + // during piping. + RefPtr<Promise> readerClosed = mReader->ClosedPromise(); + readerClosed->AppendNativeHandler(new PipeToPumpHandler( + this, &PipeToPump::OnSourceClosed, &PipeToPump::OnSourceErrored)); + + // Note: Because we control the destination/writer it should never be closed + // after we did the initial check above with SourceOrDestErroredOrClosed. + RefPtr<Promise> writerClosed = mWriter->ClosedPromise(); + writerClosed->AppendNativeHandler(new PipeToPumpHandler( + this, &PipeToPump::OnDestClosed, &PipeToPump::OnDestErrored)); + + Read(aCx); +} + +class WriteFinishedPromiseHandler final : public PromiseNativeHandler { + RefPtr<PipeToPump> mPipeToPump; + PipeToPump::ShutdownAction mAction; + bool mHasError; + JS::Heap<JS::Value> mError; + + virtual ~WriteFinishedPromiseHandler() { mozilla::DropJSObjects(this); }; + + public: + NS_DECL_CYCLE_COLLECTING_ISUPPORTS + NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(WriteFinishedPromiseHandler) + + explicit WriteFinishedPromiseHandler( + JSContext* aCx, PipeToPump* aPipeToPump, + PipeToPump::ShutdownAction aAction, + JS::Handle<mozilla::Maybe<JS::Value>> aError) + : mPipeToPump(aPipeToPump), mAction(aAction) { + mHasError = aError.isSome(); + if (mHasError) { + mError = *aError; + } + mozilla::HoldJSObjects(this); + } + + MOZ_CAN_RUN_SCRIPT void WriteFinished(JSContext* aCx) { + RefPtr<PipeToPump> pipeToPump = mPipeToPump; // XXX known-live? + JS::Rooted<Maybe<JS::Value>> error(aCx); + if (mHasError) { + error = Some(mError); + } + pipeToPump->ShutdownWithActionAfterFinishedWrite(aCx, mAction, error); + } + + MOZ_CAN_RUN_SCRIPT void ResolvedCallback(JSContext* aCx, + JS::Handle<JS::Value> aValue, + ErrorResult&) override { + WriteFinished(aCx); + } + + MOZ_CAN_RUN_SCRIPT void RejectedCallback(JSContext* aCx, + JS::Handle<JS::Value> aReason, + ErrorResult&) override { + WriteFinished(aCx); + } +}; + +NS_IMPL_CYCLE_COLLECTION_WITH_JS_MEMBERS(WriteFinishedPromiseHandler, + (mPipeToPump), (mError)) +NS_IMPL_CYCLE_COLLECTING_ADDREF(WriteFinishedPromiseHandler) +NS_IMPL_CYCLE_COLLECTING_RELEASE(WriteFinishedPromiseHandler) +NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(WriteFinishedPromiseHandler) + NS_INTERFACE_MAP_ENTRY(nsISupports) +NS_INTERFACE_MAP_END + +// https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action +// Shutdown with an action: if any of the above requirements ask to shutdown +// with an action action, optionally with an error originalError, then: +void PipeToPump::ShutdownWithAction( + JSContext* aCx, ShutdownAction aAction, + JS::Handle<mozilla::Maybe<JS::Value>> aError) { + // Step 1. If shuttingDown is true, abort these substeps. + if (mShuttingDown) { + return; + } + + // Step 2. Set shuttingDown to true. + mShuttingDown = true; + + // Step 3. If dest.[[state]] is "writable" and ! + // WritableStreamCloseQueuedOrInFlight(dest) is false, + RefPtr<WritableStream> dest = mWriter->GetStream(); + if (dest->State() == WritableStream::WriterState::Writable && + !dest->CloseQueuedOrInFlight()) { + // Step 3.1. If any chunks have been read but not yet written, write them to + // dest. + // Step 3.2. Wait until every chunk that has been read has been + // written (i.e. the corresponding promises have settled). + // + // Note: Write requests are processed in order, so when the promise + // for the last written chunk is settled all previous chunks have been + // written as well. + if (mLastWritePromise) { + mLastWritePromise->AppendNativeHandler( + new WriteFinishedPromiseHandler(aCx, this, aAction, aError)); + return; + } + } + + // Don't have to wait for last write, immediately continue. + ShutdownWithActionAfterFinishedWrite(aCx, aAction, aError); +} + +class ShutdownActionFinishedPromiseHandler final : public PromiseNativeHandler { + RefPtr<PipeToPump> mPipeToPump; + bool mHasError; + JS::Heap<JS::Value> mError; + + virtual ~ShutdownActionFinishedPromiseHandler() { + mozilla::DropJSObjects(this); + } + + public: + NS_DECL_CYCLE_COLLECTING_ISUPPORTS + NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS( + ShutdownActionFinishedPromiseHandler) + + explicit ShutdownActionFinishedPromiseHandler( + JSContext* aCx, PipeToPump* aPipeToPump, + JS::Handle<mozilla::Maybe<JS::Value>> aError) + : mPipeToPump(aPipeToPump) { + mHasError = aError.isSome(); + if (mHasError) { + mError = *aError; + } + mozilla::HoldJSObjects(this); + } + + void ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue, + ErrorResult&) override { + // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action + // Step 5. Upon fulfillment of p, finalize, passing along originalError if + // it was given. + JS::Rooted<Maybe<JS::Value>> error(aCx); + if (mHasError) { + error = Some(mError); + } + mPipeToPump->Finalize(aCx, error); + } + + void RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aReason, + ErrorResult&) override { + // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action + // Step 6. Upon rejection of p with reason newError, finalize with + // newError. + JS::Rooted<Maybe<JS::Value>> error(aCx, Some(aReason)); + mPipeToPump->Finalize(aCx, error); + } +}; + +NS_IMPL_CYCLE_COLLECTION_WITH_JS_MEMBERS(ShutdownActionFinishedPromiseHandler, + (mPipeToPump), (mError)) +NS_IMPL_CYCLE_COLLECTING_ADDREF(ShutdownActionFinishedPromiseHandler) +NS_IMPL_CYCLE_COLLECTING_RELEASE(ShutdownActionFinishedPromiseHandler) +NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ShutdownActionFinishedPromiseHandler) + NS_INTERFACE_MAP_ENTRY(nsISupports) +NS_INTERFACE_MAP_END + +// https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action +// Continuation after Step 3. triggered a promise resolution. +void PipeToPump::ShutdownWithActionAfterFinishedWrite( + JSContext* aCx, ShutdownAction aAction, + JS::Handle<mozilla::Maybe<JS::Value>> aError) { + if (!aAction) { + // Used to implement shutdown without action. Finalize immediately. + Finalize(aCx, aError); + return; + } + + // Step 4. Let p be the result of performing action. + RefPtr<PipeToPump> thisRefPtr = this; + ErrorResult rv; + RefPtr<Promise> p = aAction(aCx, thisRefPtr, aError, rv); + + // Error while calling actions above, continue immediately with finalization. + if (rv.MaybeSetPendingException(aCx)) { + JS::Rooted<Maybe<JS::Value>> someError(aCx); + + JS::Rooted<JS::Value> error(aCx); + if (JS_GetPendingException(aCx, &error)) { + someError = Some(error.get()); + } + + JS_ClearPendingException(aCx); + + Finalize(aCx, someError); + return; + } + + // Steps 5-6. + p->AppendNativeHandler( + new ShutdownActionFinishedPromiseHandler(aCx, this, aError)); +} + +// https://streams.spec.whatwg.org/#rs-pipeTo-shutdown +// Shutdown: if any of the above requirements or steps ask to shutdown, +// optionally with an error error, then: +void PipeToPump::Shutdown(JSContext* aCx, + JS::Handle<mozilla::Maybe<JS::Value>> aError) { + // Note: We implement "shutdown" in terms of "shutdown with action". + // We can observe that when passing along an action that always succeeds + // shutdown with action and shutdown have the same behavior, when + // Ignoring the potential micro task for the promise that we skip anyway. + ShutdownWithAction(aCx, nullptr, aError); +} + +// https://streams.spec.whatwg.org/#rs-pipeTo-finalize +// Finalize: both forms of shutdown will eventually ask to finalize, +// optionally with an error error, which means to perform the following steps: +void PipeToPump::Finalize(JSContext* aCx, + JS::Handle<mozilla::Maybe<JS::Value>> aError) { + IgnoredErrorResult rv; + // Step 1. Perform ! WritableStreamDefaultWriterRelease(writer). + WritableStreamDefaultWriterRelease(aCx, mWriter); + + // Step 2. If reader implements ReadableStreamBYOBReader, + // perform ! ReadableStreamBYOBReaderRelease(reader). + // Note: We always use a default reader. + MOZ_ASSERT(!mReader->IsBYOB()); + + // Step 3. Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader). + ReadableStreamDefaultReaderRelease(aCx, mReader, rv); + NS_WARNING_ASSERTION(!rv.Failed(), + "ReadableStreamReaderGenericRelease should not fail."); + + // Step 3. If signal is not undefined, remove abortAlgorithm from signal. + if (IsFollowing()) { + Unfollow(); + } + + // Step 4. If error was given, reject promise with error. + if (aError.isSome()) { + JS::Rooted<JS::Value> error(aCx, *aError); + mPromise->MaybeReject(error); + } else { + // Step 5. Otherwise, resolve promise with undefined. + mPromise->MaybeResolveWithUndefined(); + } + + // Remove all references. + mPromise = nullptr; + mReader = nullptr; + mWriter = nullptr; + mLastWritePromise = nullptr; + Unfollow(); +} + +void PipeToPump::OnReadFulfilled(JSContext* aCx, JS::Handle<JS::Value> aChunk, + ErrorResult& aRv) { + // (Constraint) Shutdown must stop activity: + // if shuttingDown becomes true, the user agent must not initiate further + // reads from reader, and must only perform writes of already-read chunks ... + // + // We may reach this point after |On{Source,Dest}{Clos,Error}ed| has responded + // to an out-of-band change. Per the comment in |OnSourceErrored|, we want to + // allow the implicated shutdown to proceed, and we don't want to interfere + // with or additionally alter its operation. Particularly, we don't want to + // queue up the successfully-read chunk (if there was one, and this isn't just + // reporting "done") to be written: it wasn't "already-read" when that + // error/closure happened. + // + // All specified reactions to a closure/error invoke either the shutdown, or + // shutdown with an action, algorithms. Those algorithms each abort if either + // shutdown algorithm has already been invoked. So we check for shutdown here + // in case of asynchronous closure/error and abort if shutdown has already + // started (and possibly finished). + // + // TODO: Implement the eventual resolution from + // https://github.com/whatwg/streams/issues/1207 + if (mShuttingDown) { + return; + } + + // Write asynchronously. Roughly this is like: + // `Promise.resolve().then(() => stream.write(chunk));` + // XXX: The spec currently does not require asynchronicity, but this still + // matches other engines' behavior. See + // https://github.com/whatwg/streams/issues/1243. + RefPtr<Promise> promise = + Promise::CreateInfallible(mWriter->GetParentObject()); + promise->MaybeResolveWithUndefined(); + auto result = promise->ThenWithCycleCollectedArgsJS( + [](JSContext* aCx, JS::Handle<JS::Value>, ErrorResult& aRv, + const RefPtr<PipeToPump>& aSelf, + const RefPtr<WritableStreamDefaultWriter>& aWriter, + JS::Handle<JS::Value> aChunk) + MOZ_CAN_RUN_SCRIPT_FOR_DEFINITION -> already_AddRefed<Promise> { + RefPtr<Promise> promise = + WritableStreamDefaultWriterWrite(aCx, aWriter, aChunk, aRv); + + // Last read has finished, so it's time to start reading again. + aSelf->Read(aCx); + + return promise.forget(); + }, + std::make_tuple(RefPtr{this}, mWriter), std::make_tuple(aChunk)); + if (result.isErr()) { + mLastWritePromise = nullptr; + return; + } + mLastWritePromise = result.unwrap(); + + mLastWritePromise->AppendNativeHandler( + new PipeToPumpHandler(this, nullptr, &PipeToPump::OnDestErrored)); +} + +void PipeToPump::OnWriterReady(JSContext* aCx, JS::Handle<JS::Value>) { + // Writer is ready again (i.e. backpressure was resolved), so read. + Read(aCx); +} + +struct PipeToReadRequest : public ReadRequest { + public: + NS_DECL_ISUPPORTS_INHERITED + NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(PipeToReadRequest, ReadRequest) + + RefPtr<PipeToPump> mPipeToPump; + + explicit PipeToReadRequest(PipeToPump* aPipeToPump) + : mPipeToPump(aPipeToPump) {} + + MOZ_CAN_RUN_SCRIPT void ChunkSteps(JSContext* aCx, + JS::Handle<JS::Value> aChunk, + ErrorResult& aRv) override { + RefPtr<PipeToPump> pipeToPump = mPipeToPump; // XXX known live? + pipeToPump->OnReadFulfilled(aCx, aChunk, aRv); + } + + // The reader's closed promise handlers will already call OnSourceClosed/ + // OnSourceErrored, so these steps can just be ignored. + void CloseSteps(JSContext* aCx, ErrorResult& aRv) override {} + void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError, + ErrorResult& aRv) override {} + + protected: + virtual ~PipeToReadRequest() = default; +}; + +NS_IMPL_CYCLE_COLLECTION_INHERITED(PipeToReadRequest, ReadRequest, mPipeToPump) + +NS_IMPL_ADDREF_INHERITED(PipeToReadRequest, ReadRequest) +NS_IMPL_RELEASE_INHERITED(PipeToReadRequest, ReadRequest) + +NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToReadRequest) +NS_INTERFACE_MAP_END_INHERITING(ReadRequest) + +void PipeToPump::Read(JSContext* aCx) { +#ifdef DEBUG + mReadChunk = true; +#endif + + // (Constraint) Shutdown must stop activity: + // If shuttingDown becomes true, the user agent must not initiate + // further reads from reader + if (mShuttingDown) { + return; + } + + // (Constraint) Backpressure must be enforced: + // While WritableStreamDefaultWriterGetDesiredSize(writer) is ≤ 0 or is null, + // the user agent must not read from reader. + Nullable<double> desiredSize = + WritableStreamDefaultWriterGetDesiredSize(mWriter); + if (desiredSize.IsNull()) { + // This means the writer has errored. This is going to be handled + // by the writer closed promise. + return; + } + + if (desiredSize.Value() <= 0) { + // Wait for the writer to become ready before reading more data from + // the reader. We don't care about rejections here, because those are + // already handled by the writer closed promise. + RefPtr<Promise> readyPromise = mWriter->Ready(); + readyPromise->AppendNativeHandler( + new PipeToPumpHandler(this, &PipeToPump::OnWriterReady, nullptr)); + return; + } + + RefPtr<ReadableStreamDefaultReader> reader = mReader; + RefPtr<ReadRequest> request = new PipeToReadRequest(this); + ErrorResult rv; + ReadableStreamDefaultReaderRead(aCx, reader, request, rv); + if (rv.MaybeSetPendingException(aCx)) { + // XXX It's actually not quite obvious what we should do here. + // We've got an error during reading, so on the surface it seems logical + // to invoke `OnSourceErrored`. However in certain cases the required + // condition > source.[[state]] is or becomes "errored" < won't actually + // happen i.e. when `WritableStreamDefaultWriterWrite` called from + // `OnReadFulfilled` (via PipeToReadRequest::ChunkSteps) fails in + // a synchronous fashion. + JS::Rooted<JS::Value> error(aCx); + JS::Rooted<Maybe<JS::Value>> someError(aCx); + + // The error was moved to the JSContext by MaybeSetPendingException. + if (JS_GetPendingException(aCx, &error)) { + someError = Some(error.get()); + } + + JS_ClearPendingException(aCx); + + Shutdown(aCx, someError); + } +} + +// Step 3. Closing must be propagated forward: if source.[[state]] is or +// becomes "closed", then +void PipeToPump::OnSourceClosed(JSContext* aCx, JS::Handle<JS::Value>) { + // Step 3.1. If preventClose is false, shutdown with an action of + // ! WritableStreamDefaultWriterCloseWithErrorPropagation(writer). + if (!mPreventClose) { + ShutdownWithAction( + aCx, + [](JSContext* aCx, PipeToPump* aPipeToPump, + JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv) + MOZ_CAN_RUN_SCRIPT { + RefPtr<WritableStreamDefaultWriter> writer = aPipeToPump->mWriter; + return WritableStreamDefaultWriterCloseWithErrorPropagation( + aCx, writer, aRv); + }, + JS::NothingHandleValue); + } else { + // Step 3.2 Otherwise, shutdown. + Shutdown(aCx, JS::NothingHandleValue); + } +} + +// Step 1. Errors must be propagated forward: if source.[[state]] is or +// becomes "errored", then +void PipeToPump::OnSourceErrored(JSContext* aCx, + JS::Handle<JS::Value> aSourceStoredError) { + // If |source| becomes errored not during a pending read, it's clear we must + // react immediately. + // + // But what if |source| becomes errored *during* a pending read? Should this + // first error, or the pending-read second error, predominate? Two semantics + // are possible when |source|/|dest| become closed or errored while there's a + // pending read: + // + // 1. Wait until the read fulfills or rejects, then respond to the + // closure/error without regard to the read having fulfilled or rejected. + // (This will simply not react to the read being rejected, or it will + // queue up the read chunk to be written during shutdown.) + // 2. React to the closure/error immediately per "Error and close states + // must be propagated". Then when the read fulfills or rejects later, do + // nothing. + // + // The spec doesn't clearly require either semantics. It requires that + // *already-read* chunks be written (at least if |dest| didn't become errored + // or closed such that no further writes can occur). But it's silent as to + // not-fully-read chunks. (These semantic differences may only be observable + // with very carefully constructed readable/writable streams.) + // + // It seems best, generally, to react to the temporally-earliest problem that + // arises, so we implement option #2. (Blink, in contrast, currently + // implements option #1.) + // + // All specified reactions to a closure/error invoke either the shutdown, or + // shutdown with an action, algorithms. Those algorithms each abort if either + // shutdown algorithm has already been invoked. So we don't need to do + // anything special here to deal with a pending read. + // + // TODO: Implement the eventual resolution from + // https://github.com/whatwg/streams/issues/1207 + + // Step 1.1 If preventAbort is false, shutdown with an action of + // ! WritableStreamAbort(dest, source.[[storedError]]) + // and with source.[[storedError]]. + JS::Rooted<Maybe<JS::Value>> error(aCx, Some(aSourceStoredError)); + if (!mPreventAbort) { + ShutdownWithAction( + aCx, + [](JSContext* aCx, PipeToPump* aPipeToPump, + JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv) + MOZ_CAN_RUN_SCRIPT { + JS::Rooted<JS::Value> error(aCx, *aError); + RefPtr<WritableStream> dest = aPipeToPump->mWriter->GetStream(); + return WritableStreamAbort(aCx, dest, error, aRv); + }, + error); + } else { + // Step 1.1. Otherwise, shutdown with source.[[storedError]]. + Shutdown(aCx, error); + } +} + +// Step 4. Closing must be propagated backward: +// if ! WritableStreamCloseQueuedOrInFlight(dest) is true +// or dest.[[state]] is "closed", then +void PipeToPump::OnDestClosed(JSContext* aCx, JS::Handle<JS::Value>) { + // Step 4.1. Assert: no chunks have been read or written. + // Note: No reading automatically implies no writing. + // In a perfect world OnDestClosed would only be called before we start + // piping, because afterwards the writer has an exclusive lock on the stream. + // In reality the closed promise can still be resolved after we release + // the lock on the writer in Finalize. + if (mShuttingDown) { + return; + } + MOZ_ASSERT(!mReadChunk); + + // Step 4.2. Let destClosed be a new TypeError. + JS::Rooted<Maybe<JS::Value>> destClosed(aCx, Nothing()); + { + ErrorResult rv; + rv.ThrowTypeError("Cannot pipe to closed stream"); + JS::Rooted<JS::Value> error(aCx); + bool ok = ToJSValue(aCx, std::move(rv), &error); + MOZ_RELEASE_ASSERT(ok, "must be ok"); + destClosed = Some(error.get()); + } + + // Step 4.3. If preventCancel is false, shutdown with an action of + // ! ReadableStreamCancel(source, destClosed) and with destClosed. + if (!mPreventCancel) { + ShutdownWithAction( + aCx, + [](JSContext* aCx, PipeToPump* aPipeToPump, + JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv) + MOZ_CAN_RUN_SCRIPT { + JS::Rooted<JS::Value> error(aCx, *aError); + RefPtr<ReadableStream> dest = aPipeToPump->mReader->GetStream(); + return ReadableStreamCancel(aCx, dest, error, aRv); + }, + destClosed); + } else { + // Step 4.4. Otherwise, shutdown with destClosed. + Shutdown(aCx, destClosed); + } +} + +// Step 2. Errors must be propagated backward: if dest.[[state]] is or becomes +// "errored", then +void PipeToPump::OnDestErrored(JSContext* aCx, + JS::Handle<JS::Value> aDestStoredError) { + // Step 2.1. If preventCancel is false, shutdown with an action of + // ! ReadableStreamCancel(source, dest.[[storedError]]) + // and with dest.[[storedError]]. + JS::Rooted<Maybe<JS::Value>> error(aCx, Some(aDestStoredError)); + if (!mPreventCancel) { + ShutdownWithAction( + aCx, + [](JSContext* aCx, PipeToPump* aPipeToPump, + JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv) + MOZ_CAN_RUN_SCRIPT { + JS::Rooted<JS::Value> error(aCx, *aError); + RefPtr<ReadableStream> dest = aPipeToPump->mReader->GetStream(); + return ReadableStreamCancel(aCx, dest, error, aRv); + }, + error); + } else { + // Step 2.1. Otherwise, shutdown with dest.[[storedError]]. + Shutdown(aCx, error); + } +} + +NS_IMPL_CYCLE_COLLECTION_CLASS(PipeToPump) +NS_IMPL_CYCLE_COLLECTING_ADDREF(PipeToPump) +NS_IMPL_CYCLE_COLLECTING_RELEASE(PipeToPump) +NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToPump) + NS_INTERFACE_MAP_ENTRY(nsISupports) +NS_INTERFACE_MAP_END + +NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(PipeToPump) + NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mPromise) + NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReader) + NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mWriter) + NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mLastWritePromise) +NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END + +NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(PipeToPump) + NS_IMPL_CYCLE_COLLECTION_UNLINK(mPromise) + NS_IMPL_CYCLE_COLLECTION_UNLINK(mReader) + NS_IMPL_CYCLE_COLLECTION_UNLINK(mWriter) + NS_IMPL_CYCLE_COLLECTION_UNLINK(mLastWritePromise) +NS_IMPL_CYCLE_COLLECTION_UNLINK_END + +namespace streams_abstract { +// https://streams.spec.whatwg.org/#readable-stream-pipe-to +already_AddRefed<Promise> ReadableStreamPipeTo( + ReadableStream* aSource, WritableStream* aDest, bool aPreventClose, + bool aPreventAbort, bool aPreventCancel, AbortSignal* aSignal, + mozilla::ErrorResult& aRv) { + // Step 1. Assert: source implements ReadableStream. (Implicit) + // Step 2. Assert: dest implements WritableStream. (Implicit) + // Step 3. Assert: preventClose, preventAbort, and preventCancel are all + // booleans (Implicit) + // Step 4. If signal was not given, let signal be + // undefined. (Implicit) + // Step 5. Assert: either signal is undefined, or signal + // implements AbortSignal. (Implicit) + // Step 6. Assert: !IsReadableStreamLocked(source) is false. + MOZ_ASSERT(!IsReadableStreamLocked(aSource)); + + // Step 7. Assert: !IsWritableStreamLocked(dest) is false. + MOZ_ASSERT(!IsWritableStreamLocked(aDest)); + + AutoJSAPI jsapi; + if (!jsapi.Init(aSource->GetParentObject())) { + aRv.ThrowUnknownError("Internal error"); + return nullptr; + } + JSContext* cx = jsapi.cx(); + + // Step 8. If source.[[controller]] implements ReadableByteStreamController, + // let reader be either !AcquireReadableStreamBYOBReader(source) or + // !AcquireReadableStreamDefaultReader(source), at the user agent’s + // discretion. + // Step 9. Otherwise, let reader be + // !AcquireReadableStreamDefaultReader(source). + + // Note: In the interests of simplicity, we choose here to always acquire + // a default reader. + RefPtr<ReadableStreamDefaultReader> reader = + AcquireReadableStreamDefaultReader(aSource, aRv); + if (aRv.Failed()) { + return nullptr; + } + + // Step 10. Let writer be ! AcquireWritableStreamDefaultWriter(dest). + RefPtr<WritableStreamDefaultWriter> writer = + AcquireWritableStreamDefaultWriter(aDest, aRv); + if (aRv.Failed()) { + return nullptr; + } + + // Step 11. Set source.[[disturbed]] to true. + aSource->SetDisturbed(true); + + // Step 12. Let shuttingDown be false. + // Note: PipeToPump ensures this by construction. + + // Step 13. Let promise be a new promise. + RefPtr<Promise> promise = + Promise::CreateInfallible(aSource->GetParentObject()); + + // Steps 14-15. + RefPtr<PipeToPump> pump = new PipeToPump( + promise, reader, writer, aPreventClose, aPreventAbort, aPreventCancel); + pump->Start(cx, aSignal); + + // Step 16. Return promise. + return promise.forget(); +} +} // namespace streams_abstract + +} // namespace mozilla::dom |