summaryrefslogtreecommitdiffstats
path: root/ipc/glue/IPCStreamSource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ipc/glue/IPCStreamSource.cpp')
-rw-r--r--ipc/glue/IPCStreamSource.cpp277
1 files changed, 277 insertions, 0 deletions
diff --git a/ipc/glue/IPCStreamSource.cpp b/ipc/glue/IPCStreamSource.cpp
new file mode 100644
index 0000000000..3ddf0c9e52
--- /dev/null
+++ b/ipc/glue/IPCStreamSource.cpp
@@ -0,0 +1,277 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "IPCStreamSource.h"
+
+#include "BackgroundParent.h" // for AssertIsOnBackgroundThread
+#include "mozilla/UniquePtr.h"
+#include "mozilla/dom/RemoteWorkerService.h"
+#include "mozilla/dom/WorkerCommon.h"
+#include "mozilla/webrender/WebRenderTypes.h"
+#include "nsIAsyncInputStream.h"
+#include "nsICancelableRunnable.h"
+#include "nsIRunnable.h"
+#include "nsISerialEventTarget.h"
+#include "nsStreamUtils.h"
+#include "nsThreadUtils.h"
+
+using mozilla::wr::ByteBuffer;
+
+namespace mozilla {
+namespace ipc {
+
+class IPCStreamSource::Callback final : public DiscardableRunnable,
+ public nsIInputStreamCallback {
+ public:
+ explicit Callback(IPCStreamSource* aSource)
+ : DiscardableRunnable("IPCStreamSource::Callback"),
+ mSource(aSource),
+ mOwningEventTarget(GetCurrentSerialEventTarget()) {
+ MOZ_ASSERT(mSource);
+ }
+
+ NS_IMETHOD
+ OnInputStreamReady(nsIAsyncInputStream* aStream) override {
+ // any thread
+ if (mOwningEventTarget->IsOnCurrentThread()) {
+ return Run();
+ }
+
+ // If this fails, then it means the owning thread is a Worker that has
+ // been shutdown. Its ok to lose the event in this case because the
+ // IPCStreamChild listens for this event through the WorkerRef.
+ nsresult rv =
+ mOwningEventTarget->Dispatch(this, nsIThread::DISPATCH_NORMAL);
+ if (NS_FAILED(rv)) {
+ NS_WARNING("Failed to dispatch stream readable event to owning thread");
+ }
+
+ return NS_OK;
+ }
+
+ NS_IMETHOD
+ Run() override {
+ MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
+ if (mSource) {
+ mSource->OnStreamReady(this);
+ }
+ return NS_OK;
+ }
+
+ // OnDiscard() gets called when the Worker thread is being shutdown. We have
+ // nothing to do here because IPCStreamChild handles this case via
+ // the WorkerRef.
+
+ void ClearSource() {
+ MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
+ MOZ_ASSERT(mSource);
+ mSource = nullptr;
+ }
+
+ private:
+ ~Callback() {
+ // called on any thread
+
+ // ClearSource() should be called before the Callback is destroyed
+ MOZ_ASSERT(!mSource);
+ }
+
+ // This is a raw pointer because the source keeps alive the callback and,
+ // before beeing destroyed, it nullifies this pointer (this happens when
+ // ActorDestroyed() is called).
+ IPCStreamSource* mSource;
+
+ nsCOMPtr<nsISerialEventTarget> mOwningEventTarget;
+
+ NS_DECL_ISUPPORTS_INHERITED
+};
+
+NS_IMPL_ISUPPORTS_INHERITED(IPCStreamSource::Callback, DiscardableRunnable,
+ nsIInputStreamCallback);
+
+IPCStreamSource::IPCStreamSource(nsIAsyncInputStream* aInputStream)
+ : mStream(aInputStream), mState(ePending) {
+ MOZ_ASSERT(aInputStream);
+}
+
+IPCStreamSource::~IPCStreamSource() {
+ NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
+ MOZ_ASSERT(mState == eClosed);
+ MOZ_ASSERT(!mCallback);
+ MOZ_ASSERT(!mWorkerRef);
+}
+
+bool IPCStreamSource::Initialize() {
+ bool nonBlocking = false;
+ MOZ_ALWAYS_TRUE(NS_SUCCEEDED(mStream->IsNonBlocking(&nonBlocking)));
+ // IPCStreamChild reads in the current thread, so it is only supported on
+ // non-blocking, async channels
+ if (!nonBlocking) {
+ return false;
+ }
+
+ // A source can be used on any thread, but we only support IPCStream on
+ // main thread, Workers, Worker Launcher, and PBackground thread right now.
+ // This is due to the requirement that the thread be guaranteed to live long
+ // enough to receive messages. We can enforce this guarantee with a
+ // StrongWorkerRef on worker threads, but not other threads. Main-thread,
+ // PBackground, and Worker Launcher threads do not need anything special in
+ // order to be kept alive.
+ if (!NS_IsMainThread()) {
+ if (const auto workerPrivate = dom::GetCurrentThreadWorkerPrivate()) {
+ RefPtr<dom::StrongWorkerRef> workerRef =
+ dom::StrongWorkerRef::CreateForcibly(workerPrivate,
+ "IPCStreamSource");
+ if (NS_WARN_IF(!workerRef)) {
+ return false;
+ }
+
+ mWorkerRef = std::move(workerRef);
+ } else {
+ MOZ_DIAGNOSTIC_ASSERT(
+ IsOnBackgroundThread() ||
+ dom::RemoteWorkerService::Thread()->IsOnCurrentThread());
+ }
+ }
+
+ return true;
+}
+
+void IPCStreamSource::ActorConstructed() {
+ MOZ_ASSERT(mState == ePending);
+ mState = eActorConstructed;
+}
+
+void IPCStreamSource::ActorDestroyed() {
+ NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
+
+ mState = eClosed;
+
+ if (mCallback) {
+ mCallback->ClearSource();
+ mCallback = nullptr;
+ }
+
+ mWorkerRef = nullptr;
+}
+
+void IPCStreamSource::Start() {
+ NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
+ DoRead();
+}
+
+void IPCStreamSource::StartDestroy() {
+ NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
+ OnEnd(NS_ERROR_ABORT);
+}
+
+void IPCStreamSource::DoRead() {
+ NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
+ MOZ_ASSERT(mState == eActorConstructed);
+ MOZ_ASSERT(!mCallback);
+
+ // The input stream (likely a pipe) probably uses a segment size of
+ // 4kb. If there is data already buffered it would be nice to aggregate
+ // multiple segments into a single IPC call. Conversely, don't send too
+ // too large of a buffer in a single call to avoid spiking memory.
+ static const uint64_t kMaxBytesPerMessage = 32 * 1024;
+ static_assert(kMaxBytesPerMessage <= static_cast<uint64_t>(UINT32_MAX),
+ "kMaxBytesPerMessage must cleanly cast to uint32_t");
+
+ UniquePtr<char[]> buffer(new char[kMaxBytesPerMessage]);
+
+ while (true) {
+ // It should not be possible to transition to closed state without
+ // this loop terminating via a return.
+ MOZ_ASSERT(mState == eActorConstructed);
+
+ // See if the stream is closed by checking the return of Available.
+ uint64_t dummy;
+ nsresult rv = mStream->Available(&dummy);
+ if (NS_FAILED(rv)) {
+ OnEnd(rv);
+ return;
+ }
+
+ uint32_t bytesRead = 0;
+ rv = mStream->Read(buffer.get(), kMaxBytesPerMessage, &bytesRead);
+
+ if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
+ MOZ_ASSERT(bytesRead == 0);
+ Wait();
+ return;
+ }
+
+ if (NS_FAILED(rv)) {
+ MOZ_ASSERT(bytesRead == 0);
+ OnEnd(rv);
+ return;
+ }
+
+ // Zero-byte read indicates end-of-stream.
+ if (bytesRead == 0) {
+ OnEnd(NS_BASE_STREAM_CLOSED);
+ return;
+ }
+
+ // We read some data from the stream, send it across.
+ SendData(ByteBuffer(bytesRead, reinterpret_cast<uint8_t*>(buffer.get())));
+ }
+}
+
+void IPCStreamSource::Wait() {
+ NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
+ MOZ_ASSERT(mState == eActorConstructed);
+ MOZ_ASSERT(!mCallback);
+
+ // Set mCallback immediately instead of waiting for success. Its possible
+ // AsyncWait() will callback synchronously.
+ mCallback = new Callback(this);
+ nsresult rv = mStream->AsyncWait(mCallback, 0, 0, nullptr);
+ if (NS_FAILED(rv)) {
+ OnEnd(rv);
+ return;
+ }
+}
+
+void IPCStreamSource::OnStreamReady(Callback* aCallback) {
+ NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
+ MOZ_ASSERT(mCallback);
+ MOZ_ASSERT(aCallback == mCallback);
+ mCallback->ClearSource();
+ mCallback = nullptr;
+
+ // Possibly closed if this callback is (indirectly) called by
+ // IPCStreamSourceParent::RecvRequestClose().
+ if (mState == eClosed) {
+ return;
+ }
+
+ DoRead();
+}
+
+void IPCStreamSource::OnEnd(nsresult aRv) {
+ NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
+ MOZ_ASSERT(aRv != NS_BASE_STREAM_WOULD_BLOCK);
+
+ if (mState == eClosed) {
+ return;
+ }
+
+ mState = eClosed;
+
+ mStream->CloseWithStatus(aRv);
+
+ if (aRv == NS_BASE_STREAM_CLOSED) {
+ aRv = NS_OK;
+ }
+
+ // This will trigger an ActorDestroy() from the other side
+ Close(aRv);
+}
+
+} // namespace ipc
+} // namespace mozilla