/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "NonBlockingAsyncInputStream.h" #include "mozilla/ipc/InputStreamUtils.h" #include "nsIAsyncInputStream.h" #include "nsICloneableInputStream.h" #include "nsIInputStream.h" #include "nsIIPCSerializableInputStream.h" #include "nsISeekableStream.h" #include "nsStreamUtils.h" namespace mozilla { using namespace ipc; class NonBlockingAsyncInputStream::AsyncWaitRunnable final : public CancelableRunnable { RefPtr mStream; nsCOMPtr mCallback; public: AsyncWaitRunnable(NonBlockingAsyncInputStream* aStream, nsIInputStreamCallback* aCallback) : CancelableRunnable("AsyncWaitRunnable"), mStream(aStream), mCallback(aCallback) {} NS_IMETHOD Run() override { mStream->RunAsyncWaitCallback(this, mCallback.forget()); return NS_OK; } nsresult Cancel() override { mStream = nullptr; return NS_OK; } }; NS_IMPL_ADDREF(NonBlockingAsyncInputStream); NS_IMPL_RELEASE(NonBlockingAsyncInputStream); NonBlockingAsyncInputStream::WaitClosureOnly::WaitClosureOnly( AsyncWaitRunnable* aRunnable, nsIEventTarget* aEventTarget) : mRunnable(aRunnable), mEventTarget(aEventTarget) {} NS_INTERFACE_MAP_BEGIN(NonBlockingAsyncInputStream) NS_INTERFACE_MAP_ENTRY(nsIInputStream) NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream) NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream, mWeakCloneableInputStream) NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream, mWeakIPCSerializableInputStream) NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream, mWeakSeekableInputStream) NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsITellableStream, mWeakTellableInputStream) NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStream) NS_INTERFACE_MAP_END /* static */ nsresult NonBlockingAsyncInputStream::Create( already_AddRefed aInputStream, nsIAsyncInputStream** aResult) { MOZ_DIAGNOSTIC_ASSERT(aResult); nsCOMPtr inputStream = std::move(aInputStream); bool nonBlocking = false; nsresult rv = inputStream->IsNonBlocking(&nonBlocking); if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } MOZ_DIAGNOSTIC_ASSERT(nonBlocking); #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED nsCOMPtr asyncInputStream = do_QueryInterface(inputStream); MOZ_DIAGNOSTIC_ASSERT(!asyncInputStream); #endif // MOZ_DIAGNOSTIC_ASSERT_ENABLED RefPtr stream = new NonBlockingAsyncInputStream(inputStream.forget()); stream.forget(aResult); return NS_OK; } NonBlockingAsyncInputStream::NonBlockingAsyncInputStream( already_AddRefed aInputStream) : mInputStream(std::move(aInputStream)), mWeakCloneableInputStream(nullptr), mWeakIPCSerializableInputStream(nullptr), mWeakSeekableInputStream(nullptr), mWeakTellableInputStream(nullptr), mLock("NonBlockingAsyncInputStream::mLock"), mClosed(false) { MOZ_ASSERT(mInputStream); nsCOMPtr cloneableStream = do_QueryInterface(mInputStream); if (cloneableStream && SameCOMIdentity(mInputStream, cloneableStream)) { mWeakCloneableInputStream = cloneableStream; } nsCOMPtr serializableStream = do_QueryInterface(mInputStream); if (serializableStream && SameCOMIdentity(mInputStream, serializableStream)) { mWeakIPCSerializableInputStream = serializableStream; } nsCOMPtr seekableStream = do_QueryInterface(mInputStream); if (seekableStream && SameCOMIdentity(mInputStream, seekableStream)) { mWeakSeekableInputStream = seekableStream; } nsCOMPtr tellableStream = do_QueryInterface(mInputStream); if (tellableStream && SameCOMIdentity(mInputStream, tellableStream)) { mWeakTellableInputStream = tellableStream; } } NonBlockingAsyncInputStream::~NonBlockingAsyncInputStream() = default; NS_IMETHODIMP NonBlockingAsyncInputStream::Close() { RefPtr waitClosureOnlyRunnable; nsCOMPtr waitClosureOnlyEventTarget; { MutexAutoLock lock(mLock); if (mClosed) { // Here we could return NS_BASE_STREAM_CLOSED as well, but just to avoid // warning messages, let's make everybody happy with a NS_OK. return NS_OK; } mClosed = true; NS_ENSURE_STATE(mInputStream); nsresult rv = mInputStream->Close(); if (NS_WARN_IF(NS_FAILED(rv))) { mWaitClosureOnly.reset(); return rv; } // If we have a WaitClosureOnly runnable, it's time to use it. if (mWaitClosureOnly.isSome()) { waitClosureOnlyRunnable = std::move(mWaitClosureOnly->mRunnable); waitClosureOnlyEventTarget = std::move(mWaitClosureOnly->mEventTarget); mWaitClosureOnly.reset(); // Now we want to dispatch the asyncWaitCallback. mAsyncWaitCallback = waitClosureOnlyRunnable; } } if (waitClosureOnlyRunnable) { if (waitClosureOnlyEventTarget) { waitClosureOnlyEventTarget->Dispatch(waitClosureOnlyRunnable, NS_DISPATCH_NORMAL); } else { waitClosureOnlyRunnable->Run(); } } return NS_OK; } // nsIInputStream interface NS_IMETHODIMP NonBlockingAsyncInputStream::Available(uint64_t* aLength) { nsresult rv = mInputStream->Available(aLength); // Don't issue warnings for legal condition NS_BASE_STREAM_CLOSED. if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) { return rv; } // Nothing more to read. Let's close the stream now. if (*aLength == 0) { MutexAutoLock lock(mLock); mInputStream->Close(); mClosed = true; return NS_BASE_STREAM_CLOSED; } return NS_OK; } NS_IMETHODIMP NonBlockingAsyncInputStream::StreamStatus() { return mInputStream->StreamStatus(); } NS_IMETHODIMP NonBlockingAsyncInputStream::Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) { return mInputStream->Read(aBuffer, aCount, aReadCount); } namespace { class MOZ_RAII ReadSegmentsData { public: ReadSegmentsData(NonBlockingAsyncInputStream* aStream, nsWriteSegmentFun aFunc, void* aClosure) : mStream(aStream), mFunc(aFunc), mClosure(aClosure) {} NonBlockingAsyncInputStream* mStream; nsWriteSegmentFun mFunc; void* mClosure; }; nsresult ReadSegmentsWriter(nsIInputStream* aInStream, void* aClosure, const char* aFromSegment, uint32_t aToOffset, uint32_t aCount, uint32_t* aWriteCount) { ReadSegmentsData* data = static_cast(aClosure); return data->mFunc(data->mStream, data->mClosure, aFromSegment, aToOffset, aCount, aWriteCount); } } // namespace NS_IMETHODIMP NonBlockingAsyncInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount, uint32_t* aResult) { ReadSegmentsData data(this, aWriter, aClosure); return mInputStream->ReadSegments(ReadSegmentsWriter, &data, aCount, aResult); } NS_IMETHODIMP NonBlockingAsyncInputStream::IsNonBlocking(bool* aNonBlocking) { *aNonBlocking = true; return NS_OK; } // nsICloneableInputStream interface NS_IMETHODIMP NonBlockingAsyncInputStream::GetCloneable(bool* aCloneable) { NS_ENSURE_STATE(mWeakCloneableInputStream); return mWeakCloneableInputStream->GetCloneable(aCloneable); } NS_IMETHODIMP NonBlockingAsyncInputStream::Clone(nsIInputStream** aResult) { NS_ENSURE_STATE(mWeakCloneableInputStream); nsCOMPtr clonedStream; nsresult rv = mWeakCloneableInputStream->Clone(getter_AddRefs(clonedStream)); if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } nsCOMPtr asyncStream; rv = Create(clonedStream.forget(), getter_AddRefs(asyncStream)); if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } asyncStream.forget(aResult); return NS_OK; } // nsIAsyncInputStream interface NS_IMETHODIMP NonBlockingAsyncInputStream::CloseWithStatus(nsresult aStatus) { return Close(); } NS_IMETHODIMP NonBlockingAsyncInputStream::AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags, uint32_t aRequestedCount, nsIEventTarget* aEventTarget) { RefPtr runnable; { MutexAutoLock lock(mLock); mWaitClosureOnly.reset(); mAsyncWaitCallback = nullptr; if (!aCallback) { // Canceling previous callbacks, which is done above. return NS_OK; } // Maybe the stream is already closed. if (!mClosed) { uint64_t length; nsresult rv = mInputStream->Available(&length); if (NS_SUCCEEDED(rv) && length == 0) { mInputStream->Close(); mClosed = true; } } runnable = new AsyncWaitRunnable(this, aCallback); if ((aFlags & nsIAsyncInputStream::WAIT_CLOSURE_ONLY) && !mClosed) { mWaitClosureOnly.emplace(runnable, aEventTarget); return NS_OK; } mAsyncWaitCallback = runnable; } MOZ_ASSERT(runnable); if (aEventTarget) { return aEventTarget->Dispatch(runnable.forget()); } return runnable->Run(); } // nsIIPCSerializableInputStream void NonBlockingAsyncInputStream::SerializedComplexity( uint32_t aMaxSize, uint32_t* aSizeUsed, uint32_t* aPipes, uint32_t* aTransferables) { InputStreamHelper::SerializedComplexity(mInputStream, aMaxSize, aSizeUsed, aPipes, aTransferables); } void NonBlockingAsyncInputStream::Serialize( mozilla::ipc::InputStreamParams& aParams, uint32_t aMaxSize, uint32_t* aSizeUsed) { MOZ_ASSERT(mWeakIPCSerializableInputStream); InputStreamHelper::SerializeInputStream(mInputStream, aParams, aMaxSize, aSizeUsed); } bool NonBlockingAsyncInputStream::Deserialize( const mozilla::ipc::InputStreamParams& aParams) { MOZ_CRASH("NonBlockingAsyncInputStream cannot be deserialized!"); return true; } // nsISeekableStream NS_IMETHODIMP NonBlockingAsyncInputStream::Seek(int32_t aWhence, int64_t aOffset) { NS_ENSURE_STATE(mWeakSeekableInputStream); return mWeakSeekableInputStream->Seek(aWhence, aOffset); } NS_IMETHODIMP NonBlockingAsyncInputStream::SetEOF() { NS_ENSURE_STATE(mWeakSeekableInputStream); return NS_ERROR_NOT_IMPLEMENTED; } // nsITellableStream NS_IMETHODIMP NonBlockingAsyncInputStream::Tell(int64_t* aResult) { NS_ENSURE_STATE(mWeakTellableInputStream); return mWeakTellableInputStream->Tell(aResult); } void NonBlockingAsyncInputStream::RunAsyncWaitCallback( NonBlockingAsyncInputStream::AsyncWaitRunnable* aRunnable, already_AddRefed aCallback) { nsCOMPtr callback = std::move(aCallback); { MutexAutoLock lock(mLock); if (mAsyncWaitCallback != aRunnable) { // The callback has been canceled in the meantime. return; } mAsyncWaitCallback = nullptr; } callback->OnInputStreamReady(this); } } // namespace mozilla