/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim:set ts=2 sw=2 sts=2 et cindent: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "mozilla/dom/ReadableStream.h" #include "ReadIntoRequest.h" #include "ReadableStreamPipeTo.h" #include "ReadableStreamTee.h" #include "StreamUtils.h" #include "TeeState.h" #include "js/Array.h" #include "js/Exception.h" #include "js/PropertyAndElement.h" #include "js/TypeDecls.h" #include "js/Value.h" #include "mozilla/AlreadyAddRefed.h" #include "mozilla/Assertions.h" #include "mozilla/Attributes.h" #include "mozilla/CycleCollectedJSContext.h" #include "mozilla/FloatingPoint.h" #include "mozilla/HoldDropJSObjects.h" #include "mozilla/StaticPrefs_dom.h" #include "mozilla/dom/BindingCallContext.h" #include "mozilla/dom/ByteStreamHelpers.h" #include "mozilla/dom/BodyStream.h" #include "mozilla/dom/QueueWithSizes.h" #include "mozilla/dom/QueuingStrategyBinding.h" #include "mozilla/dom/ReadRequest.h" #include "mozilla/dom/ReadableByteStreamController.h" #include "mozilla/dom/ReadableStreamBYOBReader.h" #include "mozilla/dom/ReadableStreamBinding.h" #include "mozilla/dom/ReadableStreamController.h" #include "mozilla/dom/ReadableStreamDefaultController.h" #include "mozilla/dom/ReadableStreamDefaultReader.h" #include "mozilla/dom/RootedDictionary.h" #include "mozilla/dom/ScriptSettings.h" #include "mozilla/dom/UnderlyingSourceBinding.h" #include "mozilla/dom/UnderlyingSourceCallbackHelpers.h" #include "mozilla/dom/WritableStream.h" #include "mozilla/dom/WritableStreamDefaultWriter.h" #include "nsCOMPtr.h" #include "mozilla/dom/Promise-inl.h" #include "nsIGlobalObject.h" #include "nsISupports.h" inline void ImplCycleCollectionTraverse( nsCycleCollectionTraversalCallback& aCallback, mozilla::Variant>& aReader, const char* aName, uint32_t aFlags = 0) { if (aReader.is>()) { ImplCycleCollectionTraverse( aCallback, aReader.as>(), aName, aFlags); } } inline void ImplCycleCollectionUnlink( mozilla::Variant>& aReader) { aReader = AsVariant(mozilla::Nothing()); } namespace mozilla::dom { // Only needed for refcounted objects. NS_IMPL_CYCLE_COLLECTION_WRAPPERCACHE_WITH_JS_MEMBERS( ReadableStream, (mGlobal, mController, mReader), (mStoredError)) NS_IMPL_CYCLE_COLLECTING_ADDREF(ReadableStream) NS_IMPL_CYCLE_COLLECTING_RELEASE(ReadableStream) NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ReadableStream) NS_WRAPPERCACHE_INTERFACE_MAP_ENTRY NS_INTERFACE_MAP_ENTRY(nsISupports) NS_INTERFACE_MAP_END ReadableStream::ReadableStream(nsIGlobalObject* aGlobal) : mGlobal(aGlobal), mReader(nullptr) { mozilla::HoldJSObjects(this); } ReadableStream::ReadableStream(const GlobalObject& aGlobal) : mGlobal(do_QueryInterface(aGlobal.GetAsSupports())), mReader(nullptr) { mozilla::HoldJSObjects(this); } ReadableStream::~ReadableStream() { mozilla::DropJSObjects(this); } JSObject* ReadableStream::WrapObject(JSContext* aCx, JS::Handle aGivenProto) { return ReadableStream_Binding::Wrap(aCx, this, aGivenProto); } ReadableStreamDefaultReader* ReadableStream::GetDefaultReader() { return mReader->AsDefault(); } void ReadableStream::SetReader(ReadableStreamGenericReader* aReader) { mReader = aReader; } // https://streams.spec.whatwg.org/#readable-stream-has-byob-reader bool ReadableStreamHasBYOBReader(ReadableStream* aStream) { // Step 1. Let reader be stream.[[reader]]. ReadableStreamGenericReader* reader = aStream->GetReader(); // Step 2. If reader is undefined, return false. if (!reader) { return false; } // Step 3. If reader implements ReadableStreamBYOBReader, return true. // Step 4. Return false. return reader->IsBYOB(); } // https://streams.spec.whatwg.org/#readable-stream-has-default-reader bool ReadableStreamHasDefaultReader(ReadableStream* aStream) { // Step 1. Let reader be stream.[[reader]]. ReadableStreamGenericReader* reader = aStream->GetReader(); // Step 2. If reader is undefined, return false. if (!reader) { return false; } // Step 3. If reader implements ReadableStreamDefaultReader, return true. // Step 4. Return false. return reader->IsDefault(); } // Streams Spec: 4.2.4: https://streams.spec.whatwg.org/#rs-prototype /* static */ already_AddRefed ReadableStream::Constructor( const GlobalObject& aGlobal, const Optional>& aUnderlyingSource, const QueuingStrategy& aStrategy, ErrorResult& aRv) { // Step 1. JS::Rooted underlyingSourceObj( aGlobal.Context(), aUnderlyingSource.WasPassed() ? aUnderlyingSource.Value() : nullptr); // Step 2. RootedDictionary underlyingSourceDict(aGlobal.Context()); if (underlyingSourceObj) { JS::Rooted objValue(aGlobal.Context(), JS::ObjectValue(*underlyingSourceObj)); dom::BindingCallContext callCx(aGlobal.Context(), "ReadableStream.constructor"); aRv.MightThrowJSException(); if (!underlyingSourceDict.Init(callCx, objValue)) { aRv.StealExceptionFromJSContext(aGlobal.Context()); return nullptr; } } // Step 3. RefPtr readableStream = new ReadableStream(aGlobal); // Step 4. if (underlyingSourceDict.mType.WasPassed()) { // Implicit assertion on above check. MOZ_ASSERT(underlyingSourceDict.mType.Value() == ReadableStreamType::Bytes); // Step 4.1 if (aStrategy.mSize.WasPassed()) { aRv.ThrowRangeError("Implementation preserved member 'size'"); return nullptr; } // Step 4.2 double highWaterMark = ExtractHighWaterMark(aStrategy, 0, aRv); if (aRv.Failed()) { return nullptr; } // Step 4.3 SetUpReadableByteStreamControllerFromUnderlyingSource( aGlobal.Context(), readableStream, underlyingSourceObj, underlyingSourceDict, highWaterMark, aRv); if (aRv.Failed()) { return nullptr; } return readableStream.forget(); } // Step 5.1 (implicit in above check) // Step 5.2. Extract callback. // // Implementation Note: The specification demands that if the size doesn't // exist, we instead would provide an algorithm that returns 1. Instead, we // will teach callers that a missing callback should simply return 1, rather // than gin up a fake callback here. // // This decision may need to be revisited if the default action ever diverges // within the specification. RefPtr sizeAlgorithm = aStrategy.mSize.WasPassed() ? &aStrategy.mSize.Value() : nullptr; // Step 5.3 double highWaterMark = ExtractHighWaterMark(aStrategy, 1, aRv); if (aRv.Failed()) { return nullptr; } // Step 5.4. SetupReadableStreamDefaultControllerFromUnderlyingSource( aGlobal.Context(), readableStream, underlyingSourceObj, underlyingSourceDict, highWaterMark, sizeAlgorithm, aRv); if (aRv.Failed()) { return nullptr; } return readableStream.forget(); } // Dealing with const this ptr is a pain, so just re-implement. // https://streams.spec.whatwg.org/#is-readable-stream-locked bool ReadableStream::Locked() const { // Step 1 + 2. return mReader; } // https://streams.spec.whatwg.org/#initialize-readable-stream static void InitializeReadableStream(ReadableStream* aStream) { // Step 1. aStream->SetState(ReadableStream::ReaderState::Readable); // Step 2. aStream->SetReader(nullptr); aStream->SetStoredError(JS::UndefinedHandleValue); // Step 3. aStream->SetDisturbed(false); } // https://streams.spec.whatwg.org/#create-readable-stream MOZ_CAN_RUN_SCRIPT already_AddRefed CreateReadableStream( JSContext* aCx, nsIGlobalObject* aGlobal, UnderlyingSourceAlgorithmsBase* aAlgorithms, mozilla::Maybe aHighWaterMark, QueuingStrategySize* aSizeAlgorithm, ErrorResult& aRv) { // Step 1. double highWaterMark = aHighWaterMark.isSome() ? *aHighWaterMark : 1.0; // Step 2. consumers of sizeAlgorithm // handle null algorithms correctly. // Step 3. MOZ_ASSERT(IsNonNegativeNumber(highWaterMark)); // Step 4. RefPtr stream = new ReadableStream(aGlobal); // Step 5. InitializeReadableStream(stream); // Step 6. RefPtr controller = new ReadableStreamDefaultController(aGlobal); // Step 7. SetUpReadableStreamDefaultController(aCx, stream, controller, aAlgorithms, highWaterMark, aSizeAlgorithm, aRv); // Step 8. return stream.forget(); } // https://streams.spec.whatwg.org/#readable-stream-close void ReadableStreamClose(JSContext* aCx, ReadableStream* aStream, ErrorResult& aRv) { // Step 1. MOZ_ASSERT(aStream->State() == ReadableStream::ReaderState::Readable); // Step 2. aStream->SetState(ReadableStream::ReaderState::Closed); // Step 3. ReadableStreamGenericReader* reader = aStream->GetReader(); // Step 4. if (!reader) { return; } // Step 5. reader->ClosedPromise()->MaybeResolveWithUndefined(); // Step 6. if (reader->IsDefault()) { // Step 6.1. Let readRequests be reader.[[readRequests]]. // Move LinkedList out of DefaultReader onto stack to avoid the potential // for concurrent modification, which could invalidate the iterator. // // See https://bugs.chromium.org/p/chromium/issues/detail?id=1045874 as an // example of the kind of issue that could occur. LinkedList> readRequests = std::move(reader->AsDefault()->ReadRequests()); // Step 6.2. Set reader.[[readRequests]] to an empty list. // Note: The std::move already cleared this anyway. reader->AsDefault()->ReadRequests().clear(); // Step 6.3. For each readRequest of readRequests, // Drain the local list and destroy elements along the way. while (RefPtr readRequest = readRequests.popFirst()) { // Step 6.3.1. Perform readRequest’s close steps. readRequest->CloseSteps(aCx, aRv); if (aRv.Failed()) { return; } } } } // https://streams.spec.whatwg.org/#readable-stream-cancel already_AddRefed ReadableStreamCancel(JSContext* aCx, ReadableStream* aStream, JS::Handle aError, ErrorResult& aRv) { // Step 1. aStream->SetDisturbed(true); // Step 2. if (aStream->State() == ReadableStream::ReaderState::Closed) { RefPtr promise = Promise::Create(aStream->GetParentObject(), aRv); if (aRv.Failed()) { return nullptr; } promise->MaybeResolveWithUndefined(); return promise.forget(); } // Step 3. if (aStream->State() == ReadableStream::ReaderState::Errored) { JS::Rooted storedError(aCx, aStream->StoredError()); return Promise::CreateRejected(aStream->GetParentObject(), storedError, aRv); } // Step 4. ReadableStreamClose(aCx, aStream, aRv); if (aRv.Failed()) { return nullptr; } // Step 5. ReadableStreamGenericReader* reader = aStream->GetReader(); // Step 6. if (reader && reader->IsBYOB()) { // Step 6.1. Let readIntoRequests be reader.[[readIntoRequests]]. LinkedList> readIntoRequests = std::move(reader->AsBYOB()->ReadIntoRequests()); // Step 6.2. Set reader.[[readIntoRequests]] to an empty list. // Note: The std::move already cleared this anyway. reader->AsBYOB()->ReadIntoRequests().clear(); // Step 6.3. For each readIntoRequest of readIntoRequests, while (RefPtr readIntoRequest = readIntoRequests.popFirst()) { // Step 6.3.1.Perform readIntoRequest’s close steps, given undefined. readIntoRequest->CloseSteps(aCx, JS::UndefinedHandleValue, aRv); if (aRv.Failed()) { return nullptr; } } } // Step 7. RefPtr controller(aStream->Controller()); RefPtr sourceCancelPromise = controller->CancelSteps(aCx, aError, aRv); if (aRv.Failed()) { return nullptr; } // Step 8. RefPtr promise = Promise::Create(sourceCancelPromise->GetParentObject(), aRv); if (aRv.Failed()) { return nullptr; } // ThenWithCycleCollectedArgs will carry promise, keeping it alive until the // callback executes. Result, nsresult> returnResult = sourceCancelPromise->ThenWithCycleCollectedArgs( [](JSContext*, JS::Handle, ErrorResult&, RefPtr newPromise) { newPromise->MaybeResolveWithUndefined(); return newPromise.forget(); }, promise); if (returnResult.isErr()) { aRv.Throw(returnResult.unwrapErr()); return nullptr; } return returnResult.unwrap().forget(); } // https://streams.spec.whatwg.org/#rs-cancel already_AddRefed ReadableStream::Cancel(JSContext* aCx, JS::Handle aReason, ErrorResult& aRv) { // Step 1. If ! IsReadableStreamLocked(this) is true, // return a promise rejected with a TypeError exception. if (Locked()) { aRv.ThrowTypeError("Cannot cancel a stream locked by a reader."); return nullptr; } // Step 2. Return ! ReadableStreamCancel(this, reason). RefPtr thisRefPtr = this; return ReadableStreamCancel(aCx, thisRefPtr, aReason, aRv); } // https://streams.spec.whatwg.org/#acquire-readable-stream-reader already_AddRefed AcquireReadableStreamDefaultReader(ReadableStream* aStream, ErrorResult& aRv) { // Step 1. RefPtr reader = new ReadableStreamDefaultReader(aStream->GetParentObject()); // Step 2. SetUpReadableStreamDefaultReader(reader, aStream, aRv); if (aRv.Failed()) { return nullptr; } // Step 3. return reader.forget(); } // https://streams.spec.whatwg.org/#rs-get-reader void ReadableStream::GetReader(const ReadableStreamGetReaderOptions& aOptions, OwningReadableStreamReader& resultReader, ErrorResult& aRv) { // Step 1. If options["mode"] does not exist, // return ? AcquireReadableStreamDefaultReader(this). if (!aOptions.mMode.WasPassed()) { RefPtr thisRefPtr = this; RefPtr defaultReader = AcquireReadableStreamDefaultReader(thisRefPtr, aRv); if (aRv.Failed()) { return; } resultReader.SetAsReadableStreamDefaultReader() = defaultReader; return; } // Step 2. Assert: options["mode"] is "byob". MOZ_ASSERT(aOptions.mMode.Value() == ReadableStreamReaderMode::Byob); // Step 3. Return ? AcquireReadableStreamBYOBReader(this). RefPtr thisRefPtr = this; RefPtr byobReader = AcquireReadableStreamBYOBReader(thisRefPtr, aRv); if (aRv.Failed()) { return; } resultReader.SetAsReadableStreamBYOBReader() = byobReader; } // https://streams.spec.whatwg.org/#is-readable-stream-locked bool IsReadableStreamLocked(ReadableStream* aStream) { // Step 1 + 2. return aStream->Locked(); } // https://streams.spec.whatwg.org/#rs-pipe-through MOZ_CAN_RUN_SCRIPT already_AddRefed ReadableStream::PipeThrough( const ReadableWritablePair& aTransform, const StreamPipeOptions& aOptions, ErrorResult& aRv) { // Step 1: If ! IsReadableStreamLocked(this) is true, throw a TypeError // exception. if (IsReadableStreamLocked(this)) { aRv.ThrowTypeError("Cannot pipe from a locked stream."); return nullptr; } // Step 2: If ! IsWritableStreamLocked(transform["writable"]) is true, throw a // TypeError exception. if (IsWritableStreamLocked(aTransform.mWritable)) { aRv.ThrowTypeError("Cannot pipe to a locked stream."); return nullptr; } // Step 3: Let signal be options["signal"] if it exists, or undefined // otherwise. RefPtr signal = aOptions.mSignal.WasPassed() ? &aOptions.mSignal.Value() : nullptr; // Step 4: Let promise be ! ReadableStreamPipeTo(this, transform["writable"], // options["preventClose"], options["preventAbort"], options["preventCancel"], // signal). RefPtr writable = aTransform.mWritable; RefPtr promise = ReadableStreamPipeTo( this, writable, aOptions.mPreventClose, aOptions.mPreventAbort, aOptions.mPreventCancel, signal, aRv); if (aRv.Failed()) { return nullptr; } // Step 5: Set promise.[[PromiseIsHandled]] to true. MOZ_ALWAYS_TRUE(promise->SetAnyPromiseIsHandled()); // Step 6: Return transform["readable"]. return do_AddRef(aTransform.mReadable.get()); }; // https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests double ReadableStreamGetNumReadRequests(ReadableStream* aStream) { // Step 1. MOZ_ASSERT(ReadableStreamHasDefaultReader(aStream)); // Step 2. return double(aStream->GetDefaultReader()->ReadRequests().length()); } // https://streams.spec.whatwg.org/#readable-stream-error void ReadableStreamError(JSContext* aCx, ReadableStream* aStream, JS::Handle aValue, ErrorResult& aRv) { // Step 1. MOZ_ASSERT(aStream->State() == ReadableStream::ReaderState::Readable); // Step 2. aStream->SetState(ReadableStream::ReaderState::Errored); // Step 3. aStream->SetStoredError(aValue); // Step 4. ReadableStreamGenericReader* reader = aStream->GetReader(); // Step 5. if (!reader) { return; } // Step 6. reader->ClosedPromise()->MaybeReject(aValue); // Step 7. reader->ClosedPromise()->SetSettledPromiseIsHandled(); // Step 8. if (reader->IsDefault()) { // Step 8.1. Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, // e). RefPtr defaultReader = reader->AsDefault(); ReadableStreamDefaultReaderErrorReadRequests(aCx, defaultReader, aValue, aRv); if (aRv.Failed()) { return; } } else { // Step 9. Otherwise, // Step 9.1. Assert: reader implements ReadableStreamBYOBReader. MOZ_ASSERT(reader->IsBYOB()); // Step 9.2. Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, // e). RefPtr byobReader = reader->AsBYOB(); ReadableStreamBYOBReaderErrorReadIntoRequests(aCx, byobReader, aValue, aRv); if (aRv.Failed()) { return; } } } // https://streams.spec.whatwg.org/#rs-default-controller-close void ReadableStreamFulfillReadRequest(JSContext* aCx, ReadableStream* aStream, JS::Handle aChunk, bool aDone, ErrorResult& aRv) { // Step 1. MOZ_ASSERT(ReadableStreamHasDefaultReader(aStream)); // Step 2. ReadableStreamDefaultReader* reader = aStream->GetDefaultReader(); // Step 3. MOZ_ASSERT(!reader->ReadRequests().isEmpty()); // Step 4+5. RefPtr readRequest = reader->ReadRequests().popFirst(); // Step 6. if (aDone) { readRequest->CloseSteps(aCx, aRv); if (aRv.Failed()) { return; } } // Step 7. readRequest->ChunkSteps(aCx, aChunk, aRv); } // https://streams.spec.whatwg.org/#readable-stream-add-read-request void ReadableStreamAddReadRequest(ReadableStream* aStream, ReadRequest* aReadRequest) { // Step 1. MOZ_ASSERT(aStream->GetReader()->IsDefault()); // Step 2. MOZ_ASSERT(aStream->State() == ReadableStream::ReaderState::Readable); // Step 3. aStream->GetDefaultReader()->ReadRequests().insertBack(aReadRequest); } // https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee // Step 14, 15 MOZ_CAN_RUN_SCRIPT already_AddRefed ReadableStreamDefaultTeeSourceAlgorithms::CancelCallback( JSContext* aCx, const Optional>& aReason, ErrorResult& aRv) { // Step 1. mTeeState->SetCanceled(mBranch, true); // Step 2. mTeeState->SetReason(mBranch, aReason.Value()); // Step 3. if (mTeeState->Canceled(OtherTeeBranch(mBranch))) { // Step 3.1 JS::Rooted compositeReason(aCx, JS::NewArrayObject(aCx, 2)); if (!compositeReason) { aRv.StealExceptionFromJSContext(aCx); return nullptr; } JS::Rooted reason1(aCx, mTeeState->Reason1()); if (!JS_SetElement(aCx, compositeReason, 0, reason1)) { aRv.StealExceptionFromJSContext(aCx); return nullptr; } JS::Rooted reason2(aCx, mTeeState->Reason2()); if (!JS_SetElement(aCx, compositeReason, 1, reason2)) { aRv.StealExceptionFromJSContext(aCx); return nullptr; } // Step 3.2 JS::Rooted compositeReasonValue( aCx, JS::ObjectValue(*compositeReason)); RefPtr stream(mTeeState->GetStream()); RefPtr cancelResult = ReadableStreamCancel(aCx, stream, compositeReasonValue, aRv); if (aRv.Failed()) { return nullptr; } // Step 3.3 mTeeState->CancelPromise()->MaybeResolve(cancelResult); } // Step 4. return do_AddRef(mTeeState->CancelPromise()); } // https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee MOZ_CAN_RUN_SCRIPT static void ReadableStreamDefaultTee(JSContext* aCx, ReadableStream* aStream, bool aCloneForBranch2, nsTArray>& aResult, ErrorResult& aRv) { // Step 1. Implicit. // Step 2. Implicit. // Steps 3-12 are contained in the construction of Tee State. RefPtr teeState = TeeState::Create(aStream, aCloneForBranch2, aRv); if (aRv.Failed()) { return; } // Step 13 - 16 auto branch1Algorithms = MakeRefPtr( teeState, TeeBranch::Branch1); auto branch2Algorithms = MakeRefPtr( teeState, TeeBranch::Branch2); // Step 17. nsCOMPtr global( do_AddRef(teeState->GetStream()->GetParentObject())); teeState->SetBranch1(CreateReadableStream(aCx, global, branch1Algorithms, mozilla::Nothing(), nullptr, aRv)); if (aRv.Failed()) { return; } // Step 18. teeState->SetBranch2(CreateReadableStream(aCx, global, branch2Algorithms, mozilla::Nothing(), nullptr, aRv)); if (aRv.Failed()) { return; } // Step 19. teeState->GetReader()->ClosedPromise()->AddCallbacksWithCycleCollectedArgs( [](JSContext* aCx, JS::Handle aValue, ErrorResult& aRv, TeeState* aTeeState) {}, [](JSContext* aCx, JS::Handle aReason, ErrorResult& aRv, TeeState* aTeeState) { // Step 19.1. ReadableStreamDefaultControllerError( aCx, aTeeState->Branch1()->DefaultController(), aReason, aRv); if (aRv.Failed()) { return; } // Step 19.2 ReadableStreamDefaultControllerError( aCx, aTeeState->Branch2()->DefaultController(), aReason, aRv); if (aRv.Failed()) { return; } // Step 19.3 if (!aTeeState->Canceled1() || !aTeeState->Canceled2()) { aTeeState->CancelPromise()->MaybeResolveWithUndefined(); } }, RefPtr(teeState)); // Step 20. aResult.AppendElement(teeState->Branch1()); aResult.AppendElement(teeState->Branch2()); } // https://streams.spec.whatwg.org/#rs-pipe-to already_AddRefed ReadableStream::PipeTo( WritableStream& aDestination, const StreamPipeOptions& aOptions, ErrorResult& aRv) { // Step 1. If !IsReadableStreamLocked(this) is true, return a promise rejected // with a TypeError exception. if (IsReadableStreamLocked(this)) { aRv.ThrowTypeError("Cannot pipe from a locked stream."); return nullptr; } // Step 2. If !IsWritableStreamLocked(destination) is true, return a promise // rejected with a TypeError exception. if (IsWritableStreamLocked(&aDestination)) { aRv.ThrowTypeError("Cannot pipe to a locked stream."); return nullptr; } // Step 3. Let signal be options["signal"] if it exists, or undefined // otherwise. RefPtr signal = aOptions.mSignal.WasPassed() ? &aOptions.mSignal.Value() : nullptr; // Step 4. Return ! ReadableStreamPipeTo(this, destination, // options["preventClose"], options["preventAbort"], options["preventCancel"], // signal). return ReadableStreamPipeTo(this, &aDestination, aOptions.mPreventClose, aOptions.mPreventAbort, aOptions.mPreventCancel, signal, aRv); } // https://streams.spec.whatwg.org/#readable-stream-tee MOZ_CAN_RUN_SCRIPT static void ReadableStreamTee(JSContext* aCx, ReadableStream* aStream, bool aCloneForBranch2, nsTArray>& aResult, ErrorResult& aRv) { // Step 1. Implicit. // Step 2. Implicit. // Step 3. if (aStream->Controller()->IsByte()) { ReadableByteStreamTee(aCx, aStream, aResult, aRv); return; } // Step 4. ReadableStreamDefaultTee(aCx, aStream, aCloneForBranch2, aResult, aRv); } void ReadableStream::Tee(JSContext* aCx, nsTArray>& aResult, ErrorResult& aRv) { ReadableStreamTee(aCx, this, false, aResult, aRv); } void ReadableStream::IteratorData::Traverse( nsCycleCollectionTraversalCallback& cb) { ReadableStream::IteratorData* tmp = this; NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReader); } void ReadableStream::IteratorData::Unlink() { ReadableStream::IteratorData* tmp = this; NS_IMPL_CYCLE_COLLECTION_UNLINK(mReader); } // https://streams.spec.whatwg.org/#rs-get-iterator void ReadableStream::InitAsyncIteratorData( IteratorData& aData, Iterator::IteratorType aType, const ReadableStreamIteratorOptions& aOptions, ErrorResult& aRv) { // Step 1. Let reader be ? AcquireReadableStreamDefaultReader(stream). RefPtr reader = AcquireReadableStreamDefaultReader(this, aRv); if (aRv.Failed()) { return; } // Step 2. Set iterator’s reader to reader. aData.mReader = reader; // Step 3. Let preventCancel be args[0]["preventCancel"]. // Step 4. Set iterator’s prevent cancel to preventCancel. aData.mPreventCancel = aOptions.mPreventCancel; } // https://streams.spec.whatwg.org/#rs-asynciterator-prototype-next // Step 4. struct IteratorReadRequest : public ReadRequest { public: NS_DECL_ISUPPORTS_INHERITED NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(IteratorReadRequest, ReadRequest) RefPtr mPromise; RefPtr mReader; explicit IteratorReadRequest(Promise* aPromise, ReadableStreamDefaultReader* aReader) : mPromise(aPromise), mReader(aReader) {} // chunk steps, given chunk void ChunkSteps(JSContext* aCx, JS::Handle aChunk, ErrorResult& aRv) override { // Step 1. Resolve promise with chunk. mPromise->MaybeResolve(aChunk); } // close steps void CloseSteps(JSContext* aCx, ErrorResult& aRv) override { // Step 1. Perform ! ReadableStreamDefaultReaderRelease(reader). ReadableStreamDefaultReaderRelease(aCx, mReader, aRv); if (aRv.Failed()) { mPromise->MaybeRejectWithUndefined(); return; } // Step 2. Resolve promise with end of iteration. iterator_utils::ResolvePromiseForFinished(mPromise); } // error steps, given e void ErrorSteps(JSContext* aCx, JS::Handle aError, ErrorResult& aRv) override { // Step 1. Perform ! ReadableStreamDefaultReaderRelease(reader). ReadableStreamDefaultReaderRelease(aCx, mReader, aRv); if (aRv.Failed()) { mPromise->MaybeRejectWithUndefined(); return; } // Step 2. Reject promise with e. mPromise->MaybeReject(aError); } protected: virtual ~IteratorReadRequest() = default; }; NS_IMPL_CYCLE_COLLECTION_INHERITED(IteratorReadRequest, ReadRequest, mPromise, mReader) NS_IMPL_ADDREF_INHERITED(IteratorReadRequest, ReadRequest) NS_IMPL_RELEASE_INHERITED(IteratorReadRequest, ReadRequest) NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(IteratorReadRequest) NS_INTERFACE_MAP_END_INHERITING(ReadRequest) // https://streams.spec.whatwg.org/#rs-asynciterator-prototype-next already_AddRefed ReadableStream::GetNextIterationResult( Iterator* aIterator, ErrorResult& aRv) { // Step 1. Let reader be iterator’s reader. RefPtr reader = aIterator->Data().mReader; // Step 2. Assert: reader.[[stream]] is not undefined. MOZ_ASSERT(reader->GetStream()); // Step 3. Let promise be a new promise. RefPtr promise = Promise::Create(GetParentObject(), aRv); if (aRv.Failed()) { return nullptr; } // Step 4. Let readRequest be a new read request with the following items: RefPtr request = new IteratorReadRequest(promise, reader); // Step 5. Perform ! ReadableStreamDefaultReaderRead(this, readRequest). AutoJSAPI jsapi; if (!jsapi.Init(mGlobal)) { aRv.ThrowUnknownError("Internal error"); return nullptr; } ReadableStreamDefaultReaderRead(jsapi.cx(), reader, request, aRv); if (aRv.Failed()) { return nullptr; } // Step 6. Return promise. return promise.forget(); } // https://streams.spec.whatwg.org/#rs-asynciterator-prototype-return already_AddRefed ReadableStream::IteratorReturn( JSContext* aCx, Iterator* aIterator, JS::Handle aValue, ErrorResult& aRv) { // Step 1. Let reader be iterator’s reader. RefPtr reader = aIterator->Data().mReader; // Step 2. Assert: reader.[[stream]] is not undefined. MOZ_ASSERT(reader->GetStream()); // Step 3. Assert: reader.[[readRequests]] is empty, as the async iterator // machinery guarantees that any previous calls to next() have settled before // this is called. MOZ_ASSERT(reader->ReadRequests().isEmpty()); // Step 4. If iterator’s prevent cancel is false: if (!aIterator->Data().mPreventCancel) { // Step 4.1. Let result be ! ReadableStreamReaderGenericCancel(reader, arg). RefPtr stream(reader->GetStream()); RefPtr result = ReadableStreamCancel(aCx, stream, aValue, aRv); if (NS_WARN_IF(aRv.Failed())) { return nullptr; } // Step 4.2. Perform ! ReadableStreamDefaultReaderRelease(reader). ReadableStreamDefaultReaderRelease(aCx, reader, aRv); if (NS_WARN_IF(aRv.Failed())) { return nullptr; } // Step 4.3. Return result. return result.forget(); } // Step 5. Perform ! ReadableStreamDefaultReaderRelease(reader). ReadableStreamDefaultReaderRelease(aCx, reader, aRv); if (NS_WARN_IF(aRv.Failed())) { return nullptr; } // Step 6. Return a promise resolved with undefined. return Promise::CreateResolvedWithUndefined(GetParentObject(), aRv); } // https://streams.spec.whatwg.org/#readable-stream-add-read-into-request void ReadableStreamAddReadIntoRequest(ReadableStream* aStream, ReadIntoRequest* aReadIntoRequest) { // Step 1. Assert: stream.[[reader]] implements ReadableStreamBYOBReader. MOZ_ASSERT(aStream->GetReader()->IsBYOB()); // Step 2. Assert: stream.[[state]] is "readable" or "closed". MOZ_ASSERT(aStream->State() == ReadableStream::ReaderState::Readable || aStream->State() == ReadableStream::ReaderState::Closed); // Step 3. Append readRequest to stream.[[reader]].[[readIntoRequests]]. aStream->GetReader()->AsBYOB()->ReadIntoRequests().insertBack( aReadIntoRequest); } // https://streams.spec.whatwg.org/#abstract-opdef-createreadablebytestream already_AddRefed CreateReadableByteStream( JSContext* aCx, nsIGlobalObject* aGlobal, UnderlyingSourceAlgorithmsBase* aAlgorithms, ErrorResult& aRv) { // Step 1. Let stream be a new ReadableStream. RefPtr stream = new ReadableStream(aGlobal); // Step 2. Perform ! InitializeReadableStream(stream). InitializeReadableStream(stream); // Step 3. Let controller be a new ReadableByteStreamController. RefPtr controller = new ReadableByteStreamController(aGlobal); // Step 4. Perform ? SetUpReadableByteStreamController(stream, controller, // startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, undefined). SetUpReadableByteStreamController(aCx, stream, controller, aAlgorithms, 0, mozilla::Nothing(), aRv); if (aRv.Failed()) { return nullptr; } // Return stream. return stream.forget(); } // https://streams.spec.whatwg.org/#readablestream-close void ReadableStream::CloseNative(JSContext* aCx, ErrorResult& aRv) { MOZ_ASSERT(mController->GetAlgorithms()->IsNative()); // Step 1: If stream.[[controller]] implements ReadableByteStreamController, if (mController->IsByte()) { RefPtr controller = mController->AsByte(); // Step 1.1: Perform ! // ReadableByteStreamControllerClose(stream.[[controller]]). ReadableByteStreamControllerClose(aCx, controller, aRv); if (aRv.Failed()) { return; } // Step 1.2: If stream.[[controller]].[[pendingPullIntos]] is not empty, // perform ! ReadableByteStreamControllerRespond(stream.[[controller]], 0). if (!controller->PendingPullIntos().isEmpty()) { ReadableByteStreamControllerRespond(aCx, controller, 0, aRv); } return; } // Step 2: Otherwise, perform ! // ReadableStreamDefaultControllerClose(stream.[[controller]]). RefPtr controller = mController->AsDefault(); ReadableStreamDefaultControllerClose(aCx, controller, aRv); } // https://streams.spec.whatwg.org/#readablestream-current-byob-request-view static void CurrentBYOBRequestView(JSContext* aCx, ReadableByteStreamController& aController, JS::MutableHandle aRetVal, ErrorResult& aRv) { // Step 1. Assert: stream.[[controller]] implements // ReadableByteStreamController. (implicit) // Step 2: Let byobRequest be ! // ReadableByteStreamControllerGetBYOBRequest(stream.[[controller]]). RefPtr byobRequest = ReadableByteStreamControllerGetBYOBRequest(aCx, &aController, aRv); // Step 3: If byobRequest is null, then return null. if (!byobRequest) { aRetVal.set(nullptr); return; } // Step 4: Return byobRequest.[[view]]. byobRequest->GetView(aCx, aRetVal); } static bool HasSameBufferView(JSContext* aCx, JS::Handle aX, JS::Handle aY, ErrorResult& aRv) { bool isShared; JS::Rooted viewedBufferX( aCx, JS_GetArrayBufferViewBuffer(aCx, aX, &isShared)); if (!viewedBufferX) { aRv.StealExceptionFromJSContext(aCx); return false; } JS::Rooted viewedBufferY( aCx, JS_GetArrayBufferViewBuffer(aCx, aY, &isShared)); if (!viewedBufferY) { aRv.StealExceptionFromJSContext(aCx); return false; } return viewedBufferX == viewedBufferY; } // https://streams.spec.whatwg.org/#readablestream-enqueue void ReadableStream::EnqueueNative(JSContext* aCx, JS::Handle aChunk, ErrorResult& aRv) { MOZ_ASSERT(mController->GetAlgorithms()->IsNative()); // Step 1: If stream.[[controller]] implements // ReadableStreamDefaultController, if (mController->IsDefault()) { // Step 1.1: Perform ! // ReadableStreamDefaultControllerEnqueue(stream.[[controller]], chunk). RefPtr controller = mController->AsDefault(); ReadableStreamDefaultControllerEnqueue(aCx, controller, aChunk, aRv); return; } // Step 2.1: Assert: stream.[[controller]] implements // ReadableByteStreamController. MOZ_ASSERT(mController->IsByte()); RefPtr controller = mController->AsByte(); // Step 2.2: Assert: chunk is an ArrayBufferView. MOZ_ASSERT(aChunk.isObject() && JS_IsArrayBufferViewObject(&aChunk.toObject())); JS::Rooted chunk(aCx, &aChunk.toObject()); // Step 3: Let byobView be the current BYOB request view for stream. JS::Rooted byobView(aCx); CurrentBYOBRequestView(aCx, *controller, &byobView, aRv); if (aRv.Failed()) { return; } // Step 4: If byobView is non-null, and chunk.[[ViewedArrayBuffer]] is // byobView.[[ViewedArrayBuffer]], then: if (byobView && HasSameBufferView(aCx, chunk, byobView, aRv)) { // Step 4.1: Assert: chunk.[[ByteOffset]] is byobView.[[ByteOffset]]. MOZ_ASSERT(JS_GetArrayBufferViewByteOffset(chunk) == JS_GetArrayBufferViewByteOffset(byobView)); // Step 4.2: Assert: chunk.[[ByteLength]] ≤ byobView.[[ByteLength]]. MOZ_ASSERT(JS_GetArrayBufferViewByteLength(chunk) == JS_GetArrayBufferViewByteLength(byobView)); // Step 4.3: Perform ? // ReadableByteStreamControllerRespond(stream.[[controller]], // chunk.[[ByteLength]]). ReadableByteStreamControllerRespond( aCx, controller, JS_GetArrayBufferViewByteLength(chunk), aRv); return; } if (aRv.Failed()) { return; } // Step 5: Otherwise, perform ? // ReadableByteStreamControllerEnqueue(stream.[[controller]], chunk). ReadableByteStreamControllerEnqueue(aCx, controller, chunk, aRv); } already_AddRefed ReadableStream::Create( JSContext* aCx, nsIGlobalObject* aGlobal, BodyStreamHolder* aUnderlyingSource, ErrorResult& aRv) { RefPtr stream = new ReadableStream(aGlobal); SetUpReadableByteStreamControllerFromBodyStreamUnderlyingSource( aCx, stream, aUnderlyingSource, aRv); if (aRv.Failed()) { return nullptr; } // Step 5. Return stream. return stream.forget(); } } // namespace mozilla::dom