/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- * vim: set ts=8 sts=2 et sw=2 tw=80: * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ /* General readable stream abstract operations. */ #include "builtin/streams/ReadableStreamOperations.h" #include "mozilla/Assertions.h" // MOZ_ASSERT{,_IF} #include "mozilla/Attributes.h" // MOZ_MUST_USE #include "builtin/Array.h" // js::NewDenseFullyAllocatedArray #include "builtin/Promise.h" // js::RejectPromiseWithPendingError #include "builtin/streams/PipeToState.h" // js::PipeToState #include "builtin/streams/ReadableStream.h" // js::ReadableStream #include "builtin/streams/ReadableStreamController.h" // js::ReadableStream{,Default}Controller #include "builtin/streams/ReadableStreamDefaultControllerOperations.h" // js::ReadableStreamDefaultController{Close,Enqueue}, js::ReadableStreamControllerError, js::SourceAlgorithms #include "builtin/streams/ReadableStreamInternals.h" // js::ReadableStreamCancel #include "builtin/streams/ReadableStreamReader.h" // js::CreateReadableStreamDefaultReader, js::ForAuthorCodeBool, js::ReadableStream{,Default}Reader, js::ReadableStreamDefaultReaderRead #include "builtin/streams/TeeState.h" // js::TeeState #include "js/CallArgs.h" // JS::CallArgs{,FromVp} #include "js/Promise.h" // JS::CallOriginalPromiseThen, JS::AddPromiseReactions #include "js/RootingAPI.h" // JS::{,Mutable}Handle, JS::Rooted #include "js/Value.h" // JS::Value, JS::UndefinedHandleValue #include "vm/JSContext.h" // JSContext #include "vm/NativeObject.h" // js::NativeObject #include "vm/ObjectOperations.h" // js::GetProperty #include "vm/PromiseObject.h" // js::PromiseObject, js::PromiseResolvedWithUndefined #include "builtin/HandlerFunction-inl.h" // js::NewHandler, js::TargetFromHandler #include "builtin/streams/MiscellaneousOperations-inl.h" // js::ResolveUnwrappedPromiseWithValue #include "builtin/streams/ReadableStreamReader-inl.h" // js::UnwrapReaderFromStream #include "vm/Compartment-inl.h" // JS::Compartment::wrap, js::Unwrap{Callee,Internal}Slot #include "vm/JSContext-inl.h" // JSContext::check #include "vm/JSObject-inl.h" // js::IsCallable, js::NewObjectWithClassProto #include "vm/Realm-inl.h" // js::AutoRealm using js::IsCallable; using js::NewHandler; using js::NewObjectWithClassProto; using js::PromiseObject; using js::ReadableStream; using js::ReadableStreamDefaultController; using js::ReadableStreamDefaultControllerEnqueue; using js::ReadableStreamDefaultReader; using js::ReadableStreamReader; using js::SourceAlgorithms; using js::TargetFromHandler; using js::TeeState; using js::UnwrapCalleeSlot; using JS::CallArgs; using JS::CallArgsFromVp; using JS::Handle; using JS::MutableHandle; using JS::ObjectValue; using JS::Rooted; using JS::UndefinedHandleValue; using JS::Value; /*** 3.4. General readable stream abstract operations ***********************/ // Streams spec, 3.4.1. AcquireReadableStreamBYOBReader ( stream ) // Always inlined. // Streams spec, 3.4.2. AcquireReadableStreamDefaultReader ( stream ) // Always inlined. See CreateReadableStreamDefaultReader. /** * Streams spec, 3.4.3. CreateReadableStream ( * startAlgorithm, pullAlgorithm, cancelAlgorithm * [, highWaterMark [, sizeAlgorithm ] ] ) * * The start/pull/cancelAlgorithm arguments are represented instead as four * arguments: sourceAlgorithms, underlyingSource, pullMethod, cancelMethod. * See the comment on SetUpReadableStreamDefaultController. */ static MOZ_MUST_USE ReadableStream* CreateReadableStream( JSContext* cx, SourceAlgorithms sourceAlgorithms, Handle underlyingSource, Handle pullMethod = UndefinedHandleValue, Handle cancelMethod = UndefinedHandleValue, double highWaterMark = 1, Handle sizeAlgorithm = UndefinedHandleValue, Handle proto = nullptr) { cx->check(underlyingSource, sizeAlgorithm, proto); MOZ_ASSERT(sizeAlgorithm.isUndefined() || IsCallable(sizeAlgorithm)); // Step 1: If highWaterMark was not passed, set it to 1 (implicit). // Step 2: If sizeAlgorithm was not passed, set it to an algorithm that // returns 1 (implicit). // Step 3: Assert: ! IsNonNegativeNumber(highWaterMark) is true. MOZ_ASSERT(highWaterMark >= 0); // Step 4: Let stream be ObjectCreate(the original value of ReadableStream's // prototype property). // Step 5: Perform ! InitializeReadableStream(stream). Rooted stream(cx, ReadableStream::create(cx, nullptr, proto)); if (!stream) { return nullptr; } // Step 6: Let controller be ObjectCreate(the original value of // ReadableStreamDefaultController's prototype property). // Step 7: Perform ? SetUpReadableStreamDefaultController(stream, // controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, // highWaterMark, sizeAlgorithm). if (!SetUpReadableStreamDefaultController( cx, stream, sourceAlgorithms, underlyingSource, pullMethod, cancelMethod, highWaterMark, sizeAlgorithm)) { return nullptr; } // Step 8: Return stream. return stream; } // Streams spec, 3.4.4. CreateReadableByteStream ( // startAlgorithm, pullAlgorithm, cancelAlgorithm // [, highWaterMark [, autoAllocateChunkSize ] ] ) // Not implemented. /** * Streams spec, 3.4.5. InitializeReadableStream ( stream ) */ /* static */ MOZ_MUST_USE ReadableStream* ReadableStream::create( JSContext* cx, void* nsISupportsObject_alreadyAddreffed /* = nullptr */, Handle proto /* = nullptr */) { // In the spec, InitializeReadableStream is always passed a newly created // ReadableStream object. We instead create it here and return it below. Rooted stream( cx, NewObjectWithClassProto(cx, proto)); if (!stream) { return nullptr; } stream->setPrivate(nsISupportsObject_alreadyAddreffed); // Step 1: Set stream.[[state]] to "readable". stream->initStateBits(Readable); MOZ_ASSERT(stream->readable()); // Step 2: Set stream.[[reader]] and stream.[[storedError]] to // undefined (implicit). MOZ_ASSERT(!stream->hasReader()); MOZ_ASSERT(stream->storedError().isUndefined()); // Step 3: Set stream.[[disturbed]] to false (done in step 1). MOZ_ASSERT(!stream->disturbed()); return stream; } // Streams spec, 3.4.6. IsReadableStream ( x ) // Using UnwrapAndTypeCheck templates instead. // Streams spec, 3.4.7. IsReadableStreamDisturbed ( stream ) // Using stream->disturbed() instead. /** * Streams spec, 3.4.8. IsReadableStreamLocked ( stream ) */ bool ReadableStream::locked() const { // Step 1: Assert: ! IsReadableStream(stream) is true (implicit). // Step 2: If stream.[[reader]] is undefined, return false. // Step 3: Return true. // Special-casing for streams with external sources. Those can be locked // explicitly via JSAPI, which is indicated by a controller flag. // IsReadableStreamLocked is called from the controller's constructor, at // which point we can't yet call stream->controller(), but the source also // can't be locked yet. if (hasController() && controller()->sourceLocked()) { return true; } return hasReader(); } // Streams spec, 3.4.9. IsReadableStreamAsyncIterator ( x ) // // Not implemented. /** * Streams spec, 3.4.10. ReadableStreamTee steps 12.c.i-x. */ static bool TeeReaderReadHandler(JSContext* cx, unsigned argc, Value* vp) { CallArgs args = CallArgsFromVp(argc, vp); Rooted unwrappedTeeState(cx, UnwrapCalleeSlot(cx, args, 0)); if (!unwrappedTeeState) { return false; } Handle resultVal = args.get(0); // Step 12.c.i: Set reading to false. unwrappedTeeState->unsetReading(); // Step 12.c.ii: Assert: Type(result) is Object. Rooted result(cx, &resultVal.toObject()); bool done; { // Step 12.c.iii: Let done be ? Get(result, "done"). // (This can fail only if `result` was nuked.) Rooted doneVal(cx); if (!GetProperty(cx, result, result, cx->names().done, &doneVal)) { return false; } // Step 12.c.iv: Assert: Type(done) is Boolean. done = doneVal.toBoolean(); } // Step 12.c.v: If done is true, if (done) { // Step 12.c.v.1: If canceled1 is false, if (!unwrappedTeeState->canceled1()) { // Step 12.c.v.1.a: Perform // ! ReadableStreamDefaultControllerClose( // branch1.[[readableStreamController]]). Rooted unwrappedBranch1( cx, unwrappedTeeState->branch1()); if (!ReadableStreamDefaultControllerClose(cx, unwrappedBranch1)) { return false; } } // Step 12.c.v.2: If canceled2 is false, if (!unwrappedTeeState->canceled2()) { // Step 12.c.v.2.a: Perform // ! ReadableStreamDefaultControllerClose( // branch2.[[readableStreamController]]). Rooted unwrappedBranch2( cx, unwrappedTeeState->branch2()); if (!ReadableStreamDefaultControllerClose(cx, unwrappedBranch2)) { return false; } } args.rval().setUndefined(); return true; } // Step 12.c.vi: Let value be ! Get(result, "value"). // (This can fail only if `result` was nuked.) Rooted value(cx); if (!GetProperty(cx, result, result, cx->names().value, &value)) { return false; } // Step 12.c.vii: Let value1 and value2 be value. // Step 12.c.viii: If canceled2 is false and cloneForBranch2 is true, set // value2 to // ? StructuredDeserialize(? StructuredSerialize(value2), // the current Realm Record). // We don't yet support any specifications that use cloneForBranch2, and // the Streams spec doesn't offer any way for author code to enable it, // so it's always false here. auto& value1 = value; MOZ_ASSERT(!unwrappedTeeState->cloneForBranch2(), "support for cloneForBranch2=true is not yet implemented"); auto& value2 = value; Rooted unwrappedController(cx); // Step 12.c.ix: If canceled1 is false, perform // ? ReadableStreamDefaultControllerEnqueue( // branch1.[[readableStreamController]], value1). if (!unwrappedTeeState->canceled1()) { unwrappedController = unwrappedTeeState->branch1(); if (!ReadableStreamDefaultControllerEnqueue(cx, unwrappedController, value1)) { return false; } } // Step 12.c.x: If canceled2 is false, perform // ? ReadableStreamDefaultControllerEnqueue( // branch2.[[readableStreamController]], value2). if (!unwrappedTeeState->canceled2()) { unwrappedController = unwrappedTeeState->branch2(); if (!ReadableStreamDefaultControllerEnqueue(cx, unwrappedController, value2)) { return false; } } args.rval().setUndefined(); return true; } /** * Streams spec, 3.4.10. ReadableStreamTee step 12, "Let pullAlgorithm be the * following steps:" */ MOZ_MUST_USE PromiseObject* js::ReadableStreamTee_Pull( JSContext* cx, JS::Handle unwrappedTeeState) { // Combine step 12.a/12.e far below, and handle steps 12.b-12.d after // inverting step 12.a's "If reading is true" condition. if (!unwrappedTeeState->reading()) { // Step 12.b: Set reading to true. unwrappedTeeState->setReading(); // Implicit in the spec: Unpack `reader` from the TeeState (by way of the // stream stored in one of its slots). Rooted unwrappedReader(cx); { Rooted unwrappedStream( cx, UnwrapInternalSlot(cx, unwrappedTeeState, TeeState::Slot_Stream)); if (!unwrappedStream) { return nullptr; } ReadableStreamReader* unwrappedReaderObj = UnwrapReaderFromStream(cx, unwrappedStream); if (!unwrappedReaderObj) { return nullptr; } unwrappedReader = &unwrappedReaderObj->as(); } // Step 12.c: Let readPromise be the result of reacting to // ! ReadableStreamDefaultReaderRead(reader) with the following // fulfillment steps given the argument result: [...] // Step 12.d: Set readPromise.[[PromiseIsHandled]] to true. // First, perform |ReadableStreamDefaultReaderRead(reader)|. Rooted readerReadResultPromise( cx, js::ReadableStreamDefaultReaderRead(cx, unwrappedReader)); if (!readerReadResultPromise) { return nullptr; } // Next, create a function to perform the fulfillment steps under step 12.c // (implemented in the |TeeReaderReadHandler| C++ function). Rooted teeState(cx, unwrappedTeeState); if (!cx->compartment()->wrap(cx, &teeState)) { return nullptr; } Rooted onFulfilled( cx, NewHandler(cx, TeeReaderReadHandler, teeState)); if (!onFulfilled) { return nullptr; } // Finally, perform those fulfillment steps when |readerReadResultPromise| // fulfills. (Step 12.c doesn't provide rejection steps, so don't handle // rejection.) // // The spec's |readPromise| promise is unobservable, so implement this using // a JSAPI function that acts as if it created |readPromise| but doesn't // actually do so. // // Step 12.d causes |readPromise| to be treated as handled, even if it // rejects. Use |JS::AddPromiseReactionsIgnoringUnhandledRejection|, not // |JS::AddPromiseReactions|, to avoid reporting a freshly-consed-up promise // as rejected if |readerReadResultPromise| rejects. if (!JS::AddPromiseReactionsIgnoringUnhandledRejection( cx, readerReadResultPromise, onFulfilled, nullptr)) { return nullptr; } } // Step 12.a: (If reading is true,) return a promise resolved with undefined. // Step 12.e: Return a promise resolved with undefined. return PromiseResolvedWithUndefined(cx); } /** * Cancel one branch of a tee'd stream with the given |reason_|. * * Streams spec, 3.4.10. ReadableStreamTee steps 13 and 14: "Let * cancel1Algorithm/cancel2Algorithm be the following steps, taking a reason * argument:" */ MOZ_MUST_USE JSObject* js::ReadableStreamTee_Cancel( JSContext* cx, JS::Handle unwrappedTeeState, JS::Handle unwrappedBranch, JS::Handle reason) { Rooted unwrappedStream( cx, UnwrapInternalSlot(cx, unwrappedTeeState, TeeState::Slot_Stream)); if (!unwrappedStream) { return nullptr; } bool bothBranchesCanceled = false; // Step 13/14.a: Set canceled1/canceled2 to true. // Step 13/14.b: Set reason1/reason2 to reason. { AutoRealm ar(cx, unwrappedTeeState); Rooted unwrappedReason(cx, reason); if (!cx->compartment()->wrap(cx, &unwrappedReason)) { return nullptr; } if (unwrappedBranch->isTeeBranch1()) { unwrappedTeeState->setCanceled1(unwrappedReason); bothBranchesCanceled = unwrappedTeeState->canceled2(); } else { MOZ_ASSERT(unwrappedBranch->isTeeBranch2()); unwrappedTeeState->setCanceled2(unwrappedReason); bothBranchesCanceled = unwrappedTeeState->canceled1(); } } Rooted unwrappedCancelPromise( cx, unwrappedTeeState->cancelPromise()); MOZ_ASSERT(unwrappedCancelPromise != nullptr); // Step 13/14.c: If canceled2/canceled1 is true, if (bothBranchesCanceled) { // Step 13/14.c.i: Let compositeReason be // ! CreateArrayFromList(« reason1, reason2 »). Rooted compositeReason(cx); { Rooted reason1(cx, unwrappedTeeState->reason1()); Rooted reason2(cx, unwrappedTeeState->reason2()); if (!cx->compartment()->wrap(cx, &reason1) || !cx->compartment()->wrap(cx, &reason2)) { return nullptr; } ArrayObject* reasonArray = NewDenseFullyAllocatedArray(cx, 2); if (!reasonArray) { return nullptr; } reasonArray->setDenseInitializedLength(2); reasonArray->initDenseElement(0, reason1); reasonArray->initDenseElement(1, reason2); compositeReason = ObjectValue(*reasonArray); } // Step 13/14.c.ii: Let cancelResult be // ! ReadableStreamCancel(stream, compositeReason). // In our implementation, this can fail with OOM. The best course then // is to reject cancelPromise with an OOM error. Rooted cancelResult( cx, js::ReadableStreamCancel(cx, unwrappedStream, compositeReason)); if (!cancelResult) { // Handle the OOM case mentioned above. AutoRealm ar(cx, unwrappedCancelPromise); if (!RejectPromiseWithPendingError(cx, unwrappedCancelPromise)) { return nullptr; } } else { // Step 13/14.c.iii: Resolve cancelPromise with cancelResult. Rooted cancelResultVal(cx, ObjectValue(*cancelResult)); if (!ResolveUnwrappedPromiseWithValue(cx, unwrappedCancelPromise, cancelResultVal)) { return nullptr; } } } // Step 13/14.d: Return cancelPromise. Rooted cancelPromise(cx, unwrappedCancelPromise); if (!cx->compartment()->wrap(cx, &cancelPromise)) { return nullptr; } return cancelPromise; } /** * Streams spec, 3.4.10. step 18: * Upon rejection of reader.[[closedPromise]] with reason r, */ static bool TeeReaderErroredHandler(JSContext* cx, unsigned argc, JS::Value* vp) { CallArgs args = CallArgsFromVp(argc, vp); Rooted teeState(cx, TargetFromHandler(args)); Handle reason = args.get(0); Rooted unwrappedBranchController(cx); // Step 18.a.i: Perform // ! ReadableStreamDefaultControllerError( // branch1.[[readableStreamController]], r). unwrappedBranchController = teeState->branch1(); if (!ReadableStreamControllerError(cx, unwrappedBranchController, reason)) { return false; } // Step a.ii: Perform // ! ReadableStreamDefaultControllerError( // branch2.[[readableStreamController]], r). unwrappedBranchController = teeState->branch2(); if (!ReadableStreamControllerError(cx, unwrappedBranchController, reason)) { return false; } args.rval().setUndefined(); return true; } /** * Streams spec, 3.4.10. ReadableStreamTee ( stream, cloneForBranch2 ) */ MOZ_MUST_USE bool js::ReadableStreamTee( JSContext* cx, JS::Handle unwrappedStream, bool cloneForBranch2, JS::MutableHandle branch1Stream, JS::MutableHandle branch2Stream) { // Step 1: Assert: ! IsReadableStream(stream) is true (implicit). // Step 2: Assert: Type(cloneForBranch2) is Boolean (implicit). // // The streams spec only ever passes |cloneForBranch2 = false|. It's expected // that external specs that pass |cloneForBranch2 = true| will at some point // come into existence, but we don't presently implement any such specs. MOZ_ASSERT(!cloneForBranch2, "support for cloneForBranch2=true is not yet implemented"); // Step 3: Let reader be ? AcquireReadableStreamDefaultReader(stream). Rooted reader( cx, CreateReadableStreamDefaultReader(cx, unwrappedStream, ForAuthorCodeBool::No)); if (!reader) { return false; } // Several algorithms close over the variables initialized in the next few // steps, so we allocate them in an object, the TeeState. The algorithms // also close over `stream` and `reader`, so TeeState gets a reference to // the stream. // // Step 4: Let reading be false. // Step 5: Let canceled1 be false. // Step 6: Let canceled2 be false. // Step 7: Let reason1 be undefined. // Step 8: Let reason2 be undefined. // Step 9: Let branch1 be undefined. // Step 10: Let branch2 be undefined. // Step 11: Let cancelPromise be a new promise. Rooted teeState(cx, TeeState::create(cx, unwrappedStream)); if (!teeState) { return false; } MOZ_ASSERT(!teeState->reading()); MOZ_ASSERT(!teeState->canceled1()); MOZ_ASSERT(!teeState->canceled2()); // Step 12: Let pullAlgorithm be the following steps: [...] // Step 13: Let cancel1Algorithm be the following steps: [...] // Step 14: Let cancel2Algorithm be the following steps: [...] // Step 15: Let startAlgorithm be an algorithm that returns undefined. // // Implicit. Our implementation does not use objects to represent // [[pullAlgorithm]], [[cancelAlgorithm]], and so on. Instead, we decide // which one to perform based on class checks. For example, our // implementation of ReadableStreamControllerCallPullIfNeeded checks // whether the stream's underlyingSource is a TeeState object. // Step 16: Set branch1 to // ! CreateReadableStream(startAlgorithm, pullAlgorithm, // cancel1Algorithm). Rooted underlyingSource(cx, ObjectValue(*teeState)); branch1Stream.set( CreateReadableStream(cx, SourceAlgorithms::Tee, underlyingSource)); if (!branch1Stream) { return false; } Rooted branch1(cx); branch1 = &branch1Stream->controller()->as(); branch1->setTeeBranch1(); teeState->setBranch1(branch1); // Step 17: Set branch2 to // ! CreateReadableStream(startAlgorithm, pullAlgorithm, // cancel2Algorithm). branch2Stream.set( CreateReadableStream(cx, SourceAlgorithms::Tee, underlyingSource)); if (!branch2Stream) { return false; } Rooted branch2(cx); branch2 = &branch2Stream->controller()->as(); branch2->setTeeBranch2(); teeState->setBranch2(branch2); // Step 18: Upon rejection of reader.[[closedPromise]] with reason r, [...] Rooted closedPromise(cx, reader->closedPromise()); Rooted onRejected( cx, NewHandler(cx, TeeReaderErroredHandler, teeState)); if (!onRejected) { return false; } if (!JS::AddPromiseReactions(cx, closedPromise, nullptr, onRejected)) { return false; } // Step 19: Return « branch1, branch2 ». return true; } /** * Streams spec, 3.4.10. * ReadableStreamPipeTo ( source, dest, preventClose, preventAbort, * preventCancel, signal ) */ PromiseObject* js::ReadableStreamPipeTo(JSContext* cx, Handle unwrappedSource, Handle unwrappedDest, bool preventClose, bool preventAbort, bool preventCancel, Handle signal) { cx->check(signal); // Step 1. Assert: ! IsReadableStream(source) is true. // Step 2. Assert: ! IsWritableStream(dest) is true. // Step 3. Assert: Type(preventClose) is Boolean, Type(preventAbort) is // Boolean, and Type(preventCancel) is Boolean. // (These are guaranteed by the type system.) // Step 12: Let promise be a new promise. // // We reorder this so that this promise can be rejected and returned in case // of internal error. Rooted promise(cx, PromiseObject::createSkippingExecutor(cx)); if (!promise) { return nullptr; } // Steps 4-11, 13-14. Rooted pipeToState( cx, PipeToState::create(cx, promise, unwrappedSource, unwrappedDest, preventClose, preventAbort, preventCancel, signal)); if (!pipeToState) { if (!RejectPromiseWithPendingError(cx, promise)) { return nullptr; } return promise; } // Step 15. return promise; }