diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /ipc/glue/DataPipe.cpp | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esr
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | ipc/glue/DataPipe.cpp | 741 |
1 files changed, 741 insertions, 0 deletions
diff --git a/ipc/glue/DataPipe.cpp b/ipc/glue/DataPipe.cpp new file mode 100644 index 0000000000..55e38ddd67 --- /dev/null +++ b/ipc/glue/DataPipe.cpp @@ -0,0 +1,741 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=8 sts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "DataPipe.h" +#include "mozilla/AlreadyAddRefed.h" +#include "mozilla/Assertions.h" +#include "mozilla/CheckedInt.h" +#include "mozilla/ErrorNames.h" +#include "mozilla/Logging.h" +#include "mozilla/MoveOnlyFunction.h" +#include "mozilla/ipc/InputStreamParams.h" +#include "nsIAsyncInputStream.h" +#include "nsStreamUtils.h" +#include "nsThreadUtils.h" + +namespace mozilla { +namespace ipc { + +LazyLogModule gDataPipeLog("DataPipe"); + +namespace data_pipe_detail { + +// Helper for queueing up actions to be run once the mutex has been unlocked. +// Actions will be run in-order. +class MOZ_SCOPED_CAPABILITY DataPipeAutoLock { + public: + explicit DataPipeAutoLock(Mutex& aMutex) MOZ_CAPABILITY_ACQUIRE(aMutex) + : mMutex(aMutex) { + mMutex.Lock(); + } + DataPipeAutoLock(const DataPipeAutoLock&) = delete; + DataPipeAutoLock& operator=(const DataPipeAutoLock&) = delete; + + template <typename F> + void AddUnlockAction(F aAction) { + mActions.AppendElement(std::move(aAction)); + } + + ~DataPipeAutoLock() MOZ_CAPABILITY_RELEASE() { + mMutex.Unlock(); + for (auto& action : mActions) { + action(); + } + } + + private: + Mutex& mMutex; + AutoTArray<MoveOnlyFunction<void()>, 4> mActions; +}; + +static void DoNotifyOnUnlock(DataPipeAutoLock& aLock, + already_AddRefed<nsIRunnable> aCallback, + already_AddRefed<nsIEventTarget> aTarget) { + nsCOMPtr<nsIRunnable> callback{std::move(aCallback)}; + nsCOMPtr<nsIEventTarget> target{std::move(aTarget)}; + if (callback) { + aLock.AddUnlockAction( + [callback = std::move(callback), target = std::move(target)]() mutable { + if (target) { + target->Dispatch(callback.forget()); + } else { + NS_DispatchBackgroundTask(callback.forget()); + } + }); + } +} + +class DataPipeLink : public NodeController::PortObserver { + public: + DataPipeLink(bool aReceiverSide, std::shared_ptr<Mutex> aMutex, + ScopedPort aPort, SharedMemory* aShmem, uint32_t aCapacity, + nsresult aPeerStatus, uint32_t aOffset, uint32_t aAvailable) + : mMutex(std::move(aMutex)), + mPort(std::move(aPort)), + mShmem(aShmem), + mCapacity(aCapacity), + mReceiverSide(aReceiverSide), + mPeerStatus(aPeerStatus), + mOffset(aOffset), + mAvailable(aAvailable) {} + + void Init() MOZ_EXCLUDES(*mMutex) { + { + DataPipeAutoLock lock(*mMutex); + if (NS_FAILED(mPeerStatus)) { + return; + } + MOZ_ASSERT(mPort.IsValid()); + mPort.Controller()->SetPortObserver(mPort.Port(), this); + } + OnPortStatusChanged(); + } + + void OnPortStatusChanged() final MOZ_EXCLUDES(*mMutex); + + // Add a task to notify the callback after `aLock` is unlocked. + // + // This method is safe to call multiple times, as after the first time it is + // called, `mCallback` will be cleared. + void NotifyOnUnlock(DataPipeAutoLock& aLock) MOZ_REQUIRES(*mMutex) { + DoNotifyOnUnlock(aLock, mCallback.forget(), mCallbackTarget.forget()); + } + + void SendBytesConsumedOnUnlock(DataPipeAutoLock& aLock, uint32_t aBytes) + MOZ_REQUIRES(*mMutex) { + MOZ_LOG(gDataPipeLog, LogLevel::Verbose, + ("SendOnUnlock CONSUMED(%u) %s", aBytes, Describe(aLock).get())); + if (NS_FAILED(mPeerStatus)) { + return; + } + + // `mPort` may be destroyed by `SetPeerError` after the DataPipe is unlocked + // but before we send the message. The strong controller and port references + // will allow us to try to send the message anyway, and it will be safely + // dropped if the port has already been closed. CONSUMED messages are safe + // to deliver out-of-order, so we don't need to worry about ordering here. + aLock.AddUnlockAction([controller = RefPtr{mPort.Controller()}, + port = mPort.Port(), aBytes]() mutable { + auto message = MakeUnique<IPC::Message>( + MSG_ROUTING_NONE, DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE); + IPC::MessageWriter writer(*message); + WriteParam(&writer, aBytes); + controller->SendUserMessage(port, std::move(message)); + }); + } + + void SetPeerError(DataPipeAutoLock& aLock, nsresult aStatus, + bool aSendClosed = false) MOZ_REQUIRES(*mMutex) { + MOZ_LOG(gDataPipeLog, LogLevel::Debug, + ("SetPeerError(%s%s) %s", GetStaticErrorName(aStatus), + aSendClosed ? ", send" : "", Describe(aLock).get())); + // The pipe was closed or errored. Clear the observer reference back + // to this type from the port layer, and ensure we notify waiters. + MOZ_ASSERT(NS_SUCCEEDED(mPeerStatus)); + mPeerStatus = NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus; + aLock.AddUnlockAction([port = std::move(mPort), aStatus, aSendClosed] { + if (aSendClosed) { + auto message = MakeUnique<IPC::Message>(MSG_ROUTING_NONE, + DATA_PIPE_CLOSED_MESSAGE_TYPE); + IPC::MessageWriter writer(*message); + WriteParam(&writer, aStatus); + port.Controller()->SendUserMessage(port.Port(), std::move(message)); + } + // The `ScopedPort` being destroyed with this action will close it, + // clearing the observer reference from the ports layer. + }); + NotifyOnUnlock(aLock); + } + + nsCString Describe(DataPipeAutoLock& aLock) const MOZ_REQUIRES(*mMutex) { + return nsPrintfCString( + "[%s(%p) c=%u e=%s o=%u a=%u, cb=%s]", + mReceiverSide ? "Receiver" : "Sender", this, mCapacity, + GetStaticErrorName(mPeerStatus), mOffset, mAvailable, + mCallback ? (mCallbackClosureOnly ? "clo" : "yes") : "no"); + } + + // This mutex is shared with the `DataPipeBase` which owns this + // `DataPipeLink`. + std::shared_ptr<Mutex> mMutex; + + ScopedPort mPort MOZ_GUARDED_BY(*mMutex); + const RefPtr<SharedMemory> mShmem; + const uint32_t mCapacity; + const bool mReceiverSide; + + bool mProcessingSegment MOZ_GUARDED_BY(*mMutex) = false; + + nsresult mPeerStatus MOZ_GUARDED_BY(*mMutex) = NS_OK; + uint32_t mOffset MOZ_GUARDED_BY(*mMutex) = 0; + uint32_t mAvailable MOZ_GUARDED_BY(*mMutex) = 0; + + bool mCallbackClosureOnly MOZ_GUARDED_BY(*mMutex) = false; + nsCOMPtr<nsIRunnable> mCallback MOZ_GUARDED_BY(*mMutex); + nsCOMPtr<nsIEventTarget> mCallbackTarget MOZ_GUARDED_BY(*mMutex); +}; + +void DataPipeLink::OnPortStatusChanged() { + DataPipeAutoLock lock(*mMutex); + + while (NS_SUCCEEDED(mPeerStatus)) { + UniquePtr<IPC::Message> message; + if (!mPort.Controller()->GetMessage(mPort.Port(), &message)) { + SetPeerError(lock, NS_BASE_STREAM_CLOSED); + return; + } + if (!message) { + return; // no more messages + } + + IPC::MessageReader reader(*message); + switch (message->type()) { + case DATA_PIPE_CLOSED_MESSAGE_TYPE: { + nsresult status = NS_OK; + if (!ReadParam(&reader, &status)) { + NS_WARNING("Unable to parse nsresult error from peer"); + status = NS_ERROR_UNEXPECTED; + } + MOZ_LOG(gDataPipeLog, LogLevel::Debug, + ("Got CLOSED(%s) %s", GetStaticErrorName(status), + Describe(lock).get())); + SetPeerError(lock, status); + return; + } + case DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE: { + uint32_t consumed = 0; + if (!ReadParam(&reader, &consumed)) { + NS_WARNING("Unable to parse bytes consumed from peer"); + SetPeerError(lock, NS_ERROR_UNEXPECTED); + return; + } + + MOZ_LOG(gDataPipeLog, LogLevel::Verbose, + ("Got CONSUMED(%u) %s", consumed, Describe(lock).get())); + auto newAvailable = CheckedUint32{mAvailable} + consumed; + if (!newAvailable.isValid() || newAvailable.value() > mCapacity) { + NS_WARNING("Illegal bytes consumed message received from peer"); + SetPeerError(lock, NS_ERROR_UNEXPECTED); + return; + } + mAvailable = newAvailable.value(); + if (!mCallbackClosureOnly) { + NotifyOnUnlock(lock); + } + break; + } + default: { + NS_WARNING("Illegal message type received from peer"); + SetPeerError(lock, NS_ERROR_UNEXPECTED); + return; + } + } + } +} + +DataPipeBase::DataPipeBase(bool aReceiverSide, nsresult aError) + : mMutex(std::make_shared<Mutex>(aReceiverSide ? "DataPipeReceiver" + : "DataPipeSender")), + mStatus(NS_SUCCEEDED(aError) ? NS_BASE_STREAM_CLOSED : aError) {} + +DataPipeBase::DataPipeBase(bool aReceiverSide, ScopedPort aPort, + SharedMemory* aShmem, uint32_t aCapacity, + nsresult aPeerStatus, uint32_t aOffset, + uint32_t aAvailable) + : mMutex(std::make_shared<Mutex>(aReceiverSide ? "DataPipeReceiver" + : "DataPipeSender")), + mStatus(NS_OK), + mLink(new DataPipeLink(aReceiverSide, mMutex, std::move(aPort), aShmem, + aCapacity, aPeerStatus, aOffset, aAvailable)) { + mLink->Init(); +} + +DataPipeBase::~DataPipeBase() { + DataPipeAutoLock lock(*mMutex); + CloseInternal(lock, NS_BASE_STREAM_CLOSED); +} + +void DataPipeBase::CloseInternal(DataPipeAutoLock& aLock, nsresult aStatus) { + if (NS_FAILED(mStatus)) { + return; + } + + MOZ_LOG( + gDataPipeLog, LogLevel::Debug, + ("Closing(%s) %s", GetStaticErrorName(aStatus), Describe(aLock).get())); + + // Set our status to an errored status. + mStatus = NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus; + RefPtr<DataPipeLink> link = mLink.forget(); + AssertSameMutex(link->mMutex); + link->NotifyOnUnlock(aLock); + + // If our peer hasn't disappeared yet, clean up our connection to it. + if (NS_SUCCEEDED(link->mPeerStatus)) { + link->SetPeerError(aLock, mStatus, /* aSendClosed */ true); + } +} + +nsresult DataPipeBase::ProcessSegmentsInternal( + uint32_t aCount, ProcessSegmentFun aProcessSegment, + uint32_t* aProcessedCount) { + *aProcessedCount = 0; + + while (*aProcessedCount < aCount) { + DataPipeAutoLock lock(*mMutex); + mMutex->AssertCurrentThreadOwns(); + + MOZ_LOG(gDataPipeLog, LogLevel::Verbose, + ("ProcessSegments(%u of %u) %s", *aProcessedCount, aCount, + Describe(lock).get())); + + nsresult status = CheckStatus(lock); + if (NS_FAILED(status)) { + if (*aProcessedCount > 0) { + return NS_OK; + } + return status == NS_BASE_STREAM_CLOSED ? NS_OK : status; + } + + RefPtr<DataPipeLink> link = mLink; + AssertSameMutex(link->mMutex); + if (!link->mAvailable) { + MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(link->mPeerStatus), + "CheckStatus will have returned an error"); + return *aProcessedCount > 0 ? NS_OK : NS_BASE_STREAM_WOULD_BLOCK; + } + + MOZ_RELEASE_ASSERT(!link->mProcessingSegment, + "Only one thread may be processing a segment at a time"); + + // Extract an iterator over the next contiguous region of the shared memory + // buffer which will be used . + char* start = static_cast<char*>(link->mShmem->memory()) + link->mOffset; + char* iter = start; + char* end = start + std::min({aCount - *aProcessedCount, link->mAvailable, + link->mCapacity - link->mOffset}); + + // Record the consumed region from our segment when exiting this scope, + // telling our peer how many bytes were consumed. Hold on to `mLink` to keep + // the shmem mapped and make sure we can clean up even if we're closed while + // processing the shmem region. + link->mProcessingSegment = true; + auto scopeExit = MakeScopeExit([&] { + mMutex->AssertCurrentThreadOwns(); // should still be held + AssertSameMutex(link->mMutex); + + MOZ_RELEASE_ASSERT(link->mProcessingSegment); + link->mProcessingSegment = false; + uint32_t totalProcessed = iter - start; + if (totalProcessed > 0) { + link->mOffset += totalProcessed; + MOZ_RELEASE_ASSERT(link->mOffset <= link->mCapacity); + if (link->mOffset == link->mCapacity) { + link->mOffset = 0; + } + link->mAvailable -= totalProcessed; + link->SendBytesConsumedOnUnlock(lock, totalProcessed); + } + MOZ_LOG(gDataPipeLog, LogLevel::Verbose, + ("Processed Segment(%u of %zu) %s", totalProcessed, end - start, + Describe(lock).get())); + }); + + { + MutexAutoUnlock unlock(*mMutex); + while (iter < end) { + uint32_t processed = 0; + Span segment{iter, end}; + nsresult rv = aProcessSegment(segment, *aProcessedCount, &processed); + if (NS_FAILED(rv) || processed == 0) { + return NS_OK; + } + + MOZ_RELEASE_ASSERT(processed <= segment.Length()); + iter += processed; + *aProcessedCount += processed; + } + } + } + MOZ_DIAGNOSTIC_ASSERT(*aProcessedCount == aCount, + "Must have processed exactly aCount"); + return NS_OK; +} + +void DataPipeBase::AsyncWaitInternal(already_AddRefed<nsIRunnable> aCallback, + already_AddRefed<nsIEventTarget> aTarget, + bool aClosureOnly) { + RefPtr<nsIRunnable> callback = std::move(aCallback); + RefPtr<nsIEventTarget> target = std::move(aTarget); + + DataPipeAutoLock lock(*mMutex); + MOZ_LOG(gDataPipeLog, LogLevel::Debug, + ("AsyncWait %s %p %s", aClosureOnly ? "(closure)" : "(ready)", + callback.get(), Describe(lock).get())); + + if (NS_FAILED(CheckStatus(lock))) { +#ifdef DEBUG + if (mLink) { + AssertSameMutex(mLink->mMutex); + MOZ_ASSERT(!mLink->mCallback); + } +#endif + DoNotifyOnUnlock(lock, callback.forget(), target.forget()); + return; + } + + AssertSameMutex(mLink->mMutex); + + // NOTE: After this point, `mLink` may have previously had a callback which is + // now being cancelled, make sure we clear `mCallback` even if we're going to + // call `aCallback` immediately. + mLink->mCallback = callback.forget(); + mLink->mCallbackTarget = target.forget(); + mLink->mCallbackClosureOnly = aClosureOnly; + if (!aClosureOnly && mLink->mAvailable) { + mLink->NotifyOnUnlock(lock); + } +} + +nsresult DataPipeBase::CheckStatus(DataPipeAutoLock& aLock) { + // If our peer has closed or errored, we may need to close our local side to + // reflect the error code our peer provided. If we're a sender, we want to + // become closed immediately, whereas if we're a receiver we want to wait + // until our available buffer has been exhausted. + // + // NOTE: There may still be 2-stage writes/reads ongoing at this point, which + // will continue due to `mLink` being kept alive by the + // `ProcessSegmentsInternal` function. + if (NS_FAILED(mStatus)) { + return mStatus; + } + AssertSameMutex(mLink->mMutex); + if (NS_FAILED(mLink->mPeerStatus) && + (!mLink->mReceiverSide || !mLink->mAvailable)) { + CloseInternal(aLock, mLink->mPeerStatus); + } + return mStatus; +} + +nsCString DataPipeBase::Describe(DataPipeAutoLock& aLock) { + if (mLink) { + AssertSameMutex(mLink->mMutex); + return mLink->Describe(aLock); + } + return nsPrintfCString("[status=%s]", GetStaticErrorName(mStatus)); +} + +template <typename T> +void DataPipeWrite(IPC::MessageWriter* aWriter, T* aParam) { + DataPipeAutoLock lock(*aParam->mMutex); + MOZ_LOG(gDataPipeLog, LogLevel::Debug, + ("IPC Write: %s", aParam->Describe(lock).get())); + + WriteParam(aWriter, aParam->mStatus); + if (NS_FAILED(aParam->mStatus)) { + return; + } + + aParam->AssertSameMutex(aParam->mLink->mMutex); + MOZ_RELEASE_ASSERT(!aParam->mLink->mProcessingSegment, + "cannot transfer while processing a segment"); + + // Serialize relevant parameters to our peer. + WriteParam(aWriter, std::move(aParam->mLink->mPort)); + if (!aParam->mLink->mShmem->WriteHandle(aWriter)) { + aWriter->FatalError("failed to write DataPipe shmem handle"); + MOZ_CRASH("failed to write DataPipe shmem handle"); + } + WriteParam(aWriter, aParam->mLink->mCapacity); + WriteParam(aWriter, aParam->mLink->mPeerStatus); + WriteParam(aWriter, aParam->mLink->mOffset); + WriteParam(aWriter, aParam->mLink->mAvailable); + + // Mark our peer as closed so we don't try to send to it when closing. + aParam->mLink->mPeerStatus = NS_ERROR_NOT_INITIALIZED; + aParam->CloseInternal(lock, NS_ERROR_NOT_INITIALIZED); +} + +template <typename T> +bool DataPipeRead(IPC::MessageReader* aReader, RefPtr<T>* aResult) { + nsresult rv = NS_OK; + if (!ReadParam(aReader, &rv)) { + aReader->FatalError("failed to read DataPipe status"); + return false; + } + if (NS_FAILED(rv)) { + *aResult = new T(rv); + MOZ_LOG(gDataPipeLog, LogLevel::Debug, + ("IPC Read: [status=%s]", GetStaticErrorName(rv))); + return true; + } + + ScopedPort port; + if (!ReadParam(aReader, &port)) { + aReader->FatalError("failed to read DataPipe port"); + return false; + } + RefPtr shmem = new SharedMemoryBasic(); + if (!shmem->ReadHandle(aReader)) { + aReader->FatalError("failed to read DataPipe shmem"); + return false; + } + uint32_t capacity = 0; + nsresult peerStatus = NS_OK; + uint32_t offset = 0; + uint32_t available = 0; + if (!ReadParam(aReader, &capacity) || !ReadParam(aReader, &peerStatus) || + !ReadParam(aReader, &offset) || !ReadParam(aReader, &available)) { + aReader->FatalError("failed to read DataPipe fields"); + return false; + } + if (!capacity || offset >= capacity || available > capacity) { + aReader->FatalError("received DataPipe state values are inconsistent"); + return false; + } + if (!shmem->Map(SharedMemory::PageAlignedSize(capacity))) { + aReader->FatalError("failed to map DataPipe shared memory region"); + return false; + } + + *aResult = + new T(std::move(port), shmem, capacity, peerStatus, offset, available); + if (MOZ_LOG_TEST(gDataPipeLog, LogLevel::Debug)) { + DataPipeAutoLock lock(*(*aResult)->mMutex); + MOZ_LOG(gDataPipeLog, LogLevel::Debug, + ("IPC Read: %s", (*aResult)->Describe(lock).get())); + } + return true; +} + +} // namespace data_pipe_detail + +//----------------------------------------------------------------------------- +// DataPipeSender +//----------------------------------------------------------------------------- + +NS_IMPL_ISUPPORTS(DataPipeSender, nsIOutputStream, nsIAsyncOutputStream, + DataPipeSender) + +// nsIOutputStream + +NS_IMETHODIMP DataPipeSender::Close() { + return CloseWithStatus(NS_BASE_STREAM_CLOSED); +} + +NS_IMETHODIMP DataPipeSender::Flush() { return NS_OK; } + +NS_IMETHODIMP DataPipeSender::StreamStatus() { + data_pipe_detail::DataPipeAutoLock lock(*mMutex); + return CheckStatus(lock); +} + +NS_IMETHODIMP DataPipeSender::Write(const char* aBuf, uint32_t aCount, + uint32_t* aWriteCount) { + return WriteSegments(NS_CopyBufferToSegment, (void*)aBuf, aCount, + aWriteCount); +} + +NS_IMETHODIMP DataPipeSender::WriteFrom(nsIInputStream* aFromStream, + uint32_t aCount, + uint32_t* aWriteCount) { + return WriteSegments(NS_CopyStreamToSegment, aFromStream, aCount, + aWriteCount); +} + +NS_IMETHODIMP DataPipeSender::WriteSegments(nsReadSegmentFun aReader, + void* aClosure, uint32_t aCount, + uint32_t* aWriteCount) { + auto processSegment = [&](Span<char> aSpan, uint32_t aToOffset, + uint32_t* aReadCount) -> nsresult { + return aReader(this, aClosure, aSpan.data(), aToOffset, aSpan.Length(), + aReadCount); + }; + return ProcessSegmentsInternal(aCount, processSegment, aWriteCount); +} + +NS_IMETHODIMP DataPipeSender::IsNonBlocking(bool* _retval) { + *_retval = true; + return NS_OK; +} + +// nsIAsyncOutputStream + +NS_IMETHODIMP DataPipeSender::CloseWithStatus(nsresult reason) { + data_pipe_detail::DataPipeAutoLock lock(*mMutex); + CloseInternal(lock, reason); + return NS_OK; +} + +NS_IMETHODIMP DataPipeSender::AsyncWait(nsIOutputStreamCallback* aCallback, + uint32_t aFlags, + uint32_t aRequestedCount, + nsIEventTarget* aTarget) { + AsyncWaitInternal( + aCallback ? NS_NewCancelableRunnableFunction( + "DataPipeReceiver::AsyncWait", + [self = RefPtr{this}, callback = RefPtr{aCallback}] { + MOZ_LOG(gDataPipeLog, LogLevel::Debug, + ("Calling OnOutputStreamReady(%p, %p)", + callback.get(), self.get())); + callback->OnOutputStreamReady(self); + }) + : nullptr, + do_AddRef(aTarget), aFlags & WAIT_CLOSURE_ONLY); + return NS_OK; +} + +//----------------------------------------------------------------------------- +// DataPipeReceiver +//----------------------------------------------------------------------------- + +NS_IMPL_ISUPPORTS(DataPipeReceiver, nsIInputStream, nsIAsyncInputStream, + nsIIPCSerializableInputStream, DataPipeReceiver) + +// nsIInputStream + +NS_IMETHODIMP DataPipeReceiver::Close() { + return CloseWithStatus(NS_BASE_STREAM_CLOSED); +} + +NS_IMETHODIMP DataPipeReceiver::Available(uint64_t* _retval) { + data_pipe_detail::DataPipeAutoLock lock(*mMutex); + nsresult rv = CheckStatus(lock); + if (NS_FAILED(rv)) { + return rv; + } + AssertSameMutex(mLink->mMutex); + *_retval = mLink->mAvailable; + return NS_OK; +} + +NS_IMETHODIMP DataPipeReceiver::StreamStatus() { + data_pipe_detail::DataPipeAutoLock lock(*mMutex); + return CheckStatus(lock); +} + +NS_IMETHODIMP DataPipeReceiver::Read(char* aBuf, uint32_t aCount, + uint32_t* aReadCount) { + return ReadSegments(NS_CopySegmentToBuffer, aBuf, aCount, aReadCount); +} + +NS_IMETHODIMP DataPipeReceiver::ReadSegments(nsWriteSegmentFun aWriter, + void* aClosure, uint32_t aCount, + uint32_t* aReadCount) { + auto processSegment = [&](Span<char> aSpan, uint32_t aToOffset, + uint32_t* aWriteCount) -> nsresult { + return aWriter(this, aClosure, aSpan.data(), aToOffset, aSpan.Length(), + aWriteCount); + }; + return ProcessSegmentsInternal(aCount, processSegment, aReadCount); +} + +NS_IMETHODIMP DataPipeReceiver::IsNonBlocking(bool* _retval) { + *_retval = true; + return NS_OK; +} + +// nsIAsyncInputStream + +NS_IMETHODIMP DataPipeReceiver::CloseWithStatus(nsresult aStatus) { + data_pipe_detail::DataPipeAutoLock lock(*mMutex); + CloseInternal(lock, aStatus); + return NS_OK; +} + +NS_IMETHODIMP DataPipeReceiver::AsyncWait(nsIInputStreamCallback* aCallback, + uint32_t aFlags, + uint32_t aRequestedCount, + nsIEventTarget* aTarget) { + AsyncWaitInternal( + aCallback ? NS_NewCancelableRunnableFunction( + "DataPipeReceiver::AsyncWait", + [self = RefPtr{this}, callback = RefPtr{aCallback}] { + MOZ_LOG(gDataPipeLog, LogLevel::Debug, + ("Calling OnInputStreamReady(%p, %p)", + callback.get(), self.get())); + callback->OnInputStreamReady(self); + }) + : nullptr, + do_AddRef(aTarget), aFlags & WAIT_CLOSURE_ONLY); + return NS_OK; +} + +// nsIIPCSerializableInputStream + +void DataPipeReceiver::SerializedComplexity(uint32_t aMaxSize, + uint32_t* aSizeUsed, + uint32_t* aPipes, + uint32_t* aTransferables) { + // We report DataPipeReceiver as taking one transferrable to serialize, rather + // than one pipe, as we aren't starting a new pipe for this purpose, and are + // instead transferring an existing pipe. + *aTransferables = 1; +} + +void DataPipeReceiver::Serialize(InputStreamParams& aParams, uint32_t aMaxSize, + uint32_t* aSizeUsed) { + *aSizeUsed = 0; + aParams = DataPipeReceiverStreamParams(this); +} + +bool DataPipeReceiver::Deserialize(const InputStreamParams& aParams) { + MOZ_CRASH("Handled directly in `DeserializeInputStream`"); +} + +//----------------------------------------------------------------------------- +// NewDataPipe +//----------------------------------------------------------------------------- + +nsresult NewDataPipe(uint32_t aCapacity, DataPipeSender** aSender, + DataPipeReceiver** aReceiver) { + if (!aCapacity) { + aCapacity = kDefaultDataPipeCapacity; + } + + RefPtr<NodeController> controller = NodeController::GetSingleton(); + if (!controller) { + return NS_ERROR_ILLEGAL_DURING_SHUTDOWN; + } + + auto [senderPort, receiverPort] = controller->CreatePortPair(); + auto shmem = MakeRefPtr<SharedMemoryBasic>(); + size_t alignedCapacity = SharedMemory::PageAlignedSize(aCapacity); + if (!shmem->Create(alignedCapacity) || !shmem->Map(alignedCapacity)) { + return NS_ERROR_OUT_OF_MEMORY; + } + + RefPtr sender = new DataPipeSender(std::move(senderPort), shmem, aCapacity, + NS_OK, 0, aCapacity); + RefPtr receiver = new DataPipeReceiver(std::move(receiverPort), shmem, + aCapacity, NS_OK, 0, 0); + sender.forget(aSender); + receiver.forget(aReceiver); + return NS_OK; +} + +} // namespace ipc +} // namespace mozilla + +void IPC::ParamTraits<mozilla::ipc::DataPipeSender*>::Write( + MessageWriter* aWriter, mozilla::ipc::DataPipeSender* aParam) { + mozilla::ipc::data_pipe_detail::DataPipeWrite(aWriter, aParam); +} + +bool IPC::ParamTraits<mozilla::ipc::DataPipeSender*>::Read( + MessageReader* aReader, RefPtr<mozilla::ipc::DataPipeSender>* aResult) { + return mozilla::ipc::data_pipe_detail::DataPipeRead(aReader, aResult); +} + +void IPC::ParamTraits<mozilla::ipc::DataPipeReceiver*>::Write( + MessageWriter* aWriter, mozilla::ipc::DataPipeReceiver* aParam) { + mozilla::ipc::data_pipe_detail::DataPipeWrite(aWriter, aParam); +} + +bool IPC::ParamTraits<mozilla::ipc::DataPipeReceiver*>::Read( + MessageReader* aReader, RefPtr<mozilla::ipc::DataPipeReceiver>* aResult) { + return mozilla::ipc::data_pipe_detail::DataPipeRead(aReader, aResult); +} |