summaryrefslogtreecommitdiffstats
path: root/dom/streams/UnderlyingSinkCallbackHelpers.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'dom/streams/UnderlyingSinkCallbackHelpers.cpp')
-rw-r--r--dom/streams/UnderlyingSinkCallbackHelpers.cpp274
1 files changed, 274 insertions, 0 deletions
diff --git a/dom/streams/UnderlyingSinkCallbackHelpers.cpp b/dom/streams/UnderlyingSinkCallbackHelpers.cpp
new file mode 100644
index 0000000000..91562a2db3
--- /dev/null
+++ b/dom/streams/UnderlyingSinkCallbackHelpers.cpp
@@ -0,0 +1,274 @@
+/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim:set ts=2 sw=2 sts=2 et cindent: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "mozilla/dom/UnderlyingSinkCallbackHelpers.h"
+#include "StreamUtils.h"
+#include "mozilla/dom/UnionTypes.h"
+#include "mozilla/dom/WebTransportError.h"
+#include "nsHttp.h"
+
+using namespace mozilla::dom;
+
+NS_IMPL_CYCLE_COLLECTION(UnderlyingSinkAlgorithmsBase)
+NS_IMPL_CYCLE_COLLECTING_ADDREF(UnderlyingSinkAlgorithmsBase)
+NS_IMPL_CYCLE_COLLECTING_RELEASE(UnderlyingSinkAlgorithmsBase)
+NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSinkAlgorithmsBase)
+ NS_INTERFACE_MAP_ENTRY(nsISupports)
+NS_INTERFACE_MAP_END
+
+NS_IMPL_CYCLE_COLLECTION_INHERITED_WITH_JS_MEMBERS(
+ UnderlyingSinkAlgorithms, UnderlyingSinkAlgorithmsBase,
+ (mGlobal, mStartCallback, mWriteCallback, mCloseCallback, mAbortCallback),
+ (mUnderlyingSink))
+NS_IMPL_ADDREF_INHERITED(UnderlyingSinkAlgorithms, UnderlyingSinkAlgorithmsBase)
+NS_IMPL_RELEASE_INHERITED(UnderlyingSinkAlgorithms,
+ UnderlyingSinkAlgorithmsBase)
+NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSinkAlgorithms)
+NS_INTERFACE_MAP_END_INHERITING(UnderlyingSinkAlgorithmsBase)
+
+// https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink
+void UnderlyingSinkAlgorithms::StartCallback(
+ JSContext* aCx, WritableStreamDefaultController& aController,
+ JS::MutableHandle<JS::Value> aRetVal, ErrorResult& aRv) {
+ if (!mStartCallback) {
+ // Step 2: Let startAlgorithm be an algorithm that returns undefined.
+ aRetVal.setUndefined();
+ return;
+ }
+
+ // Step 6: If underlyingSinkDict["start"] exists, then set startAlgorithm to
+ // an algorithm which returns the result of invoking
+ // underlyingSinkDict["start"] with argument list « controller » and callback
+ // this value underlyingSink.
+ JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSink);
+ return mStartCallback->Call(thisObj, aController, aRetVal, aRv,
+ "UnderlyingSink.start",
+ CallbackFunction::eRethrowExceptions);
+}
+
+// https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink
+already_AddRefed<Promise> UnderlyingSinkAlgorithms::WriteCallback(
+ JSContext* aCx, JS::Handle<JS::Value> aChunk,
+ WritableStreamDefaultController& aController, ErrorResult& aRv) {
+ if (!mWriteCallback) {
+ // Step 3: Let writeAlgorithm be an algorithm that returns a promise
+ // resolved with undefined.
+ return Promise::CreateResolvedWithUndefined(mGlobal, aRv);
+ }
+
+ // Step 7: If underlyingSinkDict["write"] exists, then set writeAlgorithm to
+ // an algorithm which takes an argument chunk and returns the result of
+ // invoking underlyingSinkDict["write"] with argument list « chunk, controller
+ // » and callback this value underlyingSink.
+ JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSink);
+ RefPtr<Promise> promise = mWriteCallback->Call(
+ thisObj, aChunk, aController, aRv, "UnderlyingSink.write",
+ CallbackFunction::eRethrowExceptions);
+ return promise.forget();
+}
+
+// https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink
+already_AddRefed<Promise> UnderlyingSinkAlgorithms::CloseCallback(
+ JSContext* aCx, ErrorResult& aRv) {
+ if (!mCloseCallback) {
+ // Step 4: Let closeAlgorithm be an algorithm that returns a promise
+ // resolved with undefined.
+ return Promise::CreateResolvedWithUndefined(mGlobal, aRv);
+ }
+
+ // Step 8: If underlyingSinkDict["close"] exists, then set closeAlgorithm to
+ // an algorithm which returns the result of invoking
+ // underlyingSinkDict["close"] with argument list «» and callback this value
+ // underlyingSink.
+ JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSink);
+ RefPtr<Promise> promise =
+ mCloseCallback->Call(thisObj, aRv, "UnderlyingSink.close",
+ CallbackFunction::eRethrowExceptions);
+ return promise.forget();
+}
+
+// https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink
+already_AddRefed<Promise> UnderlyingSinkAlgorithms::AbortCallback(
+ JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
+ ErrorResult& aRv) {
+ if (!mAbortCallback) {
+ // Step 5: Let abortAlgorithm be an algorithm that returns a promise
+ // resolved with undefined.
+ return Promise::CreateResolvedWithUndefined(mGlobal, aRv);
+ }
+
+ // Step 9: Let abortAlgorithm be an algorithm that returns a promise resolved
+ // with undefined.
+ JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSink);
+ RefPtr<Promise> promise =
+ mAbortCallback->Call(thisObj, aReason, aRv, "UnderlyingSink.abort",
+ CallbackFunction::eRethrowExceptions);
+
+ return promise.forget();
+}
+
+// https://streams.spec.whatwg.org/#writable-set-up
+// Step 2.1: Let closeAlgorithmWrapper be an algorithm that runs these steps:
+already_AddRefed<Promise> UnderlyingSinkAlgorithmsWrapper::CloseCallback(
+ JSContext* aCx, ErrorResult& aRv) {
+ nsCOMPtr<nsIGlobalObject> global = xpc::CurrentNativeGlobal(aCx);
+ return PromisifyAlgorithm(
+ global, [&](ErrorResult& aRv) { return CloseCallbackImpl(aCx, aRv); },
+ aRv);
+}
+
+// https://streams.spec.whatwg.org/#writable-set-up
+// Step 3.1: Let abortAlgorithmWrapper be an algorithm that runs these steps:
+already_AddRefed<Promise> UnderlyingSinkAlgorithmsWrapper::AbortCallback(
+ JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
+ ErrorResult& aRv) {
+ nsCOMPtr<nsIGlobalObject> global = xpc::CurrentNativeGlobal(aCx);
+ return PromisifyAlgorithm(
+ global,
+ [&](ErrorResult& aRv) { return AbortCallbackImpl(aCx, aReason, aRv); },
+ aRv);
+}
+
+NS_IMPL_ISUPPORTS_CYCLE_COLLECTION_INHERITED(WritableStreamToOutput,
+ UnderlyingSinkAlgorithmsBase,
+ nsIOutputStreamCallback)
+NS_IMPL_CYCLE_COLLECTION_INHERITED(WritableStreamToOutput,
+ UnderlyingSinkAlgorithmsBase, mParent,
+ mOutput, mPromise)
+
+NS_IMETHODIMP
+WritableStreamToOutput::OnOutputStreamReady(nsIAsyncOutputStream* aStream) {
+ if (!mData) {
+ return NS_OK;
+ }
+ MOZ_ASSERT(mPromise);
+ uint32_t written = 0;
+ nsresult rv = mOutput->Write(
+ reinterpret_cast<const char*>(mData->Elements() + mWritten),
+ mData->Length() - mWritten, &written);
+ if (NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) {
+ mPromise->MaybeRejectWithAbortError("Error writing to stream"_ns);
+ ClearData();
+ // XXX should we add mErrored and fail future calls immediately?
+ // I presume new calls to Write() will fail though, too
+ return rv;
+ }
+ if (NS_SUCCEEDED(rv)) {
+ mWritten += written;
+ MOZ_ASSERT(mWritten <= mData->Length());
+ if (mWritten >= mData->Length()) {
+ mPromise->MaybeResolveWithUndefined();
+ ClearData();
+ return NS_OK;
+ }
+ // more to write
+ }
+ // wrote partial or nothing
+ // Wait for space
+ nsCOMPtr<nsIEventTarget> target = mozilla::GetCurrentSerialEventTarget();
+ rv = mOutput->AsyncWait(this, 0, 0, target);
+ if (NS_FAILED(rv)) {
+ mPromise->MaybeRejectWithUnknownError("error waiting to write data");
+ ClearData();
+ // XXX should we add mErrored and fail future calls immediately?
+ // New calls to Write() will fail, note
+ // See step 5.2 of
+ // https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write.
+ return rv;
+ }
+ return NS_OK;
+}
+
+already_AddRefed<Promise> WritableStreamToOutput::WriteCallback(
+ JSContext* aCx, JS::Handle<JS::Value> aChunk,
+ WritableStreamDefaultController& aController, ErrorResult& aError) {
+ ArrayBufferViewOrArrayBuffer data;
+ if (!data.Init(aCx, aChunk)) {
+ aError.StealExceptionFromJSContext(aCx);
+ return nullptr;
+ }
+ // buffer/bufferView
+ MOZ_ASSERT(data.IsArrayBuffer() || data.IsArrayBufferView());
+
+ RefPtr<Promise> promise = Promise::Create(mParent, aError);
+ if (NS_WARN_IF(aError.Failed())) {
+ return nullptr;
+ }
+
+ // Try to write first, and only enqueue data if we were already blocked
+ // or the write didn't write it all. This avoids allocations and copies
+ // in common cases.
+ MOZ_ASSERT(!mPromise);
+ MOZ_ASSERT(mWritten == 0);
+ uint32_t written = 0;
+ ProcessTypedArraysFixed(data, [&](const Span<uint8_t>& aData) {
+ Span<uint8_t> dataSpan = aData;
+ nsresult rv = mOutput->Write(mozilla::AsChars(dataSpan).Elements(),
+ dataSpan.Length(), &written);
+ if (NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) {
+ promise->MaybeRejectWithAbortError("error writing data");
+ return;
+ }
+ if (NS_SUCCEEDED(rv)) {
+ if (written == dataSpan.Length()) {
+ promise->MaybeResolveWithUndefined();
+ return;
+ }
+ dataSpan = dataSpan.From(written);
+ }
+
+ auto buffer = Buffer<uint8_t>::CopyFrom(dataSpan);
+ if (buffer.isNothing()) {
+ promise->MaybeReject(NS_ERROR_OUT_OF_MEMORY);
+ return;
+ }
+ mData = std::move(buffer);
+ });
+
+ if (promise->State() != Promise::PromiseState::Pending) {
+ return promise.forget();
+ }
+
+ mPromise = promise;
+
+ nsCOMPtr<nsIEventTarget> target = mozilla::GetCurrentSerialEventTarget();
+ nsresult rv = mOutput->AsyncWait(this, 0, 0, target);
+ if (NS_FAILED(rv)) {
+ ClearData();
+ promise->MaybeRejectWithUnknownError("error waiting to write data");
+ }
+ return promise.forget();
+}
+
+already_AddRefed<Promise> WritableStreamToOutput::AbortCallbackImpl(
+ JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
+ ErrorResult& aRv) {
+ // https://streams.spec.whatwg.org/#writablestream-set-up
+ // Step 3. Let abortAlgorithmWrapper be an algorithm that runs these steps:
+
+ if (aReason.WasPassed() && aReason.Value().isObject()) {
+ JS::Rooted<JSObject*> obj(aCx, &aReason.Value().toObject());
+ RefPtr<WebTransportError> error;
+ UnwrapObject<prototypes::id::WebTransportError, WebTransportError>(
+ obj, error, nullptr);
+ if (error) {
+ mOutput->CloseWithStatus(net::GetNSResultFromWebTransportError(
+ error->GetStreamErrorCode().Value()));
+ return nullptr;
+ }
+ }
+
+ // XXX The close or rather a dedicated abort should be async. For now we have
+ // to always fall back to the Step 3.3 below.
+ // XXX how do we know this stream is used by webtransport?
+ mOutput->CloseWithStatus(NS_ERROR_WEBTRANSPORT_CODE_BASE);
+
+ // Step 3.3. Return a promise resolved with undefined.
+ // Wrapper handles this
+ return nullptr;
+}
+
+void WritableStreamToOutput::ReleaseObjects() { mOutput->Close(); }