summaryrefslogtreecommitdiffstats
path: root/xpcom/io/nsMultiplexInputStream.cpp
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--xpcom/io/nsMultiplexInputStream.cpp1551
1 files changed, 1551 insertions, 0 deletions
diff --git a/xpcom/io/nsMultiplexInputStream.cpp b/xpcom/io/nsMultiplexInputStream.cpp
new file mode 100644
index 0000000000..a9808aa35a
--- /dev/null
+++ b/xpcom/io/nsMultiplexInputStream.cpp
@@ -0,0 +1,1551 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * The multiplex stream concatenates a list of input streams into a single
+ * stream.
+ */
+
+#include "mozilla/Attributes.h"
+#include "mozilla/CheckedInt.h"
+#include "mozilla/MathAlgorithms.h"
+#include "mozilla/Mutex.h"
+
+#include "base/basictypes.h"
+
+#include "nsMultiplexInputStream.h"
+#include "nsIBufferedStreams.h"
+#include "nsICloneableInputStream.h"
+#include "nsIMultiplexInputStream.h"
+#include "nsISeekableStream.h"
+#include "nsCOMPtr.h"
+#include "nsCOMArray.h"
+#include "nsIClassInfoImpl.h"
+#include "nsIIPCSerializableInputStream.h"
+#include "mozilla/ipc/InputStreamUtils.h"
+#include "nsIAsyncInputStream.h"
+#include "nsIInputStreamLength.h"
+#include "nsNetUtil.h"
+#include "nsStreamUtils.h"
+
+using namespace mozilla;
+using namespace mozilla::ipc;
+
+using mozilla::DeprecatedAbs;
+
+class nsMultiplexInputStream final : public nsIMultiplexInputStream,
+ public nsISeekableStream,
+ public nsIIPCSerializableInputStream,
+ public nsICloneableInputStream,
+ public nsIAsyncInputStream,
+ public nsIInputStreamCallback,
+ public nsIInputStreamLength,
+ public nsIAsyncInputStreamLength {
+ public:
+ nsMultiplexInputStream();
+
+ NS_DECL_THREADSAFE_ISUPPORTS
+ NS_DECL_NSIINPUTSTREAM
+ NS_DECL_NSIMULTIPLEXINPUTSTREAM
+ NS_DECL_NSISEEKABLESTREAM
+ NS_DECL_NSITELLABLESTREAM
+ NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
+ NS_DECL_NSICLONEABLEINPUTSTREAM
+ NS_DECL_NSIASYNCINPUTSTREAM
+ NS_DECL_NSIINPUTSTREAMCALLBACK
+ NS_DECL_NSIINPUTSTREAMLENGTH
+ NS_DECL_NSIASYNCINPUTSTREAMLENGTH
+
+ // This is used for nsIAsyncInputStream::AsyncWait
+ void AsyncWaitCompleted();
+
+ // This is used for nsIAsyncInputStreamLength::AsyncLengthWait
+ void AsyncWaitCompleted(int64_t aLength, const MutexAutoLock& aProofOfLock)
+ MOZ_REQUIRES(mLock);
+
+ struct StreamData {
+ nsresult Initialize(nsIInputStream* aOriginalStream) {
+ mCurrentPos = 0;
+
+ mOriginalStream = aOriginalStream;
+
+ mBufferedStream = aOriginalStream;
+ if (!NS_InputStreamIsBuffered(mBufferedStream)) {
+ nsCOMPtr<nsIInputStream> bufferedStream;
+ nsresult rv = NS_NewBufferedInputStream(getter_AddRefs(bufferedStream),
+ mBufferedStream.forget(), 4096);
+ NS_ENSURE_SUCCESS(rv, rv);
+ mBufferedStream = bufferedStream;
+ }
+
+ mAsyncStream = do_QueryInterface(mBufferedStream);
+ mSeekableStream = do_QueryInterface(mBufferedStream);
+
+ return NS_OK;
+ }
+
+ nsCOMPtr<nsIInputStream> mOriginalStream;
+
+ // Equal to mOriginalStream or a wrap around the original stream to make it
+ // buffered.
+ nsCOMPtr<nsIInputStream> mBufferedStream;
+
+ // This can be null.
+ nsCOMPtr<nsIAsyncInputStream> mAsyncStream;
+ // This can be null.
+ nsCOMPtr<nsISeekableStream> mSeekableStream;
+
+ uint64_t mCurrentPos;
+ };
+
+ Mutex& GetLock() MOZ_RETURN_CAPABILITY(mLock) { return mLock; }
+
+ private:
+ ~nsMultiplexInputStream() = default;
+
+ void NextStream() MOZ_REQUIRES(mLock) {
+ ++mCurrentStream;
+ mStartedReadingCurrent = false;
+ }
+
+ nsresult AsyncWaitInternal();
+
+ // This method updates mSeekableStreams, mTellableStreams,
+ // mIPCSerializableStreams and mCloneableStreams values.
+ void UpdateQIMap(StreamData& aStream) MOZ_REQUIRES(mLock);
+
+ struct MOZ_STACK_CLASS ReadSegmentsState {
+ nsCOMPtr<nsIInputStream> mThisStream;
+ uint32_t mOffset;
+ nsWriteSegmentFun mWriter;
+ void* mClosure;
+ bool mDone;
+ };
+
+ void SerializedComplexityInternal(uint32_t aMaxSize, uint32_t* aSizeUsed,
+ uint32_t* aPipes, uint32_t* aTransferables,
+ bool* aSerializeAsPipe);
+
+ static nsresult ReadSegCb(nsIInputStream* aIn, void* aClosure,
+ const char* aFromRawSegment, uint32_t aToOffset,
+ uint32_t aCount, uint32_t* aWriteCount);
+
+ bool IsSeekable() const;
+ bool IsIPCSerializable() const;
+ bool IsCloneable() const;
+ bool IsAsyncInputStream() const;
+ bool IsInputStreamLength() const;
+ bool IsAsyncInputStreamLength() const;
+
+ Mutex mLock; // Protects access to all data members.
+
+ nsTArray<StreamData> mStreams MOZ_GUARDED_BY(mLock);
+
+ uint32_t mCurrentStream MOZ_GUARDED_BY(mLock);
+ bool mStartedReadingCurrent MOZ_GUARDED_BY(mLock);
+ nsresult mStatus MOZ_GUARDED_BY(mLock);
+ nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback MOZ_GUARDED_BY(mLock);
+ uint32_t mAsyncWaitFlags MOZ_GUARDED_BY(mLock);
+ uint32_t mAsyncWaitRequestedCount MOZ_GUARDED_BY(mLock);
+ nsCOMPtr<nsIEventTarget> mAsyncWaitEventTarget MOZ_GUARDED_BY(mLock);
+ nsCOMPtr<nsIInputStreamLengthCallback> mAsyncWaitLengthCallback
+ MOZ_GUARDED_BY(mLock);
+
+ class AsyncWaitLengthHelper;
+ RefPtr<AsyncWaitLengthHelper> mAsyncWaitLengthHelper MOZ_GUARDED_BY(mLock);
+
+ uint32_t mSeekableStreams MOZ_GUARDED_BY(mLock);
+ uint32_t mIPCSerializableStreams MOZ_GUARDED_BY(mLock);
+ uint32_t mCloneableStreams MOZ_GUARDED_BY(mLock);
+
+ // These are Atomics so that we can check them in QueryInterface without
+ // taking a lock (to look at mStreams.Length() and the numbers above)
+ // With no streams added yet, all of these are possible
+ Atomic<bool, Relaxed> mIsSeekableStream{true};
+ Atomic<bool, Relaxed> mIsIPCSerializableStream{true};
+ Atomic<bool, Relaxed> mIsCloneableStream{true};
+
+ Atomic<bool, Relaxed> mIsAsyncInputStream{false};
+ Atomic<bool, Relaxed> mIsInputStreamLength{false};
+ Atomic<bool, Relaxed> mIsAsyncInputStreamLength{false};
+};
+
+NS_IMPL_ADDREF(nsMultiplexInputStream)
+NS_IMPL_RELEASE(nsMultiplexInputStream)
+
+NS_IMPL_CLASSINFO(nsMultiplexInputStream, nullptr, nsIClassInfo::THREADSAFE,
+ NS_MULTIPLEXINPUTSTREAM_CID)
+
+NS_INTERFACE_MAP_BEGIN(nsMultiplexInputStream)
+ NS_INTERFACE_MAP_ENTRY(nsIMultiplexInputStream)
+ NS_INTERFACE_MAP_ENTRY(nsIInputStream)
+ NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream, IsSeekable())
+ NS_INTERFACE_MAP_ENTRY(nsITellableStream)
+ NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream,
+ IsIPCSerializable())
+ NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream, IsCloneable())
+ NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream, IsAsyncInputStream())
+ NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback,
+ IsAsyncInputStream())
+ NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLength,
+ IsInputStreamLength())
+ NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStreamLength,
+ IsAsyncInputStreamLength())
+ NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIMultiplexInputStream)
+ NS_IMPL_QUERY_CLASSINFO(nsMultiplexInputStream)
+NS_INTERFACE_MAP_END
+
+NS_IMPL_CI_INTERFACE_GETTER(nsMultiplexInputStream, nsIMultiplexInputStream,
+ nsIInputStream, nsISeekableStream,
+ nsITellableStream)
+
+static nsresult AvailableMaybeSeek(nsMultiplexInputStream::StreamData& aStream,
+ uint64_t* aResult) {
+ nsresult rv = aStream.mBufferedStream->Available(aResult);
+ if (rv == NS_BASE_STREAM_CLOSED) {
+ // Blindly seek to the current position if Available() returns
+ // NS_BASE_STREAM_CLOSED.
+ // If nsIFileInputStream is closed in Read() due to CLOSE_ON_EOF flag,
+ // Seek() could reopen the file if REOPEN_ON_REWIND flag is set.
+ if (aStream.mSeekableStream) {
+ nsresult rvSeek =
+ aStream.mSeekableStream->Seek(nsISeekableStream::NS_SEEK_CUR, 0);
+ if (NS_SUCCEEDED(rvSeek)) {
+ rv = aStream.mBufferedStream->Available(aResult);
+ }
+ }
+ }
+ return rv;
+}
+
+nsMultiplexInputStream::nsMultiplexInputStream()
+ : mLock("nsMultiplexInputStream lock"),
+ mCurrentStream(0),
+ mStartedReadingCurrent(false),
+ mStatus(NS_OK),
+ mAsyncWaitFlags(0),
+ mAsyncWaitRequestedCount(0),
+ mSeekableStreams(0),
+ mIPCSerializableStreams(0),
+ mCloneableStreams(0) {}
+
+NS_IMETHODIMP
+nsMultiplexInputStream::GetCount(uint32_t* aCount) {
+ MutexAutoLock lock(mLock);
+ *aCount = mStreams.Length();
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsMultiplexInputStream::AppendStream(nsIInputStream* aStream) {
+ MutexAutoLock lock(mLock);
+
+ StreamData* streamData = mStreams.AppendElement(fallible);
+ if (NS_WARN_IF(!streamData)) {
+ return NS_ERROR_OUT_OF_MEMORY;
+ }
+
+ nsresult rv = streamData->Initialize(aStream);
+ NS_ENSURE_SUCCESS(rv, rv);
+
+ UpdateQIMap(*streamData);
+
+ if (mStatus == NS_BASE_STREAM_CLOSED) {
+ // We were closed, but now we have more data to read.
+ mStatus = NS_OK;
+ }
+
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsMultiplexInputStream::GetStream(uint32_t aIndex, nsIInputStream** aResult) {
+ MutexAutoLock lock(mLock);
+
+ if (aIndex >= mStreams.Length()) {
+ return NS_ERROR_NOT_AVAILABLE;
+ }
+
+ StreamData& streamData = mStreams.ElementAt(aIndex);
+ nsCOMPtr<nsIInputStream> stream = streamData.mOriginalStream;
+ stream.forget(aResult);
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsMultiplexInputStream::Close() {
+ nsTArray<nsCOMPtr<nsIInputStream>> streams;
+
+ // Let's take a copy of the streams becuase, calling close() it could trigger
+ // a nsIInputStreamCallback immediately and we don't want to create a deadlock
+ // with mutex.
+ {
+ MutexAutoLock lock(mLock);
+ uint32_t len = mStreams.Length();
+ for (uint32_t i = 0; i < len; ++i) {
+ if (NS_WARN_IF(
+ !streams.AppendElement(mStreams[i].mBufferedStream, fallible))) {
+ mStatus = NS_BASE_STREAM_CLOSED;
+ return NS_ERROR_OUT_OF_MEMORY;
+ }
+ }
+ mStatus = NS_BASE_STREAM_CLOSED;
+ }
+
+ nsresult rv = NS_OK;
+
+ uint32_t len = streams.Length();
+ for (uint32_t i = 0; i < len; ++i) {
+ nsresult rv2 = streams[i]->Close();
+ // We still want to close all streams, but we should return an error
+ if (NS_FAILED(rv2)) {
+ rv = rv2;
+ }
+ }
+
+ return rv;
+}
+
+NS_IMETHODIMP
+nsMultiplexInputStream::Available(uint64_t* aResult) {
+ *aResult = 0;
+
+ MutexAutoLock lock(mLock);
+ if (NS_FAILED(mStatus)) {
+ return mStatus;
+ }
+
+ uint64_t avail = 0;
+ nsresult rv = NS_BASE_STREAM_CLOSED;
+
+ uint32_t len = mStreams.Length();
+ for (uint32_t i = mCurrentStream; i < len; i++) {
+ uint64_t streamAvail;
+ rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
+ if (rv == NS_BASE_STREAM_CLOSED) {
+ // If a stream is closed, we continue with the next one.
+ // If this is the current stream we move to the following stream.
+ if (mCurrentStream == i) {
+ NextStream();
+ }
+
+ // If this is the last stream, we want to return this error code.
+ continue;
+ }
+
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ mStatus = rv;
+ return mStatus;
+ }
+
+ // If the current stream is async, we have to return what we have so far
+ // without processing the following streams. This is needed because
+ // ::Available should return only what is currently available. In case of an
+ // nsIAsyncInputStream, we have to call AsyncWait() in order to read more.
+ if (mStreams[i].mAsyncStream) {
+ avail += streamAvail;
+ break;
+ }
+
+ if (streamAvail == 0) {
+ // Nothing to read for this stream. Let's move to the next one.
+ continue;
+ }
+
+ avail += streamAvail;
+ }
+
+ // We still have something to read. We don't want to return an error code yet.
+ if (avail) {
+ *aResult = avail;
+ return NS_OK;
+ }
+
+ // Let's propagate the last error message.
+ mStatus = rv;
+ return rv;
+}
+
+NS_IMETHODIMP
+nsMultiplexInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult) {
+ MutexAutoLock lock(mLock);
+ // It is tempting to implement this method in terms of ReadSegments, but
+ // that would prevent this class from being used with streams that only
+ // implement Read (e.g., file streams).
+
+ *aResult = 0;
+
+ if (mStatus == NS_BASE_STREAM_CLOSED) {
+ return NS_OK;
+ }
+ if (NS_FAILED(mStatus)) {
+ return mStatus;
+ }
+
+ nsresult rv = NS_OK;
+
+ uint32_t len = mStreams.Length();
+ while (mCurrentStream < len && aCount) {
+ uint32_t read;
+ rv = mStreams[mCurrentStream].mBufferedStream->Read(aBuf, aCount, &read);
+
+ // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
+ // (This is a bug in those stream implementations)
+ if (rv == NS_BASE_STREAM_CLOSED) {
+ MOZ_ASSERT_UNREACHABLE(
+ "Input stream's Read method returned "
+ "NS_BASE_STREAM_CLOSED");
+ rv = NS_OK;
+ read = 0;
+ } else if (NS_FAILED(rv)) {
+ break;
+ }
+
+ if (read == 0) {
+ NextStream();
+ } else {
+ NS_ASSERTION(aCount >= read, "Read more than requested");
+ *aResult += read;
+ aCount -= read;
+ aBuf += read;
+ mStartedReadingCurrent = true;
+
+ mStreams[mCurrentStream].mCurrentPos += read;
+ }
+ }
+ return *aResult ? NS_OK : rv;
+}
+
+NS_IMETHODIMP
+nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
+ uint32_t aCount, uint32_t* aResult) {
+ MutexAutoLock lock(mLock);
+
+ if (mStatus == NS_BASE_STREAM_CLOSED) {
+ *aResult = 0;
+ return NS_OK;
+ }
+ if (NS_FAILED(mStatus)) {
+ return mStatus;
+ }
+
+ NS_ASSERTION(aWriter, "missing aWriter");
+
+ nsresult rv = NS_OK;
+ ReadSegmentsState state;
+ state.mThisStream = this;
+ state.mOffset = 0;
+ state.mWriter = aWriter;
+ state.mClosure = aClosure;
+ state.mDone = false;
+
+ uint32_t len = mStreams.Length();
+ while (mCurrentStream < len && aCount) {
+ uint32_t read;
+ rv = mStreams[mCurrentStream].mBufferedStream->ReadSegments(
+ ReadSegCb, &state, aCount, &read);
+
+ // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
+ // (This is a bug in those stream implementations)
+ if (rv == NS_BASE_STREAM_CLOSED) {
+ MOZ_ASSERT_UNREACHABLE(
+ "Input stream's Read method returned "
+ "NS_BASE_STREAM_CLOSED");
+ rv = NS_OK;
+ read = 0;
+ }
+
+ // if |aWriter| decided to stop reading segments...
+ if (state.mDone || NS_FAILED(rv)) {
+ break;
+ }
+
+ // if stream is empty, then advance to the next stream.
+ if (read == 0) {
+ NextStream();
+ } else {
+ NS_ASSERTION(aCount >= read, "Read more than requested");
+ state.mOffset += read;
+ aCount -= read;
+ mStartedReadingCurrent = true;
+
+ mStreams[mCurrentStream].mCurrentPos += read;
+ }
+ }
+
+ // if we successfully read some data, then this call succeeded.
+ *aResult = state.mOffset;
+ return state.mOffset ? NS_OK : rv;
+}
+
+nsresult nsMultiplexInputStream::ReadSegCb(nsIInputStream* aIn, void* aClosure,
+ const char* aFromRawSegment,
+ uint32_t aToOffset, uint32_t aCount,
+ uint32_t* aWriteCount) {
+ nsresult rv;
+ ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
+ rv = (state->mWriter)(state->mThisStream, state->mClosure, aFromRawSegment,
+ aToOffset + state->mOffset, aCount, aWriteCount);
+ if (NS_FAILED(rv)) {
+ state->mDone = true;
+ }
+ return rv;
+}
+
+NS_IMETHODIMP
+nsMultiplexInputStream::IsNonBlocking(bool* aNonBlocking) {
+ MutexAutoLock lock(mLock);
+
+ uint32_t len = mStreams.Length();
+ if (len == 0) {
+ // Claim to be non-blocking, since we won't block the caller.
+ *aNonBlocking = true;
+ return NS_OK;
+ }
+
+ for (uint32_t i = 0; i < len; ++i) {
+ nsresult rv = mStreams[i].mBufferedStream->IsNonBlocking(aNonBlocking);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+ // If one is blocking the entire stream becomes blocking.
+ if (!*aNonBlocking) {
+ return NS_OK;
+ }
+ }
+
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsMultiplexInputStream::Seek(int32_t aWhence, int64_t aOffset) {
+ MutexAutoLock lock(mLock);
+
+ if (NS_FAILED(mStatus)) {
+ return mStatus;
+ }
+
+ nsresult rv;
+
+ uint32_t oldCurrentStream = mCurrentStream;
+ bool oldStartedReadingCurrent = mStartedReadingCurrent;
+
+ if (aWhence == NS_SEEK_SET) {
+ int64_t remaining = aOffset;
+ if (aOffset == 0) {
+ mCurrentStream = 0;
+ }
+ for (uint32_t i = 0; i < mStreams.Length(); ++i) {
+ nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;
+ if (!stream) {
+ return NS_ERROR_FAILURE;
+ }
+
+ // See if all remaining streams should be rewound
+ if (remaining == 0) {
+ if (i < oldCurrentStream ||
+ (i == oldCurrentStream && oldStartedReadingCurrent)) {
+ rv = stream->Seek(NS_SEEK_SET, 0);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ mStreams[i].mCurrentPos = 0;
+ continue;
+ } else {
+ break;
+ }
+ }
+
+ // Get position in the current stream
+ int64_t streamPos;
+ if (i > oldCurrentStream ||
+ (i == oldCurrentStream && !oldStartedReadingCurrent)) {
+ streamPos = 0;
+ } else {
+ streamPos = mStreams[i].mCurrentPos;
+ }
+
+ // See if we need to seek the current stream forward or backward
+ if (remaining < streamPos) {
+ rv = stream->Seek(NS_SEEK_SET, remaining);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ mStreams[i].mCurrentPos = remaining;
+ mCurrentStream = i;
+ mStartedReadingCurrent = remaining != 0;
+
+ remaining = 0;
+ } else if (remaining > streamPos) {
+ if (i < oldCurrentStream) {
+ // We're already at end so no need to seek this stream
+ remaining -= streamPos;
+ NS_ASSERTION(remaining >= 0, "Remaining invalid");
+ } else {
+ uint64_t avail;
+ rv = AvailableMaybeSeek(mStreams[i], &avail);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ int64_t newPos = XPCOM_MIN(remaining, streamPos + (int64_t)avail);
+
+ rv = stream->Seek(NS_SEEK_SET, newPos);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ mStreams[i].mCurrentPos = newPos;
+ mCurrentStream = i;
+ mStartedReadingCurrent = true;
+
+ remaining -= newPos;
+ NS_ASSERTION(remaining >= 0, "Remaining invalid");
+ }
+ } else {
+ NS_ASSERTION(remaining == streamPos, "Huh?");
+ MOZ_ASSERT(remaining != 0, "Zero remaining should be handled earlier");
+ remaining = 0;
+ mCurrentStream = i;
+ mStartedReadingCurrent = true;
+ }
+ }
+
+ return NS_OK;
+ }
+
+ if (aWhence == NS_SEEK_CUR && aOffset > 0) {
+ int64_t remaining = aOffset;
+ for (uint32_t i = mCurrentStream; remaining && i < mStreams.Length(); ++i) {
+ uint64_t avail;
+ rv = AvailableMaybeSeek(mStreams[i], &avail);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ int64_t seek = XPCOM_MIN((int64_t)avail, remaining);
+
+ rv = mStreams[i].mSeekableStream->Seek(NS_SEEK_CUR, seek);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ mStreams[i].mCurrentPos += seek;
+ mCurrentStream = i;
+ mStartedReadingCurrent = true;
+
+ remaining -= seek;
+ }
+
+ return NS_OK;
+ }
+
+ if (aWhence == NS_SEEK_CUR && aOffset < 0) {
+ int64_t remaining = -aOffset;
+ for (uint32_t i = mCurrentStream; remaining && i != (uint32_t)-1; --i) {
+ int64_t pos = mStreams[i].mCurrentPos;
+
+ int64_t seek = XPCOM_MIN(pos, remaining);
+
+ rv = mStreams[i].mSeekableStream->Seek(NS_SEEK_CUR, -seek);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ mStreams[i].mCurrentPos -= seek;
+ mCurrentStream = i;
+ mStartedReadingCurrent = seek != -pos;
+
+ remaining -= seek;
+ }
+
+ return NS_OK;
+ }
+
+ if (aWhence == NS_SEEK_CUR) {
+ NS_ASSERTION(aOffset == 0, "Should have handled all non-zero values");
+
+ return NS_OK;
+ }
+
+ if (aWhence == NS_SEEK_END) {
+ if (aOffset > 0) {
+ return NS_ERROR_INVALID_ARG;
+ }
+
+ int64_t remaining = aOffset;
+ int32_t i;
+ for (i = mStreams.Length() - 1; i >= 0; --i) {
+ nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;
+
+ uint64_t avail;
+ rv = AvailableMaybeSeek(mStreams[i], &avail);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ int64_t streamLength = avail + mStreams[i].mCurrentPos;
+
+ // The seek(END) can be completed in the current stream.
+ if (streamLength >= DeprecatedAbs(remaining)) {
+ rv = stream->Seek(NS_SEEK_END, remaining);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ mStreams[i].mCurrentPos = streamLength + remaining;
+ mCurrentStream = i;
+ mStartedReadingCurrent = true;
+ break;
+ }
+
+ // We are at the beginning of this stream.
+ rv = stream->Seek(NS_SEEK_SET, 0);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ remaining += streamLength;
+ mStreams[i].mCurrentPos = 0;
+ }
+
+ // Any other stream must be set to the end.
+ for (--i; i >= 0; --i) {
+ nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;
+
+ uint64_t avail;
+ rv = AvailableMaybeSeek(mStreams[i], &avail);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ int64_t streamLength = avail + mStreams[i].mCurrentPos;
+
+ rv = stream->Seek(NS_SEEK_END, 0);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ mStreams[i].mCurrentPos = streamLength;
+ }
+
+ return NS_OK;
+ }
+
+ // other Seeks not implemented yet
+ return NS_ERROR_NOT_IMPLEMENTED;
+}
+
+NS_IMETHODIMP
+nsMultiplexInputStream::Tell(int64_t* aResult) {
+ MutexAutoLock lock(mLock);
+
+ if (NS_FAILED(mStatus)) {
+ return mStatus;
+ }
+
+ int64_t ret64 = 0;
+#ifdef DEBUG
+ bool zeroFound = false;
+#endif
+
+ for (uint32_t i = 0; i < mStreams.Length(); ++i) {
+ ret64 += mStreams[i].mCurrentPos;
+
+#ifdef DEBUG
+ // When we see 1 stream with currentPos = 0, all the remaining streams must
+ // be set to 0 as well.
+ MOZ_ASSERT_IF(zeroFound, mStreams[i].mCurrentPos == 0);
+ if (mStreams[i].mCurrentPos == 0) {
+ zeroFound = true;
+ }
+#endif
+ }
+ *aResult = ret64;
+
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsMultiplexInputStream::SetEOF() { return NS_ERROR_NOT_IMPLEMENTED; }
+
+NS_IMETHODIMP
+nsMultiplexInputStream::CloseWithStatus(nsresult aStatus) { return Close(); }
+
+// This class is used to inform nsMultiplexInputStream that it's time to execute
+// the asyncWait callback.
+class AsyncWaitRunnable final : public DiscardableRunnable {
+ RefPtr<nsMultiplexInputStream> mStream;
+
+ public:
+ static void Create(nsMultiplexInputStream* aStream,
+ nsIEventTarget* aEventTarget) {
+ RefPtr<AsyncWaitRunnable> runnable = new AsyncWaitRunnable(aStream);
+ if (aEventTarget) {
+ aEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
+ } else {
+ runnable->Run();
+ }
+ }
+
+ NS_IMETHOD
+ Run() override {
+ mStream->AsyncWaitCompleted();
+ return NS_OK;
+ }
+
+ private:
+ explicit AsyncWaitRunnable(nsMultiplexInputStream* aStream)
+ : DiscardableRunnable("AsyncWaitRunnable"), mStream(aStream) {
+ MOZ_ASSERT(aStream);
+ }
+};
+
+NS_IMETHODIMP
+nsMultiplexInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
+ uint32_t aFlags, uint32_t aRequestedCount,
+ nsIEventTarget* aEventTarget) {
+ {
+ MutexAutoLock lock(mLock);
+
+ // We must execute the callback also when the stream is closed.
+ if (NS_FAILED(mStatus) && mStatus != NS_BASE_STREAM_CLOSED) {
+ return mStatus;
+ }
+
+ if (NS_WARN_IF(mAsyncWaitCallback && aCallback &&
+ mAsyncWaitCallback != aCallback)) {
+ return NS_ERROR_FAILURE;
+ }
+
+ mAsyncWaitCallback = aCallback;
+ mAsyncWaitFlags = aFlags;
+ mAsyncWaitRequestedCount = aRequestedCount;
+ mAsyncWaitEventTarget = aEventTarget;
+ }
+
+ return AsyncWaitInternal();
+}
+
+nsresult nsMultiplexInputStream::AsyncWaitInternal() {
+ nsCOMPtr<nsIAsyncInputStream> stream;
+ nsIInputStreamCallback* asyncWaitCallback = nullptr;
+ uint32_t asyncWaitFlags = 0;
+ uint32_t asyncWaitRequestedCount = 0;
+ nsCOMPtr<nsIEventTarget> asyncWaitEventTarget;
+
+ {
+ MutexAutoLock lock(mLock);
+
+ // Let's take the first async stream if we are not already closed, and if
+ // it has data to read or if it async.
+ if (mStatus != NS_BASE_STREAM_CLOSED) {
+ for (; mCurrentStream < mStreams.Length(); NextStream()) {
+ stream = mStreams[mCurrentStream].mAsyncStream;
+ if (stream) {
+ break;
+ }
+
+ uint64_t avail = 0;
+ nsresult rv = AvailableMaybeSeek(mStreams[mCurrentStream], &avail);
+ if (rv == NS_BASE_STREAM_CLOSED || (NS_SUCCEEDED(rv) && avail == 0)) {
+ // Nothing to read here. Let's move on.
+ continue;
+ }
+
+ if (NS_FAILED(rv)) {
+ return rv;
+ }
+
+ break;
+ }
+ }
+
+ asyncWaitCallback = mAsyncWaitCallback ? this : nullptr;
+ asyncWaitFlags = mAsyncWaitFlags;
+ asyncWaitRequestedCount = mAsyncWaitRequestedCount;
+ asyncWaitEventTarget = mAsyncWaitEventTarget;
+
+ MOZ_ASSERT_IF(stream, NS_SUCCEEDED(mStatus));
+ }
+
+ // If we are here it's because we are already closed, or if the current stream
+ // is not async. In both case we have to execute the callback.
+ if (!stream) {
+ if (asyncWaitCallback) {
+ AsyncWaitRunnable::Create(this, asyncWaitEventTarget);
+ }
+ return NS_OK;
+ }
+
+ return stream->AsyncWait(asyncWaitCallback, asyncWaitFlags,
+ asyncWaitRequestedCount, asyncWaitEventTarget);
+}
+
+NS_IMETHODIMP
+nsMultiplexInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream) {
+ nsCOMPtr<nsIInputStreamCallback> callback;
+
+ // When OnInputStreamReady is called, we could be in 2 scenarios:
+ // a. there is something to read;
+ // b. the stream is closed.
+ // But if the stream is closed and it was not the last one, we must proceed
+ // with the following stream in order to have something to read by the callee.
+
+ {
+ MutexAutoLock lock(mLock);
+
+ // The callback has been nullified in the meantime.
+ if (!mAsyncWaitCallback) {
+ return NS_OK;
+ }
+
+ if (NS_SUCCEEDED(mStatus)) {
+ uint64_t avail = 0;
+ nsresult rv = NS_OK;
+ // Only check `Available()` if `aStream` is actually the current stream,
+ // otherwise we'll always want to re-poll, as we got the callback for the
+ // wrong stream.
+ if (mCurrentStream < mStreams.Length() &&
+ aStream == mStreams[mCurrentStream].mAsyncStream) {
+ rv = aStream->Available(&avail);
+ }
+ if (rv == NS_BASE_STREAM_CLOSED || (NS_SUCCEEDED(rv) && avail == 0)) {
+ // This stream is either closed, has no data available, or is not the
+ // current stream. If it is closed and current, move to the next stream,
+ // otherwise re-wait on the current stream until it has data available
+ // or becomes closed.
+ // Unlike streams not implementing nsIAsyncInputStream, async streams
+ // cannot use `Available() == 0` to indicate EOF, so we re-poll in that
+ // situation.
+ if (NS_FAILED(rv)) {
+ NextStream();
+ }
+
+ // Unlock and invoke AsyncWaitInternal to wait again. If this succeeds,
+ // we'll be called again, otherwise fall through and notify.
+ MutexAutoUnlock unlock(mLock);
+ if (NS_SUCCEEDED(AsyncWaitInternal())) {
+ return NS_OK;
+ }
+ }
+ }
+
+ mAsyncWaitCallback.swap(callback);
+ mAsyncWaitEventTarget = nullptr;
+ }
+
+ return callback ? callback->OnInputStreamReady(this) : NS_OK;
+}
+
+void nsMultiplexInputStream::AsyncWaitCompleted() {
+ nsCOMPtr<nsIInputStreamCallback> callback;
+
+ {
+ MutexAutoLock lock(mLock);
+
+ // The callback has been nullified in the meantime.
+ if (!mAsyncWaitCallback) {
+ return;
+ }
+
+ mAsyncWaitCallback.swap(callback);
+ mAsyncWaitEventTarget = nullptr;
+ }
+
+ callback->OnInputStreamReady(this);
+}
+
+nsresult nsMultiplexInputStreamConstructor(REFNSIID aIID, void** aResult) {
+ *aResult = nullptr;
+
+ RefPtr<nsMultiplexInputStream> inst = new nsMultiplexInputStream();
+
+ return inst->QueryInterface(aIID, aResult);
+}
+
+void nsMultiplexInputStream::SerializedComplexity(uint32_t aMaxSize,
+ uint32_t* aSizeUsed,
+ uint32_t* aPipes,
+ uint32_t* aTransferables) {
+ MutexAutoLock lock(mLock);
+ bool serializeAsPipe = false;
+ SerializedComplexityInternal(aMaxSize, aSizeUsed, aPipes, aTransferables,
+ &serializeAsPipe);
+}
+
+void nsMultiplexInputStream::SerializedComplexityInternal(
+ uint32_t aMaxSize, uint32_t* aSizeUsed, uint32_t* aPipes,
+ uint32_t* aTransferables, bool* aSerializeAsPipe) {
+ mLock.AssertCurrentThreadOwns();
+ CheckedUint32 totalSizeUsed = 0;
+ CheckedUint32 totalPipes = 0;
+ CheckedUint32 totalTransferables = 0;
+ CheckedUint32 maxSize = aMaxSize;
+
+ uint32_t streamCount = mStreams.Length();
+
+ for (uint32_t index = 0; index < streamCount; index++) {
+ uint32_t sizeUsed = 0;
+ uint32_t pipes = 0;
+ uint32_t transferables = 0;
+ InputStreamHelper::SerializedComplexity(mStreams[index].mOriginalStream,
+ maxSize.value(), &sizeUsed, &pipes,
+ &transferables);
+
+ MOZ_ASSERT(maxSize.value() >= sizeUsed);
+
+ maxSize -= sizeUsed;
+ MOZ_DIAGNOSTIC_ASSERT(maxSize.isValid());
+ totalSizeUsed += sizeUsed;
+ MOZ_DIAGNOSTIC_ASSERT(totalSizeUsed.isValid());
+ totalPipes += pipes;
+ MOZ_DIAGNOSTIC_ASSERT(totalPipes.isValid());
+ totalTransferables += transferables;
+ MOZ_DIAGNOSTIC_ASSERT(totalTransferables.isValid());
+ }
+
+ // If the combination of all streams when serialized independently is
+ // sufficiently complex, we may choose to serialize it as a pipe to limit the
+ // complexity of the payload.
+ if (totalTransferables.value() == 0) {
+ // If there are no transferables within our serialization, and it would
+ // contain at least one pipe, serialize the entire payload as a pipe for
+ // simplicity.
+ *aSerializeAsPipe = totalSizeUsed.value() > 0 && totalPipes.value() > 0;
+ } else {
+ // Otherwise, we may want to still serialize in segments to take advantage
+ // of the efficiency of serializing transferables. We'll only serialize as a
+ // pipe if the total attachment count exceeds kMaxAttachmentThreshold.
+ static constexpr uint32_t kMaxAttachmentThreshold = 8;
+ CheckedUint32 totalAttachments = totalPipes + totalTransferables;
+ *aSerializeAsPipe = !totalAttachments.isValid() ||
+ totalAttachments.value() > kMaxAttachmentThreshold;
+ }
+
+ if (*aSerializeAsPipe) {
+ NS_WARNING(
+ nsPrintfCString("Choosing to serialize multiplex stream as a pipe "
+ "(would be %u bytes, %u pipes, %u transferables)",
+ totalSizeUsed.value(), totalPipes.value(),
+ totalTransferables.value())
+ .get());
+ *aSizeUsed = 0;
+ *aPipes = 1;
+ *aTransferables = 0;
+ } else {
+ *aSizeUsed = totalSizeUsed.value();
+ *aPipes = totalPipes.value();
+ *aTransferables = totalTransferables.value();
+ }
+}
+
+void nsMultiplexInputStream::Serialize(InputStreamParams& aParams,
+ uint32_t aMaxSize, uint32_t* aSizeUsed) {
+ MutexAutoLock lock(mLock);
+
+ // Check if we should serialize this stream as a pipe to reduce complexity.
+ uint32_t dummySizeUsed = 0, dummyPipes = 0, dummyTransferables = 0;
+ bool serializeAsPipe = false;
+ SerializedComplexityInternal(aMaxSize, &dummySizeUsed, &dummyPipes,
+ &dummyTransferables, &serializeAsPipe);
+ if (serializeAsPipe) {
+ *aSizeUsed = 0;
+ MutexAutoUnlock unlock(mLock);
+ InputStreamHelper::SerializeInputStreamAsPipe(this, aParams);
+ return;
+ }
+
+ MultiplexInputStreamParams params;
+
+ CheckedUint32 totalSizeUsed = 0;
+ CheckedUint32 maxSize = aMaxSize;
+
+ uint32_t streamCount = mStreams.Length();
+ if (streamCount) {
+ nsTArray<InputStreamParams>& streams = params.streams();
+
+ streams.SetCapacity(streamCount);
+ for (uint32_t index = 0; index < streamCount; index++) {
+ uint32_t sizeUsed = 0;
+ InputStreamHelper::SerializeInputStream(mStreams[index].mOriginalStream,
+ *streams.AppendElement(),
+ maxSize.value(), &sizeUsed);
+
+ MOZ_ASSERT(maxSize.value() >= sizeUsed);
+
+ maxSize -= sizeUsed;
+ MOZ_DIAGNOSTIC_ASSERT(maxSize.isValid());
+
+ totalSizeUsed += sizeUsed;
+ MOZ_DIAGNOSTIC_ASSERT(totalSizeUsed.isValid());
+ }
+ }
+
+ params.currentStream() = mCurrentStream;
+ params.status() = mStatus;
+ params.startedReadingCurrent() = mStartedReadingCurrent;
+
+ aParams = std::move(params);
+
+ MOZ_ASSERT(aSizeUsed);
+ *aSizeUsed = totalSizeUsed.value();
+}
+
+bool nsMultiplexInputStream::Deserialize(const InputStreamParams& aParams) {
+ if (aParams.type() != InputStreamParams::TMultiplexInputStreamParams) {
+ NS_ERROR("Received unknown parameters from the other process!");
+ return false;
+ }
+
+ const MultiplexInputStreamParams& params =
+ aParams.get_MultiplexInputStreamParams();
+
+ const nsTArray<InputStreamParams>& streams = params.streams();
+
+ uint32_t streamCount = streams.Length();
+ for (uint32_t index = 0; index < streamCount; index++) {
+ nsCOMPtr<nsIInputStream> stream =
+ InputStreamHelper::DeserializeInputStream(streams[index]);
+ if (!stream) {
+ NS_WARNING("Deserialize failed!");
+ return false;
+ }
+
+ if (NS_FAILED(AppendStream(stream))) {
+ NS_WARNING("AppendStream failed!");
+ return false;
+ }
+ }
+
+ MutexAutoLock lock(mLock);
+ mCurrentStream = params.currentStream();
+ mStatus = params.status();
+ mStartedReadingCurrent = params.startedReadingCurrent();
+
+ return true;
+}
+
+NS_IMETHODIMP
+nsMultiplexInputStream::GetCloneable(bool* aCloneable) {
+ MutexAutoLock lock(mLock);
+ // XXXnsm Cloning a multiplex stream which has started reading is not
+ // permitted right now.
+ if (mCurrentStream > 0 || mStartedReadingCurrent) {
+ *aCloneable = false;
+ return NS_OK;
+ }
+
+ uint32_t len = mStreams.Length();
+ for (uint32_t i = 0; i < len; ++i) {
+ nsCOMPtr<nsICloneableInputStream> cis =
+ do_QueryInterface(mStreams[i].mBufferedStream);
+ if (!cis || !cis->GetCloneable()) {
+ *aCloneable = false;
+ return NS_OK;
+ }
+ }
+
+ *aCloneable = true;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsMultiplexInputStream::Clone(nsIInputStream** aClone) {
+ MutexAutoLock lock(mLock);
+
+ // XXXnsm Cloning a multiplex stream which has started reading is not
+ // permitted right now.
+ if (mCurrentStream > 0 || mStartedReadingCurrent) {
+ return NS_ERROR_FAILURE;
+ }
+
+ RefPtr<nsMultiplexInputStream> clone = new nsMultiplexInputStream();
+
+ nsresult rv;
+ uint32_t len = mStreams.Length();
+ for (uint32_t i = 0; i < len; ++i) {
+ nsCOMPtr<nsICloneableInputStream> substream =
+ do_QueryInterface(mStreams[i].mBufferedStream);
+ if (NS_WARN_IF(!substream)) {
+ return NS_ERROR_FAILURE;
+ }
+
+ nsCOMPtr<nsIInputStream> clonedSubstream;
+ rv = substream->Clone(getter_AddRefs(clonedSubstream));
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ rv = clone->AppendStream(clonedSubstream);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+ }
+
+ clone.forget(aClone);
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsMultiplexInputStream::Length(int64_t* aLength) {
+ MutexAutoLock lock(mLock);
+
+ if (mCurrentStream > 0 || mStartedReadingCurrent) {
+ return NS_ERROR_NOT_AVAILABLE;
+ }
+
+ CheckedInt64 length = 0;
+ nsresult retval = NS_OK;
+
+ for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
+ nsCOMPtr<nsIInputStreamLength> substream =
+ do_QueryInterface(mStreams[i].mBufferedStream);
+ if (!substream) {
+ // Let's use available as fallback.
+ uint64_t streamAvail = 0;
+ nsresult rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
+ if (rv == NS_BASE_STREAM_CLOSED) {
+ continue;
+ }
+
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ mStatus = rv;
+ return mStatus;
+ }
+
+ length += streamAvail;
+ if (!length.isValid()) {
+ return NS_ERROR_OUT_OF_MEMORY;
+ }
+
+ continue;
+ }
+
+ int64_t size = 0;
+ nsresult rv = substream->Length(&size);
+ if (rv == NS_BASE_STREAM_CLOSED) {
+ continue;
+ }
+
+ if (rv == NS_ERROR_NOT_AVAILABLE) {
+ return rv;
+ }
+
+ // If one stream blocks, we all block.
+ if (rv != NS_BASE_STREAM_WOULD_BLOCK && NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ // We want to return WOULD_BLOCK if there is 1 stream that blocks. But want
+ // to see if there are other streams with length = -1.
+ if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
+ retval = NS_BASE_STREAM_WOULD_BLOCK;
+ continue;
+ }
+
+ // If one of the stream doesn't know the size, we all don't know the size.
+ if (size == -1) {
+ *aLength = -1;
+ return NS_OK;
+ }
+
+ length += size;
+ if (!length.isValid()) {
+ return NS_ERROR_OUT_OF_MEMORY;
+ }
+ }
+
+ *aLength = length.value();
+ return retval;
+}
+
+class nsMultiplexInputStream::AsyncWaitLengthHelper final
+ : public nsIInputStreamLengthCallback {
+ public:
+ NS_DECL_THREADSAFE_ISUPPORTS
+
+ AsyncWaitLengthHelper()
+ : mStreamNotified(false), mLength(0), mNegativeSize(false) {}
+
+ bool AddStream(nsIAsyncInputStreamLength* aStream) {
+ return mPendingStreams.AppendElement(aStream, fallible);
+ }
+
+ bool AddSize(int64_t aSize) {
+ MOZ_ASSERT(!mNegativeSize);
+
+ mLength += aSize;
+ return mLength.isValid();
+ }
+
+ void NegativeSize() {
+ MOZ_ASSERT(!mNegativeSize);
+ mNegativeSize = true;
+ }
+
+ nsresult Proceed(nsMultiplexInputStream* aParentStream,
+ nsIEventTarget* aEventTarget,
+ const MutexAutoLock& aProofOfLock) {
+ MOZ_ASSERT(!mStream);
+
+ // If we don't need to wait, let's inform the callback immediately.
+ if (mPendingStreams.IsEmpty() || mNegativeSize) {
+ RefPtr<nsMultiplexInputStream> parentStream = aParentStream;
+ int64_t length = -1;
+ if (!mNegativeSize && mLength.isValid()) {
+ length = mLength.value();
+ }
+ nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
+ "AsyncWaitLengthHelper", [parentStream, length]() {
+ MutexAutoLock lock(parentStream->GetLock());
+ parentStream->AsyncWaitCompleted(length, lock);
+ });
+ return aEventTarget->Dispatch(r.forget(), NS_DISPATCH_NORMAL);
+ }
+
+ // Let's store the callback and the parent stream until we have
+ // notifications from the async length streams.
+
+ mStream = aParentStream;
+
+ // Let's activate all the pending streams.
+ for (nsIAsyncInputStreamLength* stream : mPendingStreams) {
+ nsresult rv = stream->AsyncLengthWait(this, aEventTarget);
+ if (rv == NS_BASE_STREAM_CLOSED) {
+ continue;
+ }
+
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+ }
+
+ return NS_OK;
+ }
+
+ NS_IMETHOD
+ OnInputStreamLengthReady(nsIAsyncInputStreamLength* aStream,
+ int64_t aLength) override {
+ MutexAutoLock lock(mStream->GetLock());
+
+ MOZ_ASSERT(mPendingStreams.Contains(aStream));
+ mPendingStreams.RemoveElement(aStream);
+
+ // Already notified.
+ if (mStreamNotified) {
+ return NS_OK;
+ }
+
+ if (aLength == -1) {
+ mNegativeSize = true;
+ } else {
+ mLength += aLength;
+ if (!mLength.isValid()) {
+ mNegativeSize = true;
+ }
+ }
+
+ // We need to wait.
+ if (!mNegativeSize && !mPendingStreams.IsEmpty()) {
+ return NS_OK;
+ }
+
+ // Let's notify the parent stream.
+ mStreamNotified = true;
+ mStream->AsyncWaitCompleted(mNegativeSize ? -1 : mLength.value(), lock);
+ return NS_OK;
+ }
+
+ private:
+ ~AsyncWaitLengthHelper() = default;
+
+ RefPtr<nsMultiplexInputStream> mStream;
+ bool mStreamNotified;
+
+ CheckedInt64 mLength;
+ bool mNegativeSize;
+
+ nsTArray<nsCOMPtr<nsIAsyncInputStreamLength>> mPendingStreams;
+};
+
+NS_IMPL_ISUPPORTS(nsMultiplexInputStream::AsyncWaitLengthHelper,
+ nsIInputStreamLengthCallback)
+
+NS_IMETHODIMP
+nsMultiplexInputStream::AsyncLengthWait(nsIInputStreamLengthCallback* aCallback,
+ nsIEventTarget* aEventTarget) {
+ if (NS_WARN_IF(!aEventTarget)) {
+ return NS_ERROR_NULL_POINTER;
+ }
+
+ MutexAutoLock lock(mLock);
+
+ if (mCurrentStream > 0 || mStartedReadingCurrent) {
+ return NS_ERROR_NOT_AVAILABLE;
+ }
+
+ if (!aCallback) {
+ mAsyncWaitLengthCallback = nullptr;
+ return NS_OK;
+ }
+
+ // We have a pending operation! Let's use this instead of creating a new one.
+ if (mAsyncWaitLengthHelper) {
+ mAsyncWaitLengthCallback = aCallback;
+ return NS_OK;
+ }
+
+ RefPtr<AsyncWaitLengthHelper> helper = new AsyncWaitLengthHelper();
+
+ for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
+ nsCOMPtr<nsIAsyncInputStreamLength> asyncStream =
+ do_QueryInterface(mStreams[i].mBufferedStream);
+ if (asyncStream) {
+ if (NS_WARN_IF(!helper->AddStream(asyncStream))) {
+ return NS_ERROR_OUT_OF_MEMORY;
+ }
+ continue;
+ }
+
+ nsCOMPtr<nsIInputStreamLength> stream =
+ do_QueryInterface(mStreams[i].mBufferedStream);
+ if (!stream) {
+ // Let's use available as fallback.
+ uint64_t streamAvail = 0;
+ nsresult rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
+ if (rv == NS_BASE_STREAM_CLOSED) {
+ continue;
+ }
+
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ mStatus = rv;
+ return mStatus;
+ }
+
+ if (NS_WARN_IF(!helper->AddSize(streamAvail))) {
+ return NS_ERROR_OUT_OF_MEMORY;
+ }
+
+ continue;
+ }
+
+ int64_t size = 0;
+ nsresult rv = stream->Length(&size);
+ if (rv == NS_BASE_STREAM_CLOSED) {
+ continue;
+ }
+
+ MOZ_ASSERT(rv != NS_BASE_STREAM_WOULD_BLOCK,
+ "A nsILengthInutStream returns NS_BASE_STREAM_WOULD_BLOCK but "
+ "it doesn't implement nsIAsyncInputStreamLength.");
+
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ if (size == -1) {
+ helper->NegativeSize();
+ break;
+ }
+
+ if (NS_WARN_IF(!helper->AddSize(size))) {
+ return NS_ERROR_OUT_OF_MEMORY;
+ }
+ }
+
+ nsresult rv = helper->Proceed(this, aEventTarget, lock);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ mAsyncWaitLengthHelper = helper;
+ mAsyncWaitLengthCallback = aCallback;
+ return NS_OK;
+}
+
+void nsMultiplexInputStream::AsyncWaitCompleted(
+ int64_t aLength, const MutexAutoLock& aProofOfLock) {
+ mLock.AssertCurrentThreadOwns();
+
+ nsCOMPtr<nsIInputStreamLengthCallback> callback;
+ callback.swap(mAsyncWaitLengthCallback);
+
+ mAsyncWaitLengthHelper = nullptr;
+
+ // Already canceled.
+ if (!callback) {
+ return;
+ }
+
+ MutexAutoUnlock unlock(mLock);
+ callback->OnInputStreamLengthReady(this, aLength);
+}
+
+#define MAYBE_UPDATE_VALUE_REAL(x, y) \
+ if (y) { \
+ ++x; \
+ }
+
+#define MAYBE_UPDATE_VALUE(x, y) \
+ { \
+ nsCOMPtr<y> substream = do_QueryInterface(aStream.mBufferedStream); \
+ MAYBE_UPDATE_VALUE_REAL(x, substream) \
+ }
+
+#define MAYBE_UPDATE_BOOL(x, y) \
+ if (!x) { \
+ nsCOMPtr<y> substream = do_QueryInterface(aStream.mBufferedStream); \
+ if (substream) { \
+ x = true; \
+ } \
+ }
+
+void nsMultiplexInputStream::UpdateQIMap(StreamData& aStream) {
+ auto length = mStreams.Length();
+
+ MAYBE_UPDATE_VALUE_REAL(mSeekableStreams, aStream.mSeekableStream)
+ mIsSeekableStream = (mSeekableStreams == length);
+ MAYBE_UPDATE_VALUE(mIPCSerializableStreams, nsIIPCSerializableInputStream)
+ mIsIPCSerializableStream = (mIPCSerializableStreams == length);
+ MAYBE_UPDATE_VALUE(mCloneableStreams, nsICloneableInputStream)
+ mIsCloneableStream = (mCloneableStreams == length);
+ // nsMultiplexInputStream is nsIAsyncInputStream if at least 1 of the
+ // substream implements that interface
+ if (!mIsAsyncInputStream && aStream.mAsyncStream) {
+ mIsAsyncInputStream = true;
+ }
+ MAYBE_UPDATE_BOOL(mIsInputStreamLength, nsIInputStreamLength)
+ MAYBE_UPDATE_BOOL(mIsAsyncInputStreamLength, nsIAsyncInputStreamLength)
+}
+
+#undef MAYBE_UPDATE_VALUE
+#undef MAYBE_UPDATE_VALUE_REAL
+#undef MAYBE_UPDATE_BOOL
+
+bool nsMultiplexInputStream::IsSeekable() const { return mIsSeekableStream; }
+
+bool nsMultiplexInputStream::IsIPCSerializable() const {
+ return mIsIPCSerializableStream;
+}
+
+bool nsMultiplexInputStream::IsCloneable() const { return mIsCloneableStream; }
+
+bool nsMultiplexInputStream::IsAsyncInputStream() const {
+ // nsMultiplexInputStream is nsIAsyncInputStream if at least 1 of the
+ // substream implements that interface.
+ return mIsAsyncInputStream;
+}
+
+bool nsMultiplexInputStream::IsInputStreamLength() const {
+ return mIsInputStreamLength;
+}
+
+bool nsMultiplexInputStream::IsAsyncInputStreamLength() const {
+ return mIsAsyncInputStreamLength;
+}