summaryrefslogtreecommitdiffstats
path: root/dom/fetch/FetchStreamReader.cpp
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--dom/fetch/FetchStreamReader.cpp441
1 files changed, 441 insertions, 0 deletions
diff --git a/dom/fetch/FetchStreamReader.cpp b/dom/fetch/FetchStreamReader.cpp
new file mode 100644
index 0000000000..de5a2cfefe
--- /dev/null
+++ b/dom/fetch/FetchStreamReader.cpp
@@ -0,0 +1,441 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "FetchStreamReader.h"
+#include "InternalResponse.h"
+#include "mozilla/ConsoleReportCollector.h"
+#include "mozilla/ErrorResult.h"
+#include "mozilla/StaticAnalysisFunctions.h"
+#include "mozilla/dom/AutoEntryScript.h"
+#include "mozilla/dom/Promise.h"
+#include "mozilla/dom/PromiseBinding.h"
+#include "mozilla/dom/ReadableStream.h"
+#include "mozilla/dom/ReadableStreamDefaultController.h"
+#include "mozilla/dom/ReadableStreamDefaultReader.h"
+#include "mozilla/dom/WorkerPrivate.h"
+#include "mozilla/dom/WorkerRef.h"
+#include "mozilla/HoldDropJSObjects.h"
+#include "mozilla/TaskCategory.h"
+#include "nsContentUtils.h"
+#include "nsDebug.h"
+#include "nsIAsyncInputStream.h"
+#include "nsIPipe.h"
+#include "nsIScriptError.h"
+#include "nsPIDOMWindow.h"
+#include "jsapi.h"
+
+namespace mozilla::dom {
+
+NS_IMPL_CYCLE_COLLECTING_ADDREF(FetchStreamReader)
+NS_IMPL_CYCLE_COLLECTING_RELEASE(FetchStreamReader)
+
+NS_IMPL_CYCLE_COLLECTION_CLASS(FetchStreamReader)
+
+NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(FetchStreamReader)
+ NS_IMPL_CYCLE_COLLECTION_UNLINK(mGlobal)
+ NS_IMPL_CYCLE_COLLECTION_UNLINK(mReader)
+NS_IMPL_CYCLE_COLLECTION_UNLINK_END
+
+NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(FetchStreamReader)
+ NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mGlobal)
+ NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReader)
+NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
+
+NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(FetchStreamReader)
+NS_IMPL_CYCLE_COLLECTION_TRACE_END
+
+NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(FetchStreamReader)
+ NS_INTERFACE_MAP_ENTRY(nsIOutputStreamCallback)
+ NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIOutputStreamCallback)
+NS_INTERFACE_MAP_END
+
+/* static */
+nsresult FetchStreamReader::Create(JSContext* aCx, nsIGlobalObject* aGlobal,
+ FetchStreamReader** aStreamReader,
+ nsIInputStream** aInputStream) {
+ MOZ_ASSERT(aCx);
+ MOZ_ASSERT(aGlobal);
+ MOZ_ASSERT(aStreamReader);
+ MOZ_ASSERT(aInputStream);
+
+ RefPtr<FetchStreamReader> streamReader = new FetchStreamReader(aGlobal);
+
+ nsCOMPtr<nsIAsyncInputStream> pipeIn;
+
+ NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(streamReader->mPipeOut),
+ true, true, 0, 0);
+
+ if (!NS_IsMainThread()) {
+ WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
+ MOZ_ASSERT(workerPrivate);
+
+ RefPtr<StrongWorkerRef> workerRef = StrongWorkerRef::Create(
+ workerPrivate, "FetchStreamReader", [streamReader]() {
+ MOZ_ASSERT(streamReader);
+
+ // mAsyncWaitWorkerRef may keep the (same) StrongWorkerRef alive even
+ // when mWorkerRef has already been nulled out by a previous call to
+ // CloseAndRelease, we can just safely ignore this callback then
+ // (as would the CloseAndRelease do on a second call).
+ if (streamReader->mWorkerRef) {
+ streamReader->CloseAndRelease(
+ streamReader->mWorkerRef->Private()->GetJSContext(),
+ NS_ERROR_DOM_INVALID_STATE_ERR);
+ } else {
+ MOZ_DIAGNOSTIC_ASSERT(streamReader->mAsyncWaitWorkerRef);
+ }
+ });
+
+ if (NS_WARN_IF(!workerRef)) {
+ streamReader->mPipeOut->CloseWithStatus(NS_ERROR_DOM_INVALID_STATE_ERR);
+ return NS_ERROR_DOM_INVALID_STATE_ERR;
+ }
+
+ // These 2 objects create a ref-cycle here that is broken when the stream is
+ // closed or the worker shutsdown.
+ streamReader->mWorkerRef = std::move(workerRef);
+ }
+
+ pipeIn.forget(aInputStream);
+ streamReader.forget(aStreamReader);
+ return NS_OK;
+}
+
+FetchStreamReader::FetchStreamReader(nsIGlobalObject* aGlobal)
+ : mGlobal(aGlobal),
+ mOwningEventTarget(mGlobal->EventTargetFor(TaskCategory::Other)) {
+ MOZ_ASSERT(aGlobal);
+
+ mozilla::HoldJSObjects(this);
+}
+
+FetchStreamReader::~FetchStreamReader() {
+ CloseAndRelease(nullptr, NS_BASE_STREAM_CLOSED);
+
+ mozilla::DropJSObjects(this);
+}
+
+// If a context is provided, an attempt will be made to cancel the reader. The
+// only situation where we don't expect to have a context is when closure is
+// being triggered from the destructor or the WorkerRef is notifying. If
+// we're at the destructor, it's far too late to cancel anything. And if the
+// WorkerRef is being notified, the global is going away, so there's also
+// no need to do further JS work.
+void FetchStreamReader::CloseAndRelease(JSContext* aCx, nsresult aStatus) {
+ NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
+
+ if (mStreamClosed) {
+ // Already closed.
+ return;
+ }
+
+ RefPtr<FetchStreamReader> kungFuDeathGrip = this;
+ if (aCx && mReader) {
+ ErrorResult rv;
+ if (aStatus == NS_ERROR_DOM_WRONG_TYPE_ERR) {
+ rv.ThrowTypeError<MSG_FETCH_BODY_WRONG_TYPE>();
+ } else {
+ rv = aStatus;
+ }
+ JS::Rooted<JS::Value> errorValue(aCx);
+ if (ToJSValue(aCx, std::move(rv), &errorValue)) {
+ IgnoredErrorResult ignoredError;
+ // It's currently safe to cancel an already closed reader because, per the
+ // comments in ReadableStream::cancel() conveying the spec, step 2 of
+ // 3.4.3 that specified ReadableStreamCancel is: If stream.[[state]] is
+ // "closed", return a new promise resolved with undefined.
+ RefPtr<Promise> cancelResultPromise =
+ MOZ_KnownLive(mReader)->Cancel(aCx, errorValue, ignoredError);
+ NS_WARNING_ASSERTION(!ignoredError.Failed(),
+ "Failed to cancel stream during close and release");
+ if (cancelResultPromise) {
+ bool setHandled = cancelResultPromise->SetAnyPromiseIsHandled();
+ NS_WARNING_ASSERTION(setHandled,
+ "Failed to mark cancel promise as handled.");
+ (void)setHandled;
+ }
+ }
+
+ // We don't want to propagate exceptions during the cleanup.
+ JS_ClearPendingException(aCx);
+ }
+
+ mStreamClosed = true;
+
+ mGlobal = nullptr;
+
+ if (mPipeOut) {
+ mPipeOut->CloseWithStatus(aStatus);
+ }
+ mPipeOut = nullptr;
+
+ mWorkerRef = nullptr;
+
+ mReader = nullptr;
+ mBuffer.Clear();
+}
+
+// https://fetch.spec.whatwg.org/#body-incrementally-read
+void FetchStreamReader::StartConsuming(JSContext* aCx, ReadableStream* aStream,
+ ErrorResult& aRv) {
+ MOZ_DIAGNOSTIC_ASSERT(!mReader);
+ MOZ_DIAGNOSTIC_ASSERT(aStream);
+
+ // Step 2: Let reader be the result of getting a reader for body’s stream.
+ RefPtr<ReadableStreamDefaultReader> reader = aStream->GetReader(aRv);
+ if (aRv.Failed()) {
+ CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
+ return;
+ }
+
+ mReader = reader;
+
+ mAsyncWaitWorkerRef = mWorkerRef;
+ aRv = mPipeOut->AsyncWait(this, 0, 0, mOwningEventTarget);
+ if (NS_WARN_IF(aRv.Failed())) {
+ mAsyncWaitWorkerRef = nullptr;
+ CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
+ }
+}
+
+struct FetchReadRequest : public ReadRequest {
+ public:
+ NS_DECL_ISUPPORTS_INHERITED
+ NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(FetchReadRequest, ReadRequest)
+
+ explicit FetchReadRequest(FetchStreamReader* aReader)
+ : mFetchStreamReader(aReader) {}
+
+ MOZ_CAN_RUN_SCRIPT_BOUNDARY
+ void ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
+ ErrorResult& aRv) override {
+ mFetchStreamReader->ChunkSteps(aCx, aChunk, aRv);
+ }
+
+ MOZ_CAN_RUN_SCRIPT_BOUNDARY
+ void CloseSteps(JSContext* aCx, ErrorResult& aRv) override {
+ mFetchStreamReader->CloseSteps(aCx, aRv);
+ }
+
+ MOZ_CAN_RUN_SCRIPT_BOUNDARY
+ void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError,
+ ErrorResult& aRv) override {
+ mFetchStreamReader->ErrorSteps(aCx, aError, aRv);
+ }
+
+ protected:
+ virtual ~FetchReadRequest() = default;
+
+ MOZ_KNOWN_LIVE RefPtr<FetchStreamReader> mFetchStreamReader;
+};
+
+NS_IMPL_CYCLE_COLLECTION_INHERITED(FetchReadRequest, ReadRequest,
+ mFetchStreamReader)
+NS_IMPL_ADDREF_INHERITED(FetchReadRequest, ReadRequest)
+NS_IMPL_RELEASE_INHERITED(FetchReadRequest, ReadRequest)
+NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(FetchReadRequest)
+NS_INTERFACE_MAP_END_INHERITING(ReadRequest)
+
+// nsIOutputStreamCallback interface
+MOZ_CAN_RUN_SCRIPT_BOUNDARY
+NS_IMETHODIMP
+FetchStreamReader::OnOutputStreamReady(nsIAsyncOutputStream* aStream) {
+ NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
+ if (mStreamClosed) {
+ mAsyncWaitWorkerRef = nullptr;
+ return NS_OK;
+ }
+
+ AutoEntryScript aes(mGlobal, "ReadableStreamReader.read", !mWorkerRef);
+ if (!Process(aes.cx())) {
+ // We're done processing data, and haven't queued up a new AsyncWait - we
+ // can clear our mAsyncWaitWorkerRef.
+ mAsyncWaitWorkerRef = nullptr;
+ }
+ return NS_OK;
+}
+
+bool FetchStreamReader::Process(JSContext* aCx) {
+ NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
+ MOZ_ASSERT(mReader);
+
+ if (!mBuffer.IsEmpty()) {
+ nsresult rv = WriteBuffer();
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR);
+ return false;
+ }
+ return true;
+ }
+
+ // Check if the output stream has already been closed. This lets us propagate
+ // errors eagerly, and detect output stream closures even when we have no data
+ // to write.
+ if (NS_WARN_IF(NS_FAILED(mPipeOut->StreamStatus()))) {
+ CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR);
+ return false;
+ }
+
+ // We're waiting on new data - set up a WAIT_CLOSURE_ONLY callback so we
+ // notice if the reader closes.
+ nsresult rv = mPipeOut->AsyncWait(
+ this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY, 0, mOwningEventTarget);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
+ return false;
+ }
+
+ // If we already have an outstanding read request, don't start another one
+ // concurrently.
+ if (!mHasOutstandingReadRequest) {
+ // https://fetch.spec.whatwg.org/#incrementally-read-loop
+ // The below very loosely tries to implement the incrementally-read-loop
+ // from the fetch spec.
+ // Step 2: Read a chunk from reader given readRequest.
+ RefPtr<ReadRequest> readRequest = new FetchReadRequest(this);
+ RefPtr<ReadableStreamDefaultReader> reader = mReader;
+ mHasOutstandingReadRequest = true;
+
+ IgnoredErrorResult err;
+ reader->ReadChunk(aCx, *readRequest, err);
+ if (NS_WARN_IF(err.Failed())) {
+ // Let's close the stream.
+ mHasOutstandingReadRequest = false;
+ CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
+ // Don't return false, as we've already called `AsyncWait`.
+ }
+ }
+ return true;
+}
+
+void FetchStreamReader::ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
+ ErrorResult& aRv) {
+ // This roughly implements the chunk steps from
+ // https://fetch.spec.whatwg.org/#incrementally-read-loop.
+
+ mHasOutstandingReadRequest = false;
+
+ // Step 2. If chunk is not a Uint8Array object, then set continueAlgorithm to
+ // this step: run processBodyError given a TypeError.
+ RootedSpiderMonkeyInterface<Uint8Array> chunk(aCx);
+ if (!aChunk.isObject() || !chunk.Init(&aChunk.toObject())) {
+ CloseAndRelease(aCx, NS_ERROR_DOM_WRONG_TYPE_ERR);
+ return;
+ }
+ chunk.ComputeState();
+
+ MOZ_DIAGNOSTIC_ASSERT(mBuffer.IsEmpty());
+
+ // Let's take a copy of the data.
+ // FIXME: We could sometimes avoid this copy by trying to write `chunk`
+ // directly into `mPipeOut` eagerly, and only filling `mBuffer` if there isn't
+ // enough space in the pipe's buffer.
+ if (!mBuffer.AppendElements(chunk.Data(), chunk.Length(), fallible)) {
+ CloseAndRelease(aCx, NS_ERROR_OUT_OF_MEMORY);
+ return;
+ }
+
+ mBufferOffset = 0;
+ mBufferRemaining = chunk.Length();
+
+ nsresult rv = WriteBuffer();
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR);
+ }
+}
+
+void FetchStreamReader::CloseSteps(JSContext* aCx, ErrorResult& aRv) {
+ mHasOutstandingReadRequest = false;
+ CloseAndRelease(aCx, NS_BASE_STREAM_CLOSED);
+}
+
+void FetchStreamReader::ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError,
+ ErrorResult& aRv) {
+ mHasOutstandingReadRequest = false;
+ ReportErrorToConsole(aCx, aError);
+ CloseAndRelease(aCx, NS_ERROR_FAILURE);
+}
+
+nsresult FetchStreamReader::WriteBuffer() {
+ MOZ_ASSERT(mBuffer.Length() == (mBufferOffset + mBufferRemaining));
+
+ char* data = reinterpret_cast<char*>(mBuffer.Elements());
+
+ while (mBufferRemaining > 0) {
+ uint32_t written = 0;
+ nsresult rv =
+ mPipeOut->Write(data + mBufferOffset, mBufferRemaining, &written);
+
+ if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
+ break;
+ }
+
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ MOZ_ASSERT(written <= mBufferRemaining);
+ mBufferRemaining -= written;
+ mBufferOffset += written;
+
+ if (mBufferRemaining == 0) {
+ mBuffer.Clear();
+ break;
+ }
+ }
+
+ nsresult rv = mPipeOut->AsyncWait(this, 0, 0, mOwningEventTarget);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ return NS_OK;
+}
+
+void FetchStreamReader::ReportErrorToConsole(JSContext* aCx,
+ JS::Handle<JS::Value> aValue) {
+ nsCString sourceSpec;
+ uint32_t line = 0;
+ uint32_t column = 0;
+ nsString valueString;
+
+ nsContentUtils::ExtractErrorValues(aCx, aValue, sourceSpec, &line, &column,
+ valueString);
+
+ nsTArray<nsString> params;
+ params.AppendElement(valueString);
+
+ RefPtr<ConsoleReportCollector> reporter = new ConsoleReportCollector();
+ reporter->AddConsoleReport(nsIScriptError::errorFlag,
+ "ReadableStreamReader.read"_ns,
+ nsContentUtils::eDOM_PROPERTIES, sourceSpec, line,
+ column, "ReadableStreamReadingFailed"_ns, params);
+
+ uint64_t innerWindowId = 0;
+
+ if (NS_IsMainThread()) {
+ nsCOMPtr<nsPIDOMWindowInner> window = do_QueryInterface(mGlobal);
+ if (window) {
+ innerWindowId = window->WindowID();
+ }
+ reporter->FlushReportsToConsole(innerWindowId);
+ return;
+ }
+
+ WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
+ if (workerPrivate) {
+ innerWindowId = workerPrivate->WindowID();
+ }
+
+ RefPtr<Runnable> r = NS_NewRunnableFunction(
+ "FetchStreamReader::ReportErrorToConsole", [reporter, innerWindowId]() {
+ reporter->FlushReportsToConsole(innerWindowId);
+ });
+
+ workerPrivate->DispatchToMainThread(r.forget());
+}
+
+} // namespace mozilla::dom