diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:44:51 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:44:51 +0000 |
commit | 9e3c08db40b8916968b9f30096c7be3f00ce9647 (patch) | |
tree | a68f146d7fa01f0134297619fbe7e33db084e0aa /ipc/glue/MessageChannel.cpp | |
parent | Initial commit. (diff) | |
download | thunderbird-9e3c08db40b8916968b9f30096c7be3f00ce9647.tar.xz thunderbird-9e3c08db40b8916968b9f30096c7be3f00ce9647.zip |
Adding upstream version 1:115.7.0.upstream/1%115.7.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ipc/glue/MessageChannel.cpp')
-rw-r--r-- | ipc/glue/MessageChannel.cpp | 2482 |
1 files changed, 2482 insertions, 0 deletions
diff --git a/ipc/glue/MessageChannel.cpp b/ipc/glue/MessageChannel.cpp new file mode 100644 index 0000000000..ea01c54f0d --- /dev/null +++ b/ipc/glue/MessageChannel.cpp @@ -0,0 +1,2482 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- + * vim: sw=2 ts=4 et : + */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "mozilla/ipc/MessageChannel.h" + +#include <math.h> + +#include <utility> + +#include "CrashAnnotations.h" +#include "mozilla/Assertions.h" +#include "mozilla/CycleCollectedJSContext.h" +#include "mozilla/DebugOnly.h" +#include "mozilla/Fuzzing.h" +#include "mozilla/IntentionalCrash.h" +#include "mozilla/Logging.h" +#include "mozilla/Monitor.h" +#include "mozilla/Mutex.h" +#include "mozilla/ProfilerMarkers.h" +#include "mozilla/ScopeExit.h" +#include "mozilla/Sprintf.h" +#include "mozilla/StaticMutex.h" +#include "mozilla/Telemetry.h" +#include "mozilla/TimeStamp.h" +#include "mozilla/UniquePtrExtensions.h" +#include "mozilla/dom/ScriptSettings.h" +#include "mozilla/ipc/NodeController.h" +#include "mozilla/ipc/ProcessChild.h" +#include "mozilla/ipc/ProtocolUtils.h" +#include "nsAppRunner.h" +#include "nsContentUtils.h" +#include "nsIDirectTaskDispatcher.h" +#include "nsTHashMap.h" +#include "nsDebug.h" +#include "nsExceptionHandler.h" +#include "nsIMemoryReporter.h" +#include "nsISupportsImpl.h" +#include "nsPrintfCString.h" +#include "nsThreadUtils.h" + +#ifdef OS_WIN +# include "mozilla/gfx/Logging.h" +#endif + +#ifdef FUZZING_SNAPSHOT +# include "mozilla/fuzzing/IPCFuzzController.h" +#endif + +// Undo the damage done by mozzconf.h +#undef compress + +static mozilla::LazyLogModule sLogModule("ipc"); +#define IPC_LOG(...) MOZ_LOG(sLogModule, LogLevel::Debug, (__VA_ARGS__)) + +/* + * IPC design: + * + * There are two kinds of messages: async and sync. Sync messages are blocking. + * + * Terminology: To dispatch a message Foo is to run the RecvFoo code for + * it. This is also called "handling" the message. + * + * Sync and async messages can sometimes "nest" inside other sync messages + * (i.e., while waiting for the sync reply, we can dispatch the inner + * message). The three possible nesting levels are NOT_NESTED, + * NESTED_INSIDE_SYNC, and NESTED_INSIDE_CPOW. The intended uses are: + * NOT_NESTED - most messages. + * NESTED_INSIDE_SYNC - CPOW-related messages, which are always sync + * and can go in either direction. + * NESTED_INSIDE_CPOW - messages where we don't want to dispatch + * incoming CPOWs while waiting for the response. + * These nesting levels are ordered: NOT_NESTED, NESTED_INSIDE_SYNC, + * NESTED_INSIDE_CPOW. Async messages cannot be NESTED_INSIDE_SYNC but they can + * be NESTED_INSIDE_CPOW. + * + * To avoid jank, the parent process is not allowed to send NOT_NESTED sync + * messages. When a process is waiting for a response to a sync message M0, it + * will dispatch an incoming message M if: + * 1. M has a higher nesting level than M0, or + * 2. if M has the same nesting level as M0 and we're in the child, or + * 3. if M has the same nesting level as M0 and it was sent by the other side + * while dispatching M0. + * The idea is that messages with higher nesting should take precendence. The + * purpose of rule 2 is to handle a race where both processes send to each other + * simultaneously. In this case, we resolve the race in favor of the parent (so + * the child dispatches first). + * + * Messages satisfy the following properties: + * A. When waiting for a response to a sync message, we won't dispatch any + * messages of a lower nesting level. + * B. Messages of the same nesting level will be dispatched roughly in the + * order they were sent. The exception is when the parent and child send + * sync messages to each other simulataneously. In this case, the parent's + * message is dispatched first. While it is dispatched, the child may send + * further nested messages, and these messages may be dispatched before the + * child's original message. We can consider ordering to be preserved here + * because we pretend that the child's original message wasn't sent until + * after the parent's message is finished being dispatched. + * + * When waiting for a sync message reply, we dispatch an async message only if + * it is NESTED_INSIDE_CPOW. Normally NESTED_INSIDE_CPOW async + * messages are sent only from the child. However, the parent can send + * NESTED_INSIDE_CPOW async messages when it is creating a bridged protocol. + */ + +using namespace mozilla; +using namespace mozilla::ipc; + +using mozilla::MonitorAutoLock; +using mozilla::MonitorAutoUnlock; +using mozilla::dom::AutoNoJSAPI; + +#define IPC_ASSERT(_cond, ...) \ + do { \ + AssertWorkerThread(); \ + mMonitor->AssertCurrentThreadOwns(); \ + if (!(_cond)) DebugAbort(__FILE__, __LINE__, #_cond, ##__VA_ARGS__); \ + } while (0) + +static MessageChannel* gParentProcessBlocker = nullptr; + +namespace mozilla { +namespace ipc { + +static const uint32_t kMinTelemetryMessageSize = 4096; + +// Note: we round the time we spend waiting for a response to the nearest +// millisecond. So a min value of 1 ms actually captures from 500us and above. +// This is used for both the sending and receiving side telemetry for sync IPC, +// (IPC_SYNC_MAIN_LATENCY_MS and IPC_SYNC_RECEIVE_MS). +static const uint32_t kMinTelemetrySyncIPCLatencyMs = 1; + +// static +bool MessageChannel::sIsPumpingMessages = false; + +class AutoEnterTransaction { + public: + explicit AutoEnterTransaction(MessageChannel* aChan, int32_t aMsgSeqno, + int32_t aTransactionID, int aNestedLevel) + MOZ_REQUIRES(*aChan->mMonitor) + : mChan(aChan), + mActive(true), + mOutgoing(true), + mNestedLevel(aNestedLevel), + mSeqno(aMsgSeqno), + mTransaction(aTransactionID), + mNext(mChan->mTransactionStack) { + mChan->mMonitor->AssertCurrentThreadOwns(); + mChan->mTransactionStack = this; + } + + explicit AutoEnterTransaction(MessageChannel* aChan, + const IPC::Message& aMessage) + MOZ_REQUIRES(*aChan->mMonitor) + : mChan(aChan), + mActive(true), + mOutgoing(false), + mNestedLevel(aMessage.nested_level()), + mSeqno(aMessage.seqno()), + mTransaction(aMessage.transaction_id()), + mNext(mChan->mTransactionStack) { + mChan->mMonitor->AssertCurrentThreadOwns(); + + if (!aMessage.is_sync()) { + mActive = false; + return; + } + + mChan->mTransactionStack = this; + } + + ~AutoEnterTransaction() { + mChan->mMonitor->AssertCurrentThreadOwns(); + if (mActive) { + mChan->mTransactionStack = mNext; + } + } + + void Cancel() { + mChan->mMonitor->AssertCurrentThreadOwns(); + AutoEnterTransaction* cur = mChan->mTransactionStack; + MOZ_RELEASE_ASSERT(cur == this); + while (cur && cur->mNestedLevel != IPC::Message::NOT_NESTED) { + // Note that, in the following situation, we will cancel multiple + // transactions: + // 1. Parent sends NESTED_INSIDE_SYNC message P1 to child. + // 2. Child sends NESTED_INSIDE_SYNC message C1 to child. + // 3. Child dispatches P1, parent blocks. + // 4. Child cancels. + // In this case, both P1 and C1 are cancelled. The parent will + // remove C1 from its queue when it gets the cancellation message. + MOZ_RELEASE_ASSERT(cur->mActive); + cur->mActive = false; + cur = cur->mNext; + } + + mChan->mTransactionStack = cur; + + MOZ_RELEASE_ASSERT(IsComplete()); + } + + bool AwaitingSyncReply() const { + MOZ_RELEASE_ASSERT(mActive); + if (mOutgoing) { + return true; + } + return mNext ? mNext->AwaitingSyncReply() : false; + } + + int AwaitingSyncReplyNestedLevel() const { + MOZ_RELEASE_ASSERT(mActive); + if (mOutgoing) { + return mNestedLevel; + } + return mNext ? mNext->AwaitingSyncReplyNestedLevel() : 0; + } + + bool DispatchingSyncMessage() const { + MOZ_RELEASE_ASSERT(mActive); + if (!mOutgoing) { + return true; + } + return mNext ? mNext->DispatchingSyncMessage() : false; + } + + int DispatchingSyncMessageNestedLevel() const { + MOZ_RELEASE_ASSERT(mActive); + if (!mOutgoing) { + return mNestedLevel; + } + return mNext ? mNext->DispatchingSyncMessageNestedLevel() : 0; + } + + int NestedLevel() const { + MOZ_RELEASE_ASSERT(mActive); + return mNestedLevel; + } + + int32_t SequenceNumber() const { + MOZ_RELEASE_ASSERT(mActive); + return mSeqno; + } + + int32_t TransactionID() const { + MOZ_RELEASE_ASSERT(mActive); + return mTransaction; + } + + void ReceivedReply(UniquePtr<IPC::Message> aMessage) { + MOZ_RELEASE_ASSERT(aMessage->seqno() == mSeqno); + MOZ_RELEASE_ASSERT(aMessage->transaction_id() == mTransaction); + MOZ_RELEASE_ASSERT(!mReply); + IPC_LOG("Reply received on worker thread: seqno=%d", mSeqno); + mReply = std::move(aMessage); + MOZ_RELEASE_ASSERT(IsComplete()); + } + + void HandleReply(UniquePtr<IPC::Message> aMessage) { + mChan->mMonitor->AssertCurrentThreadOwns(); + AutoEnterTransaction* cur = mChan->mTransactionStack; + MOZ_RELEASE_ASSERT(cur == this); + while (cur) { + MOZ_RELEASE_ASSERT(cur->mActive); + if (aMessage->seqno() == cur->mSeqno) { + cur->ReceivedReply(std::move(aMessage)); + break; + } + cur = cur->mNext; + MOZ_RELEASE_ASSERT(cur); + } + } + + bool IsComplete() { return !mActive || mReply; } + + bool IsOutgoing() { return mOutgoing; } + + bool IsCanceled() { return !mActive; } + + bool IsBottom() const { return !mNext; } + + bool IsError() { + MOZ_RELEASE_ASSERT(mReply); + return mReply->is_reply_error(); + } + + UniquePtr<IPC::Message> GetReply() { return std::move(mReply); } + + private: + MessageChannel* mChan; + + // Active is true if this transaction is on the mChan->mTransactionStack + // stack. Generally we're not on the stack if the transaction was canceled + // or if it was for a message that doesn't require transactions (an async + // message). + bool mActive; + + // Is this stack frame for an outgoing message? + bool mOutgoing; + + // Properties of the message being sent/received. + int mNestedLevel; + int32_t mSeqno; + int32_t mTransaction; + + // Next item in mChan->mTransactionStack. + AutoEnterTransaction* mNext; + + // Pointer the a reply received for this message, if one was received. + UniquePtr<IPC::Message> mReply; +}; + +class PendingResponseReporter final : public nsIMemoryReporter { + ~PendingResponseReporter() = default; + + public: + NS_DECL_THREADSAFE_ISUPPORTS + + NS_IMETHOD + CollectReports(nsIHandleReportCallback* aHandleReport, nsISupports* aData, + bool aAnonymize) override { + MOZ_COLLECT_REPORT( + "unresolved-ipc-responses", KIND_OTHER, UNITS_COUNT, + MessageChannel::gUnresolvedResponses, + "Outstanding IPC async message responses that are still not resolved."); + return NS_OK; + } +}; + +NS_IMPL_ISUPPORTS(PendingResponseReporter, nsIMemoryReporter) + +class ChannelCountReporter final : public nsIMemoryReporter { + ~ChannelCountReporter() = default; + + struct ChannelCounts { + size_t mNow; + size_t mMax; + + ChannelCounts() : mNow(0), mMax(0) {} + + void Inc() { + ++mNow; + if (mMax < mNow) { + mMax = mNow; + } + } + + void Dec() { + MOZ_ASSERT(mNow > 0); + --mNow; + } + }; + + using CountTable = nsTHashMap<nsDepCharHashKey, ChannelCounts>; + + static StaticMutex sChannelCountMutex; + static CountTable* sChannelCounts MOZ_GUARDED_BY(sChannelCountMutex); + + public: + NS_DECL_THREADSAFE_ISUPPORTS + + NS_IMETHOD + CollectReports(nsIHandleReportCallback* aHandleReport, nsISupports* aData, + bool aAnonymize) override { + AutoTArray<std::pair<const char*, ChannelCounts>, 16> counts; + { + StaticMutexAutoLock countLock(sChannelCountMutex); + if (!sChannelCounts) { + return NS_OK; + } + counts.SetCapacity(sChannelCounts->Count()); + for (const auto& entry : *sChannelCounts) { + counts.AppendElement(std::pair{entry.GetKey(), entry.GetData()}); + } + } + + for (const auto& entry : counts) { + nsPrintfCString pathNow("ipc-channels/%s", entry.first); + nsPrintfCString pathMax("ipc-channels-peak/%s", entry.first); + nsPrintfCString descNow( + "Number of IPC channels for" + " top-level actor type %s", + entry.first); + nsPrintfCString descMax( + "Peak number of IPC channels for" + " top-level actor type %s", + entry.first); + + aHandleReport->Callback(""_ns, pathNow, KIND_OTHER, UNITS_COUNT, + entry.second.mNow, descNow, aData); + aHandleReport->Callback(""_ns, pathMax, KIND_OTHER, UNITS_COUNT, + entry.second.mMax, descMax, aData); + } + return NS_OK; + } + + static void Increment(const char* aName) { + StaticMutexAutoLock countLock(sChannelCountMutex); + if (!sChannelCounts) { + sChannelCounts = new CountTable; + } + sChannelCounts->LookupOrInsert(aName).Inc(); + } + + static void Decrement(const char* aName) { + StaticMutexAutoLock countLock(sChannelCountMutex); + MOZ_ASSERT(sChannelCounts); + sChannelCounts->LookupOrInsert(aName).Dec(); + } +}; + +StaticMutex ChannelCountReporter::sChannelCountMutex; +ChannelCountReporter::CountTable* ChannelCountReporter::sChannelCounts; + +NS_IMPL_ISUPPORTS(ChannelCountReporter, nsIMemoryReporter) + +// In child processes, the first MessageChannel is created before +// XPCOM is initialized enough to construct the memory reporter +// manager. This retries every time a MessageChannel is constructed, +// which is good enough in practice. +template <class Reporter> +static void TryRegisterStrongMemoryReporter() { + static Atomic<bool> registered; + if (registered.compareExchange(false, true)) { + RefPtr<Reporter> reporter = new Reporter(); + if (NS_FAILED(RegisterStrongMemoryReporter(reporter))) { + registered = false; + } + } +} + +Atomic<size_t> MessageChannel::gUnresolvedResponses; + +MessageChannel::MessageChannel(const char* aName, IToplevelProtocol* aListener) + : mName(aName), mListener(aListener), mMonitor(new RefCountedMonitor()) { + MOZ_COUNT_CTOR(ipc::MessageChannel); + +#ifdef OS_WIN + mEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr); + MOZ_RELEASE_ASSERT(mEvent, "CreateEvent failed! Nothing is going to work!"); +#endif + + TryRegisterStrongMemoryReporter<PendingResponseReporter>(); + TryRegisterStrongMemoryReporter<ChannelCountReporter>(); +} + +MessageChannel::~MessageChannel() { + MOZ_COUNT_DTOR(ipc::MessageChannel); + MonitorAutoLock lock(*mMonitor); + MOZ_RELEASE_ASSERT(!mOnCxxStack, + "MessageChannel destroyed while code on CxxStack"); +#ifdef OS_WIN + if (mEvent) { + BOOL ok = CloseHandle(mEvent); + mEvent = nullptr; + + if (!ok) { + gfxDevCrash(mozilla::gfx::LogReason::MessageChannelCloseFailure) + << "MessageChannel failed to close. GetLastError: " << GetLastError(); + } + MOZ_RELEASE_ASSERT(ok); + } else { + gfxDevCrash(mozilla::gfx::LogReason::MessageChannelCloseFailure) + << "MessageChannel destructor ran without an mEvent Handle"; + } +#endif + + // Make sure that the MessageChannel was closed (and therefore cleared) before + // it was destroyed. We can't properly close the channel at this point, as it + // would be unsafe to invoke our listener's callbacks, and we may be being + // destroyed on a thread other than `mWorkerThread`. + if (!IsClosedLocked()) { + CrashReporter::AnnotateCrashReport( + CrashReporter::Annotation::IPCFatalErrorProtocol, + nsDependentCString(mName)); + switch (mChannelState) { + case ChannelConnected: + MOZ_CRASH( + "MessageChannel destroyed without being closed " + "(mChannelState == ChannelConnected)."); + break; + case ChannelClosing: + MOZ_CRASH( + "MessageChannel destroyed without being closed " + "(mChannelState == ChannelClosing)."); + break; + case ChannelError: + MOZ_CRASH( + "MessageChannel destroyed without being closed " + "(mChannelState == ChannelError)."); + break; + default: + MOZ_CRASH("MessageChannel destroyed without being closed."); + } + } + + // Double-check other properties for thoroughness. + MOZ_RELEASE_ASSERT(!mLink); + MOZ_RELEASE_ASSERT(mPendingResponses.empty()); + MOZ_RELEASE_ASSERT(!mChannelErrorTask); + MOZ_RELEASE_ASSERT(mPending.isEmpty()); + MOZ_RELEASE_ASSERT(!mShutdownTask); +} + +#ifdef DEBUG +void MessageChannel::AssertMaybeDeferredCountCorrect() { + mMonitor->AssertCurrentThreadOwns(); + + size_t count = 0; + for (MessageTask* task : mPending) { + task->AssertMonitorHeld(*mMonitor); + if (!IsAlwaysDeferred(*task->Msg())) { + count++; + } + } + + MOZ_ASSERT(count == mMaybeDeferredPendingCount); +} +#endif + +// This function returns the current transaction ID. Since the notion of a +// "current transaction" can be hard to define when messages race with each +// other and one gets canceled and the other doesn't, we require that this +// function is only called when the current transaction is known to be for a +// NESTED_INSIDE_SYNC message. In that case, we know for sure what the caller is +// looking for. +int32_t MessageChannel::CurrentNestedInsideSyncTransaction() const { + mMonitor->AssertCurrentThreadOwns(); + if (!mTransactionStack) { + return 0; + } + MOZ_RELEASE_ASSERT(mTransactionStack->NestedLevel() == + IPC::Message::NESTED_INSIDE_SYNC); + return mTransactionStack->TransactionID(); +} + +bool MessageChannel::AwaitingSyncReply() const { + mMonitor->AssertCurrentThreadOwns(); + return mTransactionStack ? mTransactionStack->AwaitingSyncReply() : false; +} + +int MessageChannel::AwaitingSyncReplyNestedLevel() const { + mMonitor->AssertCurrentThreadOwns(); + return mTransactionStack ? mTransactionStack->AwaitingSyncReplyNestedLevel() + : 0; +} + +bool MessageChannel::DispatchingSyncMessage() const { + mMonitor->AssertCurrentThreadOwns(); + return mTransactionStack ? mTransactionStack->DispatchingSyncMessage() + : false; +} + +int MessageChannel::DispatchingSyncMessageNestedLevel() const { + mMonitor->AssertCurrentThreadOwns(); + return mTransactionStack + ? mTransactionStack->DispatchingSyncMessageNestedLevel() + : 0; +} + +static const char* StringFromIPCSide(Side side) { + switch (side) { + case ChildSide: + return "Child"; + case ParentSide: + return "Parent"; + default: + return "Unknown"; + } +} + +static void PrintErrorMessage(Side side, const char* channelName, + const char* msg) { + printf_stderr("\n###!!! [%s][%s] Error: %s\n\n", StringFromIPCSide(side), + channelName, msg); +} + +bool MessageChannel::Connected() const { + mMonitor->AssertCurrentThreadOwns(); + return ChannelConnected == mChannelState; +} + +bool MessageChannel::ConnectedOrClosing() const { + mMonitor->AssertCurrentThreadOwns(); + return ChannelConnected == mChannelState || ChannelClosing == mChannelState; +} + +bool MessageChannel::CanSend() const { + if (!mMonitor) { + return false; + } + MonitorAutoLock lock(*mMonitor); + return Connected(); +} + +void MessageChannel::Clear() { + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + MOZ_DIAGNOSTIC_ASSERT(IsClosedLocked(), "MessageChannel cleared too early?"); + MOZ_ASSERT(ChannelClosed == mChannelState || ChannelError == mChannelState); + + // Don't clear mWorkerThread; we use it in AssertWorkerThread(). + // + // Also don't clear mListener. If we clear it, then sending a message + // through this channel after it's Clear()'ed can cause this process to + // crash. + + if (mShutdownTask) { + mShutdownTask->Clear(); + mWorkerThread->UnregisterShutdownTask(mShutdownTask); + } + mShutdownTask = nullptr; + + if (NS_IsMainThread() && gParentProcessBlocker == this) { + gParentProcessBlocker = nullptr; + } + + gUnresolvedResponses -= mPendingResponses.size(); + { + CallbackMap map = std::move(mPendingResponses); + MonitorAutoUnlock unlock(*mMonitor); + for (auto& pair : map) { + pair.second->Reject(ResponseRejectReason::ChannelClosed); + } + } + mPendingResponses.clear(); + + SetIsCrossProcess(false); + + mLink = nullptr; + + if (mChannelErrorTask) { + mChannelErrorTask->Cancel(); + mChannelErrorTask = nullptr; + } + + if (mFlushLazySendTask) { + mFlushLazySendTask->Cancel(); + mFlushLazySendTask = nullptr; + } + + // Free up any memory used by pending messages. + mPending.clear(); + + mMaybeDeferredPendingCount = 0; +} + +bool MessageChannel::Open(ScopedPort aPort, Side aSide, + const nsID& aMessageChannelId, + nsISerialEventTarget* aEventTarget) { + nsCOMPtr<nsISerialEventTarget> eventTarget = + aEventTarget ? aEventTarget : GetCurrentSerialEventTarget(); + MOZ_RELEASE_ASSERT(eventTarget, + "Must open MessageChannel on a nsISerialEventTarget"); + MOZ_RELEASE_ASSERT(eventTarget->IsOnCurrentThread(), + "Must open MessageChannel from worker thread"); + + auto shutdownTask = MakeRefPtr<WorkerTargetShutdownTask>(eventTarget, this); + nsresult rv = eventTarget->RegisterShutdownTask(shutdownTask); + MOZ_ASSERT(rv != NS_ERROR_NOT_IMPLEMENTED, + "target for MessageChannel must support shutdown tasks"); + if (rv == NS_ERROR_UNEXPECTED) { + // If shutdown tasks have already started running, dispatch our shutdown + // task manually. + NS_WARNING("Opening MessageChannel on EventTarget in shutdown"); + rv = eventTarget->Dispatch(shutdownTask->AsRunnable()); + } + MOZ_RELEASE_ASSERT(NS_SUCCEEDED(rv), + "error registering ShutdownTask for MessageChannel"); + + { + MonitorAutoLock lock(*mMonitor); + MOZ_RELEASE_ASSERT(!mLink, "Open() called > once"); + MOZ_RELEASE_ASSERT(ChannelClosed == mChannelState, "Not currently closed"); + MOZ_ASSERT(mSide == UnknownSide); + + mMessageChannelId = aMessageChannelId; + mWorkerThread = eventTarget; + mShutdownTask = shutdownTask; + mLink = MakeUnique<PortLink>(this, std::move(aPort)); + mChannelState = ChannelConnected; + mSide = aSide; + } + + // Notify our listener that the underlying IPC channel has been established. + // IProtocol will use this callback to create the ActorLifecycleProxy, and + // perform an `AddRef` call to keep the actor alive until the channel is + // disconnected. + // + // We unlock our monitor before calling `OnIPCChannelOpened` to ensure that + // any calls back into `MessageChannel` do not deadlock. At this point, we may + // be receiving messages on the IO thread, however we cannot process them on + // the worker thread or have notified our listener until after this function + // returns. + mListener->OnIPCChannelOpened(); + return true; +} + +static Side GetOppSide(Side aSide) { + switch (aSide) { + case ChildSide: + return ParentSide; + case ParentSide: + return ChildSide; + default: + return UnknownSide; + } +} + +bool MessageChannel::Open(MessageChannel* aTargetChan, + nsISerialEventTarget* aEventTarget, Side aSide) { + // Opens a connection to another thread in the same process. + + MOZ_ASSERT(aTargetChan, "Need a target channel"); + + nsID channelId = nsID::GenerateUUID(); + + std::pair<ScopedPort, ScopedPort> ports = + NodeController::GetSingleton()->CreatePortPair(); + + // NOTE: This dispatch must be sync as it captures locals by non-owning + // reference, however we can't use `NS_DispatchAndSpinEventLoopUntilComplete` + // as that will spin a nested event loop, and doesn't work with certain types + // of calling event targets. + base::WaitableEvent event(/* manual_reset */ true, + /* initially_signaled */ false); + MOZ_ALWAYS_SUCCEEDS(aEventTarget->Dispatch(NS_NewCancelableRunnableFunction( + "ipc::MessageChannel::OpenAsOtherThread", [&]() { + aTargetChan->Open(std::move(ports.second), GetOppSide(aSide), channelId, + aEventTarget); + event.Signal(); + }))); + bool ok = event.Wait(); + MOZ_RELEASE_ASSERT(ok); + + // Now that the other side has connected, open the port on our side. + return Open(std::move(ports.first), aSide, channelId); +} + +bool MessageChannel::OpenOnSameThread(MessageChannel* aTargetChan, + mozilla::ipc::Side aSide) { + auto [porta, portb] = NodeController::GetSingleton()->CreatePortPair(); + + nsID channelId = nsID::GenerateUUID(); + + aTargetChan->mIsSameThreadChannel = true; + mIsSameThreadChannel = true; + + auto* currentThread = GetCurrentSerialEventTarget(); + return aTargetChan->Open(std::move(portb), GetOppSide(aSide), channelId, + currentThread) && + Open(std::move(porta), aSide, channelId, currentThread); +} + +bool MessageChannel::Send(UniquePtr<Message> aMsg) { + if (aMsg->size() >= kMinTelemetryMessageSize) { + Telemetry::Accumulate(Telemetry::IPC_MESSAGE_SIZE2, aMsg->size()); + } + + MOZ_RELEASE_ASSERT(!aMsg->is_sync()); + MOZ_RELEASE_ASSERT(aMsg->nested_level() != IPC::Message::NESTED_INSIDE_SYNC); + + AutoSetValue<bool> setOnCxxStack(mOnCxxStack, true); + + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + if (MSG_ROUTING_NONE == aMsg->routing_id()) { + ReportMessageRouteError("MessageChannel::Send"); + return false; + } + + if (aMsg->seqno() == 0) { + aMsg->set_seqno(NextSeqno()); + } + + MonitorAutoLock lock(*mMonitor); + if (!Connected()) { + ReportConnectionError("Send", aMsg->type()); + return false; + } + + AddProfilerMarker(*aMsg, MessageDirection::eSending); + SendMessageToLink(std::move(aMsg)); + return true; +} + +void MessageChannel::SendMessageToLink(UniquePtr<Message> aMsg) { + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + // If the channel is not cross-process, there's no reason to be lazy, so we + // ignore the flag in that case. + if (aMsg->is_lazy_send() && mIsCrossProcess) { + // If this is the first lazy message in the queue and our worker thread + // supports direct task dispatch, dispatch a task to flush messages, + // ensuring we don't leave them pending forever. + if (!mFlushLazySendTask) { + if (nsCOMPtr<nsIDirectTaskDispatcher> dispatcher = + do_QueryInterface(mWorkerThread)) { + mFlushLazySendTask = new FlushLazySendMessagesRunnable(this); + MOZ_ALWAYS_SUCCEEDS( + dispatcher->DispatchDirectTask(do_AddRef(mFlushLazySendTask))); + } + } + if (mFlushLazySendTask) { + mFlushLazySendTask->PushMessage(std::move(aMsg)); + return; + } + } + + if (mFlushLazySendTask) { + FlushLazySendMessages(); + } + mLink->SendMessage(std::move(aMsg)); +} + +void MessageChannel::FlushLazySendMessages() { + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + // Clean up any SendLazyTask which might be pending. + auto messages = mFlushLazySendTask->TakeMessages(); + mFlushLazySendTask = nullptr; + + // Send all lazy messages, then clear the queue. + for (auto& msg : messages) { + mLink->SendMessage(std::move(msg)); + } +} + +UniquePtr<MessageChannel::UntypedCallbackHolder> MessageChannel::PopCallback( + const Message& aMsg, int32_t aActorId) { + auto iter = mPendingResponses.find(aMsg.seqno()); + if (iter != mPendingResponses.end() && iter->second->mActorId == aActorId && + iter->second->mReplyMsgId == aMsg.type()) { + UniquePtr<MessageChannel::UntypedCallbackHolder> ret = + std::move(iter->second); + mPendingResponses.erase(iter); + gUnresolvedResponses--; + return ret; + } + return nullptr; +} + +void MessageChannel::RejectPendingResponsesForActor(int32_t aActorId) { + auto itr = mPendingResponses.begin(); + while (itr != mPendingResponses.end()) { + if (itr->second.get()->mActorId != aActorId) { + ++itr; + continue; + } + itr->second.get()->Reject(ResponseRejectReason::ActorDestroyed); + // Take special care of advancing the iterator since we are + // removing it while iterating. + itr = mPendingResponses.erase(itr); + gUnresolvedResponses--; + } +} + +class BuildIDsMatchMessage : public IPC::Message { + public: + BuildIDsMatchMessage() + : IPC::Message(MSG_ROUTING_NONE, BUILD_IDS_MATCH_MESSAGE_TYPE) {} + void Log(const std::string& aPrefix, FILE* aOutf) const { + fputs("(special `Build IDs match' message)", aOutf); + } +}; + +// Send the parent a special async message to confirm when the parent and child +// are of the same buildID. Skips sending the message and returns false if the +// buildIDs don't match. This is a minor variation on +// MessageChannel::Send(Message* aMsg). +bool MessageChannel::SendBuildIDsMatchMessage(const char* aParentBuildID) { + MOZ_ASSERT(!XRE_IsParentProcess()); + + nsCString parentBuildID(aParentBuildID); + nsCString childBuildID(mozilla::PlatformBuildID()); + + if (parentBuildID != childBuildID) { + // The build IDs didn't match, usually because an update occurred in the + // background. + return false; + } + + auto msg = MakeUnique<BuildIDsMatchMessage>(); + + MOZ_RELEASE_ASSERT(!msg->is_sync()); + MOZ_RELEASE_ASSERT(msg->nested_level() != IPC::Message::NESTED_INSIDE_SYNC); + + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + // Don't check for MSG_ROUTING_NONE. + + MonitorAutoLock lock(*mMonitor); + if (!Connected()) { + ReportConnectionError("SendBuildIDsMatchMessage", msg->type()); + return false; + } + +#if defined(MOZ_DEBUG) && defined(ENABLE_TESTS) + // Technically, the behavior is interesting for any kind of process + // but when exercising tests, we want to crash only a content process and + // avoid making noise with other kind of processes crashing + if (const char* dontSend = PR_GetEnv("MOZ_BUILDID_MATCH_DONTSEND")) { + if (dontSend[0] == '1') { + // Bug 1732999: We are going to crash, so we need to advise leak check + // tooling to avoid intermittent missing leakcheck + NoteIntentionalCrash(XRE_GetProcessTypeString()); + if (XRE_IsContentProcess()) { + return false; + } + } + } +#endif + + SendMessageToLink(std::move(msg)); + return true; +} + +class CancelMessage : public IPC::Message { + public: + explicit CancelMessage(int transaction) + : IPC::Message(MSG_ROUTING_NONE, CANCEL_MESSAGE_TYPE) { + set_transaction_id(transaction); + } + static bool Read(const Message* msg) { return true; } + void Log(const std::string& aPrefix, FILE* aOutf) const { + fputs("(special `Cancel' message)", aOutf); + } +}; + +bool MessageChannel::MaybeInterceptSpecialIOMessage(const Message& aMsg) { + mMonitor->AssertCurrentThreadOwns(); + + if (MSG_ROUTING_NONE == aMsg.routing_id()) { + if (GOODBYE_MESSAGE_TYPE == aMsg.type()) { + // We've received a GOODBYE message, close the connection and mark + // ourselves as "Closing". + mLink->Close(); + mChannelState = ChannelClosing; + if (LoggingEnabled()) { + printf( + "[%s %u] NOTE: %s actor received `Goodbye' message. Closing " + "channel.\n", + XRE_GeckoProcessTypeToString(XRE_GetProcessType()), + static_cast<uint32_t>(base::GetCurrentProcId()), + (mSide == ChildSide) ? "child" : "parent"); + } + + // Notify the worker thread that the connection has been closed, as we + // will not receive an `OnChannelErrorFromLink` after calling + // `mLink->Close()`. + if (AwaitingSyncReply()) { + NotifyWorkerThread(); + } + PostErrorNotifyTask(); + return true; + } else if (CANCEL_MESSAGE_TYPE == aMsg.type()) { + IPC_LOG("Cancel from message"); + CancelTransaction(aMsg.transaction_id()); + NotifyWorkerThread(); + return true; + } else if (BUILD_IDS_MATCH_MESSAGE_TYPE == aMsg.type()) { + IPC_LOG("Build IDs match message"); + mBuildIDsConfirmedMatch = true; + return true; + } else if (IMPENDING_SHUTDOWN_MESSAGE_TYPE == aMsg.type()) { + IPC_LOG("Impending Shutdown received"); + ProcessChild::NotifiedImpendingShutdown(); + return true; + } + } + return false; +} + +/* static */ +bool MessageChannel::IsAlwaysDeferred(const Message& aMsg) { + // If a message is not NESTED_INSIDE_CPOW and not sync, then we always defer + // it. + return aMsg.nested_level() != IPC::Message::NESTED_INSIDE_CPOW && + !aMsg.is_sync(); +} + +bool MessageChannel::ShouldDeferMessage(const Message& aMsg) { + // Never defer messages that have the highest nested level, even async + // ones. This is safe because only the child can send these messages, so + // they can never nest. + if (aMsg.nested_level() == IPC::Message::NESTED_INSIDE_CPOW) { + MOZ_ASSERT(!IsAlwaysDeferred(aMsg)); + return false; + } + + // Unless they're NESTED_INSIDE_CPOW, we always defer async messages. + // Note that we never send an async NESTED_INSIDE_SYNC message. + if (!aMsg.is_sync()) { + MOZ_RELEASE_ASSERT(aMsg.nested_level() == IPC::Message::NOT_NESTED); + MOZ_ASSERT(IsAlwaysDeferred(aMsg)); + return true; + } + + MOZ_ASSERT(!IsAlwaysDeferred(aMsg)); + + int msgNestedLevel = aMsg.nested_level(); + int waitingNestedLevel = AwaitingSyncReplyNestedLevel(); + + // Always defer if the nested level of the incoming message is less than the + // nested level of the message we're awaiting. + if (msgNestedLevel < waitingNestedLevel) return true; + + // Never defer if the message has strictly greater nested level. + if (msgNestedLevel > waitingNestedLevel) return false; + + // When both sides send sync messages of the same nested level, we resolve the + // race by dispatching in the child and deferring the incoming message in + // the parent. However, the parent still needs to dispatch nested sync + // messages. + // + // Deferring in the parent only sort of breaks message ordering. When the + // child's message comes in, we can pretend the child hasn't quite + // finished sending it yet. Since the message is sync, we know that the + // child hasn't moved on yet. + return mSide == ParentSide && + aMsg.transaction_id() != CurrentNestedInsideSyncTransaction(); +} + +void MessageChannel::OnMessageReceivedFromLink(UniquePtr<Message> aMsg) { + mMonitor->AssertCurrentThreadOwns(); + MOZ_ASSERT(mChannelState == ChannelConnected); + + if (MaybeInterceptSpecialIOMessage(*aMsg)) { + return; + } + + mListener->OnChannelReceivedMessage(*aMsg); + + // If we're awaiting a sync reply, we know that it needs to be immediately + // handled to unblock us. + if (aMsg->is_sync() && aMsg->is_reply()) { + IPC_LOG("Received reply seqno=%d xid=%d", aMsg->seqno(), + aMsg->transaction_id()); + + if (aMsg->seqno() == mTimedOutMessageSeqno) { + // Drop the message, but allow future sync messages to be sent. + IPC_LOG("Received reply to timedout message; igoring; xid=%d", + mTimedOutMessageSeqno); + EndTimeout(); + return; + } + + MOZ_RELEASE_ASSERT(AwaitingSyncReply()); + MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno); + + mTransactionStack->HandleReply(std::move(aMsg)); + NotifyWorkerThread(); + return; + } + + // Nested messages cannot be compressed. + MOZ_RELEASE_ASSERT(aMsg->compress_type() == IPC::Message::COMPRESSION_NONE || + aMsg->nested_level() == IPC::Message::NOT_NESTED); + + if (aMsg->compress_type() == IPC::Message::COMPRESSION_ENABLED && + !mPending.isEmpty()) { + auto* last = mPending.getLast(); + last->AssertMonitorHeld(*mMonitor); + bool compress = last->Msg()->type() == aMsg->type() && + last->Msg()->routing_id() == aMsg->routing_id(); + if (compress) { + // This message type has compression enabled, and the back of the + // queue was the same message type and routed to the same destination. + // Replace it with the newer message. + MOZ_RELEASE_ASSERT(last->Msg()->compress_type() == + IPC::Message::COMPRESSION_ENABLED); + last->Msg() = std::move(aMsg); + return; + } + } else if (aMsg->compress_type() == IPC::Message::COMPRESSION_ALL && + !mPending.isEmpty()) { + for (MessageTask* p = mPending.getLast(); p; p = p->getPrevious()) { + p->AssertMonitorHeld(*mMonitor); + if (p->Msg()->type() == aMsg->type() && + p->Msg()->routing_id() == aMsg->routing_id()) { + // This message type has compression enabled, and the queue + // holds a message with the same message type and routed to the + // same destination. Erase it. Note that, since we always + // compress these redundancies, There Can Be Only One. + MOZ_RELEASE_ASSERT(p->Msg()->compress_type() == + IPC::Message::COMPRESSION_ALL); + MOZ_RELEASE_ASSERT(IsAlwaysDeferred(*p->Msg())); + p->remove(); + break; + } + } + } + + bool alwaysDeferred = IsAlwaysDeferred(*aMsg); + + bool shouldWakeUp = AwaitingSyncReply() && !ShouldDeferMessage(*aMsg); + + IPC_LOG("Receive from link; seqno=%d, xid=%d, shouldWakeUp=%d", aMsg->seqno(), + aMsg->transaction_id(), shouldWakeUp); + + // There are two cases we're concerned about, relating to the state of the + // worker thread: + // + // (1) We are waiting on a sync reply - worker thread is blocked on the + // IPC monitor. + // - If the message is NESTED_INSIDE_SYNC, we wake up the worker thread to + // deliver the message depending on ShouldDeferMessage. Otherwise, we + // leave it in the mPending queue, posting a task to the worker event + // loop, where it will be processed once the synchronous reply has been + // received. + // + // (2) We are not waiting on a reply. + // - We post a task to the worker event loop. + // + // Note that, we may notify the worker thread even though the monitor is not + // blocked. This is okay, since we always check for pending events before + // blocking again. + + RefPtr<MessageTask> task = new MessageTask(this, std::move(aMsg)); + mPending.insertBack(task); + + if (!alwaysDeferred) { + mMaybeDeferredPendingCount++; + } + + if (shouldWakeUp) { + NotifyWorkerThread(); + } + + // Although we usually don't need to post a message task if + // shouldWakeUp is true, it's easier to post anyway than to have to + // guarantee that every Send call processes everything it's supposed to + // before returning. + task->AssertMonitorHeld(*mMonitor); + task->Post(); +} + +void MessageChannel::PeekMessages( + const std::function<bool(const Message& aMsg)>& aInvoke) { + // FIXME: We shouldn't be holding the lock for aInvoke! + MonitorAutoLock lock(*mMonitor); + + for (MessageTask* it : mPending) { + it->AssertMonitorHeld(*mMonitor); + const Message& msg = *it->Msg(); + if (!aInvoke(msg)) { + break; + } + } +} + +void MessageChannel::ProcessPendingRequests( + ActorLifecycleProxy* aProxy, AutoEnterTransaction& aTransaction) { + mMonitor->AssertCurrentThreadOwns(); + + AssertMaybeDeferredCountCorrect(); + if (mMaybeDeferredPendingCount == 0) { + return; + } + + IPC_LOG("ProcessPendingRequests for seqno=%d, xid=%d", + aTransaction.SequenceNumber(), aTransaction.TransactionID()); + + // Loop until there aren't any more nested messages to process. + for (;;) { + // If we canceled during ProcessPendingRequest, then we need to leave + // immediately because the results of ShouldDeferMessage will be + // operating with weird state (as if no Send is in progress). That could + // cause even NOT_NESTED sync messages to be processed (but not + // NOT_NESTED async messages), which would break message ordering. + if (aTransaction.IsCanceled()) { + return; + } + + Vector<UniquePtr<Message>> toProcess; + + for (MessageTask* p = mPending.getFirst(); p;) { + p->AssertMonitorHeld(*mMonitor); + UniquePtr<Message>& msg = p->Msg(); + + MOZ_RELEASE_ASSERT(!aTransaction.IsCanceled(), + "Calling ShouldDeferMessage when cancelled"); + bool defer = ShouldDeferMessage(*msg); + + // Only log the interesting messages. + if (msg->is_sync() || + msg->nested_level() == IPC::Message::NESTED_INSIDE_CPOW) { + IPC_LOG("ShouldDeferMessage(seqno=%d) = %d", msg->seqno(), defer); + } + + if (!defer) { + MOZ_ASSERT(!IsAlwaysDeferred(*msg)); + + if (!toProcess.append(std::move(msg))) MOZ_CRASH(); + + mMaybeDeferredPendingCount--; + + p = p->removeAndGetNext(); + continue; + } + p = p->getNext(); + } + + if (toProcess.empty()) { + break; + } + + // Processing these messages could result in more messages, so we + // loop around to check for more afterwards. + + for (auto& msg : toProcess) { + ProcessPendingRequest(aProxy, std::move(msg)); + } + } + + AssertMaybeDeferredCountCorrect(); +} + +bool MessageChannel::Send(UniquePtr<Message> aMsg, UniquePtr<Message>* aReply) { + mozilla::TimeStamp start = TimeStamp::Now(); + if (aMsg->size() >= kMinTelemetryMessageSize) { + Telemetry::Accumulate(Telemetry::IPC_MESSAGE_SIZE2, aMsg->size()); + } + + // Sanity checks. + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + MOZ_RELEASE_ASSERT(!mIsSameThreadChannel, + "sync send over same-thread channel will deadlock!"); + + RefPtr<ActorLifecycleProxy> proxy = Listener()->GetLifecycleProxy(); + +#ifdef OS_WIN + SyncStackFrame frame(this); + NeuteredWindowRegion neuteredRgn(mFlags & + REQUIRE_DEFERRED_MESSAGE_PROTECTION); +#endif + + AutoSetValue<bool> setOnCxxStack(mOnCxxStack, true); + + MonitorAutoLock lock(*mMonitor); + + if (mTimedOutMessageSeqno) { + // Don't bother sending another sync message if a previous one timed out + // and we haven't received a reply for it. Once the original timed-out + // message receives a reply, we'll be able to send more sync messages + // again. + IPC_LOG("Send() failed due to previous timeout"); + mLastSendError = SyncSendError::PreviousTimeout; + return false; + } + + if (DispatchingSyncMessageNestedLevel() == IPC::Message::NOT_NESTED && + aMsg->nested_level() > IPC::Message::NOT_NESTED) { + // Don't allow sending CPOWs while we're dispatching a sync message. + IPC_LOG("Nested level forbids send"); + mLastSendError = SyncSendError::SendingCPOWWhileDispatchingSync; + return false; + } + + if (DispatchingSyncMessageNestedLevel() == IPC::Message::NESTED_INSIDE_CPOW || + DispatchingAsyncMessageNestedLevel() == + IPC::Message::NESTED_INSIDE_CPOW) { + // Generally only the parent dispatches urgent messages. And the only + // sync messages it can send are NESTED_INSIDE_SYNC. Mainly we want to + // ensure here that we don't return false for non-CPOW messages. + MOZ_RELEASE_ASSERT(aMsg->nested_level() == + IPC::Message::NESTED_INSIDE_SYNC); + IPC_LOG("Sending while dispatching urgent message"); + mLastSendError = SyncSendError::SendingCPOWWhileDispatchingUrgent; + return false; + } + + if (aMsg->nested_level() < DispatchingSyncMessageNestedLevel() || + aMsg->nested_level() < AwaitingSyncReplyNestedLevel()) { + MOZ_RELEASE_ASSERT(DispatchingSyncMessage() || DispatchingAsyncMessage()); + IPC_LOG("Cancel from Send"); + auto cancel = + MakeUnique<CancelMessage>(CurrentNestedInsideSyncTransaction()); + CancelTransaction(CurrentNestedInsideSyncTransaction()); + SendMessageToLink(std::move(cancel)); + } + + IPC_ASSERT(aMsg->is_sync(), "can only Send() sync messages here"); + + IPC_ASSERT(aMsg->nested_level() >= DispatchingSyncMessageNestedLevel(), + "can't send sync message of a lesser nested level than what's " + "being dispatched"); + IPC_ASSERT(AwaitingSyncReplyNestedLevel() <= aMsg->nested_level(), + "nested sync message sends must be of increasing nested level"); + IPC_ASSERT( + DispatchingSyncMessageNestedLevel() != IPC::Message::NESTED_INSIDE_CPOW, + "not allowed to send messages while dispatching urgent messages"); + + IPC_ASSERT( + DispatchingAsyncMessageNestedLevel() != IPC::Message::NESTED_INSIDE_CPOW, + "not allowed to send messages while dispatching urgent messages"); + + if (!Connected()) { + ReportConnectionError("SendAndWait", aMsg->type()); + mLastSendError = SyncSendError::NotConnectedBeforeSend; + return false; + } + + aMsg->set_seqno(NextSeqno()); + + int32_t seqno = aMsg->seqno(); + int nestedLevel = aMsg->nested_level(); + msgid_t replyType = aMsg->type() + 1; + + AutoEnterTransaction* stackTop = mTransactionStack; + + // If the most recent message on the stack is NESTED_INSIDE_SYNC, then our + // message should nest inside that and we use the same transaction + // ID. Otherwise we need a new transaction ID (so we use the seqno of the + // message we're sending). + bool nest = + stackTop && stackTop->NestedLevel() == IPC::Message::NESTED_INSIDE_SYNC; + int32_t transaction = nest ? stackTop->TransactionID() : seqno; + aMsg->set_transaction_id(transaction); + + AutoEnterTransaction transact(this, seqno, transaction, nestedLevel); + + IPC_LOG("Send seqno=%d, xid=%d", seqno, transaction); + + // aMsg will be destroyed soon, let's keep its type. + const char* msgName = aMsg->name(); + const msgid_t msgType = aMsg->type(); + + AddProfilerMarker(*aMsg, MessageDirection::eSending); + SendMessageToLink(std::move(aMsg)); + + while (true) { + MOZ_RELEASE_ASSERT(!transact.IsCanceled()); + ProcessPendingRequests(proxy, transact); + if (transact.IsComplete()) { + break; + } + if (!Connected()) { + ReportConnectionError("Send", msgType); + mLastSendError = SyncSendError::DisconnectedDuringSend; + return false; + } + + MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno); + MOZ_RELEASE_ASSERT(!transact.IsComplete()); + MOZ_RELEASE_ASSERT(mTransactionStack == &transact); + + bool maybeTimedOut = !WaitForSyncNotify(); + + if (mListener->NeedArtificialSleep()) { + MonitorAutoUnlock unlock(*mMonitor); + mListener->ArtificialSleep(); + } + + if (!Connected()) { + ReportConnectionError("SendAndWait", msgType); + mLastSendError = SyncSendError::DisconnectedDuringSend; + return false; + } + + if (transact.IsCanceled()) { + break; + } + + MOZ_RELEASE_ASSERT(mTransactionStack == &transact); + + // We only time out a message if it initiated a new transaction (i.e., + // if neither side has any other message Sends on the stack). + bool canTimeOut = transact.IsBottom(); + if (maybeTimedOut && canTimeOut && !ShouldContinueFromTimeout()) { + // Since ShouldContinueFromTimeout drops the lock, we need to + // re-check all our conditions here. We shouldn't time out if any of + // these things happen because there won't be a reply to the timed + // out message in these cases. + if (transact.IsComplete()) { + break; + } + + IPC_LOG("Timing out Send: xid=%d", transaction); + + mTimedOutMessageSeqno = seqno; + mTimedOutMessageNestedLevel = nestedLevel; + mLastSendError = SyncSendError::TimedOut; + return false; + } + + if (transact.IsCanceled()) { + break; + } + } + + if (transact.IsCanceled()) { + IPC_LOG("Other side canceled seqno=%d, xid=%d", seqno, transaction); + mLastSendError = SyncSendError::CancelledAfterSend; + return false; + } + + if (transact.IsError()) { + IPC_LOG("Error: seqno=%d, xid=%d", seqno, transaction); + mLastSendError = SyncSendError::ReplyError; + return false; + } + + uint32_t latencyMs = round((TimeStamp::Now() - start).ToMilliseconds()); + IPC_LOG("Got reply: seqno=%d, xid=%d, msgName=%s, latency=%ums", seqno, + transaction, msgName, latencyMs); + + UniquePtr<Message> reply = transact.GetReply(); + + MOZ_RELEASE_ASSERT(reply); + MOZ_RELEASE_ASSERT(reply->is_reply(), "expected reply"); + MOZ_RELEASE_ASSERT(!reply->is_reply_error()); + MOZ_RELEASE_ASSERT(reply->seqno() == seqno); + MOZ_RELEASE_ASSERT(reply->type() == replyType, "wrong reply type"); + MOZ_RELEASE_ASSERT(reply->is_sync()); + + AddProfilerMarker(*reply, MessageDirection::eReceiving); + + if (reply->size() >= kMinTelemetryMessageSize) { + Telemetry::Accumulate(Telemetry::IPC_REPLY_SIZE, + nsDependentCString(msgName), reply->size()); + } + + *aReply = std::move(reply); + + // NOTE: Only collect IPC_SYNC_MAIN_LATENCY_MS on the main thread (bug + // 1343729) + if (NS_IsMainThread() && latencyMs >= kMinTelemetrySyncIPCLatencyMs) { + Telemetry::Accumulate(Telemetry::IPC_SYNC_MAIN_LATENCY_MS, + nsDependentCString(msgName), latencyMs); + } + return true; +} + +bool MessageChannel::HasPendingEvents() { + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + return ConnectedOrClosing() && !mPending.isEmpty(); +} + +bool MessageChannel::ProcessPendingRequest(ActorLifecycleProxy* aProxy, + UniquePtr<Message> aUrgent) { + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + IPC_LOG("Process pending: seqno=%d, xid=%d", aUrgent->seqno(), + aUrgent->transaction_id()); + + // keep the error relevant information + msgid_t msgType = aUrgent->type(); + + DispatchMessage(aProxy, std::move(aUrgent)); + if (!ConnectedOrClosing()) { + ReportConnectionError("ProcessPendingRequest", msgType); + return false; + } + + return true; +} + +bool MessageChannel::ShouldRunMessage(const Message& aMsg) { + if (!mTimedOutMessageSeqno) { + return true; + } + + // If we've timed out a message and we're awaiting the reply to the timed + // out message, we have to be careful what messages we process. Here's what + // can go wrong: + // 1. child sends a NOT_NESTED sync message S + // 2. parent sends a NESTED_INSIDE_SYNC sync message H at the same time + // 3. parent times out H + // 4. child starts processing H and sends a NESTED_INSIDE_SYNC message H' + // nested within the same transaction + // 5. parent dispatches S and sends reply + // 6. child asserts because it instead expected a reply to H'. + // + // To solve this, we refuse to process S in the parent until we get a reply + // to H. More generally, let the timed out message be M. We don't process a + // message unless the child would need the response to that message in order + // to process M. Those messages are the ones that have a higher nested level + // than M or that are part of the same transaction as M. + if (aMsg.nested_level() < mTimedOutMessageNestedLevel || + (aMsg.nested_level() == mTimedOutMessageNestedLevel && + aMsg.transaction_id() != mTimedOutMessageSeqno)) { + return false; + } + + return true; +} + +void MessageChannel::RunMessage(ActorLifecycleProxy* aProxy, + MessageTask& aTask) { + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + aTask.AssertMonitorHeld(*mMonitor); + + UniquePtr<Message>& msg = aTask.Msg(); + + if (!ConnectedOrClosing()) { + ReportConnectionError("RunMessage", msg->type()); + return; + } + + // Check that we're going to run the first message that's valid to run. +#if 0 +# ifdef DEBUG + for (MessageTask* task : mPending) { + if (task == &aTask) { + break; + } + + MOZ_ASSERT(!ShouldRunMessage(*task->Msg()) || + aTask.Msg()->priority() != task->Msg()->priority()); + + } +# endif +#endif + + if (!ShouldRunMessage(*msg)) { + return; + } + + MOZ_RELEASE_ASSERT(aTask.isInList()); + aTask.remove(); + + if (!IsAlwaysDeferred(*msg)) { + mMaybeDeferredPendingCount--; + } + + DispatchMessage(aProxy, std::move(msg)); +} + +NS_IMPL_ISUPPORTS_INHERITED(MessageChannel::MessageTask, CancelableRunnable, + nsIRunnablePriority, nsIRunnableIPCMessageType) + +static uint32_t ToRunnablePriority(IPC::Message::PriorityValue aPriority) { + switch (aPriority) { + case IPC::Message::NORMAL_PRIORITY: + return nsIRunnablePriority::PRIORITY_NORMAL; + case IPC::Message::INPUT_PRIORITY: + return nsIRunnablePriority::PRIORITY_INPUT_HIGH; + case IPC::Message::VSYNC_PRIORITY: + return nsIRunnablePriority::PRIORITY_VSYNC; + case IPC::Message::MEDIUMHIGH_PRIORITY: + return nsIRunnablePriority::PRIORITY_MEDIUMHIGH; + case IPC::Message::CONTROL_PRIORITY: + return nsIRunnablePriority::PRIORITY_CONTROL; + default: + MOZ_ASSERT_UNREACHABLE(); + return nsIRunnablePriority::PRIORITY_NORMAL; + } +} + +MessageChannel::MessageTask::MessageTask(MessageChannel* aChannel, + UniquePtr<Message> aMessage) + : CancelableRunnable(aMessage->name()), + mMonitor(aChannel->mMonitor), + mChannel(aChannel), + mMessage(std::move(aMessage)), + mPriority(ToRunnablePriority(mMessage->priority())), + mScheduled(false) +#ifdef FUZZING_SNAPSHOT + , + mIsFuzzMsg(mMessage->IsFuzzMsg()), + mFuzzStopped(false) +#endif +{ + MOZ_DIAGNOSTIC_ASSERT(mMessage, "message may not be null"); +#ifdef FUZZING_SNAPSHOT + if (mIsFuzzMsg) { + MOZ_FUZZING_IPC_MT_CTOR(); + } +#endif +} + +MessageChannel::MessageTask::~MessageTask() { +#ifdef FUZZING_SNAPSHOT + // We track fuzzing messages until their run is complete. To make sure + // that we don't miss messages that are for some reason destroyed without + // being run (e.g. canceled), we catch this condition in the destructor. + if (mIsFuzzMsg && !mFuzzStopped) { + MOZ_FUZZING_IPC_MT_STOP(); + } else if (!mIsFuzzMsg && !fuzzing::Nyx::instance().started()) { + MOZ_FUZZING_IPC_PRE_FUZZ_MT_STOP(); + } +#endif +} + +nsresult MessageChannel::MessageTask::Run() { + mMonitor->AssertNotCurrentThreadOwns(); + + // Drop the toplevel actor's lifecycle proxy outside of our monitor if we take + // it, as destroying our ActorLifecycleProxy reference can acquire the + // monitor. + RefPtr<ActorLifecycleProxy> proxy; + + MonitorAutoLock lock(*mMonitor); + + // In case we choose not to run this message, we may need to be able to Post + // it again. + mScheduled = false; + + if (!isInList()) { + return NS_OK; + } + +#ifdef FUZZING_SNAPSHOT + if (!mIsFuzzMsg) { + if (fuzzing::Nyx::instance().started()) { + // Once we started fuzzing, prevent non-fuzzing tasks from being + // run and potentially blocking worker threads. + // + // TODO: This currently blocks all MessageTasks from running, not + // just those belonging to the target process pair. We currently + // do this for performance reasons, but it should be re-evaluated + // at a later stage when we found a better snapshot point. + return NS_OK; + } + // Record all running tasks prior to fuzzing, so we can wait for + // them to settle before snapshotting. + MOZ_FUZZING_IPC_PRE_FUZZ_MT_RUN(); + } +#endif + + Channel()->AssertWorkerThread(); + mMonitor->AssertSameMonitor(*Channel()->mMonitor); + proxy = Channel()->Listener()->GetLifecycleProxy(); + Channel()->RunMessage(proxy, *this); + +#ifdef FUZZING_SNAPSHOT + if (mIsFuzzMsg && !mFuzzStopped) { + MOZ_FUZZING_IPC_MT_STOP(); + mFuzzStopped = true; + } +#endif + return NS_OK; +} + +// Warning: This method removes the receiver from whatever list it might be in. +nsresult MessageChannel::MessageTask::Cancel() { + mMonitor->AssertNotCurrentThreadOwns(); + + MonitorAutoLock lock(*mMonitor); + + if (!isInList()) { + return NS_OK; + } + + Channel()->AssertWorkerThread(); + mMonitor->AssertSameMonitor(*Channel()->mMonitor); + if (!IsAlwaysDeferred(*Msg())) { + Channel()->mMaybeDeferredPendingCount--; + } + + remove(); + +#ifdef FUZZING_SNAPSHOT + if (mIsFuzzMsg && !mFuzzStopped) { + MOZ_FUZZING_IPC_MT_STOP(); + mFuzzStopped = true; + } +#endif + + return NS_OK; +} + +void MessageChannel::MessageTask::Post() { + mMonitor->AssertCurrentThreadOwns(); + mMonitor->AssertSameMonitor(*Channel()->mMonitor); + MOZ_RELEASE_ASSERT(!mScheduled); + MOZ_RELEASE_ASSERT(isInList()); + + mScheduled = true; + + Channel()->mWorkerThread->Dispatch(do_AddRef(this)); +} + +NS_IMETHODIMP +MessageChannel::MessageTask::GetPriority(uint32_t* aPriority) { + *aPriority = mPriority; + return NS_OK; +} + +NS_IMETHODIMP +MessageChannel::MessageTask::GetType(uint32_t* aType) { + mMonitor->AssertNotCurrentThreadOwns(); + + MonitorAutoLock lock(*mMonitor); + if (!mMessage) { + // If mMessage has been moved already elsewhere, we can't know what the type + // has been. + return NS_ERROR_FAILURE; + } + + *aType = mMessage->type(); + return NS_OK; +} + +void MessageChannel::DispatchMessage(ActorLifecycleProxy* aProxy, + UniquePtr<Message> aMsg) { + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + Maybe<AutoNoJSAPI> nojsapi; + if (NS_IsMainThread() && CycleCollectedJSContext::Get()) { + nojsapi.emplace(); + } + + UniquePtr<Message> reply; + + IPC_LOG("DispatchMessage: seqno=%d, xid=%d", aMsg->seqno(), + aMsg->transaction_id()); + AddProfilerMarker(*aMsg, MessageDirection::eReceiving); + + { + AutoEnterTransaction transaction(this, *aMsg); + + int id = aMsg->transaction_id(); + MOZ_RELEASE_ASSERT(!aMsg->is_sync() || id == transaction.TransactionID()); + + { + MonitorAutoUnlock unlock(*mMonitor); + AutoSetValue<bool> setOnCxxStack(mOnCxxStack, true); + + mListener->ArtificialSleep(); + + if (aMsg->is_sync()) { + DispatchSyncMessage(aProxy, *aMsg, reply); + } else { + DispatchAsyncMessage(aProxy, *aMsg); + } + + mListener->ArtificialSleep(); + } + + if (reply && transaction.IsCanceled()) { + // The transaction has been canceled. Don't send a reply. + IPC_LOG("Nulling out reply due to cancellation, seqno=%d, xid=%d", + aMsg->seqno(), id); + reply = nullptr; + } + } + + if (reply && ChannelConnected == mChannelState) { + IPC_LOG("Sending reply seqno=%d, xid=%d", aMsg->seqno(), + aMsg->transaction_id()); + AddProfilerMarker(*reply, MessageDirection::eSending); + + SendMessageToLink(std::move(reply)); + } +} + +void MessageChannel::DispatchSyncMessage(ActorLifecycleProxy* aProxy, + const Message& aMsg, + UniquePtr<Message>& aReply) { + AssertWorkerThread(); + + mozilla::TimeStamp start = TimeStamp::Now(); + + int nestedLevel = aMsg.nested_level(); + + MOZ_RELEASE_ASSERT(nestedLevel == IPC::Message::NOT_NESTED || + NS_IsMainThread()); + + MessageChannel* dummy; + MessageChannel*& blockingVar = + mSide == ChildSide && NS_IsMainThread() ? gParentProcessBlocker : dummy; + + Result rv; + { + AutoSetValue<MessageChannel*> blocked(blockingVar, this); + rv = aProxy->Get()->OnMessageReceived(aMsg, aReply); + } + + uint32_t latencyMs = round((TimeStamp::Now() - start).ToMilliseconds()); + if (latencyMs >= kMinTelemetrySyncIPCLatencyMs) { + Telemetry::Accumulate(Telemetry::IPC_SYNC_RECEIVE_MS, + nsDependentCString(aMsg.name()), latencyMs); + } + + if (!MaybeHandleError(rv, aMsg, "DispatchSyncMessage")) { + aReply = Message::ForSyncDispatchError(aMsg.nested_level()); + } + aReply->set_seqno(aMsg.seqno()); + aReply->set_transaction_id(aMsg.transaction_id()); +} + +void MessageChannel::DispatchAsyncMessage(ActorLifecycleProxy* aProxy, + const Message& aMsg) { + AssertWorkerThread(); + MOZ_RELEASE_ASSERT(!aMsg.is_sync()); + + if (aMsg.routing_id() == MSG_ROUTING_NONE) { + NS_WARNING("unhandled special message!"); + MaybeHandleError(MsgNotKnown, aMsg, "DispatchAsyncMessage"); + return; + } + + Result rv; + { + int nestedLevel = aMsg.nested_level(); + AutoSetValue<bool> async(mDispatchingAsyncMessage, true); + AutoSetValue<int> nestedLevelSet(mDispatchingAsyncMessageNestedLevel, + nestedLevel); + rv = aProxy->Get()->OnMessageReceived(aMsg); + } + MaybeHandleError(rv, aMsg, "DispatchAsyncMessage"); +} + +void MessageChannel::EnqueuePendingMessages() { + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + // XXX performance tuning knob: could process all or k pending + // messages here, rather than enqueuing for later processing + + RepostAllMessages(); +} + +bool MessageChannel::WaitResponse(bool aWaitTimedOut) { + AssertWorkerThread(); + if (aWaitTimedOut) { + if (mInTimeoutSecondHalf) { + // We've really timed out this time. + return false; + } + // Try a second time. + mInTimeoutSecondHalf = true; + } else { + mInTimeoutSecondHalf = false; + } + return true; +} + +#ifndef OS_WIN +bool MessageChannel::WaitForSyncNotify() { + AssertWorkerThread(); +# ifdef DEBUG + // WARNING: We don't release the lock here. We can't because the link + // could signal at this time and we would miss it. Instead we require + // ArtificialTimeout() to be extremely simple. + if (mListener->ArtificialTimeout()) { + return false; + } +# endif + + MOZ_RELEASE_ASSERT(!mIsSameThreadChannel, + "Wait on same-thread channel will deadlock!"); + + TimeDuration timeout = (kNoTimeout == mTimeoutMs) + ? TimeDuration::Forever() + : TimeDuration::FromMilliseconds(mTimeoutMs); + CVStatus status = mMonitor->Wait(timeout); + + // If the timeout didn't expire, we know we received an event. The + // converse is not true. + return WaitResponse(status == CVStatus::Timeout); +} + +void MessageChannel::NotifyWorkerThread() { mMonitor->Notify(); } +#endif + +bool MessageChannel::ShouldContinueFromTimeout() { + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + bool cont; + { + MonitorAutoUnlock unlock(*mMonitor); + cont = mListener->ShouldContinueFromReplyTimeout(); + mListener->ArtificialSleep(); + } + + static enum { + UNKNOWN, + NOT_DEBUGGING, + DEBUGGING + } sDebuggingChildren = UNKNOWN; + + if (sDebuggingChildren == UNKNOWN) { + sDebuggingChildren = + getenv("MOZ_DEBUG_CHILD_PROCESS") || getenv("MOZ_DEBUG_CHILD_PAUSE") + ? DEBUGGING + : NOT_DEBUGGING; + } + if (sDebuggingChildren == DEBUGGING) { + return true; + } + + return cont; +} + +void MessageChannel::SetReplyTimeoutMs(int32_t aTimeoutMs) { + // Set channel timeout value. Since this is broken up into + // two period, the minimum timeout value is 2ms. + AssertWorkerThread(); + mTimeoutMs = + (aTimeoutMs <= 0) ? kNoTimeout : (int32_t)ceil((double)aTimeoutMs / 2.0); +} + +void MessageChannel::ReportConnectionError(const char* aFunctionName, + const uint32_t aMsgType) const { + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + const char* errorMsg = nullptr; + switch (mChannelState) { + case ChannelClosed: + errorMsg = "Closed channel: cannot send/recv"; + break; + case ChannelClosing: + errorMsg = "Channel closing: too late to send, messages will be lost"; + break; + case ChannelError: + errorMsg = "Channel error: cannot send/recv"; + break; + + default: + MOZ_CRASH("unreached"); + } + + // IPC connection errors are fairly common, especially "Channel closing: too + // late to send/recv, messages will be lost", so shouldn't be being reported + // on release builds, as that's misleading as to their severity. + NS_WARNING(nsPrintfCString("IPC Connection Error: [%s][%s] %s(msgname=%s) %s", + StringFromIPCSide(mSide), mName, aFunctionName, + IPC::StringFromIPCMessageType(aMsgType), errorMsg) + .get()); + + MonitorAutoUnlock unlock(*mMonitor); + mListener->ProcessingError(MsgDropped, errorMsg); +} + +void MessageChannel::ReportMessageRouteError(const char* channelName) const { + PrintErrorMessage(mSide, channelName, "Need a route"); + mListener->ProcessingError(MsgRouteError, "MsgRouteError"); +} + +bool MessageChannel::MaybeHandleError(Result code, const Message& aMsg, + const char* channelName) { + if (MsgProcessed == code) return true; + + const char* errorMsg = nullptr; + switch (code) { + case MsgNotKnown: + errorMsg = "Unknown message: not processed"; + break; + case MsgNotAllowed: + errorMsg = "Message not allowed: cannot be sent/recvd in this state"; + break; + case MsgPayloadError: + errorMsg = "Payload error: message could not be deserialized"; + break; + case MsgProcessingError: + errorMsg = + "Processing error: message was deserialized, but the handler " + "returned false (indicating failure)"; + break; + case MsgRouteError: + errorMsg = "Route error: message sent to unknown actor ID"; + break; + case MsgValueError: + errorMsg = + "Value error: message was deserialized, but contained an illegal " + "value"; + break; + + default: + MOZ_CRASH("unknown Result code"); + return false; + } + + char reason[512]; + const char* msgname = aMsg.name(); + if (msgname[0] == '?') { + SprintfLiteral(reason, "(msgtype=0x%X) %s", aMsg.type(), errorMsg); + } else { + SprintfLiteral(reason, "%s %s", msgname, errorMsg); + } + + PrintErrorMessage(mSide, channelName, reason); + + // Error handled in mozilla::ipc::IPCResult. + if (code == MsgProcessingError) { + return false; + } + + mListener->ProcessingError(code, reason); + + return false; +} + +void MessageChannel::OnChannelErrorFromLink() { + mMonitor->AssertCurrentThreadOwns(); + MOZ_ASSERT(mChannelState == ChannelConnected); + + IPC_LOG("OnChannelErrorFromLink"); + + if (AwaitingSyncReply()) { + NotifyWorkerThread(); + } + + if (mAbortOnError) { + // mAbortOnError is set by main actors (e.g., ContentChild) to ensure + // that the process terminates even if normal shutdown is prevented. + // A MOZ_CRASH() here is not helpful because crash reporting relies + // on the parent process which we know is dead or otherwise unusable. + // + // Additionally, the parent process can (and often is) killed on Android + // when apps are backgrounded. We don't need to report a crash for + // normal behavior in that case. + printf_stderr("Exiting due to channel error.\n"); + ProcessChild::QuickExit(); + } + mChannelState = ChannelError; + mMonitor->Notify(); + + PostErrorNotifyTask(); +} + +void MessageChannel::NotifyMaybeChannelError(ReleasableMonitorAutoLock& aLock) { + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + aLock.AssertCurrentThreadOwns(); + MOZ_ASSERT(mChannelState != ChannelConnected); + + if (ChannelClosing == mChannelState || ChannelClosed == mChannelState) { + // the channel closed, but we received a "Goodbye" message warning us + // about it. no worries + mChannelState = ChannelClosed; + NotifyChannelClosed(aLock); + return; + } + + MOZ_ASSERT(ChannelError == mChannelState); + + Clear(); + + // IPDL assumes these notifications do not fire twice, so we do not let + // that happen. + if (mNotifiedChannelDone) { + return; + } + mNotifiedChannelDone = true; + + // Let our listener know that the channel errored. This may cause the + // channel to be deleted. Release our caller's `MonitorAutoLock` before + // invoking the listener, as this may call back into MessageChannel, and/or + // cause the channel to be destroyed. + aLock.Unlock(); + mListener->OnChannelError(); +} + +void MessageChannel::OnNotifyMaybeChannelError() { + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + + // This lock guard may be reset by `NotifyMaybeChannelError` before invoking + // listener callbacks which may destroy this `MessageChannel`. + // + // Acquiring the lock here also allows us to ensure that + // `OnChannelErrorFromLink` has finished running before this task is allowed + // to continue. + ReleasableMonitorAutoLock lock(*mMonitor); + + mChannelErrorTask = nullptr; + + if (IsOnCxxStack()) { + // This used to post a 10ms delayed task; however not all + // nsISerialEventTarget implementations support delayed dispatch. + // The delay being completely arbitrary, we may not as well have any. + PostErrorNotifyTask(); + return; + } + + // This call may destroy `this`. + NotifyMaybeChannelError(lock); +} + +void MessageChannel::PostErrorNotifyTask() { + mMonitor->AssertCurrentThreadOwns(); + + if (mChannelErrorTask) { + return; + } + + // This must be the last code that runs on this thread! + mChannelErrorTask = NewNonOwningCancelableRunnableMethod( + "ipc::MessageChannel::OnNotifyMaybeChannelError", this, + &MessageChannel::OnNotifyMaybeChannelError); + mWorkerThread->Dispatch(do_AddRef(mChannelErrorTask)); +} + +// Special async message. +class GoodbyeMessage : public IPC::Message { + public: + GoodbyeMessage() : IPC::Message(MSG_ROUTING_NONE, GOODBYE_MESSAGE_TYPE) {} + static bool Read(const Message* msg) { return true; } + void Log(const std::string& aPrefix, FILE* aOutf) const { + fputs("(special `Goodbye' message)", aOutf); + } +}; + +void MessageChannel::CloseWithError() { + AssertWorkerThread(); + + // This lock guard may be reset by `NotifyMaybeChannelError` before invoking + // listener callbacks which may destroy this `MessageChannel`. + ReleasableMonitorAutoLock lock(*mMonitor); + + switch (mChannelState) { + case ChannelError: + // Already errored, ensure we notify if we haven't yet. + NotifyMaybeChannelError(lock); + return; + case ChannelClosed: + // Already closed, we can't do anything. + return; + default: + // Either connected or closing, immediately convert to an error, and + // notify. + MOZ_ASSERT(mChannelState == ChannelConnected || + mChannelState == ChannelClosing); + mLink->Close(); + mChannelState = ChannelError; + NotifyMaybeChannelError(lock); + return; + } +} + +void MessageChannel::NotifyImpendingShutdown() { + UniquePtr<Message> msg = + MakeUnique<Message>(MSG_ROUTING_NONE, IMPENDING_SHUTDOWN_MESSAGE_TYPE); + MonitorAutoLock lock(*mMonitor); + if (Connected()) { + SendMessageToLink(std::move(msg)); + } +} + +void MessageChannel::Close() { + AssertWorkerThread(); + mMonitor->AssertNotCurrentThreadOwns(); + + // This lock guard may be reset by `Notify{ChannelClosed,MaybeChannelError}` + // before invoking listener callbacks which may destroy this `MessageChannel`. + ReleasableMonitorAutoLock lock(*mMonitor); + + switch (mChannelState) { + case ChannelError: + // See bug 538586: if the listener gets deleted while the + // IO thread's NotifyChannelError event is still enqueued + // and subsequently deletes us, then the error event will + // also be deleted and the listener will never be notified + // of the channel error. + NotifyMaybeChannelError(lock); + return; + case ChannelClosed: + // Slightly unexpected but harmless; ignore. See bug 1554244. + return; + + default: + // Notify the other side that we're about to close our socket. If we've + // already received a Goodbye from the other side (and our state is + // ChannelClosing), there's no reason to send one. + if (ChannelConnected == mChannelState) { + SendMessageToLink(MakeUnique<GoodbyeMessage>()); + } + mLink->Close(); + mChannelState = ChannelClosed; + NotifyChannelClosed(lock); + return; + } +} + +void MessageChannel::NotifyChannelClosed(ReleasableMonitorAutoLock& aLock) { + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + aLock.AssertCurrentThreadOwns(); + + if (ChannelClosed != mChannelState) { + MOZ_CRASH("channel should have been closed!"); + } + + Clear(); + + // IPDL assumes these notifications do not fire twice, so we do not let + // that happen. + if (mNotifiedChannelDone) { + return; + } + mNotifiedChannelDone = true; + + // Let our listener know that the channel was closed. This may cause the + // channel to be deleted. Release our caller's `MonitorAutoLock` before + // invoking the listener, as this may call back into MessageChannel, and/or + // cause the channel to be destroyed. + aLock.Unlock(); + mListener->OnChannelClose(); +} + +void MessageChannel::DebugAbort(const char* file, int line, const char* cond, + const char* why, bool reply) { + AssertWorkerThread(); + mMonitor->AssertCurrentThreadOwns(); + + printf_stderr( + "###!!! [MessageChannel][%s][%s:%d] " + "Assertion (%s) failed. %s %s\n", + mSide == ChildSide ? "Child" : "Parent", file, line, cond, why, + reply ? "(reply)" : ""); + + MessageQueue pending = std::move(mPending); + while (!pending.isEmpty()) { + pending.getFirst()->AssertMonitorHeld(*mMonitor); + printf_stderr(" [ %s%s ]\n", + pending.getFirst()->Msg()->is_sync() ? "sync" : "async", + pending.getFirst()->Msg()->is_reply() ? "reply" : ""); + pending.popFirst(); + } + + MOZ_CRASH_UNSAFE(why); +} + +void MessageChannel::AddProfilerMarker(const IPC::Message& aMessage, + MessageDirection aDirection) { + mMonitor->AssertCurrentThreadOwns(); + + if (profiler_feature_active(ProfilerFeature::IPCMessages)) { + base::ProcessId pid = mListener->OtherPidMaybeInvalid(); + // Only record markers for IPCs with a valid pid. + // And if one of the profiler mutexes is locked on this thread, don't record + // markers, because we don't want to expose profiler IPCs due to the + // profiler itself, and also to avoid possible re-entrancy issues. + if (pid != base::kInvalidProcessId && + !profiler_is_locked_on_current_thread()) { + // The current timestamp must be given to the `IPCMarker` payload. + [[maybe_unused]] const TimeStamp now = TimeStamp::Now(); + bool isThreadBeingProfiled = + profiler_thread_is_being_profiled_for_markers(); + PROFILER_MARKER( + "IPC", IPC, + mozilla::MarkerOptions( + mozilla::MarkerTiming::InstantAt(now), + // If the thread is being profiled, add the marker to + // the current thread. If the thread is not being + // profiled, add the marker to the main thread. It + // will appear in the main thread's IPC track. Profiler analysis + // UI correlates all the IPC markers from different threads and + // generates processed markers. + isThreadBeingProfiled ? mozilla::MarkerThreadId::CurrentThread() + : mozilla::MarkerThreadId::MainThread()), + IPCMarker, now, now, pid, aMessage.seqno(), aMessage.type(), mSide, + aDirection, MessagePhase::Endpoint, aMessage.is_sync(), + // aOriginThreadId: If the thread is being profiled, do not include a + // thread ID, as it's the same as the markers. Only include this field + // when the marker is being sent from another thread. + isThreadBeingProfiled ? mozilla::MarkerThreadId{} + : mozilla::MarkerThreadId::CurrentThread()); + } + } +} + +void MessageChannel::EndTimeout() { + mMonitor->AssertCurrentThreadOwns(); + + IPC_LOG("Ending timeout of seqno=%d", mTimedOutMessageSeqno); + mTimedOutMessageSeqno = 0; + mTimedOutMessageNestedLevel = 0; + + RepostAllMessages(); +} + +void MessageChannel::RepostAllMessages() { + mMonitor->AssertCurrentThreadOwns(); + + bool needRepost = false; + for (MessageTask* task : mPending) { + task->AssertMonitorHeld(*mMonitor); + if (!task->IsScheduled()) { + needRepost = true; + break; + } + } + if (!needRepost) { + // If everything is already scheduled to run, do nothing. + return; + } + + // In some cases we may have deferred dispatch of some messages in the + // queue. Now we want to run them again. However, we can't just re-post + // those messages since the messages after them in mPending would then be + // before them in the event queue. So instead we cancel everything and + // re-post all messages in the correct order. + MessageQueue queue = std::move(mPending); + while (RefPtr<MessageTask> task = queue.popFirst()) { + task->AssertMonitorHeld(*mMonitor); + RefPtr<MessageTask> newTask = new MessageTask(this, std::move(task->Msg())); + newTask->AssertMonitorHeld(*mMonitor); + mPending.insertBack(newTask); + newTask->Post(); + } + + AssertMaybeDeferredCountCorrect(); +} + +void MessageChannel::CancelTransaction(int transaction) { + mMonitor->AssertCurrentThreadOwns(); + + // When we cancel a transaction, we need to behave as if there's no longer + // any IPC on the stack. Anything we were dispatching or sending will get + // canceled. Consequently, we have to update the state variables below. + // + // We also need to ensure that when any IPC functions on the stack return, + // they don't reset these values using an RAII class like AutoSetValue. To + // avoid that, these RAII classes check if the variable they set has been + // tampered with (by us). If so, they don't reset the variable to the old + // value. + + IPC_LOG("CancelTransaction: xid=%d", transaction); + + // An unusual case: We timed out a transaction which the other side then + // cancelled. In this case we just leave the timedout state and try to + // forget this ever happened. + if (transaction == mTimedOutMessageSeqno) { + IPC_LOG("Cancelled timed out message %d", mTimedOutMessageSeqno); + EndTimeout(); + + // Normally mCurrentTransaction == 0 here. But it can be non-zero if: + // 1. Parent sends NESTED_INSIDE_SYNC message H. + // 2. Parent times out H. + // 3. Child dispatches H and sends nested message H' (same transaction). + // 4. Parent dispatches H' and cancels. + MOZ_RELEASE_ASSERT(!mTransactionStack || + mTransactionStack->TransactionID() == transaction); + if (mTransactionStack) { + mTransactionStack->Cancel(); + } + } else { + MOZ_RELEASE_ASSERT(mTransactionStack->TransactionID() == transaction); + mTransactionStack->Cancel(); + } + + bool foundSync = false; + for (MessageTask* p = mPending.getFirst(); p;) { + p->AssertMonitorHeld(*mMonitor); + UniquePtr<Message>& msg = p->Msg(); + + // If there was a race between the parent and the child, then we may + // have a queued sync message. We want to drop this message from the + // queue since if will get cancelled along with the transaction being + // cancelled. This happens if the message in the queue is + // NESTED_INSIDE_SYNC. + if (msg->is_sync() && msg->nested_level() != IPC::Message::NOT_NESTED) { + MOZ_RELEASE_ASSERT(!foundSync); + MOZ_RELEASE_ASSERT(msg->transaction_id() != transaction); + IPC_LOG("Removing msg from queue seqno=%d xid=%d", msg->seqno(), + msg->transaction_id()); + foundSync = true; + if (!IsAlwaysDeferred(*msg)) { + mMaybeDeferredPendingCount--; + } + p = p->removeAndGetNext(); + continue; + } + + p = p->getNext(); + } + + AssertMaybeDeferredCountCorrect(); +} + +void MessageChannel::CancelCurrentTransaction() { + MonitorAutoLock lock(*mMonitor); + if (DispatchingSyncMessageNestedLevel() >= IPC::Message::NESTED_INSIDE_SYNC) { + if (DispatchingSyncMessageNestedLevel() == + IPC::Message::NESTED_INSIDE_CPOW || + DispatchingAsyncMessageNestedLevel() == + IPC::Message::NESTED_INSIDE_CPOW) { + mListener->IntentionalCrash(); + } + + IPC_LOG("Cancel requested: current xid=%d", + CurrentNestedInsideSyncTransaction()); + MOZ_RELEASE_ASSERT(DispatchingSyncMessage()); + auto cancel = + MakeUnique<CancelMessage>(CurrentNestedInsideSyncTransaction()); + CancelTransaction(CurrentNestedInsideSyncTransaction()); + SendMessageToLink(std::move(cancel)); + } +} + +void CancelCPOWs() { + MOZ_ASSERT(NS_IsMainThread()); + + if (gParentProcessBlocker) { + mozilla::Telemetry::Accumulate(mozilla::Telemetry::IPC_TRANSACTION_CANCEL, + true); + gParentProcessBlocker->CancelCurrentTransaction(); + } +} + +bool MessageChannel::IsCrossProcess() const { + mMonitor->AssertCurrentThreadOwns(); + return mIsCrossProcess; +} + +void MessageChannel::SetIsCrossProcess(bool aIsCrossProcess) { + mMonitor->AssertCurrentThreadOwns(); + if (aIsCrossProcess == mIsCrossProcess) { + return; + } + mIsCrossProcess = aIsCrossProcess; + if (mIsCrossProcess) { + ChannelCountReporter::Increment(mName); + } else { + ChannelCountReporter::Decrement(mName); + } +} + +NS_IMPL_ISUPPORTS(MessageChannel::WorkerTargetShutdownTask, + nsITargetShutdownTask) + +MessageChannel::WorkerTargetShutdownTask::WorkerTargetShutdownTask( + nsISerialEventTarget* aTarget, MessageChannel* aChannel) + : mTarget(aTarget), mChannel(aChannel) {} + +void MessageChannel::WorkerTargetShutdownTask::TargetShutdown() { + MOZ_RELEASE_ASSERT(mTarget->IsOnCurrentThread()); + IPC_LOG("Closing channel due to event target shutdown"); + if (MessageChannel* channel = std::exchange(mChannel, nullptr)) { + channel->Close(); + } +} + +void MessageChannel::WorkerTargetShutdownTask::Clear() { + MOZ_RELEASE_ASSERT(mTarget->IsOnCurrentThread()); + mChannel = nullptr; +} + +NS_IMPL_ISUPPORTS_INHERITED0(MessageChannel::FlushLazySendMessagesRunnable, + CancelableRunnable) + +MessageChannel::FlushLazySendMessagesRunnable::FlushLazySendMessagesRunnable( + MessageChannel* aChannel) + : CancelableRunnable("MessageChannel::FlushLazyMessagesRunnable"), + mChannel(aChannel) {} + +NS_IMETHODIMP MessageChannel::FlushLazySendMessagesRunnable::Run() { + if (mChannel) { + MonitorAutoLock lock(*mChannel->mMonitor); + MOZ_ASSERT(mChannel->mFlushLazySendTask == this); + mChannel->FlushLazySendMessages(); + } + return NS_OK; +} + +nsresult MessageChannel::FlushLazySendMessagesRunnable::Cancel() { + mQueue.Clear(); + mChannel = nullptr; + return NS_OK; +} + +void MessageChannel::FlushLazySendMessagesRunnable::PushMessage( + UniquePtr<Message> aMsg) { + MOZ_ASSERT(mChannel); + mQueue.AppendElement(std::move(aMsg)); +} + +nsTArray<UniquePtr<IPC::Message>> +MessageChannel::FlushLazySendMessagesRunnable::TakeMessages() { + mChannel = nullptr; + return std::move(mQueue); +} + +} // namespace ipc +} // namespace mozilla |