/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- * vim: sw=2 ts=4 et : */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "mozilla/ipc/MessageChannel.h" #include <math.h> #include <utility> #include "CrashAnnotations.h" #include "mozilla/Assertions.h" #include "mozilla/CycleCollectedJSContext.h" #include "mozilla/DebugOnly.h" #include "mozilla/Fuzzing.h" #include "mozilla/IntentionalCrash.h" #include "mozilla/Logging.h" #include "mozilla/Monitor.h" #include "mozilla/Mutex.h" #include "mozilla/ProfilerMarkers.h" #include "mozilla/ScopeExit.h" #include "mozilla/Sprintf.h" #include "mozilla/StaticMutex.h" #include "mozilla/Telemetry.h" #include "mozilla/TimeStamp.h" #include "mozilla/UniquePtrExtensions.h" #include "mozilla/dom/ScriptSettings.h" #include "mozilla/ipc/NodeController.h" #include "mozilla/ipc/ProcessChild.h" #include "mozilla/ipc/ProtocolUtils.h" #include "nsAppRunner.h" #include "nsContentUtils.h" #include "nsIDirectTaskDispatcher.h" #include "nsTHashMap.h" #include "nsDebug.h" #include "nsExceptionHandler.h" #include "nsIMemoryReporter.h" #include "nsISupportsImpl.h" #include "nsPrintfCString.h" #include "nsThreadUtils.h" #ifdef OS_WIN # include "mozilla/gfx/Logging.h" #endif #ifdef FUZZING_SNAPSHOT # include "mozilla/fuzzing/IPCFuzzController.h" #endif // Undo the damage done by mozzconf.h #undef compress static mozilla::LazyLogModule sLogModule("ipc"); #define IPC_LOG(...) MOZ_LOG(sLogModule, LogLevel::Debug, (__VA_ARGS__)) /* * IPC design: * * There are two kinds of messages: async and sync. Sync messages are blocking. * * Terminology: To dispatch a message Foo is to run the RecvFoo code for * it. This is also called "handling" the message. * * Sync and async messages can sometimes "nest" inside other sync messages * (i.e., while waiting for the sync reply, we can dispatch the inner * message). The three possible nesting levels are NOT_NESTED, * NESTED_INSIDE_SYNC, and NESTED_INSIDE_CPOW. The intended uses are: * NOT_NESTED - most messages. * NESTED_INSIDE_SYNC - CPOW-related messages, which are always sync * and can go in either direction. * NESTED_INSIDE_CPOW - messages where we don't want to dispatch * incoming CPOWs while waiting for the response. * These nesting levels are ordered: NOT_NESTED, NESTED_INSIDE_SYNC, * NESTED_INSIDE_CPOW. Async messages cannot be NESTED_INSIDE_SYNC but they can * be NESTED_INSIDE_CPOW. * * To avoid jank, the parent process is not allowed to send NOT_NESTED sync * messages. When a process is waiting for a response to a sync message M0, it * will dispatch an incoming message M if: * 1. M has a higher nesting level than M0, or * 2. if M has the same nesting level as M0 and we're in the child, or * 3. if M has the same nesting level as M0 and it was sent by the other side * while dispatching M0. * The idea is that messages with higher nesting should take precendence. The * purpose of rule 2 is to handle a race where both processes send to each other * simultaneously. In this case, we resolve the race in favor of the parent (so * the child dispatches first). * * Messages satisfy the following properties: * A. When waiting for a response to a sync message, we won't dispatch any * messages of a lower nesting level. * B. Messages of the same nesting level will be dispatched roughly in the * order they were sent. The exception is when the parent and child send * sync messages to each other simulataneously. In this case, the parent's * message is dispatched first. While it is dispatched, the child may send * further nested messages, and these messages may be dispatched before the * child's original message. We can consider ordering to be preserved here * because we pretend that the child's original message wasn't sent until * after the parent's message is finished being dispatched. * * When waiting for a sync message reply, we dispatch an async message only if * it is NESTED_INSIDE_CPOW. Normally NESTED_INSIDE_CPOW async * messages are sent only from the child. However, the parent can send * NESTED_INSIDE_CPOW async messages when it is creating a bridged protocol. */ using namespace mozilla; using namespace mozilla::ipc; using mozilla::MonitorAutoLock; using mozilla::MonitorAutoUnlock; using mozilla::dom::AutoNoJSAPI; #define IPC_ASSERT(_cond, ...) \ do { \ AssertWorkerThread(); \ mMonitor->AssertCurrentThreadOwns(); \ if (!(_cond)) DebugAbort(__FILE__, __LINE__, #_cond, ##__VA_ARGS__); \ } while (0) static MessageChannel* gParentProcessBlocker = nullptr; namespace mozilla { namespace ipc { static const uint32_t kMinTelemetryMessageSize = 4096; // Note: we round the time we spend waiting for a response to the nearest // millisecond. So a min value of 1 ms actually captures from 500us and above. // This is used for both the sending and receiving side telemetry for sync IPC, // (IPC_SYNC_MAIN_LATENCY_MS and IPC_SYNC_RECEIVE_MS). static const uint32_t kMinTelemetrySyncIPCLatencyMs = 1; // static bool MessageChannel::sIsPumpingMessages = false; class AutoEnterTransaction { public: explicit AutoEnterTransaction(MessageChannel* aChan, int32_t aMsgSeqno, int32_t aTransactionID, int aNestedLevel) MOZ_REQUIRES(*aChan->mMonitor) : mChan(aChan), mActive(true), mOutgoing(true), mNestedLevel(aNestedLevel), mSeqno(aMsgSeqno), mTransaction(aTransactionID), mNext(mChan->mTransactionStack) { mChan->mMonitor->AssertCurrentThreadOwns(); mChan->mTransactionStack = this; } explicit AutoEnterTransaction(MessageChannel* aChan, const IPC::Message& aMessage) MOZ_REQUIRES(*aChan->mMonitor) : mChan(aChan), mActive(true), mOutgoing(false), mNestedLevel(aMessage.nested_level()), mSeqno(aMessage.seqno()), mTransaction(aMessage.transaction_id()), mNext(mChan->mTransactionStack) { mChan->mMonitor->AssertCurrentThreadOwns(); if (!aMessage.is_sync()) { mActive = false; return; } mChan->mTransactionStack = this; } ~AutoEnterTransaction() { mChan->mMonitor->AssertCurrentThreadOwns(); if (mActive) { mChan->mTransactionStack = mNext; } } void Cancel() { mChan->mMonitor->AssertCurrentThreadOwns(); AutoEnterTransaction* cur = mChan->mTransactionStack; MOZ_RELEASE_ASSERT(cur == this); while (cur && cur->mNestedLevel != IPC::Message::NOT_NESTED) { // Note that, in the following situation, we will cancel multiple // transactions: // 1. Parent sends NESTED_INSIDE_SYNC message P1 to child. // 2. Child sends NESTED_INSIDE_SYNC message C1 to child. // 3. Child dispatches P1, parent blocks. // 4. Child cancels. // In this case, both P1 and C1 are cancelled. The parent will // remove C1 from its queue when it gets the cancellation message. MOZ_RELEASE_ASSERT(cur->mActive); cur->mActive = false; cur = cur->mNext; } mChan->mTransactionStack = cur; MOZ_RELEASE_ASSERT(IsComplete()); } bool AwaitingSyncReply() const { MOZ_RELEASE_ASSERT(mActive); if (mOutgoing) { return true; } return mNext ? mNext->AwaitingSyncReply() : false; } int AwaitingSyncReplyNestedLevel() const { MOZ_RELEASE_ASSERT(mActive); if (mOutgoing) { return mNestedLevel; } return mNext ? mNext->AwaitingSyncReplyNestedLevel() : 0; } bool DispatchingSyncMessage() const { MOZ_RELEASE_ASSERT(mActive); if (!mOutgoing) { return true; } return mNext ? mNext->DispatchingSyncMessage() : false; } int DispatchingSyncMessageNestedLevel() const { MOZ_RELEASE_ASSERT(mActive); if (!mOutgoing) { return mNestedLevel; } return mNext ? mNext->DispatchingSyncMessageNestedLevel() : 0; } int NestedLevel() const { MOZ_RELEASE_ASSERT(mActive); return mNestedLevel; } int32_t SequenceNumber() const { MOZ_RELEASE_ASSERT(mActive); return mSeqno; } int32_t TransactionID() const { MOZ_RELEASE_ASSERT(mActive); return mTransaction; } void ReceivedReply(UniquePtr<IPC::Message> aMessage) { MOZ_RELEASE_ASSERT(aMessage->seqno() == mSeqno); MOZ_RELEASE_ASSERT(aMessage->transaction_id() == mTransaction); MOZ_RELEASE_ASSERT(!mReply); IPC_LOG("Reply received on worker thread: seqno=%d", mSeqno); mReply = std::move(aMessage); MOZ_RELEASE_ASSERT(IsComplete()); } void HandleReply(UniquePtr<IPC::Message> aMessage) { mChan->mMonitor->AssertCurrentThreadOwns(); AutoEnterTransaction* cur = mChan->mTransactionStack; MOZ_RELEASE_ASSERT(cur == this); while (cur) { MOZ_RELEASE_ASSERT(cur->mActive); if (aMessage->seqno() == cur->mSeqno) { cur->ReceivedReply(std::move(aMessage)); break; } cur = cur->mNext; MOZ_RELEASE_ASSERT(cur); } } bool IsComplete() { return !mActive || mReply; } bool IsOutgoing() { return mOutgoing; } bool IsCanceled() { return !mActive; } bool IsBottom() const { return !mNext; } bool IsError() { MOZ_RELEASE_ASSERT(mReply); return mReply->is_reply_error(); } UniquePtr<IPC::Message> GetReply() { return std::move(mReply); } private: MessageChannel* mChan; // Active is true if this transaction is on the mChan->mTransactionStack // stack. Generally we're not on the stack if the transaction was canceled // or if it was for a message that doesn't require transactions (an async // message). bool mActive; // Is this stack frame for an outgoing message? bool mOutgoing; // Properties of the message being sent/received. int mNestedLevel; int32_t mSeqno; int32_t mTransaction; // Next item in mChan->mTransactionStack. AutoEnterTransaction* mNext; // Pointer the a reply received for this message, if one was received. UniquePtr<IPC::Message> mReply; }; class PendingResponseReporter final : public nsIMemoryReporter { ~PendingResponseReporter() = default; public: NS_DECL_THREADSAFE_ISUPPORTS NS_IMETHOD CollectReports(nsIHandleReportCallback* aHandleReport, nsISupports* aData, bool aAnonymize) override { MOZ_COLLECT_REPORT( "unresolved-ipc-responses", KIND_OTHER, UNITS_COUNT, MessageChannel::gUnresolvedResponses, "Outstanding IPC async message responses that are still not resolved."); return NS_OK; } }; NS_IMPL_ISUPPORTS(PendingResponseReporter, nsIMemoryReporter) class ChannelCountReporter final : public nsIMemoryReporter { ~ChannelCountReporter() = default; struct ChannelCounts { size_t mNow; size_t mMax; ChannelCounts() : mNow(0), mMax(0) {} void Inc() { ++mNow; if (mMax < mNow) { mMax = mNow; } } void Dec() { MOZ_ASSERT(mNow > 0); --mNow; } }; using CountTable = nsTHashMap<nsDepCharHashKey, ChannelCounts>; static StaticMutex sChannelCountMutex; static CountTable* sChannelCounts MOZ_GUARDED_BY(sChannelCountMutex); public: NS_DECL_THREADSAFE_ISUPPORTS NS_IMETHOD CollectReports(nsIHandleReportCallback* aHandleReport, nsISupports* aData, bool aAnonymize) override { AutoTArray<std::pair<const char*, ChannelCounts>, 16> counts; { StaticMutexAutoLock countLock(sChannelCountMutex); if (!sChannelCounts) { return NS_OK; } counts.SetCapacity(sChannelCounts->Count()); for (const auto& entry : *sChannelCounts) { counts.AppendElement(std::pair{entry.GetKey(), entry.GetData()}); } } for (const auto& entry : counts) { nsPrintfCString pathNow("ipc-channels/%s", entry.first); nsPrintfCString pathMax("ipc-channels-peak/%s", entry.first); nsPrintfCString descNow( "Number of IPC channels for" " top-level actor type %s", entry.first); nsPrintfCString descMax( "Peak number of IPC channels for" " top-level actor type %s", entry.first); aHandleReport->Callback(""_ns, pathNow, KIND_OTHER, UNITS_COUNT, entry.second.mNow, descNow, aData); aHandleReport->Callback(""_ns, pathMax, KIND_OTHER, UNITS_COUNT, entry.second.mMax, descMax, aData); } return NS_OK; } static void Increment(const char* aName) { StaticMutexAutoLock countLock(sChannelCountMutex); if (!sChannelCounts) { sChannelCounts = new CountTable; } sChannelCounts->LookupOrInsert(aName).Inc(); } static void Decrement(const char* aName) { StaticMutexAutoLock countLock(sChannelCountMutex); MOZ_ASSERT(sChannelCounts); sChannelCounts->LookupOrInsert(aName).Dec(); } }; StaticMutex ChannelCountReporter::sChannelCountMutex; ChannelCountReporter::CountTable* ChannelCountReporter::sChannelCounts; NS_IMPL_ISUPPORTS(ChannelCountReporter, nsIMemoryReporter) // In child processes, the first MessageChannel is created before // XPCOM is initialized enough to construct the memory reporter // manager. This retries every time a MessageChannel is constructed, // which is good enough in practice. template <class Reporter> static void TryRegisterStrongMemoryReporter() { static Atomic<bool> registered; if (registered.compareExchange(false, true)) { RefPtr<Reporter> reporter = new Reporter(); if (NS_FAILED(RegisterStrongMemoryReporter(reporter))) { registered = false; } } } Atomic<size_t> MessageChannel::gUnresolvedResponses; MessageChannel::MessageChannel(const char* aName, IToplevelProtocol* aListener) : mName(aName), mListener(aListener), mMonitor(new RefCountedMonitor()) { MOZ_COUNT_CTOR(ipc::MessageChannel); #ifdef OS_WIN mEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr); MOZ_RELEASE_ASSERT(mEvent, "CreateEvent failed! Nothing is going to work!"); #endif TryRegisterStrongMemoryReporter<PendingResponseReporter>(); TryRegisterStrongMemoryReporter<ChannelCountReporter>(); } MessageChannel::~MessageChannel() { MOZ_COUNT_DTOR(ipc::MessageChannel); MonitorAutoLock lock(*mMonitor); MOZ_RELEASE_ASSERT(!mOnCxxStack, "MessageChannel destroyed while code on CxxStack"); #ifdef OS_WIN if (mEvent) { BOOL ok = CloseHandle(mEvent); mEvent = nullptr; if (!ok) { gfxDevCrash(mozilla::gfx::LogReason::MessageChannelCloseFailure) << "MessageChannel failed to close. GetLastError: " << GetLastError(); } MOZ_RELEASE_ASSERT(ok); } else { gfxDevCrash(mozilla::gfx::LogReason::MessageChannelCloseFailure) << "MessageChannel destructor ran without an mEvent Handle"; } #endif // Make sure that the MessageChannel was closed (and therefore cleared) before // it was destroyed. We can't properly close the channel at this point, as it // would be unsafe to invoke our listener's callbacks, and we may be being // destroyed on a thread other than `mWorkerThread`. if (!IsClosedLocked()) { CrashReporter::AnnotateCrashReport( CrashReporter::Annotation::IPCFatalErrorProtocol, nsDependentCString(mName)); switch (mChannelState) { case ChannelConnected: MOZ_CRASH( "MessageChannel destroyed without being closed " "(mChannelState == ChannelConnected)."); break; case ChannelClosing: MOZ_CRASH( "MessageChannel destroyed without being closed " "(mChannelState == ChannelClosing)."); break; case ChannelError: MOZ_CRASH( "MessageChannel destroyed without being closed " "(mChannelState == ChannelError)."); break; default: MOZ_CRASH("MessageChannel destroyed without being closed."); } } // Double-check other properties for thoroughness. MOZ_RELEASE_ASSERT(!mLink); MOZ_RELEASE_ASSERT(mPendingResponses.empty()); MOZ_RELEASE_ASSERT(!mChannelErrorTask); MOZ_RELEASE_ASSERT(mPending.isEmpty()); MOZ_RELEASE_ASSERT(!mShutdownTask); } #ifdef DEBUG void MessageChannel::AssertMaybeDeferredCountCorrect() { mMonitor->AssertCurrentThreadOwns(); size_t count = 0; for (MessageTask* task : mPending) { task->AssertMonitorHeld(*mMonitor); if (!IsAlwaysDeferred(*task->Msg())) { count++; } } MOZ_ASSERT(count == mMaybeDeferredPendingCount); } #endif // This function returns the current transaction ID. Since the notion of a // "current transaction" can be hard to define when messages race with each // other and one gets canceled and the other doesn't, we require that this // function is only called when the current transaction is known to be for a // NESTED_INSIDE_SYNC message. In that case, we know for sure what the caller is // looking for. int32_t MessageChannel::CurrentNestedInsideSyncTransaction() const { mMonitor->AssertCurrentThreadOwns(); if (!mTransactionStack) { return 0; } MOZ_RELEASE_ASSERT(mTransactionStack->NestedLevel() == IPC::Message::NESTED_INSIDE_SYNC); return mTransactionStack->TransactionID(); } bool MessageChannel::AwaitingSyncReply() const { mMonitor->AssertCurrentThreadOwns(); return mTransactionStack ? mTransactionStack->AwaitingSyncReply() : false; } int MessageChannel::AwaitingSyncReplyNestedLevel() const { mMonitor->AssertCurrentThreadOwns(); return mTransactionStack ? mTransactionStack->AwaitingSyncReplyNestedLevel() : 0; } bool MessageChannel::DispatchingSyncMessage() const { mMonitor->AssertCurrentThreadOwns(); return mTransactionStack ? mTransactionStack->DispatchingSyncMessage() : false; } int MessageChannel::DispatchingSyncMessageNestedLevel() const { mMonitor->AssertCurrentThreadOwns(); return mTransactionStack ? mTransactionStack->DispatchingSyncMessageNestedLevel() : 0; } static const char* StringFromIPCSide(Side side) { switch (side) { case ChildSide: return "Child"; case ParentSide: return "Parent"; default: return "Unknown"; } } static void PrintErrorMessage(Side side, const char* channelName, const char* msg) { printf_stderr("\n###!!! [%s][%s] Error: %s\n\n", StringFromIPCSide(side), channelName, msg); } bool MessageChannel::Connected() const { mMonitor->AssertCurrentThreadOwns(); return ChannelConnected == mChannelState; } bool MessageChannel::ConnectedOrClosing() const { mMonitor->AssertCurrentThreadOwns(); return ChannelConnected == mChannelState || ChannelClosing == mChannelState; } bool MessageChannel::CanSend() const { if (!mMonitor) { return false; } MonitorAutoLock lock(*mMonitor); return Connected(); } void MessageChannel::Clear() { AssertWorkerThread(); mMonitor->AssertCurrentThreadOwns(); MOZ_DIAGNOSTIC_ASSERT(IsClosedLocked(), "MessageChannel cleared too early?"); MOZ_ASSERT(ChannelClosed == mChannelState || ChannelError == mChannelState); // Don't clear mWorkerThread; we use it in AssertWorkerThread(). // // Also don't clear mListener. If we clear it, then sending a message // through this channel after it's Clear()'ed can cause this process to // crash. if (mShutdownTask) { mShutdownTask->Clear(); mWorkerThread->UnregisterShutdownTask(mShutdownTask); } mShutdownTask = nullptr; if (NS_IsMainThread() && gParentProcessBlocker == this) { gParentProcessBlocker = nullptr; } gUnresolvedResponses -= mPendingResponses.size(); { CallbackMap map = std::move(mPendingResponses); MonitorAutoUnlock unlock(*mMonitor); for (auto& pair : map) { pair.second->Reject(ResponseRejectReason::ChannelClosed); } } mPendingResponses.clear(); SetIsCrossProcess(false); mLink = nullptr; if (mChannelErrorTask) { mChannelErrorTask->Cancel(); mChannelErrorTask = nullptr; } if (mFlushLazySendTask) { mFlushLazySendTask->Cancel(); mFlushLazySendTask = nullptr; } // Free up any memory used by pending messages. mPending.clear(); mMaybeDeferredPendingCount = 0; } bool MessageChannel::Open(ScopedPort aPort, Side aSide, const nsID& aMessageChannelId, nsISerialEventTarget* aEventTarget) { nsCOMPtr<nsISerialEventTarget> eventTarget = aEventTarget ? aEventTarget : GetCurrentSerialEventTarget(); MOZ_RELEASE_ASSERT(eventTarget, "Must open MessageChannel on a nsISerialEventTarget"); MOZ_RELEASE_ASSERT(eventTarget->IsOnCurrentThread(), "Must open MessageChannel from worker thread"); auto shutdownTask = MakeRefPtr<WorkerTargetShutdownTask>(eventTarget, this); nsresult rv = eventTarget->RegisterShutdownTask(shutdownTask); MOZ_ASSERT(rv != NS_ERROR_NOT_IMPLEMENTED, "target for MessageChannel must support shutdown tasks"); if (rv == NS_ERROR_UNEXPECTED) { // If shutdown tasks have already started running, dispatch our shutdown // task manually. NS_WARNING("Opening MessageChannel on EventTarget in shutdown"); rv = eventTarget->Dispatch(shutdownTask->AsRunnable()); } MOZ_RELEASE_ASSERT(NS_SUCCEEDED(rv), "error registering ShutdownTask for MessageChannel"); { MonitorAutoLock lock(*mMonitor); MOZ_RELEASE_ASSERT(!mLink, "Open() called > once"); MOZ_RELEASE_ASSERT(ChannelClosed == mChannelState, "Not currently closed"); MOZ_ASSERT(mSide == UnknownSide); mMessageChannelId = aMessageChannelId; mWorkerThread = eventTarget; mShutdownTask = shutdownTask; mLink = MakeUnique<PortLink>(this, std::move(aPort)); mChannelState = ChannelConnected; mSide = aSide; } // Notify our listener that the underlying IPC channel has been established. // IProtocol will use this callback to create the ActorLifecycleProxy, and // perform an `AddRef` call to keep the actor alive until the channel is // disconnected. // // We unlock our monitor before calling `OnIPCChannelOpened` to ensure that // any calls back into `MessageChannel` do not deadlock. At this point, we may // be receiving messages on the IO thread, however we cannot process them on // the worker thread or have notified our listener until after this function // returns. mListener->OnIPCChannelOpened(); return true; } static Side GetOppSide(Side aSide) { switch (aSide) { case ChildSide: return ParentSide; case ParentSide: return ChildSide; default: return UnknownSide; } } bool MessageChannel::Open(MessageChannel* aTargetChan, nsISerialEventTarget* aEventTarget, Side aSide) { // Opens a connection to another thread in the same process. MOZ_ASSERT(aTargetChan, "Need a target channel"); nsID channelId = nsID::GenerateUUID(); std::pair<ScopedPort, ScopedPort> ports = NodeController::GetSingleton()->CreatePortPair(); // NOTE: This dispatch must be sync as it captures locals by non-owning // reference, however we can't use `NS_DispatchAndSpinEventLoopUntilComplete` // as that will spin a nested event loop, and doesn't work with certain types // of calling event targets. base::WaitableEvent event(/* manual_reset */ true, /* initially_signaled */ false); MOZ_ALWAYS_SUCCEEDS(aEventTarget->Dispatch(NS_NewCancelableRunnableFunction( "ipc::MessageChannel::OpenAsOtherThread", [&]() { aTargetChan->Open(std::move(ports.second), GetOppSide(aSide), channelId, aEventTarget); event.Signal(); }))); bool ok = event.Wait(); MOZ_RELEASE_ASSERT(ok); // Now that the other side has connected, open the port on our side. return Open(std::move(ports.first), aSide, channelId); } bool MessageChannel::OpenOnSameThread(MessageChannel* aTargetChan, mozilla::ipc::Side aSide) { auto [porta, portb] = NodeController::GetSingleton()->CreatePortPair(); nsID channelId = nsID::GenerateUUID(); aTargetChan->mIsSameThreadChannel = true; mIsSameThreadChannel = true; auto* currentThread = GetCurrentSerialEventTarget(); return aTargetChan->Open(std::move(portb), GetOppSide(aSide), channelId, currentThread) && Open(std::move(porta), aSide, channelId, currentThread); } bool MessageChannel::Send(UniquePtr<Message> aMsg) { if (aMsg->size() >= kMinTelemetryMessageSize) { Telemetry::Accumulate(Telemetry::IPC_MESSAGE_SIZE2, aMsg->size()); } MOZ_RELEASE_ASSERT(!aMsg->is_sync()); MOZ_RELEASE_ASSERT(aMsg->nested_level() != IPC::Message::NESTED_INSIDE_SYNC); AutoSetValue<bool> setOnCxxStack(mOnCxxStack, true); AssertWorkerThread(); mMonitor->AssertNotCurrentThreadOwns(); if (MSG_ROUTING_NONE == aMsg->routing_id()) { ReportMessageRouteError("MessageChannel::Send"); return false; } if (aMsg->seqno() == 0) { aMsg->set_seqno(NextSeqno()); } MonitorAutoLock lock(*mMonitor); if (!Connected()) { ReportConnectionError("Send", aMsg->type()); return false; } AddProfilerMarker(*aMsg, MessageDirection::eSending); SendMessageToLink(std::move(aMsg)); return true; } void MessageChannel::SendMessageToLink(UniquePtr<Message> aMsg) { AssertWorkerThread(); mMonitor->AssertCurrentThreadOwns(); // If the channel is not cross-process, there's no reason to be lazy, so we // ignore the flag in that case. if (aMsg->is_lazy_send() && mIsCrossProcess) { // If this is the first lazy message in the queue and our worker thread // supports direct task dispatch, dispatch a task to flush messages, // ensuring we don't leave them pending forever. if (!mFlushLazySendTask) { if (nsCOMPtr<nsIDirectTaskDispatcher> dispatcher = do_QueryInterface(mWorkerThread)) { mFlushLazySendTask = new FlushLazySendMessagesRunnable(this); MOZ_ALWAYS_SUCCEEDS( dispatcher->DispatchDirectTask(do_AddRef(mFlushLazySendTask))); } } if (mFlushLazySendTask) { mFlushLazySendTask->PushMessage(std::move(aMsg)); return; } } if (mFlushLazySendTask) { FlushLazySendMessages(); } mLink->SendMessage(std::move(aMsg)); } void MessageChannel::FlushLazySendMessages() { AssertWorkerThread(); mMonitor->AssertCurrentThreadOwns(); // Clean up any SendLazyTask which might be pending. auto messages = mFlushLazySendTask->TakeMessages(); mFlushLazySendTask = nullptr; // Send all lazy messages, then clear the queue. for (auto& msg : messages) { mLink->SendMessage(std::move(msg)); } } UniquePtr<MessageChannel::UntypedCallbackHolder> MessageChannel::PopCallback( const Message& aMsg, int32_t aActorId) { auto iter = mPendingResponses.find(aMsg.seqno()); if (iter != mPendingResponses.end() && iter->second->mActorId == aActorId && iter->second->mReplyMsgId == aMsg.type()) { UniquePtr<MessageChannel::UntypedCallbackHolder> ret = std::move(iter->second); mPendingResponses.erase(iter); gUnresolvedResponses--; return ret; } return nullptr; } void MessageChannel::RejectPendingResponsesForActor(int32_t aActorId) { auto itr = mPendingResponses.begin(); while (itr != mPendingResponses.end()) { if (itr->second.get()->mActorId != aActorId) { ++itr; continue; } itr->second.get()->Reject(ResponseRejectReason::ActorDestroyed); // Take special care of advancing the iterator since we are // removing it while iterating. itr = mPendingResponses.erase(itr); gUnresolvedResponses--; } } class BuildIDsMatchMessage : public IPC::Message { public: BuildIDsMatchMessage() : IPC::Message(MSG_ROUTING_NONE, BUILD_IDS_MATCH_MESSAGE_TYPE) {} void Log(const std::string& aPrefix, FILE* aOutf) const { fputs("(special `Build IDs match' message)", aOutf); } }; // Send the parent a special async message to confirm when the parent and child // are of the same buildID. Skips sending the message and returns false if the // buildIDs don't match. This is a minor variation on // MessageChannel::Send(Message* aMsg). bool MessageChannel::SendBuildIDsMatchMessage(const char* aParentBuildID) { MOZ_ASSERT(!XRE_IsParentProcess()); nsCString parentBuildID(aParentBuildID); nsCString childBuildID(mozilla::PlatformBuildID()); if (parentBuildID != childBuildID) { // The build IDs didn't match, usually because an update occurred in the // background. return false; } auto msg = MakeUnique<BuildIDsMatchMessage>(); MOZ_RELEASE_ASSERT(!msg->is_sync()); MOZ_RELEASE_ASSERT(msg->nested_level() != IPC::Message::NESTED_INSIDE_SYNC); AssertWorkerThread(); mMonitor->AssertNotCurrentThreadOwns(); // Don't check for MSG_ROUTING_NONE. MonitorAutoLock lock(*mMonitor); if (!Connected()) { ReportConnectionError("SendBuildIDsMatchMessage", msg->type()); return false; } #if defined(MOZ_DEBUG) && defined(ENABLE_TESTS) // Technically, the behavior is interesting for any kind of process // but when exercising tests, we want to crash only a content process and // avoid making noise with other kind of processes crashing if (const char* dontSend = PR_GetEnv("MOZ_BUILDID_MATCH_DONTSEND")) { if (dontSend[0] == '1') { // Bug 1732999: We are going to crash, so we need to advise leak check // tooling to avoid intermittent missing leakcheck NoteIntentionalCrash(XRE_GetProcessTypeString()); if (XRE_IsContentProcess()) { return false; } } } #endif SendMessageToLink(std::move(msg)); return true; } class CancelMessage : public IPC::Message { public: explicit CancelMessage(int transaction) : IPC::Message(MSG_ROUTING_NONE, CANCEL_MESSAGE_TYPE) { set_transaction_id(transaction); } static bool Read(const Message* msg) { return true; } void Log(const std::string& aPrefix, FILE* aOutf) const { fputs("(special `Cancel' message)", aOutf); } }; bool MessageChannel::MaybeInterceptSpecialIOMessage(const Message& aMsg) { mMonitor->AssertCurrentThreadOwns(); if (MSG_ROUTING_NONE == aMsg.routing_id()) { if (GOODBYE_MESSAGE_TYPE == aMsg.type()) { // We've received a GOODBYE message, close the connection and mark // ourselves as "Closing". mLink->Close(); mChannelState = ChannelClosing; if (LoggingEnabled()) { printf( "[%s %u] NOTE: %s actor received `Goodbye' message. Closing " "channel.\n", XRE_GeckoProcessTypeToString(XRE_GetProcessType()), static_cast<uint32_t>(base::GetCurrentProcId()), (mSide == ChildSide) ? "child" : "parent"); } // Notify the worker thread that the connection has been closed, as we // will not receive an `OnChannelErrorFromLink` after calling // `mLink->Close()`. if (AwaitingSyncReply()) { NotifyWorkerThread(); } PostErrorNotifyTask(); return true; } else if (CANCEL_MESSAGE_TYPE == aMsg.type()) { IPC_LOG("Cancel from message"); CancelTransaction(aMsg.transaction_id()); NotifyWorkerThread(); return true; } else if (BUILD_IDS_MATCH_MESSAGE_TYPE == aMsg.type()) { IPC_LOG("Build IDs match message"); mBuildIDsConfirmedMatch = true; return true; } else if (IMPENDING_SHUTDOWN_MESSAGE_TYPE == aMsg.type()) { IPC_LOG("Impending Shutdown received"); ProcessChild::NotifiedImpendingShutdown(); return true; } } return false; } /* static */ bool MessageChannel::IsAlwaysDeferred(const Message& aMsg) { // If a message is not NESTED_INSIDE_CPOW and not sync, then we always defer // it. return aMsg.nested_level() != IPC::Message::NESTED_INSIDE_CPOW && !aMsg.is_sync(); } bool MessageChannel::ShouldDeferMessage(const Message& aMsg) { // Never defer messages that have the highest nested level, even async // ones. This is safe because only the child can send these messages, so // they can never nest. if (aMsg.nested_level() == IPC::Message::NESTED_INSIDE_CPOW) { MOZ_ASSERT(!IsAlwaysDeferred(aMsg)); return false; } // Unless they're NESTED_INSIDE_CPOW, we always defer async messages. // Note that we never send an async NESTED_INSIDE_SYNC message. if (!aMsg.is_sync()) { MOZ_RELEASE_ASSERT(aMsg.nested_level() == IPC::Message::NOT_NESTED); MOZ_ASSERT(IsAlwaysDeferred(aMsg)); return true; } MOZ_ASSERT(!IsAlwaysDeferred(aMsg)); int msgNestedLevel = aMsg.nested_level(); int waitingNestedLevel = AwaitingSyncReplyNestedLevel(); // Always defer if the nested level of the incoming message is less than the // nested level of the message we're awaiting. if (msgNestedLevel < waitingNestedLevel) return true; // Never defer if the message has strictly greater nested level. if (msgNestedLevel > waitingNestedLevel) return false; // When both sides send sync messages of the same nested level, we resolve the // race by dispatching in the child and deferring the incoming message in // the parent. However, the parent still needs to dispatch nested sync // messages. // // Deferring in the parent only sort of breaks message ordering. When the // child's message comes in, we can pretend the child hasn't quite // finished sending it yet. Since the message is sync, we know that the // child hasn't moved on yet. return mSide == ParentSide && aMsg.transaction_id() != CurrentNestedInsideSyncTransaction(); } void MessageChannel::OnMessageReceivedFromLink(UniquePtr<Message> aMsg) { mMonitor->AssertCurrentThreadOwns(); MOZ_ASSERT(mChannelState == ChannelConnected); if (MaybeInterceptSpecialIOMessage(*aMsg)) { return; } mListener->OnChannelReceivedMessage(*aMsg); // If we're awaiting a sync reply, we know that it needs to be immediately // handled to unblock us. if (aMsg->is_sync() && aMsg->is_reply()) { IPC_LOG("Received reply seqno=%d xid=%d", aMsg->seqno(), aMsg->transaction_id()); if (aMsg->seqno() == mTimedOutMessageSeqno) { // Drop the message, but allow future sync messages to be sent. IPC_LOG("Received reply to timedout message; igoring; xid=%d", mTimedOutMessageSeqno); EndTimeout(); return; } MOZ_RELEASE_ASSERT(AwaitingSyncReply()); MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno); mTransactionStack->HandleReply(std::move(aMsg)); NotifyWorkerThread(); return; } // Nested messages cannot be compressed. MOZ_RELEASE_ASSERT(aMsg->compress_type() == IPC::Message::COMPRESSION_NONE || aMsg->nested_level() == IPC::Message::NOT_NESTED); if (aMsg->compress_type() == IPC::Message::COMPRESSION_ENABLED && !mPending.isEmpty()) { auto* last = mPending.getLast(); last->AssertMonitorHeld(*mMonitor); bool compress = last->Msg()->type() == aMsg->type() && last->Msg()->routing_id() == aMsg->routing_id(); if (compress) { // This message type has compression enabled, and the back of the // queue was the same message type and routed to the same destination. // Replace it with the newer message. MOZ_RELEASE_ASSERT(last->Msg()->compress_type() == IPC::Message::COMPRESSION_ENABLED); last->Msg() = std::move(aMsg); return; } } else if (aMsg->compress_type() == IPC::Message::COMPRESSION_ALL && !mPending.isEmpty()) { for (MessageTask* p = mPending.getLast(); p; p = p->getPrevious()) { p->AssertMonitorHeld(*mMonitor); if (p->Msg()->type() == aMsg->type() && p->Msg()->routing_id() == aMsg->routing_id()) { // This message type has compression enabled, and the queue // holds a message with the same message type and routed to the // same destination. Erase it. Note that, since we always // compress these redundancies, There Can Be Only One. MOZ_RELEASE_ASSERT(p->Msg()->compress_type() == IPC::Message::COMPRESSION_ALL); MOZ_RELEASE_ASSERT(IsAlwaysDeferred(*p->Msg())); p->remove(); break; } } } bool alwaysDeferred = IsAlwaysDeferred(*aMsg); bool shouldWakeUp = AwaitingSyncReply() && !ShouldDeferMessage(*aMsg); IPC_LOG("Receive from link; seqno=%d, xid=%d, shouldWakeUp=%d", aMsg->seqno(), aMsg->transaction_id(), shouldWakeUp); // There are two cases we're concerned about, relating to the state of the // worker thread: // // (1) We are waiting on a sync reply - worker thread is blocked on the // IPC monitor. // - If the message is NESTED_INSIDE_SYNC, we wake up the worker thread to // deliver the message depending on ShouldDeferMessage. Otherwise, we // leave it in the mPending queue, posting a task to the worker event // loop, where it will be processed once the synchronous reply has been // received. // // (2) We are not waiting on a reply. // - We post a task to the worker event loop. // // Note that, we may notify the worker thread even though the monitor is not // blocked. This is okay, since we always check for pending events before // blocking again. RefPtr<MessageTask> task = new MessageTask(this, std::move(aMsg)); mPending.insertBack(task); if (!alwaysDeferred) { mMaybeDeferredPendingCount++; } if (shouldWakeUp) { NotifyWorkerThread(); } // Although we usually don't need to post a message task if // shouldWakeUp is true, it's easier to post anyway than to have to // guarantee that every Send call processes everything it's supposed to // before returning. task->AssertMonitorHeld(*mMonitor); task->Post(); } void MessageChannel::PeekMessages( const std::function<bool(const Message& aMsg)>& aInvoke) { // FIXME: We shouldn't be holding the lock for aInvoke! MonitorAutoLock lock(*mMonitor); for (MessageTask* it : mPending) { it->AssertMonitorHeld(*mMonitor); const Message& msg = *it->Msg(); if (!aInvoke(msg)) { break; } } } void MessageChannel::ProcessPendingRequests( ActorLifecycleProxy* aProxy, AutoEnterTransaction& aTransaction) { mMonitor->AssertCurrentThreadOwns(); AssertMaybeDeferredCountCorrect(); if (mMaybeDeferredPendingCount == 0) { return; } IPC_LOG("ProcessPendingRequests for seqno=%d, xid=%d", aTransaction.SequenceNumber(), aTransaction.TransactionID()); // Loop until there aren't any more nested messages to process. for (;;) { // If we canceled during ProcessPendingRequest, then we need to leave // immediately because the results of ShouldDeferMessage will be // operating with weird state (as if no Send is in progress). That could // cause even NOT_NESTED sync messages to be processed (but not // NOT_NESTED async messages), which would break message ordering. if (aTransaction.IsCanceled()) { return; } Vector<UniquePtr<Message>> toProcess; for (MessageTask* p = mPending.getFirst(); p;) { p->AssertMonitorHeld(*mMonitor); UniquePtr<Message>& msg = p->Msg(); MOZ_RELEASE_ASSERT(!aTransaction.IsCanceled(), "Calling ShouldDeferMessage when cancelled"); bool defer = ShouldDeferMessage(*msg); // Only log the interesting messages. if (msg->is_sync() || msg->nested_level() == IPC::Message::NESTED_INSIDE_CPOW) { IPC_LOG("ShouldDeferMessage(seqno=%d) = %d", msg->seqno(), defer); } if (!defer) { MOZ_ASSERT(!IsAlwaysDeferred(*msg)); if (!toProcess.append(std::move(msg))) MOZ_CRASH(); mMaybeDeferredPendingCount--; p = p->removeAndGetNext(); continue; } p = p->getNext(); } if (toProcess.empty()) { break; } // Processing these messages could result in more messages, so we // loop around to check for more afterwards. for (auto& msg : toProcess) { ProcessPendingRequest(aProxy, std::move(msg)); } } AssertMaybeDeferredCountCorrect(); } bool MessageChannel::Send(UniquePtr<Message> aMsg, UniquePtr<Message>* aReply) { mozilla::TimeStamp start = TimeStamp::Now(); if (aMsg->size() >= kMinTelemetryMessageSize) { Telemetry::Accumulate(Telemetry::IPC_MESSAGE_SIZE2, aMsg->size()); } // Sanity checks. AssertWorkerThread(); mMonitor->AssertNotCurrentThreadOwns(); MOZ_RELEASE_ASSERT(!mIsSameThreadChannel, "sync send over same-thread channel will deadlock!"); RefPtr<ActorLifecycleProxy> proxy = Listener()->GetLifecycleProxy(); #ifdef OS_WIN SyncStackFrame frame(this); NeuteredWindowRegion neuteredRgn(mFlags & REQUIRE_DEFERRED_MESSAGE_PROTECTION); #endif AutoSetValue<bool> setOnCxxStack(mOnCxxStack, true); MonitorAutoLock lock(*mMonitor); if (mTimedOutMessageSeqno) { // Don't bother sending another sync message if a previous one timed out // and we haven't received a reply for it. Once the original timed-out // message receives a reply, we'll be able to send more sync messages // again. IPC_LOG("Send() failed due to previous timeout"); mLastSendError = SyncSendError::PreviousTimeout; return false; } if (DispatchingSyncMessageNestedLevel() == IPC::Message::NOT_NESTED && aMsg->nested_level() > IPC::Message::NOT_NESTED) { // Don't allow sending CPOWs while we're dispatching a sync message. IPC_LOG("Nested level forbids send"); mLastSendError = SyncSendError::SendingCPOWWhileDispatchingSync; return false; } if (DispatchingSyncMessageNestedLevel() == IPC::Message::NESTED_INSIDE_CPOW || DispatchingAsyncMessageNestedLevel() == IPC::Message::NESTED_INSIDE_CPOW) { // Generally only the parent dispatches urgent messages. And the only // sync messages it can send are NESTED_INSIDE_SYNC. Mainly we want to // ensure here that we don't return false for non-CPOW messages. MOZ_RELEASE_ASSERT(aMsg->nested_level() == IPC::Message::NESTED_INSIDE_SYNC); IPC_LOG("Sending while dispatching urgent message"); mLastSendError = SyncSendError::SendingCPOWWhileDispatchingUrgent; return false; } if (aMsg->nested_level() < DispatchingSyncMessageNestedLevel() || aMsg->nested_level() < AwaitingSyncReplyNestedLevel()) { MOZ_RELEASE_ASSERT(DispatchingSyncMessage() || DispatchingAsyncMessage()); IPC_LOG("Cancel from Send"); auto cancel = MakeUnique<CancelMessage>(CurrentNestedInsideSyncTransaction()); CancelTransaction(CurrentNestedInsideSyncTransaction()); SendMessageToLink(std::move(cancel)); } IPC_ASSERT(aMsg->is_sync(), "can only Send() sync messages here"); IPC_ASSERT(aMsg->nested_level() >= DispatchingSyncMessageNestedLevel(), "can't send sync message of a lesser nested level than what's " "being dispatched"); IPC_ASSERT(AwaitingSyncReplyNestedLevel() <= aMsg->nested_level(), "nested sync message sends must be of increasing nested level"); IPC_ASSERT( DispatchingSyncMessageNestedLevel() != IPC::Message::NESTED_INSIDE_CPOW, "not allowed to send messages while dispatching urgent messages"); IPC_ASSERT( DispatchingAsyncMessageNestedLevel() != IPC::Message::NESTED_INSIDE_CPOW, "not allowed to send messages while dispatching urgent messages"); if (!Connected()) { ReportConnectionError("SendAndWait", aMsg->type()); mLastSendError = SyncSendError::NotConnectedBeforeSend; return false; } aMsg->set_seqno(NextSeqno()); int32_t seqno = aMsg->seqno(); int nestedLevel = aMsg->nested_level(); msgid_t replyType = aMsg->type() + 1; AutoEnterTransaction* stackTop = mTransactionStack; // If the most recent message on the stack is NESTED_INSIDE_SYNC, then our // message should nest inside that and we use the same transaction // ID. Otherwise we need a new transaction ID (so we use the seqno of the // message we're sending). bool nest = stackTop && stackTop->NestedLevel() == IPC::Message::NESTED_INSIDE_SYNC; int32_t transaction = nest ? stackTop->TransactionID() : seqno; aMsg->set_transaction_id(transaction); AutoEnterTransaction transact(this, seqno, transaction, nestedLevel); IPC_LOG("Send seqno=%d, xid=%d", seqno, transaction); // aMsg will be destroyed soon, let's keep its type. const char* msgName = aMsg->name(); const msgid_t msgType = aMsg->type(); AddProfilerMarker(*aMsg, MessageDirection::eSending); SendMessageToLink(std::move(aMsg)); while (true) { MOZ_RELEASE_ASSERT(!transact.IsCanceled()); ProcessPendingRequests(proxy, transact); if (transact.IsComplete()) { break; } if (!Connected()) { ReportConnectionError("Send", msgType); mLastSendError = SyncSendError::DisconnectedDuringSend; return false; } MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno); MOZ_RELEASE_ASSERT(!transact.IsComplete()); MOZ_RELEASE_ASSERT(mTransactionStack == &transact); bool maybeTimedOut = !WaitForSyncNotify(); if (mListener->NeedArtificialSleep()) { MonitorAutoUnlock unlock(*mMonitor); mListener->ArtificialSleep(); } if (!Connected()) { ReportConnectionError("SendAndWait", msgType); mLastSendError = SyncSendError::DisconnectedDuringSend; return false; } if (transact.IsCanceled()) { break; } MOZ_RELEASE_ASSERT(mTransactionStack == &transact); // We only time out a message if it initiated a new transaction (i.e., // if neither side has any other message Sends on the stack). bool canTimeOut = transact.IsBottom(); if (maybeTimedOut && canTimeOut && !ShouldContinueFromTimeout()) { // Since ShouldContinueFromTimeout drops the lock, we need to // re-check all our conditions here. We shouldn't time out if any of // these things happen because there won't be a reply to the timed // out message in these cases. if (transact.IsComplete()) { break; } IPC_LOG("Timing out Send: xid=%d", transaction); mTimedOutMessageSeqno = seqno; mTimedOutMessageNestedLevel = nestedLevel; mLastSendError = SyncSendError::TimedOut; return false; } if (transact.IsCanceled()) { break; } } if (transact.IsCanceled()) { IPC_LOG("Other side canceled seqno=%d, xid=%d", seqno, transaction); mLastSendError = SyncSendError::CancelledAfterSend; return false; } if (transact.IsError()) { IPC_LOG("Error: seqno=%d, xid=%d", seqno, transaction); mLastSendError = SyncSendError::ReplyError; return false; } uint32_t latencyMs = round((TimeStamp::Now() - start).ToMilliseconds()); IPC_LOG("Got reply: seqno=%d, xid=%d, msgName=%s, latency=%ums", seqno, transaction, msgName, latencyMs); UniquePtr<Message> reply = transact.GetReply(); MOZ_RELEASE_ASSERT(reply); MOZ_RELEASE_ASSERT(reply->is_reply(), "expected reply"); MOZ_RELEASE_ASSERT(!reply->is_reply_error()); MOZ_RELEASE_ASSERT(reply->seqno() == seqno); MOZ_RELEASE_ASSERT(reply->type() == replyType, "wrong reply type"); MOZ_RELEASE_ASSERT(reply->is_sync()); AddProfilerMarker(*reply, MessageDirection::eReceiving); if (reply->size() >= kMinTelemetryMessageSize) { Telemetry::Accumulate(Telemetry::IPC_REPLY_SIZE, nsDependentCString(msgName), reply->size()); } *aReply = std::move(reply); // NOTE: Only collect IPC_SYNC_MAIN_LATENCY_MS on the main thread (bug // 1343729) if (NS_IsMainThread() && latencyMs >= kMinTelemetrySyncIPCLatencyMs) { Telemetry::Accumulate(Telemetry::IPC_SYNC_MAIN_LATENCY_MS, nsDependentCString(msgName), latencyMs); } return true; } bool MessageChannel::HasPendingEvents() { AssertWorkerThread(); mMonitor->AssertCurrentThreadOwns(); return ConnectedOrClosing() && !mPending.isEmpty(); } bool MessageChannel::ProcessPendingRequest(ActorLifecycleProxy* aProxy, UniquePtr<Message> aUrgent) { AssertWorkerThread(); mMonitor->AssertCurrentThreadOwns(); IPC_LOG("Process pending: seqno=%d, xid=%d", aUrgent->seqno(), aUrgent->transaction_id()); // keep the error relevant information msgid_t msgType = aUrgent->type(); DispatchMessage(aProxy, std::move(aUrgent)); if (!ConnectedOrClosing()) { ReportConnectionError("ProcessPendingRequest", msgType); return false; } return true; } bool MessageChannel::ShouldRunMessage(const Message& aMsg) { if (!mTimedOutMessageSeqno) { return true; } // If we've timed out a message and we're awaiting the reply to the timed // out message, we have to be careful what messages we process. Here's what // can go wrong: // 1. child sends a NOT_NESTED sync message S // 2. parent sends a NESTED_INSIDE_SYNC sync message H at the same time // 3. parent times out H // 4. child starts processing H and sends a NESTED_INSIDE_SYNC message H' // nested within the same transaction // 5. parent dispatches S and sends reply // 6. child asserts because it instead expected a reply to H'. // // To solve this, we refuse to process S in the parent until we get a reply // to H. More generally, let the timed out message be M. We don't process a // message unless the child would need the response to that message in order // to process M. Those messages are the ones that have a higher nested level // than M or that are part of the same transaction as M. if (aMsg.nested_level() < mTimedOutMessageNestedLevel || (aMsg.nested_level() == mTimedOutMessageNestedLevel && aMsg.transaction_id() != mTimedOutMessageSeqno)) { return false; } return true; } void MessageChannel::RunMessage(ActorLifecycleProxy* aProxy, MessageTask& aTask) { AssertWorkerThread(); mMonitor->AssertCurrentThreadOwns(); aTask.AssertMonitorHeld(*mMonitor); UniquePtr<Message>& msg = aTask.Msg(); if (!ConnectedOrClosing()) { ReportConnectionError("RunMessage", msg->type()); return; } // Check that we're going to run the first message that's valid to run. #if 0 # ifdef DEBUG for (MessageTask* task : mPending) { if (task == &aTask) { break; } MOZ_ASSERT(!ShouldRunMessage(*task->Msg()) || aTask.Msg()->priority() != task->Msg()->priority()); } # endif #endif if (!ShouldRunMessage(*msg)) { return; } MOZ_RELEASE_ASSERT(aTask.isInList()); aTask.remove(); if (!IsAlwaysDeferred(*msg)) { mMaybeDeferredPendingCount--; } DispatchMessage(aProxy, std::move(msg)); } NS_IMPL_ISUPPORTS_INHERITED(MessageChannel::MessageTask, CancelableRunnable, nsIRunnablePriority, nsIRunnableIPCMessageType) static uint32_t ToRunnablePriority(IPC::Message::PriorityValue aPriority) { switch (aPriority) { case IPC::Message::NORMAL_PRIORITY: return nsIRunnablePriority::PRIORITY_NORMAL; case IPC::Message::INPUT_PRIORITY: return nsIRunnablePriority::PRIORITY_INPUT_HIGH; case IPC::Message::VSYNC_PRIORITY: return nsIRunnablePriority::PRIORITY_VSYNC; case IPC::Message::MEDIUMHIGH_PRIORITY: return nsIRunnablePriority::PRIORITY_MEDIUMHIGH; case IPC::Message::CONTROL_PRIORITY: return nsIRunnablePriority::PRIORITY_CONTROL; default: MOZ_ASSERT_UNREACHABLE(); return nsIRunnablePriority::PRIORITY_NORMAL; } } MessageChannel::MessageTask::MessageTask(MessageChannel* aChannel, UniquePtr<Message> aMessage) : CancelableRunnable(aMessage->name()), mMonitor(aChannel->mMonitor), mChannel(aChannel), mMessage(std::move(aMessage)), mPriority(ToRunnablePriority(mMessage->priority())), mScheduled(false) #ifdef FUZZING_SNAPSHOT , mIsFuzzMsg(mMessage->IsFuzzMsg()), mFuzzStopped(false) #endif { MOZ_DIAGNOSTIC_ASSERT(mMessage, "message may not be null"); #ifdef FUZZING_SNAPSHOT if (mIsFuzzMsg) { MOZ_FUZZING_IPC_MT_CTOR(); } #endif } MessageChannel::MessageTask::~MessageTask() { #ifdef FUZZING_SNAPSHOT // We track fuzzing messages until their run is complete. To make sure // that we don't miss messages that are for some reason destroyed without // being run (e.g. canceled), we catch this condition in the destructor. if (mIsFuzzMsg && !mFuzzStopped) { MOZ_FUZZING_IPC_MT_STOP(); } else if (!mIsFuzzMsg && !fuzzing::Nyx::instance().started()) { MOZ_FUZZING_IPC_PRE_FUZZ_MT_STOP(); } #endif } nsresult MessageChannel::MessageTask::Run() { mMonitor->AssertNotCurrentThreadOwns(); // Drop the toplevel actor's lifecycle proxy outside of our monitor if we take // it, as destroying our ActorLifecycleProxy reference can acquire the // monitor. RefPtr<ActorLifecycleProxy> proxy; MonitorAutoLock lock(*mMonitor); // In case we choose not to run this message, we may need to be able to Post // it again. mScheduled = false; if (!isInList()) { return NS_OK; } #ifdef FUZZING_SNAPSHOT if (!mIsFuzzMsg) { if (fuzzing::Nyx::instance().started()) { // Once we started fuzzing, prevent non-fuzzing tasks from being // run and potentially blocking worker threads. // // TODO: This currently blocks all MessageTasks from running, not // just those belonging to the target process pair. We currently // do this for performance reasons, but it should be re-evaluated // at a later stage when we found a better snapshot point. return NS_OK; } // Record all running tasks prior to fuzzing, so we can wait for // them to settle before snapshotting. MOZ_FUZZING_IPC_PRE_FUZZ_MT_RUN(); } #endif Channel()->AssertWorkerThread(); mMonitor->AssertSameMonitor(*Channel()->mMonitor); proxy = Channel()->Listener()->GetLifecycleProxy(); Channel()->RunMessage(proxy, *this); #ifdef FUZZING_SNAPSHOT if (mIsFuzzMsg && !mFuzzStopped) { MOZ_FUZZING_IPC_MT_STOP(); mFuzzStopped = true; } #endif return NS_OK; } // Warning: This method removes the receiver from whatever list it might be in. nsresult MessageChannel::MessageTask::Cancel() { mMonitor->AssertNotCurrentThreadOwns(); MonitorAutoLock lock(*mMonitor); if (!isInList()) { return NS_OK; } Channel()->AssertWorkerThread(); mMonitor->AssertSameMonitor(*Channel()->mMonitor); if (!IsAlwaysDeferred(*Msg())) { Channel()->mMaybeDeferredPendingCount--; } remove(); #ifdef FUZZING_SNAPSHOT if (mIsFuzzMsg && !mFuzzStopped) { MOZ_FUZZING_IPC_MT_STOP(); mFuzzStopped = true; } #endif return NS_OK; } void MessageChannel::MessageTask::Post() { mMonitor->AssertCurrentThreadOwns(); mMonitor->AssertSameMonitor(*Channel()->mMonitor); MOZ_RELEASE_ASSERT(!mScheduled); MOZ_RELEASE_ASSERT(isInList()); mScheduled = true; Channel()->mWorkerThread->Dispatch(do_AddRef(this)); } NS_IMETHODIMP MessageChannel::MessageTask::GetPriority(uint32_t* aPriority) { *aPriority = mPriority; return NS_OK; } NS_IMETHODIMP MessageChannel::MessageTask::GetType(uint32_t* aType) { mMonitor->AssertNotCurrentThreadOwns(); MonitorAutoLock lock(*mMonitor); if (!mMessage) { // If mMessage has been moved already elsewhere, we can't know what the type // has been. return NS_ERROR_FAILURE; } *aType = mMessage->type(); return NS_OK; } void MessageChannel::DispatchMessage(ActorLifecycleProxy* aProxy, UniquePtr<Message> aMsg) { AssertWorkerThread(); mMonitor->AssertCurrentThreadOwns(); Maybe<AutoNoJSAPI> nojsapi; if (NS_IsMainThread() && CycleCollectedJSContext::Get()) { nojsapi.emplace(); } UniquePtr<Message> reply; IPC_LOG("DispatchMessage: seqno=%d, xid=%d", aMsg->seqno(), aMsg->transaction_id()); AddProfilerMarker(*aMsg, MessageDirection::eReceiving); { AutoEnterTransaction transaction(this, *aMsg); int id = aMsg->transaction_id(); MOZ_RELEASE_ASSERT(!aMsg->is_sync() || id == transaction.TransactionID()); { MonitorAutoUnlock unlock(*mMonitor); AutoSetValue<bool> setOnCxxStack(mOnCxxStack, true); mListener->ArtificialSleep(); if (aMsg->is_sync()) { DispatchSyncMessage(aProxy, *aMsg, reply); } else { DispatchAsyncMessage(aProxy, *aMsg); } mListener->ArtificialSleep(); } if (reply && transaction.IsCanceled()) { // The transaction has been canceled. Don't send a reply. IPC_LOG("Nulling out reply due to cancellation, seqno=%d, xid=%d", aMsg->seqno(), id); reply = nullptr; } } if (reply && ChannelConnected == mChannelState) { IPC_LOG("Sending reply seqno=%d, xid=%d", aMsg->seqno(), aMsg->transaction_id()); AddProfilerMarker(*reply, MessageDirection::eSending); SendMessageToLink(std::move(reply)); } } void MessageChannel::DispatchSyncMessage(ActorLifecycleProxy* aProxy, const Message& aMsg, UniquePtr<Message>& aReply) { AssertWorkerThread(); mozilla::TimeStamp start = TimeStamp::Now(); int nestedLevel = aMsg.nested_level(); MOZ_RELEASE_ASSERT(nestedLevel == IPC::Message::NOT_NESTED || NS_IsMainThread()); MessageChannel* dummy; MessageChannel*& blockingVar = mSide == ChildSide && NS_IsMainThread() ? gParentProcessBlocker : dummy; Result rv; { AutoSetValue<MessageChannel*> blocked(blockingVar, this); rv = aProxy->Get()->OnMessageReceived(aMsg, aReply); } uint32_t latencyMs = round((TimeStamp::Now() - start).ToMilliseconds()); if (latencyMs >= kMinTelemetrySyncIPCLatencyMs) { Telemetry::Accumulate(Telemetry::IPC_SYNC_RECEIVE_MS, nsDependentCString(aMsg.name()), latencyMs); } if (!MaybeHandleError(rv, aMsg, "DispatchSyncMessage")) { aReply = Message::ForSyncDispatchError(aMsg.nested_level()); } aReply->set_seqno(aMsg.seqno()); aReply->set_transaction_id(aMsg.transaction_id()); } void MessageChannel::DispatchAsyncMessage(ActorLifecycleProxy* aProxy, const Message& aMsg) { AssertWorkerThread(); MOZ_RELEASE_ASSERT(!aMsg.is_sync()); if (aMsg.routing_id() == MSG_ROUTING_NONE) { NS_WARNING("unhandled special message!"); MaybeHandleError(MsgNotKnown, aMsg, "DispatchAsyncMessage"); return; } Result rv; { int nestedLevel = aMsg.nested_level(); AutoSetValue<bool> async(mDispatchingAsyncMessage, true); AutoSetValue<int> nestedLevelSet(mDispatchingAsyncMessageNestedLevel, nestedLevel); rv = aProxy->Get()->OnMessageReceived(aMsg); } MaybeHandleError(rv, aMsg, "DispatchAsyncMessage"); } void MessageChannel::EnqueuePendingMessages() { AssertWorkerThread(); mMonitor->AssertCurrentThreadOwns(); // XXX performance tuning knob: could process all or k pending // messages here, rather than enqueuing for later processing RepostAllMessages(); } bool MessageChannel::WaitResponse(bool aWaitTimedOut) { AssertWorkerThread(); if (aWaitTimedOut) { if (mInTimeoutSecondHalf) { // We've really timed out this time. return false; } // Try a second time. mInTimeoutSecondHalf = true; } else { mInTimeoutSecondHalf = false; } return true; } #ifndef OS_WIN bool MessageChannel::WaitForSyncNotify() { AssertWorkerThread(); # ifdef DEBUG // WARNING: We don't release the lock here. We can't because the link // could signal at this time and we would miss it. Instead we require // ArtificialTimeout() to be extremely simple. if (mListener->ArtificialTimeout()) { return false; } # endif MOZ_RELEASE_ASSERT(!mIsSameThreadChannel, "Wait on same-thread channel will deadlock!"); TimeDuration timeout = (kNoTimeout == mTimeoutMs) ? TimeDuration::Forever() : TimeDuration::FromMilliseconds(mTimeoutMs); CVStatus status = mMonitor->Wait(timeout); // If the timeout didn't expire, we know we received an event. The // converse is not true. return WaitResponse(status == CVStatus::Timeout); } void MessageChannel::NotifyWorkerThread() { mMonitor->Notify(); } #endif bool MessageChannel::ShouldContinueFromTimeout() { AssertWorkerThread(); mMonitor->AssertCurrentThreadOwns(); bool cont; { MonitorAutoUnlock unlock(*mMonitor); cont = mListener->ShouldContinueFromReplyTimeout(); mListener->ArtificialSleep(); } static enum { UNKNOWN, NOT_DEBUGGING, DEBUGGING } sDebuggingChildren = UNKNOWN; if (sDebuggingChildren == UNKNOWN) { sDebuggingChildren = getenv("MOZ_DEBUG_CHILD_PROCESS") || getenv("MOZ_DEBUG_CHILD_PAUSE") ? DEBUGGING : NOT_DEBUGGING; } if (sDebuggingChildren == DEBUGGING) { return true; } return cont; } void MessageChannel::SetReplyTimeoutMs(int32_t aTimeoutMs) { // Set channel timeout value. Since this is broken up into // two period, the minimum timeout value is 2ms. AssertWorkerThread(); mTimeoutMs = (aTimeoutMs <= 0) ? kNoTimeout : (int32_t)ceil((double)aTimeoutMs / 2.0); } void MessageChannel::ReportConnectionError(const char* aFunctionName, const uint32_t aMsgType) const { AssertWorkerThread(); mMonitor->AssertCurrentThreadOwns(); const char* errorMsg = nullptr; switch (mChannelState) { case ChannelClosed: errorMsg = "Closed channel: cannot send/recv"; break; case ChannelClosing: errorMsg = "Channel closing: too late to send, messages will be lost"; break; case ChannelError: errorMsg = "Channel error: cannot send/recv"; break; default: MOZ_CRASH("unreached"); } // IPC connection errors are fairly common, especially "Channel closing: too // late to send/recv, messages will be lost", so shouldn't be being reported // on release builds, as that's misleading as to their severity. NS_WARNING(nsPrintfCString("IPC Connection Error: [%s][%s] %s(msgname=%s) %s", StringFromIPCSide(mSide), mName, aFunctionName, IPC::StringFromIPCMessageType(aMsgType), errorMsg) .get()); MonitorAutoUnlock unlock(*mMonitor); mListener->ProcessingError(MsgDropped, errorMsg); } void MessageChannel::ReportMessageRouteError(const char* channelName) const { PrintErrorMessage(mSide, channelName, "Need a route"); mListener->ProcessingError(MsgRouteError, "MsgRouteError"); } bool MessageChannel::MaybeHandleError(Result code, const Message& aMsg, const char* channelName) { if (MsgProcessed == code) return true; const char* errorMsg = nullptr; switch (code) { case MsgNotKnown: errorMsg = "Unknown message: not processed"; break; case MsgNotAllowed: errorMsg = "Message not allowed: cannot be sent/recvd in this state"; break; case MsgPayloadError: errorMsg = "Payload error: message could not be deserialized"; break; case MsgProcessingError: errorMsg = "Processing error: message was deserialized, but the handler " "returned false (indicating failure)"; break; case MsgRouteError: errorMsg = "Route error: message sent to unknown actor ID"; break; case MsgValueError: errorMsg = "Value error: message was deserialized, but contained an illegal " "value"; break; default: MOZ_CRASH("unknown Result code"); return false; } char reason[512]; const char* msgname = aMsg.name(); if (msgname[0] == '?') { SprintfLiteral(reason, "(msgtype=0x%X) %s", aMsg.type(), errorMsg); } else { SprintfLiteral(reason, "%s %s", msgname, errorMsg); } PrintErrorMessage(mSide, channelName, reason); // Error handled in mozilla::ipc::IPCResult. if (code == MsgProcessingError) { return false; } mListener->ProcessingError(code, reason); return false; } void MessageChannel::OnChannelErrorFromLink() { mMonitor->AssertCurrentThreadOwns(); MOZ_ASSERT(mChannelState == ChannelConnected); IPC_LOG("OnChannelErrorFromLink"); if (AwaitingSyncReply()) { NotifyWorkerThread(); } if (mAbortOnError) { // mAbortOnError is set by main actors (e.g., ContentChild) to ensure // that the process terminates even if normal shutdown is prevented. // A MOZ_CRASH() here is not helpful because crash reporting relies // on the parent process which we know is dead or otherwise unusable. // // Additionally, the parent process can (and often is) killed on Android // when apps are backgrounded. We don't need to report a crash for // normal behavior in that case. printf_stderr("Exiting due to channel error.\n"); ProcessChild::QuickExit(); } mChannelState = ChannelError; mMonitor->Notify(); PostErrorNotifyTask(); } void MessageChannel::NotifyMaybeChannelError(ReleasableMonitorAutoLock& aLock) { AssertWorkerThread(); mMonitor->AssertCurrentThreadOwns(); aLock.AssertCurrentThreadOwns(); MOZ_ASSERT(mChannelState != ChannelConnected); if (ChannelClosing == mChannelState || ChannelClosed == mChannelState) { // the channel closed, but we received a "Goodbye" message warning us // about it. no worries mChannelState = ChannelClosed; NotifyChannelClosed(aLock); return; } MOZ_ASSERT(ChannelError == mChannelState); Clear(); // IPDL assumes these notifications do not fire twice, so we do not let // that happen. if (mNotifiedChannelDone) { return; } mNotifiedChannelDone = true; // Let our listener know that the channel errored. This may cause the // channel to be deleted. Release our caller's `MonitorAutoLock` before // invoking the listener, as this may call back into MessageChannel, and/or // cause the channel to be destroyed. aLock.Unlock(); mListener->OnChannelError(); } void MessageChannel::OnNotifyMaybeChannelError() { AssertWorkerThread(); mMonitor->AssertNotCurrentThreadOwns(); // This lock guard may be reset by `NotifyMaybeChannelError` before invoking // listener callbacks which may destroy this `MessageChannel`. // // Acquiring the lock here also allows us to ensure that // `OnChannelErrorFromLink` has finished running before this task is allowed // to continue. ReleasableMonitorAutoLock lock(*mMonitor); mChannelErrorTask = nullptr; if (IsOnCxxStack()) { // This used to post a 10ms delayed task; however not all // nsISerialEventTarget implementations support delayed dispatch. // The delay being completely arbitrary, we may not as well have any. PostErrorNotifyTask(); return; } // This call may destroy `this`. NotifyMaybeChannelError(lock); } void MessageChannel::PostErrorNotifyTask() { mMonitor->AssertCurrentThreadOwns(); if (mChannelErrorTask) { return; } // This must be the last code that runs on this thread! mChannelErrorTask = NewNonOwningCancelableRunnableMethod( "ipc::MessageChannel::OnNotifyMaybeChannelError", this, &MessageChannel::OnNotifyMaybeChannelError); mWorkerThread->Dispatch(do_AddRef(mChannelErrorTask)); } // Special async message. class GoodbyeMessage : public IPC::Message { public: GoodbyeMessage() : IPC::Message(MSG_ROUTING_NONE, GOODBYE_MESSAGE_TYPE) {} static bool Read(const Message* msg) { return true; } void Log(const std::string& aPrefix, FILE* aOutf) const { fputs("(special `Goodbye' message)", aOutf); } }; void MessageChannel::CloseWithError() { AssertWorkerThread(); // This lock guard may be reset by `NotifyMaybeChannelError` before invoking // listener callbacks which may destroy this `MessageChannel`. ReleasableMonitorAutoLock lock(*mMonitor); switch (mChannelState) { case ChannelError: // Already errored, ensure we notify if we haven't yet. NotifyMaybeChannelError(lock); return; case ChannelClosed: // Already closed, we can't do anything. return; default: // Either connected or closing, immediately convert to an error, and // notify. MOZ_ASSERT(mChannelState == ChannelConnected || mChannelState == ChannelClosing); mLink->Close(); mChannelState = ChannelError; NotifyMaybeChannelError(lock); return; } } void MessageChannel::NotifyImpendingShutdown() { UniquePtr<Message> msg = MakeUnique<Message>(MSG_ROUTING_NONE, IMPENDING_SHUTDOWN_MESSAGE_TYPE); MonitorAutoLock lock(*mMonitor); if (Connected()) { SendMessageToLink(std::move(msg)); } } void MessageChannel::Close() { AssertWorkerThread(); mMonitor->AssertNotCurrentThreadOwns(); // This lock guard may be reset by `Notify{ChannelClosed,MaybeChannelError}` // before invoking listener callbacks which may destroy this `MessageChannel`. ReleasableMonitorAutoLock lock(*mMonitor); switch (mChannelState) { case ChannelError: // See bug 538586: if the listener gets deleted while the // IO thread's NotifyChannelError event is still enqueued // and subsequently deletes us, then the error event will // also be deleted and the listener will never be notified // of the channel error. NotifyMaybeChannelError(lock); return; case ChannelClosed: // Slightly unexpected but harmless; ignore. See bug 1554244. return; default: // Notify the other side that we're about to close our socket. If we've // already received a Goodbye from the other side (and our state is // ChannelClosing), there's no reason to send one. if (ChannelConnected == mChannelState) { SendMessageToLink(MakeUnique<GoodbyeMessage>()); } mLink->Close(); mChannelState = ChannelClosed; NotifyChannelClosed(lock); return; } } void MessageChannel::NotifyChannelClosed(ReleasableMonitorAutoLock& aLock) { AssertWorkerThread(); mMonitor->AssertCurrentThreadOwns(); aLock.AssertCurrentThreadOwns(); if (ChannelClosed != mChannelState) { MOZ_CRASH("channel should have been closed!"); } Clear(); // IPDL assumes these notifications do not fire twice, so we do not let // that happen. if (mNotifiedChannelDone) { return; } mNotifiedChannelDone = true; // Let our listener know that the channel was closed. This may cause the // channel to be deleted. Release our caller's `MonitorAutoLock` before // invoking the listener, as this may call back into MessageChannel, and/or // cause the channel to be destroyed. aLock.Unlock(); mListener->OnChannelClose(); } void MessageChannel::DebugAbort(const char* file, int line, const char* cond, const char* why, bool reply) { AssertWorkerThread(); mMonitor->AssertCurrentThreadOwns(); printf_stderr( "###!!! [MessageChannel][%s][%s:%d] " "Assertion (%s) failed. %s %s\n", mSide == ChildSide ? "Child" : "Parent", file, line, cond, why, reply ? "(reply)" : ""); MessageQueue pending = std::move(mPending); while (!pending.isEmpty()) { pending.getFirst()->AssertMonitorHeld(*mMonitor); printf_stderr(" [ %s%s ]\n", pending.getFirst()->Msg()->is_sync() ? "sync" : "async", pending.getFirst()->Msg()->is_reply() ? "reply" : ""); pending.popFirst(); } MOZ_CRASH_UNSAFE(why); } void MessageChannel::AddProfilerMarker(const IPC::Message& aMessage, MessageDirection aDirection) { mMonitor->AssertCurrentThreadOwns(); if (profiler_feature_active(ProfilerFeature::IPCMessages)) { base::ProcessId pid = mListener->OtherPidMaybeInvalid(); // Only record markers for IPCs with a valid pid. // And if one of the profiler mutexes is locked on this thread, don't record // markers, because we don't want to expose profiler IPCs due to the // profiler itself, and also to avoid possible re-entrancy issues. if (pid != base::kInvalidProcessId && !profiler_is_locked_on_current_thread()) { // The current timestamp must be given to the `IPCMarker` payload. [[maybe_unused]] const TimeStamp now = TimeStamp::Now(); bool isThreadBeingProfiled = profiler_thread_is_being_profiled_for_markers(); PROFILER_MARKER( "IPC", IPC, mozilla::MarkerOptions( mozilla::MarkerTiming::InstantAt(now), // If the thread is being profiled, add the marker to // the current thread. If the thread is not being // profiled, add the marker to the main thread. It // will appear in the main thread's IPC track. Profiler analysis // UI correlates all the IPC markers from different threads and // generates processed markers. isThreadBeingProfiled ? mozilla::MarkerThreadId::CurrentThread() : mozilla::MarkerThreadId::MainThread()), IPCMarker, now, now, pid, aMessage.seqno(), aMessage.type(), mSide, aDirection, MessagePhase::Endpoint, aMessage.is_sync(), // aOriginThreadId: If the thread is being profiled, do not include a // thread ID, as it's the same as the markers. Only include this field // when the marker is being sent from another thread. isThreadBeingProfiled ? mozilla::MarkerThreadId{} : mozilla::MarkerThreadId::CurrentThread()); } } } void MessageChannel::EndTimeout() { mMonitor->AssertCurrentThreadOwns(); IPC_LOG("Ending timeout of seqno=%d", mTimedOutMessageSeqno); mTimedOutMessageSeqno = 0; mTimedOutMessageNestedLevel = 0; RepostAllMessages(); } void MessageChannel::RepostAllMessages() { mMonitor->AssertCurrentThreadOwns(); bool needRepost = false; for (MessageTask* task : mPending) { task->AssertMonitorHeld(*mMonitor); if (!task->IsScheduled()) { needRepost = true; break; } } if (!needRepost) { // If everything is already scheduled to run, do nothing. return; } // In some cases we may have deferred dispatch of some messages in the // queue. Now we want to run them again. However, we can't just re-post // those messages since the messages after them in mPending would then be // before them in the event queue. So instead we cancel everything and // re-post all messages in the correct order. MessageQueue queue = std::move(mPending); while (RefPtr<MessageTask> task = queue.popFirst()) { task->AssertMonitorHeld(*mMonitor); RefPtr<MessageTask> newTask = new MessageTask(this, std::move(task->Msg())); newTask->AssertMonitorHeld(*mMonitor); mPending.insertBack(newTask); newTask->Post(); } AssertMaybeDeferredCountCorrect(); } void MessageChannel::CancelTransaction(int transaction) { mMonitor->AssertCurrentThreadOwns(); // When we cancel a transaction, we need to behave as if there's no longer // any IPC on the stack. Anything we were dispatching or sending will get // canceled. Consequently, we have to update the state variables below. // // We also need to ensure that when any IPC functions on the stack return, // they don't reset these values using an RAII class like AutoSetValue. To // avoid that, these RAII classes check if the variable they set has been // tampered with (by us). If so, they don't reset the variable to the old // value. IPC_LOG("CancelTransaction: xid=%d", transaction); // An unusual case: We timed out a transaction which the other side then // cancelled. In this case we just leave the timedout state and try to // forget this ever happened. if (transaction == mTimedOutMessageSeqno) { IPC_LOG("Cancelled timed out message %d", mTimedOutMessageSeqno); EndTimeout(); // Normally mCurrentTransaction == 0 here. But it can be non-zero if: // 1. Parent sends NESTED_INSIDE_SYNC message H. // 2. Parent times out H. // 3. Child dispatches H and sends nested message H' (same transaction). // 4. Parent dispatches H' and cancels. MOZ_RELEASE_ASSERT(!mTransactionStack || mTransactionStack->TransactionID() == transaction); if (mTransactionStack) { mTransactionStack->Cancel(); } } else { MOZ_RELEASE_ASSERT(mTransactionStack->TransactionID() == transaction); mTransactionStack->Cancel(); } bool foundSync = false; for (MessageTask* p = mPending.getFirst(); p;) { p->AssertMonitorHeld(*mMonitor); UniquePtr<Message>& msg = p->Msg(); // If there was a race between the parent and the child, then we may // have a queued sync message. We want to drop this message from the // queue since if will get cancelled along with the transaction being // cancelled. This happens if the message in the queue is // NESTED_INSIDE_SYNC. if (msg->is_sync() && msg->nested_level() != IPC::Message::NOT_NESTED) { MOZ_RELEASE_ASSERT(!foundSync); MOZ_RELEASE_ASSERT(msg->transaction_id() != transaction); IPC_LOG("Removing msg from queue seqno=%d xid=%d", msg->seqno(), msg->transaction_id()); foundSync = true; if (!IsAlwaysDeferred(*msg)) { mMaybeDeferredPendingCount--; } p = p->removeAndGetNext(); continue; } p = p->getNext(); } AssertMaybeDeferredCountCorrect(); } void MessageChannel::CancelCurrentTransaction() { MonitorAutoLock lock(*mMonitor); if (DispatchingSyncMessageNestedLevel() >= IPC::Message::NESTED_INSIDE_SYNC) { if (DispatchingSyncMessageNestedLevel() == IPC::Message::NESTED_INSIDE_CPOW || DispatchingAsyncMessageNestedLevel() == IPC::Message::NESTED_INSIDE_CPOW) { mListener->IntentionalCrash(); } IPC_LOG("Cancel requested: current xid=%d", CurrentNestedInsideSyncTransaction()); MOZ_RELEASE_ASSERT(DispatchingSyncMessage()); auto cancel = MakeUnique<CancelMessage>(CurrentNestedInsideSyncTransaction()); CancelTransaction(CurrentNestedInsideSyncTransaction()); SendMessageToLink(std::move(cancel)); } } void CancelCPOWs() { MOZ_ASSERT(NS_IsMainThread()); if (gParentProcessBlocker) { mozilla::Telemetry::Accumulate(mozilla::Telemetry::IPC_TRANSACTION_CANCEL, true); gParentProcessBlocker->CancelCurrentTransaction(); } } bool MessageChannel::IsCrossProcess() const { mMonitor->AssertCurrentThreadOwns(); return mIsCrossProcess; } void MessageChannel::SetIsCrossProcess(bool aIsCrossProcess) { mMonitor->AssertCurrentThreadOwns(); if (aIsCrossProcess == mIsCrossProcess) { return; } mIsCrossProcess = aIsCrossProcess; if (mIsCrossProcess) { ChannelCountReporter::Increment(mName); } else { ChannelCountReporter::Decrement(mName); } } NS_IMPL_ISUPPORTS(MessageChannel::WorkerTargetShutdownTask, nsITargetShutdownTask) MessageChannel::WorkerTargetShutdownTask::WorkerTargetShutdownTask( nsISerialEventTarget* aTarget, MessageChannel* aChannel) : mTarget(aTarget), mChannel(aChannel) {} void MessageChannel::WorkerTargetShutdownTask::TargetShutdown() { MOZ_RELEASE_ASSERT(mTarget->IsOnCurrentThread()); IPC_LOG("Closing channel due to event target shutdown"); if (MessageChannel* channel = std::exchange(mChannel, nullptr)) { channel->Close(); } } void MessageChannel::WorkerTargetShutdownTask::Clear() { MOZ_RELEASE_ASSERT(mTarget->IsOnCurrentThread()); mChannel = nullptr; } NS_IMPL_ISUPPORTS_INHERITED0(MessageChannel::FlushLazySendMessagesRunnable, CancelableRunnable) MessageChannel::FlushLazySendMessagesRunnable::FlushLazySendMessagesRunnable( MessageChannel* aChannel) : CancelableRunnable("MessageChannel::FlushLazyMessagesRunnable"), mChannel(aChannel) {} NS_IMETHODIMP MessageChannel::FlushLazySendMessagesRunnable::Run() { if (mChannel) { MonitorAutoLock lock(*mChannel->mMonitor); MOZ_ASSERT(mChannel->mFlushLazySendTask == this); mChannel->FlushLazySendMessages(); } return NS_OK; } nsresult MessageChannel::FlushLazySendMessagesRunnable::Cancel() { mQueue.Clear(); mChannel = nullptr; return NS_OK; } void MessageChannel::FlushLazySendMessagesRunnable::PushMessage( UniquePtr<Message> aMsg) { MOZ_ASSERT(mChannel); mQueue.AppendElement(std::move(aMsg)); } nsTArray<UniquePtr<IPC::Message>> MessageChannel::FlushLazySendMessagesRunnable::TakeMessages() { mChannel = nullptr; return std::move(mQueue); } } // namespace ipc } // namespace mozilla