/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "StreamFilterParent.h" #include "HttpChannelChild.h" #include "mozilla/ExtensionPolicyService.h" #include "mozilla/Unused.h" #include "mozilla/dom/ContentParent.h" #include "mozilla/net/ChannelEventQueue.h" #include "mozilla/StaticPrefs_extensions.h" #include "mozilla/Try.h" #include "nsHttpChannel.h" #include "nsIChannel.h" #include "nsIInputStream.h" #include "nsITraceableChannel.h" #include "nsProxyRelease.h" #include "nsQueryObject.h" #include "nsSocketTransportService2.h" #include "nsStringStream.h" #include "mozilla/net/DocumentChannelChild.h" #include "nsIViewSourceChannel.h" namespace mozilla { namespace extensions { /***************************************************************************** * Event queueing helpers *****************************************************************************/ using net::ChannelEvent; using net::ChannelEventQueue; namespace { // Define some simple ChannelEvent sub-classes that store the appropriate // EventTarget and delegate their Run methods to a wrapped Runnable or lambda // function. class ChannelEventWrapper : public ChannelEvent { public: ChannelEventWrapper(nsIEventTarget* aTarget) : mTarget(aTarget) {} already_AddRefed GetEventTarget() override { return do_AddRef(mTarget); } protected: ~ChannelEventWrapper() override = default; private: nsCOMPtr mTarget; }; class ChannelEventFunction final : public ChannelEventWrapper { public: ChannelEventFunction(nsIEventTarget* aTarget, std::function&& aFunc) : ChannelEventWrapper(aTarget), mFunc(std::move(aFunc)) {} void Run() override { mFunc(); } protected: ~ChannelEventFunction() override = default; private: std::function mFunc; }; class ChannelEventRunnable final : public ChannelEventWrapper { public: ChannelEventRunnable(nsIEventTarget* aTarget, already_AddRefed aRunnable) : ChannelEventWrapper(aTarget), mRunnable(aRunnable) {} void Run() override { nsresult rv = mRunnable->Run(); Unused << NS_WARN_IF(NS_FAILED(rv)); } protected: ~ChannelEventRunnable() override = default; private: RefPtr mRunnable; }; } // anonymous namespace /***************************************************************************** * Initialization *****************************************************************************/ StreamFilterParent::StreamFilterParent() : mMainThread(GetCurrentSerialEventTarget()), mIOThread(mMainThread), mQueue(new ChannelEventQueue(static_cast(this))), mBufferMutex("StreamFilter buffer mutex"), mReceivedStop(false), mSentStop(false), mContext(nullptr), mOffset(0), mState(State::Uninitialized) {} StreamFilterParent::~StreamFilterParent() { NS_ReleaseOnMainThread("StreamFilterParent::mChannel", mChannel.forget()); NS_ReleaseOnMainThread("StreamFilterParent::mLoadGroup", mLoadGroup.forget()); NS_ReleaseOnMainThread("StreamFilterParent::mOrigListener", mOrigListener.forget()); NS_ReleaseOnMainThread("StreamFilterParent::mContext", mContext.forget()); mQueue->NotifyReleasingOwner(); } auto StreamFilterParent::Create(dom::ContentParent* aContentParent, uint64_t aChannelId, const nsAString& aAddonId) -> RefPtr { AssertIsMainThread(); auto& webreq = WebRequestService::GetSingleton(); RefPtr addonId = NS_Atomize(aAddonId); nsCOMPtr channel = webreq.GetTraceableChannel(aChannelId, addonId, aContentParent); RefPtr chan = do_QueryObject(channel); if (!chan) { return ChildEndpointPromise::CreateAndReject(false, __func__); } nsCOMPtr genChan(do_QueryInterface(channel)); if (!StaticPrefs::extensions_filterResponseServiceWorkerScript_disabled() && ChannelWrapper::IsServiceWorkerScript(genChan)) { RefPtr addonPolicy = ExtensionPolicyService::GetSingleton().GetByID(aAddonId); if (!addonPolicy || !addonPolicy->HasPermission( nsGkAtoms::webRequestFilterResponse_serviceWorkerScript)) { return ChildEndpointPromise::CreateAndReject(false, __func__); } } // Disable alt-data for extension stream listeners. nsCOMPtr internal(do_QueryObject(channel)); internal->DisableAltDataCache(); return chan->AttachStreamFilter(); } /* static */ void StreamFilterParent::Attach(nsIChannel* aChannel, ParentEndpoint&& aEndpoint) { auto self = MakeRefPtr(); self->ActorThread()->Dispatch( NewRunnableMethod("StreamFilterParent::Bind", self, &StreamFilterParent::Bind, std::move(aEndpoint)), NS_DISPATCH_NORMAL); // If aChannel is a HttpChannelChild, ask HttpChannelChild to hold a weak // reference on this StreamFilterParent. Such that HttpChannelChild has a // chance to disconnect this StreamFilterParent if internal redirection // happens, i.e. ServiceWorker fallback redirection. RefPtr channelChild = do_QueryObject(aChannel); if (channelChild) { channelChild->RegisterStreamFilter(self); } self->Init(aChannel); } void StreamFilterParent::Disconnect(const nsACString& aReason) { AssertIsMainThread(); MOZ_DIAGNOSTIC_ASSERT(mBeforeOnStartRequest); mDisconnected = true; nsAutoCString reason(aReason); RefPtr self(this); RunOnActorThread(FUNC, [self, reason] { if (self->IPCActive()) { self->mState = State::Disconnected; self->CheckResult(self->SendError(reason)); } }); } void StreamFilterParent::Bind(ParentEndpoint&& aEndpoint) { aEndpoint.Bind(this); } void StreamFilterParent::Init(nsIChannel* aChannel) { mChannel = aChannel; nsCOMPtr traceable = do_QueryInterface(aChannel); if (MOZ_UNLIKELY(!traceable)) { // nsViewSourceChannel is not nsITraceableChannel, but wraps one. Unwrap it. nsCOMPtr vsc = do_QueryInterface(aChannel); if (vsc) { traceable = do_QueryObject(vsc->GetInnerChannel()); // OnStartRequest etc. is passed the unwrapped channel, so update mChannel // to prevent OnStartRequest from mistaking it for a redirect, which would // close the filter. mChannel = do_QueryObject(traceable); } // TODO bug 1683403: Replace assertion; Close StreamFilter instead. MOZ_RELEASE_ASSERT(traceable); } nsresult rv = traceable->SetNewListener(this, /* aMustApplyContentConversion = */ true, getter_AddRefs(mOrigListener)); MOZ_RELEASE_ASSERT(NS_SUCCEEDED(rv)); } /***************************************************************************** * nsIThreadRetargetableStreamListener *****************************************************************************/ NS_IMETHODIMP StreamFilterParent::CheckListenerChain() { AssertIsMainThread(); nsCOMPtr trsl = do_QueryInterface(mOrigListener); if (trsl) { return trsl->CheckListenerChain(); } return NS_ERROR_FAILURE; } NS_IMETHODIMP StreamFilterParent::OnDataFinished(nsresult aStatus) { AssertIsIOThread(); // Forwarding onDataFinished to the mOriginListener when: // - the StreamFilter is already disconnected // - it does not have any buffered data which would still need // to be sent to the mOrigListener and we have // - we have not yet called mOrigListener OnStopRequest method. if (!mDisconnected || !mBufferedData.isEmpty() || mSentStop) { return NS_OK; } nsCOMPtr listener = do_QueryInterface(mOrigListener); if (listener) { return listener->OnDataFinished(aStatus); } return NS_OK; } /***************************************************************************** * Error handling *****************************************************************************/ void StreamFilterParent::Broken() { AssertIsActorThread(); switch (mState) { case State::Initialized: case State::TransferringData: case State::Suspended: { mState = State::Disconnecting; RefPtr self(this); RunOnMainThread(FUNC, [=] { if (self->mChannel) { self->mChannel->Cancel(NS_ERROR_FAILURE); } }); FinishDisconnect(); } break; default: break; } } /***************************************************************************** * State change requests *****************************************************************************/ IPCResult StreamFilterParent::RecvClose() { AssertIsActorThread(); mState = State::Closed; if (!mSentStop) { RefPtr self(this); RunOnMainThread(FUNC, [=] { nsresult rv = self->EmitStopRequest(NS_OK); Unused << NS_WARN_IF(NS_FAILED(rv)); }); } Unused << SendClosed(); Destroy(); return IPC_OK(); } void StreamFilterParent::Destroy() { // Close the channel asynchronously so the actor is never destroyed before // this message is fully processed. ActorThread()->Dispatch(NewRunnableMethod("StreamFilterParent::Close", this, &StreamFilterParent::Close), NS_DISPATCH_NORMAL); } IPCResult StreamFilterParent::RecvDestroy() { AssertIsActorThread(); Destroy(); return IPC_OK(); } IPCResult StreamFilterParent::RecvSuspend() { AssertIsActorThread(); if (mState == State::TransferringData) { RefPtr self(this); RunOnMainThread(FUNC, [=] { self->mChannel->Suspend(); RunOnActorThread(FUNC, [=] { if (self->IPCActive()) { self->mState = State::Suspended; self->CheckResult(self->SendSuspended()); } }); }); } return IPC_OK(); } IPCResult StreamFilterParent::RecvResume() { AssertIsActorThread(); if (mState == State::Suspended) { // Change state before resuming so incoming data is handled correctly // immediately after resuming. mState = State::TransferringData; RefPtr self(this); RunOnMainThread(FUNC, [=] { self->mChannel->Resume(); RunOnActorThread(FUNC, [=] { if (self->IPCActive()) { self->CheckResult(self->SendResumed()); } }); }); } return IPC_OK(); } IPCResult StreamFilterParent::RecvDisconnect() { AssertIsActorThread(); if (mState == State::Suspended) { RefPtr self(this); RunOnMainThread(FUNC, [=] { self->mChannel->Resume(); }); } else if (mState != State::TransferringData) { return IPC_OK(); } mState = State::Disconnecting; CheckResult(SendFlushData()); return IPC_OK(); } IPCResult StreamFilterParent::RecvFlushedData() { AssertIsActorThread(); MOZ_ASSERT(mState == State::Disconnecting); Destroy(); FinishDisconnect(); return IPC_OK(); } void StreamFilterParent::FinishDisconnect() { RefPtr self(this); RunOnIOThread(FUNC, [=] { self->FlushBufferedData(); RunOnMainThread(FUNC, [=] { if (self->mReceivedStop && !self->mSentStop) { nsresult rv = self->EmitStopRequest(NS_OK); Unused << NS_WARN_IF(NS_FAILED(rv)); } else if (self->mLoadGroup && !self->mDisconnected) { Unused << self->mLoadGroup->RemoveRequest(self, nullptr, NS_OK); } self->mDisconnected = true; }); RunOnActorThread(FUNC, [=] { if (self->mState != State::Closed) { self->mState = State::Disconnected; } }); }); } /***************************************************************************** * Data output *****************************************************************************/ IPCResult StreamFilterParent::RecvWrite(Data&& aData) { AssertIsActorThread(); RunOnIOThread(NewRunnableMethod("StreamFilterParent::WriteMove", this, &StreamFilterParent::WriteMove, std::move(aData))); return IPC_OK(); } void StreamFilterParent::WriteMove(Data&& aData) { nsresult rv = Write(aData); Unused << NS_WARN_IF(NS_FAILED(rv)); } nsresult StreamFilterParent::Write(Data& aData) { AssertIsIOThread(); nsCOMPtr stream; nsresult rv = NS_NewByteInputStream( getter_AddRefs(stream), Span(reinterpret_cast(aData.Elements()), aData.Length()), NS_ASSIGNMENT_DEPEND); NS_ENSURE_SUCCESS(rv, rv); rv = mOrigListener->OnDataAvailable(mChannel, stream, mOffset, aData.Length()); NS_ENSURE_SUCCESS(rv, rv); mOffset += aData.Length(); return NS_OK; } /***************************************************************************** * nsIRequest *****************************************************************************/ NS_IMETHODIMP StreamFilterParent::GetName(nsACString& aName) { AssertIsMainThread(); MOZ_ASSERT(mChannel); return mChannel->GetName(aName); } NS_IMETHODIMP StreamFilterParent::GetStatus(nsresult* aStatus) { AssertIsMainThread(); MOZ_ASSERT(mChannel); return mChannel->GetStatus(aStatus); } NS_IMETHODIMP StreamFilterParent::IsPending(bool* aIsPending) { switch (mState) { case State::Initialized: case State::TransferringData: case State::Suspended: *aIsPending = true; break; default: *aIsPending = false; } return NS_OK; } NS_IMETHODIMP StreamFilterParent::SetCanceledReason(const nsACString& aReason) { return SetCanceledReasonImpl(aReason); } NS_IMETHODIMP StreamFilterParent::GetCanceledReason(nsACString& aReason) { return GetCanceledReasonImpl(aReason); } NS_IMETHODIMP StreamFilterParent::CancelWithReason(nsresult aStatus, const nsACString& aReason) { return CancelWithReasonImpl(aStatus, aReason); } NS_IMETHODIMP StreamFilterParent::Cancel(nsresult aResult) { AssertIsMainThread(); MOZ_ASSERT(mChannel); return mChannel->Cancel(aResult); } NS_IMETHODIMP StreamFilterParent::Suspend() { AssertIsMainThread(); MOZ_ASSERT(mChannel); return mChannel->Suspend(); } NS_IMETHODIMP StreamFilterParent::Resume() { AssertIsMainThread(); MOZ_ASSERT(mChannel); return mChannel->Resume(); } NS_IMETHODIMP StreamFilterParent::GetLoadGroup(nsILoadGroup** aLoadGroup) { *aLoadGroup = mLoadGroup; return NS_OK; } NS_IMETHODIMP StreamFilterParent::SetLoadGroup(nsILoadGroup* aLoadGroup) { return NS_ERROR_NOT_IMPLEMENTED; } NS_IMETHODIMP StreamFilterParent::GetLoadFlags(nsLoadFlags* aLoadFlags) { AssertIsMainThread(); MOZ_ASSERT(mChannel); MOZ_TRY(mChannel->GetLoadFlags(aLoadFlags)); *aLoadFlags &= ~nsIChannel::LOAD_DOCUMENT_URI; return NS_OK; } NS_IMETHODIMP StreamFilterParent::SetLoadFlags(nsLoadFlags aLoadFlags) { AssertIsMainThread(); MOZ_ASSERT(mChannel); return mChannel->SetLoadFlags(aLoadFlags); } NS_IMETHODIMP StreamFilterParent::GetTRRMode(nsIRequest::TRRMode* aTRRMode) { return GetTRRModeImpl(aTRRMode); } NS_IMETHODIMP StreamFilterParent::SetTRRMode(nsIRequest::TRRMode aTRRMode) { return SetTRRModeImpl(aTRRMode); } /***************************************************************************** * nsIStreamListener *****************************************************************************/ NS_IMETHODIMP StreamFilterParent::OnStartRequest(nsIRequest* aRequest) { AssertIsMainThread(); // Always reset mChannel if aRequest is different. Various calls in // StreamFilterParent will use mChannel, but aRequest is *always* the // right channel to use at this point. // // For ALL redirections, we will disconnect this listener. Extensions // will create a new filter if they need it. mBeforeOnStartRequest = false; if (aRequest != mChannel) { nsCOMPtr channel = do_QueryInterface(aRequest); nsCOMPtr loadInfo = channel ? channel->LoadInfo() : nullptr; mChannel = channel; if (!(loadInfo && loadInfo->RedirectChainIncludingInternalRedirects().IsEmpty())) { mDisconnected = true; mDisconnectedByOnStartRequest = true; RefPtr self(this); RunOnActorThread(FUNC, [=] { if (self->IPCActive()) { self->mState = State::Disconnected; CheckResult(self->SendError("Channel redirected"_ns)); } }); } } // Check if alterate cached data is being sent, if so we receive un-decoded // data and we must disconnect the filter and send an error to the extension. if (!mDisconnected) { RefPtr chan = do_QueryObject(aRequest); if (chan && chan->IsDeliveringAltData()) { mDisconnected = true; mDisconnectedByOnStartRequest = true; RefPtr self(this); RunOnActorThread(FUNC, [=] { if (self->IPCActive()) { self->mState = State::Disconnected; CheckResult( self->SendError("Channel is delivering cached alt-data"_ns)); } }); } } if (!mDisconnected) { Unused << mChannel->GetLoadGroup(getter_AddRefs(mLoadGroup)); if (mLoadGroup) { Unused << mLoadGroup->AddRequest(this, nullptr); } } nsresult rv = mOrigListener->OnStartRequest(aRequest); // Important: Do this only *after* running the next listener in the chain, so // that we get the final delivery target after any retargeting that it may do. if (nsCOMPtr req = do_QueryInterface(aRequest)) { nsCOMPtr thread; Unused << req->GetDeliveryTarget(getter_AddRefs(thread)); if (thread) { mIOThread = std::move(thread); } } // Important: Do this *after* we have set the thread delivery target, or it is // possible in rare circumstances for an extension to attempt to write data // before the thread has been set up, even though there are several layers of // asynchrony involved. if (!mDisconnected) { RefPtr self(this); RunOnActorThread(FUNC, [=] { if (self->IPCActive()) { self->mState = State::TransferringData; self->CheckResult(self->SendStartRequest()); } }); } return rv; } NS_IMETHODIMP StreamFilterParent::OnStopRequest(nsIRequest* aRequest, nsresult aStatusCode) { AssertIsMainThread(); MOZ_ASSERT(aRequest == mChannel); mReceivedStop = true; if (mDisconnected) { return EmitStopRequest(aStatusCode); } RefPtr self(this); RunOnActorThread(FUNC, [=] { if (self->IPCActive()) { self->CheckResult(self->SendStopRequest(aStatusCode)); } else if (self->mState != State::Disconnecting) { // If we're currently disconnecting, then we'll emit a stop // request at the end of that process. Otherwise we need to // manually emit one here, since we won't be getting a response // from the child. RunOnMainThread(FUNC, [=] { if (!self->mSentStop) { self->EmitStopRequest(aStatusCode); } }); } }); return NS_OK; } nsresult StreamFilterParent::EmitStopRequest(nsresult aStatusCode) { AssertIsMainThread(); MOZ_ASSERT(!mSentStop); mSentStop = true; nsresult rv = mOrigListener->OnStopRequest(mChannel, aStatusCode); if (mLoadGroup && !mDisconnected) { Unused << mLoadGroup->RemoveRequest(this, nullptr, aStatusCode); } return rv; } /***************************************************************************** * Incoming data handling *****************************************************************************/ void StreamFilterParent::DoSendData(Data&& aData) { AssertIsActorThread(); if (mState == State::TransferringData) { CheckResult(SendData(aData)); } } NS_IMETHODIMP StreamFilterParent::OnDataAvailable(nsIRequest* aRequest, nsIInputStream* aInputStream, uint64_t aOffset, uint32_t aCount) { AssertIsIOThread(); if (mDisconnectedByOnStartRequest || mState == State::Disconnected) { // If we're offloading data in a thread pool, it's possible that we'll // have buffered some additional data while waiting for the buffer to // flush. So, if there's any buffered data left, flush that before we // flush this incoming data. // // Note: When in the eDisconnected state, the buffer list is guaranteed // never to be accessed by another thread during an OnDataAvailable call. if (!mBufferedData.isEmpty()) { FlushBufferedData(); } mOffset += aCount; return mOrigListener->OnDataAvailable(aRequest, aInputStream, mOffset - aCount, aCount); } Data data; data.SetLength(aCount); uint32_t count; nsresult rv = aInputStream->Read(reinterpret_cast(data.Elements()), aCount, &count); NS_ENSURE_SUCCESS(rv, rv); NS_ENSURE_TRUE(count == aCount, NS_ERROR_UNEXPECTED); if (mState == State::Disconnecting) { MutexAutoLock al(mBufferMutex); BufferData(std::move(data)); } else if (mState == State::Closed) { return NS_ERROR_FAILURE; } else { ActorThread()->Dispatch( NewRunnableMethod("StreamFilterParent::DoSendData", this, &StreamFilterParent::DoSendData, std::move(data)), NS_DISPATCH_NORMAL); } return NS_OK; } nsresult StreamFilterParent::FlushBufferedData() { AssertIsIOThread(); // When offloading data to a thread pool, OnDataAvailable isn't guaranteed // to always run in the same thread, so it's possible for this function to // run in parallel with OnDataAvailable. MutexAutoLock al(mBufferMutex); while (!mBufferedData.isEmpty()) { UniquePtr data(mBufferedData.popFirst()); nsresult rv = Write(data->mData); NS_ENSURE_SUCCESS(rv, rv); } return NS_OK; } /***************************************************************************** * Thread helpers *****************************************************************************/ nsIEventTarget* StreamFilterParent::ActorThread() { return net::gSocketTransportService; } bool StreamFilterParent::IsActorThread() { return ActorThread()->IsOnCurrentThread(); } void StreamFilterParent::AssertIsActorThread() { MOZ_ASSERT(IsActorThread()); } nsISerialEventTarget* StreamFilterParent::IOThread() { return mIOThread; } bool StreamFilterParent::IsIOThread() { return mIOThread->IsOnCurrentThread(); } void StreamFilterParent::AssertIsIOThread() { MOZ_ASSERT(IsIOThread()); } template void StreamFilterParent::RunOnMainThread(const char* aName, Function&& aFunc) { mQueue->RunOrEnqueue(new ChannelEventFunction(mMainThread, std::move(aFunc))); } void StreamFilterParent::RunOnMainThread(already_AddRefed aRunnable) { mQueue->RunOrEnqueue( new ChannelEventRunnable(mMainThread, std::move(aRunnable))); } template void StreamFilterParent::RunOnIOThread(const char* aName, Function&& aFunc) { mQueue->RunOrEnqueue(new ChannelEventFunction(mIOThread, std::move(aFunc))); } void StreamFilterParent::RunOnIOThread(already_AddRefed aRunnable) { mQueue->RunOrEnqueue( new ChannelEventRunnable(mIOThread, std::move(aRunnable))); } template void StreamFilterParent::RunOnActorThread(const char* aName, Function&& aFunc) { // We don't use mQueue for dispatch to the actor thread. // // The main thread and IO thread are used for dispatching events to the // wrapped stream listener, and those events need to be processed // consistently, in the order they were dispatched. An event dispatched to the // main thread can't be run before events that were dispatched to the IO // thread before it. // // Additionally, the IO thread is likely to be a thread pool, which means that // without thread-safe queuing, it's possible for multiple events dispatched // to it to be processed in parallel, or out of order. // // The actor thread, however, is always a serial event target. Its events are // always processed in order, and events dispatched to the actor thread are // independent of the events in the output event queue. if (IsActorThread()) { aFunc(); } else { ActorThread()->Dispatch(std::move(NS_NewRunnableFunction(aName, aFunc)), NS_DISPATCH_NORMAL); } } /***************************************************************************** * Glue *****************************************************************************/ void StreamFilterParent::ActorDestroy(ActorDestroyReason aWhy) { AssertIsActorThread(); if (mState != State::Disconnected && mState != State::Closed) { Broken(); } } NS_INTERFACE_MAP_BEGIN(StreamFilterParent) NS_INTERFACE_MAP_ENTRY(nsIStreamListener) NS_INTERFACE_MAP_ENTRY(nsIRequestObserver) NS_INTERFACE_MAP_ENTRY(nsIRequest) NS_INTERFACE_MAP_ENTRY(nsIThreadRetargetableStreamListener) NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIStreamListener) NS_INTERFACE_MAP_END NS_IMPL_ADDREF(StreamFilterParent) NS_IMPL_RELEASE(StreamFilterParent) } // namespace extensions } // namespace mozilla