/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "mozilla/Mutex.h" #include "mozilla/Attributes.h" #include "mozilla/InputStreamLengthWrapper.h" #include "nsIInputStreamLength.h" #include "nsStreamUtils.h" #include "nsCOMPtr.h" #include "nsICloneableInputStream.h" #include "nsIEventTarget.h" #include "nsICancelableRunnable.h" #include "nsISafeOutputStream.h" #include "nsString.h" #include "nsIAsyncInputStream.h" #include "nsIAsyncOutputStream.h" #include "nsIBufferedStreams.h" #include "nsIPipe.h" #include "nsNetCID.h" #include "nsServiceManagerUtils.h" #include "nsThreadUtils.h" #include "nsITransport.h" #include "nsIStreamTransportService.h" #include "NonBlockingAsyncInputStream.h" using namespace mozilla; static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID); //----------------------------------------------------------------------------- // This is a nsICancelableRunnable because we can dispatch it to Workers and // those can be shut down at any time, and in these cases, Cancel() is called // instead of Run(). class nsInputStreamReadyEvent final : public CancelableRunnable, public nsIInputStreamCallback, public nsIRunnablePriority { public: NS_DECL_ISUPPORTS_INHERITED nsInputStreamReadyEvent(const char* aName, nsIInputStreamCallback* aCallback, nsIEventTarget* aTarget, uint32_t aPriority) : CancelableRunnable(aName), mCallback(aCallback), mTarget(aTarget), mPriority(aPriority) {} private: ~nsInputStreamReadyEvent() { if (!mCallback) { return; } // // whoa!! looks like we never posted this event. take care to // release mCallback on the correct thread. if mTarget lives on the // calling thread, then we are ok. otherwise, we have to try to // proxy the Release over the right thread. if that thread is dead, // then there's nothing we can do... better to leak than crash. // bool val; nsresult rv = mTarget->IsOnCurrentThread(&val); if (NS_FAILED(rv) || !val) { nsCOMPtr<nsIInputStreamCallback> event = NS_NewInputStreamReadyEvent( "~nsInputStreamReadyEvent", mCallback, mTarget, mPriority); mCallback = nullptr; if (event) { rv = event->OnInputStreamReady(nullptr); if (NS_FAILED(rv)) { MOZ_ASSERT_UNREACHABLE("leaking stream event"); nsISupports* sup = event; NS_ADDREF(sup); } } } } public: NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aStream) override { mStream = aStream; nsresult rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL); if (NS_FAILED(rv)) { NS_WARNING("Dispatch failed"); return NS_ERROR_FAILURE; } return NS_OK; } NS_IMETHOD Run() override { if (mCallback) { if (mStream) { mCallback->OnInputStreamReady(mStream); } mCallback = nullptr; } return NS_OK; } nsresult Cancel() override { mCallback = nullptr; return NS_OK; } NS_IMETHOD GetPriority(uint32_t* aPriority) override { *aPriority = mPriority; return NS_OK; } private: nsCOMPtr<nsIAsyncInputStream> mStream; nsCOMPtr<nsIInputStreamCallback> mCallback; nsCOMPtr<nsIEventTarget> mTarget; uint32_t mPriority; }; NS_IMPL_ISUPPORTS_INHERITED(nsInputStreamReadyEvent, CancelableRunnable, nsIInputStreamCallback, nsIRunnablePriority) //----------------------------------------------------------------------------- // This is a nsICancelableRunnable because we can dispatch it to Workers and // those can be shut down at any time, and in these cases, Cancel() is called // instead of Run(). class nsOutputStreamReadyEvent final : public CancelableRunnable, public nsIOutputStreamCallback { public: NS_DECL_ISUPPORTS_INHERITED nsOutputStreamReadyEvent(nsIOutputStreamCallback* aCallback, nsIEventTarget* aTarget) : CancelableRunnable("nsOutputStreamReadyEvent"), mCallback(aCallback), mTarget(aTarget) {} private: ~nsOutputStreamReadyEvent() { if (!mCallback) { return; } // // whoa!! looks like we never posted this event. take care to // release mCallback on the correct thread. if mTarget lives on the // calling thread, then we are ok. otherwise, we have to try to // proxy the Release over the right thread. if that thread is dead, // then there's nothing we can do... better to leak than crash. // bool val; nsresult rv = mTarget->IsOnCurrentThread(&val); if (NS_FAILED(rv) || !val) { nsCOMPtr<nsIOutputStreamCallback> event = NS_NewOutputStreamReadyEvent(mCallback, mTarget); mCallback = nullptr; if (event) { rv = event->OnOutputStreamReady(nullptr); if (NS_FAILED(rv)) { MOZ_ASSERT_UNREACHABLE("leaking stream event"); nsISupports* sup = event; NS_ADDREF(sup); } } } } public: NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aStream) override { mStream = aStream; nsresult rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL); if (NS_FAILED(rv)) { NS_WARNING("PostEvent failed"); return NS_ERROR_FAILURE; } return NS_OK; } NS_IMETHOD Run() override { if (mCallback) { if (mStream) { mCallback->OnOutputStreamReady(mStream); } mCallback = nullptr; } return NS_OK; } nsresult Cancel() override { mCallback = nullptr; return NS_OK; } private: nsCOMPtr<nsIAsyncOutputStream> mStream; nsCOMPtr<nsIOutputStreamCallback> mCallback; nsCOMPtr<nsIEventTarget> mTarget; }; NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent, CancelableRunnable, nsIOutputStreamCallback) //----------------------------------------------------------------------------- already_AddRefed<nsIInputStreamCallback> NS_NewInputStreamReadyEvent( const char* aName, nsIInputStreamCallback* aCallback, nsIEventTarget* aTarget, uint32_t aPriority) { NS_ASSERTION(aCallback, "null callback"); NS_ASSERTION(aTarget, "null target"); RefPtr<nsInputStreamReadyEvent> ev = new nsInputStreamReadyEvent(aName, aCallback, aTarget, aPriority); return ev.forget(); } already_AddRefed<nsIOutputStreamCallback> NS_NewOutputStreamReadyEvent( nsIOutputStreamCallback* aCallback, nsIEventTarget* aTarget) { NS_ASSERTION(aCallback, "null callback"); NS_ASSERTION(aTarget, "null target"); RefPtr<nsOutputStreamReadyEvent> ev = new nsOutputStreamReadyEvent(aCallback, aTarget); return ev.forget(); } //----------------------------------------------------------------------------- // NS_AsyncCopy implementation // abstract stream copier... class nsAStreamCopier : public nsIInputStreamCallback, public nsIOutputStreamCallback, public CancelableRunnable { public: NS_DECL_ISUPPORTS_INHERITED nsAStreamCopier() : CancelableRunnable("nsAStreamCopier"), mLock("nsAStreamCopier.mLock"), mCallback(nullptr), mProgressCallback(nullptr), mClosure(nullptr), mChunkSize(0), mEventInProcess(false), mEventIsPending(false), mCloseSource(true), mCloseSink(true), mCanceled(false), mCancelStatus(NS_OK) {} // kick off the async copy... nsresult Start(nsIInputStream* aSource, nsIOutputStream* aSink, nsIEventTarget* aTarget, nsAsyncCopyCallbackFun aCallback, void* aClosure, uint32_t aChunksize, bool aCloseSource, bool aCloseSink, nsAsyncCopyProgressFun aProgressCallback) { mSource = aSource; mSink = aSink; mTarget = aTarget; mCallback = aCallback; mClosure = aClosure; mChunkSize = aChunksize; mCloseSource = aCloseSource; mCloseSink = aCloseSink; mProgressCallback = aProgressCallback; mAsyncSource = do_QueryInterface(mSource); mAsyncSink = do_QueryInterface(mSink); return PostContinuationEvent(); } // implemented by subclasses, returns number of bytes copied and // sets source and sink condition before returning. virtual uint32_t DoCopy(nsresult* aSourceCondition, nsresult* aSinkCondition) = 0; void Process() { if (!mSource || !mSink) { return; } nsresult cancelStatus; bool canceled; { MutexAutoLock lock(mLock); canceled = mCanceled; cancelStatus = mCancelStatus; } // If the copy was canceled before Process() was even called, then // sourceCondition and sinkCondition should be set to error results to // ensure we don't call Finish() on a canceled nsISafeOutputStream. MOZ_ASSERT(NS_FAILED(cancelStatus) == canceled, "cancel needs an error"); nsresult sourceCondition = cancelStatus; nsresult sinkCondition = cancelStatus; // Copy data from the source to the sink until we hit failure or have // copied all the data. for (;;) { // Note: copyFailed will be true if the source or the sink have // reported an error, or if we failed to write any bytes // because we have consumed all of our data. bool copyFailed = false; if (!canceled) { uint32_t n = DoCopy(&sourceCondition, &sinkCondition); if (n > 0 && mProgressCallback) { mProgressCallback(mClosure, n); } copyFailed = NS_FAILED(sourceCondition) || NS_FAILED(sinkCondition) || n == 0; MutexAutoLock lock(mLock); canceled = mCanceled; cancelStatus = mCancelStatus; } if (copyFailed && !canceled) { if (sourceCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSource) { // need to wait for more data from source. while waiting for // more source data, be sure to observe failures on output end. mAsyncSource->AsyncWait(this, 0, 0, nullptr); if (mAsyncSink) { mAsyncSink->AsyncWait(this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY, 0, nullptr); } break; } if (sinkCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSink) { // need to wait for more room in the sink. while waiting for // more room in the sink, be sure to observer failures on the // input end. mAsyncSink->AsyncWait(this, 0, 0, nullptr); if (mAsyncSource) { mAsyncSource->AsyncWait( this, nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0, nullptr); } break; } } if (copyFailed || canceled) { if (mAsyncSource) { // cancel any previously-registered AsyncWait callbacks to avoid leaks mAsyncSource->AsyncWait(nullptr, 0, 0, nullptr); } if (mCloseSource) { // close source if (mAsyncSource) { mAsyncSource->CloseWithStatus(canceled ? cancelStatus : sinkCondition); } else { mSource->Close(); } } mAsyncSource = nullptr; mSource = nullptr; if (mAsyncSink) { // cancel any previously-registered AsyncWait callbacks to avoid leaks mAsyncSink->AsyncWait(nullptr, 0, 0, nullptr); } if (mCloseSink) { // close sink if (mAsyncSink) { mAsyncSink->CloseWithStatus(canceled ? cancelStatus : sourceCondition); } else { // If we have an nsISafeOutputStream, and our // sourceCondition and sinkCondition are not set to a // failure state, finish writing. nsCOMPtr<nsISafeOutputStream> sostream = do_QueryInterface(mSink); if (sostream && NS_SUCCEEDED(sourceCondition) && NS_SUCCEEDED(sinkCondition)) { sostream->Finish(); } else { mSink->Close(); } } } mAsyncSink = nullptr; mSink = nullptr; // notify state complete... if (mCallback) { nsresult status; if (!canceled) { status = sourceCondition; if (NS_SUCCEEDED(status)) { status = sinkCondition; } if (status == NS_BASE_STREAM_CLOSED) { status = NS_OK; } } else { status = cancelStatus; } mCallback(mClosure, status); } break; } } } nsresult Cancel(nsresult aReason) { MutexAutoLock lock(mLock); if (mCanceled) { return NS_ERROR_FAILURE; } if (NS_SUCCEEDED(aReason)) { NS_WARNING("cancel with non-failure status code"); aReason = NS_BASE_STREAM_CLOSED; } mCanceled = true; mCancelStatus = aReason; return NS_OK; } NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aSource) override { PostContinuationEvent(); return NS_OK; } NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aSink) override { PostContinuationEvent(); return NS_OK; } // continuation event handler NS_IMETHOD Run() override { Process(); // clear "in process" flag and post any pending continuation event MutexAutoLock lock(mLock); mEventInProcess = false; if (mEventIsPending) { mEventIsPending = false; PostContinuationEvent_Locked(); } return NS_OK; } nsresult Cancel() MOZ_MUST_OVERRIDE override = 0; nsresult PostContinuationEvent() { // we cannot post a continuation event if there is currently // an event in process. doing so could result in Process being // run simultaneously on multiple threads, so we mark the event // as pending, and if an event is already in process then we // just let that existing event take care of posting the real // continuation event. MutexAutoLock lock(mLock); return PostContinuationEvent_Locked(); } nsresult PostContinuationEvent_Locked() MOZ_REQUIRES(mLock) { nsresult rv = NS_OK; if (mEventInProcess) { mEventIsPending = true; } else { rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL); if (NS_SUCCEEDED(rv)) { mEventInProcess = true; } else { NS_WARNING("unable to post continuation event"); } } return rv; } protected: nsCOMPtr<nsIInputStream> mSource; nsCOMPtr<nsIOutputStream> mSink; nsCOMPtr<nsIAsyncInputStream> mAsyncSource; nsCOMPtr<nsIAsyncOutputStream> mAsyncSink; nsCOMPtr<nsIEventTarget> mTarget; Mutex mLock; nsAsyncCopyCallbackFun mCallback; nsAsyncCopyProgressFun mProgressCallback; void* mClosure; uint32_t mChunkSize; bool mEventInProcess MOZ_GUARDED_BY(mLock); bool mEventIsPending MOZ_GUARDED_BY(mLock); bool mCloseSource; bool mCloseSink; bool mCanceled MOZ_GUARDED_BY(mLock); nsresult mCancelStatus MOZ_GUARDED_BY(mLock); // virtual since subclasses call superclass Release() virtual ~nsAStreamCopier() = default; }; NS_IMPL_ISUPPORTS_INHERITED(nsAStreamCopier, CancelableRunnable, nsIInputStreamCallback, nsIOutputStreamCallback) class nsStreamCopierIB final : public nsAStreamCopier { public: nsStreamCopierIB() = default; virtual ~nsStreamCopierIB() = default; struct MOZ_STACK_CLASS ReadSegmentsState { // the nsIOutputStream will outlive the ReadSegmentsState on the stack nsIOutputStream* MOZ_NON_OWNING_REF mSink; nsresult mSinkCondition; }; static nsresult ConsumeInputBuffer(nsIInputStream* aInStr, void* aClosure, const char* aBuffer, uint32_t aOffset, uint32_t aCount, uint32_t* aCountWritten) { ReadSegmentsState* state = (ReadSegmentsState*)aClosure; nsresult rv = state->mSink->Write(aBuffer, aCount, aCountWritten); if (NS_FAILED(rv)) { state->mSinkCondition = rv; } else if (*aCountWritten == 0) { state->mSinkCondition = NS_BASE_STREAM_CLOSED; } return state->mSinkCondition; } uint32_t DoCopy(nsresult* aSourceCondition, nsresult* aSinkCondition) override { ReadSegmentsState state; state.mSink = mSink; state.mSinkCondition = NS_OK; uint32_t n; *aSourceCondition = mSource->ReadSegments(ConsumeInputBuffer, &state, mChunkSize, &n); *aSinkCondition = NS_SUCCEEDED(state.mSinkCondition) && n == 0 ? mSink->StreamStatus() : state.mSinkCondition; return n; } nsresult Cancel() override { return NS_OK; } }; class nsStreamCopierOB final : public nsAStreamCopier { public: nsStreamCopierOB() = default; virtual ~nsStreamCopierOB() = default; struct MOZ_STACK_CLASS WriteSegmentsState { // the nsIInputStream will outlive the WriteSegmentsState on the stack nsIInputStream* MOZ_NON_OWNING_REF mSource; nsresult mSourceCondition; }; static nsresult FillOutputBuffer(nsIOutputStream* aOutStr, void* aClosure, char* aBuffer, uint32_t aOffset, uint32_t aCount, uint32_t* aCountRead) { WriteSegmentsState* state = (WriteSegmentsState*)aClosure; nsresult rv = state->mSource->Read(aBuffer, aCount, aCountRead); if (NS_FAILED(rv)) { state->mSourceCondition = rv; } else if (*aCountRead == 0) { state->mSourceCondition = NS_BASE_STREAM_CLOSED; } return state->mSourceCondition; } uint32_t DoCopy(nsresult* aSourceCondition, nsresult* aSinkCondition) override { WriteSegmentsState state; state.mSource = mSource; state.mSourceCondition = NS_OK; uint32_t n; *aSinkCondition = mSink->WriteSegments(FillOutputBuffer, &state, mChunkSize, &n); *aSourceCondition = NS_SUCCEEDED(state.mSourceCondition) && n == 0 ? mSource->StreamStatus() : state.mSourceCondition; return n; } nsresult Cancel() override { return NS_OK; } }; //----------------------------------------------------------------------------- nsresult NS_AsyncCopy(nsIInputStream* aSource, nsIOutputStream* aSink, nsIEventTarget* aTarget, nsAsyncCopyMode aMode, uint32_t aChunkSize, nsAsyncCopyCallbackFun aCallback, void* aClosure, bool aCloseSource, bool aCloseSink, nsISupports** aCopierCtx, nsAsyncCopyProgressFun aProgressCallback) { NS_ASSERTION(aTarget, "non-null target required"); nsresult rv; nsAStreamCopier* copier; if (aMode == NS_ASYNCCOPY_VIA_READSEGMENTS) { copier = new nsStreamCopierIB(); } else { copier = new nsStreamCopierOB(); } // Start() takes an owning ref to the copier... NS_ADDREF(copier); rv = copier->Start(aSource, aSink, aTarget, aCallback, aClosure, aChunkSize, aCloseSource, aCloseSink, aProgressCallback); if (aCopierCtx) { *aCopierCtx = static_cast<nsISupports*>(static_cast<nsIRunnable*>(copier)); NS_ADDREF(*aCopierCtx); } NS_RELEASE(copier); return rv; } //----------------------------------------------------------------------------- nsresult NS_CancelAsyncCopy(nsISupports* aCopierCtx, nsresult aReason) { nsAStreamCopier* copier = static_cast<nsAStreamCopier*>(static_cast<nsIRunnable*>(aCopierCtx)); return copier->Cancel(aReason); } //----------------------------------------------------------------------------- namespace { template <typename T> struct ResultTraits {}; template <> struct ResultTraits<nsACString> { static void Clear(nsACString& aString) { aString.Truncate(); } static char* GetStorage(nsACString& aString) { return aString.BeginWriting(); } }; template <> struct ResultTraits<nsTArray<uint8_t>> { static void Clear(nsTArray<uint8_t>& aArray) { aArray.Clear(); } static char* GetStorage(nsTArray<uint8_t>& aArray) { return reinterpret_cast<char*>(aArray.Elements()); } }; } // namespace template <typename T> nsresult DoConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount, T& aResult) { nsresult rv = NS_OK; ResultTraits<T>::Clear(aResult); while (aMaxCount) { uint64_t avail64; rv = aStream->Available(&avail64); if (NS_FAILED(rv)) { if (rv == NS_BASE_STREAM_CLOSED) { rv = NS_OK; } break; } if (avail64 == 0) { break; } uint32_t avail = (uint32_t)XPCOM_MIN<uint64_t>(avail64, aMaxCount); // resize aResult buffer uint32_t length = aResult.Length(); CheckedInt<uint32_t> newLength = CheckedInt<uint32_t>(length) + avail; if (!newLength.isValid()) { return NS_ERROR_FILE_TOO_BIG; } if (!aResult.SetLength(newLength.value(), fallible)) { return NS_ERROR_OUT_OF_MEMORY; } char* buf = ResultTraits<T>::GetStorage(aResult) + length; uint32_t n; rv = aStream->Read(buf, avail, &n); if (NS_FAILED(rv)) { break; } if (n != avail) { MOZ_ASSERT(n < avail, "What happened there???"); aResult.SetLength(length + n); } if (n == 0) { break; } aMaxCount -= n; } return rv; } nsresult NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount, nsACString& aResult) { return DoConsumeStream(aStream, aMaxCount, aResult); } nsresult NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount, nsTArray<uint8_t>& aResult) { return DoConsumeStream(aStream, aMaxCount, aResult); } //----------------------------------------------------------------------------- static nsresult TestInputStream(nsIInputStream* aInStr, void* aClosure, const char* aBuffer, uint32_t aOffset, uint32_t aCount, uint32_t* aCountWritten) { bool* result = static_cast<bool*>(aClosure); *result = true; *aCountWritten = 0; return NS_ERROR_ABORT; // don't call me anymore } bool NS_InputStreamIsBuffered(nsIInputStream* aStream) { nsCOMPtr<nsIBufferedInputStream> bufferedIn = do_QueryInterface(aStream); if (bufferedIn) { return true; } bool result = false; uint32_t n; nsresult rv = aStream->ReadSegments(TestInputStream, &result, 1, &n); return result || rv != NS_ERROR_NOT_IMPLEMENTED; } static nsresult TestOutputStream(nsIOutputStream* aOutStr, void* aClosure, char* aBuffer, uint32_t aOffset, uint32_t aCount, uint32_t* aCountRead) { bool* result = static_cast<bool*>(aClosure); *result = true; *aCountRead = 0; return NS_ERROR_ABORT; // don't call me anymore } bool NS_OutputStreamIsBuffered(nsIOutputStream* aStream) { nsCOMPtr<nsIBufferedOutputStream> bufferedOut = do_QueryInterface(aStream); if (bufferedOut) { return true; } bool result = false; uint32_t n; aStream->WriteSegments(TestOutputStream, &result, 1, &n); return result; } //----------------------------------------------------------------------------- nsresult NS_CopySegmentToStream(nsIInputStream* aInStr, void* aClosure, const char* aBuffer, uint32_t aOffset, uint32_t aCount, uint32_t* aCountWritten) { nsIOutputStream* outStr = static_cast<nsIOutputStream*>(aClosure); *aCountWritten = 0; while (aCount) { uint32_t n; nsresult rv = outStr->Write(aBuffer, aCount, &n); if (NS_FAILED(rv)) { return rv; } aBuffer += n; aCount -= n; *aCountWritten += n; } return NS_OK; } nsresult NS_CopySegmentToBuffer(nsIInputStream* aInStr, void* aClosure, const char* aBuffer, uint32_t aOffset, uint32_t aCount, uint32_t* aCountWritten) { char* toBuf = static_cast<char*>(aClosure); memcpy(&toBuf[aOffset], aBuffer, aCount); *aCountWritten = aCount; return NS_OK; } nsresult NS_CopyBufferToSegment(nsIOutputStream* aOutStr, void* aClosure, char* aBuffer, uint32_t aOffset, uint32_t aCount, uint32_t* aCountRead) { const char* fromBuf = static_cast<const char*>(aClosure); memcpy(aBuffer, &fromBuf[aOffset], aCount); *aCountRead = aCount; return NS_OK; } nsresult NS_CopyStreamToSegment(nsIOutputStream* aOutputStream, void* aClosure, char* aToSegment, uint32_t aFromOffset, uint32_t aCount, uint32_t* aReadCount) { nsIInputStream* fromStream = static_cast<nsIInputStream*>(aClosure); return fromStream->Read(aToSegment, aCount, aReadCount); } nsresult NS_DiscardSegment(nsIInputStream* aInStr, void* aClosure, const char* aBuffer, uint32_t aOffset, uint32_t aCount, uint32_t* aCountWritten) { *aCountWritten = aCount; return NS_OK; } //----------------------------------------------------------------------------- nsresult NS_WriteSegmentThunk(nsIInputStream* aInStr, void* aClosure, const char* aBuffer, uint32_t aOffset, uint32_t aCount, uint32_t* aCountWritten) { nsWriteSegmentThunk* thunk = static_cast<nsWriteSegmentThunk*>(aClosure); return thunk->mFun(thunk->mStream, thunk->mClosure, aBuffer, aOffset, aCount, aCountWritten); } nsresult NS_FillArray(FallibleTArray<char>& aDest, nsIInputStream* aInput, uint32_t aKeep, uint32_t* aNewBytes) { MOZ_ASSERT(aInput, "null stream"); MOZ_ASSERT(aKeep <= aDest.Length(), "illegal keep count"); char* aBuffer = aDest.Elements(); int64_t keepOffset = int64_t(aDest.Length()) - aKeep; if (aKeep != 0 && keepOffset > 0) { memmove(aBuffer, aBuffer + keepOffset, aKeep); } nsresult rv = aInput->Read(aBuffer + aKeep, aDest.Capacity() - aKeep, aNewBytes); if (NS_FAILED(rv)) { *aNewBytes = 0; } // NOTE: we rely on the fact that the new slots are NOT initialized by // SetLengthAndRetainStorage here, see nsTArrayElementTraits::Construct() // in nsTArray.h: aDest.SetLengthAndRetainStorage(aKeep + *aNewBytes); MOZ_ASSERT(aDest.Length() <= aDest.Capacity(), "buffer overflow"); return rv; } bool NS_InputStreamIsCloneable(nsIInputStream* aSource) { if (!aSource) { return false; } nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource); return cloneable && cloneable->GetCloneable(); } nsresult NS_CloneInputStream(nsIInputStream* aSource, nsIInputStream** aCloneOut, nsIInputStream** aReplacementOut) { if (NS_WARN_IF(!aSource)) { return NS_ERROR_FAILURE; } // Attempt to perform the clone directly on the source stream nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource); if (cloneable && cloneable->GetCloneable()) { if (aReplacementOut) { *aReplacementOut = nullptr; } return cloneable->Clone(aCloneOut); } // If we failed the clone and the caller does not want to replace their // original stream, then we are done. Return error. if (!aReplacementOut) { return NS_ERROR_FAILURE; } // The caller has opted-in to the fallback clone support that replaces // the original stream. Copy the data to a pipe and return two cloned // input streams. nsCOMPtr<nsIInputStream> reader; nsCOMPtr<nsIInputStream> readerClone; nsCOMPtr<nsIOutputStream> writer; NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), 0, 0, // default segment size and max size true, true); // non-blocking // Propagate length information provided by nsIInputStreamLength. We don't use // InputStreamLengthHelper::GetSyncLength to avoid the risk of blocking when // called off-main-thread. int64_t length = -1; if (nsCOMPtr<nsIInputStreamLength> streamLength = do_QueryInterface(aSource); streamLength && NS_SUCCEEDED(streamLength->Length(&length)) && length != -1) { reader = new mozilla::InputStreamLengthWrapper(reader.forget(), length); } cloneable = do_QueryInterface(reader); MOZ_ASSERT(cloneable && cloneable->GetCloneable()); nsresult rv = cloneable->Clone(getter_AddRefs(readerClone)); if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } nsCOMPtr<nsIEventTarget> target = do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv); if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } rv = NS_AsyncCopy(aSource, writer, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS); if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } readerClone.forget(aCloneOut); reader.forget(aReplacementOut); return NS_OK; } nsresult NS_MakeAsyncNonBlockingInputStream( already_AddRefed<nsIInputStream> aSource, nsIAsyncInputStream** aAsyncInputStream, bool aCloseWhenDone, uint32_t aFlags, uint32_t aSegmentSize, uint32_t aSegmentCount) { nsCOMPtr<nsIInputStream> source = std::move(aSource); if (NS_WARN_IF(!aAsyncInputStream)) { return NS_ERROR_FAILURE; } bool nonBlocking = false; nsresult rv = source->IsNonBlocking(&nonBlocking); if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(source); if (nonBlocking && asyncStream) { // This stream is perfect! asyncStream.forget(aAsyncInputStream); return NS_OK; } if (nonBlocking) { // If the stream is non-blocking but not async, we wrap it. return NonBlockingAsyncInputStream::Create(source.forget(), aAsyncInputStream); } nsCOMPtr<nsIStreamTransportService> sts = do_GetService(kStreamTransportServiceCID, &rv); if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } nsCOMPtr<nsITransport> transport; rv = sts->CreateInputTransport(source, aCloseWhenDone, getter_AddRefs(transport)); if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } nsCOMPtr<nsIInputStream> wrapper; rv = transport->OpenInputStream(aFlags, aSegmentSize, aSegmentCount, getter_AddRefs(wrapper)); if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } asyncStream = do_QueryInterface(wrapper); MOZ_ASSERT(asyncStream); asyncStream.forget(aAsyncInputStream); return NS_OK; }