summaryrefslogtreecommitdiffstats
path: root/ipc/glue/IPCStreamDestination.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ipc/glue/IPCStreamDestination.cpp')
-rw-r--r--ipc/glue/IPCStreamDestination.cpp400
1 files changed, 400 insertions, 0 deletions
diff --git a/ipc/glue/IPCStreamDestination.cpp b/ipc/glue/IPCStreamDestination.cpp
new file mode 100644
index 0000000000..d7071391e6
--- /dev/null
+++ b/ipc/glue/IPCStreamDestination.cpp
@@ -0,0 +1,400 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "IPCStreamDestination.h"
+#include "mozilla/InputStreamLengthWrapper.h"
+#include "mozilla/Mutex.h"
+#include "nsIAsyncInputStream.h"
+#include "nsIAsyncOutputStream.h"
+#include "nsIBufferedStreams.h"
+#include "nsICloneableInputStream.h"
+#include "nsIPipe.h"
+#include "nsThreadUtils.h"
+#include "mozilla/webrender/WebRenderTypes.h"
+
+namespace mozilla {
+namespace ipc {
+
+// ----------------------------------------------------------------------------
+// IPCStreamDestination::DelayedStartInputStream
+//
+// When AutoIPCStream is used with delayedStart, we need to ask for data at the
+// first real use of the nsIInputStream. In order to do so, we wrap the
+// nsIInputStream, created by the nsIPipe, with this wrapper.
+
+class IPCStreamDestination::DelayedStartInputStream final
+ : public nsIAsyncInputStream,
+ public nsIInputStreamCallback,
+ public nsISearchableInputStream,
+ public nsICloneableInputStream,
+ public nsIBufferedInputStream {
+ public:
+ NS_DECL_THREADSAFE_ISUPPORTS
+
+ DelayedStartInputStream(IPCStreamDestination* aDestination,
+ nsCOMPtr<nsIAsyncInputStream>&& aStream)
+ : mDestination(aDestination),
+ mStream(std::move(aStream)),
+ mMutex("IPCStreamDestination::DelayedStartInputStream::mMutex") {
+ MOZ_ASSERT(mDestination);
+ MOZ_ASSERT(mStream);
+ }
+
+ void DestinationShutdown() {
+ MutexAutoLock lock(mMutex);
+ mDestination = nullptr;
+ }
+
+ // nsIInputStream interface
+
+ NS_IMETHOD
+ Close() override {
+ MaybeCloseDestination();
+ return mStream->Close();
+ }
+
+ NS_IMETHOD
+ Available(uint64_t* aLength) override {
+ MaybeStartReading();
+ return mStream->Available(aLength);
+ }
+
+ NS_IMETHOD
+ Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override {
+ MaybeStartReading();
+ return mStream->Read(aBuffer, aCount, aReadCount);
+ }
+
+ NS_IMETHOD
+ ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
+ uint32_t* aResult) override {
+ MaybeStartReading();
+ return mStream->ReadSegments(aWriter, aClosure, aCount, aResult);
+ }
+
+ NS_IMETHOD
+ IsNonBlocking(bool* aNonBlocking) override {
+ MaybeStartReading();
+ return mStream->IsNonBlocking(aNonBlocking);
+ }
+
+ // nsIAsyncInputStream interface
+
+ NS_IMETHOD
+ CloseWithStatus(nsresult aReason) override {
+ MaybeCloseDestination();
+ return mStream->CloseWithStatus(aReason);
+ }
+
+ NS_IMETHOD
+ AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags,
+ uint32_t aRequestedCount, nsIEventTarget* aTarget) override {
+ {
+ MutexAutoLock lock(mMutex);
+ if (mAsyncWaitCallback && aCallback) {
+ return NS_ERROR_FAILURE;
+ }
+
+ mAsyncWaitCallback = aCallback;
+
+ MaybeStartReading(lock);
+ }
+
+ nsCOMPtr<nsIInputStreamCallback> callback = aCallback ? this : nullptr;
+ return mStream->AsyncWait(callback, aFlags, aRequestedCount, aTarget);
+ }
+
+ NS_IMETHOD
+ Search(const char* aForString, bool aIgnoreCase, bool* aFound,
+ uint32_t* aOffsetSearchedTo) override {
+ MaybeStartReading();
+ nsCOMPtr<nsISearchableInputStream> searchable = do_QueryInterface(mStream);
+ MOZ_ASSERT(searchable);
+ return searchable->Search(aForString, aIgnoreCase, aFound,
+ aOffsetSearchedTo);
+ }
+
+ // nsICloneableInputStream interface
+
+ NS_IMETHOD
+ GetCloneable(bool* aCloneable) override {
+ MaybeStartReading();
+ nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(mStream);
+ MOZ_ASSERT(cloneable);
+ return cloneable->GetCloneable(aCloneable);
+ }
+
+ NS_IMETHOD
+ Clone(nsIInputStream** aResult) override {
+ MaybeStartReading();
+ nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(mStream);
+ MOZ_ASSERT(cloneable);
+ return cloneable->Clone(aResult);
+ }
+
+ // nsIBufferedInputStream
+
+ NS_IMETHOD
+ Init(nsIInputStream* aStream, uint32_t aBufferSize) override {
+ MaybeStartReading();
+ nsCOMPtr<nsIBufferedInputStream> stream = do_QueryInterface(mStream);
+ MOZ_ASSERT(stream);
+ return stream->Init(aStream, aBufferSize);
+ }
+
+ NS_IMETHODIMP
+ GetData(nsIInputStream** aResult) override {
+ return NS_ERROR_NOT_IMPLEMENTED;
+ }
+
+ // nsIInputStreamCallback
+
+ NS_IMETHOD
+ OnInputStreamReady(nsIAsyncInputStream* aStream) override {
+ nsCOMPtr<nsIInputStreamCallback> callback;
+
+ {
+ MutexAutoLock lock(mMutex);
+
+ // We have been canceled in the meanwhile.
+ if (!mAsyncWaitCallback) {
+ return NS_OK;
+ }
+
+ callback.swap(mAsyncWaitCallback);
+ }
+
+ callback->OnInputStreamReady(this);
+ return NS_OK;
+ }
+
+ void MaybeStartReading();
+ void MaybeStartReading(const MutexAutoLock& aProofOfLook);
+
+ void MaybeCloseDestination();
+
+ private:
+ ~DelayedStartInputStream() = default;
+
+ IPCStreamDestination* mDestination;
+ nsCOMPtr<nsIAsyncInputStream> mStream;
+
+ nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback;
+
+ // This protects mDestination: any method can be called by any thread.
+ Mutex mMutex;
+
+ class HelperRunnable;
+};
+
+class IPCStreamDestination::DelayedStartInputStream::HelperRunnable final
+ : public Runnable {
+ public:
+ enum Op {
+ eStartReading,
+ eCloseDestination,
+ };
+
+ HelperRunnable(
+ IPCStreamDestination::DelayedStartInputStream* aDelayedStartInputStream,
+ Op aOp)
+ : Runnable(
+ "ipc::IPCStreamDestination::DelayedStartInputStream::"
+ "HelperRunnable"),
+ mDelayedStartInputStream(aDelayedStartInputStream),
+ mOp(aOp) {
+ MOZ_ASSERT(aDelayedStartInputStream);
+ }
+
+ NS_IMETHOD
+ Run() override {
+ switch (mOp) {
+ case eStartReading:
+ mDelayedStartInputStream->MaybeStartReading();
+ break;
+ case eCloseDestination:
+ mDelayedStartInputStream->MaybeCloseDestination();
+ break;
+ }
+
+ return NS_OK;
+ }
+
+ private:
+ RefPtr<IPCStreamDestination::DelayedStartInputStream>
+ mDelayedStartInputStream;
+ Op mOp;
+};
+
+void IPCStreamDestination::DelayedStartInputStream::MaybeStartReading() {
+ MutexAutoLock lock(mMutex);
+ MaybeStartReading(lock);
+}
+
+void IPCStreamDestination::DelayedStartInputStream::MaybeStartReading(
+ const MutexAutoLock& aProofOfLook) {
+ if (!mDestination) {
+ return;
+ }
+
+ if (mDestination->IsOnOwningThread()) {
+ mDestination->StartReading();
+ mDestination = nullptr;
+ return;
+ }
+
+ RefPtr<Runnable> runnable =
+ new HelperRunnable(this, HelperRunnable::eStartReading);
+ mDestination->DispatchRunnable(runnable.forget());
+}
+
+void IPCStreamDestination::DelayedStartInputStream::MaybeCloseDestination() {
+ MutexAutoLock lock(mMutex);
+ if (!mDestination) {
+ return;
+ }
+
+ if (mDestination->IsOnOwningThread()) {
+ mDestination->RequestClose(NS_ERROR_ABORT);
+ mDestination = nullptr;
+ return;
+ }
+
+ RefPtr<Runnable> runnable =
+ new HelperRunnable(this, HelperRunnable::eCloseDestination);
+ mDestination->DispatchRunnable(runnable.forget());
+}
+
+NS_IMPL_ADDREF(IPCStreamDestination::DelayedStartInputStream);
+NS_IMPL_RELEASE(IPCStreamDestination::DelayedStartInputStream);
+
+NS_INTERFACE_MAP_BEGIN(IPCStreamDestination::DelayedStartInputStream)
+ NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream)
+ NS_INTERFACE_MAP_ENTRY(nsIInputStreamCallback)
+ NS_INTERFACE_MAP_ENTRY(nsISearchableInputStream)
+ NS_INTERFACE_MAP_ENTRY(nsICloneableInputStream)
+ NS_INTERFACE_MAP_ENTRY(nsIBufferedInputStream)
+ NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsIInputStream, nsIAsyncInputStream)
+ NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIAsyncInputStream)
+NS_INTERFACE_MAP_END
+
+// ----------------------------------------------------------------------------
+// IPCStreamDestination
+
+IPCStreamDestination::IPCStreamDestination()
+ : mOwningThread(NS_GetCurrentThread()),
+ mDelayedStart(false)
+#ifdef MOZ_DEBUG
+ ,
+ mLengthSet(false)
+#endif
+{
+}
+
+IPCStreamDestination::~IPCStreamDestination() = default;
+
+nsresult IPCStreamDestination::Initialize() {
+ MOZ_ASSERT(!mReader);
+ MOZ_ASSERT(!mWriter);
+
+ // use async versions for both reader and writer even though we are
+ // opening the writer as an infinite stream. We want to be able to
+ // use CloseWithStatus() to communicate errors through the pipe.
+
+ // Use an "infinite" pipe because we cannot apply back-pressure through
+ // the async IPC layer at the moment. Blocking the IPC worker thread
+ // is not desirable, either.
+ nsresult rv = NS_NewPipe2(getter_AddRefs(mReader), getter_AddRefs(mWriter),
+ true, true, // non-blocking
+ 0, // segment size
+ UINT32_MAX); // "infinite" pipe
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ return NS_OK;
+}
+
+void IPCStreamDestination::SetDelayedStart(bool aDelayedStart) {
+ mDelayedStart = aDelayedStart;
+}
+
+void IPCStreamDestination::SetLength(int64_t aLength) {
+ MOZ_ASSERT(mReader);
+ MOZ_ASSERT(!mLengthSet);
+
+#ifdef DEBUG
+ mLengthSet = true;
+#endif
+
+ if (aLength != -1) {
+ nsCOMPtr<nsIInputStream> finalStream;
+ finalStream = new InputStreamLengthWrapper(mReader.forget(), aLength);
+ mReader = do_QueryInterface(finalStream);
+ MOZ_ASSERT(mReader);
+ }
+}
+
+already_AddRefed<nsIInputStream> IPCStreamDestination::TakeReader() {
+ MOZ_ASSERT(mReader);
+ MOZ_ASSERT(!mDelayedStartInputStream);
+
+ if (mDelayedStart) {
+ mDelayedStartInputStream =
+ new DelayedStartInputStream(this, std::move(mReader));
+ RefPtr<nsIAsyncInputStream> inputStream = mDelayedStartInputStream;
+ return inputStream.forget();
+ }
+
+ return mReader.forget();
+}
+
+bool IPCStreamDestination::IsOnOwningThread() const {
+ return mOwningThread == NS_GetCurrentThread();
+}
+
+void IPCStreamDestination::DispatchRunnable(
+ already_AddRefed<nsIRunnable>&& aRunnable) {
+ nsCOMPtr<nsIRunnable> runnable = aRunnable;
+ mOwningThread->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
+}
+
+void IPCStreamDestination::ActorDestroyed() {
+ MOZ_ASSERT(mWriter);
+
+ // If we were gracefully closed we should have gotten RecvClose(). In
+ // that case, the writer will already be closed and this will have no
+ // effect. This just aborts the writer in the case where the child process
+ // crashes.
+ mWriter->CloseWithStatus(NS_ERROR_ABORT);
+
+ if (mDelayedStartInputStream) {
+ mDelayedStartInputStream->DestinationShutdown();
+ mDelayedStartInputStream = nullptr;
+ }
+}
+
+void IPCStreamDestination::BufferReceived(const wr::ByteBuffer& aBuffer) {
+ MOZ_ASSERT(mWriter);
+
+ uint32_t numWritten = 0;
+
+ // This should only fail if we hit an OOM condition.
+ nsresult rv = mWriter->Write(reinterpret_cast<char*>(aBuffer.mData),
+ aBuffer.mLength, &numWritten);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ RequestClose(rv);
+ }
+}
+
+void IPCStreamDestination::CloseReceived(nsresult aRv) {
+ MOZ_ASSERT(mWriter);
+ mWriter->CloseWithStatus(aRv);
+ TerminateDestination();
+}
+
+} // namespace ipc
+} // namespace mozilla