diff options
Diffstat (limited to 'ipc/glue/IPCStreamDestination.cpp')
-rw-r--r-- | ipc/glue/IPCStreamDestination.cpp | 400 |
1 files changed, 400 insertions, 0 deletions
diff --git a/ipc/glue/IPCStreamDestination.cpp b/ipc/glue/IPCStreamDestination.cpp new file mode 100644 index 0000000000..d7071391e6 --- /dev/null +++ b/ipc/glue/IPCStreamDestination.cpp @@ -0,0 +1,400 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=8 sts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "IPCStreamDestination.h" +#include "mozilla/InputStreamLengthWrapper.h" +#include "mozilla/Mutex.h" +#include "nsIAsyncInputStream.h" +#include "nsIAsyncOutputStream.h" +#include "nsIBufferedStreams.h" +#include "nsICloneableInputStream.h" +#include "nsIPipe.h" +#include "nsThreadUtils.h" +#include "mozilla/webrender/WebRenderTypes.h" + +namespace mozilla { +namespace ipc { + +// ---------------------------------------------------------------------------- +// IPCStreamDestination::DelayedStartInputStream +// +// When AutoIPCStream is used with delayedStart, we need to ask for data at the +// first real use of the nsIInputStream. In order to do so, we wrap the +// nsIInputStream, created by the nsIPipe, with this wrapper. + +class IPCStreamDestination::DelayedStartInputStream final + : public nsIAsyncInputStream, + public nsIInputStreamCallback, + public nsISearchableInputStream, + public nsICloneableInputStream, + public nsIBufferedInputStream { + public: + NS_DECL_THREADSAFE_ISUPPORTS + + DelayedStartInputStream(IPCStreamDestination* aDestination, + nsCOMPtr<nsIAsyncInputStream>&& aStream) + : mDestination(aDestination), + mStream(std::move(aStream)), + mMutex("IPCStreamDestination::DelayedStartInputStream::mMutex") { + MOZ_ASSERT(mDestination); + MOZ_ASSERT(mStream); + } + + void DestinationShutdown() { + MutexAutoLock lock(mMutex); + mDestination = nullptr; + } + + // nsIInputStream interface + + NS_IMETHOD + Close() override { + MaybeCloseDestination(); + return mStream->Close(); + } + + NS_IMETHOD + Available(uint64_t* aLength) override { + MaybeStartReading(); + return mStream->Available(aLength); + } + + NS_IMETHOD + Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override { + MaybeStartReading(); + return mStream->Read(aBuffer, aCount, aReadCount); + } + + NS_IMETHOD + ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount, + uint32_t* aResult) override { + MaybeStartReading(); + return mStream->ReadSegments(aWriter, aClosure, aCount, aResult); + } + + NS_IMETHOD + IsNonBlocking(bool* aNonBlocking) override { + MaybeStartReading(); + return mStream->IsNonBlocking(aNonBlocking); + } + + // nsIAsyncInputStream interface + + NS_IMETHOD + CloseWithStatus(nsresult aReason) override { + MaybeCloseDestination(); + return mStream->CloseWithStatus(aReason); + } + + NS_IMETHOD + AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags, + uint32_t aRequestedCount, nsIEventTarget* aTarget) override { + { + MutexAutoLock lock(mMutex); + if (mAsyncWaitCallback && aCallback) { + return NS_ERROR_FAILURE; + } + + mAsyncWaitCallback = aCallback; + + MaybeStartReading(lock); + } + + nsCOMPtr<nsIInputStreamCallback> callback = aCallback ? this : nullptr; + return mStream->AsyncWait(callback, aFlags, aRequestedCount, aTarget); + } + + NS_IMETHOD + Search(const char* aForString, bool aIgnoreCase, bool* aFound, + uint32_t* aOffsetSearchedTo) override { + MaybeStartReading(); + nsCOMPtr<nsISearchableInputStream> searchable = do_QueryInterface(mStream); + MOZ_ASSERT(searchable); + return searchable->Search(aForString, aIgnoreCase, aFound, + aOffsetSearchedTo); + } + + // nsICloneableInputStream interface + + NS_IMETHOD + GetCloneable(bool* aCloneable) override { + MaybeStartReading(); + nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(mStream); + MOZ_ASSERT(cloneable); + return cloneable->GetCloneable(aCloneable); + } + + NS_IMETHOD + Clone(nsIInputStream** aResult) override { + MaybeStartReading(); + nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(mStream); + MOZ_ASSERT(cloneable); + return cloneable->Clone(aResult); + } + + // nsIBufferedInputStream + + NS_IMETHOD + Init(nsIInputStream* aStream, uint32_t aBufferSize) override { + MaybeStartReading(); + nsCOMPtr<nsIBufferedInputStream> stream = do_QueryInterface(mStream); + MOZ_ASSERT(stream); + return stream->Init(aStream, aBufferSize); + } + + NS_IMETHODIMP + GetData(nsIInputStream** aResult) override { + return NS_ERROR_NOT_IMPLEMENTED; + } + + // nsIInputStreamCallback + + NS_IMETHOD + OnInputStreamReady(nsIAsyncInputStream* aStream) override { + nsCOMPtr<nsIInputStreamCallback> callback; + + { + MutexAutoLock lock(mMutex); + + // We have been canceled in the meanwhile. + if (!mAsyncWaitCallback) { + return NS_OK; + } + + callback.swap(mAsyncWaitCallback); + } + + callback->OnInputStreamReady(this); + return NS_OK; + } + + void MaybeStartReading(); + void MaybeStartReading(const MutexAutoLock& aProofOfLook); + + void MaybeCloseDestination(); + + private: + ~DelayedStartInputStream() = default; + + IPCStreamDestination* mDestination; + nsCOMPtr<nsIAsyncInputStream> mStream; + + nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback; + + // This protects mDestination: any method can be called by any thread. + Mutex mMutex; + + class HelperRunnable; +}; + +class IPCStreamDestination::DelayedStartInputStream::HelperRunnable final + : public Runnable { + public: + enum Op { + eStartReading, + eCloseDestination, + }; + + HelperRunnable( + IPCStreamDestination::DelayedStartInputStream* aDelayedStartInputStream, + Op aOp) + : Runnable( + "ipc::IPCStreamDestination::DelayedStartInputStream::" + "HelperRunnable"), + mDelayedStartInputStream(aDelayedStartInputStream), + mOp(aOp) { + MOZ_ASSERT(aDelayedStartInputStream); + } + + NS_IMETHOD + Run() override { + switch (mOp) { + case eStartReading: + mDelayedStartInputStream->MaybeStartReading(); + break; + case eCloseDestination: + mDelayedStartInputStream->MaybeCloseDestination(); + break; + } + + return NS_OK; + } + + private: + RefPtr<IPCStreamDestination::DelayedStartInputStream> + mDelayedStartInputStream; + Op mOp; +}; + +void IPCStreamDestination::DelayedStartInputStream::MaybeStartReading() { + MutexAutoLock lock(mMutex); + MaybeStartReading(lock); +} + +void IPCStreamDestination::DelayedStartInputStream::MaybeStartReading( + const MutexAutoLock& aProofOfLook) { + if (!mDestination) { + return; + } + + if (mDestination->IsOnOwningThread()) { + mDestination->StartReading(); + mDestination = nullptr; + return; + } + + RefPtr<Runnable> runnable = + new HelperRunnable(this, HelperRunnable::eStartReading); + mDestination->DispatchRunnable(runnable.forget()); +} + +void IPCStreamDestination::DelayedStartInputStream::MaybeCloseDestination() { + MutexAutoLock lock(mMutex); + if (!mDestination) { + return; + } + + if (mDestination->IsOnOwningThread()) { + mDestination->RequestClose(NS_ERROR_ABORT); + mDestination = nullptr; + return; + } + + RefPtr<Runnable> runnable = + new HelperRunnable(this, HelperRunnable::eCloseDestination); + mDestination->DispatchRunnable(runnable.forget()); +} + +NS_IMPL_ADDREF(IPCStreamDestination::DelayedStartInputStream); +NS_IMPL_RELEASE(IPCStreamDestination::DelayedStartInputStream); + +NS_INTERFACE_MAP_BEGIN(IPCStreamDestination::DelayedStartInputStream) + NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream) + NS_INTERFACE_MAP_ENTRY(nsIInputStreamCallback) + NS_INTERFACE_MAP_ENTRY(nsISearchableInputStream) + NS_INTERFACE_MAP_ENTRY(nsICloneableInputStream) + NS_INTERFACE_MAP_ENTRY(nsIBufferedInputStream) + NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsIInputStream, nsIAsyncInputStream) + NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIAsyncInputStream) +NS_INTERFACE_MAP_END + +// ---------------------------------------------------------------------------- +// IPCStreamDestination + +IPCStreamDestination::IPCStreamDestination() + : mOwningThread(NS_GetCurrentThread()), + mDelayedStart(false) +#ifdef MOZ_DEBUG + , + mLengthSet(false) +#endif +{ +} + +IPCStreamDestination::~IPCStreamDestination() = default; + +nsresult IPCStreamDestination::Initialize() { + MOZ_ASSERT(!mReader); + MOZ_ASSERT(!mWriter); + + // use async versions for both reader and writer even though we are + // opening the writer as an infinite stream. We want to be able to + // use CloseWithStatus() to communicate errors through the pipe. + + // Use an "infinite" pipe because we cannot apply back-pressure through + // the async IPC layer at the moment. Blocking the IPC worker thread + // is not desirable, either. + nsresult rv = NS_NewPipe2(getter_AddRefs(mReader), getter_AddRefs(mWriter), + true, true, // non-blocking + 0, // segment size + UINT32_MAX); // "infinite" pipe + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + return NS_OK; +} + +void IPCStreamDestination::SetDelayedStart(bool aDelayedStart) { + mDelayedStart = aDelayedStart; +} + +void IPCStreamDestination::SetLength(int64_t aLength) { + MOZ_ASSERT(mReader); + MOZ_ASSERT(!mLengthSet); + +#ifdef DEBUG + mLengthSet = true; +#endif + + if (aLength != -1) { + nsCOMPtr<nsIInputStream> finalStream; + finalStream = new InputStreamLengthWrapper(mReader.forget(), aLength); + mReader = do_QueryInterface(finalStream); + MOZ_ASSERT(mReader); + } +} + +already_AddRefed<nsIInputStream> IPCStreamDestination::TakeReader() { + MOZ_ASSERT(mReader); + MOZ_ASSERT(!mDelayedStartInputStream); + + if (mDelayedStart) { + mDelayedStartInputStream = + new DelayedStartInputStream(this, std::move(mReader)); + RefPtr<nsIAsyncInputStream> inputStream = mDelayedStartInputStream; + return inputStream.forget(); + } + + return mReader.forget(); +} + +bool IPCStreamDestination::IsOnOwningThread() const { + return mOwningThread == NS_GetCurrentThread(); +} + +void IPCStreamDestination::DispatchRunnable( + already_AddRefed<nsIRunnable>&& aRunnable) { + nsCOMPtr<nsIRunnable> runnable = aRunnable; + mOwningThread->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL); +} + +void IPCStreamDestination::ActorDestroyed() { + MOZ_ASSERT(mWriter); + + // If we were gracefully closed we should have gotten RecvClose(). In + // that case, the writer will already be closed and this will have no + // effect. This just aborts the writer in the case where the child process + // crashes. + mWriter->CloseWithStatus(NS_ERROR_ABORT); + + if (mDelayedStartInputStream) { + mDelayedStartInputStream->DestinationShutdown(); + mDelayedStartInputStream = nullptr; + } +} + +void IPCStreamDestination::BufferReceived(const wr::ByteBuffer& aBuffer) { + MOZ_ASSERT(mWriter); + + uint32_t numWritten = 0; + + // This should only fail if we hit an OOM condition. + nsresult rv = mWriter->Write(reinterpret_cast<char*>(aBuffer.mData), + aBuffer.mLength, &numWritten); + if (NS_WARN_IF(NS_FAILED(rv))) { + RequestClose(rv); + } +} + +void IPCStreamDestination::CloseReceived(nsresult aRv) { + MOZ_ASSERT(mWriter); + mWriter->CloseWithStatus(aRv); + TerminateDestination(); +} + +} // namespace ipc +} // namespace mozilla |