/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim:set ts=2 sw=2 sts=2 et cindent: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "mozilla/dom/UnderlyingSinkCallbackHelpers.h" #include "StreamUtils.h" #include "mozilla/dom/UnionTypes.h" #include "mozilla/dom/WebTransportError.h" #include "nsHttp.h" using namespace mozilla::dom; NS_IMPL_CYCLE_COLLECTION(UnderlyingSinkAlgorithmsBase) NS_IMPL_CYCLE_COLLECTING_ADDREF(UnderlyingSinkAlgorithmsBase) NS_IMPL_CYCLE_COLLECTING_RELEASE(UnderlyingSinkAlgorithmsBase) NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSinkAlgorithmsBase) NS_INTERFACE_MAP_ENTRY(nsISupports) NS_INTERFACE_MAP_END NS_IMPL_CYCLE_COLLECTION_INHERITED_WITH_JS_MEMBERS( UnderlyingSinkAlgorithms, UnderlyingSinkAlgorithmsBase, (mGlobal, mStartCallback, mWriteCallback, mCloseCallback, mAbortCallback), (mUnderlyingSink)) NS_IMPL_ADDREF_INHERITED(UnderlyingSinkAlgorithms, UnderlyingSinkAlgorithmsBase) NS_IMPL_RELEASE_INHERITED(UnderlyingSinkAlgorithms, UnderlyingSinkAlgorithmsBase) NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSinkAlgorithms) NS_INTERFACE_MAP_END_INHERITING(UnderlyingSinkAlgorithmsBase) // https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink void UnderlyingSinkAlgorithms::StartCallback( JSContext* aCx, WritableStreamDefaultController& aController, JS::MutableHandle aRetVal, ErrorResult& aRv) { if (!mStartCallback) { // Step 2: Let startAlgorithm be an algorithm that returns undefined. aRetVal.setUndefined(); return; } // Step 6: If underlyingSinkDict["start"] exists, then set startAlgorithm to // an algorithm which returns the result of invoking // underlyingSinkDict["start"] with argument list « controller » and callback // this value underlyingSink. JS::Rooted thisObj(aCx, mUnderlyingSink); return mStartCallback->Call(thisObj, aController, aRetVal, aRv, "UnderlyingSink.start", CallbackFunction::eRethrowExceptions); } // https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink already_AddRefed UnderlyingSinkAlgorithms::WriteCallback( JSContext* aCx, JS::Handle aChunk, WritableStreamDefaultController& aController, ErrorResult& aRv) { if (!mWriteCallback) { // Step 3: Let writeAlgorithm be an algorithm that returns a promise // resolved with undefined. return Promise::CreateResolvedWithUndefined(mGlobal, aRv); } // Step 7: If underlyingSinkDict["write"] exists, then set writeAlgorithm to // an algorithm which takes an argument chunk and returns the result of // invoking underlyingSinkDict["write"] with argument list « chunk, controller // » and callback this value underlyingSink. JS::Rooted thisObj(aCx, mUnderlyingSink); RefPtr promise = mWriteCallback->Call( thisObj, aChunk, aController, aRv, "UnderlyingSink.write", CallbackFunction::eRethrowExceptions); return promise.forget(); } // https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink already_AddRefed UnderlyingSinkAlgorithms::CloseCallback( JSContext* aCx, ErrorResult& aRv) { if (!mCloseCallback) { // Step 4: Let closeAlgorithm be an algorithm that returns a promise // resolved with undefined. return Promise::CreateResolvedWithUndefined(mGlobal, aRv); } // Step 8: If underlyingSinkDict["close"] exists, then set closeAlgorithm to // an algorithm which returns the result of invoking // underlyingSinkDict["close"] with argument list «» and callback this value // underlyingSink. JS::Rooted thisObj(aCx, mUnderlyingSink); RefPtr promise = mCloseCallback->Call(thisObj, aRv, "UnderlyingSink.close", CallbackFunction::eRethrowExceptions); return promise.forget(); } // https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink already_AddRefed UnderlyingSinkAlgorithms::AbortCallback( JSContext* aCx, const Optional>& aReason, ErrorResult& aRv) { if (!mAbortCallback) { // Step 5: Let abortAlgorithm be an algorithm that returns a promise // resolved with undefined. return Promise::CreateResolvedWithUndefined(mGlobal, aRv); } // Step 9: Let abortAlgorithm be an algorithm that returns a promise resolved // with undefined. JS::Rooted thisObj(aCx, mUnderlyingSink); RefPtr promise = mAbortCallback->Call(thisObj, aReason, aRv, "UnderlyingSink.abort", CallbackFunction::eRethrowExceptions); return promise.forget(); } // https://streams.spec.whatwg.org/#writable-set-up // Step 2.1: Let closeAlgorithmWrapper be an algorithm that runs these steps: already_AddRefed UnderlyingSinkAlgorithmsWrapper::CloseCallback( JSContext* aCx, ErrorResult& aRv) { nsCOMPtr global = xpc::CurrentNativeGlobal(aCx); return PromisifyAlgorithm( global, [&](ErrorResult& aRv) { return CloseCallbackImpl(aCx, aRv); }, aRv); } // https://streams.spec.whatwg.org/#writable-set-up // Step 3.1: Let abortAlgorithmWrapper be an algorithm that runs these steps: already_AddRefed UnderlyingSinkAlgorithmsWrapper::AbortCallback( JSContext* aCx, const Optional>& aReason, ErrorResult& aRv) { nsCOMPtr global = xpc::CurrentNativeGlobal(aCx); return PromisifyAlgorithm( global, [&](ErrorResult& aRv) { return AbortCallbackImpl(aCx, aReason, aRv); }, aRv); } NS_IMPL_ISUPPORTS_CYCLE_COLLECTION_INHERITED(WritableStreamToOutput, UnderlyingSinkAlgorithmsBase, nsIOutputStreamCallback) NS_IMPL_CYCLE_COLLECTION_INHERITED(WritableStreamToOutput, UnderlyingSinkAlgorithmsBase, mParent, mOutput, mPromise) NS_IMETHODIMP WritableStreamToOutput::OnOutputStreamReady(nsIAsyncOutputStream* aStream) { if (!mData) { return NS_OK; } MOZ_ASSERT(mPromise); uint32_t written = 0; nsresult rv = mOutput->Write( reinterpret_cast(mData->Elements() + mWritten), mData->Length() - mWritten, &written); if (NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) { mPromise->MaybeRejectWithAbortError("Error writing to stream"_ns); ClearData(); // XXX should we add mErrored and fail future calls immediately? // I presume new calls to Write() will fail though, too return rv; } if (NS_SUCCEEDED(rv)) { mWritten += written; MOZ_ASSERT(mWritten <= mData->Length()); if (mWritten >= mData->Length()) { mPromise->MaybeResolveWithUndefined(); ClearData(); return NS_OK; } // more to write } // wrote partial or nothing // Wait for space nsCOMPtr target = mozilla::GetCurrentSerialEventTarget(); rv = mOutput->AsyncWait(this, 0, 0, target); if (NS_FAILED(rv)) { mPromise->MaybeRejectWithUnknownError("error waiting to write data"); ClearData(); // XXX should we add mErrored and fail future calls immediately? // New calls to Write() will fail, note // See step 5.2 of // https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write. return rv; } return NS_OK; } already_AddRefed WritableStreamToOutput::WriteCallback( JSContext* aCx, JS::Handle aChunk, WritableStreamDefaultController& aController, ErrorResult& aError) { ArrayBufferViewOrArrayBuffer data; if (!data.Init(aCx, aChunk)) { aError.StealExceptionFromJSContext(aCx); return nullptr; } // buffer/bufferView MOZ_ASSERT(data.IsArrayBuffer() || data.IsArrayBufferView()); RefPtr promise = Promise::Create(mParent, aError); if (NS_WARN_IF(aError.Failed())) { return nullptr; } // This is a duplicate of dom/encoding/TextDecoderStream.cpp#51-69 // PeterV will deal with that when he lands his patch for TypedArrays auto dataSpan = [&data]() { if (data.IsArrayBuffer()) { const ArrayBuffer& buffer = data.GetAsArrayBuffer(); buffer.ComputeState(); return Span{buffer.Data(), buffer.Length()}; } MOZ_ASSERT(data.IsArrayBufferView()); const ArrayBufferView& buffer = data.GetAsArrayBufferView(); buffer.ComputeState(); return Span{buffer.Data(), buffer.Length()}; }(); // Try to write first, and only enqueue data if we were already blocked // or the write didn't write it all. This avoids allocations and copies // in common cases. MOZ_ASSERT(!mPromise); MOZ_ASSERT(mWritten == 0); uint32_t written = 0; nsresult rv = mOutput->Write(mozilla::AsChars(dataSpan).Elements(), dataSpan.Length(), &written); if (NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) { promise->MaybeRejectWithAbortError("error writing data"); return promise.forget(); } if (NS_SUCCEEDED(rv)) { if (written == dataSpan.Length()) { promise->MaybeResolveWithUndefined(); return promise.forget(); } dataSpan = dataSpan.From(written); } auto buffer = Buffer::CopyFrom(dataSpan); if (buffer.isNothing()) { promise->MaybeReject(NS_ERROR_OUT_OF_MEMORY); return promise.forget(); } mData = std::move(buffer); mPromise = promise; nsCOMPtr target = mozilla::GetCurrentSerialEventTarget(); rv = mOutput->AsyncWait(this, 0, 0, target); if (NS_FAILED(rv)) { ClearData(); promise->MaybeRejectWithUnknownError("error waiting to write data"); } return promise.forget(); } already_AddRefed WritableStreamToOutput::AbortCallbackImpl( JSContext* aCx, const Optional>& aReason, ErrorResult& aRv) { // https://streams.spec.whatwg.org/#writablestream-set-up // Step 3. Let abortAlgorithmWrapper be an algorithm that runs these steps: if (aReason.WasPassed() && aReason.Value().isObject()) { JS::Rooted obj(aCx, &aReason.Value().toObject()); RefPtr error; UnwrapObject( obj, error, nullptr); if (error) { mOutput->CloseWithStatus(net::GetNSResultFromWebTransportError( error->GetStreamErrorCode().Value())); return nullptr; } } // XXX The close or rather a dedicated abort should be async. For now we have // to always fall back to the Step 3.3 below. // XXX how do we know this stream is used by webtransport? mOutput->CloseWithStatus(NS_ERROR_WEBTRANSPORT_CODE_BASE); // Step 3.3. Return a promise resolved with undefined. // Wrapper handles this return nullptr; } void WritableStreamToOutput::ReleaseObjects() { mOutput->Close(); }