/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
 * vim: sw=2 ts=4 et :
 */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "mozilla/ipc/MessageChannel.h"

#include <math.h>

#include <utility>

#include "CrashAnnotations.h"
#include "mozilla/Assertions.h"
#include "mozilla/CycleCollectedJSContext.h"
#include "mozilla/DebugOnly.h"
#include "mozilla/Fuzzing.h"
#include "mozilla/IntentionalCrash.h"
#include "mozilla/Logging.h"
#include "mozilla/Monitor.h"
#include "mozilla/Mutex.h"
#include "mozilla/ProfilerMarkers.h"
#include "mozilla/ScopeExit.h"
#include "mozilla/Sprintf.h"
#include "mozilla/StaticMutex.h"
#include "mozilla/Telemetry.h"
#include "mozilla/TimeStamp.h"
#include "mozilla/UniquePtrExtensions.h"
#include "mozilla/dom/ScriptSettings.h"
#include "mozilla/ipc/NodeController.h"
#include "mozilla/ipc/ProcessChild.h"
#include "mozilla/ipc/ProtocolUtils.h"
#include "nsAppRunner.h"
#include "nsContentUtils.h"
#include "nsIDirectTaskDispatcher.h"
#include "nsTHashMap.h"
#include "nsDebug.h"
#include "nsExceptionHandler.h"
#include "nsIMemoryReporter.h"
#include "nsISupportsImpl.h"
#include "nsPrintfCString.h"
#include "nsThreadUtils.h"

#ifdef OS_WIN
#  include "mozilla/gfx/Logging.h"
#endif

#ifdef FUZZING_SNAPSHOT
#  include "mozilla/fuzzing/IPCFuzzController.h"
#endif

// Undo the damage done by mozzconf.h
#undef compress

static mozilla::LazyLogModule sLogModule("ipc");
#define IPC_LOG(...) MOZ_LOG(sLogModule, LogLevel::Debug, (__VA_ARGS__))

/*
 * IPC design:
 *
 * There are two kinds of messages: async and sync. Sync messages are blocking.
 *
 * Terminology: To dispatch a message Foo is to run the RecvFoo code for
 * it. This is also called "handling" the message.
 *
 * Sync and async messages can sometimes "nest" inside other sync messages
 * (i.e., while waiting for the sync reply, we can dispatch the inner
 * message). The three possible nesting levels are NOT_NESTED,
 * NESTED_INSIDE_SYNC, and NESTED_INSIDE_CPOW.  The intended uses are:
 *   NOT_NESTED - most messages.
 *   NESTED_INSIDE_SYNC - CPOW-related messages, which are always sync
 *                        and can go in either direction.
 *   NESTED_INSIDE_CPOW - messages where we don't want to dispatch
 *                        incoming CPOWs while waiting for the response.
 * These nesting levels are ordered: NOT_NESTED, NESTED_INSIDE_SYNC,
 * NESTED_INSIDE_CPOW.  Async messages cannot be NESTED_INSIDE_SYNC but they can
 * be NESTED_INSIDE_CPOW.
 *
 * To avoid jank, the parent process is not allowed to send NOT_NESTED sync
 * messages. When a process is waiting for a response to a sync message M0, it
 * will dispatch an incoming message M if:
 *   1. M has a higher nesting level than M0, or
 *   2. if M has the same nesting level as M0 and we're in the child, or
 *   3. if M has the same nesting level as M0 and it was sent by the other side
 *      while dispatching M0.
 * The idea is that messages with higher nesting should take precendence. The
 * purpose of rule 2 is to handle a race where both processes send to each other
 * simultaneously. In this case, we resolve the race in favor of the parent (so
 * the child dispatches first).
 *
 * Messages satisfy the following properties:
 *   A. When waiting for a response to a sync message, we won't dispatch any
 *      messages of a lower nesting level.
 *   B. Messages of the same nesting level will be dispatched roughly in the
 *      order they were sent. The exception is when the parent and child send
 *      sync messages to each other simulataneously. In this case, the parent's
 *      message is dispatched first. While it is dispatched, the child may send
 *      further nested messages, and these messages may be dispatched before the
 *      child's original message. We can consider ordering to be preserved here
 *      because we pretend that the child's original message wasn't sent until
 *      after the parent's message is finished being dispatched.
 *
 * When waiting for a sync message reply, we dispatch an async message only if
 * it is NESTED_INSIDE_CPOW. Normally NESTED_INSIDE_CPOW async
 * messages are sent only from the child. However, the parent can send
 * NESTED_INSIDE_CPOW async messages when it is creating a bridged protocol.
 */

using namespace mozilla;
using namespace mozilla::ipc;

using mozilla::MonitorAutoLock;
using mozilla::MonitorAutoUnlock;
using mozilla::dom::AutoNoJSAPI;

#define IPC_ASSERT(_cond, ...)                                           \
  do {                                                                   \
    AssertWorkerThread();                                                \
    mMonitor->AssertCurrentThreadOwns();                                 \
    if (!(_cond)) DebugAbort(__FILE__, __LINE__, #_cond, ##__VA_ARGS__); \
  } while (0)

static MessageChannel* gParentProcessBlocker = nullptr;

namespace mozilla {
namespace ipc {

static const uint32_t kMinTelemetryMessageSize = 4096;

// Note: we round the time we spend waiting for a response to the nearest
// millisecond. So a min value of 1 ms actually captures from 500us and above.
// This is used for both the sending and receiving side telemetry for sync IPC,
// (IPC_SYNC_MAIN_LATENCY_MS and IPC_SYNC_RECEIVE_MS).
static const uint32_t kMinTelemetrySyncIPCLatencyMs = 1;

// static
bool MessageChannel::sIsPumpingMessages = false;

class AutoEnterTransaction {
 public:
  explicit AutoEnterTransaction(MessageChannel* aChan, int32_t aMsgSeqno,
                                int32_t aTransactionID, int aNestedLevel)
      MOZ_REQUIRES(*aChan->mMonitor)
      : mChan(aChan),
        mActive(true),
        mOutgoing(true),
        mNestedLevel(aNestedLevel),
        mSeqno(aMsgSeqno),
        mTransaction(aTransactionID),
        mNext(mChan->mTransactionStack) {
    mChan->mMonitor->AssertCurrentThreadOwns();
    mChan->mTransactionStack = this;
  }

  explicit AutoEnterTransaction(MessageChannel* aChan,
                                const IPC::Message& aMessage)
      MOZ_REQUIRES(*aChan->mMonitor)
      : mChan(aChan),
        mActive(true),
        mOutgoing(false),
        mNestedLevel(aMessage.nested_level()),
        mSeqno(aMessage.seqno()),
        mTransaction(aMessage.transaction_id()),
        mNext(mChan->mTransactionStack) {
    mChan->mMonitor->AssertCurrentThreadOwns();

    if (!aMessage.is_sync()) {
      mActive = false;
      return;
    }

    mChan->mTransactionStack = this;
  }

  ~AutoEnterTransaction() {
    mChan->mMonitor->AssertCurrentThreadOwns();
    if (mActive) {
      mChan->mTransactionStack = mNext;
    }
  }

  void Cancel() {
    mChan->mMonitor->AssertCurrentThreadOwns();
    AutoEnterTransaction* cur = mChan->mTransactionStack;
    MOZ_RELEASE_ASSERT(cur == this);
    while (cur && cur->mNestedLevel != IPC::Message::NOT_NESTED) {
      // Note that, in the following situation, we will cancel multiple
      // transactions:
      // 1. Parent sends NESTED_INSIDE_SYNC message P1 to child.
      // 2. Child sends NESTED_INSIDE_SYNC message C1 to child.
      // 3. Child dispatches P1, parent blocks.
      // 4. Child cancels.
      // In this case, both P1 and C1 are cancelled. The parent will
      // remove C1 from its queue when it gets the cancellation message.
      MOZ_RELEASE_ASSERT(cur->mActive);
      cur->mActive = false;
      cur = cur->mNext;
    }

    mChan->mTransactionStack = cur;

    MOZ_RELEASE_ASSERT(IsComplete());
  }

  bool AwaitingSyncReply() const {
    MOZ_RELEASE_ASSERT(mActive);
    if (mOutgoing) {
      return true;
    }
    return mNext ? mNext->AwaitingSyncReply() : false;
  }

  int AwaitingSyncReplyNestedLevel() const {
    MOZ_RELEASE_ASSERT(mActive);
    if (mOutgoing) {
      return mNestedLevel;
    }
    return mNext ? mNext->AwaitingSyncReplyNestedLevel() : 0;
  }

  bool DispatchingSyncMessage() const {
    MOZ_RELEASE_ASSERT(mActive);
    if (!mOutgoing) {
      return true;
    }
    return mNext ? mNext->DispatchingSyncMessage() : false;
  }

  int DispatchingSyncMessageNestedLevel() const {
    MOZ_RELEASE_ASSERT(mActive);
    if (!mOutgoing) {
      return mNestedLevel;
    }
    return mNext ? mNext->DispatchingSyncMessageNestedLevel() : 0;
  }

  int NestedLevel() const {
    MOZ_RELEASE_ASSERT(mActive);
    return mNestedLevel;
  }

  int32_t SequenceNumber() const {
    MOZ_RELEASE_ASSERT(mActive);
    return mSeqno;
  }

  int32_t TransactionID() const {
    MOZ_RELEASE_ASSERT(mActive);
    return mTransaction;
  }

  void ReceivedReply(UniquePtr<IPC::Message> aMessage) {
    MOZ_RELEASE_ASSERT(aMessage->seqno() == mSeqno);
    MOZ_RELEASE_ASSERT(aMessage->transaction_id() == mTransaction);
    MOZ_RELEASE_ASSERT(!mReply);
    IPC_LOG("Reply received on worker thread: seqno=%d", mSeqno);
    mReply = std::move(aMessage);
    MOZ_RELEASE_ASSERT(IsComplete());
  }

  void HandleReply(UniquePtr<IPC::Message> aMessage) {
    mChan->mMonitor->AssertCurrentThreadOwns();
    AutoEnterTransaction* cur = mChan->mTransactionStack;
    MOZ_RELEASE_ASSERT(cur == this);
    while (cur) {
      MOZ_RELEASE_ASSERT(cur->mActive);
      if (aMessage->seqno() == cur->mSeqno) {
        cur->ReceivedReply(std::move(aMessage));
        break;
      }
      cur = cur->mNext;
      MOZ_RELEASE_ASSERT(cur);
    }
  }

  bool IsComplete() { return !mActive || mReply; }

  bool IsOutgoing() { return mOutgoing; }

  bool IsCanceled() { return !mActive; }

  bool IsBottom() const { return !mNext; }

  bool IsError() {
    MOZ_RELEASE_ASSERT(mReply);
    return mReply->is_reply_error();
  }

  UniquePtr<IPC::Message> GetReply() { return std::move(mReply); }

 private:
  MessageChannel* mChan;

  // Active is true if this transaction is on the mChan->mTransactionStack
  // stack. Generally we're not on the stack if the transaction was canceled
  // or if it was for a message that doesn't require transactions (an async
  // message).
  bool mActive;

  // Is this stack frame for an outgoing message?
  bool mOutgoing;

  // Properties of the message being sent/received.
  int mNestedLevel;
  int32_t mSeqno;
  int32_t mTransaction;

  // Next item in mChan->mTransactionStack.
  AutoEnterTransaction* mNext;

  // Pointer the a reply received for this message, if one was received.
  UniquePtr<IPC::Message> mReply;
};

class PendingResponseReporter final : public nsIMemoryReporter {
  ~PendingResponseReporter() = default;

 public:
  NS_DECL_THREADSAFE_ISUPPORTS

  NS_IMETHOD
  CollectReports(nsIHandleReportCallback* aHandleReport, nsISupports* aData,
                 bool aAnonymize) override {
    MOZ_COLLECT_REPORT(
        "unresolved-ipc-responses", KIND_OTHER, UNITS_COUNT,
        MessageChannel::gUnresolvedResponses,
        "Outstanding IPC async message responses that are still not resolved.");
    return NS_OK;
  }
};

NS_IMPL_ISUPPORTS(PendingResponseReporter, nsIMemoryReporter)

class ChannelCountReporter final : public nsIMemoryReporter {
  ~ChannelCountReporter() = default;

  struct ChannelCounts {
    size_t mNow;
    size_t mMax;

    ChannelCounts() : mNow(0), mMax(0) {}

    void Inc() {
      ++mNow;
      if (mMax < mNow) {
        mMax = mNow;
      }
    }

    void Dec() {
      MOZ_ASSERT(mNow > 0);
      --mNow;
    }
  };

  using CountTable = nsTHashMap<nsDepCharHashKey, ChannelCounts>;

  static StaticMutex sChannelCountMutex;
  static CountTable* sChannelCounts MOZ_GUARDED_BY(sChannelCountMutex);

 public:
  NS_DECL_THREADSAFE_ISUPPORTS

  NS_IMETHOD
  CollectReports(nsIHandleReportCallback* aHandleReport, nsISupports* aData,
                 bool aAnonymize) override {
    AutoTArray<std::pair<const char*, ChannelCounts>, 16> counts;
    {
      StaticMutexAutoLock countLock(sChannelCountMutex);
      if (!sChannelCounts) {
        return NS_OK;
      }
      counts.SetCapacity(sChannelCounts->Count());
      for (const auto& entry : *sChannelCounts) {
        counts.AppendElement(std::pair{entry.GetKey(), entry.GetData()});
      }
    }

    for (const auto& entry : counts) {
      nsPrintfCString pathNow("ipc-channels/%s", entry.first);
      nsPrintfCString pathMax("ipc-channels-peak/%s", entry.first);
      nsPrintfCString descNow(
          "Number of IPC channels for"
          " top-level actor type %s",
          entry.first);
      nsPrintfCString descMax(
          "Peak number of IPC channels for"
          " top-level actor type %s",
          entry.first);

      aHandleReport->Callback(""_ns, pathNow, KIND_OTHER, UNITS_COUNT,
                              entry.second.mNow, descNow, aData);
      aHandleReport->Callback(""_ns, pathMax, KIND_OTHER, UNITS_COUNT,
                              entry.second.mMax, descMax, aData);
    }
    return NS_OK;
  }

  static void Increment(const char* aName) {
    StaticMutexAutoLock countLock(sChannelCountMutex);
    if (!sChannelCounts) {
      sChannelCounts = new CountTable;
    }
    sChannelCounts->LookupOrInsert(aName).Inc();
  }

  static void Decrement(const char* aName) {
    StaticMutexAutoLock countLock(sChannelCountMutex);
    MOZ_ASSERT(sChannelCounts);
    sChannelCounts->LookupOrInsert(aName).Dec();
  }
};

StaticMutex ChannelCountReporter::sChannelCountMutex;
ChannelCountReporter::CountTable* ChannelCountReporter::sChannelCounts;

NS_IMPL_ISUPPORTS(ChannelCountReporter, nsIMemoryReporter)

// In child processes, the first MessageChannel is created before
// XPCOM is initialized enough to construct the memory reporter
// manager.  This retries every time a MessageChannel is constructed,
// which is good enough in practice.
template <class Reporter>
static void TryRegisterStrongMemoryReporter() {
  static Atomic<bool> registered;
  if (registered.compareExchange(false, true)) {
    RefPtr<Reporter> reporter = new Reporter();
    if (NS_FAILED(RegisterStrongMemoryReporter(reporter))) {
      registered = false;
    }
  }
}

Atomic<size_t> MessageChannel::gUnresolvedResponses;

MessageChannel::MessageChannel(const char* aName, IToplevelProtocol* aListener)
    : mName(aName), mListener(aListener), mMonitor(new RefCountedMonitor()) {
  MOZ_COUNT_CTOR(ipc::MessageChannel);

#ifdef OS_WIN
  mEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);
  MOZ_RELEASE_ASSERT(mEvent, "CreateEvent failed! Nothing is going to work!");
#endif

  TryRegisterStrongMemoryReporter<PendingResponseReporter>();
  TryRegisterStrongMemoryReporter<ChannelCountReporter>();
}

MessageChannel::~MessageChannel() {
  MOZ_COUNT_DTOR(ipc::MessageChannel);
  MonitorAutoLock lock(*mMonitor);
  MOZ_RELEASE_ASSERT(!mOnCxxStack,
                     "MessageChannel destroyed while code on CxxStack");
#ifdef OS_WIN
  if (mEvent) {
    BOOL ok = CloseHandle(mEvent);
    mEvent = nullptr;

    if (!ok) {
      gfxDevCrash(mozilla::gfx::LogReason::MessageChannelCloseFailure)
          << "MessageChannel failed to close. GetLastError: " << GetLastError();
    }
    MOZ_RELEASE_ASSERT(ok);
  } else {
    gfxDevCrash(mozilla::gfx::LogReason::MessageChannelCloseFailure)
        << "MessageChannel destructor ran without an mEvent Handle";
  }
#endif

  // Make sure that the MessageChannel was closed (and therefore cleared) before
  // it was destroyed. We can't properly close the channel at this point, as it
  // would be unsafe to invoke our listener's callbacks, and we may be being
  // destroyed on a thread other than `mWorkerThread`.
  if (!IsClosedLocked()) {
    CrashReporter::AnnotateCrashReport(
        CrashReporter::Annotation::IPCFatalErrorProtocol,
        nsDependentCString(mName));
    switch (mChannelState) {
      case ChannelConnected:
        MOZ_CRASH(
            "MessageChannel destroyed without being closed "
            "(mChannelState == ChannelConnected).");
        break;
      case ChannelClosing:
        MOZ_CRASH(
            "MessageChannel destroyed without being closed "
            "(mChannelState == ChannelClosing).");
        break;
      case ChannelError:
        MOZ_CRASH(
            "MessageChannel destroyed without being closed "
            "(mChannelState == ChannelError).");
        break;
      default:
        MOZ_CRASH("MessageChannel destroyed without being closed.");
    }
  }

  // Double-check other properties for thoroughness.
  MOZ_RELEASE_ASSERT(!mLink);
  MOZ_RELEASE_ASSERT(mPendingResponses.empty());
  MOZ_RELEASE_ASSERT(!mChannelErrorTask);
  MOZ_RELEASE_ASSERT(mPending.isEmpty());
  MOZ_RELEASE_ASSERT(!mShutdownTask);
}

#ifdef DEBUG
void MessageChannel::AssertMaybeDeferredCountCorrect() {
  mMonitor->AssertCurrentThreadOwns();

  size_t count = 0;
  for (MessageTask* task : mPending) {
    task->AssertMonitorHeld(*mMonitor);
    if (!IsAlwaysDeferred(*task->Msg())) {
      count++;
    }
  }

  MOZ_ASSERT(count == mMaybeDeferredPendingCount);
}
#endif

// This function returns the current transaction ID. Since the notion of a
// "current transaction" can be hard to define when messages race with each
// other and one gets canceled and the other doesn't, we require that this
// function is only called when the current transaction is known to be for a
// NESTED_INSIDE_SYNC message. In that case, we know for sure what the caller is
// looking for.
int32_t MessageChannel::CurrentNestedInsideSyncTransaction() const {
  mMonitor->AssertCurrentThreadOwns();
  if (!mTransactionStack) {
    return 0;
  }
  MOZ_RELEASE_ASSERT(mTransactionStack->NestedLevel() ==
                     IPC::Message::NESTED_INSIDE_SYNC);
  return mTransactionStack->TransactionID();
}

bool MessageChannel::AwaitingSyncReply() const {
  mMonitor->AssertCurrentThreadOwns();
  return mTransactionStack ? mTransactionStack->AwaitingSyncReply() : false;
}

int MessageChannel::AwaitingSyncReplyNestedLevel() const {
  mMonitor->AssertCurrentThreadOwns();
  return mTransactionStack ? mTransactionStack->AwaitingSyncReplyNestedLevel()
                           : 0;
}

bool MessageChannel::DispatchingSyncMessage() const {
  mMonitor->AssertCurrentThreadOwns();
  return mTransactionStack ? mTransactionStack->DispatchingSyncMessage()
                           : false;
}

int MessageChannel::DispatchingSyncMessageNestedLevel() const {
  mMonitor->AssertCurrentThreadOwns();
  return mTransactionStack
             ? mTransactionStack->DispatchingSyncMessageNestedLevel()
             : 0;
}

static const char* StringFromIPCSide(Side side) {
  switch (side) {
    case ChildSide:
      return "Child";
    case ParentSide:
      return "Parent";
    default:
      return "Unknown";
  }
}

static void PrintErrorMessage(Side side, const char* channelName,
                              const char* msg) {
  printf_stderr("\n###!!! [%s][%s] Error: %s\n\n", StringFromIPCSide(side),
                channelName, msg);
}

bool MessageChannel::Connected() const {
  mMonitor->AssertCurrentThreadOwns();
  return ChannelConnected == mChannelState;
}

bool MessageChannel::ConnectedOrClosing() const {
  mMonitor->AssertCurrentThreadOwns();
  return ChannelConnected == mChannelState || ChannelClosing == mChannelState;
}

bool MessageChannel::CanSend() const {
  if (!mMonitor) {
    return false;
  }
  MonitorAutoLock lock(*mMonitor);
  return Connected();
}

void MessageChannel::Clear() {
  AssertWorkerThread();
  mMonitor->AssertCurrentThreadOwns();
  MOZ_DIAGNOSTIC_ASSERT(IsClosedLocked(), "MessageChannel cleared too early?");
  MOZ_ASSERT(ChannelClosed == mChannelState || ChannelError == mChannelState);

  // Don't clear mWorkerThread; we use it in AssertWorkerThread().
  //
  // Also don't clear mListener.  If we clear it, then sending a message
  // through this channel after it's Clear()'ed can cause this process to
  // crash.

  if (mShutdownTask) {
    mShutdownTask->Clear();
    mWorkerThread->UnregisterShutdownTask(mShutdownTask);
  }
  mShutdownTask = nullptr;

  if (NS_IsMainThread() && gParentProcessBlocker == this) {
    gParentProcessBlocker = nullptr;
  }

  gUnresolvedResponses -= mPendingResponses.size();
  {
    CallbackMap map = std::move(mPendingResponses);
    MonitorAutoUnlock unlock(*mMonitor);
    for (auto& pair : map) {
      pair.second->Reject(ResponseRejectReason::ChannelClosed);
    }
  }
  mPendingResponses.clear();

  SetIsCrossProcess(false);

  mLink = nullptr;

  if (mChannelErrorTask) {
    mChannelErrorTask->Cancel();
    mChannelErrorTask = nullptr;
  }

  if (mFlushLazySendTask) {
    mFlushLazySendTask->Cancel();
    mFlushLazySendTask = nullptr;
  }

  // Free up any memory used by pending messages.
  mPending.clear();

  mMaybeDeferredPendingCount = 0;
}

bool MessageChannel::Open(ScopedPort aPort, Side aSide,
                          const nsID& aMessageChannelId,
                          nsISerialEventTarget* aEventTarget) {
  nsCOMPtr<nsISerialEventTarget> eventTarget =
      aEventTarget ? aEventTarget : GetCurrentSerialEventTarget();
  MOZ_RELEASE_ASSERT(eventTarget,
                     "Must open MessageChannel on a nsISerialEventTarget");
  MOZ_RELEASE_ASSERT(eventTarget->IsOnCurrentThread(),
                     "Must open MessageChannel from worker thread");

  auto shutdownTask = MakeRefPtr<WorkerTargetShutdownTask>(eventTarget, this);
  nsresult rv = eventTarget->RegisterShutdownTask(shutdownTask);
  MOZ_ASSERT(rv != NS_ERROR_NOT_IMPLEMENTED,
             "target for MessageChannel must support shutdown tasks");
  if (rv == NS_ERROR_UNEXPECTED) {
    // If shutdown tasks have already started running, dispatch our shutdown
    // task manually.
    NS_WARNING("Opening MessageChannel on EventTarget in shutdown");
    rv = eventTarget->Dispatch(shutdownTask->AsRunnable());
  }
  MOZ_RELEASE_ASSERT(NS_SUCCEEDED(rv),
                     "error registering ShutdownTask for MessageChannel");

  {
    MonitorAutoLock lock(*mMonitor);
    MOZ_RELEASE_ASSERT(!mLink, "Open() called > once");
    MOZ_RELEASE_ASSERT(ChannelClosed == mChannelState, "Not currently closed");
    MOZ_ASSERT(mSide == UnknownSide);

    mMessageChannelId = aMessageChannelId;
    mWorkerThread = eventTarget;
    mShutdownTask = shutdownTask;
    mLink = MakeUnique<PortLink>(this, std::move(aPort));
    mChannelState = ChannelConnected;
    mSide = aSide;
  }

  // Notify our listener that the underlying IPC channel has been established.
  // IProtocol will use this callback to create the ActorLifecycleProxy, and
  // perform an `AddRef` call to keep the actor alive until the channel is
  // disconnected.
  //
  // We unlock our monitor before calling `OnIPCChannelOpened` to ensure that
  // any calls back into `MessageChannel` do not deadlock. At this point, we may
  // be receiving messages on the IO thread, however we cannot process them on
  // the worker thread or have notified our listener until after this function
  // returns.
  mListener->OnIPCChannelOpened();
  return true;
}

static Side GetOppSide(Side aSide) {
  switch (aSide) {
    case ChildSide:
      return ParentSide;
    case ParentSide:
      return ChildSide;
    default:
      return UnknownSide;
  }
}

bool MessageChannel::Open(MessageChannel* aTargetChan,
                          nsISerialEventTarget* aEventTarget, Side aSide) {
  // Opens a connection to another thread in the same process.

  MOZ_ASSERT(aTargetChan, "Need a target channel");

  nsID channelId = nsID::GenerateUUID();

  std::pair<ScopedPort, ScopedPort> ports =
      NodeController::GetSingleton()->CreatePortPair();

  // NOTE: This dispatch must be sync as it captures locals by non-owning
  // reference, however we can't use `NS_DispatchAndSpinEventLoopUntilComplete`
  // as that will spin a nested event loop, and doesn't work with certain types
  // of calling event targets.
  base::WaitableEvent event(/* manual_reset */ true,
                            /* initially_signaled */ false);
  MOZ_ALWAYS_SUCCEEDS(aEventTarget->Dispatch(NS_NewCancelableRunnableFunction(
      "ipc::MessageChannel::OpenAsOtherThread", [&]() {
        aTargetChan->Open(std::move(ports.second), GetOppSide(aSide), channelId,
                          aEventTarget);
        event.Signal();
      })));
  bool ok = event.Wait();
  MOZ_RELEASE_ASSERT(ok);

  // Now that the other side has connected, open the port on our side.
  return Open(std::move(ports.first), aSide, channelId);
}

bool MessageChannel::OpenOnSameThread(MessageChannel* aTargetChan,
                                      mozilla::ipc::Side aSide) {
  auto [porta, portb] = NodeController::GetSingleton()->CreatePortPair();

  nsID channelId = nsID::GenerateUUID();

  aTargetChan->mIsSameThreadChannel = true;
  mIsSameThreadChannel = true;

  auto* currentThread = GetCurrentSerialEventTarget();
  return aTargetChan->Open(std::move(portb), GetOppSide(aSide), channelId,
                           currentThread) &&
         Open(std::move(porta), aSide, channelId, currentThread);
}

bool MessageChannel::Send(UniquePtr<Message> aMsg) {
  if (aMsg->size() >= kMinTelemetryMessageSize) {
    Telemetry::Accumulate(Telemetry::IPC_MESSAGE_SIZE2, aMsg->size());
  }

  MOZ_RELEASE_ASSERT(!aMsg->is_sync());
  MOZ_RELEASE_ASSERT(aMsg->nested_level() != IPC::Message::NESTED_INSIDE_SYNC);

  AutoSetValue<bool> setOnCxxStack(mOnCxxStack, true);

  AssertWorkerThread();
  mMonitor->AssertNotCurrentThreadOwns();
  if (MSG_ROUTING_NONE == aMsg->routing_id()) {
    ReportMessageRouteError("MessageChannel::Send");
    return false;
  }

  if (aMsg->seqno() == 0) {
    aMsg->set_seqno(NextSeqno());
  }

  MonitorAutoLock lock(*mMonitor);
  if (!Connected()) {
    ReportConnectionError("Send", aMsg->type());
    return false;
  }

  AddProfilerMarker(*aMsg, MessageDirection::eSending);
  SendMessageToLink(std::move(aMsg));
  return true;
}

void MessageChannel::SendMessageToLink(UniquePtr<Message> aMsg) {
  AssertWorkerThread();
  mMonitor->AssertCurrentThreadOwns();

  // If the channel is not cross-process, there's no reason to be lazy, so we
  // ignore the flag in that case.
  if (aMsg->is_lazy_send() && mIsCrossProcess) {
    // If this is the first lazy message in the queue and our worker thread
    // supports direct task dispatch, dispatch a task to flush messages,
    // ensuring we don't leave them pending forever.
    if (!mFlushLazySendTask) {
      if (nsCOMPtr<nsIDirectTaskDispatcher> dispatcher =
              do_QueryInterface(mWorkerThread)) {
        mFlushLazySendTask = new FlushLazySendMessagesRunnable(this);
        MOZ_ALWAYS_SUCCEEDS(
            dispatcher->DispatchDirectTask(do_AddRef(mFlushLazySendTask)));
      }
    }
    if (mFlushLazySendTask) {
      mFlushLazySendTask->PushMessage(std::move(aMsg));
      return;
    }
  }

  if (mFlushLazySendTask) {
    FlushLazySendMessages();
  }
  mLink->SendMessage(std::move(aMsg));
}

void MessageChannel::FlushLazySendMessages() {
  AssertWorkerThread();
  mMonitor->AssertCurrentThreadOwns();

  // Clean up any SendLazyTask which might be pending.
  auto messages = mFlushLazySendTask->TakeMessages();
  mFlushLazySendTask = nullptr;

  // Send all lazy messages, then clear the queue.
  for (auto& msg : messages) {
    mLink->SendMessage(std::move(msg));
  }
}

UniquePtr<MessageChannel::UntypedCallbackHolder> MessageChannel::PopCallback(
    const Message& aMsg, int32_t aActorId) {
  auto iter = mPendingResponses.find(aMsg.seqno());
  if (iter != mPendingResponses.end() && iter->second->mActorId == aActorId &&
      iter->second->mReplyMsgId == aMsg.type()) {
    UniquePtr<MessageChannel::UntypedCallbackHolder> ret =
        std::move(iter->second);
    mPendingResponses.erase(iter);
    gUnresolvedResponses--;
    return ret;
  }
  return nullptr;
}

void MessageChannel::RejectPendingResponsesForActor(int32_t aActorId) {
  auto itr = mPendingResponses.begin();
  while (itr != mPendingResponses.end()) {
    if (itr->second.get()->mActorId != aActorId) {
      ++itr;
      continue;
    }
    itr->second.get()->Reject(ResponseRejectReason::ActorDestroyed);
    // Take special care of advancing the iterator since we are
    // removing it while iterating.
    itr = mPendingResponses.erase(itr);
    gUnresolvedResponses--;
  }
}

class BuildIDsMatchMessage : public IPC::Message {
 public:
  BuildIDsMatchMessage()
      : IPC::Message(MSG_ROUTING_NONE, BUILD_IDS_MATCH_MESSAGE_TYPE) {}
  void Log(const std::string& aPrefix, FILE* aOutf) const {
    fputs("(special `Build IDs match' message)", aOutf);
  }
};

// Send the parent a special async message to confirm when the parent and child
// are of the same buildID. Skips sending the message and returns false if the
// buildIDs don't match. This is a minor variation on
// MessageChannel::Send(Message* aMsg).
bool MessageChannel::SendBuildIDsMatchMessage(const char* aParentBuildID) {
  MOZ_ASSERT(!XRE_IsParentProcess());

  nsCString parentBuildID(aParentBuildID);
  nsCString childBuildID(mozilla::PlatformBuildID());

  if (parentBuildID != childBuildID) {
    // The build IDs didn't match, usually because an update occurred in the
    // background.
    return false;
  }

  auto msg = MakeUnique<BuildIDsMatchMessage>();

  MOZ_RELEASE_ASSERT(!msg->is_sync());
  MOZ_RELEASE_ASSERT(msg->nested_level() != IPC::Message::NESTED_INSIDE_SYNC);

  AssertWorkerThread();
  mMonitor->AssertNotCurrentThreadOwns();
  // Don't check for MSG_ROUTING_NONE.

  MonitorAutoLock lock(*mMonitor);
  if (!Connected()) {
    ReportConnectionError("SendBuildIDsMatchMessage", msg->type());
    return false;
  }

#if defined(MOZ_DEBUG) && defined(ENABLE_TESTS)
  // Technically, the behavior is interesting for any kind of process
  // but when exercising tests, we want to crash only a content process and
  // avoid making noise with other kind of processes crashing
  if (const char* dontSend = PR_GetEnv("MOZ_BUILDID_MATCH_DONTSEND")) {
    if (dontSend[0] == '1') {
      // Bug 1732999: We are going to crash, so we need to advise leak check
      // tooling to avoid intermittent missing leakcheck
      NoteIntentionalCrash(XRE_GetProcessTypeString());
      if (XRE_IsContentProcess()) {
        return false;
      }
    }
  }
#endif

  SendMessageToLink(std::move(msg));
  return true;
}

class CancelMessage : public IPC::Message {
 public:
  explicit CancelMessage(int transaction)
      : IPC::Message(MSG_ROUTING_NONE, CANCEL_MESSAGE_TYPE) {
    set_transaction_id(transaction);
  }
  static bool Read(const Message* msg) { return true; }
  void Log(const std::string& aPrefix, FILE* aOutf) const {
    fputs("(special `Cancel' message)", aOutf);
  }
};

bool MessageChannel::MaybeInterceptSpecialIOMessage(const Message& aMsg) {
  mMonitor->AssertCurrentThreadOwns();

  if (MSG_ROUTING_NONE == aMsg.routing_id()) {
    if (GOODBYE_MESSAGE_TYPE == aMsg.type()) {
      // We've received a GOODBYE message, close the connection and mark
      // ourselves as "Closing".
      mLink->Close();
      mChannelState = ChannelClosing;
      if (LoggingEnabled()) {
        printf(
            "[%s %u] NOTE: %s actor received `Goodbye' message.  Closing "
            "channel.\n",
            XRE_GeckoProcessTypeToString(XRE_GetProcessType()),
            static_cast<uint32_t>(base::GetCurrentProcId()),
            (mSide == ChildSide) ? "child" : "parent");
      }

      // Notify the worker thread that the connection has been closed, as we
      // will not receive an `OnChannelErrorFromLink` after calling
      // `mLink->Close()`.
      if (AwaitingSyncReply()) {
        NotifyWorkerThread();
      }
      PostErrorNotifyTask();
      return true;
    } else if (CANCEL_MESSAGE_TYPE == aMsg.type()) {
      IPC_LOG("Cancel from message");
      CancelTransaction(aMsg.transaction_id());
      NotifyWorkerThread();
      return true;
    } else if (BUILD_IDS_MATCH_MESSAGE_TYPE == aMsg.type()) {
      IPC_LOG("Build IDs match message");
      mBuildIDsConfirmedMatch = true;
      return true;
    } else if (IMPENDING_SHUTDOWN_MESSAGE_TYPE == aMsg.type()) {
      IPC_LOG("Impending Shutdown received");
      ProcessChild::NotifiedImpendingShutdown();
      return true;
    }
  }
  return false;
}

/* static */
bool MessageChannel::IsAlwaysDeferred(const Message& aMsg) {
  // If a message is not NESTED_INSIDE_CPOW and not sync, then we always defer
  // it.
  return aMsg.nested_level() != IPC::Message::NESTED_INSIDE_CPOW &&
         !aMsg.is_sync();
}

bool MessageChannel::ShouldDeferMessage(const Message& aMsg) {
  // Never defer messages that have the highest nested level, even async
  // ones. This is safe because only the child can send these messages, so
  // they can never nest.
  if (aMsg.nested_level() == IPC::Message::NESTED_INSIDE_CPOW) {
    MOZ_ASSERT(!IsAlwaysDeferred(aMsg));
    return false;
  }

  // Unless they're NESTED_INSIDE_CPOW, we always defer async messages.
  // Note that we never send an async NESTED_INSIDE_SYNC message.
  if (!aMsg.is_sync()) {
    MOZ_RELEASE_ASSERT(aMsg.nested_level() == IPC::Message::NOT_NESTED);
    MOZ_ASSERT(IsAlwaysDeferred(aMsg));
    return true;
  }

  MOZ_ASSERT(!IsAlwaysDeferred(aMsg));

  int msgNestedLevel = aMsg.nested_level();
  int waitingNestedLevel = AwaitingSyncReplyNestedLevel();

  // Always defer if the nested level of the incoming message is less than the
  // nested level of the message we're awaiting.
  if (msgNestedLevel < waitingNestedLevel) return true;

  // Never defer if the message has strictly greater nested level.
  if (msgNestedLevel > waitingNestedLevel) return false;

  // When both sides send sync messages of the same nested level, we resolve the
  // race by dispatching in the child and deferring the incoming message in
  // the parent. However, the parent still needs to dispatch nested sync
  // messages.
  //
  // Deferring in the parent only sort of breaks message ordering. When the
  // child's message comes in, we can pretend the child hasn't quite
  // finished sending it yet. Since the message is sync, we know that the
  // child hasn't moved on yet.
  return mSide == ParentSide &&
         aMsg.transaction_id() != CurrentNestedInsideSyncTransaction();
}

void MessageChannel::OnMessageReceivedFromLink(UniquePtr<Message> aMsg) {
  mMonitor->AssertCurrentThreadOwns();
  MOZ_ASSERT(mChannelState == ChannelConnected);

  if (MaybeInterceptSpecialIOMessage(*aMsg)) {
    return;
  }

  mListener->OnChannelReceivedMessage(*aMsg);

  // If we're awaiting a sync reply, we know that it needs to be immediately
  // handled to unblock us.
  if (aMsg->is_sync() && aMsg->is_reply()) {
    IPC_LOG("Received reply seqno=%d xid=%d", aMsg->seqno(),
            aMsg->transaction_id());

    if (aMsg->seqno() == mTimedOutMessageSeqno) {
      // Drop the message, but allow future sync messages to be sent.
      IPC_LOG("Received reply to timedout message; igoring; xid=%d",
              mTimedOutMessageSeqno);
      EndTimeout();
      return;
    }

    MOZ_RELEASE_ASSERT(AwaitingSyncReply());
    MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno);

    mTransactionStack->HandleReply(std::move(aMsg));
    NotifyWorkerThread();
    return;
  }

  // Nested messages cannot be compressed.
  MOZ_RELEASE_ASSERT(aMsg->compress_type() == IPC::Message::COMPRESSION_NONE ||
                     aMsg->nested_level() == IPC::Message::NOT_NESTED);

  if (aMsg->compress_type() == IPC::Message::COMPRESSION_ENABLED &&
      !mPending.isEmpty()) {
    auto* last = mPending.getLast();
    last->AssertMonitorHeld(*mMonitor);
    bool compress = last->Msg()->type() == aMsg->type() &&
                    last->Msg()->routing_id() == aMsg->routing_id();
    if (compress) {
      // This message type has compression enabled, and the back of the
      // queue was the same message type and routed to the same destination.
      // Replace it with the newer message.
      MOZ_RELEASE_ASSERT(last->Msg()->compress_type() ==
                         IPC::Message::COMPRESSION_ENABLED);
      last->Msg() = std::move(aMsg);
      return;
    }
  } else if (aMsg->compress_type() == IPC::Message::COMPRESSION_ALL &&
             !mPending.isEmpty()) {
    for (MessageTask* p = mPending.getLast(); p; p = p->getPrevious()) {
      p->AssertMonitorHeld(*mMonitor);
      if (p->Msg()->type() == aMsg->type() &&
          p->Msg()->routing_id() == aMsg->routing_id()) {
        // This message type has compression enabled, and the queue
        // holds a message with the same message type and routed to the
        // same destination. Erase it. Note that, since we always
        // compress these redundancies, There Can Be Only One.
        MOZ_RELEASE_ASSERT(p->Msg()->compress_type() ==
                           IPC::Message::COMPRESSION_ALL);
        MOZ_RELEASE_ASSERT(IsAlwaysDeferred(*p->Msg()));
        p->remove();
        break;
      }
    }
  }

  bool alwaysDeferred = IsAlwaysDeferred(*aMsg);

  bool shouldWakeUp = AwaitingSyncReply() && !ShouldDeferMessage(*aMsg);

  IPC_LOG("Receive from link; seqno=%d, xid=%d, shouldWakeUp=%d", aMsg->seqno(),
          aMsg->transaction_id(), shouldWakeUp);

  // There are two cases we're concerned about, relating to the state of the
  // worker thread:
  //
  // (1) We are waiting on a sync reply - worker thread is blocked on the
  //     IPC monitor.
  //   - If the message is NESTED_INSIDE_SYNC, we wake up the worker thread to
  //     deliver the message depending on ShouldDeferMessage. Otherwise, we
  //     leave it in the mPending queue, posting a task to the worker event
  //     loop, where it will be processed once the synchronous reply has been
  //     received.
  //
  // (2) We are not waiting on a reply.
  //   - We post a task to the worker event loop.
  //
  // Note that, we may notify the worker thread even though the monitor is not
  // blocked. This is okay, since we always check for pending events before
  // blocking again.

  RefPtr<MessageTask> task = new MessageTask(this, std::move(aMsg));
  mPending.insertBack(task);

  if (!alwaysDeferred) {
    mMaybeDeferredPendingCount++;
  }

  if (shouldWakeUp) {
    NotifyWorkerThread();
  }

  // Although we usually don't need to post a message task if
  // shouldWakeUp is true, it's easier to post anyway than to have to
  // guarantee that every Send call processes everything it's supposed to
  // before returning.
  task->AssertMonitorHeld(*mMonitor);
  task->Post();
}

void MessageChannel::PeekMessages(
    const std::function<bool(const Message& aMsg)>& aInvoke) {
  // FIXME: We shouldn't be holding the lock for aInvoke!
  MonitorAutoLock lock(*mMonitor);

  for (MessageTask* it : mPending) {
    it->AssertMonitorHeld(*mMonitor);
    const Message& msg = *it->Msg();
    if (!aInvoke(msg)) {
      break;
    }
  }
}

void MessageChannel::ProcessPendingRequests(
    ActorLifecycleProxy* aProxy, AutoEnterTransaction& aTransaction) {
  mMonitor->AssertCurrentThreadOwns();

  AssertMaybeDeferredCountCorrect();
  if (mMaybeDeferredPendingCount == 0) {
    return;
  }

  IPC_LOG("ProcessPendingRequests for seqno=%d, xid=%d",
          aTransaction.SequenceNumber(), aTransaction.TransactionID());

  // Loop until there aren't any more nested messages to process.
  for (;;) {
    // If we canceled during ProcessPendingRequest, then we need to leave
    // immediately because the results of ShouldDeferMessage will be
    // operating with weird state (as if no Send is in progress). That could
    // cause even NOT_NESTED sync messages to be processed (but not
    // NOT_NESTED async messages), which would break message ordering.
    if (aTransaction.IsCanceled()) {
      return;
    }

    Vector<UniquePtr<Message>> toProcess;

    for (MessageTask* p = mPending.getFirst(); p;) {
      p->AssertMonitorHeld(*mMonitor);
      UniquePtr<Message>& msg = p->Msg();

      MOZ_RELEASE_ASSERT(!aTransaction.IsCanceled(),
                         "Calling ShouldDeferMessage when cancelled");
      bool defer = ShouldDeferMessage(*msg);

      // Only log the interesting messages.
      if (msg->is_sync() ||
          msg->nested_level() == IPC::Message::NESTED_INSIDE_CPOW) {
        IPC_LOG("ShouldDeferMessage(seqno=%d) = %d", msg->seqno(), defer);
      }

      if (!defer) {
        MOZ_ASSERT(!IsAlwaysDeferred(*msg));

        if (!toProcess.append(std::move(msg))) MOZ_CRASH();

        mMaybeDeferredPendingCount--;

        p = p->removeAndGetNext();
        continue;
      }
      p = p->getNext();
    }

    if (toProcess.empty()) {
      break;
    }

    // Processing these messages could result in more messages, so we
    // loop around to check for more afterwards.

    for (auto& msg : toProcess) {
      ProcessPendingRequest(aProxy, std::move(msg));
    }
  }

  AssertMaybeDeferredCountCorrect();
}

bool MessageChannel::Send(UniquePtr<Message> aMsg, UniquePtr<Message>* aReply) {
  mozilla::TimeStamp start = TimeStamp::Now();
  if (aMsg->size() >= kMinTelemetryMessageSize) {
    Telemetry::Accumulate(Telemetry::IPC_MESSAGE_SIZE2, aMsg->size());
  }

  // Sanity checks.
  AssertWorkerThread();
  mMonitor->AssertNotCurrentThreadOwns();
  MOZ_RELEASE_ASSERT(!mIsSameThreadChannel,
                     "sync send over same-thread channel will deadlock!");

  RefPtr<ActorLifecycleProxy> proxy = Listener()->GetLifecycleProxy();

#ifdef OS_WIN
  SyncStackFrame frame(this);
  NeuteredWindowRegion neuteredRgn(mFlags &
                                   REQUIRE_DEFERRED_MESSAGE_PROTECTION);
#endif

  AutoSetValue<bool> setOnCxxStack(mOnCxxStack, true);

  MonitorAutoLock lock(*mMonitor);

  if (mTimedOutMessageSeqno) {
    // Don't bother sending another sync message if a previous one timed out
    // and we haven't received a reply for it. Once the original timed-out
    // message receives a reply, we'll be able to send more sync messages
    // again.
    IPC_LOG("Send() failed due to previous timeout");
    mLastSendError = SyncSendError::PreviousTimeout;
    return false;
  }

  if (DispatchingSyncMessageNestedLevel() == IPC::Message::NOT_NESTED &&
      aMsg->nested_level() > IPC::Message::NOT_NESTED) {
    // Don't allow sending CPOWs while we're dispatching a sync message.
    IPC_LOG("Nested level forbids send");
    mLastSendError = SyncSendError::SendingCPOWWhileDispatchingSync;
    return false;
  }

  if (DispatchingSyncMessageNestedLevel() == IPC::Message::NESTED_INSIDE_CPOW ||
      DispatchingAsyncMessageNestedLevel() ==
          IPC::Message::NESTED_INSIDE_CPOW) {
    // Generally only the parent dispatches urgent messages. And the only
    // sync messages it can send are NESTED_INSIDE_SYNC. Mainly we want to
    // ensure here that we don't return false for non-CPOW messages.
    MOZ_RELEASE_ASSERT(aMsg->nested_level() ==
                       IPC::Message::NESTED_INSIDE_SYNC);
    IPC_LOG("Sending while dispatching urgent message");
    mLastSendError = SyncSendError::SendingCPOWWhileDispatchingUrgent;
    return false;
  }

  if (aMsg->nested_level() < DispatchingSyncMessageNestedLevel() ||
      aMsg->nested_level() < AwaitingSyncReplyNestedLevel()) {
    MOZ_RELEASE_ASSERT(DispatchingSyncMessage() || DispatchingAsyncMessage());
    IPC_LOG("Cancel from Send");
    auto cancel =
        MakeUnique<CancelMessage>(CurrentNestedInsideSyncTransaction());
    CancelTransaction(CurrentNestedInsideSyncTransaction());
    SendMessageToLink(std::move(cancel));
  }

  IPC_ASSERT(aMsg->is_sync(), "can only Send() sync messages here");

  IPC_ASSERT(aMsg->nested_level() >= DispatchingSyncMessageNestedLevel(),
             "can't send sync message of a lesser nested level than what's "
             "being dispatched");
  IPC_ASSERT(AwaitingSyncReplyNestedLevel() <= aMsg->nested_level(),
             "nested sync message sends must be of increasing nested level");
  IPC_ASSERT(
      DispatchingSyncMessageNestedLevel() != IPC::Message::NESTED_INSIDE_CPOW,
      "not allowed to send messages while dispatching urgent messages");

  IPC_ASSERT(
      DispatchingAsyncMessageNestedLevel() != IPC::Message::NESTED_INSIDE_CPOW,
      "not allowed to send messages while dispatching urgent messages");

  if (!Connected()) {
    ReportConnectionError("SendAndWait", aMsg->type());
    mLastSendError = SyncSendError::NotConnectedBeforeSend;
    return false;
  }

  aMsg->set_seqno(NextSeqno());

  int32_t seqno = aMsg->seqno();
  int nestedLevel = aMsg->nested_level();
  msgid_t replyType = aMsg->type() + 1;

  AutoEnterTransaction* stackTop = mTransactionStack;

  // If the most recent message on the stack is NESTED_INSIDE_SYNC, then our
  // message should nest inside that and we use the same transaction
  // ID. Otherwise we need a new transaction ID (so we use the seqno of the
  // message we're sending).
  bool nest =
      stackTop && stackTop->NestedLevel() == IPC::Message::NESTED_INSIDE_SYNC;
  int32_t transaction = nest ? stackTop->TransactionID() : seqno;
  aMsg->set_transaction_id(transaction);

  AutoEnterTransaction transact(this, seqno, transaction, nestedLevel);

  IPC_LOG("Send seqno=%d, xid=%d", seqno, transaction);

  // aMsg will be destroyed soon, let's keep its type.
  const char* msgName = aMsg->name();
  const msgid_t msgType = aMsg->type();

  AddProfilerMarker(*aMsg, MessageDirection::eSending);
  SendMessageToLink(std::move(aMsg));

  while (true) {
    MOZ_RELEASE_ASSERT(!transact.IsCanceled());
    ProcessPendingRequests(proxy, transact);
    if (transact.IsComplete()) {
      break;
    }
    if (!Connected()) {
      ReportConnectionError("Send", msgType);
      mLastSendError = SyncSendError::DisconnectedDuringSend;
      return false;
    }

    MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno);
    MOZ_RELEASE_ASSERT(!transact.IsComplete());
    MOZ_RELEASE_ASSERT(mTransactionStack == &transact);

    bool maybeTimedOut = !WaitForSyncNotify();

    if (mListener->NeedArtificialSleep()) {
      MonitorAutoUnlock unlock(*mMonitor);
      mListener->ArtificialSleep();
    }

    if (!Connected()) {
      ReportConnectionError("SendAndWait", msgType);
      mLastSendError = SyncSendError::DisconnectedDuringSend;
      return false;
    }

    if (transact.IsCanceled()) {
      break;
    }

    MOZ_RELEASE_ASSERT(mTransactionStack == &transact);

    // We only time out a message if it initiated a new transaction (i.e.,
    // if neither side has any other message Sends on the stack).
    bool canTimeOut = transact.IsBottom();
    if (maybeTimedOut && canTimeOut && !ShouldContinueFromTimeout()) {
      // Since ShouldContinueFromTimeout drops the lock, we need to
      // re-check all our conditions here. We shouldn't time out if any of
      // these things happen because there won't be a reply to the timed
      // out message in these cases.
      if (transact.IsComplete()) {
        break;
      }

      IPC_LOG("Timing out Send: xid=%d", transaction);

      mTimedOutMessageSeqno = seqno;
      mTimedOutMessageNestedLevel = nestedLevel;
      mLastSendError = SyncSendError::TimedOut;
      return false;
    }

    if (transact.IsCanceled()) {
      break;
    }
  }

  if (transact.IsCanceled()) {
    IPC_LOG("Other side canceled seqno=%d, xid=%d", seqno, transaction);
    mLastSendError = SyncSendError::CancelledAfterSend;
    return false;
  }

  if (transact.IsError()) {
    IPC_LOG("Error: seqno=%d, xid=%d", seqno, transaction);
    mLastSendError = SyncSendError::ReplyError;
    return false;
  }

  uint32_t latencyMs = round((TimeStamp::Now() - start).ToMilliseconds());
  IPC_LOG("Got reply: seqno=%d, xid=%d, msgName=%s, latency=%ums", seqno,
          transaction, msgName, latencyMs);

  UniquePtr<Message> reply = transact.GetReply();

  MOZ_RELEASE_ASSERT(reply);
  MOZ_RELEASE_ASSERT(reply->is_reply(), "expected reply");
  MOZ_RELEASE_ASSERT(!reply->is_reply_error());
  MOZ_RELEASE_ASSERT(reply->seqno() == seqno);
  MOZ_RELEASE_ASSERT(reply->type() == replyType, "wrong reply type");
  MOZ_RELEASE_ASSERT(reply->is_sync());

  AddProfilerMarker(*reply, MessageDirection::eReceiving);

  if (reply->size() >= kMinTelemetryMessageSize) {
    Telemetry::Accumulate(Telemetry::IPC_REPLY_SIZE,
                          nsDependentCString(msgName), reply->size());
  }

  *aReply = std::move(reply);

  // NOTE: Only collect IPC_SYNC_MAIN_LATENCY_MS on the main thread (bug
  // 1343729)
  if (NS_IsMainThread() && latencyMs >= kMinTelemetrySyncIPCLatencyMs) {
    Telemetry::Accumulate(Telemetry::IPC_SYNC_MAIN_LATENCY_MS,
                          nsDependentCString(msgName), latencyMs);
  }
  return true;
}

bool MessageChannel::HasPendingEvents() {
  AssertWorkerThread();
  mMonitor->AssertCurrentThreadOwns();
  return ConnectedOrClosing() && !mPending.isEmpty();
}

bool MessageChannel::ProcessPendingRequest(ActorLifecycleProxy* aProxy,
                                           UniquePtr<Message> aUrgent) {
  AssertWorkerThread();
  mMonitor->AssertCurrentThreadOwns();

  IPC_LOG("Process pending: seqno=%d, xid=%d", aUrgent->seqno(),
          aUrgent->transaction_id());

  // keep the error relevant information
  msgid_t msgType = aUrgent->type();

  DispatchMessage(aProxy, std::move(aUrgent));
  if (!ConnectedOrClosing()) {
    ReportConnectionError("ProcessPendingRequest", msgType);
    return false;
  }

  return true;
}

bool MessageChannel::ShouldRunMessage(const Message& aMsg) {
  if (!mTimedOutMessageSeqno) {
    return true;
  }

  // If we've timed out a message and we're awaiting the reply to the timed
  // out message, we have to be careful what messages we process. Here's what
  // can go wrong:
  // 1. child sends a NOT_NESTED sync message S
  // 2. parent sends a NESTED_INSIDE_SYNC sync message H at the same time
  // 3. parent times out H
  // 4. child starts processing H and sends a NESTED_INSIDE_SYNC message H'
  //    nested within the same transaction
  // 5. parent dispatches S and sends reply
  // 6. child asserts because it instead expected a reply to H'.
  //
  // To solve this, we refuse to process S in the parent until we get a reply
  // to H. More generally, let the timed out message be M. We don't process a
  // message unless the child would need the response to that message in order
  // to process M. Those messages are the ones that have a higher nested level
  // than M or that are part of the same transaction as M.
  if (aMsg.nested_level() < mTimedOutMessageNestedLevel ||
      (aMsg.nested_level() == mTimedOutMessageNestedLevel &&
       aMsg.transaction_id() != mTimedOutMessageSeqno)) {
    return false;
  }

  return true;
}

void MessageChannel::RunMessage(ActorLifecycleProxy* aProxy,
                                MessageTask& aTask) {
  AssertWorkerThread();
  mMonitor->AssertCurrentThreadOwns();
  aTask.AssertMonitorHeld(*mMonitor);

  UniquePtr<Message>& msg = aTask.Msg();

  if (!ConnectedOrClosing()) {
    ReportConnectionError("RunMessage", msg->type());
    return;
  }

  // Check that we're going to run the first message that's valid to run.
#if 0
#  ifdef DEBUG
    for (MessageTask* task : mPending) {
        if (task == &aTask) {
            break;
        }

        MOZ_ASSERT(!ShouldRunMessage(*task->Msg()) ||
                   aTask.Msg()->priority() != task->Msg()->priority());

    }
#  endif
#endif

  if (!ShouldRunMessage(*msg)) {
    return;
  }

  MOZ_RELEASE_ASSERT(aTask.isInList());
  aTask.remove();

  if (!IsAlwaysDeferred(*msg)) {
    mMaybeDeferredPendingCount--;
  }

  DispatchMessage(aProxy, std::move(msg));
}

NS_IMPL_ISUPPORTS_INHERITED(MessageChannel::MessageTask, CancelableRunnable,
                            nsIRunnablePriority, nsIRunnableIPCMessageType)

static uint32_t ToRunnablePriority(IPC::Message::PriorityValue aPriority) {
  switch (aPriority) {
    case IPC::Message::NORMAL_PRIORITY:
      return nsIRunnablePriority::PRIORITY_NORMAL;
    case IPC::Message::INPUT_PRIORITY:
      return nsIRunnablePriority::PRIORITY_INPUT_HIGH;
    case IPC::Message::VSYNC_PRIORITY:
      return nsIRunnablePriority::PRIORITY_VSYNC;
    case IPC::Message::MEDIUMHIGH_PRIORITY:
      return nsIRunnablePriority::PRIORITY_MEDIUMHIGH;
    case IPC::Message::CONTROL_PRIORITY:
      return nsIRunnablePriority::PRIORITY_CONTROL;
    default:
      MOZ_ASSERT_UNREACHABLE();
      return nsIRunnablePriority::PRIORITY_NORMAL;
  }
}

MessageChannel::MessageTask::MessageTask(MessageChannel* aChannel,
                                         UniquePtr<Message> aMessage)
    : CancelableRunnable(aMessage->name()),
      mMonitor(aChannel->mMonitor),
      mChannel(aChannel),
      mMessage(std::move(aMessage)),
      mPriority(ToRunnablePriority(mMessage->priority())),
      mScheduled(false)
#ifdef FUZZING_SNAPSHOT
      ,
      mIsFuzzMsg(mMessage->IsFuzzMsg()),
      mFuzzStopped(false)
#endif
{
  MOZ_DIAGNOSTIC_ASSERT(mMessage, "message may not be null");
#ifdef FUZZING_SNAPSHOT
  if (mIsFuzzMsg) {
    MOZ_FUZZING_IPC_MT_CTOR();
  }
#endif
}

MessageChannel::MessageTask::~MessageTask() {
#ifdef FUZZING_SNAPSHOT
  // We track fuzzing messages until their run is complete. To make sure
  // that we don't miss messages that are for some reason destroyed without
  // being run (e.g. canceled), we catch this condition in the destructor.
  if (mIsFuzzMsg && !mFuzzStopped) {
    MOZ_FUZZING_IPC_MT_STOP();
  } else if (!mIsFuzzMsg && !fuzzing::Nyx::instance().started()) {
    MOZ_FUZZING_IPC_PRE_FUZZ_MT_STOP();
  }
#endif
}

nsresult MessageChannel::MessageTask::Run() {
  mMonitor->AssertNotCurrentThreadOwns();

  // Drop the toplevel actor's lifecycle proxy outside of our monitor if we take
  // it, as destroying our ActorLifecycleProxy reference can acquire the
  // monitor.
  RefPtr<ActorLifecycleProxy> proxy;

  MonitorAutoLock lock(*mMonitor);

  // In case we choose not to run this message, we may need to be able to Post
  // it again.
  mScheduled = false;

  if (!isInList()) {
    return NS_OK;
  }

#ifdef FUZZING_SNAPSHOT
  if (!mIsFuzzMsg) {
    if (fuzzing::Nyx::instance().started()) {
      // Once we started fuzzing, prevent non-fuzzing tasks from being
      // run and potentially blocking worker threads.
      //
      // TODO: This currently blocks all MessageTasks from running, not
      // just those belonging to the target process pair. We currently
      // do this for performance reasons, but it should be re-evaluated
      // at a later stage when we found a better snapshot point.
      return NS_OK;
    }
    // Record all running tasks prior to fuzzing, so we can wait for
    // them to settle before snapshotting.
    MOZ_FUZZING_IPC_PRE_FUZZ_MT_RUN();
  }
#endif

  Channel()->AssertWorkerThread();
  mMonitor->AssertSameMonitor(*Channel()->mMonitor);
  proxy = Channel()->Listener()->GetLifecycleProxy();
  Channel()->RunMessage(proxy, *this);

#ifdef FUZZING_SNAPSHOT
  if (mIsFuzzMsg && !mFuzzStopped) {
    MOZ_FUZZING_IPC_MT_STOP();
    mFuzzStopped = true;
  }
#endif
  return NS_OK;
}

// Warning: This method removes the receiver from whatever list it might be in.
nsresult MessageChannel::MessageTask::Cancel() {
  mMonitor->AssertNotCurrentThreadOwns();

  MonitorAutoLock lock(*mMonitor);

  if (!isInList()) {
    return NS_OK;
  }

  Channel()->AssertWorkerThread();
  mMonitor->AssertSameMonitor(*Channel()->mMonitor);
  if (!IsAlwaysDeferred(*Msg())) {
    Channel()->mMaybeDeferredPendingCount--;
  }

  remove();

#ifdef FUZZING_SNAPSHOT
  if (mIsFuzzMsg && !mFuzzStopped) {
    MOZ_FUZZING_IPC_MT_STOP();
    mFuzzStopped = true;
  }
#endif

  return NS_OK;
}

void MessageChannel::MessageTask::Post() {
  mMonitor->AssertCurrentThreadOwns();
  mMonitor->AssertSameMonitor(*Channel()->mMonitor);
  MOZ_RELEASE_ASSERT(!mScheduled);
  MOZ_RELEASE_ASSERT(isInList());

  mScheduled = true;

  Channel()->mWorkerThread->Dispatch(do_AddRef(this));
}

NS_IMETHODIMP
MessageChannel::MessageTask::GetPriority(uint32_t* aPriority) {
  *aPriority = mPriority;
  return NS_OK;
}

NS_IMETHODIMP
MessageChannel::MessageTask::GetType(uint32_t* aType) {
  mMonitor->AssertNotCurrentThreadOwns();

  MonitorAutoLock lock(*mMonitor);
  if (!mMessage) {
    // If mMessage has been moved already elsewhere, we can't know what the type
    // has been.
    return NS_ERROR_FAILURE;
  }

  *aType = mMessage->type();
  return NS_OK;
}

void MessageChannel::DispatchMessage(ActorLifecycleProxy* aProxy,
                                     UniquePtr<Message> aMsg) {
  AssertWorkerThread();
  mMonitor->AssertCurrentThreadOwns();

  Maybe<AutoNoJSAPI> nojsapi;
  if (NS_IsMainThread() && CycleCollectedJSContext::Get()) {
    nojsapi.emplace();
  }

  UniquePtr<Message> reply;

  IPC_LOG("DispatchMessage: seqno=%d, xid=%d", aMsg->seqno(),
          aMsg->transaction_id());
  AddProfilerMarker(*aMsg, MessageDirection::eReceiving);

  {
    AutoEnterTransaction transaction(this, *aMsg);

    int id = aMsg->transaction_id();
    MOZ_RELEASE_ASSERT(!aMsg->is_sync() || id == transaction.TransactionID());

    {
      MonitorAutoUnlock unlock(*mMonitor);
      AutoSetValue<bool> setOnCxxStack(mOnCxxStack, true);

      mListener->ArtificialSleep();

      if (aMsg->is_sync()) {
        DispatchSyncMessage(aProxy, *aMsg, reply);
      } else {
        DispatchAsyncMessage(aProxy, *aMsg);
      }

      mListener->ArtificialSleep();
    }

    if (reply && transaction.IsCanceled()) {
      // The transaction has been canceled. Don't send a reply.
      IPC_LOG("Nulling out reply due to cancellation, seqno=%d, xid=%d",
              aMsg->seqno(), id);
      reply = nullptr;
    }
  }

  if (reply && ChannelConnected == mChannelState) {
    IPC_LOG("Sending reply seqno=%d, xid=%d", aMsg->seqno(),
            aMsg->transaction_id());
    AddProfilerMarker(*reply, MessageDirection::eSending);

    SendMessageToLink(std::move(reply));
  }
}

void MessageChannel::DispatchSyncMessage(ActorLifecycleProxy* aProxy,
                                         const Message& aMsg,
                                         UniquePtr<Message>& aReply) {
  AssertWorkerThread();

  mozilla::TimeStamp start = TimeStamp::Now();

  int nestedLevel = aMsg.nested_level();

  MOZ_RELEASE_ASSERT(nestedLevel == IPC::Message::NOT_NESTED ||
                     NS_IsMainThread());

  MessageChannel* dummy;
  MessageChannel*& blockingVar =
      mSide == ChildSide && NS_IsMainThread() ? gParentProcessBlocker : dummy;

  Result rv;
  {
    AutoSetValue<MessageChannel*> blocked(blockingVar, this);
    rv = aProxy->Get()->OnMessageReceived(aMsg, aReply);
  }

  uint32_t latencyMs = round((TimeStamp::Now() - start).ToMilliseconds());
  if (latencyMs >= kMinTelemetrySyncIPCLatencyMs) {
    Telemetry::Accumulate(Telemetry::IPC_SYNC_RECEIVE_MS,
                          nsDependentCString(aMsg.name()), latencyMs);
  }

  if (!MaybeHandleError(rv, aMsg, "DispatchSyncMessage")) {
    aReply = Message::ForSyncDispatchError(aMsg.nested_level());
  }
  aReply->set_seqno(aMsg.seqno());
  aReply->set_transaction_id(aMsg.transaction_id());
}

void MessageChannel::DispatchAsyncMessage(ActorLifecycleProxy* aProxy,
                                          const Message& aMsg) {
  AssertWorkerThread();
  MOZ_RELEASE_ASSERT(!aMsg.is_sync());

  if (aMsg.routing_id() == MSG_ROUTING_NONE) {
    NS_WARNING("unhandled special message!");
    MaybeHandleError(MsgNotKnown, aMsg, "DispatchAsyncMessage");
    return;
  }

  Result rv;
  {
    int nestedLevel = aMsg.nested_level();
    AutoSetValue<bool> async(mDispatchingAsyncMessage, true);
    AutoSetValue<int> nestedLevelSet(mDispatchingAsyncMessageNestedLevel,
                                     nestedLevel);
    rv = aProxy->Get()->OnMessageReceived(aMsg);
  }
  MaybeHandleError(rv, aMsg, "DispatchAsyncMessage");
}

void MessageChannel::EnqueuePendingMessages() {
  AssertWorkerThread();
  mMonitor->AssertCurrentThreadOwns();

  // XXX performance tuning knob: could process all or k pending
  // messages here, rather than enqueuing for later processing

  RepostAllMessages();
}

bool MessageChannel::WaitResponse(bool aWaitTimedOut) {
  AssertWorkerThread();
  if (aWaitTimedOut) {
    if (mInTimeoutSecondHalf) {
      // We've really timed out this time.
      return false;
    }
    // Try a second time.
    mInTimeoutSecondHalf = true;
  } else {
    mInTimeoutSecondHalf = false;
  }
  return true;
}

#ifndef OS_WIN
bool MessageChannel::WaitForSyncNotify() {
  AssertWorkerThread();
#  ifdef DEBUG
  // WARNING: We don't release the lock here. We can't because the link
  // could signal at this time and we would miss it. Instead we require
  // ArtificialTimeout() to be extremely simple.
  if (mListener->ArtificialTimeout()) {
    return false;
  }
#  endif

  MOZ_RELEASE_ASSERT(!mIsSameThreadChannel,
                     "Wait on same-thread channel will deadlock!");

  TimeDuration timeout = (kNoTimeout == mTimeoutMs)
                             ? TimeDuration::Forever()
                             : TimeDuration::FromMilliseconds(mTimeoutMs);
  CVStatus status = mMonitor->Wait(timeout);

  // If the timeout didn't expire, we know we received an event. The
  // converse is not true.
  return WaitResponse(status == CVStatus::Timeout);
}

void MessageChannel::NotifyWorkerThread() { mMonitor->Notify(); }
#endif

bool MessageChannel::ShouldContinueFromTimeout() {
  AssertWorkerThread();
  mMonitor->AssertCurrentThreadOwns();

  bool cont;
  {
    MonitorAutoUnlock unlock(*mMonitor);
    cont = mListener->ShouldContinueFromReplyTimeout();
    mListener->ArtificialSleep();
  }

  static enum {
    UNKNOWN,
    NOT_DEBUGGING,
    DEBUGGING
  } sDebuggingChildren = UNKNOWN;

  if (sDebuggingChildren == UNKNOWN) {
    sDebuggingChildren =
        getenv("MOZ_DEBUG_CHILD_PROCESS") || getenv("MOZ_DEBUG_CHILD_PAUSE")
            ? DEBUGGING
            : NOT_DEBUGGING;
  }
  if (sDebuggingChildren == DEBUGGING) {
    return true;
  }

  return cont;
}

void MessageChannel::SetReplyTimeoutMs(int32_t aTimeoutMs) {
  // Set channel timeout value. Since this is broken up into
  // two period, the minimum timeout value is 2ms.
  AssertWorkerThread();
  mTimeoutMs =
      (aTimeoutMs <= 0) ? kNoTimeout : (int32_t)ceil((double)aTimeoutMs / 2.0);
}

void MessageChannel::ReportConnectionError(const char* aFunctionName,
                                           const uint32_t aMsgType) const {
  AssertWorkerThread();
  mMonitor->AssertCurrentThreadOwns();

  const char* errorMsg = nullptr;
  switch (mChannelState) {
    case ChannelClosed:
      errorMsg = "Closed channel: cannot send/recv";
      break;
    case ChannelClosing:
      errorMsg = "Channel closing: too late to send, messages will be lost";
      break;
    case ChannelError:
      errorMsg = "Channel error: cannot send/recv";
      break;

    default:
      MOZ_CRASH("unreached");
  }

  // IPC connection errors are fairly common, especially "Channel closing: too
  // late to send/recv, messages will be lost", so shouldn't be being reported
  // on release builds, as that's misleading as to their severity.
  NS_WARNING(nsPrintfCString("IPC Connection Error: [%s][%s] %s(msgname=%s) %s",
                             StringFromIPCSide(mSide), mName, aFunctionName,
                             IPC::StringFromIPCMessageType(aMsgType), errorMsg)
                 .get());

  MonitorAutoUnlock unlock(*mMonitor);
  mListener->ProcessingError(MsgDropped, errorMsg);
}

void MessageChannel::ReportMessageRouteError(const char* channelName) const {
  PrintErrorMessage(mSide, channelName, "Need a route");
  mListener->ProcessingError(MsgRouteError, "MsgRouteError");
}

bool MessageChannel::MaybeHandleError(Result code, const Message& aMsg,
                                      const char* channelName) {
  if (MsgProcessed == code) return true;

  const char* errorMsg = nullptr;
  switch (code) {
    case MsgNotKnown:
      errorMsg = "Unknown message: not processed";
      break;
    case MsgNotAllowed:
      errorMsg = "Message not allowed: cannot be sent/recvd in this state";
      break;
    case MsgPayloadError:
      errorMsg = "Payload error: message could not be deserialized";
      break;
    case MsgProcessingError:
      errorMsg =
          "Processing error: message was deserialized, but the handler "
          "returned false (indicating failure)";
      break;
    case MsgRouteError:
      errorMsg = "Route error: message sent to unknown actor ID";
      break;
    case MsgValueError:
      errorMsg =
          "Value error: message was deserialized, but contained an illegal "
          "value";
      break;

    default:
      MOZ_CRASH("unknown Result code");
      return false;
  }

  char reason[512];
  const char* msgname = aMsg.name();
  if (msgname[0] == '?') {
    SprintfLiteral(reason, "(msgtype=0x%X) %s", aMsg.type(), errorMsg);
  } else {
    SprintfLiteral(reason, "%s %s", msgname, errorMsg);
  }

  PrintErrorMessage(mSide, channelName, reason);

  // Error handled in mozilla::ipc::IPCResult.
  if (code == MsgProcessingError) {
    return false;
  }

  mListener->ProcessingError(code, reason);

  return false;
}

void MessageChannel::OnChannelErrorFromLink() {
  mMonitor->AssertCurrentThreadOwns();
  MOZ_ASSERT(mChannelState == ChannelConnected);

  IPC_LOG("OnChannelErrorFromLink");

  if (AwaitingSyncReply()) {
    NotifyWorkerThread();
  }

  if (mAbortOnError) {
    // mAbortOnError is set by main actors (e.g., ContentChild) to ensure
    // that the process terminates even if normal shutdown is prevented.
    // A MOZ_CRASH() here is not helpful because crash reporting relies
    // on the parent process which we know is dead or otherwise unusable.
    //
    // Additionally, the parent process can (and often is) killed on Android
    // when apps are backgrounded. We don't need to report a crash for
    // normal behavior in that case.
    printf_stderr("Exiting due to channel error.\n");
    ProcessChild::QuickExit();
  }
  mChannelState = ChannelError;
  mMonitor->Notify();

  PostErrorNotifyTask();
}

void MessageChannel::NotifyMaybeChannelError(ReleasableMonitorAutoLock& aLock) {
  AssertWorkerThread();
  mMonitor->AssertCurrentThreadOwns();
  aLock.AssertCurrentThreadOwns();
  MOZ_ASSERT(mChannelState != ChannelConnected);

  if (ChannelClosing == mChannelState || ChannelClosed == mChannelState) {
    // the channel closed, but we received a "Goodbye" message warning us
    // about it. no worries
    mChannelState = ChannelClosed;
    NotifyChannelClosed(aLock);
    return;
  }

  MOZ_ASSERT(ChannelError == mChannelState);

  Clear();

  // IPDL assumes these notifications do not fire twice, so we do not let
  // that happen.
  if (mNotifiedChannelDone) {
    return;
  }
  mNotifiedChannelDone = true;

  // Let our listener know that the channel errored. This may cause the
  // channel to be deleted. Release our caller's `MonitorAutoLock` before
  // invoking the listener, as this may call back into MessageChannel, and/or
  // cause the channel to be destroyed.
  aLock.Unlock();
  mListener->OnChannelError();
}

void MessageChannel::OnNotifyMaybeChannelError() {
  AssertWorkerThread();
  mMonitor->AssertNotCurrentThreadOwns();

  // This lock guard may be reset by `NotifyMaybeChannelError` before invoking
  // listener callbacks which may destroy this `MessageChannel`.
  //
  // Acquiring the lock here also allows us to ensure that
  // `OnChannelErrorFromLink` has finished running before this task is allowed
  // to continue.
  ReleasableMonitorAutoLock lock(*mMonitor);

  mChannelErrorTask = nullptr;

  if (IsOnCxxStack()) {
    // This used to post a 10ms delayed task; however not all
    // nsISerialEventTarget implementations support delayed dispatch.
    // The delay being completely arbitrary, we may not as well have any.
    PostErrorNotifyTask();
    return;
  }

  // This call may destroy `this`.
  NotifyMaybeChannelError(lock);
}

void MessageChannel::PostErrorNotifyTask() {
  mMonitor->AssertCurrentThreadOwns();

  if (mChannelErrorTask) {
    return;
  }

  // This must be the last code that runs on this thread!
  mChannelErrorTask = NewNonOwningCancelableRunnableMethod(
      "ipc::MessageChannel::OnNotifyMaybeChannelError", this,
      &MessageChannel::OnNotifyMaybeChannelError);
  mWorkerThread->Dispatch(do_AddRef(mChannelErrorTask));
}

// Special async message.
class GoodbyeMessage : public IPC::Message {
 public:
  GoodbyeMessage() : IPC::Message(MSG_ROUTING_NONE, GOODBYE_MESSAGE_TYPE) {}
  static bool Read(const Message* msg) { return true; }
  void Log(const std::string& aPrefix, FILE* aOutf) const {
    fputs("(special `Goodbye' message)", aOutf);
  }
};

void MessageChannel::CloseWithError() {
  AssertWorkerThread();

  // This lock guard may be reset by `NotifyMaybeChannelError` before invoking
  // listener callbacks which may destroy this `MessageChannel`.
  ReleasableMonitorAutoLock lock(*mMonitor);

  switch (mChannelState) {
    case ChannelError:
      // Already errored, ensure we notify if we haven't yet.
      NotifyMaybeChannelError(lock);
      return;
    case ChannelClosed:
      // Already closed, we can't do anything.
      return;
    default:
      // Either connected or closing, immediately convert to an error, and
      // notify.
      MOZ_ASSERT(mChannelState == ChannelConnected ||
                 mChannelState == ChannelClosing);
      mLink->Close();
      mChannelState = ChannelError;
      NotifyMaybeChannelError(lock);
      return;
  }
}

void MessageChannel::NotifyImpendingShutdown() {
  UniquePtr<Message> msg =
      MakeUnique<Message>(MSG_ROUTING_NONE, IMPENDING_SHUTDOWN_MESSAGE_TYPE);
  MonitorAutoLock lock(*mMonitor);
  if (Connected()) {
    SendMessageToLink(std::move(msg));
  }
}

void MessageChannel::Close() {
  AssertWorkerThread();
  mMonitor->AssertNotCurrentThreadOwns();

  // This lock guard may be reset by `Notify{ChannelClosed,MaybeChannelError}`
  // before invoking listener callbacks which may destroy this `MessageChannel`.
  ReleasableMonitorAutoLock lock(*mMonitor);

  switch (mChannelState) {
    case ChannelError:
      // See bug 538586: if the listener gets deleted while the
      // IO thread's NotifyChannelError event is still enqueued
      // and subsequently deletes us, then the error event will
      // also be deleted and the listener will never be notified
      // of the channel error.
      NotifyMaybeChannelError(lock);
      return;
    case ChannelClosed:
      // Slightly unexpected but harmless; ignore.  See bug 1554244.
      return;

    default:
      // Notify the other side that we're about to close our socket. If we've
      // already received a Goodbye from the other side (and our state is
      // ChannelClosing), there's no reason to send one.
      if (ChannelConnected == mChannelState) {
        SendMessageToLink(MakeUnique<GoodbyeMessage>());
      }
      mLink->Close();
      mChannelState = ChannelClosed;
      NotifyChannelClosed(lock);
      return;
  }
}

void MessageChannel::NotifyChannelClosed(ReleasableMonitorAutoLock& aLock) {
  AssertWorkerThread();
  mMonitor->AssertCurrentThreadOwns();
  aLock.AssertCurrentThreadOwns();

  if (ChannelClosed != mChannelState) {
    MOZ_CRASH("channel should have been closed!");
  }

  Clear();

  // IPDL assumes these notifications do not fire twice, so we do not let
  // that happen.
  if (mNotifiedChannelDone) {
    return;
  }
  mNotifiedChannelDone = true;

  // Let our listener know that the channel was closed. This may cause the
  // channel to be deleted. Release our caller's `MonitorAutoLock` before
  // invoking the listener, as this may call back into MessageChannel, and/or
  // cause the channel to be destroyed.
  aLock.Unlock();
  mListener->OnChannelClose();
}

void MessageChannel::DebugAbort(const char* file, int line, const char* cond,
                                const char* why, bool reply) {
  AssertWorkerThread();
  mMonitor->AssertCurrentThreadOwns();

  printf_stderr(
      "###!!! [MessageChannel][%s][%s:%d] "
      "Assertion (%s) failed.  %s %s\n",
      mSide == ChildSide ? "Child" : "Parent", file, line, cond, why,
      reply ? "(reply)" : "");

  MessageQueue pending = std::move(mPending);
  while (!pending.isEmpty()) {
    pending.getFirst()->AssertMonitorHeld(*mMonitor);
    printf_stderr("    [ %s%s ]\n",
                  pending.getFirst()->Msg()->is_sync() ? "sync" : "async",
                  pending.getFirst()->Msg()->is_reply() ? "reply" : "");
    pending.popFirst();
  }

  MOZ_CRASH_UNSAFE(why);
}

void MessageChannel::AddProfilerMarker(const IPC::Message& aMessage,
                                       MessageDirection aDirection) {
  mMonitor->AssertCurrentThreadOwns();

  if (profiler_feature_active(ProfilerFeature::IPCMessages)) {
    base::ProcessId pid = mListener->OtherPidMaybeInvalid();
    // Only record markers for IPCs with a valid pid.
    // And if one of the profiler mutexes is locked on this thread, don't record
    // markers, because we don't want to expose profiler IPCs due to the
    // profiler itself, and also to avoid possible re-entrancy issues.
    if (pid != base::kInvalidProcessId &&
        !profiler_is_locked_on_current_thread()) {
      // The current timestamp must be given to the `IPCMarker` payload.
      [[maybe_unused]] const TimeStamp now = TimeStamp::Now();
      bool isThreadBeingProfiled =
          profiler_thread_is_being_profiled_for_markers();
      PROFILER_MARKER(
          "IPC", IPC,
          mozilla::MarkerOptions(
              mozilla::MarkerTiming::InstantAt(now),
              // If the thread is being profiled, add the marker to
              // the current thread. If the thread is not being
              // profiled, add the marker to the main thread. It
              // will appear in the main thread's IPC track. Profiler analysis
              // UI correlates all the IPC markers from different threads and
              // generates processed markers.
              isThreadBeingProfiled ? mozilla::MarkerThreadId::CurrentThread()
                                    : mozilla::MarkerThreadId::MainThread()),
          IPCMarker, now, now, pid, aMessage.seqno(), aMessage.type(), mSide,
          aDirection, MessagePhase::Endpoint, aMessage.is_sync(),
          // aOriginThreadId: If the thread is being profiled, do not include a
          // thread ID, as it's the same as the markers. Only include this field
          // when the marker is being sent from another thread.
          isThreadBeingProfiled ? mozilla::MarkerThreadId{}
                                : mozilla::MarkerThreadId::CurrentThread());
    }
  }
}

void MessageChannel::EndTimeout() {
  mMonitor->AssertCurrentThreadOwns();

  IPC_LOG("Ending timeout of seqno=%d", mTimedOutMessageSeqno);
  mTimedOutMessageSeqno = 0;
  mTimedOutMessageNestedLevel = 0;

  RepostAllMessages();
}

void MessageChannel::RepostAllMessages() {
  mMonitor->AssertCurrentThreadOwns();

  bool needRepost = false;
  for (MessageTask* task : mPending) {
    task->AssertMonitorHeld(*mMonitor);
    if (!task->IsScheduled()) {
      needRepost = true;
      break;
    }
  }
  if (!needRepost) {
    // If everything is already scheduled to run, do nothing.
    return;
  }

  // In some cases we may have deferred dispatch of some messages in the
  // queue. Now we want to run them again. However, we can't just re-post
  // those messages since the messages after them in mPending would then be
  // before them in the event queue. So instead we cancel everything and
  // re-post all messages in the correct order.
  MessageQueue queue = std::move(mPending);
  while (RefPtr<MessageTask> task = queue.popFirst()) {
    task->AssertMonitorHeld(*mMonitor);
    RefPtr<MessageTask> newTask = new MessageTask(this, std::move(task->Msg()));
    newTask->AssertMonitorHeld(*mMonitor);
    mPending.insertBack(newTask);
    newTask->Post();
  }

  AssertMaybeDeferredCountCorrect();
}

void MessageChannel::CancelTransaction(int transaction) {
  mMonitor->AssertCurrentThreadOwns();

  // When we cancel a transaction, we need to behave as if there's no longer
  // any IPC on the stack. Anything we were dispatching or sending will get
  // canceled. Consequently, we have to update the state variables below.
  //
  // We also need to ensure that when any IPC functions on the stack return,
  // they don't reset these values using an RAII class like AutoSetValue. To
  // avoid that, these RAII classes check if the variable they set has been
  // tampered with (by us). If so, they don't reset the variable to the old
  // value.

  IPC_LOG("CancelTransaction: xid=%d", transaction);

  // An unusual case: We timed out a transaction which the other side then
  // cancelled. In this case we just leave the timedout state and try to
  // forget this ever happened.
  if (transaction == mTimedOutMessageSeqno) {
    IPC_LOG("Cancelled timed out message %d", mTimedOutMessageSeqno);
    EndTimeout();

    // Normally mCurrentTransaction == 0 here. But it can be non-zero if:
    // 1. Parent sends NESTED_INSIDE_SYNC message H.
    // 2. Parent times out H.
    // 3. Child dispatches H and sends nested message H' (same transaction).
    // 4. Parent dispatches H' and cancels.
    MOZ_RELEASE_ASSERT(!mTransactionStack ||
                       mTransactionStack->TransactionID() == transaction);
    if (mTransactionStack) {
      mTransactionStack->Cancel();
    }
  } else {
    MOZ_RELEASE_ASSERT(mTransactionStack->TransactionID() == transaction);
    mTransactionStack->Cancel();
  }

  bool foundSync = false;
  for (MessageTask* p = mPending.getFirst(); p;) {
    p->AssertMonitorHeld(*mMonitor);
    UniquePtr<Message>& msg = p->Msg();

    // If there was a race between the parent and the child, then we may
    // have a queued sync message. We want to drop this message from the
    // queue since if will get cancelled along with the transaction being
    // cancelled. This happens if the message in the queue is
    // NESTED_INSIDE_SYNC.
    if (msg->is_sync() && msg->nested_level() != IPC::Message::NOT_NESTED) {
      MOZ_RELEASE_ASSERT(!foundSync);
      MOZ_RELEASE_ASSERT(msg->transaction_id() != transaction);
      IPC_LOG("Removing msg from queue seqno=%d xid=%d", msg->seqno(),
              msg->transaction_id());
      foundSync = true;
      if (!IsAlwaysDeferred(*msg)) {
        mMaybeDeferredPendingCount--;
      }
      p = p->removeAndGetNext();
      continue;
    }

    p = p->getNext();
  }

  AssertMaybeDeferredCountCorrect();
}

void MessageChannel::CancelCurrentTransaction() {
  MonitorAutoLock lock(*mMonitor);
  if (DispatchingSyncMessageNestedLevel() >= IPC::Message::NESTED_INSIDE_SYNC) {
    if (DispatchingSyncMessageNestedLevel() ==
            IPC::Message::NESTED_INSIDE_CPOW ||
        DispatchingAsyncMessageNestedLevel() ==
            IPC::Message::NESTED_INSIDE_CPOW) {
      mListener->IntentionalCrash();
    }

    IPC_LOG("Cancel requested: current xid=%d",
            CurrentNestedInsideSyncTransaction());
    MOZ_RELEASE_ASSERT(DispatchingSyncMessage());
    auto cancel =
        MakeUnique<CancelMessage>(CurrentNestedInsideSyncTransaction());
    CancelTransaction(CurrentNestedInsideSyncTransaction());
    SendMessageToLink(std::move(cancel));
  }
}

void CancelCPOWs() {
  MOZ_ASSERT(NS_IsMainThread());

  if (gParentProcessBlocker) {
    mozilla::Telemetry::Accumulate(mozilla::Telemetry::IPC_TRANSACTION_CANCEL,
                                   true);
    gParentProcessBlocker->CancelCurrentTransaction();
  }
}

bool MessageChannel::IsCrossProcess() const {
  mMonitor->AssertCurrentThreadOwns();
  return mIsCrossProcess;
}

void MessageChannel::SetIsCrossProcess(bool aIsCrossProcess) {
  mMonitor->AssertCurrentThreadOwns();
  if (aIsCrossProcess == mIsCrossProcess) {
    return;
  }
  mIsCrossProcess = aIsCrossProcess;
  if (mIsCrossProcess) {
    ChannelCountReporter::Increment(mName);
  } else {
    ChannelCountReporter::Decrement(mName);
  }
}

NS_IMPL_ISUPPORTS(MessageChannel::WorkerTargetShutdownTask,
                  nsITargetShutdownTask)

MessageChannel::WorkerTargetShutdownTask::WorkerTargetShutdownTask(
    nsISerialEventTarget* aTarget, MessageChannel* aChannel)
    : mTarget(aTarget), mChannel(aChannel) {}

void MessageChannel::WorkerTargetShutdownTask::TargetShutdown() {
  MOZ_RELEASE_ASSERT(mTarget->IsOnCurrentThread());
  IPC_LOG("Closing channel due to event target shutdown");
  if (MessageChannel* channel = std::exchange(mChannel, nullptr)) {
    channel->Close();
  }
}

void MessageChannel::WorkerTargetShutdownTask::Clear() {
  MOZ_RELEASE_ASSERT(mTarget->IsOnCurrentThread());
  mChannel = nullptr;
}

NS_IMPL_ISUPPORTS_INHERITED0(MessageChannel::FlushLazySendMessagesRunnable,
                             CancelableRunnable)

MessageChannel::FlushLazySendMessagesRunnable::FlushLazySendMessagesRunnable(
    MessageChannel* aChannel)
    : CancelableRunnable("MessageChannel::FlushLazyMessagesRunnable"),
      mChannel(aChannel) {}

NS_IMETHODIMP MessageChannel::FlushLazySendMessagesRunnable::Run() {
  if (mChannel) {
    MonitorAutoLock lock(*mChannel->mMonitor);
    MOZ_ASSERT(mChannel->mFlushLazySendTask == this);
    mChannel->FlushLazySendMessages();
  }
  return NS_OK;
}

nsresult MessageChannel::FlushLazySendMessagesRunnable::Cancel() {
  mQueue.Clear();
  mChannel = nullptr;
  return NS_OK;
}

void MessageChannel::FlushLazySendMessagesRunnable::PushMessage(
    UniquePtr<Message> aMsg) {
  MOZ_ASSERT(mChannel);
  mQueue.AppendElement(std::move(aMsg));
}

nsTArray<UniquePtr<IPC::Message>>
MessageChannel::FlushLazySendMessagesRunnable::TakeMessages() {
  mChannel = nullptr;
  return std::move(mQueue);
}

}  // namespace ipc
}  // namespace mozilla