From 0ebf5bdf043a27fd3dfb7f92e0cb63d88954c44d Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 03:47:29 +0200 Subject: Adding upstream version 115.8.0esr. Signed-off-by: Daniel Baumann --- dom/file/ipc/RemoteLazyInputStream.cpp | 1458 ++++++++++++++++++++++++++++++++ 1 file changed, 1458 insertions(+) create mode 100644 dom/file/ipc/RemoteLazyInputStream.cpp (limited to 'dom/file/ipc/RemoteLazyInputStream.cpp') diff --git a/dom/file/ipc/RemoteLazyInputStream.cpp b/dom/file/ipc/RemoteLazyInputStream.cpp new file mode 100644 index 0000000000..936438df13 --- /dev/null +++ b/dom/file/ipc/RemoteLazyInputStream.cpp @@ -0,0 +1,1458 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=8 sts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "RemoteLazyInputStream.h" +#include "RemoteLazyInputStreamChild.h" +#include "RemoteLazyInputStreamParent.h" +#include "chrome/common/ipc_message_utils.h" +#include "mozilla/ErrorNames.h" +#include "mozilla/Logging.h" +#include "mozilla/PRemoteLazyInputStream.h" +#include "mozilla/ipc/Endpoint.h" +#include "mozilla/ipc/InputStreamParams.h" +#include "mozilla/ipc/MessageChannel.h" +#include "mozilla/ipc/ProtocolMessageUtils.h" +#include "mozilla/net/SocketProcessParent.h" +#include "mozilla/SlicedInputStream.h" +#include "mozilla/NonBlockingAsyncInputStream.h" +#include "nsIAsyncInputStream.h" +#include "nsIAsyncOutputStream.h" +#include "nsID.h" +#include "nsIInputStream.h" +#include "nsIPipe.h" +#include "nsNetUtil.h" +#include "nsStreamUtils.h" +#include "nsStringStream.h" +#include "RemoteLazyInputStreamStorage.h" +#include "RemoteLazyInputStreamThread.h" + +namespace mozilla { + +mozilla::LazyLogModule gRemoteLazyStreamLog("RemoteLazyStream"); + +namespace { + +class InputStreamCallbackRunnable final : public DiscardableRunnable { + public: + // Note that the execution can be synchronous in case the event target is + // null. + static void Execute(already_AddRefed aCallback, + already_AddRefed aEventTarget, + RemoteLazyInputStream* aStream) { + RefPtr runnable = + new InputStreamCallbackRunnable(std::move(aCallback), aStream); + + nsCOMPtr target = std::move(aEventTarget); + if (target) { + target->Dispatch(runnable, NS_DISPATCH_NORMAL); + } else { + runnable->Run(); + } + } + + NS_IMETHOD + Run() override { + mCallback->OnInputStreamReady(mStream); + mCallback = nullptr; + mStream = nullptr; + return NS_OK; + } + + private: + InputStreamCallbackRunnable( + already_AddRefed aCallback, + RemoteLazyInputStream* aStream) + : DiscardableRunnable("dom::InputStreamCallbackRunnable"), + mCallback(std::move(aCallback)), + mStream(aStream) { + MOZ_ASSERT(mCallback); + MOZ_ASSERT(mStream); + } + + RefPtr mCallback; + RefPtr mStream; +}; + +class FileMetadataCallbackRunnable final : public DiscardableRunnable { + public: + static void Execute(nsIFileMetadataCallback* aCallback, + nsIEventTarget* aEventTarget, + RemoteLazyInputStream* aStream) { + MOZ_ASSERT(aCallback); + MOZ_ASSERT(aEventTarget); + + RefPtr runnable = + new FileMetadataCallbackRunnable(aCallback, aStream); + + nsCOMPtr target = aEventTarget; + target->Dispatch(runnable, NS_DISPATCH_NORMAL); + } + + NS_IMETHOD + Run() override { + mCallback->OnFileMetadataReady(mStream); + mCallback = nullptr; + mStream = nullptr; + return NS_OK; + } + + private: + FileMetadataCallbackRunnable(nsIFileMetadataCallback* aCallback, + RemoteLazyInputStream* aStream) + : DiscardableRunnable("dom::FileMetadataCallbackRunnable"), + mCallback(aCallback), + mStream(aStream) { + MOZ_ASSERT(mCallback); + MOZ_ASSERT(mStream); + } + + nsCOMPtr mCallback; + RefPtr mStream; +}; + +} // namespace + +NS_IMPL_ADDREF(RemoteLazyInputStream); +NS_IMPL_RELEASE(RemoteLazyInputStream); + +NS_INTERFACE_MAP_BEGIN(RemoteLazyInputStream) + NS_INTERFACE_MAP_ENTRY(nsIInputStream) + NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream) + NS_INTERFACE_MAP_ENTRY(nsIInputStreamCallback) + NS_INTERFACE_MAP_ENTRY(nsICloneableInputStream) + NS_INTERFACE_MAP_ENTRY(nsICloneableInputStreamWithRange) + NS_INTERFACE_MAP_ENTRY(nsIIPCSerializableInputStream) + NS_INTERFACE_MAP_ENTRY(nsIFileMetadata) + NS_INTERFACE_MAP_ENTRY(nsIAsyncFileMetadata) + NS_INTERFACE_MAP_ENTRY(nsIInputStreamLength) + NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStreamLength) + NS_INTERFACE_MAP_ENTRY(mozIRemoteLazyInputStream) + NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStream) +NS_INTERFACE_MAP_END + +RemoteLazyInputStream::RemoteLazyInputStream(RemoteLazyInputStreamChild* aActor, + uint64_t aStart, uint64_t aLength) + : mStart(aStart), mLength(aLength), mState(eInit), mActor(aActor) { + MOZ_ASSERT(aActor); + + mActor->StreamCreated(); + + auto storage = RemoteLazyInputStreamStorage::Get().unwrapOr(nullptr); + if (storage) { + nsCOMPtr stream; + storage->GetStream(mActor->StreamID(), mStart, mLength, + getter_AddRefs(stream)); + if (stream) { + mState = eRunning; + mInnerStream = stream; + } + } +} + +RemoteLazyInputStream::RemoteLazyInputStream(nsIInputStream* aStream) + : mStart(0), mLength(UINT64_MAX), mState(eRunning), mInnerStream(aStream) {} + +static already_AddRefed BindChildActor( + nsID aId, mozilla::ipc::Endpoint aEndpoint) { + auto* thread = RemoteLazyInputStreamThread::GetOrCreate(); + if (NS_WARN_IF(!thread)) { + return nullptr; + } + auto actor = MakeRefPtr(aId); + thread->Dispatch( + NS_NewRunnableFunction("RemoteLazyInputStream::BindChildActor", + [actor, childEp = std::move(aEndpoint)]() mutable { + bool ok = childEp.Bind(actor); + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug, + ("Binding child actor for %s (%p): %s", + nsIDToCString(actor->StreamID()).get(), + actor.get(), ok ? "OK" : "ERROR")); + })); + + return actor.forget(); +} + +already_AddRefed RemoteLazyInputStream::WrapStream( + nsIInputStream* aInputStream) { + MOZ_ASSERT(XRE_IsParentProcess()); + if (nsCOMPtr lazyStream = + do_QueryInterface(aInputStream)) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug, + ("Returning already-wrapped stream")); + return lazyStream.forget().downcast(); + } + + // If we have a stream and are in the parent process, create a new actor pair + // and transfer ownership of the stream into storage. + auto streamStorage = RemoteLazyInputStreamStorage::Get(); + if (NS_WARN_IF(streamStorage.isErr())) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Warning, + ("Cannot wrap with no storage!")); + return nullptr; + } + + nsID id = nsID::GenerateUUID(); + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("Wrapping stream %p as %s", aInputStream, nsIDToCString(id).get())); + streamStorage.inspect()->AddStream(aInputStream, id); + + mozilla::ipc::Endpoint parentEp; + mozilla::ipc::Endpoint childEp; + MOZ_ALWAYS_SUCCEEDS( + PRemoteLazyInputStream::CreateEndpoints(&parentEp, &childEp)); + + // Bind the actor on our background thread. + streamStorage.inspect()->TaskQueue()->Dispatch(NS_NewRunnableFunction( + "RemoteLazyInputStreamParent::Bind", + [parentEp = std::move(parentEp), id]() mutable { + auto actor = MakeRefPtr(id); + bool ok = parentEp.Bind(actor); + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug, + ("Binding parent actor for %s (%p): %s", + nsIDToCString(id).get(), actor.get(), ok ? "OK" : "ERROR")); + })); + + RefPtr actor = + BindChildActor(id, std::move(childEp)); + + if (!actor) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Warning, + ("Wrapping stream failed as we are probably late in shutdown!")); + return do_AddRef(new RemoteLazyInputStream()); + } + + return do_AddRef(new RemoteLazyInputStream(actor)); +} + +NS_IMETHODIMP RemoteLazyInputStream::TakeInternalStream( + nsIInputStream** aStream) { + RefPtr actor; + { + MutexAutoLock lock(mMutex); + if (mState == eInit || mState == ePending) { + return NS_BASE_STREAM_WOULD_BLOCK; + } + if (mState == eClosed) { + return NS_BASE_STREAM_CLOSED; + } + if (mInputStreamCallback) { + MOZ_ASSERT_UNREACHABLE( + "Do not call TakeInternalStream after calling AsyncWait"); + return NS_ERROR_UNEXPECTED; + } + + // Take the inner stream and return it, then close ourselves. + if (mInnerStream) { + mInnerStream.forget(aStream); + } else if (mAsyncInnerStream) { + mAsyncInnerStream.forget(aStream); + } + mState = eClosed; + actor = mActor.forget(); + } + if (actor) { + actor->StreamConsumed(); + } + return NS_OK; +} + +NS_IMETHODIMP RemoteLazyInputStream::GetInternalStreamID(nsID& aID) { + MutexAutoLock lock(mMutex); + if (!mActor) { + return NS_ERROR_NOT_AVAILABLE; + } + + aID = mActor->StreamID(); + return NS_OK; +} + +RemoteLazyInputStream::~RemoteLazyInputStream() { Close(); } + +nsCString RemoteLazyInputStream::Describe() { + const char* state = "?"; + switch (mState) { + case eInit: + state = "i"; + break; + case ePending: + state = "p"; + break; + case eRunning: + state = "r"; + break; + case eClosed: + state = "c"; + break; + } + return nsPrintfCString( + "[%p, %s, %s, %p%s, %s%s|%s%s]", this, state, + mActor ? nsIDToCString(mActor->StreamID()).get() : "", + mInnerStream ? mInnerStream.get() : mAsyncInnerStream.get(), + mAsyncInnerStream ? "(A)" : "", mInputStreamCallback ? "I" : "", + mInputStreamCallbackEventTarget ? "+" : "", + mFileMetadataCallback ? "F" : "", + mFileMetadataCallbackEventTarget ? "+" : ""); +} + +// nsIInputStream interface + +NS_IMETHODIMP +RemoteLazyInputStream::Available(uint64_t* aLength) { + nsCOMPtr stream; + { + MutexAutoLock lock(mMutex); + + // We don't have a remoteStream yet: let's return 0. + if (mState == eInit || mState == ePending) { + *aLength = 0; + return NS_OK; + } + + if (mState == eClosed) { + return NS_BASE_STREAM_CLOSED; + } + + MOZ_ASSERT(mState == eRunning); + MOZ_ASSERT(mInnerStream || mAsyncInnerStream); + + nsresult rv = EnsureAsyncRemoteStream(); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + stream = mAsyncInnerStream; + } + + MOZ_ASSERT(stream); + return stream->Available(aLength); +} + +NS_IMETHODIMP +RemoteLazyInputStream::StreamStatus() { + nsCOMPtr stream; + { + MutexAutoLock lock(mMutex); + + // We don't have a remoteStream yet: let's return 0. + if (mState == eInit || mState == ePending) { + return NS_OK; + } + + if (mState == eClosed) { + return NS_BASE_STREAM_CLOSED; + } + + MOZ_ASSERT(mState == eRunning); + MOZ_ASSERT(mInnerStream || mAsyncInnerStream); + + nsresult rv = EnsureAsyncRemoteStream(); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + stream = mAsyncInnerStream; + } + + MOZ_ASSERT(stream); + return stream->StreamStatus(); +} + +NS_IMETHODIMP +RemoteLazyInputStream::Read(char* aBuffer, uint32_t aCount, + uint32_t* aReadCount) { + nsCOMPtr stream; + { + MutexAutoLock lock(mMutex); + + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("Read(%u) %s", aCount, Describe().get())); + + // Read is not available is we don't have a remoteStream. + if (mState == eInit || mState == ePending) { + return NS_BASE_STREAM_WOULD_BLOCK; + } + + if (mState == eClosed) { + return NS_BASE_STREAM_CLOSED; + } + + MOZ_ASSERT(mState == eRunning); + MOZ_ASSERT(mInnerStream || mAsyncInnerStream); + + nsresult rv = EnsureAsyncRemoteStream(); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + stream = mAsyncInnerStream; + } + + MOZ_ASSERT(stream); + nsresult rv = stream->Read(aBuffer, aCount, aReadCount); + if (NS_FAILED(rv)) { + return rv; + } + + // If some data has been read, we mark the stream as consumed. + if (*aReadCount > 0) { + MarkConsumed(); + } + + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("Read %u/%u bytes", *aReadCount, aCount)); + + return NS_OK; +} + +NS_IMETHODIMP +RemoteLazyInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, + uint32_t aCount, uint32_t* aResult) { + nsCOMPtr stream; + { + MutexAutoLock lock(mMutex); + + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("ReadSegments(%u) %s", aCount, Describe().get())); + + // ReadSegments is not available is we don't have a remoteStream. + if (mState == eInit || mState == ePending) { + return NS_BASE_STREAM_WOULD_BLOCK; + } + + if (mState == eClosed) { + return NS_BASE_STREAM_CLOSED; + } + + MOZ_ASSERT(mState == eRunning); + MOZ_ASSERT(mInnerStream || mAsyncInnerStream); + + nsresult rv = EnsureAsyncRemoteStream(); + if (NS_WARN_IF(NS_FAILED(rv))) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Warning, + ("EnsureAsyncRemoteStream failed! %s %s", + mozilla::GetStaticErrorName(rv), Describe().get())); + return rv; + } + + stream = mAsyncInnerStream; + } + + MOZ_ASSERT(stream); + nsresult rv = stream->ReadSegments(aWriter, aClosure, aCount, aResult); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + // If some data has been read, we mark the stream as consumed. + if (*aResult != 0) { + MarkConsumed(); + } + + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("ReadSegments %u/%u bytes", *aResult, aCount)); + + return NS_OK; +} + +void RemoteLazyInputStream::MarkConsumed() { + RefPtr actor; + { + MutexAutoLock lock(mMutex); + if (mActor) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug, + ("MarkConsumed %s", Describe().get())); + } + + actor = mActor.forget(); + } + if (actor) { + actor->StreamConsumed(); + } +} + +NS_IMETHODIMP +RemoteLazyInputStream::IsNonBlocking(bool* aNonBlocking) { + *aNonBlocking = true; + return NS_OK; +} + +NS_IMETHODIMP +RemoteLazyInputStream::Close() { + RefPtr actor; + + nsCOMPtr asyncInnerStream; + nsCOMPtr innerStream; + + RefPtr inputStreamCallback; + nsCOMPtr inputStreamCallbackEventTarget; + + { + MutexAutoLock lock(mMutex); + if (mState == eClosed) { + return NS_OK; + } + + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug, + ("Close %s", Describe().get())); + + actor = mActor.forget(); + + asyncInnerStream = mAsyncInnerStream.forget(); + innerStream = mInnerStream.forget(); + + // TODO(Bug 1737783): Notify to the mFileMetadataCallback that this + // lazy input stream has been closed. + mFileMetadataCallback = nullptr; + mFileMetadataCallbackEventTarget = nullptr; + + inputStreamCallback = mInputStreamCallback.forget(); + inputStreamCallbackEventTarget = mInputStreamCallbackEventTarget.forget(); + + mState = eClosed; + } + + if (actor) { + actor->StreamConsumed(); + } + + if (inputStreamCallback) { + InputStreamCallbackRunnable::Execute( + inputStreamCallback.forget(), inputStreamCallbackEventTarget.forget(), + this); + } + + if (asyncInnerStream) { + asyncInnerStream->CloseWithStatus(NS_BASE_STREAM_CLOSED); + } + + if (innerStream) { + innerStream->Close(); + } + + return NS_OK; +} + +// nsICloneableInputStream interface + +NS_IMETHODIMP +RemoteLazyInputStream::GetCloneable(bool* aCloneable) { + *aCloneable = true; + return NS_OK; +} + +NS_IMETHODIMP +RemoteLazyInputStream::Clone(nsIInputStream** aResult) { + return CloneWithRange(0, UINT64_MAX, aResult); +} + +// nsICloneableInputStreamWithRange interface + +NS_IMETHODIMP +RemoteLazyInputStream::CloneWithRange(uint64_t aStart, uint64_t aLength, + nsIInputStream** aResult) { + MutexAutoLock lock(mMutex); + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug, + ("CloneWithRange %" PRIu64 " %" PRIu64 " %s", aStart, aLength, + Describe().get())); + + nsresult rv; + + RefPtr stream; + if (mState == eClosed) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, ("Cloning closed stream")); + stream = new RemoteLazyInputStream(); + stream.forget(aResult); + return NS_OK; + } + + uint64_t start = 0; + uint64_t length = 0; + auto maxLength = CheckedUint64(mLength) - aStart; + if (maxLength.isValid()) { + start = mStart + aStart; + length = std::min(maxLength.value(), aLength); + } + + // If the slice would be empty, wrap an empty input stream and return it. + if (length == 0) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, ("Creating empty stream")); + + nsCOMPtr emptyStream; + rv = NS_NewCStringInputStream(getter_AddRefs(emptyStream), ""_ns); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + stream = new RemoteLazyInputStream(emptyStream); + stream.forget(aResult); + return NS_OK; + } + + // If we still have a connection to our actor, that means we haven't read any + // data yet, and can clone + slice by building a new stream backed by the same + // actor. + if (mActor) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("Cloning stream with actor")); + + stream = new RemoteLazyInputStream(mActor, start, length); + stream.forget(aResult); + return NS_OK; + } + + // We no longer have our actor, either because we were constructed without + // one, or we've already begun reading. Perform the clone locally on our inner + // input stream. + + nsCOMPtr innerStream = mInnerStream; + if (mAsyncInnerStream) { + innerStream = mAsyncInnerStream; + } + + nsCOMPtr cloneable = do_QueryInterface(innerStream); + if (!cloneable || !cloneable->GetCloneable()) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("Cloning non-cloneable stream - copying to pipe")); + + // If our internal stream isn't cloneable, to perform a clone we'll need to + // copy into a pipe and replace our internal stream. + nsCOMPtr pipeIn; + nsCOMPtr pipeOut; + NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(pipeOut), true, true); + + RefPtr thread = + RemoteLazyInputStreamThread::GetOrCreate(); + if (NS_WARN_IF(!thread)) { + return NS_ERROR_ILLEGAL_DURING_SHUTDOWN; + } + + mAsyncInnerStream = pipeIn; + mInnerStream = nullptr; + + // If we have a callback pending, we need to re-call AsyncWait on the inner + // stream. This should not re-enter us immediately, as `pipeIn` hasn't been + // sent any data yet, but we may be called again as soon as `NS_AsyncCopy` + // has begun copying. + if (mInputStreamCallback) { + mAsyncInnerStream->AsyncWait(this, mInputStreamCallbackFlags, + mInputStreamCallbackRequestedCount, + mInputStreamCallbackEventTarget); + } + + rv = NS_AsyncCopy(innerStream, pipeOut, thread, + NS_ASYNCCOPY_VIA_WRITESEGMENTS); + if (NS_WARN_IF(NS_FAILED(rv))) { + // The copy failed, revert the changes we did and restore our previous + // inner stream. + mAsyncInnerStream = nullptr; + mInnerStream = innerStream; + return rv; + } + + cloneable = do_QueryInterface(mAsyncInnerStream); + } + + MOZ_ASSERT(cloneable && cloneable->GetCloneable()); + + // Check if we can clone more efficiently with a range. + if (length < UINT64_MAX) { + if (nsCOMPtr cloneableWithRange = + do_QueryInterface(cloneable)) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, ("Cloning with range")); + nsCOMPtr cloned; + rv = cloneableWithRange->CloneWithRange(start, length, + getter_AddRefs(cloned)); + if (NS_FAILED(rv)) { + return rv; + } + + stream = new RemoteLazyInputStream(cloned); + stream.forget(aResult); + return NS_OK; + } + } + + // Directly clone our inner stream, and then slice it if needed. + nsCOMPtr cloned; + rv = cloneable->Clone(getter_AddRefs(cloned)); + if (NS_FAILED(rv)) { + return rv; + } + + if (length < UINT64_MAX) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("Slicing stream with %" PRIu64 " %" PRIu64, start, length)); + cloned = new SlicedInputStream(cloned.forget(), start, length); + } + + stream = new RemoteLazyInputStream(cloned); + stream.forget(aResult); + return NS_OK; +} + +// nsIAsyncInputStream interface + +NS_IMETHODIMP +RemoteLazyInputStream::CloseWithStatus(nsresult aStatus) { return Close(); } + +NS_IMETHODIMP +RemoteLazyInputStream::AsyncWait(nsIInputStreamCallback* aCallback, + uint32_t aFlags, uint32_t aRequestedCount, + nsIEventTarget* aEventTarget) { + // Ensure we always have an event target for AsyncWait callbacks, so that + // calls to `AsyncWait` cannot reenter us with `OnInputStreamReady`. + nsCOMPtr eventTarget = aEventTarget; + if (aCallback && !eventTarget) { + eventTarget = RemoteLazyInputStreamThread::GetOrCreate(); + if (NS_WARN_IF(!eventTarget)) { + return NS_ERROR_ILLEGAL_DURING_SHUTDOWN; + } + } + + { + MutexAutoLock lock(mMutex); + + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("AsyncWait(%p, %u, %u, %p) %s", aCallback, aFlags, aRequestedCount, + aEventTarget, Describe().get())); + + // See RemoteLazyInputStream.h for more information about this state + // machine. + + nsCOMPtr stream; + switch (mState) { + // First call, we need to retrieve the stream from the parent actor. + case eInit: + MOZ_ASSERT(mActor); + + mInputStreamCallback = aCallback; + mInputStreamCallbackEventTarget = eventTarget; + mInputStreamCallbackFlags = aFlags; + mInputStreamCallbackRequestedCount = aRequestedCount; + mState = ePending; + + StreamNeeded(); + return NS_OK; + + // We are still waiting for the remote inputStream + case ePending: { + if (NS_WARN_IF(mInputStreamCallback && aCallback && + mInputStreamCallback != aCallback)) { + return NS_ERROR_FAILURE; + } + + mInputStreamCallback = aCallback; + mInputStreamCallbackEventTarget = eventTarget; + mInputStreamCallbackFlags = aFlags; + mInputStreamCallbackRequestedCount = aRequestedCount; + return NS_OK; + } + + // We have the remote inputStream, let's check if we can execute the + // callback. + case eRunning: { + if (NS_WARN_IF(mInputStreamCallback && aCallback && + mInputStreamCallback != aCallback)) { + return NS_ERROR_FAILURE; + } + + nsresult rv = EnsureAsyncRemoteStream(); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + mInputStreamCallback = aCallback; + mInputStreamCallbackEventTarget = eventTarget; + mInputStreamCallbackFlags = aFlags; + mInputStreamCallbackRequestedCount = aRequestedCount; + + stream = mAsyncInnerStream; + break; + } + + case eClosed: + [[fallthrough]]; + default: + MOZ_ASSERT(mState == eClosed); + if (NS_WARN_IF(mInputStreamCallback && aCallback && + mInputStreamCallback != aCallback)) { + return NS_ERROR_FAILURE; + } + break; + } + + if (stream) { + return stream->AsyncWait(aCallback ? this : nullptr, aFlags, + aRequestedCount, eventTarget); + } + } + + if (aCallback) { + // if stream is nullptr here, that probably means the stream has + // been closed and the callback can be executed immediately + InputStreamCallbackRunnable::Execute(do_AddRef(aCallback), + do_AddRef(eventTarget), this); + } + return NS_OK; +} + +void RemoteLazyInputStream::StreamNeeded() { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug, + ("StreamNeeded %s", Describe().get())); + + auto* thread = RemoteLazyInputStreamThread::GetOrCreate(); + if (NS_WARN_IF(!thread)) { + return; + } + thread->Dispatch(NS_NewRunnableFunction( + "RemoteLazyInputStream::StreamNeeded", + [self = RefPtr{this}, actor = mActor, start = mStart, length = mLength] { + MOZ_LOG( + gRemoteLazyStreamLog, LogLevel::Debug, + ("Sending StreamNeeded(%" PRIu64 " %" PRIu64 ") %s %d", start, + length, nsIDToCString(actor->StreamID()).get(), actor->CanSend())); + + actor->SendStreamNeeded( + start, length, + [self](const Maybe& aStream) { + // Try to deserialize the stream from our remote, and close our + // stream if it fails. + nsCOMPtr stream = + mozilla::ipc::DeserializeIPCStream(aStream); + if (NS_WARN_IF(!stream)) { + NS_WARNING("Failed to deserialize IPC stream"); + self->Close(); + } + + // Lock our mutex to update the inner stream, and collect any + // callbacks which we need to invoke. + MutexAutoLock lock(self->mMutex); + + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug, + ("ResolveStreamNeeded(%p) %s", stream.get(), + self->Describe().get())); + + if (self->mState == ePending) { + self->mInnerStream = stream.forget(); + self->mState = eRunning; + + // Notify any listeners that we've now acquired the underlying + // stream, so file metadata information will be available. + nsCOMPtr fileMetadataCallback = + self->mFileMetadataCallback.forget(); + nsCOMPtr fileMetadataCallbackEventTarget = + self->mFileMetadataCallbackEventTarget.forget(); + if (fileMetadataCallback) { + FileMetadataCallbackRunnable::Execute( + fileMetadataCallback, fileMetadataCallbackEventTarget, + self); + } + + // **NOTE** we can re-enter this class here **NOTE** + // If we already have an input stream callback, attempt to + // register ourselves with AsyncWait on the underlying stream. + if (self->mInputStreamCallback) { + if (NS_FAILED(self->EnsureAsyncRemoteStream()) || + NS_FAILED(self->mAsyncInnerStream->AsyncWait( + self, self->mInputStreamCallbackFlags, + self->mInputStreamCallbackRequestedCount, + self->mInputStreamCallbackEventTarget))) { + InputStreamCallbackRunnable::Execute( + self->mInputStreamCallback.forget(), + self->mInputStreamCallbackEventTarget.forget(), self); + } + } + } + + if (stream) { + NS_WARNING("Failed to save stream, closing it"); + stream->Close(); + } + }, + [self](mozilla::ipc::ResponseRejectReason) { + NS_WARNING("SendStreamNeeded rejected"); + self->Close(); + }); + })); +} + +// nsIInputStreamCallback + +NS_IMETHODIMP +RemoteLazyInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream) { + RefPtr callback; + nsCOMPtr callbackEventTarget; + { + MutexAutoLock lock(mMutex); + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug, + ("OnInputStreamReady %s", Describe().get())); + + // We have been closed in the meantime. + if (mState == eClosed) { + return NS_OK; + } + + // We got a callback from the wrong stream, likely due to a `CloneWithRange` + // call while we were waiting. Ignore this callback. + if (mAsyncInnerStream != aStream) { + return NS_OK; + } + + MOZ_ASSERT(mState == eRunning); + + // The callback has been canceled in the meantime. + if (!mInputStreamCallback) { + return NS_OK; + } + + callback.swap(mInputStreamCallback); + callbackEventTarget.swap(mInputStreamCallbackEventTarget); + } + + // This must be the last operation because the execution of the callback can + // be synchronous. + MOZ_ASSERT(callback); + InputStreamCallbackRunnable::Execute(callback.forget(), + callbackEventTarget.forget(), this); + return NS_OK; +} + +// nsIIPCSerializableInputStream + +void RemoteLazyInputStream::SerializedComplexity(uint32_t aMaxSize, + uint32_t* aSizeUsed, + uint32_t* aNewPipes, + uint32_t* aTransferables) { + *aTransferables = 1; +} + +void RemoteLazyInputStream::Serialize(mozilla::ipc::InputStreamParams& aParams, + uint32_t aMaxSize, uint32_t* aSizeUsed) { + *aSizeUsed = 0; + aParams = mozilla::ipc::RemoteLazyInputStreamParams(this); +} + +bool RemoteLazyInputStream::Deserialize( + const mozilla::ipc::InputStreamParams& aParams) { + MOZ_CRASH("This should never be called."); + return false; +} + +// nsIAsyncFileMetadata + +NS_IMETHODIMP +RemoteLazyInputStream::AsyncFileMetadataWait(nsIFileMetadataCallback* aCallback, + nsIEventTarget* aEventTarget) { + MOZ_ASSERT(!!aCallback == !!aEventTarget); + + // If we have the callback, we must have the event target. + if (NS_WARN_IF(!!aCallback != !!aEventTarget)) { + return NS_ERROR_FAILURE; + } + + // See RemoteLazyInputStream.h for more information about this state + // machine. + + { + MutexAutoLock lock(mMutex); + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug, + ("AsyncFileMetadataWait(%p, %p) %s", aCallback, aEventTarget, + Describe().get())); + + switch (mState) { + // First call, we need to retrieve the stream from the parent actor. + case eInit: + MOZ_ASSERT(mActor); + + mFileMetadataCallback = aCallback; + mFileMetadataCallbackEventTarget = aEventTarget; + mState = ePending; + + StreamNeeded(); + return NS_OK; + + // We are still waiting for the remote inputStream + case ePending: + if (mFileMetadataCallback && aCallback) { + return NS_ERROR_FAILURE; + } + + mFileMetadataCallback = aCallback; + mFileMetadataCallbackEventTarget = aEventTarget; + return NS_OK; + + // We have the remote inputStream, let's check if we can execute the + // callback. + case eRunning: + break; + + // Stream is closed. + default: + MOZ_ASSERT(mState == eClosed); + return NS_BASE_STREAM_CLOSED; + } + + MOZ_ASSERT(mState == eRunning); + } + + FileMetadataCallbackRunnable::Execute(aCallback, aEventTarget, this); + return NS_OK; +} + +// nsIFileMetadata + +NS_IMETHODIMP +RemoteLazyInputStream::GetSize(int64_t* aRetval) { + nsCOMPtr fileMetadata; + { + MutexAutoLock lock(mMutex); + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("GetSize %s", Describe().get())); + + fileMetadata = do_QueryInterface(mInnerStream); + if (!fileMetadata) { + return mState == eClosed ? NS_BASE_STREAM_CLOSED : NS_ERROR_FAILURE; + } + } + + return fileMetadata->GetSize(aRetval); +} + +NS_IMETHODIMP +RemoteLazyInputStream::GetLastModified(int64_t* aRetval) { + nsCOMPtr fileMetadata; + { + MutexAutoLock lock(mMutex); + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("GetLastModified %s", Describe().get())); + + fileMetadata = do_QueryInterface(mInnerStream); + if (!fileMetadata) { + return mState == eClosed ? NS_BASE_STREAM_CLOSED : NS_ERROR_FAILURE; + } + } + + return fileMetadata->GetLastModified(aRetval); +} + +NS_IMETHODIMP +RemoteLazyInputStream::GetFileDescriptor(PRFileDesc** aRetval) { + nsCOMPtr fileMetadata; + { + MutexAutoLock lock(mMutex); + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("GetFileDescriptor %s", Describe().get())); + + fileMetadata = do_QueryInterface(mInnerStream); + if (!fileMetadata) { + return mState == eClosed ? NS_BASE_STREAM_CLOSED : NS_ERROR_FAILURE; + } + } + + return fileMetadata->GetFileDescriptor(aRetval); +} + +nsresult RemoteLazyInputStream::EnsureAsyncRemoteStream() { + // We already have an async remote stream. + if (mAsyncInnerStream) { + return NS_OK; + } + + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug, + ("EnsureAsyncRemoteStream %s", Describe().get())); + + if (NS_WARN_IF(!mInnerStream)) { + return NS_ERROR_FAILURE; + } + + nsCOMPtr stream = mInnerStream; + + // Check if the stream is blocking, if it is, we want to make it non-blocking + // using a pipe. + bool nonBlocking = false; + nsresult rv = stream->IsNonBlocking(&nonBlocking); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + // We don't return NS_ERROR_NOT_IMPLEMENTED from ReadSegments, + // so it's possible that callers are expecting us to succeed in the future. + // We need to make sure the stream we return here supports ReadSegments, + // so wrap if in a buffered stream if necessary. + // + // We only need to do this if we won't be wrapping the stream in a pipe, which + // will add buffering anyway. + if (nonBlocking && !NS_InputStreamIsBuffered(stream)) { + nsCOMPtr bufferedStream; + nsresult rv = NS_NewBufferedInputStream(getter_AddRefs(bufferedStream), + stream.forget(), 4096); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + stream = bufferedStream; + } + + nsCOMPtr asyncStream = do_QueryInterface(stream); + + // If non-blocking and non-async, let's use NonBlockingAsyncInputStream. + if (nonBlocking && !asyncStream) { + rv = NonBlockingAsyncInputStream::Create(stream.forget(), + getter_AddRefs(asyncStream)); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + MOZ_ASSERT(asyncStream); + } + + if (!asyncStream) { + // Let's make the stream async using the DOMFile thread. + nsCOMPtr pipeIn; + nsCOMPtr pipeOut; + NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(pipeOut), true, true); + + RefPtr thread = + RemoteLazyInputStreamThread::GetOrCreate(); + if (NS_WARN_IF(!thread)) { + return NS_ERROR_ILLEGAL_DURING_SHUTDOWN; + } + + rv = NS_AsyncCopy(stream, pipeOut, thread, NS_ASYNCCOPY_VIA_WRITESEGMENTS); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + asyncStream = pipeIn; + } + + MOZ_ASSERT(asyncStream); + mAsyncInnerStream = asyncStream; + mInnerStream = nullptr; + + return NS_OK; +} + +// nsIInputStreamLength + +NS_IMETHODIMP +RemoteLazyInputStream::Length(int64_t* aLength) { + MutexAutoLock lock(mMutex); + + if (mState == eClosed) { + return NS_BASE_STREAM_CLOSED; + } + + if (!mActor) { + return NS_ERROR_NOT_AVAILABLE; + } + + return NS_BASE_STREAM_WOULD_BLOCK; +} + +namespace { + +class InputStreamLengthCallbackRunnable final : public DiscardableRunnable { + public: + static void Execute(nsIInputStreamLengthCallback* aCallback, + nsIEventTarget* aEventTarget, + RemoteLazyInputStream* aStream, int64_t aLength) { + MOZ_ASSERT(aCallback); + MOZ_ASSERT(aEventTarget); + + RefPtr runnable = + new InputStreamLengthCallbackRunnable(aCallback, aStream, aLength); + + nsCOMPtr target = aEventTarget; + target->Dispatch(runnable, NS_DISPATCH_NORMAL); + } + + NS_IMETHOD + Run() override { + mCallback->OnInputStreamLengthReady(mStream, mLength); + mCallback = nullptr; + mStream = nullptr; + return NS_OK; + } + + private: + InputStreamLengthCallbackRunnable(nsIInputStreamLengthCallback* aCallback, + RemoteLazyInputStream* aStream, + int64_t aLength) + : DiscardableRunnable("dom::InputStreamLengthCallbackRunnable"), + mCallback(aCallback), + mStream(aStream), + mLength(aLength) { + MOZ_ASSERT(mCallback); + MOZ_ASSERT(mStream); + } + + nsCOMPtr mCallback; + RefPtr mStream; + const int64_t mLength; +}; + +} // namespace + +// nsIAsyncInputStreamLength + +NS_IMETHODIMP +RemoteLazyInputStream::AsyncLengthWait(nsIInputStreamLengthCallback* aCallback, + nsIEventTarget* aEventTarget) { + // If we have the callback, we must have the event target. + if (NS_WARN_IF(!!aCallback != !!aEventTarget)) { + return NS_ERROR_FAILURE; + } + + { + MutexAutoLock lock(mMutex); + + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("AsyncLengthWait(%p, %p) %s", aCallback, aEventTarget, + Describe().get())); + + if (mActor) { + if (aCallback) { + auto* thread = RemoteLazyInputStreamThread::GetOrCreate(); + if (NS_WARN_IF(!thread)) { + return NS_ERROR_ILLEGAL_DURING_SHUTDOWN; + } + thread->Dispatch(NS_NewRunnableFunction( + "RemoteLazyInputStream::AsyncLengthWait", + [self = RefPtr{this}, actor = mActor, + callback = nsCOMPtr{aCallback}, + eventTarget = nsCOMPtr{aEventTarget}] { + actor->SendLengthNeeded( + [self, callback, eventTarget](int64_t aLength) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("AsyncLengthWait resolve %" PRId64, aLength)); + int64_t length = -1; + if (aLength > 0) { + uint64_t sourceLength = + aLength - std::min(aLength, self->mStart); + length = int64_t( + std::min(sourceLength, self->mLength)); + } + InputStreamLengthCallbackRunnable::Execute( + callback, eventTarget, self, length); + }, + [self, callback, + eventTarget](mozilla::ipc::ResponseRejectReason) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Warning, + ("AsyncLengthWait reject")); + InputStreamLengthCallbackRunnable::Execute( + callback, eventTarget, self, -1); + }); + })); + } + + return NS_OK; + } + } + + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("AsyncLengthWait immediate")); + + // If execution has reached here, it means the stream is either closed or + // consumed, and therefore the callback can be executed immediately + InputStreamLengthCallbackRunnable::Execute(aCallback, aEventTarget, this, -1); + return NS_OK; +} + +void RemoteLazyInputStream::IPCWrite(IPC::MessageWriter* aWriter) { + // If we have an actor still, serialize efficiently by cloning our actor to + // maintain a reference to the parent side. + RefPtr actor; + + nsCOMPtr innerStream; + + RefPtr inputStreamCallback; + nsCOMPtr inputStreamCallbackEventTarget; + + { + MutexAutoLock lock(mMutex); + + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("Serialize %s", Describe().get())); + + actor = mActor.forget(); + + if (mAsyncInnerStream) { + MOZ_ASSERT(!mInnerStream); + innerStream = mAsyncInnerStream.forget(); + } else { + innerStream = mInnerStream.forget(); + } + + // TODO(Bug 1737783): Notify to the mFileMetadataCallback that this + // lazy input stream has been closed. + mFileMetadataCallback = nullptr; + mFileMetadataCallbackEventTarget = nullptr; + + inputStreamCallback = mInputStreamCallback.forget(); + inputStreamCallbackEventTarget = mInputStreamCallbackEventTarget.forget(); + + mState = eClosed; + } + + if (inputStreamCallback) { + InputStreamCallbackRunnable::Execute( + inputStreamCallback.forget(), inputStreamCallbackEventTarget.forget(), + this); + } + + bool closed = !actor && !innerStream; + IPC::WriteParam(aWriter, closed); + if (closed) { + return; + } + + // If we still have a connection to our remote actor, create a clone endpoint + // for it and tell it that the stream has been consumed. The clone of the + // connection can be transferred to another process. + if (actor) { + MOZ_LOG( + gRemoteLazyStreamLog, LogLevel::Debug, + ("Serializing as actor: %s", nsIDToCString(actor->StreamID()).get())); + // Create a clone of the actor, and then tell it that this stream is no + // longer referencing it. + mozilla::ipc::Endpoint parentEp; + mozilla::ipc::Endpoint childEp; + MOZ_ALWAYS_SUCCEEDS( + PRemoteLazyInputStream::CreateEndpoints(&parentEp, &childEp)); + + auto* thread = RemoteLazyInputStreamThread::GetOrCreate(); + if (thread) { + thread->Dispatch(NS_NewRunnableFunction( + "RemoteLazyInputStreamChild::SendClone", + [actor, parentEp = std::move(parentEp)]() mutable { + bool ok = actor->SendClone(std::move(parentEp)); + MOZ_LOG( + gRemoteLazyStreamLog, LogLevel::Verbose, + ("SendClone for %s: %s", nsIDToCString(actor->StreamID()).get(), + ok ? "OK" : "ERR")); + })); + + } // else we are shutting down xpcom threads. + + // NOTE: Call `StreamConsumed` after dispatching the `SendClone` runnable, + // as this method may dispatch a runnable to `RemoteLazyInputStreamThread` + // to call `SendGoodbye`, which needs to happen after `SendClone`. + actor->StreamConsumed(); + + IPC::WriteParam(aWriter, actor->StreamID()); + IPC::WriteParam(aWriter, mStart); + IPC::WriteParam(aWriter, mLength); + IPC::WriteParam(aWriter, std::move(childEp)); + + if (innerStream) { + innerStream->Close(); + } + return; + } + + // If we have a stream and are in the parent process, create a new actor pair + // and transfer ownership of the stream into storage. + auto streamStorage = RemoteLazyInputStreamStorage::Get(); + if (streamStorage.isOk()) { + MOZ_ASSERT(XRE_IsParentProcess()); + nsID id = nsID::GenerateUUID(); + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug, + ("Serializing as new stream: %s", nsIDToCString(id).get())); + + streamStorage.inspect()->AddStream(innerStream, id); + + mozilla::ipc::Endpoint parentEp; + mozilla::ipc::Endpoint childEp; + MOZ_ALWAYS_SUCCEEDS( + PRemoteLazyInputStream::CreateEndpoints(&parentEp, &childEp)); + + // Bind the actor on our background thread. + streamStorage.inspect()->TaskQueue()->Dispatch(NS_NewRunnableFunction( + "RemoteLazyInputStreamParent::Bind", + [parentEp = std::move(parentEp), id]() mutable { + auto stream = MakeRefPtr(id); + parentEp.Bind(stream); + })); + + IPC::WriteParam(aWriter, id); + IPC::WriteParam(aWriter, 0); + IPC::WriteParam(aWriter, UINT64_MAX); + IPC::WriteParam(aWriter, std::move(childEp)); + return; + } + + MOZ_CRASH("Cannot serialize new RemoteLazyInputStream from this process"); +} + +already_AddRefed RemoteLazyInputStream::IPCRead( + IPC::MessageReader* aReader) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, ("Deserialize")); + + bool closed; + if (NS_WARN_IF(!IPC::ReadParam(aReader, &closed))) { + return nullptr; + } + if (closed) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, + ("Deserialize closed stream")); + return do_AddRef(new RemoteLazyInputStream()); + } + + nsID id{}; + uint64_t start; + uint64_t length; + mozilla::ipc::Endpoint endpoint; + if (NS_WARN_IF(!IPC::ReadParam(aReader, &id)) || + NS_WARN_IF(!IPC::ReadParam(aReader, &start)) || + NS_WARN_IF(!IPC::ReadParam(aReader, &length)) || + NS_WARN_IF(!IPC::ReadParam(aReader, &endpoint))) { + return nullptr; + } + + if (!endpoint.IsValid()) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Warning, + ("Deserialize failed due to invalid endpoint!")); + return do_AddRef(new RemoteLazyInputStream()); + } + + RefPtr actor = + BindChildActor(id, std::move(endpoint)); + + if (!actor) { + MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Warning, + ("Deserialize failed as we are probably late in shutdown!")); + return do_AddRef(new RemoteLazyInputStream()); + } + + return do_AddRef(new RemoteLazyInputStream(actor, start, length)); +} + +} // namespace mozilla + +void IPC::ParamTraits::Write( + IPC::MessageWriter* aWriter, mozilla::RemoteLazyInputStream* aParam) { + bool nonNull = !!aParam; + IPC::WriteParam(aWriter, nonNull); + if (aParam) { + aParam->IPCWrite(aWriter); + } +} + +bool IPC::ParamTraits::Read( + IPC::MessageReader* aReader, + RefPtr* aResult) { + bool nonNull = false; + if (!IPC::ReadParam(aReader, &nonNull)) { + return false; + } + if (!nonNull) { + *aResult = nullptr; + return true; + } + *aResult = mozilla::RemoteLazyInputStream::IPCRead(aReader); + return *aResult; +} -- cgit v1.2.3