From 2aa4a82499d4becd2284cdb482213d541b8804dd Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 16:29:10 +0200 Subject: Adding upstream version 86.0.1. Signed-off-by: Daniel Baumann --- dom/file/ipc/RemoteLazyInputStream.cpp | 960 +++++++++++++++++++++++++++++++++ 1 file changed, 960 insertions(+) create mode 100644 dom/file/ipc/RemoteLazyInputStream.cpp (limited to 'dom/file/ipc/RemoteLazyInputStream.cpp') diff --git a/dom/file/ipc/RemoteLazyInputStream.cpp b/dom/file/ipc/RemoteLazyInputStream.cpp new file mode 100644 index 0000000000..10e2e41633 --- /dev/null +++ b/dom/file/ipc/RemoteLazyInputStream.cpp @@ -0,0 +1,960 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=8 sts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "RemoteLazyInputStream.h" +#include "RemoteLazyInputStreamChild.h" +#include "RemoteLazyInputStreamParent.h" +#include "mozilla/ipc/InputStreamParams.h" +#include "mozilla/net/SocketProcessParent.h" +#include "mozilla/SlicedInputStream.h" +#include "mozilla/NonBlockingAsyncInputStream.h" +#include "nsIAsyncInputStream.h" +#include "nsIAsyncOutputStream.h" +#include "nsIPipe.h" +#include "nsNetUtil.h" +#include "nsStreamUtils.h" +#include "nsStringStream.h" +#include "RemoteLazyInputStreamStorage.h" +#include "RemoteLazyInputStreamThread.h" + +namespace mozilla { + +using namespace dom; +using net::SocketProcessParent; + +class RemoteLazyInputStream; + +namespace { + +class InputStreamCallbackRunnable final : public DiscardableRunnable { + public: + // Note that the execution can be synchronous in case the event target is + // null. + static void Execute(nsIInputStreamCallback* aCallback, + nsIEventTarget* aEventTarget, + RemoteLazyInputStream* aStream) { + MOZ_ASSERT(aCallback); + + RefPtr runnable = + new InputStreamCallbackRunnable(aCallback, aStream); + + nsCOMPtr target = aEventTarget; + if (aEventTarget) { + target->Dispatch(runnable, NS_DISPATCH_NORMAL); + } else { + runnable->Run(); + } + } + + NS_IMETHOD + Run() override { + mCallback->OnInputStreamReady(mStream); + mCallback = nullptr; + mStream = nullptr; + return NS_OK; + } + + private: + InputStreamCallbackRunnable(nsIInputStreamCallback* aCallback, + RemoteLazyInputStream* aStream) + : DiscardableRunnable("dom::InputStreamCallbackRunnable"), + mCallback(aCallback), + mStream(aStream) { + MOZ_ASSERT(mCallback); + MOZ_ASSERT(mStream); + } + + nsCOMPtr mCallback; + RefPtr mStream; +}; + +class FileMetadataCallbackRunnable final : public DiscardableRunnable { + public: + static void Execute(nsIFileMetadataCallback* aCallback, + nsIEventTarget* aEventTarget, + RemoteLazyInputStream* aStream) { + MOZ_ASSERT(aCallback); + MOZ_ASSERT(aEventTarget); + + RefPtr runnable = + new FileMetadataCallbackRunnable(aCallback, aStream); + + nsCOMPtr target = aEventTarget; + target->Dispatch(runnable, NS_DISPATCH_NORMAL); + } + + NS_IMETHOD + Run() override { + mCallback->OnFileMetadataReady(mStream); + mCallback = nullptr; + mStream = nullptr; + return NS_OK; + } + + private: + FileMetadataCallbackRunnable(nsIFileMetadataCallback* aCallback, + RemoteLazyInputStream* aStream) + : DiscardableRunnable("dom::FileMetadataCallbackRunnable"), + mCallback(aCallback), + mStream(aStream) { + MOZ_ASSERT(mCallback); + MOZ_ASSERT(mStream); + } + + nsCOMPtr mCallback; + RefPtr mStream; +}; + +} // namespace + +NS_IMPL_ADDREF(RemoteLazyInputStream); +NS_IMPL_RELEASE(RemoteLazyInputStream); + +NS_INTERFACE_MAP_BEGIN(RemoteLazyInputStream) + NS_INTERFACE_MAP_ENTRY(nsIInputStream) + NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream) + NS_INTERFACE_MAP_ENTRY(nsIInputStreamCallback) + NS_INTERFACE_MAP_ENTRY(nsICloneableInputStream) + NS_INTERFACE_MAP_ENTRY(nsICloneableInputStreamWithRange) + NS_INTERFACE_MAP_ENTRY(nsIIPCSerializableInputStream) + NS_INTERFACE_MAP_ENTRY(nsIFileMetadata) + NS_INTERFACE_MAP_ENTRY(nsIAsyncFileMetadata) + NS_INTERFACE_MAP_ENTRY(nsIInputStreamLength) + NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStreamLength) + NS_INTERFACE_MAP_ENTRY(mozIRemoteLazyInputStream) + NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStream) +NS_INTERFACE_MAP_END + +RemoteLazyInputStream::RemoteLazyInputStream(RemoteLazyInputStreamChild* aActor) + : mActor(aActor), + mState(eInit), + mStart(0), + mLength(0), + mConsumed(false), + mMutex("RemoteLazyInputStream::mMutex") { + MOZ_ASSERT(aActor); + + mLength = aActor->Size(); + + if (XRE_IsParentProcess()) { + nsCOMPtr stream; + auto storage = RemoteLazyInputStreamStorage::Get().unwrapOr(nullptr); + if (storage) { + storage->GetStream(mActor->ID(), 0, mLength, getter_AddRefs(stream)); + if (stream) { + mState = eRunning; + mRemoteStream = stream; + } + } + } +} + +RemoteLazyInputStream::~RemoteLazyInputStream() { Close(); } + +// nsIInputStream interface + +NS_IMETHODIMP +RemoteLazyInputStream::Available(uint64_t* aLength) { + nsCOMPtr asyncRemoteStream; + { + MutexAutoLock lock(mMutex); + + // We don't have a remoteStream yet: let's return 0. + if (mState == eInit || mState == ePending) { + *aLength = 0; + return NS_OK; + } + + if (mState == eClosed) { + return NS_BASE_STREAM_CLOSED; + } + + MOZ_ASSERT(mState == eRunning); + MOZ_ASSERT(mRemoteStream || mAsyncRemoteStream); + + nsresult rv = EnsureAsyncRemoteStream(lock); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + asyncRemoteStream = mAsyncRemoteStream; + } + + MOZ_ASSERT(asyncRemoteStream); + return asyncRemoteStream->Available(aLength); +} + +NS_IMETHODIMP +RemoteLazyInputStream::Read(char* aBuffer, uint32_t aCount, + uint32_t* aReadCount) { + nsCOMPtr asyncRemoteStream; + { + MutexAutoLock lock(mMutex); + + // Read is not available is we don't have a remoteStream. + if (mState == eInit || mState == ePending) { + return NS_BASE_STREAM_WOULD_BLOCK; + } + + if (mState == eClosed) { + return NS_BASE_STREAM_CLOSED; + } + + MOZ_ASSERT(mState == eRunning); + MOZ_ASSERT(mRemoteStream || mAsyncRemoteStream); + + nsresult rv = EnsureAsyncRemoteStream(lock); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + asyncRemoteStream = mAsyncRemoteStream; + } + + MOZ_ASSERT(asyncRemoteStream); + nsresult rv = asyncRemoteStream->Read(aBuffer, aCount, aReadCount); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + { + MutexAutoLock lock(mMutex); + mConsumed = true; + } + + return NS_OK; +} + +NS_IMETHODIMP +RemoteLazyInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, + uint32_t aCount, uint32_t* aResult) { + nsCOMPtr asyncRemoteStream; + { + MutexAutoLock lock(mMutex); + + // ReadSegments is not available is we don't have a remoteStream. + if (mState == eInit || mState == ePending) { + return NS_BASE_STREAM_WOULD_BLOCK; + } + + if (mState == eClosed) { + return NS_BASE_STREAM_CLOSED; + } + + MOZ_ASSERT(mState == eRunning); + MOZ_ASSERT(mRemoteStream || mAsyncRemoteStream); + + nsresult rv = EnsureAsyncRemoteStream(lock); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + asyncRemoteStream = mAsyncRemoteStream; + } + + MOZ_ASSERT(asyncRemoteStream); + nsresult rv = + asyncRemoteStream->ReadSegments(aWriter, aClosure, aCount, aResult); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + // If some data has been read, we mark the stream as consumed. + if (*aResult != 0) { + MutexAutoLock lock(mMutex); + mConsumed = true; + } + + return NS_OK; +} + +NS_IMETHODIMP +RemoteLazyInputStream::IsNonBlocking(bool* aNonBlocking) { + *aNonBlocking = true; + return NS_OK; +} + +NS_IMETHODIMP +RemoteLazyInputStream::Close() { + nsCOMPtr asyncRemoteStream; + nsCOMPtr remoteStream; + { + MutexAutoLock lock(mMutex); + + if (mActor) { + mActor->ForgetStream(this); + mActor = nullptr; + } + + asyncRemoteStream.swap(mAsyncRemoteStream); + remoteStream.swap(mRemoteStream); + + mInputStreamCallback = nullptr; + mInputStreamCallbackEventTarget = nullptr; + + mFileMetadataCallback = nullptr; + mFileMetadataCallbackEventTarget = nullptr; + + mState = eClosed; + } + + if (asyncRemoteStream) { + asyncRemoteStream->CloseWithStatus(NS_BASE_STREAM_CLOSED); + } + + if (remoteStream) { + remoteStream->Close(); + } + + return NS_OK; +} + +// nsICloneableInputStream interface + +NS_IMETHODIMP +RemoteLazyInputStream::GetCloneable(bool* aCloneable) { + MutexAutoLock lock(mMutex); + *aCloneable = mState != eClosed; + return NS_OK; +} + +NS_IMETHODIMP +RemoteLazyInputStream::Clone(nsIInputStream** aResult) { + MutexAutoLock lock(mMutex); + + if (mState == eClosed) { + return NS_BASE_STREAM_CLOSED; + } + + MOZ_ASSERT(mActor); + + RefPtr stream = mActor->CreateStream(); + if (!stream) { + return NS_ERROR_FAILURE; + } + + stream->InitWithExistingRange(mStart, mLength, lock); + + stream.forget(aResult); + return NS_OK; +} + +// nsICloneableInputStreamWithRange interface + +NS_IMETHODIMP +RemoteLazyInputStream::CloneWithRange(uint64_t aStart, uint64_t aLength, + nsIInputStream** aResult) { + MutexAutoLock lock(mMutex); + + if (mState == eClosed) { + return NS_BASE_STREAM_CLOSED; + } + + // Too short or out of range. + if (aLength == 0 || aStart >= mLength) { + return NS_NewCStringInputStream(aResult, ""_ns); + } + + MOZ_ASSERT(mActor); + + RefPtr stream = mActor->CreateStream(); + if (!stream) { + return NS_ERROR_FAILURE; + } + + CheckedInt streamSize = mLength; + streamSize -= aStart; + if (!streamSize.isValid()) { + return NS_ERROR_FAILURE; + } + + if (aLength > streamSize.value()) { + aLength = streamSize.value(); + } + + stream->InitWithExistingRange(aStart + mStart, aLength, lock); + + stream.forget(aResult); + return NS_OK; +} + +// nsIAsyncInputStream interface + +NS_IMETHODIMP +RemoteLazyInputStream::CloseWithStatus(nsresult aStatus) { return Close(); } + +NS_IMETHODIMP +RemoteLazyInputStream::AsyncWait(nsIInputStreamCallback* aCallback, + uint32_t aFlags, uint32_t aRequestedCount, + nsIEventTarget* aEventTarget) { + nsCOMPtr asyncRemoteStream; + { + MutexAutoLock lock(mMutex); + + // See RemoteLazyInputStream.h for more information about this state + // machine. + + switch (mState) { + // First call, we need to retrieve the stream from the parent actor. + case eInit: + MOZ_ASSERT(mActor); + + mInputStreamCallback = aCallback; + mInputStreamCallbackEventTarget = aEventTarget; + mState = ePending; + + mActor->StreamNeeded(this, aEventTarget); + return NS_OK; + + // We are still waiting for the remote inputStream + case ePending: { + if (mInputStreamCallback && aCallback) { + return NS_ERROR_FAILURE; + } + + mInputStreamCallback = aCallback; + mInputStreamCallbackEventTarget = aEventTarget; + return NS_OK; + } + + // We have the remote inputStream, let's check if we can execute the + // callback. + case eRunning: { + if (mInputStreamCallback && aCallback) { + return NS_ERROR_FAILURE; + } + + nsresult rv = EnsureAsyncRemoteStream(lock); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + mInputStreamCallback = aCallback; + mInputStreamCallbackEventTarget = aEventTarget; + + asyncRemoteStream = mAsyncRemoteStream; + break; + } + + case eClosed: + [[fallthrough]]; + default: + MOZ_ASSERT(mState == eClosed); + if (mInputStreamCallback && aCallback) { + return NS_ERROR_FAILURE; + } + break; + } + } + + if (asyncRemoteStream) { + return asyncRemoteStream->AsyncWait(aCallback ? this : nullptr, 0, 0, + aEventTarget); + } + + // if asyncRemoteStream is nullptr here, that probably means the stream has + // been closed and the callback can be executed immediately + InputStreamCallbackRunnable::Execute(aCallback, aEventTarget, this); + return NS_OK; +} + +void RemoteLazyInputStream::StreamReady( + already_AddRefed aInputStream) { + nsCOMPtr inputStream = std::move(aInputStream); + + // If inputStream is null, it means that the serialization went wrong or the + // stream is not available anymore. We keep the state as pending just to + // block any additional operation. + + if (!inputStream) { + return; + } + + nsCOMPtr fileMetadataCallback; + nsCOMPtr fileMetadataCallbackEventTarget; + nsCOMPtr inputStreamCallback; + nsCOMPtr inputStreamCallbackEventTarget; + nsCOMPtr asyncRemoteStream; + { + MutexAutoLock lock(mMutex); + + // We have been closed in the meantime. + if (mState == eClosed) { + if (inputStream) { + MutexAutoUnlock unlock(mMutex); + inputStream->Close(); + } + return; + } + + // Now it's the right time to apply a slice if needed. + if (mStart > 0 || mLength < mActor->Size()) { + inputStream = + new SlicedInputStream(inputStream.forget(), mStart, mLength); + } + + mRemoteStream = inputStream; + + MOZ_ASSERT(mState == ePending); + mState = eRunning; + + fileMetadataCallback.swap(mFileMetadataCallback); + fileMetadataCallbackEventTarget.swap(mFileMetadataCallbackEventTarget); + + inputStreamCallback = mInputStreamCallback ? this : nullptr; + inputStreamCallbackEventTarget = mInputStreamCallbackEventTarget; + + if (inputStreamCallback) { + nsresult rv = EnsureAsyncRemoteStream(lock); + if (NS_WARN_IF(NS_FAILED(rv))) { + return; + } + + MOZ_ASSERT(mAsyncRemoteStream); + asyncRemoteStream = mAsyncRemoteStream; + } + } + + if (fileMetadataCallback) { + FileMetadataCallbackRunnable::Execute( + fileMetadataCallback, fileMetadataCallbackEventTarget, this); + } + + if (inputStreamCallback) { + MOZ_ASSERT(asyncRemoteStream); + + nsresult rv = asyncRemoteStream->AsyncWait(inputStreamCallback, 0, 0, + inputStreamCallbackEventTarget); + Unused << NS_WARN_IF(NS_FAILED(rv)); + } +} + +void RemoteLazyInputStream::InitWithExistingRange( + uint64_t aStart, uint64_t aLength, const MutexAutoLock& aProofOfLock) { + MOZ_ASSERT(mActor->Size() >= aStart + aLength); + mStart = aStart; + mLength = aLength; + + // In the child, we slice in StreamReady() when we set mState to eRunning. + // But in the parent, we start out eRunning, so it's necessary to slice the + // stream as soon as we have the information during the initialization phase + // because the stream is immediately consumable. + if (mState == eRunning && mRemoteStream && XRE_IsParentProcess() && + (mStart > 0 || mLength < mActor->Size())) { + mRemoteStream = + new SlicedInputStream(mRemoteStream.forget(), mStart, mLength); + } +} + +// nsIInputStreamCallback + +NS_IMETHODIMP +RemoteLazyInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream) { + nsCOMPtr callback; + nsCOMPtr callbackEventTarget; + { + MutexAutoLock lock(mMutex); + + // We have been closed in the meantime. + if (mState == eClosed) { + return NS_OK; + } + + MOZ_ASSERT(mState == eRunning); + MOZ_ASSERT(mAsyncRemoteStream == aStream); + + // The callback has been canceled in the meantime. + if (!mInputStreamCallback) { + return NS_OK; + } + + callback.swap(mInputStreamCallback); + callbackEventTarget.swap(mInputStreamCallbackEventTarget); + } + + // This must be the last operation because the execution of the callback can + // be synchronous. + MOZ_ASSERT(callback); + InputStreamCallbackRunnable::Execute(callback, callbackEventTarget, this); + return NS_OK; +} + +// nsIIPCSerializableInputStream + +void RemoteLazyInputStream::Serialize( + mozilla::ipc::InputStreamParams& aParams, + FileDescriptorArray& aFileDescriptors, bool aDelayedStart, + uint32_t aMaxSize, uint32_t* aSizeUsed, + mozilla::ipc::ParentToChildStreamActorManager* aManager) { + MOZ_ASSERT(aSizeUsed); + *aSizeUsed = 0; + + // So far we support only socket process serialization. + MOZ_DIAGNOSTIC_ASSERT( + aManager == SocketProcessParent::GetSingleton(), + "Serializing an RemoteLazyInputStream parent to child is " + "wrong! The caller must be fixed! See IPCBlobUtils.h."); + SocketProcessParent* socketActor = SocketProcessParent::GetSingleton(); + + nsresult rv; + nsCOMPtr asyncRemoteStream; + RefPtr parentActor; + { + MutexAutoLock lock(mMutex); + rv = EnsureAsyncRemoteStream(lock); + MOZ_ASSERT(NS_SUCCEEDED(rv)); + + asyncRemoteStream = mAsyncRemoteStream; + MOZ_ASSERT(NS_SUCCEEDED(rv)); + } + + MOZ_ASSERT(asyncRemoteStream); + + parentActor = RemoteLazyInputStreamParent::Create(asyncRemoteStream, mLength, + 0, &rv, socketActor); + MOZ_ASSERT(parentActor); + + if (!socketActor->SendPRemoteLazyInputStreamConstructor( + parentActor, parentActor->ID(), parentActor->Size())) { + MOZ_CRASH("The serialization is not supposed to fail"); + } + + aParams = mozilla::ipc::RemoteLazyInputStreamParams(parentActor); +} + +void RemoteLazyInputStream::Serialize( + mozilla::ipc::InputStreamParams& aParams, + FileDescriptorArray& aFileDescriptors, bool aDelayedStart, + uint32_t aMaxSize, uint32_t* aSizeUsed, + mozilla::ipc::ChildToParentStreamActorManager* aManager) { + MOZ_ASSERT(aSizeUsed); + *aSizeUsed = 0; + + MutexAutoLock lock(mMutex); + + mozilla::ipc::RemoteLazyInputStreamRef params; + params.id() = mActor->ID(); + params.start() = mStart; + params.length() = mLength; + + aParams = params; +} + +bool RemoteLazyInputStream::Deserialize( + const mozilla::ipc::InputStreamParams& aParams, + const FileDescriptorArray& aFileDescriptors) { + MOZ_CRASH("This should never be called."); + return false; +} + +// nsIAsyncFileMetadata + +NS_IMETHODIMP +RemoteLazyInputStream::AsyncFileMetadataWait(nsIFileMetadataCallback* aCallback, + nsIEventTarget* aEventTarget) { + MOZ_ASSERT(!!aCallback == !!aEventTarget); + + // If we have the callback, we must have the event target. + if (NS_WARN_IF(!!aCallback != !!aEventTarget)) { + return NS_ERROR_FAILURE; + } + + // See RemoteLazyInputStream.h for more information about this state + // machine. + + { + MutexAutoLock lock(mMutex); + + switch (mState) { + // First call, we need to retrieve the stream from the parent actor. + case eInit: + MOZ_ASSERT(mActor); + + mFileMetadataCallback = aCallback; + mFileMetadataCallbackEventTarget = aEventTarget; + mState = ePending; + + mActor->StreamNeeded(this, aEventTarget); + return NS_OK; + + // We are still waiting for the remote inputStream + case ePending: + if (mFileMetadataCallback && aCallback) { + return NS_ERROR_FAILURE; + } + + mFileMetadataCallback = aCallback; + mFileMetadataCallbackEventTarget = aEventTarget; + return NS_OK; + + // We have the remote inputStream, let's check if we can execute the + // callback. + case eRunning: + break; + + // Stream is closed. + default: + MOZ_ASSERT(mState == eClosed); + return NS_BASE_STREAM_CLOSED; + } + + MOZ_ASSERT(mState == eRunning); + } + + FileMetadataCallbackRunnable::Execute(aCallback, aEventTarget, this); + return NS_OK; +} + +// nsIFileMetadata + +NS_IMETHODIMP +RemoteLazyInputStream::GetSize(int64_t* aRetval) { + nsCOMPtr fileMetadata; + { + MutexAutoLock lock(mMutex); + fileMetadata = do_QueryInterface(mRemoteStream); + if (!fileMetadata) { + return mState == eClosed ? NS_BASE_STREAM_CLOSED : NS_ERROR_FAILURE; + } + } + + return fileMetadata->GetSize(aRetval); +} + +NS_IMETHODIMP +RemoteLazyInputStream::GetLastModified(int64_t* aRetval) { + nsCOMPtr fileMetadata; + { + MutexAutoLock lock(mMutex); + fileMetadata = do_QueryInterface(mRemoteStream); + if (!fileMetadata) { + return mState == eClosed ? NS_BASE_STREAM_CLOSED : NS_ERROR_FAILURE; + } + } + + return fileMetadata->GetLastModified(aRetval); +} + +NS_IMETHODIMP +RemoteLazyInputStream::GetFileDescriptor(PRFileDesc** aRetval) { + nsCOMPtr fileMetadata; + { + MutexAutoLock lock(mMutex); + fileMetadata = do_QueryInterface(mRemoteStream); + if (!fileMetadata) { + return mState == eClosed ? NS_BASE_STREAM_CLOSED : NS_ERROR_FAILURE; + } + } + + return fileMetadata->GetFileDescriptor(aRetval); +} + +nsresult RemoteLazyInputStream::EnsureAsyncRemoteStream( + const MutexAutoLock& aProofOfLock) { + // We already have an async remote stream. + if (mAsyncRemoteStream) { + return NS_OK; + } + + if (!mRemoteStream) { + return NS_ERROR_FAILURE; + } + + nsCOMPtr stream = mRemoteStream; + // We don't return NS_ERROR_NOT_IMPLEMENTED from ReadSegments, + // so it's possible that callers are expecting us to succeed in the future. + // We need to make sure the stream we return here supports ReadSegments, + // so wrap if in a buffered stream if necessary. + if (!NS_InputStreamIsBuffered(stream)) { + nsCOMPtr bufferedStream; + nsresult rv = NS_NewBufferedInputStream(getter_AddRefs(bufferedStream), + stream.forget(), 4096); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + stream = bufferedStream; + } + + // If the stream is blocking, we want to make it unblocking using a pipe. + bool nonBlocking = false; + nsresult rv = stream->IsNonBlocking(&nonBlocking); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + nsCOMPtr asyncStream = do_QueryInterface(stream); + + // If non-blocking and non-async, let's use NonBlockingAsyncInputStream. + if (nonBlocking && !asyncStream) { + rv = NonBlockingAsyncInputStream::Create(stream.forget(), + getter_AddRefs(asyncStream)); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + MOZ_ASSERT(asyncStream); + } + + if (!asyncStream) { + // Let's make the stream async using the DOMFile thread. + nsCOMPtr pipeIn; + nsCOMPtr pipeOut; + rv = NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(pipeOut), true, + true); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + RefPtr thread = + RemoteLazyInputStreamThread::GetOrCreate(); + if (NS_WARN_IF(!thread)) { + return NS_ERROR_FAILURE; + } + + rv = NS_AsyncCopy(stream, pipeOut, thread, NS_ASYNCCOPY_VIA_WRITESEGMENTS); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + asyncStream = pipeIn; + } + + MOZ_ASSERT(asyncStream); + mAsyncRemoteStream = asyncStream; + mRemoteStream = nullptr; + + return NS_OK; +} + +// nsIInputStreamLength + +NS_IMETHODIMP +RemoteLazyInputStream::Length(int64_t* aLength) { + MutexAutoLock lock(mMutex); + + if (mState == eClosed) { + return NS_BASE_STREAM_CLOSED; + } + + if (mConsumed) { + return NS_ERROR_NOT_AVAILABLE; + } + + return NS_BASE_STREAM_WOULD_BLOCK; +} + +namespace { + +class InputStreamLengthCallbackRunnable final : public DiscardableRunnable { + public: + static void Execute(nsIInputStreamLengthCallback* aCallback, + nsIEventTarget* aEventTarget, + RemoteLazyInputStream* aStream, int64_t aLength) { + MOZ_ASSERT(aCallback); + MOZ_ASSERT(aEventTarget); + + RefPtr runnable = + new InputStreamLengthCallbackRunnable(aCallback, aStream, aLength); + + nsCOMPtr target = aEventTarget; + target->Dispatch(runnable, NS_DISPATCH_NORMAL); + } + + NS_IMETHOD + Run() override { + mCallback->OnInputStreamLengthReady(mStream, mLength); + mCallback = nullptr; + mStream = nullptr; + return NS_OK; + } + + private: + InputStreamLengthCallbackRunnable(nsIInputStreamLengthCallback* aCallback, + RemoteLazyInputStream* aStream, + int64_t aLength) + : DiscardableRunnable("dom::InputStreamLengthCallbackRunnable"), + mCallback(aCallback), + mStream(aStream), + mLength(aLength) { + MOZ_ASSERT(mCallback); + MOZ_ASSERT(mStream); + } + + nsCOMPtr mCallback; + RefPtr mStream; + const int64_t mLength; +}; + +} // namespace + +// nsIAsyncInputStreamLength + +NS_IMETHODIMP +RemoteLazyInputStream::AsyncLengthWait(nsIInputStreamLengthCallback* aCallback, + nsIEventTarget* aEventTarget) { + // If we have the callback, we must have the event target. + if (NS_WARN_IF(!!aCallback != !!aEventTarget)) { + return NS_ERROR_FAILURE; + } + + { + MutexAutoLock lock(mMutex); + + mLengthCallback = aCallback; + mLengthCallbackEventTarget = aEventTarget; + + if (mState != eClosed && !mConsumed) { + MOZ_ASSERT(mActor); + + if (aCallback) { + mActor->LengthNeeded(this, aEventTarget); + } + + return NS_OK; + } + } + + // If execution has reached here, it means the stream is either closed or + // consumed, and therefore the callback can be executed immediately + InputStreamLengthCallbackRunnable::Execute(aCallback, aEventTarget, this, -1); + return NS_OK; +} + +void RemoteLazyInputStream::LengthReady(int64_t aLength) { + nsCOMPtr lengthCallback; + nsCOMPtr lengthCallbackEventTarget; + + { + MutexAutoLock lock(mMutex); + + // Stream has been closed in the meantime. Callback can be executed + // immediately + if (mState == eClosed || mConsumed) { + aLength = -1; + } else { + if (mStart > 0) { + aLength -= mStart; + } + + if (mLength < mActor->Size()) { + // If the remote stream must be sliced, we must return here the + // correct value. + aLength = XPCOM_MIN(aLength, (int64_t)mLength); + } + } + + lengthCallback.swap(mLengthCallback); + lengthCallbackEventTarget.swap(mLengthCallbackEventTarget); + } + + if (lengthCallback) { + InputStreamLengthCallbackRunnable::Execute( + lengthCallback, lengthCallbackEventTarget, this, aLength); + } +} + +} // namespace mozilla -- cgit v1.2.3