summaryrefslogtreecommitdiffstats
path: root/dom/file/ipc/RemoteLazyInputStream.cpp
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--dom/file/ipc/RemoteLazyInputStream.cpp1424
1 files changed, 1424 insertions, 0 deletions
diff --git a/dom/file/ipc/RemoteLazyInputStream.cpp b/dom/file/ipc/RemoteLazyInputStream.cpp
new file mode 100644
index 0000000000..726581ae13
--- /dev/null
+++ b/dom/file/ipc/RemoteLazyInputStream.cpp
@@ -0,0 +1,1424 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "RemoteLazyInputStream.h"
+#include "RemoteLazyInputStreamChild.h"
+#include "RemoteLazyInputStreamParent.h"
+#include "chrome/common/ipc_message_utils.h"
+#include "mozilla/ErrorNames.h"
+#include "mozilla/Logging.h"
+#include "mozilla/PRemoteLazyInputStream.h"
+#include "mozilla/ipc/Endpoint.h"
+#include "mozilla/ipc/InputStreamParams.h"
+#include "mozilla/ipc/MessageChannel.h"
+#include "mozilla/ipc/ProtocolMessageUtils.h"
+#include "mozilla/net/SocketProcessParent.h"
+#include "mozilla/SlicedInputStream.h"
+#include "mozilla/NonBlockingAsyncInputStream.h"
+#include "nsIAsyncInputStream.h"
+#include "nsIAsyncOutputStream.h"
+#include "nsID.h"
+#include "nsIInputStream.h"
+#include "nsIPipe.h"
+#include "nsNetUtil.h"
+#include "nsStreamUtils.h"
+#include "nsStringStream.h"
+#include "RemoteLazyInputStreamStorage.h"
+#include "RemoteLazyInputStreamThread.h"
+
+namespace mozilla {
+
+mozilla::LazyLogModule gRemoteLazyStreamLog("RemoteLazyStream");
+
+namespace {
+
+class InputStreamCallbackRunnable final : public DiscardableRunnable {
+ public:
+ // Note that the execution can be synchronous in case the event target is
+ // null.
+ static void Execute(already_AddRefed<nsIInputStreamCallback> aCallback,
+ already_AddRefed<nsIEventTarget> aEventTarget,
+ RemoteLazyInputStream* aStream) {
+ RefPtr<InputStreamCallbackRunnable> runnable =
+ new InputStreamCallbackRunnable(std::move(aCallback), aStream);
+
+ nsCOMPtr<nsIEventTarget> target = std::move(aEventTarget);
+ if (target) {
+ target->Dispatch(runnable, NS_DISPATCH_NORMAL);
+ } else {
+ runnable->Run();
+ }
+ }
+
+ NS_IMETHOD
+ Run() override {
+ mCallback->OnInputStreamReady(mStream);
+ mCallback = nullptr;
+ mStream = nullptr;
+ return NS_OK;
+ }
+
+ private:
+ InputStreamCallbackRunnable(
+ already_AddRefed<nsIInputStreamCallback> aCallback,
+ RemoteLazyInputStream* aStream)
+ : DiscardableRunnable("dom::InputStreamCallbackRunnable"),
+ mCallback(std::move(aCallback)),
+ mStream(aStream) {
+ MOZ_ASSERT(mCallback);
+ MOZ_ASSERT(mStream);
+ }
+
+ RefPtr<nsIInputStreamCallback> mCallback;
+ RefPtr<RemoteLazyInputStream> mStream;
+};
+
+class FileMetadataCallbackRunnable final : public DiscardableRunnable {
+ public:
+ static void Execute(nsIFileMetadataCallback* aCallback,
+ nsIEventTarget* aEventTarget,
+ RemoteLazyInputStream* aStream) {
+ MOZ_ASSERT(aCallback);
+ MOZ_ASSERT(aEventTarget);
+
+ RefPtr<FileMetadataCallbackRunnable> runnable =
+ new FileMetadataCallbackRunnable(aCallback, aStream);
+
+ nsCOMPtr<nsIEventTarget> target = aEventTarget;
+ target->Dispatch(runnable, NS_DISPATCH_NORMAL);
+ }
+
+ NS_IMETHOD
+ Run() override {
+ mCallback->OnFileMetadataReady(mStream);
+ mCallback = nullptr;
+ mStream = nullptr;
+ return NS_OK;
+ }
+
+ private:
+ FileMetadataCallbackRunnable(nsIFileMetadataCallback* aCallback,
+ RemoteLazyInputStream* aStream)
+ : DiscardableRunnable("dom::FileMetadataCallbackRunnable"),
+ mCallback(aCallback),
+ mStream(aStream) {
+ MOZ_ASSERT(mCallback);
+ MOZ_ASSERT(mStream);
+ }
+
+ nsCOMPtr<nsIFileMetadataCallback> mCallback;
+ RefPtr<RemoteLazyInputStream> mStream;
+};
+
+} // namespace
+
+NS_IMPL_ADDREF(RemoteLazyInputStream);
+NS_IMPL_RELEASE(RemoteLazyInputStream);
+
+NS_INTERFACE_MAP_BEGIN(RemoteLazyInputStream)
+ NS_INTERFACE_MAP_ENTRY(nsIInputStream)
+ NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream)
+ NS_INTERFACE_MAP_ENTRY(nsIInputStreamCallback)
+ NS_INTERFACE_MAP_ENTRY(nsICloneableInputStream)
+ NS_INTERFACE_MAP_ENTRY(nsICloneableInputStreamWithRange)
+ NS_INTERFACE_MAP_ENTRY(nsIIPCSerializableInputStream)
+ NS_INTERFACE_MAP_ENTRY(nsIFileMetadata)
+ NS_INTERFACE_MAP_ENTRY(nsIAsyncFileMetadata)
+ NS_INTERFACE_MAP_ENTRY(nsIInputStreamLength)
+ NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStreamLength)
+ NS_INTERFACE_MAP_ENTRY(mozIRemoteLazyInputStream)
+ NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStream)
+NS_INTERFACE_MAP_END
+
+RemoteLazyInputStream::RemoteLazyInputStream(RemoteLazyInputStreamChild* aActor,
+ uint64_t aStart, uint64_t aLength)
+ : mStart(aStart), mLength(aLength), mState(eInit), mActor(aActor) {
+ MOZ_ASSERT(aActor);
+
+ mActor->StreamCreated();
+
+ auto storage = RemoteLazyInputStreamStorage::Get().unwrapOr(nullptr);
+ if (storage) {
+ nsCOMPtr<nsIInputStream> stream;
+ storage->GetStream(mActor->StreamID(), mStart, mLength,
+ getter_AddRefs(stream));
+ if (stream) {
+ mState = eRunning;
+ mInnerStream = stream;
+ }
+ }
+}
+
+RemoteLazyInputStream::RemoteLazyInputStream(nsIInputStream* aStream)
+ : mStart(0), mLength(UINT64_MAX), mState(eRunning), mInnerStream(aStream) {}
+
+static already_AddRefed<RemoteLazyInputStreamChild> BindChildActor(
+ nsID aId, mozilla::ipc::Endpoint<PRemoteLazyInputStreamChild> aEndpoint) {
+ auto* thread = RemoteLazyInputStreamThread::GetOrCreate();
+ if (NS_WARN_IF(!thread)) {
+ return nullptr;
+ }
+ auto actor = MakeRefPtr<RemoteLazyInputStreamChild>(aId);
+ thread->Dispatch(
+ NS_NewRunnableFunction("RemoteLazyInputStream::BindChildActor",
+ [actor, childEp = std::move(aEndpoint)]() mutable {
+ bool ok = childEp.Bind(actor);
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug,
+ ("Binding child actor for %s (%p): %s",
+ nsIDToCString(actor->StreamID()).get(),
+ actor.get(), ok ? "OK" : "ERROR"));
+ }));
+
+ return actor.forget();
+}
+
+already_AddRefed<RemoteLazyInputStream> RemoteLazyInputStream::WrapStream(
+ nsIInputStream* aInputStream) {
+ MOZ_ASSERT(XRE_IsParentProcess());
+ if (nsCOMPtr<mozIRemoteLazyInputStream> lazyStream =
+ do_QueryInterface(aInputStream)) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug,
+ ("Returning already-wrapped stream"));
+ return lazyStream.forget().downcast<RemoteLazyInputStream>();
+ }
+
+ // If we have a stream and are in the parent process, create a new actor pair
+ // and transfer ownership of the stream into storage.
+ auto streamStorage = RemoteLazyInputStreamStorage::Get();
+ if (NS_WARN_IF(streamStorage.isErr())) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Warning,
+ ("Cannot wrap with no storage!"));
+ return nullptr;
+ }
+
+ nsID id = nsID::GenerateUUID();
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("Wrapping stream %p as %s", aInputStream, nsIDToCString(id).get()));
+ streamStorage.inspect()->AddStream(aInputStream, id);
+
+ mozilla::ipc::Endpoint<PRemoteLazyInputStreamParent> parentEp;
+ mozilla::ipc::Endpoint<PRemoteLazyInputStreamChild> childEp;
+ MOZ_ALWAYS_SUCCEEDS(
+ PRemoteLazyInputStream::CreateEndpoints(&parentEp, &childEp));
+
+ // Bind the actor on our background thread.
+ streamStorage.inspect()->TaskQueue()->Dispatch(NS_NewRunnableFunction(
+ "RemoteLazyInputStreamParent::Bind",
+ [parentEp = std::move(parentEp), id]() mutable {
+ auto actor = MakeRefPtr<RemoteLazyInputStreamParent>(id);
+ bool ok = parentEp.Bind(actor);
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug,
+ ("Binding parent actor for %s (%p): %s",
+ nsIDToCString(id).get(), actor.get(), ok ? "OK" : "ERROR"));
+ }));
+
+ RefPtr<RemoteLazyInputStreamChild> actor =
+ BindChildActor(id, std::move(childEp));
+
+ if (!actor) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Warning,
+ ("Wrapping stream failed as we are probably late in shutdown!"));
+ return do_AddRef(new RemoteLazyInputStream());
+ }
+
+ return do_AddRef(new RemoteLazyInputStream(actor));
+}
+
+NS_IMETHODIMP RemoteLazyInputStream::TakeInternalStream(
+ nsIInputStream** aStream) {
+ RefPtr<RemoteLazyInputStreamChild> actor;
+ {
+ MutexAutoLock lock(mMutex);
+ if (mState == eInit || mState == ePending) {
+ return NS_BASE_STREAM_WOULD_BLOCK;
+ }
+ if (mState == eClosed) {
+ return NS_BASE_STREAM_CLOSED;
+ }
+ if (mInputStreamCallback) {
+ MOZ_ASSERT_UNREACHABLE(
+ "Do not call TakeInternalStream after calling AsyncWait");
+ return NS_ERROR_UNEXPECTED;
+ }
+
+ // Take the inner stream and return it, then close ourselves.
+ if (mInnerStream) {
+ mInnerStream.forget(aStream);
+ } else if (mAsyncInnerStream) {
+ mAsyncInnerStream.forget(aStream);
+ }
+ mState = eClosed;
+ actor = mActor.forget();
+ }
+ if (actor) {
+ actor->StreamConsumed();
+ }
+ return NS_OK;
+}
+
+NS_IMETHODIMP RemoteLazyInputStream::GetInternalStreamID(nsID& aID) {
+ MutexAutoLock lock(mMutex);
+ if (!mActor) {
+ return NS_ERROR_NOT_AVAILABLE;
+ }
+
+ aID = mActor->StreamID();
+ return NS_OK;
+}
+
+RemoteLazyInputStream::~RemoteLazyInputStream() { Close(); }
+
+nsCString RemoteLazyInputStream::Describe() {
+ const char* state = "?";
+ switch (mState) {
+ case eInit:
+ state = "i";
+ break;
+ case ePending:
+ state = "p";
+ break;
+ case eRunning:
+ state = "r";
+ break;
+ case eClosed:
+ state = "c";
+ break;
+ }
+ return nsPrintfCString(
+ "[%p, %s, %s, %p%s, %s%s|%s%s]", this, state,
+ mActor ? nsIDToCString(mActor->StreamID()).get() : "<no actor>",
+ mInnerStream ? mInnerStream.get() : mAsyncInnerStream.get(),
+ mAsyncInnerStream ? "(A)" : "", mInputStreamCallback ? "I" : "",
+ mInputStreamCallbackEventTarget ? "+" : "",
+ mFileMetadataCallback ? "F" : "",
+ mFileMetadataCallbackEventTarget ? "+" : "");
+}
+
+// nsIInputStream interface
+
+NS_IMETHODIMP
+RemoteLazyInputStream::Available(uint64_t* aLength) {
+ nsCOMPtr<nsIAsyncInputStream> stream;
+ {
+ MutexAutoLock lock(mMutex);
+
+ // We don't have a remoteStream yet: let's return 0.
+ if (mState == eInit || mState == ePending) {
+ *aLength = 0;
+ return NS_OK;
+ }
+
+ if (mState == eClosed) {
+ return NS_BASE_STREAM_CLOSED;
+ }
+
+ MOZ_ASSERT(mState == eRunning);
+ MOZ_ASSERT(mInnerStream || mAsyncInnerStream);
+
+ nsresult rv = EnsureAsyncRemoteStream();
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ stream = mAsyncInnerStream;
+ }
+
+ MOZ_ASSERT(stream);
+ return stream->Available(aLength);
+}
+
+NS_IMETHODIMP
+RemoteLazyInputStream::Read(char* aBuffer, uint32_t aCount,
+ uint32_t* aReadCount) {
+ nsCOMPtr<nsIAsyncInputStream> stream;
+ {
+ MutexAutoLock lock(mMutex);
+
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("Read(%u) %s", aCount, Describe().get()));
+
+ // Read is not available is we don't have a remoteStream.
+ if (mState == eInit || mState == ePending) {
+ return NS_BASE_STREAM_WOULD_BLOCK;
+ }
+
+ if (mState == eClosed) {
+ return NS_BASE_STREAM_CLOSED;
+ }
+
+ MOZ_ASSERT(mState == eRunning);
+ MOZ_ASSERT(mInnerStream || mAsyncInnerStream);
+
+ nsresult rv = EnsureAsyncRemoteStream();
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ stream = mAsyncInnerStream;
+ }
+
+ MOZ_ASSERT(stream);
+ nsresult rv = stream->Read(aBuffer, aCount, aReadCount);
+ if (NS_FAILED(rv)) {
+ return rv;
+ }
+
+ // If some data has been read, we mark the stream as consumed.
+ if (*aReadCount > 0) {
+ MarkConsumed();
+ }
+
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("Read %u/%u bytes", *aReadCount, aCount));
+
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+RemoteLazyInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
+ uint32_t aCount, uint32_t* aResult) {
+ nsCOMPtr<nsIAsyncInputStream> stream;
+ {
+ MutexAutoLock lock(mMutex);
+
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("ReadSegments(%u) %s", aCount, Describe().get()));
+
+ // ReadSegments is not available is we don't have a remoteStream.
+ if (mState == eInit || mState == ePending) {
+ return NS_BASE_STREAM_WOULD_BLOCK;
+ }
+
+ if (mState == eClosed) {
+ return NS_BASE_STREAM_CLOSED;
+ }
+
+ MOZ_ASSERT(mState == eRunning);
+ MOZ_ASSERT(mInnerStream || mAsyncInnerStream);
+
+ nsresult rv = EnsureAsyncRemoteStream();
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Warning,
+ ("EnsureAsyncRemoteStream failed! %s %s",
+ mozilla::GetStaticErrorName(rv), Describe().get()));
+ return rv;
+ }
+
+ stream = mAsyncInnerStream;
+ }
+
+ MOZ_ASSERT(stream);
+ nsresult rv = stream->ReadSegments(aWriter, aClosure, aCount, aResult);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ // If some data has been read, we mark the stream as consumed.
+ if (*aResult != 0) {
+ MarkConsumed();
+ }
+
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("ReadSegments %u/%u bytes", *aResult, aCount));
+
+ return NS_OK;
+}
+
+void RemoteLazyInputStream::MarkConsumed() {
+ RefPtr<RemoteLazyInputStreamChild> actor;
+ {
+ MutexAutoLock lock(mMutex);
+ if (mActor) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug,
+ ("MarkConsumed %s", Describe().get()));
+ }
+
+ actor = mActor.forget();
+ }
+ if (actor) {
+ actor->StreamConsumed();
+ }
+}
+
+NS_IMETHODIMP
+RemoteLazyInputStream::IsNonBlocking(bool* aNonBlocking) {
+ *aNonBlocking = true;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+RemoteLazyInputStream::Close() {
+ RefPtr<RemoteLazyInputStreamChild> actor;
+
+ nsCOMPtr<nsIAsyncInputStream> asyncInnerStream;
+ nsCOMPtr<nsIInputStream> innerStream;
+
+ RefPtr<nsIInputStreamCallback> inputStreamCallback;
+ nsCOMPtr<nsIEventTarget> inputStreamCallbackEventTarget;
+
+ {
+ MutexAutoLock lock(mMutex);
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug,
+ ("Close %s", Describe().get()));
+
+ actor = mActor.forget();
+
+ asyncInnerStream = mAsyncInnerStream.forget();
+ innerStream = mInnerStream.forget();
+
+ // TODO(Bug 1737783): Notify to the mFileMetadataCallback that this
+ // lazy input stream has been closed.
+ mFileMetadataCallback = nullptr;
+ mFileMetadataCallbackEventTarget = nullptr;
+
+ inputStreamCallback = mInputStreamCallback.forget();
+ inputStreamCallbackEventTarget = mInputStreamCallbackEventTarget.forget();
+
+ mState = eClosed;
+ }
+
+ if (actor) {
+ actor->StreamConsumed();
+ }
+
+ if (inputStreamCallback) {
+ InputStreamCallbackRunnable::Execute(
+ inputStreamCallback.forget(), inputStreamCallbackEventTarget.forget(),
+ this);
+ }
+
+ if (asyncInnerStream) {
+ asyncInnerStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
+ }
+
+ if (innerStream) {
+ innerStream->Close();
+ }
+
+ return NS_OK;
+}
+
+// nsICloneableInputStream interface
+
+NS_IMETHODIMP
+RemoteLazyInputStream::GetCloneable(bool* aCloneable) {
+ *aCloneable = true;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+RemoteLazyInputStream::Clone(nsIInputStream** aResult) {
+ return CloneWithRange(0, UINT64_MAX, aResult);
+}
+
+// nsICloneableInputStreamWithRange interface
+
+NS_IMETHODIMP
+RemoteLazyInputStream::CloneWithRange(uint64_t aStart, uint64_t aLength,
+ nsIInputStream** aResult) {
+ MutexAutoLock lock(mMutex);
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug,
+ ("CloneWithRange %" PRIu64 " %" PRIu64 " %s", aStart, aLength,
+ Describe().get()));
+
+ nsresult rv;
+
+ RefPtr<RemoteLazyInputStream> stream;
+ if (mState == eClosed) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, ("Cloning closed stream"));
+ stream = new RemoteLazyInputStream();
+ stream.forget(aResult);
+ return NS_OK;
+ }
+
+ uint64_t start = 0;
+ uint64_t length = 0;
+ auto maxLength = CheckedUint64(mLength) - aStart;
+ if (maxLength.isValid()) {
+ start = mStart + aStart;
+ length = std::min(maxLength.value(), aLength);
+ }
+
+ // If the slice would be empty, wrap an empty input stream and return it.
+ if (length == 0) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, ("Creating empty stream"));
+
+ nsCOMPtr<nsIInputStream> emptyStream;
+ rv = NS_NewCStringInputStream(getter_AddRefs(emptyStream), ""_ns);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ stream = new RemoteLazyInputStream(emptyStream);
+ stream.forget(aResult);
+ return NS_OK;
+ }
+
+ // If we still have a connection to our actor, that means we haven't read any
+ // data yet, and can clone + slice by building a new stream backed by the same
+ // actor.
+ if (mActor) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("Cloning stream with actor"));
+
+ stream = new RemoteLazyInputStream(mActor, start, length);
+ stream.forget(aResult);
+ return NS_OK;
+ }
+
+ // We no longer have our actor, either because we were constructed without
+ // one, or we've already begun reading. Perform the clone locally on our inner
+ // input stream.
+
+ nsCOMPtr<nsIInputStream> innerStream = mInnerStream;
+ if (mAsyncInnerStream) {
+ innerStream = mAsyncInnerStream;
+ }
+
+ nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(innerStream);
+ if (!cloneable || !cloneable->GetCloneable()) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("Cloning non-cloneable stream - copying to pipe"));
+
+ // If our internal stream isn't cloneable, to perform a clone we'll need to
+ // copy into a pipe and replace our internal stream.
+ nsCOMPtr<nsIAsyncInputStream> pipeIn;
+ nsCOMPtr<nsIAsyncOutputStream> pipeOut;
+ NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(pipeOut), true, true);
+
+ RefPtr<RemoteLazyInputStreamThread> thread =
+ RemoteLazyInputStreamThread::GetOrCreate();
+ if (NS_WARN_IF(!thread)) {
+ return NS_ERROR_ILLEGAL_DURING_SHUTDOWN;
+ }
+
+ mAsyncInnerStream = pipeIn;
+ mInnerStream = nullptr;
+
+ // If we have a callback pending, we need to re-call AsyncWait on the inner
+ // stream. This should not re-enter us immediately, as `pipeIn` hasn't been
+ // sent any data yet, but we may be called again as soon as `NS_AsyncCopy`
+ // has begun copying.
+ if (mInputStreamCallback) {
+ mAsyncInnerStream->AsyncWait(this, mInputStreamCallbackFlags,
+ mInputStreamCallbackRequestedCount,
+ mInputStreamCallbackEventTarget);
+ }
+
+ rv = NS_AsyncCopy(innerStream, pipeOut, thread,
+ NS_ASYNCCOPY_VIA_WRITESEGMENTS);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ // The copy failed, revert the changes we did and restore our previous
+ // inner stream.
+ mAsyncInnerStream = nullptr;
+ mInnerStream = innerStream;
+ return rv;
+ }
+
+ cloneable = do_QueryInterface(mAsyncInnerStream);
+ }
+
+ MOZ_ASSERT(cloneable && cloneable->GetCloneable());
+
+ // Check if we can clone more efficiently with a range.
+ if (length < UINT64_MAX) {
+ if (nsCOMPtr<nsICloneableInputStreamWithRange> cloneableWithRange =
+ do_QueryInterface(cloneable)) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, ("Cloning with range"));
+ nsCOMPtr<nsIInputStream> cloned;
+ rv = cloneableWithRange->CloneWithRange(start, length,
+ getter_AddRefs(cloned));
+ if (NS_FAILED(rv)) {
+ return rv;
+ }
+
+ stream = new RemoteLazyInputStream(cloned);
+ stream.forget(aResult);
+ return NS_OK;
+ }
+ }
+
+ // Directly clone our inner stream, and then slice it if needed.
+ nsCOMPtr<nsIInputStream> cloned;
+ rv = cloneable->Clone(getter_AddRefs(cloned));
+ if (NS_FAILED(rv)) {
+ return rv;
+ }
+
+ if (length < UINT64_MAX) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("Slicing stream with %" PRIu64 " %" PRIu64, start, length));
+ cloned = new SlicedInputStream(cloned.forget(), start, length);
+ }
+
+ stream = new RemoteLazyInputStream(cloned);
+ stream.forget(aResult);
+ return NS_OK;
+}
+
+// nsIAsyncInputStream interface
+
+NS_IMETHODIMP
+RemoteLazyInputStream::CloseWithStatus(nsresult aStatus) { return Close(); }
+
+NS_IMETHODIMP
+RemoteLazyInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
+ uint32_t aFlags, uint32_t aRequestedCount,
+ nsIEventTarget* aEventTarget) {
+ // Ensure we always have an event target for AsyncWait callbacks, so that
+ // calls to `AsyncWait` cannot reenter us with `OnInputStreamReady`.
+ nsCOMPtr<nsIEventTarget> eventTarget = aEventTarget;
+ if (aCallback && !eventTarget) {
+ eventTarget = RemoteLazyInputStreamThread::GetOrCreate();
+ if (NS_WARN_IF(!eventTarget)) {
+ return NS_ERROR_ILLEGAL_DURING_SHUTDOWN;
+ }
+ }
+
+ {
+ MutexAutoLock lock(mMutex);
+
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("AsyncWait(%p, %u, %u, %p) %s", aCallback, aFlags, aRequestedCount,
+ aEventTarget, Describe().get()));
+
+ // See RemoteLazyInputStream.h for more information about this state
+ // machine.
+
+ nsCOMPtr<nsIAsyncInputStream> stream;
+ switch (mState) {
+ // First call, we need to retrieve the stream from the parent actor.
+ case eInit:
+ MOZ_ASSERT(mActor);
+
+ mInputStreamCallback = aCallback;
+ mInputStreamCallbackEventTarget = eventTarget;
+ mInputStreamCallbackFlags = aFlags;
+ mInputStreamCallbackRequestedCount = aRequestedCount;
+ mState = ePending;
+
+ StreamNeeded();
+ return NS_OK;
+
+ // We are still waiting for the remote inputStream
+ case ePending: {
+ if (NS_WARN_IF(mInputStreamCallback && aCallback &&
+ mInputStreamCallback != aCallback)) {
+ return NS_ERROR_FAILURE;
+ }
+
+ mInputStreamCallback = aCallback;
+ mInputStreamCallbackEventTarget = eventTarget;
+ mInputStreamCallbackFlags = aFlags;
+ mInputStreamCallbackRequestedCount = aRequestedCount;
+ return NS_OK;
+ }
+
+ // We have the remote inputStream, let's check if we can execute the
+ // callback.
+ case eRunning: {
+ if (NS_WARN_IF(mInputStreamCallback && aCallback &&
+ mInputStreamCallback != aCallback)) {
+ return NS_ERROR_FAILURE;
+ }
+
+ nsresult rv = EnsureAsyncRemoteStream();
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ mInputStreamCallback = aCallback;
+ mInputStreamCallbackEventTarget = eventTarget;
+ mInputStreamCallbackFlags = aFlags;
+ mInputStreamCallbackRequestedCount = aRequestedCount;
+
+ stream = mAsyncInnerStream;
+ break;
+ }
+
+ case eClosed:
+ [[fallthrough]];
+ default:
+ MOZ_ASSERT(mState == eClosed);
+ if (NS_WARN_IF(mInputStreamCallback && aCallback &&
+ mInputStreamCallback != aCallback)) {
+ return NS_ERROR_FAILURE;
+ }
+ break;
+ }
+
+ if (stream) {
+ return stream->AsyncWait(aCallback ? this : nullptr, aFlags,
+ aRequestedCount, eventTarget);
+ }
+ }
+
+ if (aCallback) {
+ // if stream is nullptr here, that probably means the stream has
+ // been closed and the callback can be executed immediately
+ InputStreamCallbackRunnable::Execute(do_AddRef(aCallback),
+ do_AddRef(eventTarget), this);
+ }
+ return NS_OK;
+}
+
+void RemoteLazyInputStream::StreamNeeded() {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug,
+ ("StreamNeeded %s", Describe().get()));
+
+ auto* thread = RemoteLazyInputStreamThread::GetOrCreate();
+ if (NS_WARN_IF(!thread)) {
+ return;
+ }
+ thread->Dispatch(NS_NewRunnableFunction(
+ "RemoteLazyInputStream::StreamNeeded",
+ [self = RefPtr{this}, actor = mActor, start = mStart, length = mLength] {
+ MOZ_LOG(
+ gRemoteLazyStreamLog, LogLevel::Debug,
+ ("Sending StreamNeeded(%" PRIu64 " %" PRIu64 ") %s %d", start,
+ length, nsIDToCString(actor->StreamID()).get(), actor->CanSend()));
+
+ actor->SendStreamNeeded(
+ start, length,
+ [self](const Maybe<mozilla::ipc::IPCStream>& aStream) {
+ // Try to deserialize the stream from our remote, and close our
+ // stream if it fails.
+ nsCOMPtr<nsIInputStream> stream =
+ mozilla::ipc::DeserializeIPCStream(aStream);
+ if (NS_WARN_IF(!stream)) {
+ NS_WARNING("Failed to deserialize IPC stream");
+ self->Close();
+ }
+
+ // Lock our mutex to update the inner stream, and collect any
+ // callbacks which we need to invoke.
+ MutexAutoLock lock(self->mMutex);
+
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug,
+ ("ResolveStreamNeeded(%p) %s", stream.get(),
+ self->Describe().get()));
+
+ if (self->mState == ePending) {
+ self->mInnerStream = stream.forget();
+ self->mState = eRunning;
+
+ // Notify any listeners that we've now acquired the underlying
+ // stream, so file metadata information will be available.
+ nsCOMPtr<nsIFileMetadataCallback> fileMetadataCallback =
+ self->mFileMetadataCallback.forget();
+ nsCOMPtr<nsIEventTarget> fileMetadataCallbackEventTarget =
+ self->mFileMetadataCallbackEventTarget.forget();
+ if (fileMetadataCallback) {
+ FileMetadataCallbackRunnable::Execute(
+ fileMetadataCallback, fileMetadataCallbackEventTarget,
+ self);
+ }
+
+ // **NOTE** we can re-enter this class here **NOTE**
+ // If we already have an input stream callback, attempt to
+ // register ourselves with AsyncWait on the underlying stream.
+ if (self->mInputStreamCallback) {
+ if (NS_FAILED(self->EnsureAsyncRemoteStream()) ||
+ NS_FAILED(self->mAsyncInnerStream->AsyncWait(
+ self, self->mInputStreamCallbackFlags,
+ self->mInputStreamCallbackRequestedCount,
+ self->mInputStreamCallbackEventTarget))) {
+ InputStreamCallbackRunnable::Execute(
+ self->mInputStreamCallback.forget(),
+ self->mInputStreamCallbackEventTarget.forget(), self);
+ }
+ }
+ }
+
+ if (stream) {
+ NS_WARNING("Failed to save stream, closing it");
+ stream->Close();
+ }
+ },
+ [self](mozilla::ipc::ResponseRejectReason) {
+ NS_WARNING("SendStreamNeeded rejected");
+ self->Close();
+ });
+ }));
+}
+
+// nsIInputStreamCallback
+
+NS_IMETHODIMP
+RemoteLazyInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream) {
+ RefPtr<nsIInputStreamCallback> callback;
+ nsCOMPtr<nsIEventTarget> callbackEventTarget;
+ {
+ MutexAutoLock lock(mMutex);
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug,
+ ("OnInputStreamReady %s", Describe().get()));
+
+ // We have been closed in the meantime.
+ if (mState == eClosed) {
+ return NS_OK;
+ }
+
+ // We got a callback from the wrong stream, likely due to a `CloneWithRange`
+ // call while we were waiting. Ignore this callback.
+ if (mAsyncInnerStream != aStream) {
+ return NS_OK;
+ }
+
+ MOZ_ASSERT(mState == eRunning);
+
+ // The callback has been canceled in the meantime.
+ if (!mInputStreamCallback) {
+ return NS_OK;
+ }
+
+ callback.swap(mInputStreamCallback);
+ callbackEventTarget.swap(mInputStreamCallbackEventTarget);
+ }
+
+ // This must be the last operation because the execution of the callback can
+ // be synchronous.
+ MOZ_ASSERT(callback);
+ InputStreamCallbackRunnable::Execute(callback.forget(),
+ callbackEventTarget.forget(), this);
+ return NS_OK;
+}
+
+// nsIIPCSerializableInputStream
+
+void RemoteLazyInputStream::SerializedComplexity(uint32_t aMaxSize,
+ uint32_t* aSizeUsed,
+ uint32_t* aNewPipes,
+ uint32_t* aTransferables) {
+ *aTransferables = 1;
+}
+
+void RemoteLazyInputStream::Serialize(mozilla::ipc::InputStreamParams& aParams,
+ uint32_t aMaxSize, uint32_t* aSizeUsed) {
+ *aSizeUsed = 0;
+ aParams = mozilla::ipc::RemoteLazyInputStreamParams(this);
+}
+
+bool RemoteLazyInputStream::Deserialize(
+ const mozilla::ipc::InputStreamParams& aParams) {
+ MOZ_CRASH("This should never be called.");
+ return false;
+}
+
+// nsIAsyncFileMetadata
+
+NS_IMETHODIMP
+RemoteLazyInputStream::AsyncFileMetadataWait(nsIFileMetadataCallback* aCallback,
+ nsIEventTarget* aEventTarget) {
+ MOZ_ASSERT(!!aCallback == !!aEventTarget);
+
+ // If we have the callback, we must have the event target.
+ if (NS_WARN_IF(!!aCallback != !!aEventTarget)) {
+ return NS_ERROR_FAILURE;
+ }
+
+ // See RemoteLazyInputStream.h for more information about this state
+ // machine.
+
+ {
+ MutexAutoLock lock(mMutex);
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug,
+ ("AsyncFileMetadataWait(%p, %p) %s", aCallback, aEventTarget,
+ Describe().get()));
+
+ switch (mState) {
+ // First call, we need to retrieve the stream from the parent actor.
+ case eInit:
+ MOZ_ASSERT(mActor);
+
+ mFileMetadataCallback = aCallback;
+ mFileMetadataCallbackEventTarget = aEventTarget;
+ mState = ePending;
+
+ StreamNeeded();
+ return NS_OK;
+
+ // We are still waiting for the remote inputStream
+ case ePending:
+ if (mFileMetadataCallback && aCallback) {
+ return NS_ERROR_FAILURE;
+ }
+
+ mFileMetadataCallback = aCallback;
+ mFileMetadataCallbackEventTarget = aEventTarget;
+ return NS_OK;
+
+ // We have the remote inputStream, let's check if we can execute the
+ // callback.
+ case eRunning:
+ break;
+
+ // Stream is closed.
+ default:
+ MOZ_ASSERT(mState == eClosed);
+ return NS_BASE_STREAM_CLOSED;
+ }
+
+ MOZ_ASSERT(mState == eRunning);
+ }
+
+ FileMetadataCallbackRunnable::Execute(aCallback, aEventTarget, this);
+ return NS_OK;
+}
+
+// nsIFileMetadata
+
+NS_IMETHODIMP
+RemoteLazyInputStream::GetSize(int64_t* aRetval) {
+ nsCOMPtr<nsIFileMetadata> fileMetadata;
+ {
+ MutexAutoLock lock(mMutex);
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("GetSize %s", Describe().get()));
+
+ fileMetadata = do_QueryInterface(mInnerStream);
+ if (!fileMetadata) {
+ return mState == eClosed ? NS_BASE_STREAM_CLOSED : NS_ERROR_FAILURE;
+ }
+ }
+
+ return fileMetadata->GetSize(aRetval);
+}
+
+NS_IMETHODIMP
+RemoteLazyInputStream::GetLastModified(int64_t* aRetval) {
+ nsCOMPtr<nsIFileMetadata> fileMetadata;
+ {
+ MutexAutoLock lock(mMutex);
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("GetLastModified %s", Describe().get()));
+
+ fileMetadata = do_QueryInterface(mInnerStream);
+ if (!fileMetadata) {
+ return mState == eClosed ? NS_BASE_STREAM_CLOSED : NS_ERROR_FAILURE;
+ }
+ }
+
+ return fileMetadata->GetLastModified(aRetval);
+}
+
+NS_IMETHODIMP
+RemoteLazyInputStream::GetFileDescriptor(PRFileDesc** aRetval) {
+ nsCOMPtr<nsIFileMetadata> fileMetadata;
+ {
+ MutexAutoLock lock(mMutex);
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("GetFileDescriptor %s", Describe().get()));
+
+ fileMetadata = do_QueryInterface(mInnerStream);
+ if (!fileMetadata) {
+ return mState == eClosed ? NS_BASE_STREAM_CLOSED : NS_ERROR_FAILURE;
+ }
+ }
+
+ return fileMetadata->GetFileDescriptor(aRetval);
+}
+
+nsresult RemoteLazyInputStream::EnsureAsyncRemoteStream() {
+ // We already have an async remote stream.
+ if (mAsyncInnerStream) {
+ return NS_OK;
+ }
+
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug,
+ ("EnsureAsyncRemoteStream %s", Describe().get()));
+
+ if (NS_WARN_IF(!mInnerStream)) {
+ return NS_ERROR_FAILURE;
+ }
+
+ nsCOMPtr<nsIInputStream> stream = mInnerStream;
+
+ // Check if the stream is blocking, if it is, we want to make it non-blocking
+ // using a pipe.
+ bool nonBlocking = false;
+ nsresult rv = stream->IsNonBlocking(&nonBlocking);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ // We don't return NS_ERROR_NOT_IMPLEMENTED from ReadSegments,
+ // so it's possible that callers are expecting us to succeed in the future.
+ // We need to make sure the stream we return here supports ReadSegments,
+ // so wrap if in a buffered stream if necessary.
+ //
+ // We only need to do this if we won't be wrapping the stream in a pipe, which
+ // will add buffering anyway.
+ if (nonBlocking && !NS_InputStreamIsBuffered(stream)) {
+ nsCOMPtr<nsIInputStream> bufferedStream;
+ nsresult rv = NS_NewBufferedInputStream(getter_AddRefs(bufferedStream),
+ stream.forget(), 4096);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ stream = bufferedStream;
+ }
+
+ nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(stream);
+
+ // If non-blocking and non-async, let's use NonBlockingAsyncInputStream.
+ if (nonBlocking && !asyncStream) {
+ rv = NonBlockingAsyncInputStream::Create(stream.forget(),
+ getter_AddRefs(asyncStream));
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ MOZ_ASSERT(asyncStream);
+ }
+
+ if (!asyncStream) {
+ // Let's make the stream async using the DOMFile thread.
+ nsCOMPtr<nsIAsyncInputStream> pipeIn;
+ nsCOMPtr<nsIAsyncOutputStream> pipeOut;
+ NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(pipeOut), true, true);
+
+ RefPtr<RemoteLazyInputStreamThread> thread =
+ RemoteLazyInputStreamThread::GetOrCreate();
+ if (NS_WARN_IF(!thread)) {
+ return NS_ERROR_ILLEGAL_DURING_SHUTDOWN;
+ }
+
+ rv = NS_AsyncCopy(stream, pipeOut, thread, NS_ASYNCCOPY_VIA_WRITESEGMENTS);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ asyncStream = pipeIn;
+ }
+
+ MOZ_ASSERT(asyncStream);
+ mAsyncInnerStream = asyncStream;
+ mInnerStream = nullptr;
+
+ return NS_OK;
+}
+
+// nsIInputStreamLength
+
+NS_IMETHODIMP
+RemoteLazyInputStream::Length(int64_t* aLength) {
+ MutexAutoLock lock(mMutex);
+
+ if (mState == eClosed) {
+ return NS_BASE_STREAM_CLOSED;
+ }
+
+ if (!mActor) {
+ return NS_ERROR_NOT_AVAILABLE;
+ }
+
+ return NS_BASE_STREAM_WOULD_BLOCK;
+}
+
+namespace {
+
+class InputStreamLengthCallbackRunnable final : public DiscardableRunnable {
+ public:
+ static void Execute(nsIInputStreamLengthCallback* aCallback,
+ nsIEventTarget* aEventTarget,
+ RemoteLazyInputStream* aStream, int64_t aLength) {
+ MOZ_ASSERT(aCallback);
+ MOZ_ASSERT(aEventTarget);
+
+ RefPtr<InputStreamLengthCallbackRunnable> runnable =
+ new InputStreamLengthCallbackRunnable(aCallback, aStream, aLength);
+
+ nsCOMPtr<nsIEventTarget> target = aEventTarget;
+ target->Dispatch(runnable, NS_DISPATCH_NORMAL);
+ }
+
+ NS_IMETHOD
+ Run() override {
+ mCallback->OnInputStreamLengthReady(mStream, mLength);
+ mCallback = nullptr;
+ mStream = nullptr;
+ return NS_OK;
+ }
+
+ private:
+ InputStreamLengthCallbackRunnable(nsIInputStreamLengthCallback* aCallback,
+ RemoteLazyInputStream* aStream,
+ int64_t aLength)
+ : DiscardableRunnable("dom::InputStreamLengthCallbackRunnable"),
+ mCallback(aCallback),
+ mStream(aStream),
+ mLength(aLength) {
+ MOZ_ASSERT(mCallback);
+ MOZ_ASSERT(mStream);
+ }
+
+ nsCOMPtr<nsIInputStreamLengthCallback> mCallback;
+ RefPtr<RemoteLazyInputStream> mStream;
+ const int64_t mLength;
+};
+
+} // namespace
+
+// nsIAsyncInputStreamLength
+
+NS_IMETHODIMP
+RemoteLazyInputStream::AsyncLengthWait(nsIInputStreamLengthCallback* aCallback,
+ nsIEventTarget* aEventTarget) {
+ // If we have the callback, we must have the event target.
+ if (NS_WARN_IF(!!aCallback != !!aEventTarget)) {
+ return NS_ERROR_FAILURE;
+ }
+
+ {
+ MutexAutoLock lock(mMutex);
+
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("AsyncLengthWait(%p, %p) %s", aCallback, aEventTarget,
+ Describe().get()));
+
+ if (mActor) {
+ if (aCallback) {
+ auto* thread = RemoteLazyInputStreamThread::GetOrCreate();
+ if (NS_WARN_IF(!thread)) {
+ return NS_ERROR_ILLEGAL_DURING_SHUTDOWN;
+ }
+ thread->Dispatch(NS_NewRunnableFunction(
+ "RemoteLazyInputStream::AsyncLengthWait",
+ [self = RefPtr{this}, actor = mActor,
+ callback = nsCOMPtr{aCallback},
+ eventTarget = nsCOMPtr{aEventTarget}] {
+ actor->SendLengthNeeded(
+ [self, callback, eventTarget](int64_t aLength) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("AsyncLengthWait resolve %" PRId64, aLength));
+ int64_t length = -1;
+ if (aLength > 0) {
+ uint64_t sourceLength =
+ aLength - std::min<uint64_t>(aLength, self->mStart);
+ length = int64_t(
+ std::min<uint64_t>(sourceLength, self->mLength));
+ }
+ InputStreamLengthCallbackRunnable::Execute(
+ callback, eventTarget, self, length);
+ },
+ [self, callback,
+ eventTarget](mozilla::ipc::ResponseRejectReason) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Warning,
+ ("AsyncLengthWait reject"));
+ InputStreamLengthCallbackRunnable::Execute(
+ callback, eventTarget, self, -1);
+ });
+ }));
+ }
+
+ return NS_OK;
+ }
+ }
+
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("AsyncLengthWait immediate"));
+
+ // If execution has reached here, it means the stream is either closed or
+ // consumed, and therefore the callback can be executed immediately
+ InputStreamLengthCallbackRunnable::Execute(aCallback, aEventTarget, this, -1);
+ return NS_OK;
+}
+
+void RemoteLazyInputStream::IPCWrite(IPC::MessageWriter* aWriter) {
+ // If we have an actor still, serialize efficiently by cloning our actor to
+ // maintain a reference to the parent side.
+ RefPtr<RemoteLazyInputStreamChild> actor;
+
+ nsCOMPtr<nsIInputStream> innerStream;
+
+ RefPtr<nsIInputStreamCallback> inputStreamCallback;
+ nsCOMPtr<nsIEventTarget> inputStreamCallbackEventTarget;
+
+ {
+ MutexAutoLock lock(mMutex);
+
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("Serialize %s", Describe().get()));
+
+ actor = mActor.forget();
+
+ if (mAsyncInnerStream) {
+ MOZ_ASSERT(!mInnerStream);
+ innerStream = mAsyncInnerStream.forget();
+ } else {
+ innerStream = mInnerStream.forget();
+ }
+
+ // TODO(Bug 1737783): Notify to the mFileMetadataCallback that this
+ // lazy input stream has been closed.
+ mFileMetadataCallback = nullptr;
+ mFileMetadataCallbackEventTarget = nullptr;
+
+ inputStreamCallback = mInputStreamCallback.forget();
+ inputStreamCallbackEventTarget = mInputStreamCallbackEventTarget.forget();
+
+ mState = eClosed;
+ }
+
+ if (inputStreamCallback) {
+ InputStreamCallbackRunnable::Execute(
+ inputStreamCallback.forget(), inputStreamCallbackEventTarget.forget(),
+ this);
+ }
+
+ bool closed = !actor && !innerStream;
+ IPC::WriteParam(aWriter, closed);
+ if (closed) {
+ return;
+ }
+
+ // If we still have a connection to our remote actor, create a clone endpoint
+ // for it and tell it that the stream has been consumed. The clone of the
+ // connection can be transferred to another process.
+ if (actor) {
+ MOZ_LOG(
+ gRemoteLazyStreamLog, LogLevel::Debug,
+ ("Serializing as actor: %s", nsIDToCString(actor->StreamID()).get()));
+ // Create a clone of the actor, and then tell it that this stream is no
+ // longer referencing it.
+ mozilla::ipc::Endpoint<PRemoteLazyInputStreamParent> parentEp;
+ mozilla::ipc::Endpoint<PRemoteLazyInputStreamChild> childEp;
+ MOZ_ALWAYS_SUCCEEDS(
+ PRemoteLazyInputStream::CreateEndpoints(&parentEp, &childEp));
+
+ auto* thread = RemoteLazyInputStreamThread::GetOrCreate();
+ if (thread) {
+ thread->Dispatch(NS_NewRunnableFunction(
+ "RemoteLazyInputStreamChild::SendClone",
+ [actor, parentEp = std::move(parentEp)]() mutable {
+ bool ok = actor->SendClone(std::move(parentEp));
+ MOZ_LOG(
+ gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("SendClone for %s: %s", nsIDToCString(actor->StreamID()).get(),
+ ok ? "OK" : "ERR"));
+ }));
+
+ } // else we are shutting down xpcom threads.
+
+ // NOTE: Call `StreamConsumed` after dispatching the `SendClone` runnable,
+ // as this method may dispatch a runnable to `RemoteLazyInputStreamThread`
+ // to call `SendGoodbye`, which needs to happen after `SendClone`.
+ actor->StreamConsumed();
+
+ IPC::WriteParam(aWriter, actor->StreamID());
+ IPC::WriteParam(aWriter, mStart);
+ IPC::WriteParam(aWriter, mLength);
+ IPC::WriteParam(aWriter, std::move(childEp));
+
+ if (innerStream) {
+ innerStream->Close();
+ }
+ return;
+ }
+
+ // If we have a stream and are in the parent process, create a new actor pair
+ // and transfer ownership of the stream into storage.
+ auto streamStorage = RemoteLazyInputStreamStorage::Get();
+ if (streamStorage.isOk()) {
+ MOZ_ASSERT(XRE_IsParentProcess());
+ nsID id = nsID::GenerateUUID();
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Debug,
+ ("Serializing as new stream: %s", nsIDToCString(id).get()));
+
+ streamStorage.inspect()->AddStream(innerStream, id);
+
+ mozilla::ipc::Endpoint<PRemoteLazyInputStreamParent> parentEp;
+ mozilla::ipc::Endpoint<PRemoteLazyInputStreamChild> childEp;
+ MOZ_ALWAYS_SUCCEEDS(
+ PRemoteLazyInputStream::CreateEndpoints(&parentEp, &childEp));
+
+ // Bind the actor on our background thread.
+ streamStorage.inspect()->TaskQueue()->Dispatch(NS_NewRunnableFunction(
+ "RemoteLazyInputStreamParent::Bind",
+ [parentEp = std::move(parentEp), id]() mutable {
+ auto stream = MakeRefPtr<RemoteLazyInputStreamParent>(id);
+ parentEp.Bind(stream);
+ }));
+
+ IPC::WriteParam(aWriter, id);
+ IPC::WriteParam(aWriter, 0);
+ IPC::WriteParam(aWriter, UINT64_MAX);
+ IPC::WriteParam(aWriter, std::move(childEp));
+ return;
+ }
+
+ MOZ_CRASH("Cannot serialize new RemoteLazyInputStream from this process");
+}
+
+already_AddRefed<RemoteLazyInputStream> RemoteLazyInputStream::IPCRead(
+ IPC::MessageReader* aReader) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose, ("Deserialize"));
+
+ bool closed;
+ if (NS_WARN_IF(!IPC::ReadParam(aReader, &closed))) {
+ return nullptr;
+ }
+ if (closed) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Verbose,
+ ("Deserialize closed stream"));
+ return do_AddRef(new RemoteLazyInputStream());
+ }
+
+ nsID id{};
+ uint64_t start;
+ uint64_t length;
+ mozilla::ipc::Endpoint<PRemoteLazyInputStreamChild> endpoint;
+ if (NS_WARN_IF(!IPC::ReadParam(aReader, &id)) ||
+ NS_WARN_IF(!IPC::ReadParam(aReader, &start)) ||
+ NS_WARN_IF(!IPC::ReadParam(aReader, &length)) ||
+ NS_WARN_IF(!IPC::ReadParam(aReader, &endpoint))) {
+ return nullptr;
+ }
+
+ if (!endpoint.IsValid()) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Warning,
+ ("Deserialize failed due to invalid endpoint!"));
+ return do_AddRef(new RemoteLazyInputStream());
+ }
+
+ RefPtr<RemoteLazyInputStreamChild> actor =
+ BindChildActor(id, std::move(endpoint));
+
+ if (!actor) {
+ MOZ_LOG(gRemoteLazyStreamLog, LogLevel::Warning,
+ ("Deserialize failed as we are probably late in shutdown!"));
+ return do_AddRef(new RemoteLazyInputStream());
+ }
+
+ return do_AddRef(new RemoteLazyInputStream(actor, start, length));
+}
+
+} // namespace mozilla
+
+void IPC::ParamTraits<mozilla::RemoteLazyInputStream*>::Write(
+ IPC::MessageWriter* aWriter, mozilla::RemoteLazyInputStream* aParam) {
+ bool nonNull = !!aParam;
+ IPC::WriteParam(aWriter, nonNull);
+ if (aParam) {
+ aParam->IPCWrite(aWriter);
+ }
+}
+
+bool IPC::ParamTraits<mozilla::RemoteLazyInputStream*>::Read(
+ IPC::MessageReader* aReader,
+ RefPtr<mozilla::RemoteLazyInputStream>* aResult) {
+ bool nonNull = false;
+ if (!IPC::ReadParam(aReader, &nonNull)) {
+ return false;
+ }
+ if (!nonNull) {
+ *aResult = nullptr;
+ return true;
+ }
+ *aResult = mozilla::RemoteLazyInputStream::IPCRead(aReader);
+ return *aResult;
+}