summaryrefslogtreecommitdiffstats
path: root/xpcom/io/nsPipe3.cpp
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
commit36d22d82aa202bb199967e9512281e9a53db42c9 (patch)
tree105e8c98ddea1c1e4784a60a5a6410fa416be2de /xpcom/io/nsPipe3.cpp
parentInitial commit. (diff)
downloadfirefox-esr-upstream.tar.xz
firefox-esr-upstream.zip
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'xpcom/io/nsPipe3.cpp')
-rw-r--r--xpcom/io/nsPipe3.cpp1884
1 files changed, 1884 insertions, 0 deletions
diff --git a/xpcom/io/nsPipe3.cpp b/xpcom/io/nsPipe3.cpp
new file mode 100644
index 0000000000..3d7486e673
--- /dev/null
+++ b/xpcom/io/nsPipe3.cpp
@@ -0,0 +1,1884 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include <algorithm>
+#include "mozilla/Attributes.h"
+#include "mozilla/IntegerPrintfMacros.h"
+#include "mozilla/ReentrantMonitor.h"
+#include "nsIBufferedStreams.h"
+#include "nsICloneableInputStream.h"
+#include "nsIPipe.h"
+#include "nsIEventTarget.h"
+#include "nsITellableStream.h"
+#include "mozilla/RefPtr.h"
+#include "nsSegmentedBuffer.h"
+#include "nsStreamUtils.h"
+#include "nsString.h"
+#include "nsCOMPtr.h"
+#include "nsCRT.h"
+#include "mozilla/Logging.h"
+#include "nsIClassInfoImpl.h"
+#include "nsAlgorithm.h"
+#include "nsPipe.h"
+#include "nsIAsyncInputStream.h"
+#include "nsIAsyncOutputStream.h"
+#include "nsIInputStreamPriority.h"
+#include "nsThreadUtils.h"
+
+using namespace mozilla;
+
+#ifdef LOG
+# undef LOG
+#endif
+//
+// set MOZ_LOG=nsPipe:5
+//
+static LazyLogModule sPipeLog("nsPipe");
+#define LOG(args) MOZ_LOG(sPipeLog, mozilla::LogLevel::Debug, args)
+
+#define DEFAULT_SEGMENT_SIZE 4096
+#define DEFAULT_SEGMENT_COUNT 16
+
+class nsPipe;
+class nsPipeEvents;
+class nsPipeInputStream;
+class nsPipeOutputStream;
+class AutoReadSegment;
+
+namespace {
+
+enum MonitorAction { DoNotNotifyMonitor, NotifyMonitor };
+
+enum SegmentChangeResult { SegmentNotChanged, SegmentAdvanceBufferRead };
+
+} // namespace
+
+//-----------------------------------------------------------------------------
+
+class CallbackHolder {
+ public:
+ CallbackHolder() = default;
+ MOZ_IMPLICIT CallbackHolder(std::nullptr_t) {}
+
+ CallbackHolder(nsIAsyncInputStream* aStream,
+ nsIInputStreamCallback* aCallback, uint32_t aFlags,
+ nsIEventTarget* aEventTarget)
+ : mRunnable(aCallback ? NS_NewCancelableRunnableFunction(
+ "nsPipeInputStream AsyncWait Callback",
+ [stream = nsCOMPtr{aStream},
+ callback = nsCOMPtr{aCallback}]() {
+ callback->OnInputStreamReady(stream);
+ })
+ : nullptr),
+ mEventTarget(aEventTarget),
+ mFlags(aFlags) {}
+
+ CallbackHolder(nsIAsyncOutputStream* aStream,
+ nsIOutputStreamCallback* aCallback, uint32_t aFlags,
+ nsIEventTarget* aEventTarget)
+ : mRunnable(aCallback ? NS_NewCancelableRunnableFunction(
+ "nsPipeOutputStream AsyncWait Callback",
+ [stream = nsCOMPtr{aStream},
+ callback = nsCOMPtr{aCallback}]() {
+ callback->OnOutputStreamReady(stream);
+ })
+ : nullptr),
+ mEventTarget(aEventTarget),
+ mFlags(aFlags) {}
+
+ CallbackHolder(const CallbackHolder&) = delete;
+ CallbackHolder(CallbackHolder&&) = default;
+ CallbackHolder& operator=(const CallbackHolder&) = delete;
+ CallbackHolder& operator=(CallbackHolder&&) = default;
+
+ CallbackHolder& operator=(std::nullptr_t) {
+ mRunnable = nullptr;
+ mEventTarget = nullptr;
+ mFlags = 0;
+ return *this;
+ }
+
+ MOZ_IMPLICIT operator bool() const { return mRunnable; }
+
+ uint32_t Flags() const {
+ MOZ_ASSERT(mRunnable, "Should only be called when a callback is present");
+ return mFlags;
+ }
+
+ void Notify() {
+ nsCOMPtr<nsIRunnable> runnable = mRunnable.forget();
+ nsCOMPtr<nsIEventTarget> eventTarget = mEventTarget.forget();
+ if (runnable) {
+ if (eventTarget) {
+ eventTarget->Dispatch(runnable.forget());
+ } else {
+ runnable->Run();
+ }
+ }
+ }
+
+ private:
+ nsCOMPtr<nsIRunnable> mRunnable;
+ nsCOMPtr<nsIEventTarget> mEventTarget;
+ uint32_t mFlags = 0;
+};
+
+//-----------------------------------------------------------------------------
+
+// this class is used to delay notifications until the end of a particular
+// scope. it helps avoid the complexity of issuing callbacks while inside
+// a critical section.
+class nsPipeEvents {
+ public:
+ nsPipeEvents() = default;
+ ~nsPipeEvents();
+
+ inline void NotifyReady(CallbackHolder aCallback) {
+ mCallbacks.AppendElement(std::move(aCallback));
+ }
+
+ private:
+ nsTArray<CallbackHolder> mCallbacks;
+};
+
+//-----------------------------------------------------------------------------
+
+// This class is used to maintain input stream state. Its broken out from the
+// nsPipeInputStream class because generally the nsPipe should be modifying
+// this state and not the input stream itself.
+struct nsPipeReadState {
+ nsPipeReadState()
+ : mReadCursor(nullptr),
+ mReadLimit(nullptr),
+ mSegment(0),
+ mAvailable(0),
+ mActiveRead(false),
+ mNeedDrain(false) {}
+
+ // All members of this type are guarded by the pipe monitor, however it cannot
+ // be named from this type, so the less-reliable MOZ_GUARDED_VAR is used
+ // instead. In the future it would be nice to avoid this, especially as
+ // MOZ_GUARDED_VAR is deprecated.
+ char* mReadCursor MOZ_GUARDED_VAR;
+ char* mReadLimit MOZ_GUARDED_VAR;
+ int32_t mSegment MOZ_GUARDED_VAR;
+ uint32_t mAvailable MOZ_GUARDED_VAR;
+
+ // This flag is managed using the AutoReadSegment RAII stack class.
+ bool mActiveRead MOZ_GUARDED_VAR;
+
+ // Set to indicate that the input stream has closed and should be drained,
+ // but that drain has been delayed due to an active read. When the read
+ // completes, this flag indicate the drain should then be performed.
+ bool mNeedDrain MOZ_GUARDED_VAR;
+};
+
+//-----------------------------------------------------------------------------
+
+// an input end of a pipe (maintained as a list of refs within the pipe)
+class nsPipeInputStream final : public nsIAsyncInputStream,
+ public nsITellableStream,
+ public nsISearchableInputStream,
+ public nsICloneableInputStream,
+ public nsIClassInfo,
+ public nsIBufferedInputStream,
+ public nsIInputStreamPriority {
+ public:
+ NS_DECL_THREADSAFE_ISUPPORTS
+ NS_DECL_NSIINPUTSTREAM
+ NS_DECL_NSIASYNCINPUTSTREAM
+ NS_DECL_NSITELLABLESTREAM
+ NS_DECL_NSISEARCHABLEINPUTSTREAM
+ NS_DECL_NSICLONEABLEINPUTSTREAM
+ NS_DECL_NSICLASSINFO
+ NS_DECL_NSIBUFFEREDINPUTSTREAM
+ NS_DECL_NSIINPUTSTREAMPRIORITY
+
+ explicit nsPipeInputStream(nsPipe* aPipe)
+ : mPipe(aPipe),
+ mLogicalOffset(0),
+ mInputStatus(NS_OK),
+ mBlocking(true),
+ mBlocked(false),
+ mPriority(nsIRunnablePriority::PRIORITY_NORMAL) {}
+
+ nsPipeInputStream(const nsPipeInputStream& aOther)
+ : mPipe(aOther.mPipe),
+ mLogicalOffset(aOther.mLogicalOffset),
+ mInputStatus(aOther.mInputStatus),
+ mBlocking(aOther.mBlocking),
+ mBlocked(false),
+ mReadState(aOther.mReadState),
+ mPriority(nsIRunnablePriority::PRIORITY_NORMAL) {}
+
+ void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; }
+
+ uint32_t Available() MOZ_REQUIRES(Monitor());
+
+ // synchronously wait for the pipe to become readable.
+ nsresult Wait();
+
+ // These two don't acquire the monitor themselves. Instead they
+ // expect their caller to have done so and to pass the monitor as
+ // evidence.
+ MonitorAction OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&,
+ const ReentrantMonitorAutoEnter& ev)
+ MOZ_REQUIRES(Monitor());
+ MonitorAction OnInputException(nsresult, nsPipeEvents&,
+ const ReentrantMonitorAutoEnter& ev)
+ MOZ_REQUIRES(Monitor());
+
+ nsPipeReadState& ReadState() { return mReadState; }
+
+ const nsPipeReadState& ReadState() const { return mReadState; }
+
+ nsresult Status() const;
+
+ // A version of Status() that doesn't acquire the monitor.
+ nsresult Status(const ReentrantMonitorAutoEnter& ev) const
+ MOZ_REQUIRES(Monitor());
+
+ // The status of this input stream, ignoring the status of the underlying
+ // monitor. If this status is errored, the input stream has either already
+ // been removed from the pipe, or will be removed from the pipe shortly.
+ nsresult InputStatus(const ReentrantMonitorAutoEnter&) const
+ MOZ_REQUIRES(Monitor()) {
+ return mInputStatus;
+ }
+
+ ReentrantMonitor& Monitor() const;
+
+ private:
+ virtual ~nsPipeInputStream();
+
+ RefPtr<nsPipe> mPipe;
+
+ int64_t mLogicalOffset;
+ // Individual input streams can be closed without effecting the rest of the
+ // pipe. So track individual input stream status separately. |mInputStatus|
+ // is protected by |mPipe->mReentrantMonitor|.
+ nsresult mInputStatus MOZ_GUARDED_BY(Monitor());
+ bool mBlocking;
+
+ // these variables can only be accessed while inside the pipe's monitor
+ bool mBlocked MOZ_GUARDED_BY(Monitor());
+ CallbackHolder mCallback MOZ_GUARDED_BY(Monitor());
+
+ // requires pipe's monitor to access members; usually treat as an opaque token
+ // to pass to nsPipe
+ nsPipeReadState mReadState;
+ Atomic<uint32_t, Relaxed> mPriority;
+};
+
+//-----------------------------------------------------------------------------
+
+// the output end of a pipe (allocated as a member of the pipe).
+class nsPipeOutputStream : public nsIAsyncOutputStream, public nsIClassInfo {
+ public:
+ // since this class will be allocated as a member of the pipe, we do not
+ // need our own ref count. instead, we share the lifetime (the ref count)
+ // of the entire pipe. this macro is just convenience since it does not
+ // declare a mRefCount variable; however, don't let the name fool you...
+ // we are not inheriting from nsPipe ;-)
+ NS_DECL_ISUPPORTS_INHERITED
+
+ NS_DECL_NSIOUTPUTSTREAM
+ NS_DECL_NSIASYNCOUTPUTSTREAM
+ NS_DECL_NSICLASSINFO
+
+ explicit nsPipeOutputStream(nsPipe* aPipe)
+ : mPipe(aPipe),
+ mWriterRefCnt(0),
+ mLogicalOffset(0),
+ mBlocking(true),
+ mBlocked(false),
+ mWritable(true) {}
+
+ void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; }
+ void SetWritable(bool aWritable) MOZ_REQUIRES(Monitor()) {
+ mWritable = aWritable;
+ }
+
+ // synchronously wait for the pipe to become writable.
+ nsresult Wait();
+
+ MonitorAction OnOutputWritable(nsPipeEvents&) MOZ_REQUIRES(Monitor());
+ MonitorAction OnOutputException(nsresult, nsPipeEvents&)
+ MOZ_REQUIRES(Monitor());
+
+ ReentrantMonitor& Monitor() const;
+
+ private:
+ nsPipe* mPipe;
+
+ // separate refcnt so that we know when to close the producer
+ ThreadSafeAutoRefCnt mWriterRefCnt;
+ int64_t mLogicalOffset;
+ bool mBlocking;
+
+ // these variables can only be accessed while inside the pipe's monitor
+ bool mBlocked MOZ_GUARDED_BY(Monitor());
+ bool mWritable MOZ_GUARDED_BY(Monitor());
+ CallbackHolder mCallback MOZ_GUARDED_BY(Monitor());
+};
+
+//-----------------------------------------------------------------------------
+
+class nsPipe final {
+ public:
+ friend class nsPipeInputStream;
+ friend class nsPipeOutputStream;
+ friend class AutoReadSegment;
+
+ NS_INLINE_DECL_THREADSAFE_REFCOUNTING(nsPipe)
+
+ // public constructor
+ friend void NS_NewPipe2(nsIAsyncInputStream**, nsIAsyncOutputStream**, bool,
+ bool, uint32_t, uint32_t);
+
+ private:
+ nsPipe(uint32_t aSegmentSize, uint32_t aSegmentCount);
+ ~nsPipe();
+
+ //
+ // Methods below may only be called while inside the pipe's monitor. Some
+ // of these methods require passing a ReentrantMonitorAutoEnter to prove the
+ // monitor is held.
+ //
+
+ void PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
+ char*& aCursor, char*& aLimit)
+ MOZ_REQUIRES(mReentrantMonitor);
+ SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState,
+ const ReentrantMonitorAutoEnter& ev)
+ MOZ_REQUIRES(mReentrantMonitor);
+ bool ReadSegmentBeingWritten(nsPipeReadState& aReadState)
+ MOZ_REQUIRES(mReentrantMonitor);
+ uint32_t CountSegmentReferences(int32_t aSegment)
+ MOZ_REQUIRES(mReentrantMonitor);
+ void SetAllNullReadCursors() MOZ_REQUIRES(mReentrantMonitor);
+ bool AllReadCursorsMatchWriteCursor() MOZ_REQUIRES(mReentrantMonitor);
+ void RollBackAllReadCursors(char* aWriteCursor)
+ MOZ_REQUIRES(mReentrantMonitor);
+ void UpdateAllReadCursors(char* aWriteCursor) MOZ_REQUIRES(mReentrantMonitor);
+ void ValidateAllReadCursors() MOZ_REQUIRES(mReentrantMonitor);
+ uint32_t GetBufferSegmentCount(const nsPipeReadState& aReadState,
+ const ReentrantMonitorAutoEnter& ev) const
+ MOZ_REQUIRES(mReentrantMonitor);
+ bool IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const
+ MOZ_REQUIRES(mReentrantMonitor);
+
+ //
+ // methods below may be called while outside the pipe's monitor
+ //
+
+ void DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents);
+ nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen);
+ void AdvanceWriteCursor(uint32_t aCount);
+
+ void OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason);
+ void OnPipeException(nsresult aReason, bool aOutputOnly = false);
+
+ nsresult CloneInputStream(nsPipeInputStream* aOriginal,
+ nsIInputStream** aCloneOut);
+
+ // methods below should only be called by AutoReadSegment
+ nsresult GetReadSegment(nsPipeReadState& aReadState, const char*& aSegment,
+ uint32_t& aLength);
+ void ReleaseReadSegment(nsPipeReadState& aReadState, nsPipeEvents& aEvents);
+ void AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aCount);
+
+ // We can't inherit from both nsIInputStream and nsIOutputStream
+ // because they collide on their Close method. Consequently we nest their
+ // implementations to avoid the extra object allocation.
+ nsPipeOutputStream mOutput;
+
+ // Since the input stream can be cloned, we may have more than one. Use
+ // a weak reference as the streams will clear their entry here in their
+ // destructor. Using a strong reference would create a reference cycle.
+ // Only usable while mReentrantMonitor is locked.
+ nsTArray<nsPipeInputStream*> mInputList MOZ_GUARDED_BY(mReentrantMonitor);
+
+ ReentrantMonitor mReentrantMonitor;
+ nsSegmentedBuffer mBuffer MOZ_GUARDED_BY(mReentrantMonitor);
+
+ // The maximum number of segments to allow to be buffered in advance
+ // of the fastest reader. This is collection of segments is called
+ // the "advance buffer".
+ uint32_t mMaxAdvanceBufferSegmentCount MOZ_GUARDED_BY(mReentrantMonitor);
+
+ int32_t mWriteSegment MOZ_GUARDED_BY(mReentrantMonitor);
+ char* mWriteCursor MOZ_GUARDED_BY(mReentrantMonitor);
+ char* mWriteLimit MOZ_GUARDED_BY(mReentrantMonitor);
+
+ // |mStatus| is protected by |mReentrantMonitor|.
+ nsresult mStatus MOZ_GUARDED_BY(mReentrantMonitor);
+};
+
+//-----------------------------------------------------------------------------
+
+// Declarations of Monitor() methods on the streams.
+//
+// These must be placed early to provide MOZ_RETURN_CAPABILITY annotations for
+// the thread-safety analysis. This couldn't be done at the declaration due to
+// nsPipe not yet being defined.
+
+ReentrantMonitor& nsPipeOutputStream::Monitor() const
+ MOZ_RETURN_CAPABILITY(mPipe->mReentrantMonitor) {
+ return mPipe->mReentrantMonitor;
+}
+
+ReentrantMonitor& nsPipeInputStream::Monitor() const
+ MOZ_RETURN_CAPABILITY(mPipe->mReentrantMonitor) {
+ return mPipe->mReentrantMonitor;
+}
+
+//-----------------------------------------------------------------------------
+
+// RAII class representing an active read segment. When it goes out of scope
+// it automatically updates the read cursor and releases the read segment.
+class MOZ_STACK_CLASS AutoReadSegment final {
+ public:
+ AutoReadSegment(nsPipe* aPipe, nsPipeReadState& aReadState,
+ uint32_t aMaxLength)
+ : mPipe(aPipe),
+ mReadState(aReadState),
+ mStatus(NS_ERROR_FAILURE),
+ mSegment(nullptr),
+ mLength(0),
+ mOffset(0) {
+ MOZ_DIAGNOSTIC_ASSERT(mPipe);
+ MOZ_DIAGNOSTIC_ASSERT(!mReadState.mActiveRead);
+ mStatus = mPipe->GetReadSegment(mReadState, mSegment, mLength);
+ if (NS_SUCCEEDED(mStatus)) {
+ MOZ_DIAGNOSTIC_ASSERT(mReadState.mActiveRead);
+ MOZ_DIAGNOSTIC_ASSERT(mSegment);
+ mLength = std::min(mLength, aMaxLength);
+ MOZ_DIAGNOSTIC_ASSERT(mLength);
+ }
+ }
+
+ ~AutoReadSegment() {
+ if (NS_SUCCEEDED(mStatus)) {
+ if (mOffset) {
+ mPipe->AdvanceReadCursor(mReadState, mOffset);
+ } else {
+ nsPipeEvents events;
+ mPipe->ReleaseReadSegment(mReadState, events);
+ }
+ }
+ MOZ_DIAGNOSTIC_ASSERT(!mReadState.mActiveRead);
+ }
+
+ nsresult Status() const { return mStatus; }
+
+ const char* Data() const {
+ MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus));
+ MOZ_DIAGNOSTIC_ASSERT(mSegment);
+ return mSegment + mOffset;
+ }
+
+ uint32_t Length() const {
+ MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus));
+ MOZ_DIAGNOSTIC_ASSERT(mLength >= mOffset);
+ return mLength - mOffset;
+ }
+
+ void Advance(uint32_t aCount) {
+ MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus));
+ MOZ_DIAGNOSTIC_ASSERT(aCount <= (mLength - mOffset));
+ mOffset += aCount;
+ }
+
+ nsPipeReadState& ReadState() const { return mReadState; }
+
+ private:
+ // guaranteed to remain alive due to limited stack lifetime of AutoReadSegment
+ nsPipe* mPipe;
+ nsPipeReadState& mReadState;
+ nsresult mStatus;
+ const char* mSegment;
+ uint32_t mLength;
+ uint32_t mOffset;
+};
+
+//
+// NOTES on buffer architecture:
+//
+// +-----------------+ - - mBuffer.GetSegment(0)
+// | |
+// + - - - - - - - - + - - nsPipeReadState.mReadCursor
+// |/////////////////|
+// |/////////////////|
+// |/////////////////|
+// |/////////////////|
+// +-----------------+ - - nsPipeReadState.mReadLimit
+// |
+// +-----------------+
+// |/////////////////|
+// |/////////////////|
+// |/////////////////|
+// |/////////////////|
+// |/////////////////|
+// |/////////////////|
+// +-----------------+
+// |
+// +-----------------+ - - mBuffer.GetSegment(mWriteSegment)
+// |/////////////////|
+// |/////////////////|
+// |/////////////////|
+// + - - - - - - - - + - - mWriteCursor
+// | |
+// | |
+// +-----------------+ - - mWriteLimit
+//
+// (shaded region contains data)
+//
+// NOTE: Each input stream produced by the nsPipe contains its own, separate
+// nsPipeReadState. This means there are multiple mReadCursor and
+// mReadLimit values in play. The pipe cannot discard old data until
+// all mReadCursors have moved beyond that point in the stream.
+//
+// Likewise, each input stream reader will have it's own amount of
+// buffered data. The pipe size threshold, however, is only applied
+// to the input stream that is being read fastest. We call this
+// the "advance buffer" in that its in advance of all readers. We
+// allow slower input streams to buffer more data so that we don't
+// stall processing of the faster input stream.
+//
+// NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
+// small allocations (e.g., 64 byte allocations). this means that buffers may
+// be allocated back-to-back. in the diagram above, for example, mReadLimit
+// would actually be pointing at the beginning of the next segment. when
+// making changes to this file, please keep this fact in mind.
+//
+
+//-----------------------------------------------------------------------------
+// nsPipe methods:
+//-----------------------------------------------------------------------------
+
+nsPipe::nsPipe(uint32_t aSegmentSize, uint32_t aSegmentCount)
+ : mOutput(this),
+ mReentrantMonitor("nsPipe.mReentrantMonitor"),
+ // protect against overflow
+ mMaxAdvanceBufferSegmentCount(
+ std::min(aSegmentCount, UINT32_MAX / aSegmentSize)),
+ mWriteSegment(-1),
+ mWriteCursor(nullptr),
+ mWriteLimit(nullptr),
+ mStatus(NS_OK) {
+ // The internal buffer is always "infinite" so that we can allow
+ // the size to expand when cloned streams are read at different
+ // rates. We enforce a limit on how much data can be buffered
+ // ahead of the fastest reader in GetWriteSegment().
+ MOZ_ALWAYS_SUCCEEDS(mBuffer.Init(aSegmentSize));
+}
+
+nsPipe::~nsPipe() = default;
+
+void nsPipe::PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
+ char*& aCursor, char*& aLimit) {
+ if (aIndex == 0) {
+ MOZ_DIAGNOSTIC_ASSERT(!aReadState.mReadCursor || mBuffer.GetSegmentCount());
+ aCursor = aReadState.mReadCursor;
+ aLimit = aReadState.mReadLimit;
+ } else {
+ uint32_t absoluteIndex = aReadState.mSegment + aIndex;
+ uint32_t numSegments = mBuffer.GetSegmentCount();
+ if (absoluteIndex >= numSegments) {
+ aCursor = aLimit = nullptr;
+ } else {
+ aCursor = mBuffer.GetSegment(absoluteIndex);
+ if (mWriteSegment == (int32_t)absoluteIndex) {
+ aLimit = mWriteCursor;
+ } else {
+ aLimit = aCursor + mBuffer.GetSegmentSize();
+ }
+ }
+ }
+}
+
+nsresult nsPipe::GetReadSegment(nsPipeReadState& aReadState,
+ const char*& aSegment, uint32_t& aLength) {
+ ReentrantMonitorAutoEnter mon(mReentrantMonitor);
+
+ if (aReadState.mReadCursor == aReadState.mReadLimit) {
+ return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK;
+ }
+
+ // The input stream locks the pipe while getting the buffer to read from,
+ // but then unlocks while actual data copying is taking place. In
+ // order to avoid deleting the buffer out from under this lockless read
+ // set a flag to indicate a read is active. This flag is only modified
+ // while the lock is held.
+ MOZ_DIAGNOSTIC_ASSERT(!aReadState.mActiveRead);
+ aReadState.mActiveRead = true;
+
+ aSegment = aReadState.mReadCursor;
+ aLength = aReadState.mReadLimit - aReadState.mReadCursor;
+ MOZ_DIAGNOSTIC_ASSERT(aLength <= aReadState.mAvailable);
+
+ return NS_OK;
+}
+
+void nsPipe::ReleaseReadSegment(nsPipeReadState& aReadState,
+ nsPipeEvents& aEvents) {
+ ReentrantMonitorAutoEnter mon(mReentrantMonitor);
+
+ MOZ_DIAGNOSTIC_ASSERT(aReadState.mActiveRead);
+ aReadState.mActiveRead = false;
+
+ // When a read completes and releases the mActiveRead flag, we may have
+ // blocked a drain from completing. This occurs when the input stream is
+ // closed during the read. In these cases, we need to complete the drain as
+ // soon as the active read completes.
+ if (aReadState.mNeedDrain) {
+ aReadState.mNeedDrain = false;
+ DrainInputStream(aReadState, aEvents);
+ }
+}
+
+void nsPipe::AdvanceReadCursor(nsPipeReadState& aReadState,
+ uint32_t aBytesRead) {
+ MOZ_DIAGNOSTIC_ASSERT(aBytesRead > 0);
+
+ nsPipeEvents events;
+ {
+ ReentrantMonitorAutoEnter mon(mReentrantMonitor);
+
+ LOG(("III advancing read cursor by %u\n", aBytesRead));
+ MOZ_DIAGNOSTIC_ASSERT(aBytesRead <= mBuffer.GetSegmentSize());
+
+ aReadState.mReadCursor += aBytesRead;
+ MOZ_DIAGNOSTIC_ASSERT(aReadState.mReadCursor <= aReadState.mReadLimit);
+
+ MOZ_DIAGNOSTIC_ASSERT(aReadState.mAvailable >= aBytesRead);
+ aReadState.mAvailable -= aBytesRead;
+
+ // Check to see if we're at the end of the available read data. If we
+ // are, and this segment is not still being written, then we can possibly
+ // free up the segment.
+ if (aReadState.mReadCursor == aReadState.mReadLimit &&
+ !ReadSegmentBeingWritten(aReadState)) {
+ // Advance the segment position. If we have read any segments from the
+ // advance buffer then we can potentially notify blocked writers.
+ mOutput.Monitor().AssertCurrentThreadIn();
+ if (AdvanceReadSegment(aReadState, mon) == SegmentAdvanceBufferRead &&
+ mOutput.OnOutputWritable(events) == NotifyMonitor) {
+ mon.NotifyAll();
+ }
+ }
+
+ ReleaseReadSegment(aReadState, events);
+ }
+}
+
+SegmentChangeResult nsPipe::AdvanceReadSegment(
+ nsPipeReadState& aReadState, const ReentrantMonitorAutoEnter& ev) {
+ // Calculate how many segments are buffered for this stream to start.
+ uint32_t startBufferSegments = GetBufferSegmentCount(aReadState, ev);
+
+ int32_t currentSegment = aReadState.mSegment;
+
+ // Move to the next segment to read
+ aReadState.mSegment += 1;
+
+ // If this was the last reference to the first segment, then remove it.
+ if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) {
+ // shift write and read segment index (-1 indicates an empty buffer).
+ mWriteSegment -= 1;
+
+ // Directly modify the current read state. If the associated input
+ // stream is closed simultaneous with reading, then it may not be
+ // in the mInputList any more.
+ aReadState.mSegment -= 1;
+
+ for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+ // Skip the current read state structure since we modify it manually
+ // before entering this loop.
+ if (&mInputList[i]->ReadState() == &aReadState) {
+ continue;
+ }
+ mInputList[i]->ReadState().mSegment -= 1;
+ }
+
+ // done with this segment
+ mBuffer.DeleteFirstSegment();
+ LOG(("III deleting first segment\n"));
+ }
+
+ if (mWriteSegment < aReadState.mSegment) {
+ // read cursor has hit the end of written data, so reset it
+ MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == (aReadState.mSegment - 1));
+ aReadState.mReadCursor = nullptr;
+ aReadState.mReadLimit = nullptr;
+ // also, the buffer is completely empty, so reset the write cursor
+ if (mWriteSegment == -1) {
+ mWriteCursor = nullptr;
+ mWriteLimit = nullptr;
+ }
+ } else {
+ // advance read cursor and limit to next buffer segment
+ aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment);
+ if (mWriteSegment == aReadState.mSegment) {
+ aReadState.mReadLimit = mWriteCursor;
+ } else {
+ aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize();
+ }
+ }
+
+ // Calculate how many segments are buffered for the stream after
+ // reading.
+ uint32_t endBufferSegments = GetBufferSegmentCount(aReadState, ev);
+
+ // If the stream has read a segment out of the set of advanced buffer
+ // segments, then the writer may advance.
+ if (startBufferSegments >= mMaxAdvanceBufferSegmentCount &&
+ endBufferSegments < mMaxAdvanceBufferSegmentCount) {
+ return SegmentAdvanceBufferRead;
+ }
+
+ // Otherwise there are no significant changes to the segment structure.
+ return SegmentNotChanged;
+}
+
+void nsPipe::DrainInputStream(nsPipeReadState& aReadState,
+ nsPipeEvents& aEvents) {
+ ReentrantMonitorAutoEnter mon(mReentrantMonitor);
+
+ // If a segment is actively being read in ReadSegments() for this input
+ // stream, then we cannot drain the stream. This can happen because
+ // ReadSegments() does not hold the lock while copying from the buffer.
+ // If we detect this condition, simply note that we need a drain once
+ // the read completes and return immediately.
+ if (aReadState.mActiveRead) {
+ MOZ_DIAGNOSTIC_ASSERT(!aReadState.mNeedDrain);
+ aReadState.mNeedDrain = true;
+ return;
+ }
+
+ while (mWriteSegment >= aReadState.mSegment) {
+ // If the last segment to free is still being written to, we're done
+ // draining. We can't free any more.
+ if (ReadSegmentBeingWritten(aReadState)) {
+ break;
+ }
+
+ // Don't bother checking if this results in an advance buffer segment
+ // read. Since we are draining the entire stream we will read an
+ // advance buffer segment no matter what.
+ AdvanceReadSegment(aReadState, mon);
+ }
+
+ // Force the stream into an empty state. Make sure mAvailable, mCursor, and
+ // mReadLimit are consistent with one another.
+ aReadState.mAvailable = 0;
+ aReadState.mReadCursor = nullptr;
+ aReadState.mReadLimit = nullptr;
+
+ // Remove the input stream from the pipe's list of streams. This will
+ // prevent the pipe from holding the stream alive or trying to update
+ // its read state any further.
+ DebugOnly<uint32_t> numRemoved = 0;
+ mInputList.RemoveElementsBy([&](nsPipeInputStream* aEntry) {
+ bool result = &aReadState == &aEntry->ReadState();
+ numRemoved += result ? 1 : 0;
+ return result;
+ });
+ MOZ_ASSERT(numRemoved == 1);
+
+ // If we have read any segments from the advance buffer then we can
+ // potentially notify blocked writers.
+ mOutput.Monitor().AssertCurrentThreadIn();
+ if (!IsAdvanceBufferFull(mon) &&
+ mOutput.OnOutputWritable(aEvents) == NotifyMonitor) {
+ mon.NotifyAll();
+ }
+}
+
+bool nsPipe::ReadSegmentBeingWritten(nsPipeReadState& aReadState) {
+ mReentrantMonitor.AssertCurrentThreadIn();
+ bool beingWritten =
+ mWriteSegment == aReadState.mSegment && mWriteLimit > mWriteCursor;
+ MOZ_DIAGNOSTIC_ASSERT(!beingWritten || aReadState.mReadLimit == mWriteCursor);
+ return beingWritten;
+}
+
+nsresult nsPipe::GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen) {
+ ReentrantMonitorAutoEnter mon(mReentrantMonitor);
+
+ if (NS_FAILED(mStatus)) {
+ return mStatus;
+ }
+
+ // write cursor and limit may both be null indicating an empty buffer.
+ if (mWriteCursor == mWriteLimit) {
+ // The pipe is full if we have hit our limit on advance data buffering.
+ // This means the fastest reader is still reading slower than data is
+ // being written into the pipe.
+ if (IsAdvanceBufferFull(mon)) {
+ return NS_BASE_STREAM_WOULD_BLOCK;
+ }
+
+ // The nsSegmentedBuffer is configured to be "infinite", so this
+ // should never return nullptr here.
+ char* seg = mBuffer.AppendNewSegment();
+ if (!seg) {
+ return NS_ERROR_OUT_OF_MEMORY;
+ }
+
+ LOG(("OOO appended new segment\n"));
+ mWriteCursor = seg;
+ mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize();
+ ++mWriteSegment;
+ }
+
+ // make sure read cursor is initialized
+ SetAllNullReadCursors();
+
+ // check to see if we can roll-back our read and write cursors to the
+ // beginning of the current/first segment. this is purely an optimization.
+ if (mWriteSegment == 0 && AllReadCursorsMatchWriteCursor()) {
+ char* head = mBuffer.GetSegment(0);
+ LOG(("OOO rolling back write cursor %" PRId64 " bytes\n",
+ static_cast<int64_t>(mWriteCursor - head)));
+ RollBackAllReadCursors(head);
+ mWriteCursor = head;
+ }
+
+ aSegment = mWriteCursor;
+ aSegmentLen = mWriteLimit - mWriteCursor;
+ return NS_OK;
+}
+
+void nsPipe::AdvanceWriteCursor(uint32_t aBytesWritten) {
+ MOZ_DIAGNOSTIC_ASSERT(aBytesWritten > 0);
+
+ nsPipeEvents events;
+ {
+ ReentrantMonitorAutoEnter mon(mReentrantMonitor);
+
+ LOG(("OOO advancing write cursor by %u\n", aBytesWritten));
+
+ char* newWriteCursor = mWriteCursor + aBytesWritten;
+ MOZ_DIAGNOSTIC_ASSERT(newWriteCursor <= mWriteLimit);
+
+ // update read limit if reading in the same segment
+ UpdateAllReadCursors(newWriteCursor);
+
+ mWriteCursor = newWriteCursor;
+
+ ValidateAllReadCursors();
+
+ // update the writable flag on the output stream
+ if (mWriteCursor == mWriteLimit) {
+ mOutput.Monitor().AssertCurrentThreadIn();
+ mOutput.SetWritable(!IsAdvanceBufferFull(mon));
+ }
+
+ // notify input stream that pipe now contains additional data
+ bool needNotify = false;
+ for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+ mInputList[i]->Monitor().AssertCurrentThreadIn();
+ if (mInputList[i]->OnInputReadable(aBytesWritten, events, mon) ==
+ NotifyMonitor) {
+ needNotify = true;
+ }
+ }
+
+ if (needNotify) {
+ mon.NotifyAll();
+ }
+ }
+}
+
+void nsPipe::OnInputStreamException(nsPipeInputStream* aStream,
+ nsresult aReason) {
+ MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason));
+
+ nsPipeEvents events;
+ {
+ ReentrantMonitorAutoEnter mon(mReentrantMonitor);
+
+ // Its possible to re-enter this method when we call OnPipeException() or
+ // OnInputExection() below. If there is a caller stuck in our synchronous
+ // Wait() method, then they will get woken up with a failure code which
+ // re-enters this method. Therefore, gracefully handle unknown streams
+ // here.
+
+ // If we only have one stream open and it is the given stream, then shut
+ // down the entire pipe.
+ if (mInputList.Length() == 1) {
+ if (mInputList[0] == aStream) {
+ OnPipeException(aReason);
+ }
+ return;
+ }
+
+ // Otherwise just close the particular stream that hit an exception.
+ for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+ if (mInputList[i] != aStream) {
+ continue;
+ }
+
+ mInputList[i]->Monitor().AssertCurrentThreadIn();
+ MonitorAction action =
+ mInputList[i]->OnInputException(aReason, events, mon);
+
+ // Notify after element is removed in case we re-enter as a result.
+ if (action == NotifyMonitor) {
+ mon.NotifyAll();
+ }
+
+ return;
+ }
+ }
+}
+
+void nsPipe::OnPipeException(nsresult aReason, bool aOutputOnly) {
+ LOG(("PPP nsPipe::OnPipeException [reason=%" PRIx32 " output-only=%d]\n",
+ static_cast<uint32_t>(aReason), aOutputOnly));
+
+ nsPipeEvents events;
+ {
+ ReentrantMonitorAutoEnter mon(mReentrantMonitor);
+
+ // if we've already hit an exception, then ignore this one.
+ if (NS_FAILED(mStatus)) {
+ return;
+ }
+
+ mStatus = aReason;
+
+ bool needNotify = false;
+
+ // OnInputException() can drain the stream and remove it from
+ // mInputList. So iterate over a temp list instead.
+ nsTArray<nsPipeInputStream*> list = mInputList.Clone();
+ for (uint32_t i = 0; i < list.Length(); ++i) {
+ // an output-only exception applies to the input end if the pipe has
+ // zero bytes available.
+ list[i]->Monitor().AssertCurrentThreadIn();
+ if (aOutputOnly && list[i]->Available()) {
+ continue;
+ }
+
+ if (list[i]->OnInputException(aReason, events, mon) == NotifyMonitor) {
+ needNotify = true;
+ }
+ }
+
+ mOutput.Monitor().AssertCurrentThreadIn();
+ if (mOutput.OnOutputException(aReason, events) == NotifyMonitor) {
+ needNotify = true;
+ }
+
+ // Notify after we have removed any input streams from mInputList
+ if (needNotify) {
+ mon.NotifyAll();
+ }
+ }
+}
+
+nsresult nsPipe::CloneInputStream(nsPipeInputStream* aOriginal,
+ nsIInputStream** aCloneOut) {
+ ReentrantMonitorAutoEnter mon(mReentrantMonitor);
+ RefPtr<nsPipeInputStream> ref = new nsPipeInputStream(*aOriginal);
+ // don't add clones of closed pipes to mInputList.
+ ref->Monitor().AssertCurrentThreadIn();
+ if (NS_SUCCEEDED(ref->InputStatus(mon))) {
+ mInputList.AppendElement(ref);
+ }
+ nsCOMPtr<nsIAsyncInputStream> upcast = std::move(ref);
+ upcast.forget(aCloneOut);
+ return NS_OK;
+}
+
+uint32_t nsPipe::CountSegmentReferences(int32_t aSegment) {
+ mReentrantMonitor.AssertCurrentThreadIn();
+ uint32_t count = 0;
+ for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+ if (aSegment >= mInputList[i]->ReadState().mSegment) {
+ count += 1;
+ }
+ }
+ return count;
+}
+
+void nsPipe::SetAllNullReadCursors() {
+ mReentrantMonitor.AssertCurrentThreadIn();
+ for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+ nsPipeReadState& readState = mInputList[i]->ReadState();
+ if (!readState.mReadCursor) {
+ MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == readState.mSegment);
+ readState.mReadCursor = readState.mReadLimit = mWriteCursor;
+ }
+ }
+}
+
+bool nsPipe::AllReadCursorsMatchWriteCursor() {
+ mReentrantMonitor.AssertCurrentThreadIn();
+ for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+ const nsPipeReadState& readState = mInputList[i]->ReadState();
+ if (readState.mSegment != mWriteSegment ||
+ readState.mReadCursor != mWriteCursor) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void nsPipe::RollBackAllReadCursors(char* aWriteCursor) {
+ mReentrantMonitor.AssertCurrentThreadIn();
+ for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+ nsPipeReadState& readState = mInputList[i]->ReadState();
+ MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == readState.mSegment);
+ MOZ_DIAGNOSTIC_ASSERT(mWriteCursor == readState.mReadCursor);
+ MOZ_DIAGNOSTIC_ASSERT(mWriteCursor == readState.mReadLimit);
+ readState.mReadCursor = aWriteCursor;
+ readState.mReadLimit = aWriteCursor;
+ }
+}
+
+void nsPipe::UpdateAllReadCursors(char* aWriteCursor) {
+ mReentrantMonitor.AssertCurrentThreadIn();
+ for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+ nsPipeReadState& readState = mInputList[i]->ReadState();
+ if (mWriteSegment == readState.mSegment &&
+ readState.mReadLimit == mWriteCursor) {
+ readState.mReadLimit = aWriteCursor;
+ }
+ }
+}
+
+void nsPipe::ValidateAllReadCursors() {
+ mReentrantMonitor.AssertCurrentThreadIn();
+ // The only way mReadCursor == mWriteCursor is if:
+ //
+ // - mReadCursor is at the start of a segment (which, based on how
+ // nsSegmentedBuffer works, means that this segment is the "first"
+ // segment)
+ // - mWriteCursor points at the location past the end of the current
+ // write segment (so the current write filled the current write
+ // segment, so we've incremented mWriteCursor to point past the end
+ // of it)
+ // - the segment to which data has just been written is located
+ // exactly one segment's worth of bytes before the first segment
+ // where mReadCursor is located
+ //
+ // Consequently, the byte immediately after the end of the current
+ // write segment is the first byte of the first segment, so
+ // mReadCursor == mWriteCursor. (Another way to think about this is
+ // to consider the buffer architecture diagram above, but consider it
+ // with an arena allocator which allocates from the *end* of the
+ // arena to the *beginning* of the arena.)
+#ifdef DEBUG
+ for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+ const nsPipeReadState& state = mInputList[i]->ReadState();
+ MOZ_ASSERT(state.mReadCursor != mWriteCursor ||
+ (mBuffer.GetSegment(state.mSegment) == state.mReadCursor &&
+ mWriteCursor == mWriteLimit));
+ }
+#endif
+}
+
+uint32_t nsPipe::GetBufferSegmentCount(
+ const nsPipeReadState& aReadState,
+ const ReentrantMonitorAutoEnter& ev) const {
+ // The write segment can be smaller than the current reader position
+ // in some cases. For example, when the first write segment has not
+ // been allocated yet mWriteSegment is negative. In these cases
+ // the stream is effectively using zero segments.
+ if (mWriteSegment < aReadState.mSegment) {
+ return 0;
+ }
+
+ MOZ_DIAGNOSTIC_ASSERT(mWriteSegment >= 0);
+ MOZ_DIAGNOSTIC_ASSERT(aReadState.mSegment >= 0);
+
+ // Otherwise at least one segment is being used. We add one here
+ // since a single segment is being used when the write and read
+ // segment indices are the same.
+ return 1 + mWriteSegment - aReadState.mSegment;
+}
+
+bool nsPipe::IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const {
+ // If we have fewer total segments than the limit we can immediately
+ // determine we are not full. Note, we must add one to mWriteSegment
+ // to convert from a index to a count.
+ MOZ_DIAGNOSTIC_ASSERT(mWriteSegment >= -1);
+ MOZ_DIAGNOSTIC_ASSERT(mWriteSegment < INT32_MAX);
+ uint32_t totalWriteSegments = mWriteSegment + 1;
+ if (totalWriteSegments < mMaxAdvanceBufferSegmentCount) {
+ return false;
+ }
+
+ // Otherwise we must inspect all of our reader streams. We need
+ // to determine the buffer depth of the fastest reader.
+ uint32_t minBufferSegments = UINT32_MAX;
+ for (uint32_t i = 0; i < mInputList.Length(); ++i) {
+ // Only count buffer segments from input streams that are open.
+ mInputList[i]->Monitor().AssertCurrentThreadIn();
+ if (NS_FAILED(mInputList[i]->Status(ev))) {
+ continue;
+ }
+ const nsPipeReadState& state = mInputList[i]->ReadState();
+ uint32_t bufferSegments = GetBufferSegmentCount(state, ev);
+ minBufferSegments = std::min(minBufferSegments, bufferSegments);
+ // We only care if any reader has fewer segments buffered than
+ // our threshold. We can stop once we hit that threshold.
+ if (minBufferSegments < mMaxAdvanceBufferSegmentCount) {
+ return false;
+ }
+ }
+
+ // Note, its possible for minBufferSegments to exceed our
+ // mMaxAdvanceBufferSegmentCount here. This happens when a cloned
+ // reader gets far behind, but then the fastest reader stream is
+ // closed. This leaves us with a single stream that is buffered
+ // beyond our max. Naturally we continue to indicate the pipe
+ // is full at this point.
+
+ return true;
+}
+
+//-----------------------------------------------------------------------------
+// nsPipeEvents methods:
+//-----------------------------------------------------------------------------
+
+nsPipeEvents::~nsPipeEvents() {
+ // dispatch any pending events
+ for (auto& callback : mCallbacks) {
+ callback.Notify();
+ }
+ mCallbacks.Clear();
+}
+
+//-----------------------------------------------------------------------------
+// nsPipeInputStream methods:
+//-----------------------------------------------------------------------------
+
+NS_IMPL_ADDREF(nsPipeInputStream);
+NS_IMPL_RELEASE(nsPipeInputStream);
+
+NS_INTERFACE_TABLE_HEAD(nsPipeInputStream)
+ NS_INTERFACE_TABLE_BEGIN
+ NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIAsyncInputStream)
+ NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsITellableStream)
+ NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsISearchableInputStream)
+ NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsICloneableInputStream)
+ NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIBufferedInputStream)
+ NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIClassInfo)
+ NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIInputStreamPriority)
+ NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream, nsIInputStream,
+ nsIAsyncInputStream)
+ NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream, nsISupports,
+ nsIAsyncInputStream)
+ NS_INTERFACE_TABLE_END
+NS_INTERFACE_TABLE_TAIL
+
+NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream, nsIInputStream,
+ nsIAsyncInputStream, nsITellableStream,
+ nsISearchableInputStream, nsICloneableInputStream,
+ nsIBufferedInputStream)
+
+NS_IMPL_THREADSAFE_CI(nsPipeInputStream)
+
+NS_IMETHODIMP
+nsPipeInputStream::Init(nsIInputStream*, uint32_t) {
+ MOZ_CRASH(
+ "nsPipeInputStream should never be initialized with "
+ "nsIBufferedInputStream::Init!\n");
+}
+
+NS_IMETHODIMP
+nsPipeInputStream::GetData(nsIInputStream** aResult) {
+ // as this was not created with init() we are not
+ // wrapping anything
+ return NS_ERROR_NOT_IMPLEMENTED;
+}
+
+uint32_t nsPipeInputStream::Available() {
+ mPipe->mReentrantMonitor.AssertCurrentThreadIn();
+ return mReadState.mAvailable;
+}
+
+nsresult nsPipeInputStream::Wait() {
+ MOZ_DIAGNOSTIC_ASSERT(mBlocking);
+
+ ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
+
+ while (NS_SUCCEEDED(Status(mon)) && (mReadState.mAvailable == 0)) {
+ LOG(("III pipe input: waiting for data\n"));
+
+ mBlocked = true;
+ mon.Wait();
+ mBlocked = false;
+
+ LOG(("III pipe input: woke up [status=%" PRIx32 " available=%u]\n",
+ static_cast<uint32_t>(Status(mon)), mReadState.mAvailable));
+ }
+
+ return Status(mon) == NS_BASE_STREAM_CLOSED ? NS_OK : Status(mon);
+}
+
+MonitorAction nsPipeInputStream::OnInputReadable(
+ uint32_t aBytesWritten, nsPipeEvents& aEvents,
+ const ReentrantMonitorAutoEnter& ev) {
+ MonitorAction result = DoNotNotifyMonitor;
+
+ mPipe->mReentrantMonitor.AssertCurrentThreadIn();
+ mReadState.mAvailable += aBytesWritten;
+
+ if (mCallback && !(mCallback.Flags() & WAIT_CLOSURE_ONLY)) {
+ aEvents.NotifyReady(std::move(mCallback));
+ } else if (mBlocked) {
+ result = NotifyMonitor;
+ }
+
+ return result;
+}
+
+MonitorAction nsPipeInputStream::OnInputException(
+ nsresult aReason, nsPipeEvents& aEvents,
+ const ReentrantMonitorAutoEnter& ev) {
+ LOG(("nsPipeInputStream::OnInputException [this=%p reason=%" PRIx32 "]\n",
+ this, static_cast<uint32_t>(aReason)));
+
+ MonitorAction result = DoNotNotifyMonitor;
+
+ MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason));
+
+ if (NS_SUCCEEDED(mInputStatus)) {
+ mInputStatus = aReason;
+ }
+
+ // force count of available bytes to zero.
+ mPipe->DrainInputStream(mReadState, aEvents);
+
+ if (mCallback) {
+ aEvents.NotifyReady(std::move(mCallback));
+ } else if (mBlocked) {
+ result = NotifyMonitor;
+ }
+
+ return result;
+}
+
+NS_IMETHODIMP
+nsPipeInputStream::CloseWithStatus(nsresult aReason) {
+ LOG(("III CloseWithStatus [this=%p reason=%" PRIx32 "]\n", this,
+ static_cast<uint32_t>(aReason)));
+
+ ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
+
+ if (NS_FAILED(mInputStatus)) {
+ return NS_OK;
+ }
+
+ if (NS_SUCCEEDED(aReason)) {
+ aReason = NS_BASE_STREAM_CLOSED;
+ }
+
+ mPipe->OnInputStreamException(this, aReason);
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsPipeInputStream::SetPriority(uint32_t priority) {
+ mPriority = priority;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsPipeInputStream::GetPriority(uint32_t* priority) {
+ *priority = mPriority;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsPipeInputStream::Close() { return CloseWithStatus(NS_BASE_STREAM_CLOSED); }
+
+NS_IMETHODIMP
+nsPipeInputStream::Available(uint64_t* aResult) {
+ // nsPipeInputStream supports under 4GB stream only
+ ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
+
+ // return error if closed
+ if (!mReadState.mAvailable && NS_FAILED(Status(mon))) {
+ return Status(mon);
+ }
+
+ *aResult = (uint64_t)mReadState.mAvailable;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsPipeInputStream::StreamStatus() {
+ ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
+ return mReadState.mAvailable ? NS_OK : Status(mon);
+}
+
+NS_IMETHODIMP
+nsPipeInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
+ uint32_t aCount, uint32_t* aReadCount) {
+ LOG(("III ReadSegments [this=%p count=%u]\n", this, aCount));
+
+ nsresult rv = NS_OK;
+
+ *aReadCount = 0;
+ while (aCount) {
+ AutoReadSegment segment(mPipe, mReadState, aCount);
+ rv = segment.Status();
+ if (NS_FAILED(rv)) {
+ // ignore this error if we've already read something.
+ if (*aReadCount > 0) {
+ rv = NS_OK;
+ break;
+ }
+ if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
+ // pipe is empty
+ if (!mBlocking) {
+ break;
+ }
+ // wait for some data to be written to the pipe
+ rv = Wait();
+ if (NS_SUCCEEDED(rv)) {
+ continue;
+ }
+ }
+ // ignore this error, just return.
+ if (rv == NS_BASE_STREAM_CLOSED) {
+ rv = NS_OK;
+ break;
+ }
+ mPipe->OnInputStreamException(this, rv);
+ break;
+ }
+
+ uint32_t writeCount;
+ while (segment.Length()) {
+ writeCount = 0;
+
+ rv = aWriter(static_cast<nsIAsyncInputStream*>(this), aClosure,
+ segment.Data(), *aReadCount, segment.Length(), &writeCount);
+
+ if (NS_FAILED(rv) || writeCount == 0) {
+ aCount = 0;
+ // any errors returned from the writer end here: do not
+ // propagate to the caller of ReadSegments.
+ rv = NS_OK;
+ break;
+ }
+
+ MOZ_DIAGNOSTIC_ASSERT(writeCount <= segment.Length());
+ segment.Advance(writeCount);
+ aCount -= writeCount;
+ *aReadCount += writeCount;
+ mLogicalOffset += writeCount;
+ }
+ }
+
+ return rv;
+}
+
+NS_IMETHODIMP
+nsPipeInputStream::Read(char* aToBuf, uint32_t aBufLen, uint32_t* aReadCount) {
+ return ReadSegments(NS_CopySegmentToBuffer, aToBuf, aBufLen, aReadCount);
+}
+
+NS_IMETHODIMP
+nsPipeInputStream::IsNonBlocking(bool* aNonBlocking) {
+ *aNonBlocking = !mBlocking;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsPipeInputStream::AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags,
+ uint32_t aRequestedCount,
+ nsIEventTarget* aTarget) {
+ LOG(("III AsyncWait [this=%p]\n", this));
+
+ nsPipeEvents pipeEvents;
+ {
+ ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
+
+ // replace a pending callback
+ mCallback = nullptr;
+
+ if (!aCallback) {
+ return NS_OK;
+ }
+
+ CallbackHolder callback(this, aCallback, aFlags, aTarget);
+
+ if (NS_FAILED(Status(mon)) ||
+ (mReadState.mAvailable && !(aFlags & WAIT_CLOSURE_ONLY))) {
+ // stream is already closed or readable; post event.
+ pipeEvents.NotifyReady(std::move(callback));
+ } else {
+ // queue up callback object to be notified when data becomes available
+ mCallback = std::move(callback);
+ }
+ }
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsPipeInputStream::Tell(int64_t* aOffset) {
+ ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
+
+ // return error if closed
+ if (!mReadState.mAvailable && NS_FAILED(Status(mon))) {
+ return Status(mon);
+ }
+
+ *aOffset = mLogicalOffset;
+ return NS_OK;
+}
+
+static bool strings_equal(bool aIgnoreCase, const char* aS1, const char* aS2,
+ uint32_t aLen) {
+ return aIgnoreCase ? !nsCRT::strncasecmp(aS1, aS2, aLen)
+ : !strncmp(aS1, aS2, aLen);
+}
+
+NS_IMETHODIMP
+nsPipeInputStream::Search(const char* aForString, bool aIgnoreCase,
+ bool* aFound, uint32_t* aOffsetSearchedTo) {
+ LOG(("III Search [for=%s ic=%u]\n", aForString, aIgnoreCase));
+
+ ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
+
+ char* cursor1;
+ char* limit1;
+ uint32_t index = 0, offset = 0;
+ uint32_t strLen = strlen(aForString);
+
+ mPipe->PeekSegment(mReadState, 0, cursor1, limit1);
+ if (cursor1 == limit1) {
+ *aFound = false;
+ *aOffsetSearchedTo = 0;
+ LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
+ return NS_OK;
+ }
+
+ while (true) {
+ uint32_t i, len1 = limit1 - cursor1;
+
+ // check if the string is in the buffer segment
+ for (i = 0; i < len1 - strLen + 1; i++) {
+ if (strings_equal(aIgnoreCase, &cursor1[i], aForString, strLen)) {
+ *aFound = true;
+ *aOffsetSearchedTo = offset + i;
+ LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
+ return NS_OK;
+ }
+ }
+
+ // get the next segment
+ char* cursor2;
+ char* limit2;
+ uint32_t len2;
+
+ index++;
+ offset += len1;
+
+ mPipe->PeekSegment(mReadState, index, cursor2, limit2);
+ if (cursor2 == limit2) {
+ *aFound = false;
+ *aOffsetSearchedTo = offset - strLen + 1;
+ LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
+ return NS_OK;
+ }
+ len2 = limit2 - cursor2;
+
+ // check if the string is straddling the next buffer segment
+ uint32_t lim = XPCOM_MIN(strLen, len2 + 1);
+ for (i = 0; i < lim; ++i) {
+ uint32_t strPart1Len = strLen - i - 1;
+ uint32_t strPart2Len = strLen - strPart1Len;
+ const char* strPart2 = &aForString[strLen - strPart2Len];
+ uint32_t bufSeg1Offset = len1 - strPart1Len;
+ if (strings_equal(aIgnoreCase, &cursor1[bufSeg1Offset], aForString,
+ strPart1Len) &&
+ strings_equal(aIgnoreCase, cursor2, strPart2, strPart2Len)) {
+ *aFound = true;
+ *aOffsetSearchedTo = offset - strPart1Len;
+ LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
+ return NS_OK;
+ }
+ }
+
+ // finally continue with the next buffer
+ cursor1 = cursor2;
+ limit1 = limit2;
+ }
+
+ MOZ_ASSERT_UNREACHABLE("can't get here");
+ return NS_ERROR_UNEXPECTED; // keep compiler happy
+}
+
+NS_IMETHODIMP
+nsPipeInputStream::GetCloneable(bool* aCloneableOut) {
+ *aCloneableOut = true;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsPipeInputStream::Clone(nsIInputStream** aCloneOut) {
+ return mPipe->CloneInputStream(this, aCloneOut);
+}
+
+nsresult nsPipeInputStream::Status(const ReentrantMonitorAutoEnter& ev) const {
+ if (NS_FAILED(mInputStatus)) {
+ return mInputStatus;
+ }
+
+ if (mReadState.mAvailable) {
+ // Still something to read and this input stream state is OK.
+ return NS_OK;
+ }
+
+ // Nothing to read, just fall through to the pipe's state that
+ // may reflect state of its output stream side (already closed).
+ return mPipe->mStatus;
+}
+
+nsresult nsPipeInputStream::Status() const {
+ ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
+ return Status(mon);
+}
+
+nsPipeInputStream::~nsPipeInputStream() { Close(); }
+
+//-----------------------------------------------------------------------------
+// nsPipeOutputStream methods:
+//-----------------------------------------------------------------------------
+
+NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream, nsIOutputStream,
+ nsIAsyncOutputStream, nsIClassInfo)
+
+NS_IMPL_CI_INTERFACE_GETTER(nsPipeOutputStream, nsIOutputStream,
+ nsIAsyncOutputStream)
+
+NS_IMPL_THREADSAFE_CI(nsPipeOutputStream)
+
+nsresult nsPipeOutputStream::Wait() {
+ MOZ_DIAGNOSTIC_ASSERT(mBlocking);
+
+ ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
+
+ if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) {
+ LOG(("OOO pipe output: waiting for space\n"));
+ mBlocked = true;
+ mon.Wait();
+ mBlocked = false;
+ LOG(("OOO pipe output: woke up [pipe-status=%" PRIx32 " writable=%u]\n",
+ static_cast<uint32_t>(mPipe->mStatus), mWritable));
+ }
+
+ return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
+}
+
+MonitorAction nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents) {
+ MonitorAction result = DoNotNotifyMonitor;
+
+ mWritable = true;
+
+ if (mCallback && !(mCallback.Flags() & WAIT_CLOSURE_ONLY)) {
+ aEvents.NotifyReady(std::move(mCallback));
+ } else if (mBlocked) {
+ result = NotifyMonitor;
+ }
+
+ return result;
+}
+
+MonitorAction nsPipeOutputStream::OnOutputException(nsresult aReason,
+ nsPipeEvents& aEvents) {
+ LOG(("nsPipeOutputStream::OnOutputException [this=%p reason=%" PRIx32 "]\n",
+ this, static_cast<uint32_t>(aReason)));
+
+ MonitorAction result = DoNotNotifyMonitor;
+
+ MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason));
+ mWritable = false;
+
+ if (mCallback) {
+ aEvents.NotifyReady(std::move(mCallback));
+ } else if (mBlocked) {
+ result = NotifyMonitor;
+ }
+
+ return result;
+}
+
+NS_IMETHODIMP_(MozExternalRefCountType)
+nsPipeOutputStream::AddRef() {
+ ++mWriterRefCnt;
+ return mPipe->AddRef();
+}
+
+NS_IMETHODIMP_(MozExternalRefCountType)
+nsPipeOutputStream::Release() {
+ if (--mWriterRefCnt == 0) {
+ Close();
+ }
+ return mPipe->Release();
+}
+
+NS_IMETHODIMP
+nsPipeOutputStream::CloseWithStatus(nsresult aReason) {
+ LOG(("OOO CloseWithStatus [this=%p reason=%" PRIx32 "]\n", this,
+ static_cast<uint32_t>(aReason)));
+
+ if (NS_SUCCEEDED(aReason)) {
+ aReason = NS_BASE_STREAM_CLOSED;
+ }
+
+ // input stream may remain open
+ mPipe->OnPipeException(aReason, true);
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsPipeOutputStream::Close() { return CloseWithStatus(NS_BASE_STREAM_CLOSED); }
+
+NS_IMETHODIMP
+nsPipeOutputStream::WriteSegments(nsReadSegmentFun aReader, void* aClosure,
+ uint32_t aCount, uint32_t* aWriteCount) {
+ LOG(("OOO WriteSegments [this=%p count=%u]\n", this, aCount));
+
+ nsresult rv = NS_OK;
+
+ char* segment;
+ uint32_t segmentLen;
+
+ *aWriteCount = 0;
+ while (aCount) {
+ rv = mPipe->GetWriteSegment(segment, segmentLen);
+ if (NS_FAILED(rv)) {
+ if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
+ // pipe is full
+ if (!mBlocking) {
+ // ignore this error if we've already written something
+ if (*aWriteCount > 0) {
+ rv = NS_OK;
+ }
+ break;
+ }
+ // wait for the pipe to have an empty segment.
+ rv = Wait();
+ if (NS_SUCCEEDED(rv)) {
+ continue;
+ }
+ }
+ mPipe->OnPipeException(rv);
+ break;
+ }
+
+ // write no more than aCount
+ if (segmentLen > aCount) {
+ segmentLen = aCount;
+ }
+
+ uint32_t readCount, originalLen = segmentLen;
+ while (segmentLen) {
+ readCount = 0;
+
+ rv = aReader(this, aClosure, segment, *aWriteCount, segmentLen,
+ &readCount);
+
+ if (NS_FAILED(rv) || readCount == 0) {
+ aCount = 0;
+ // any errors returned from the aReader end here: do not
+ // propagate to the caller of WriteSegments.
+ rv = NS_OK;
+ break;
+ }
+
+ MOZ_DIAGNOSTIC_ASSERT(readCount <= segmentLen);
+ segment += readCount;
+ segmentLen -= readCount;
+ aCount -= readCount;
+ *aWriteCount += readCount;
+ mLogicalOffset += readCount;
+ }
+
+ if (segmentLen < originalLen) {
+ mPipe->AdvanceWriteCursor(originalLen - segmentLen);
+ }
+ }
+
+ return rv;
+}
+
+NS_IMETHODIMP
+nsPipeOutputStream::Write(const char* aFromBuf, uint32_t aBufLen,
+ uint32_t* aWriteCount) {
+ return WriteSegments(NS_CopyBufferToSegment, (void*)aFromBuf, aBufLen,
+ aWriteCount);
+}
+
+NS_IMETHODIMP
+nsPipeOutputStream::Flush() {
+ // nothing to do
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsPipeOutputStream::StreamStatus() {
+ ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
+ return mPipe->mStatus;
+}
+
+NS_IMETHODIMP
+nsPipeOutputStream::WriteFrom(nsIInputStream* aFromStream, uint32_t aCount,
+ uint32_t* aWriteCount) {
+ return WriteSegments(NS_CopyStreamToSegment, aFromStream, aCount,
+ aWriteCount);
+}
+
+NS_IMETHODIMP
+nsPipeOutputStream::IsNonBlocking(bool* aNonBlocking) {
+ *aNonBlocking = !mBlocking;
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback* aCallback,
+ uint32_t aFlags, uint32_t aRequestedCount,
+ nsIEventTarget* aTarget) {
+ LOG(("OOO AsyncWait [this=%p]\n", this));
+
+ nsPipeEvents pipeEvents;
+ {
+ ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
+
+ // replace a pending callback
+ mCallback = nullptr;
+
+ if (!aCallback) {
+ return NS_OK;
+ }
+
+ CallbackHolder callback(this, aCallback, aFlags, aTarget);
+
+ if (NS_FAILED(mPipe->mStatus) ||
+ (mWritable && !(aFlags & WAIT_CLOSURE_ONLY))) {
+ // stream is already closed or writable; post event.
+ pipeEvents.NotifyReady(std::move(callback));
+ } else {
+ // queue up callback object to be notified when data becomes available
+ mCallback = std::move(callback);
+ }
+ }
+ return NS_OK;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void NS_NewPipe(nsIInputStream** aPipeIn, nsIOutputStream** aPipeOut,
+ uint32_t aSegmentSize, uint32_t aMaxSize,
+ bool aNonBlockingInput, bool aNonBlockingOutput) {
+ if (aSegmentSize == 0) {
+ aSegmentSize = DEFAULT_SEGMENT_SIZE;
+ }
+
+ // Handle aMaxSize of UINT32_MAX as a special case
+ uint32_t segmentCount;
+ if (aMaxSize == UINT32_MAX) {
+ segmentCount = UINT32_MAX;
+ } else {
+ segmentCount = aMaxSize / aSegmentSize;
+ }
+
+ nsIAsyncInputStream* in;
+ nsIAsyncOutputStream* out;
+ NS_NewPipe2(&in, &out, aNonBlockingInput, aNonBlockingOutput, aSegmentSize,
+ segmentCount);
+
+ *aPipeIn = in;
+ *aPipeOut = out;
+}
+
+// Disable thread safety analysis as this is logically a constructor, and no
+// additional threads can observe these objects yet.
+void NS_NewPipe2(nsIAsyncInputStream** aPipeIn, nsIAsyncOutputStream** aPipeOut,
+ bool aNonBlockingInput, bool aNonBlockingOutput,
+ uint32_t aSegmentSize,
+ uint32_t aSegmentCount) MOZ_NO_THREAD_SAFETY_ANALYSIS {
+ RefPtr<nsPipe> pipe =
+ new nsPipe(aSegmentSize ? aSegmentSize : DEFAULT_SEGMENT_SIZE,
+ aSegmentCount ? aSegmentCount : DEFAULT_SEGMENT_COUNT);
+
+ RefPtr<nsPipeInputStream> pipeIn = new nsPipeInputStream(pipe);
+ pipe->mInputList.AppendElement(pipeIn);
+ RefPtr<nsPipeOutputStream> pipeOut = &pipe->mOutput;
+
+ pipeIn->SetNonBlocking(aNonBlockingInput);
+ pipeOut->SetNonBlocking(aNonBlockingOutput);
+
+ pipeIn.forget(aPipeIn);
+ pipeOut.forget(aPipeOut);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Thin nsIPipe implementation for consumers of the component manager interface
+// for creating pipes. Acts as a thin wrapper around NS_NewPipe2 for JS callers.
+class nsPipeHolder final : public nsIPipe {
+ public:
+ NS_DECL_THREADSAFE_ISUPPORTS
+ NS_DECL_NSIPIPE
+
+ private:
+ ~nsPipeHolder() = default;
+
+ nsCOMPtr<nsIAsyncInputStream> mInput;
+ nsCOMPtr<nsIAsyncOutputStream> mOutput;
+};
+
+NS_IMPL_ISUPPORTS(nsPipeHolder, nsIPipe)
+
+NS_IMETHODIMP
+nsPipeHolder::Init(bool aNonBlockingInput, bool aNonBlockingOutput,
+ uint32_t aSegmentSize, uint32_t aSegmentCount) {
+ if (mInput || mOutput) {
+ return NS_ERROR_ALREADY_INITIALIZED;
+ }
+ NS_NewPipe2(getter_AddRefs(mInput), getter_AddRefs(mOutput),
+ aNonBlockingInput, aNonBlockingOutput, aSegmentSize,
+ aSegmentCount);
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+nsPipeHolder::GetInputStream(nsIAsyncInputStream** aInputStream) {
+ if (mInput) {
+ *aInputStream = do_AddRef(mInput).take();
+ return NS_OK;
+ }
+ return NS_ERROR_NOT_INITIALIZED;
+}
+
+NS_IMETHODIMP
+nsPipeHolder::GetOutputStream(nsIAsyncOutputStream** aOutputStream) {
+ if (mOutput) {
+ *aOutputStream = do_AddRef(mOutput).take();
+ return NS_OK;
+ }
+ return NS_ERROR_NOT_INITIALIZED;
+}
+
+nsresult nsPipeConstructor(REFNSIID aIID, void** aResult) {
+ RefPtr<nsPipeHolder> pipe = new nsPipeHolder();
+ nsresult rv = pipe->QueryInterface(aIID, aResult);
+ return rv;
+}
+
+////////////////////////////////////////////////////////////////////////////////