summaryrefslogtreecommitdiffstats
path: root/xpcom/io/nsStreamUtils.cpp
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--xpcom/io/nsStreamUtils.cpp976
1 files changed, 976 insertions, 0 deletions
diff --git a/xpcom/io/nsStreamUtils.cpp b/xpcom/io/nsStreamUtils.cpp
new file mode 100644
index 0000000000..94669b71dc
--- /dev/null
+++ b/xpcom/io/nsStreamUtils.cpp
@@ -0,0 +1,976 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "mozilla/Mutex.h"
+#include "mozilla/Attributes.h"
+#include "mozilla/InputStreamLengthWrapper.h"
+#include "nsIInputStreamLength.h"
+#include "nsStreamUtils.h"
+#include "nsCOMPtr.h"
+#include "nsICloneableInputStream.h"
+#include "nsIEventTarget.h"
+#include "nsICancelableRunnable.h"
+#include "nsISafeOutputStream.h"
+#include "nsString.h"
+#include "nsIAsyncInputStream.h"
+#include "nsIAsyncOutputStream.h"
+#include "nsIBufferedStreams.h"
+#include "nsIPipe.h"
+#include "nsNetCID.h"
+#include "nsServiceManagerUtils.h"
+#include "nsThreadUtils.h"
+#include "nsITransport.h"
+#include "nsIStreamTransportService.h"
+#include "NonBlockingAsyncInputStream.h"
+
+using namespace mozilla;
+
+static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
+
+//-----------------------------------------------------------------------------
+
+// This is a nsICancelableRunnable because we can dispatch it to Workers and
+// those can be shut down at any time, and in these cases, Cancel() is called
+// instead of Run().
+class nsInputStreamReadyEvent final : public CancelableRunnable,
+ public nsIInputStreamCallback,
+ public nsIRunnablePriority {
+ public:
+ NS_DECL_ISUPPORTS_INHERITED
+
+ nsInputStreamReadyEvent(const char* aName, nsIInputStreamCallback* aCallback,
+ nsIEventTarget* aTarget, uint32_t aPriority)
+ : CancelableRunnable(aName),
+ mCallback(aCallback),
+ mTarget(aTarget),
+ mPriority(aPriority) {}
+
+ private:
+ ~nsInputStreamReadyEvent() {
+ if (!mCallback) {
+ return;
+ }
+ //
+ // whoa!! looks like we never posted this event. take care to
+ // release mCallback on the correct thread. if mTarget lives on the
+ // calling thread, then we are ok. otherwise, we have to try to
+ // proxy the Release over the right thread. if that thread is dead,
+ // then there's nothing we can do... better to leak than crash.
+ //
+ bool val;
+ nsresult rv = mTarget->IsOnCurrentThread(&val);
+ if (NS_FAILED(rv) || !val) {
+ nsCOMPtr<nsIInputStreamCallback> event = NS_NewInputStreamReadyEvent(
+ "~nsInputStreamReadyEvent", mCallback, mTarget, mPriority);
+ mCallback = nullptr;
+ if (event) {
+ rv = event->OnInputStreamReady(nullptr);
+ if (NS_FAILED(rv)) {
+ MOZ_ASSERT_UNREACHABLE("leaking stream event");
+ nsISupports* sup = event;
+ NS_ADDREF(sup);
+ }
+ }
+ }
+ }
+
+ public:
+ NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aStream) override {
+ mStream = aStream;
+
+ nsresult rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
+ if (NS_FAILED(rv)) {
+ NS_WARNING("Dispatch failed");
+ return NS_ERROR_FAILURE;
+ }
+
+ return NS_OK;
+ }
+
+ NS_IMETHOD Run() override {
+ if (mCallback) {
+ if (mStream) {
+ mCallback->OnInputStreamReady(mStream);
+ }
+ mCallback = nullptr;
+ }
+ return NS_OK;
+ }
+
+ nsresult Cancel() override {
+ mCallback = nullptr;
+ return NS_OK;
+ }
+
+ NS_IMETHOD GetPriority(uint32_t* aPriority) override {
+ *aPriority = mPriority;
+ return NS_OK;
+ }
+
+ private:
+ nsCOMPtr<nsIAsyncInputStream> mStream;
+ nsCOMPtr<nsIInputStreamCallback> mCallback;
+ nsCOMPtr<nsIEventTarget> mTarget;
+ uint32_t mPriority;
+};
+
+NS_IMPL_ISUPPORTS_INHERITED(nsInputStreamReadyEvent, CancelableRunnable,
+ nsIInputStreamCallback, nsIRunnablePriority)
+
+//-----------------------------------------------------------------------------
+
+// This is a nsICancelableRunnable because we can dispatch it to Workers and
+// those can be shut down at any time, and in these cases, Cancel() is called
+// instead of Run().
+class nsOutputStreamReadyEvent final : public CancelableRunnable,
+ public nsIOutputStreamCallback {
+ public:
+ NS_DECL_ISUPPORTS_INHERITED
+
+ nsOutputStreamReadyEvent(nsIOutputStreamCallback* aCallback,
+ nsIEventTarget* aTarget)
+ : CancelableRunnable("nsOutputStreamReadyEvent"),
+ mCallback(aCallback),
+ mTarget(aTarget) {}
+
+ private:
+ ~nsOutputStreamReadyEvent() {
+ if (!mCallback) {
+ return;
+ }
+ //
+ // whoa!! looks like we never posted this event. take care to
+ // release mCallback on the correct thread. if mTarget lives on the
+ // calling thread, then we are ok. otherwise, we have to try to
+ // proxy the Release over the right thread. if that thread is dead,
+ // then there's nothing we can do... better to leak than crash.
+ //
+ bool val;
+ nsresult rv = mTarget->IsOnCurrentThread(&val);
+ if (NS_FAILED(rv) || !val) {
+ nsCOMPtr<nsIOutputStreamCallback> event =
+ NS_NewOutputStreamReadyEvent(mCallback, mTarget);
+ mCallback = nullptr;
+ if (event) {
+ rv = event->OnOutputStreamReady(nullptr);
+ if (NS_FAILED(rv)) {
+ MOZ_ASSERT_UNREACHABLE("leaking stream event");
+ nsISupports* sup = event;
+ NS_ADDREF(sup);
+ }
+ }
+ }
+ }
+
+ public:
+ NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aStream) override {
+ mStream = aStream;
+
+ nsresult rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
+ if (NS_FAILED(rv)) {
+ NS_WARNING("PostEvent failed");
+ return NS_ERROR_FAILURE;
+ }
+
+ return NS_OK;
+ }
+
+ NS_IMETHOD Run() override {
+ if (mCallback) {
+ if (mStream) {
+ mCallback->OnOutputStreamReady(mStream);
+ }
+ mCallback = nullptr;
+ }
+ return NS_OK;
+ }
+
+ nsresult Cancel() override {
+ mCallback = nullptr;
+ return NS_OK;
+ }
+
+ private:
+ nsCOMPtr<nsIAsyncOutputStream> mStream;
+ nsCOMPtr<nsIOutputStreamCallback> mCallback;
+ nsCOMPtr<nsIEventTarget> mTarget;
+};
+
+NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent, CancelableRunnable,
+ nsIOutputStreamCallback)
+
+//-----------------------------------------------------------------------------
+
+already_AddRefed<nsIInputStreamCallback> NS_NewInputStreamReadyEvent(
+ const char* aName, nsIInputStreamCallback* aCallback,
+ nsIEventTarget* aTarget, uint32_t aPriority) {
+ NS_ASSERTION(aCallback, "null callback");
+ NS_ASSERTION(aTarget, "null target");
+ RefPtr<nsInputStreamReadyEvent> ev =
+ new nsInputStreamReadyEvent(aName, aCallback, aTarget, aPriority);
+ return ev.forget();
+}
+
+already_AddRefed<nsIOutputStreamCallback> NS_NewOutputStreamReadyEvent(
+ nsIOutputStreamCallback* aCallback, nsIEventTarget* aTarget) {
+ NS_ASSERTION(aCallback, "null callback");
+ NS_ASSERTION(aTarget, "null target");
+ RefPtr<nsOutputStreamReadyEvent> ev =
+ new nsOutputStreamReadyEvent(aCallback, aTarget);
+ return ev.forget();
+}
+
+//-----------------------------------------------------------------------------
+// NS_AsyncCopy implementation
+
+// abstract stream copier...
+class nsAStreamCopier : public nsIInputStreamCallback,
+ public nsIOutputStreamCallback,
+ public CancelableRunnable {
+ public:
+ NS_DECL_ISUPPORTS_INHERITED
+
+ nsAStreamCopier()
+ : CancelableRunnable("nsAStreamCopier"),
+ mLock("nsAStreamCopier.mLock"),
+ mCallback(nullptr),
+ mProgressCallback(nullptr),
+ mClosure(nullptr),
+ mChunkSize(0),
+ mEventInProcess(false),
+ mEventIsPending(false),
+ mCloseSource(true),
+ mCloseSink(true),
+ mCanceled(false),
+ mCancelStatus(NS_OK) {}
+
+ // kick off the async copy...
+ nsresult Start(nsIInputStream* aSource, nsIOutputStream* aSink,
+ nsIEventTarget* aTarget, nsAsyncCopyCallbackFun aCallback,
+ void* aClosure, uint32_t aChunksize, bool aCloseSource,
+ bool aCloseSink, nsAsyncCopyProgressFun aProgressCallback) {
+ mSource = aSource;
+ mSink = aSink;
+ mTarget = aTarget;
+ mCallback = aCallback;
+ mClosure = aClosure;
+ mChunkSize = aChunksize;
+ mCloseSource = aCloseSource;
+ mCloseSink = aCloseSink;
+ mProgressCallback = aProgressCallback;
+
+ mAsyncSource = do_QueryInterface(mSource);
+ mAsyncSink = do_QueryInterface(mSink);
+
+ return PostContinuationEvent();
+ }
+
+ // implemented by subclasses, returns number of bytes copied and
+ // sets source and sink condition before returning.
+ virtual uint32_t DoCopy(nsresult* aSourceCondition,
+ nsresult* aSinkCondition) = 0;
+
+ void Process() {
+ if (!mSource || !mSink) {
+ return;
+ }
+
+ nsresult cancelStatus;
+ bool canceled;
+ {
+ MutexAutoLock lock(mLock);
+ canceled = mCanceled;
+ cancelStatus = mCancelStatus;
+ }
+
+ // If the copy was canceled before Process() was even called, then
+ // sourceCondition and sinkCondition should be set to error results to
+ // ensure we don't call Finish() on a canceled nsISafeOutputStream.
+ MOZ_ASSERT(NS_FAILED(cancelStatus) == canceled, "cancel needs an error");
+ nsresult sourceCondition = cancelStatus;
+ nsresult sinkCondition = cancelStatus;
+
+ // Copy data from the source to the sink until we hit failure or have
+ // copied all the data.
+ for (;;) {
+ // Note: copyFailed will be true if the source or the sink have
+ // reported an error, or if we failed to write any bytes
+ // because we have consumed all of our data.
+ bool copyFailed = false;
+ if (!canceled) {
+ uint32_t n = DoCopy(&sourceCondition, &sinkCondition);
+ if (n > 0 && mProgressCallback) {
+ mProgressCallback(mClosure, n);
+ }
+ copyFailed =
+ NS_FAILED(sourceCondition) || NS_FAILED(sinkCondition) || n == 0;
+
+ MutexAutoLock lock(mLock);
+ canceled = mCanceled;
+ cancelStatus = mCancelStatus;
+ }
+ if (copyFailed && !canceled) {
+ if (sourceCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSource) {
+ // need to wait for more data from source. while waiting for
+ // more source data, be sure to observe failures on output end.
+ mAsyncSource->AsyncWait(this, 0, 0, nullptr);
+
+ if (mAsyncSink) {
+ mAsyncSink->AsyncWait(this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY,
+ 0, nullptr);
+ }
+ break;
+ }
+ if (sinkCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSink) {
+ // need to wait for more room in the sink. while waiting for
+ // more room in the sink, be sure to observer failures on the
+ // input end.
+ mAsyncSink->AsyncWait(this, 0, 0, nullptr);
+
+ if (mAsyncSource) {
+ mAsyncSource->AsyncWait(
+ this, nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0, nullptr);
+ }
+ break;
+ }
+ }
+ if (copyFailed || canceled) {
+ if (mAsyncSource) {
+ // cancel any previously-registered AsyncWait callbacks to avoid leaks
+ mAsyncSource->AsyncWait(nullptr, 0, 0, nullptr);
+ }
+ if (mCloseSource) {
+ // close source
+ if (mAsyncSource) {
+ mAsyncSource->CloseWithStatus(canceled ? cancelStatus
+ : sinkCondition);
+ } else {
+ mSource->Close();
+ }
+ }
+ mAsyncSource = nullptr;
+ mSource = nullptr;
+
+ if (mAsyncSink) {
+ // cancel any previously-registered AsyncWait callbacks to avoid leaks
+ mAsyncSink->AsyncWait(nullptr, 0, 0, nullptr);
+ }
+ if (mCloseSink) {
+ // close sink
+ if (mAsyncSink) {
+ mAsyncSink->CloseWithStatus(canceled ? cancelStatus
+ : sourceCondition);
+ } else {
+ // If we have an nsISafeOutputStream, and our
+ // sourceCondition and sinkCondition are not set to a
+ // failure state, finish writing.
+ nsCOMPtr<nsISafeOutputStream> sostream = do_QueryInterface(mSink);
+ if (sostream && NS_SUCCEEDED(sourceCondition) &&
+ NS_SUCCEEDED(sinkCondition)) {
+ sostream->Finish();
+ } else {
+ mSink->Close();
+ }
+ }
+ }
+ mAsyncSink = nullptr;
+ mSink = nullptr;
+
+ // notify state complete...
+ if (mCallback) {
+ nsresult status;
+ if (!canceled) {
+ status = sourceCondition;
+ if (NS_SUCCEEDED(status)) {
+ status = sinkCondition;
+ }
+ if (status == NS_BASE_STREAM_CLOSED) {
+ status = NS_OK;
+ }
+ } else {
+ status = cancelStatus;
+ }
+ mCallback(mClosure, status);
+ }
+ break;
+ }
+ }
+ }
+
+ nsresult Cancel(nsresult aReason) {
+ MutexAutoLock lock(mLock);
+ if (mCanceled) {
+ return NS_ERROR_FAILURE;
+ }
+
+ if (NS_SUCCEEDED(aReason)) {
+ NS_WARNING("cancel with non-failure status code");
+ aReason = NS_BASE_STREAM_CLOSED;
+ }
+
+ mCanceled = true;
+ mCancelStatus = aReason;
+ return NS_OK;
+ }
+
+ NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aSource) override {
+ PostContinuationEvent();
+ return NS_OK;
+ }
+
+ NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aSink) override {
+ PostContinuationEvent();
+ return NS_OK;
+ }
+
+ // continuation event handler
+ NS_IMETHOD Run() override {
+ Process();
+
+ // clear "in process" flag and post any pending continuation event
+ MutexAutoLock lock(mLock);
+ mEventInProcess = false;
+ if (mEventIsPending) {
+ mEventIsPending = false;
+ PostContinuationEvent_Locked();
+ }
+
+ return NS_OK;
+ }
+
+ nsresult Cancel() MOZ_MUST_OVERRIDE override = 0;
+
+ nsresult PostContinuationEvent() {
+ // we cannot post a continuation event if there is currently
+ // an event in process. doing so could result in Process being
+ // run simultaneously on multiple threads, so we mark the event
+ // as pending, and if an event is already in process then we
+ // just let that existing event take care of posting the real
+ // continuation event.
+
+ MutexAutoLock lock(mLock);
+ return PostContinuationEvent_Locked();
+ }
+
+ nsresult PostContinuationEvent_Locked() MOZ_REQUIRES(mLock) {
+ nsresult rv = NS_OK;
+ if (mEventInProcess) {
+ mEventIsPending = true;
+ } else {
+ rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
+ if (NS_SUCCEEDED(rv)) {
+ mEventInProcess = true;
+ } else {
+ NS_WARNING("unable to post continuation event");
+ }
+ }
+ return rv;
+ }
+
+ protected:
+ nsCOMPtr<nsIInputStream> mSource;
+ nsCOMPtr<nsIOutputStream> mSink;
+ nsCOMPtr<nsIAsyncInputStream> mAsyncSource;
+ nsCOMPtr<nsIAsyncOutputStream> mAsyncSink;
+ nsCOMPtr<nsIEventTarget> mTarget;
+ Mutex mLock;
+ nsAsyncCopyCallbackFun mCallback;
+ nsAsyncCopyProgressFun mProgressCallback;
+ void* mClosure;
+ uint32_t mChunkSize;
+ bool mEventInProcess MOZ_GUARDED_BY(mLock);
+ bool mEventIsPending MOZ_GUARDED_BY(mLock);
+ bool mCloseSource;
+ bool mCloseSink;
+ bool mCanceled MOZ_GUARDED_BY(mLock);
+ nsresult mCancelStatus MOZ_GUARDED_BY(mLock);
+
+ // virtual since subclasses call superclass Release()
+ virtual ~nsAStreamCopier() = default;
+};
+
+NS_IMPL_ISUPPORTS_INHERITED(nsAStreamCopier, CancelableRunnable,
+ nsIInputStreamCallback, nsIOutputStreamCallback)
+
+class nsStreamCopierIB final : public nsAStreamCopier {
+ public:
+ nsStreamCopierIB() : nsAStreamCopier() {}
+ virtual ~nsStreamCopierIB() = default;
+
+ struct MOZ_STACK_CLASS ReadSegmentsState {
+ // the nsIOutputStream will outlive the ReadSegmentsState on the stack
+ nsIOutputStream* MOZ_NON_OWNING_REF mSink;
+ nsresult mSinkCondition;
+ };
+
+ static nsresult ConsumeInputBuffer(nsIInputStream* aInStr, void* aClosure,
+ const char* aBuffer, uint32_t aOffset,
+ uint32_t aCount, uint32_t* aCountWritten) {
+ ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
+
+ nsresult rv = state->mSink->Write(aBuffer, aCount, aCountWritten);
+ if (NS_FAILED(rv)) {
+ state->mSinkCondition = rv;
+ } else if (*aCountWritten == 0) {
+ state->mSinkCondition = NS_BASE_STREAM_CLOSED;
+ }
+
+ return state->mSinkCondition;
+ }
+
+ uint32_t DoCopy(nsresult* aSourceCondition,
+ nsresult* aSinkCondition) override {
+ ReadSegmentsState state;
+ state.mSink = mSink;
+ state.mSinkCondition = NS_OK;
+
+ uint32_t n;
+ *aSourceCondition =
+ mSource->ReadSegments(ConsumeInputBuffer, &state, mChunkSize, &n);
+ *aSinkCondition = NS_SUCCEEDED(state.mSinkCondition) && n == 0
+ ? mSink->StreamStatus()
+ : state.mSinkCondition;
+ return n;
+ }
+
+ nsresult Cancel() override { return NS_OK; }
+};
+
+class nsStreamCopierOB final : public nsAStreamCopier {
+ public:
+ nsStreamCopierOB() : nsAStreamCopier() {}
+ virtual ~nsStreamCopierOB() = default;
+
+ struct MOZ_STACK_CLASS WriteSegmentsState {
+ // the nsIInputStream will outlive the WriteSegmentsState on the stack
+ nsIInputStream* MOZ_NON_OWNING_REF mSource;
+ nsresult mSourceCondition;
+ };
+
+ static nsresult FillOutputBuffer(nsIOutputStream* aOutStr, void* aClosure,
+ char* aBuffer, uint32_t aOffset,
+ uint32_t aCount, uint32_t* aCountRead) {
+ WriteSegmentsState* state = (WriteSegmentsState*)aClosure;
+
+ nsresult rv = state->mSource->Read(aBuffer, aCount, aCountRead);
+ if (NS_FAILED(rv)) {
+ state->mSourceCondition = rv;
+ } else if (*aCountRead == 0) {
+ state->mSourceCondition = NS_BASE_STREAM_CLOSED;
+ }
+
+ return state->mSourceCondition;
+ }
+
+ uint32_t DoCopy(nsresult* aSourceCondition,
+ nsresult* aSinkCondition) override {
+ WriteSegmentsState state;
+ state.mSource = mSource;
+ state.mSourceCondition = NS_OK;
+
+ uint32_t n;
+ *aSinkCondition =
+ mSink->WriteSegments(FillOutputBuffer, &state, mChunkSize, &n);
+ *aSourceCondition = NS_SUCCEEDED(state.mSourceCondition) && n == 0
+ ? mSource->StreamStatus()
+ : state.mSourceCondition;
+ return n;
+ }
+
+ nsresult Cancel() override { return NS_OK; }
+};
+
+//-----------------------------------------------------------------------------
+
+nsresult NS_AsyncCopy(nsIInputStream* aSource, nsIOutputStream* aSink,
+ nsIEventTarget* aTarget, nsAsyncCopyMode aMode,
+ uint32_t aChunkSize, nsAsyncCopyCallbackFun aCallback,
+ void* aClosure, bool aCloseSource, bool aCloseSink,
+ nsISupports** aCopierCtx,
+ nsAsyncCopyProgressFun aProgressCallback) {
+ NS_ASSERTION(aTarget, "non-null target required");
+
+ nsresult rv;
+ nsAStreamCopier* copier;
+
+ if (aMode == NS_ASYNCCOPY_VIA_READSEGMENTS) {
+ copier = new nsStreamCopierIB();
+ } else {
+ copier = new nsStreamCopierOB();
+ }
+
+ // Start() takes an owning ref to the copier...
+ NS_ADDREF(copier);
+ rv = copier->Start(aSource, aSink, aTarget, aCallback, aClosure, aChunkSize,
+ aCloseSource, aCloseSink, aProgressCallback);
+
+ if (aCopierCtx) {
+ *aCopierCtx = static_cast<nsISupports*>(static_cast<nsIRunnable*>(copier));
+ NS_ADDREF(*aCopierCtx);
+ }
+ NS_RELEASE(copier);
+
+ return rv;
+}
+
+//-----------------------------------------------------------------------------
+
+nsresult NS_CancelAsyncCopy(nsISupports* aCopierCtx, nsresult aReason) {
+ nsAStreamCopier* copier =
+ static_cast<nsAStreamCopier*>(static_cast<nsIRunnable*>(aCopierCtx));
+ return copier->Cancel(aReason);
+}
+
+//-----------------------------------------------------------------------------
+
+namespace {
+template <typename T>
+struct ResultTraits {};
+
+template <>
+struct ResultTraits<nsACString> {
+ static void Clear(nsACString& aString) { aString.Truncate(); }
+
+ static char* GetStorage(nsACString& aString) {
+ return aString.BeginWriting();
+ }
+};
+
+template <>
+struct ResultTraits<nsTArray<uint8_t>> {
+ static void Clear(nsTArray<uint8_t>& aArray) { aArray.Clear(); }
+
+ static char* GetStorage(nsTArray<uint8_t>& aArray) {
+ return reinterpret_cast<char*>(aArray.Elements());
+ }
+};
+} // namespace
+
+template <typename T>
+nsresult DoConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
+ T& aResult) {
+ nsresult rv = NS_OK;
+ ResultTraits<T>::Clear(aResult);
+
+ while (aMaxCount) {
+ uint64_t avail64;
+ rv = aStream->Available(&avail64);
+ if (NS_FAILED(rv)) {
+ if (rv == NS_BASE_STREAM_CLOSED) {
+ rv = NS_OK;
+ }
+ break;
+ }
+ if (avail64 == 0) {
+ break;
+ }
+
+ uint32_t avail = (uint32_t)XPCOM_MIN<uint64_t>(avail64, aMaxCount);
+
+ // resize aResult buffer
+ uint32_t length = aResult.Length();
+ CheckedInt<uint32_t> newLength = CheckedInt<uint32_t>(length) + avail;
+ if (!newLength.isValid()) {
+ return NS_ERROR_FILE_TOO_BIG;
+ }
+
+ if (!aResult.SetLength(newLength.value(), fallible)) {
+ return NS_ERROR_OUT_OF_MEMORY;
+ }
+ char* buf = ResultTraits<T>::GetStorage(aResult) + length;
+
+ uint32_t n;
+ rv = aStream->Read(buf, avail, &n);
+ if (NS_FAILED(rv)) {
+ break;
+ }
+ if (n != avail) {
+ MOZ_ASSERT(n < avail, "What happened there???");
+ aResult.SetLength(length + n);
+ }
+ if (n == 0) {
+ break;
+ }
+ aMaxCount -= n;
+ }
+
+ return rv;
+}
+
+nsresult NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
+ nsACString& aResult) {
+ return DoConsumeStream(aStream, aMaxCount, aResult);
+}
+
+nsresult NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
+ nsTArray<uint8_t>& aResult) {
+ return DoConsumeStream(aStream, aMaxCount, aResult);
+}
+
+//-----------------------------------------------------------------------------
+
+static nsresult TestInputStream(nsIInputStream* aInStr, void* aClosure,
+ const char* aBuffer, uint32_t aOffset,
+ uint32_t aCount, uint32_t* aCountWritten) {
+ bool* result = static_cast<bool*>(aClosure);
+ *result = true;
+ *aCountWritten = 0;
+ return NS_ERROR_ABORT; // don't call me anymore
+}
+
+bool NS_InputStreamIsBuffered(nsIInputStream* aStream) {
+ nsCOMPtr<nsIBufferedInputStream> bufferedIn = do_QueryInterface(aStream);
+ if (bufferedIn) {
+ return true;
+ }
+
+ bool result = false;
+ uint32_t n;
+ nsresult rv = aStream->ReadSegments(TestInputStream, &result, 1, &n);
+ return result || rv != NS_ERROR_NOT_IMPLEMENTED;
+}
+
+static nsresult TestOutputStream(nsIOutputStream* aOutStr, void* aClosure,
+ char* aBuffer, uint32_t aOffset,
+ uint32_t aCount, uint32_t* aCountRead) {
+ bool* result = static_cast<bool*>(aClosure);
+ *result = true;
+ *aCountRead = 0;
+ return NS_ERROR_ABORT; // don't call me anymore
+}
+
+bool NS_OutputStreamIsBuffered(nsIOutputStream* aStream) {
+ nsCOMPtr<nsIBufferedOutputStream> bufferedOut = do_QueryInterface(aStream);
+ if (bufferedOut) {
+ return true;
+ }
+
+ bool result = false;
+ uint32_t n;
+ aStream->WriteSegments(TestOutputStream, &result, 1, &n);
+ return result;
+}
+
+//-----------------------------------------------------------------------------
+
+nsresult NS_CopySegmentToStream(nsIInputStream* aInStr, void* aClosure,
+ const char* aBuffer, uint32_t aOffset,
+ uint32_t aCount, uint32_t* aCountWritten) {
+ nsIOutputStream* outStr = static_cast<nsIOutputStream*>(aClosure);
+ *aCountWritten = 0;
+ while (aCount) {
+ uint32_t n;
+ nsresult rv = outStr->Write(aBuffer, aCount, &n);
+ if (NS_FAILED(rv)) {
+ return rv;
+ }
+ aBuffer += n;
+ aCount -= n;
+ *aCountWritten += n;
+ }
+ return NS_OK;
+}
+
+nsresult NS_CopySegmentToBuffer(nsIInputStream* aInStr, void* aClosure,
+ const char* aBuffer, uint32_t aOffset,
+ uint32_t aCount, uint32_t* aCountWritten) {
+ char* toBuf = static_cast<char*>(aClosure);
+ memcpy(&toBuf[aOffset], aBuffer, aCount);
+ *aCountWritten = aCount;
+ return NS_OK;
+}
+
+nsresult NS_CopyBufferToSegment(nsIOutputStream* aOutStr, void* aClosure,
+ char* aBuffer, uint32_t aOffset,
+ uint32_t aCount, uint32_t* aCountRead) {
+ const char* fromBuf = static_cast<const char*>(aClosure);
+ memcpy(aBuffer, &fromBuf[aOffset], aCount);
+ *aCountRead = aCount;
+ return NS_OK;
+}
+
+nsresult NS_CopyStreamToSegment(nsIOutputStream* aOutputStream, void* aClosure,
+ char* aToSegment, uint32_t aFromOffset,
+ uint32_t aCount, uint32_t* aReadCount) {
+ nsIInputStream* fromStream = static_cast<nsIInputStream*>(aClosure);
+ return fromStream->Read(aToSegment, aCount, aReadCount);
+}
+
+nsresult NS_DiscardSegment(nsIInputStream* aInStr, void* aClosure,
+ const char* aBuffer, uint32_t aOffset,
+ uint32_t aCount, uint32_t* aCountWritten) {
+ *aCountWritten = aCount;
+ return NS_OK;
+}
+
+//-----------------------------------------------------------------------------
+
+nsresult NS_WriteSegmentThunk(nsIInputStream* aInStr, void* aClosure,
+ const char* aBuffer, uint32_t aOffset,
+ uint32_t aCount, uint32_t* aCountWritten) {
+ nsWriteSegmentThunk* thunk = static_cast<nsWriteSegmentThunk*>(aClosure);
+ return thunk->mFun(thunk->mStream, thunk->mClosure, aBuffer, aOffset, aCount,
+ aCountWritten);
+}
+
+nsresult NS_FillArray(FallibleTArray<char>& aDest, nsIInputStream* aInput,
+ uint32_t aKeep, uint32_t* aNewBytes) {
+ MOZ_ASSERT(aInput, "null stream");
+ MOZ_ASSERT(aKeep <= aDest.Length(), "illegal keep count");
+
+ char* aBuffer = aDest.Elements();
+ int64_t keepOffset = int64_t(aDest.Length()) - aKeep;
+ if (aKeep != 0 && keepOffset > 0) {
+ memmove(aBuffer, aBuffer + keepOffset, aKeep);
+ }
+
+ nsresult rv =
+ aInput->Read(aBuffer + aKeep, aDest.Capacity() - aKeep, aNewBytes);
+ if (NS_FAILED(rv)) {
+ *aNewBytes = 0;
+ }
+ // NOTE: we rely on the fact that the new slots are NOT initialized by
+ // SetLengthAndRetainStorage here, see nsTArrayElementTraits::Construct()
+ // in nsTArray.h:
+ aDest.SetLengthAndRetainStorage(aKeep + *aNewBytes);
+
+ MOZ_ASSERT(aDest.Length() <= aDest.Capacity(), "buffer overflow");
+ return rv;
+}
+
+bool NS_InputStreamIsCloneable(nsIInputStream* aSource) {
+ if (!aSource) {
+ return false;
+ }
+
+ nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
+ return cloneable && cloneable->GetCloneable();
+}
+
+nsresult NS_CloneInputStream(nsIInputStream* aSource,
+ nsIInputStream** aCloneOut,
+ nsIInputStream** aReplacementOut) {
+ if (NS_WARN_IF(!aSource)) {
+ return NS_ERROR_FAILURE;
+ }
+
+ // Attempt to perform the clone directly on the source stream
+ nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
+ if (cloneable && cloneable->GetCloneable()) {
+ if (aReplacementOut) {
+ *aReplacementOut = nullptr;
+ }
+ return cloneable->Clone(aCloneOut);
+ }
+
+ // If we failed the clone and the caller does not want to replace their
+ // original stream, then we are done. Return error.
+ if (!aReplacementOut) {
+ return NS_ERROR_FAILURE;
+ }
+
+ // The caller has opted-in to the fallback clone support that replaces
+ // the original stream. Copy the data to a pipe and return two cloned
+ // input streams.
+
+ nsCOMPtr<nsIInputStream> reader;
+ nsCOMPtr<nsIInputStream> readerClone;
+ nsCOMPtr<nsIOutputStream> writer;
+
+ NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), 0,
+ 0, // default segment size and max size
+ true, true); // non-blocking
+
+ // Propagate length information provided by nsIInputStreamLength. We don't use
+ // InputStreamLengthHelper::GetSyncLength to avoid the risk of blocking when
+ // called off-main-thread.
+ int64_t length = -1;
+ if (nsCOMPtr<nsIInputStreamLength> streamLength = do_QueryInterface(aSource);
+ streamLength && NS_SUCCEEDED(streamLength->Length(&length)) &&
+ length != -1) {
+ reader = new mozilla::InputStreamLengthWrapper(reader.forget(), length);
+ }
+
+ cloneable = do_QueryInterface(reader);
+ MOZ_ASSERT(cloneable && cloneable->GetCloneable());
+
+ nsresult rv = cloneable->Clone(getter_AddRefs(readerClone));
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ nsCOMPtr<nsIEventTarget> target =
+ do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ rv = NS_AsyncCopy(aSource, writer, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ readerClone.forget(aCloneOut);
+ reader.forget(aReplacementOut);
+
+ return NS_OK;
+}
+
+nsresult NS_MakeAsyncNonBlockingInputStream(
+ already_AddRefed<nsIInputStream> aSource,
+ nsIAsyncInputStream** aAsyncInputStream, bool aCloseWhenDone,
+ uint32_t aFlags, uint32_t aSegmentSize, uint32_t aSegmentCount) {
+ nsCOMPtr<nsIInputStream> source = std::move(aSource);
+ if (NS_WARN_IF(!aAsyncInputStream)) {
+ return NS_ERROR_FAILURE;
+ }
+
+ bool nonBlocking = false;
+ nsresult rv = source->IsNonBlocking(&nonBlocking);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(source);
+
+ if (nonBlocking && asyncStream) {
+ // This stream is perfect!
+ asyncStream.forget(aAsyncInputStream);
+ return NS_OK;
+ }
+
+ if (nonBlocking) {
+ // If the stream is non-blocking but not async, we wrap it.
+ return NonBlockingAsyncInputStream::Create(source.forget(),
+ aAsyncInputStream);
+ }
+
+ nsCOMPtr<nsIStreamTransportService> sts =
+ do_GetService(kStreamTransportServiceCID, &rv);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ nsCOMPtr<nsITransport> transport;
+ rv = sts->CreateInputTransport(source, aCloseWhenDone,
+ getter_AddRefs(transport));
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ nsCOMPtr<nsIInputStream> wrapper;
+ rv = transport->OpenInputStream(aFlags, aSegmentSize, aSegmentCount,
+ getter_AddRefs(wrapper));
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ asyncStream = do_QueryInterface(wrapper);
+ MOZ_ASSERT(asyncStream);
+
+ asyncStream.forget(aAsyncInputStream);
+ return NS_OK;
+}