diff options
Diffstat (limited to 'toolkit/components/extensions/webrequest/StreamFilterParent.cpp')
-rw-r--r-- | toolkit/components/extensions/webrequest/StreamFilterParent.cpp | 850 |
1 files changed, 850 insertions, 0 deletions
diff --git a/toolkit/components/extensions/webrequest/StreamFilterParent.cpp b/toolkit/components/extensions/webrequest/StreamFilterParent.cpp new file mode 100644 index 0000000000..467616ed0f --- /dev/null +++ b/toolkit/components/extensions/webrequest/StreamFilterParent.cpp @@ -0,0 +1,850 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=8 sts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "StreamFilterParent.h" + +#include "HttpChannelChild.h" +#include "mozilla/ExtensionPolicyService.h" +#include "mozilla/Unused.h" +#include "mozilla/dom/ContentParent.h" +#include "mozilla/net/ChannelEventQueue.h" +#include "mozilla/StaticPrefs_extensions.h" +#include "mozilla/Try.h" +#include "nsHttpChannel.h" +#include "nsIChannel.h" +#include "nsIInputStream.h" +#include "nsITraceableChannel.h" +#include "nsProxyRelease.h" +#include "nsQueryObject.h" +#include "nsSocketTransportService2.h" +#include "nsStringStream.h" +#include "mozilla/net/DocumentChannelChild.h" +#include "nsIViewSourceChannel.h" + +namespace mozilla { +namespace extensions { + +/***************************************************************************** + * Event queueing helpers + *****************************************************************************/ + +using net::ChannelEvent; +using net::ChannelEventQueue; + +namespace { + +// Define some simple ChannelEvent sub-classes that store the appropriate +// EventTarget and delegate their Run methods to a wrapped Runnable or lambda +// function. + +class ChannelEventWrapper : public ChannelEvent { + public: + ChannelEventWrapper(nsIEventTarget* aTarget) : mTarget(aTarget) {} + + already_AddRefed<nsIEventTarget> GetEventTarget() override { + return do_AddRef(mTarget); + } + + protected: + ~ChannelEventWrapper() override = default; + + private: + nsCOMPtr<nsIEventTarget> mTarget; +}; + +class ChannelEventFunction final : public ChannelEventWrapper { + public: + ChannelEventFunction(nsIEventTarget* aTarget, std::function<void()>&& aFunc) + : ChannelEventWrapper(aTarget), mFunc(std::move(aFunc)) {} + + void Run() override { mFunc(); } + + protected: + ~ChannelEventFunction() override = default; + + private: + std::function<void()> mFunc; +}; + +class ChannelEventRunnable final : public ChannelEventWrapper { + public: + ChannelEventRunnable(nsIEventTarget* aTarget, + already_AddRefed<Runnable> aRunnable) + : ChannelEventWrapper(aTarget), mRunnable(aRunnable) {} + + void Run() override { + nsresult rv = mRunnable->Run(); + Unused << NS_WARN_IF(NS_FAILED(rv)); + } + + protected: + ~ChannelEventRunnable() override = default; + + private: + RefPtr<Runnable> mRunnable; +}; + +} // anonymous namespace + +/***************************************************************************** + * Initialization + *****************************************************************************/ + +StreamFilterParent::StreamFilterParent() + : mMainThread(GetCurrentSerialEventTarget()), + mIOThread(mMainThread), + mQueue(new ChannelEventQueue(static_cast<nsIStreamListener*>(this))), + mBufferMutex("StreamFilter buffer mutex"), + mReceivedStop(false), + mSentStop(false), + mContext(nullptr), + mOffset(0), + mState(State::Uninitialized) {} + +StreamFilterParent::~StreamFilterParent() { + NS_ReleaseOnMainThread("StreamFilterParent::mChannel", mChannel.forget()); + NS_ReleaseOnMainThread("StreamFilterParent::mLoadGroup", mLoadGroup.forget()); + NS_ReleaseOnMainThread("StreamFilterParent::mOrigListener", + mOrigListener.forget()); + NS_ReleaseOnMainThread("StreamFilterParent::mContext", mContext.forget()); + mQueue->NotifyReleasingOwner(); +} + +auto StreamFilterParent::Create(dom::ContentParent* aContentParent, + uint64_t aChannelId, const nsAString& aAddonId) + -> RefPtr<ChildEndpointPromise> { + AssertIsMainThread(); + + auto& webreq = WebRequestService::GetSingleton(); + + RefPtr<nsAtom> addonId = NS_Atomize(aAddonId); + nsCOMPtr<nsITraceableChannel> channel = + webreq.GetTraceableChannel(aChannelId, addonId, aContentParent); + + RefPtr<mozilla::net::nsHttpChannel> chan = do_QueryObject(channel); + if (!chan) { + return ChildEndpointPromise::CreateAndReject(false, __func__); + } + + nsCOMPtr<nsIChannel> genChan(do_QueryInterface(channel)); + if (!StaticPrefs::extensions_filterResponseServiceWorkerScript_disabled() && + ChannelWrapper::IsServiceWorkerScript(genChan)) { + RefPtr<extensions::WebExtensionPolicy> addonPolicy = + ExtensionPolicyService::GetSingleton().GetByID(aAddonId); + + if (!addonPolicy || + !addonPolicy->HasPermission( + nsGkAtoms::webRequestFilterResponse_serviceWorkerScript)) { + return ChildEndpointPromise::CreateAndReject(false, __func__); + } + } + + // Disable alt-data for extension stream listeners. + nsCOMPtr<nsIHttpChannelInternal> internal(do_QueryObject(channel)); + internal->DisableAltDataCache(); + + return chan->AttachStreamFilter(); +} + +/* static */ +void StreamFilterParent::Attach(nsIChannel* aChannel, + ParentEndpoint&& aEndpoint) { + auto self = MakeRefPtr<StreamFilterParent>(); + + self->ActorThread()->Dispatch( + NewRunnableMethod<ParentEndpoint&&>("StreamFilterParent::Bind", self, + &StreamFilterParent::Bind, + std::move(aEndpoint)), + NS_DISPATCH_NORMAL); + + // If aChannel is a HttpChannelChild, ask HttpChannelChild to hold a weak + // reference on this StreamFilterParent. Such that HttpChannelChild has a + // chance to disconnect this StreamFilterParent if internal redirection + // happens, i.e. ServiceWorker fallback redirection. + RefPtr<net::HttpChannelChild> channelChild = do_QueryObject(aChannel); + if (channelChild) { + channelChild->RegisterStreamFilter(self); + } + + self->Init(aChannel); +} + +void StreamFilterParent::Disconnect(const nsACString& aReason) { + AssertIsMainThread(); + MOZ_DIAGNOSTIC_ASSERT(mBeforeOnStartRequest); + + mDisconnected = true; + + nsAutoCString reason(aReason); + + RefPtr<StreamFilterParent> self(this); + RunOnActorThread(FUNC, [self, reason] { + if (self->IPCActive()) { + self->mState = State::Disconnected; + self->CheckResult(self->SendError(reason)); + } + }); +} + +void StreamFilterParent::Bind(ParentEndpoint&& aEndpoint) { + aEndpoint.Bind(this); +} + +void StreamFilterParent::Init(nsIChannel* aChannel) { + mChannel = aChannel; + + nsCOMPtr<nsITraceableChannel> traceable = do_QueryInterface(aChannel); + if (MOZ_UNLIKELY(!traceable)) { + // nsViewSourceChannel is not nsITraceableChannel, but wraps one. Unwrap it. + nsCOMPtr<nsIViewSourceChannel> vsc = do_QueryInterface(aChannel); + if (vsc) { + traceable = do_QueryObject(vsc->GetInnerChannel()); + // OnStartRequest etc. is passed the unwrapped channel, so update mChannel + // to prevent OnStartRequest from mistaking it for a redirect, which would + // close the filter. + mChannel = do_QueryObject(traceable); + } + // TODO bug 1683403: Replace assertion; Close StreamFilter instead. + MOZ_RELEASE_ASSERT(traceable); + } + + nsresult rv = + traceable->SetNewListener(this, /* aMustApplyContentConversion = */ true, + getter_AddRefs(mOrigListener)); + MOZ_RELEASE_ASSERT(NS_SUCCEEDED(rv)); +} + +/***************************************************************************** + * nsIThreadRetargetableStreamListener + *****************************************************************************/ + +NS_IMETHODIMP +StreamFilterParent::CheckListenerChain() { + AssertIsMainThread(); + + nsCOMPtr<nsIThreadRetargetableStreamListener> trsl = + do_QueryInterface(mOrigListener); + if (trsl) { + return trsl->CheckListenerChain(); + } + return NS_ERROR_FAILURE; +} + +NS_IMETHODIMP +StreamFilterParent::OnDataFinished(nsresult aStatus) { + AssertIsIOThread(); + + // Forwarding onDataFinished to the mOriginListener when: + // - the StreamFilter is already disconnected + // - it does not have any buffered data which would still need + // to be sent to the mOrigListener and we have + // - we have not yet called mOrigListener OnStopRequest method. + if (!mDisconnected || !mBufferedData.isEmpty() || mSentStop) { + return NS_OK; + } + + nsCOMPtr<nsIThreadRetargetableStreamListener> listener = + do_QueryInterface(mOrigListener); + + if (listener) { + return listener->OnDataFinished(aStatus); + } + + return NS_OK; +} + +/***************************************************************************** + * Error handling + *****************************************************************************/ + +void StreamFilterParent::Broken() { + AssertIsActorThread(); + + switch (mState) { + case State::Initialized: + case State::TransferringData: + case State::Suspended: { + mState = State::Disconnecting; + RefPtr<StreamFilterParent> self(this); + RunOnMainThread(FUNC, [=] { + if (self->mChannel) { + self->mChannel->Cancel(NS_ERROR_FAILURE); + } + }); + + FinishDisconnect(); + } break; + + default: + break; + } +} + +/***************************************************************************** + * State change requests + *****************************************************************************/ + +IPCResult StreamFilterParent::RecvClose() { + AssertIsActorThread(); + + mState = State::Closed; + + if (!mSentStop) { + RefPtr<StreamFilterParent> self(this); + RunOnMainThread(FUNC, [=] { + nsresult rv = self->EmitStopRequest(NS_OK); + Unused << NS_WARN_IF(NS_FAILED(rv)); + }); + } + + Unused << SendClosed(); + Destroy(); + return IPC_OK(); +} + +void StreamFilterParent::Destroy() { + // Close the channel asynchronously so the actor is never destroyed before + // this message is fully processed. + ActorThread()->Dispatch(NewRunnableMethod("StreamFilterParent::Close", this, + &StreamFilterParent::Close), + NS_DISPATCH_NORMAL); +} + +IPCResult StreamFilterParent::RecvDestroy() { + AssertIsActorThread(); + Destroy(); + return IPC_OK(); +} + +IPCResult StreamFilterParent::RecvSuspend() { + AssertIsActorThread(); + + if (mState == State::TransferringData) { + RefPtr<StreamFilterParent> self(this); + RunOnMainThread(FUNC, [=] { + self->mChannel->Suspend(); + + RunOnActorThread(FUNC, [=] { + if (self->IPCActive()) { + self->mState = State::Suspended; + self->CheckResult(self->SendSuspended()); + } + }); + }); + } + return IPC_OK(); +} + +IPCResult StreamFilterParent::RecvResume() { + AssertIsActorThread(); + + if (mState == State::Suspended) { + // Change state before resuming so incoming data is handled correctly + // immediately after resuming. + mState = State::TransferringData; + + RefPtr<StreamFilterParent> self(this); + RunOnMainThread(FUNC, [=] { + self->mChannel->Resume(); + + RunOnActorThread(FUNC, [=] { + if (self->IPCActive()) { + self->CheckResult(self->SendResumed()); + } + }); + }); + } + return IPC_OK(); +} +IPCResult StreamFilterParent::RecvDisconnect() { + AssertIsActorThread(); + + if (mState == State::Suspended) { + RefPtr<StreamFilterParent> self(this); + RunOnMainThread(FUNC, [=] { self->mChannel->Resume(); }); + } else if (mState != State::TransferringData) { + return IPC_OK(); + } + + mState = State::Disconnecting; + CheckResult(SendFlushData()); + return IPC_OK(); +} + +IPCResult StreamFilterParent::RecvFlushedData() { + AssertIsActorThread(); + + MOZ_ASSERT(mState == State::Disconnecting); + + Destroy(); + + FinishDisconnect(); + return IPC_OK(); +} + +void StreamFilterParent::FinishDisconnect() { + RefPtr<StreamFilterParent> self(this); + RunOnIOThread(FUNC, [=] { + self->FlushBufferedData(); + + RunOnMainThread(FUNC, [=] { + if (self->mReceivedStop && !self->mSentStop) { + nsresult rv = self->EmitStopRequest(NS_OK); + Unused << NS_WARN_IF(NS_FAILED(rv)); + } else if (self->mLoadGroup && !self->mDisconnected) { + Unused << self->mLoadGroup->RemoveRequest(self, nullptr, NS_OK); + } + self->mDisconnected = true; + }); + + RunOnActorThread(FUNC, [=] { + if (self->mState != State::Closed) { + self->mState = State::Disconnected; + } + }); + }); +} + +/***************************************************************************** + * Data output + *****************************************************************************/ + +IPCResult StreamFilterParent::RecvWrite(Data&& aData) { + AssertIsActorThread(); + + RunOnIOThread(NewRunnableMethod<Data&&>("StreamFilterParent::WriteMove", this, + &StreamFilterParent::WriteMove, + std::move(aData))); + return IPC_OK(); +} + +void StreamFilterParent::WriteMove(Data&& aData) { + nsresult rv = Write(aData); + Unused << NS_WARN_IF(NS_FAILED(rv)); +} + +nsresult StreamFilterParent::Write(Data& aData) { + AssertIsIOThread(); + + nsCOMPtr<nsIInputStream> stream; + nsresult rv = NS_NewByteInputStream( + getter_AddRefs(stream), + Span(reinterpret_cast<char*>(aData.Elements()), aData.Length()), + NS_ASSIGNMENT_DEPEND); + NS_ENSURE_SUCCESS(rv, rv); + + rv = + mOrigListener->OnDataAvailable(mChannel, stream, mOffset, aData.Length()); + NS_ENSURE_SUCCESS(rv, rv); + + mOffset += aData.Length(); + return NS_OK; +} + +/***************************************************************************** + * nsIRequest + *****************************************************************************/ + +NS_IMETHODIMP +StreamFilterParent::GetName(nsACString& aName) { + AssertIsMainThread(); + MOZ_ASSERT(mChannel); + return mChannel->GetName(aName); +} + +NS_IMETHODIMP +StreamFilterParent::GetStatus(nsresult* aStatus) { + AssertIsMainThread(); + MOZ_ASSERT(mChannel); + return mChannel->GetStatus(aStatus); +} + +NS_IMETHODIMP +StreamFilterParent::IsPending(bool* aIsPending) { + switch (mState) { + case State::Initialized: + case State::TransferringData: + case State::Suspended: + *aIsPending = true; + break; + default: + *aIsPending = false; + } + return NS_OK; +} + +NS_IMETHODIMP StreamFilterParent::SetCanceledReason(const nsACString& aReason) { + return SetCanceledReasonImpl(aReason); +} + +NS_IMETHODIMP StreamFilterParent::GetCanceledReason(nsACString& aReason) { + return GetCanceledReasonImpl(aReason); +} + +NS_IMETHODIMP StreamFilterParent::CancelWithReason(nsresult aStatus, + const nsACString& aReason) { + return CancelWithReasonImpl(aStatus, aReason); +} + +NS_IMETHODIMP +StreamFilterParent::Cancel(nsresult aResult) { + AssertIsMainThread(); + MOZ_ASSERT(mChannel); + return mChannel->Cancel(aResult); +} + +NS_IMETHODIMP +StreamFilterParent::Suspend() { + AssertIsMainThread(); + MOZ_ASSERT(mChannel); + return mChannel->Suspend(); +} + +NS_IMETHODIMP +StreamFilterParent::Resume() { + AssertIsMainThread(); + MOZ_ASSERT(mChannel); + return mChannel->Resume(); +} + +NS_IMETHODIMP +StreamFilterParent::GetLoadGroup(nsILoadGroup** aLoadGroup) { + *aLoadGroup = mLoadGroup; + return NS_OK; +} + +NS_IMETHODIMP +StreamFilterParent::SetLoadGroup(nsILoadGroup* aLoadGroup) { + return NS_ERROR_NOT_IMPLEMENTED; +} + +NS_IMETHODIMP +StreamFilterParent::GetLoadFlags(nsLoadFlags* aLoadFlags) { + AssertIsMainThread(); + MOZ_ASSERT(mChannel); + MOZ_TRY(mChannel->GetLoadFlags(aLoadFlags)); + *aLoadFlags &= ~nsIChannel::LOAD_DOCUMENT_URI; + return NS_OK; +} + +NS_IMETHODIMP +StreamFilterParent::SetLoadFlags(nsLoadFlags aLoadFlags) { + AssertIsMainThread(); + MOZ_ASSERT(mChannel); + return mChannel->SetLoadFlags(aLoadFlags); +} + +NS_IMETHODIMP +StreamFilterParent::GetTRRMode(nsIRequest::TRRMode* aTRRMode) { + return GetTRRModeImpl(aTRRMode); +} + +NS_IMETHODIMP +StreamFilterParent::SetTRRMode(nsIRequest::TRRMode aTRRMode) { + return SetTRRModeImpl(aTRRMode); +} + +/***************************************************************************** + * nsIStreamListener + *****************************************************************************/ + +NS_IMETHODIMP +StreamFilterParent::OnStartRequest(nsIRequest* aRequest) { + AssertIsMainThread(); + + // Always reset mChannel if aRequest is different. Various calls in + // StreamFilterParent will use mChannel, but aRequest is *always* the + // right channel to use at this point. + // + // For ALL redirections, we will disconnect this listener. Extensions + // will create a new filter if they need it. + mBeforeOnStartRequest = false; + if (aRequest != mChannel) { + nsCOMPtr<nsIChannel> channel = do_QueryInterface(aRequest); + nsCOMPtr<nsILoadInfo> loadInfo = channel ? channel->LoadInfo() : nullptr; + mChannel = channel; + + if (!(loadInfo && + loadInfo->RedirectChainIncludingInternalRedirects().IsEmpty())) { + mDisconnected = true; + mDisconnectedByOnStartRequest = true; + + RefPtr<StreamFilterParent> self(this); + RunOnActorThread(FUNC, [=] { + if (self->IPCActive()) { + self->mState = State::Disconnected; + CheckResult(self->SendError("Channel redirected"_ns)); + } + }); + } + } + + // Check if alterate cached data is being sent, if so we receive un-decoded + // data and we must disconnect the filter and send an error to the extension. + if (!mDisconnected) { + RefPtr<net::HttpBaseChannel> chan = do_QueryObject(aRequest); + if (chan && chan->IsDeliveringAltData()) { + mDisconnected = true; + mDisconnectedByOnStartRequest = true; + + RefPtr<StreamFilterParent> self(this); + RunOnActorThread(FUNC, [=] { + if (self->IPCActive()) { + self->mState = State::Disconnected; + CheckResult( + self->SendError("Channel is delivering cached alt-data"_ns)); + } + }); + } + } + + if (!mDisconnected) { + Unused << mChannel->GetLoadGroup(getter_AddRefs(mLoadGroup)); + if (mLoadGroup) { + Unused << mLoadGroup->AddRequest(this, nullptr); + } + } + + nsresult rv = mOrigListener->OnStartRequest(aRequest); + + // Important: Do this only *after* running the next listener in the chain, so + // that we get the final delivery target after any retargeting that it may do. + if (nsCOMPtr<nsIThreadRetargetableRequest> req = + do_QueryInterface(aRequest)) { + nsCOMPtr<nsISerialEventTarget> thread; + Unused << req->GetDeliveryTarget(getter_AddRefs(thread)); + if (thread) { + mIOThread = std::move(thread); + } + } + + // Important: Do this *after* we have set the thread delivery target, or it is + // possible in rare circumstances for an extension to attempt to write data + // before the thread has been set up, even though there are several layers of + // asynchrony involved. + if (!mDisconnected) { + RefPtr<StreamFilterParent> self(this); + RunOnActorThread(FUNC, [=] { + if (self->IPCActive()) { + self->mState = State::TransferringData; + self->CheckResult(self->SendStartRequest()); + } + }); + } + + return rv; +} + +NS_IMETHODIMP +StreamFilterParent::OnStopRequest(nsIRequest* aRequest, nsresult aStatusCode) { + AssertIsMainThread(); + MOZ_ASSERT(aRequest == mChannel); + + mReceivedStop = true; + if (mDisconnected) { + return EmitStopRequest(aStatusCode); + } + + RefPtr<StreamFilterParent> self(this); + RunOnActorThread(FUNC, [=] { + if (self->IPCActive()) { + self->CheckResult(self->SendStopRequest(aStatusCode)); + } else if (self->mState != State::Disconnecting) { + // If we're currently disconnecting, then we'll emit a stop + // request at the end of that process. Otherwise we need to + // manually emit one here, since we won't be getting a response + // from the child. + RunOnMainThread(FUNC, [=] { + if (!self->mSentStop) { + self->EmitStopRequest(aStatusCode); + } + }); + } + }); + return NS_OK; +} + +nsresult StreamFilterParent::EmitStopRequest(nsresult aStatusCode) { + AssertIsMainThread(); + MOZ_ASSERT(!mSentStop); + + mSentStop = true; + nsresult rv = mOrigListener->OnStopRequest(mChannel, aStatusCode); + + if (mLoadGroup && !mDisconnected) { + Unused << mLoadGroup->RemoveRequest(this, nullptr, aStatusCode); + } + + return rv; +} + +/***************************************************************************** + * Incoming data handling + *****************************************************************************/ + +void StreamFilterParent::DoSendData(Data&& aData) { + AssertIsActorThread(); + + if (mState == State::TransferringData) { + CheckResult(SendData(aData)); + } +} + +NS_IMETHODIMP +StreamFilterParent::OnDataAvailable(nsIRequest* aRequest, + nsIInputStream* aInputStream, + uint64_t aOffset, uint32_t aCount) { + AssertIsIOThread(); + + if (mDisconnectedByOnStartRequest || mState == State::Disconnected) { + // If we're offloading data in a thread pool, it's possible that we'll + // have buffered some additional data while waiting for the buffer to + // flush. So, if there's any buffered data left, flush that before we + // flush this incoming data. + // + // Note: When in the eDisconnected state, the buffer list is guaranteed + // never to be accessed by another thread during an OnDataAvailable call. + if (!mBufferedData.isEmpty()) { + FlushBufferedData(); + } + + mOffset += aCount; + return mOrigListener->OnDataAvailable(aRequest, aInputStream, + mOffset - aCount, aCount); + } + + Data data; + data.SetLength(aCount); + + uint32_t count; + nsresult rv = aInputStream->Read(reinterpret_cast<char*>(data.Elements()), + aCount, &count); + NS_ENSURE_SUCCESS(rv, rv); + NS_ENSURE_TRUE(count == aCount, NS_ERROR_UNEXPECTED); + + if (mState == State::Disconnecting) { + MutexAutoLock al(mBufferMutex); + BufferData(std::move(data)); + } else if (mState == State::Closed) { + return NS_ERROR_FAILURE; + } else { + ActorThread()->Dispatch( + NewRunnableMethod<Data&&>("StreamFilterParent::DoSendData", this, + &StreamFilterParent::DoSendData, + std::move(data)), + NS_DISPATCH_NORMAL); + } + return NS_OK; +} + +nsresult StreamFilterParent::FlushBufferedData() { + AssertIsIOThread(); + + // When offloading data to a thread pool, OnDataAvailable isn't guaranteed + // to always run in the same thread, so it's possible for this function to + // run in parallel with OnDataAvailable. + MutexAutoLock al(mBufferMutex); + + while (!mBufferedData.isEmpty()) { + UniquePtr<BufferedData> data(mBufferedData.popFirst()); + + nsresult rv = Write(data->mData); + NS_ENSURE_SUCCESS(rv, rv); + } + + return NS_OK; +} + +/***************************************************************************** + * Thread helpers + *****************************************************************************/ + +nsIEventTarget* StreamFilterParent::ActorThread() { + return net::gSocketTransportService; +} + +bool StreamFilterParent::IsActorThread() { + return ActorThread()->IsOnCurrentThread(); +} + +void StreamFilterParent::AssertIsActorThread() { MOZ_ASSERT(IsActorThread()); } + +nsISerialEventTarget* StreamFilterParent::IOThread() { return mIOThread; } + +bool StreamFilterParent::IsIOThread() { return mIOThread->IsOnCurrentThread(); } + +void StreamFilterParent::AssertIsIOThread() { MOZ_ASSERT(IsIOThread()); } + +template <typename Function> +void StreamFilterParent::RunOnMainThread(const char* aName, Function&& aFunc) { + mQueue->RunOrEnqueue(new ChannelEventFunction(mMainThread, std::move(aFunc))); +} + +void StreamFilterParent::RunOnMainThread(already_AddRefed<Runnable> aRunnable) { + mQueue->RunOrEnqueue( + new ChannelEventRunnable(mMainThread, std::move(aRunnable))); +} + +template <typename Function> +void StreamFilterParent::RunOnIOThread(const char* aName, Function&& aFunc) { + mQueue->RunOrEnqueue(new ChannelEventFunction(mIOThread, std::move(aFunc))); +} + +void StreamFilterParent::RunOnIOThread(already_AddRefed<Runnable> aRunnable) { + mQueue->RunOrEnqueue( + new ChannelEventRunnable(mIOThread, std::move(aRunnable))); +} + +template <typename Function> +void StreamFilterParent::RunOnActorThread(const char* aName, Function&& aFunc) { + // We don't use mQueue for dispatch to the actor thread. + // + // The main thread and IO thread are used for dispatching events to the + // wrapped stream listener, and those events need to be processed + // consistently, in the order they were dispatched. An event dispatched to the + // main thread can't be run before events that were dispatched to the IO + // thread before it. + // + // Additionally, the IO thread is likely to be a thread pool, which means that + // without thread-safe queuing, it's possible for multiple events dispatched + // to it to be processed in parallel, or out of order. + // + // The actor thread, however, is always a serial event target. Its events are + // always processed in order, and events dispatched to the actor thread are + // independent of the events in the output event queue. + if (IsActorThread()) { + aFunc(); + } else { + ActorThread()->Dispatch(std::move(NS_NewRunnableFunction(aName, aFunc)), + NS_DISPATCH_NORMAL); + } +} + +/***************************************************************************** + * Glue + *****************************************************************************/ + +void StreamFilterParent::ActorDestroy(ActorDestroyReason aWhy) { + AssertIsActorThread(); + + if (mState != State::Disconnected && mState != State::Closed) { + Broken(); + } +} + +NS_INTERFACE_MAP_BEGIN(StreamFilterParent) + NS_INTERFACE_MAP_ENTRY(nsIStreamListener) + NS_INTERFACE_MAP_ENTRY(nsIRequestObserver) + NS_INTERFACE_MAP_ENTRY(nsIRequest) + NS_INTERFACE_MAP_ENTRY(nsIThreadRetargetableStreamListener) + NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIStreamListener) +NS_INTERFACE_MAP_END + +NS_IMPL_ADDREF(StreamFilterParent) +NS_IMPL_RELEASE(StreamFilterParent) + +} // namespace extensions +} // namespace mozilla |