summaryrefslogtreecommitdiffstats
path: root/xpcom/io/NonBlockingAsyncInputStream.cpp
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--xpcom/io/NonBlockingAsyncInputStream.cpp388
1 files changed, 388 insertions, 0 deletions
diff --git a/xpcom/io/NonBlockingAsyncInputStream.cpp b/xpcom/io/NonBlockingAsyncInputStream.cpp
new file mode 100644
index 0000000000..00e8598d86
--- /dev/null
+++ b/xpcom/io/NonBlockingAsyncInputStream.cpp
@@ -0,0 +1,388 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "NonBlockingAsyncInputStream.h"
+#include "mozilla/ipc/InputStreamUtils.h"
+#include "nsIAsyncInputStream.h"
+#include "nsICloneableInputStream.h"
+#include "nsIInputStream.h"
+#include "nsIIPCSerializableInputStream.h"
+#include "nsISeekableStream.h"
+#include "nsStreamUtils.h"
+
+namespace mozilla {
+
+using namespace ipc;
+
+class NonBlockingAsyncInputStream::AsyncWaitRunnable final
+ : public CancelableRunnable {
+ RefPtr<NonBlockingAsyncInputStream> mStream;
+ nsCOMPtr<nsIInputStreamCallback> mCallback;
+
+ public:
+ AsyncWaitRunnable(NonBlockingAsyncInputStream* aStream,
+ nsIInputStreamCallback* aCallback)
+ : CancelableRunnable("AsyncWaitRunnable"),
+ mStream(aStream),
+ mCallback(aCallback) {}
+
+ NS_IMETHOD
+ Run() override {
+ mStream->RunAsyncWaitCallback(this, mCallback.forget());
+ return NS_OK;
+ }
+
+ nsresult Cancel() override {
+ mStream = nullptr;
+ return NS_OK;
+ }
+};
+
+NS_IMPL_ADDREF(NonBlockingAsyncInputStream);
+NS_IMPL_RELEASE(NonBlockingAsyncInputStream);
+
+NonBlockingAsyncInputStream::WaitClosureOnly::WaitClosureOnly(
+ AsyncWaitRunnable* aRunnable, nsIEventTarget* aEventTarget)
+ : mRunnable(aRunnable), mEventTarget(aEventTarget) {}
+
+NS_INTERFACE_MAP_BEGIN(NonBlockingAsyncInputStream)
+ NS_INTERFACE_MAP_ENTRY(nsIInputStream)
+ NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream)
+ NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream,
+ mWeakCloneableInputStream)
+ NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream,
+ mWeakIPCSerializableInputStream)
+ NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream,
+ mWeakSeekableInputStream)
+ NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsITellableStream,
+ mWeakTellableInputStream)
+ NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStream)
+NS_INTERFACE_MAP_END
+
+/* static */
+nsresult NonBlockingAsyncInputStream::Create(
+ already_AddRefed<nsIInputStream> aInputStream,
+ nsIAsyncInputStream** aResult) {
+ MOZ_DIAGNOSTIC_ASSERT(aResult);
+
+ nsCOMPtr<nsIInputStream> inputStream = std::move(aInputStream);
+
+ bool nonBlocking = false;
+ nsresult rv = inputStream->IsNonBlocking(&nonBlocking);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ MOZ_DIAGNOSTIC_ASSERT(nonBlocking);
+
+#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
+ nsCOMPtr<nsIAsyncInputStream> asyncInputStream =
+ do_QueryInterface(inputStream);
+ MOZ_DIAGNOSTIC_ASSERT(!asyncInputStream);
+#endif // MOZ_DIAGNOSTIC_ASSERT_ENABLED
+
+ RefPtr<NonBlockingAsyncInputStream> stream =
+ new NonBlockingAsyncInputStream(inputStream.forget());
+
+ stream.forget(aResult);
+ return NS_OK;
+}
+
+NonBlockingAsyncInputStream::NonBlockingAsyncInputStream(
+ already_AddRefed<nsIInputStream> aInputStream)
+ : mInputStream(std::move(aInputStream)),
+ mWeakCloneableInputStream(nullptr),
+ mWeakIPCSerializableInputStream(nullptr),
+ mWeakSeekableInputStream(nullptr),
+ mWeakTellableInputStream(nullptr),
+ mLock("NonBlockingAsyncInputStream::mLock"),
+ mClosed(false) {
+ MOZ_ASSERT(mInputStream);
+
+ nsCOMPtr<nsICloneableInputStream> cloneableStream =
+ do_QueryInterface(mInputStream);
+ if (cloneableStream && SameCOMIdentity(mInputStream, cloneableStream)) {
+ mWeakCloneableInputStream = cloneableStream;
+ }
+
+ nsCOMPtr<nsIIPCSerializableInputStream> serializableStream =
+ do_QueryInterface(mInputStream);
+ if (serializableStream && SameCOMIdentity(mInputStream, serializableStream)) {
+ mWeakIPCSerializableInputStream = serializableStream;
+ }
+
+ nsCOMPtr<nsISeekableStream> seekableStream = do_QueryInterface(mInputStream);
+ if (seekableStream && SameCOMIdentity(mInputStream, seekableStream)) {
+ mWeakSeekableInputStream = seekableStream;
+ }
+
+ nsCOMPtr<nsITellableStream> tellableStream = do_QueryInterface(mInputStream);
+ if (tellableStream && SameCOMIdentity(mInputStream, tellableStream)) {
+ mWeakTellableInputStream = tellableStream;
+ }
+}
+
+NonBlockingAsyncInputStream::~NonBlockingAsyncInputStream() = default;
+
+NS_IMETHODIMP
+NonBlockingAsyncInputStream::Close() {
+ RefPtr<AsyncWaitRunnable> waitClosureOnlyRunnable;
+ nsCOMPtr<nsIEventTarget> waitClosureOnlyEventTarget;
+
+ {
+ MutexAutoLock lock(mLock);
+
+ if (mClosed) {
+ // Here we could return NS_BASE_STREAM_CLOSED as well, but just to avoid
+ // warning messages, let's make everybody happy with a NS_OK.
+ return NS_OK;
+ }
+
+ mClosed = true;
+
+ NS_ENSURE_STATE(mInputStream);
+ nsresult rv = mInputStream->Close();
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ mWaitClosureOnly.reset();
+ return rv;
+ }
+
+ // If we have a WaitClosureOnly runnable, it's time to use it.
+ if (mWaitClosureOnly.isSome()) {
+ waitClosureOnlyRunnable = std::move(mWaitClosureOnly->mRunnable);
+ waitClosureOnlyEventTarget = std::move(mWaitClosureOnly->mEventTarget);
+
+ mWaitClosureOnly.reset();
+
+ // Now we want to dispatch the asyncWaitCallback.
+ mAsyncWaitCallback = waitClosureOnlyRunnable;
+ }
+ }
+
+ if (waitClosureOnlyRunnable) {
+ if (waitClosureOnlyEventTarget) {
+ waitClosureOnlyEventTarget->Dispatch(waitClosureOnlyRunnable,
+ NS_DISPATCH_NORMAL);
+ } else {
+ waitClosureOnlyRunnable->Run();
+ }
+ }
+
+ return NS_OK;
+}
+
+// nsIInputStream interface
+
+NS_IMETHODIMP
+NonBlockingAsyncInputStream::Available(uint64_t* aLength) {
+ nsresult rv = mInputStream->Available(aLength);
+ // Don't issue warnings for legal condition NS_BASE_STREAM_CLOSED.
+ if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ // Nothing more to read. Let's close the stream now.
+ if (*aLength == 0) {
+ MutexAutoLock lock(mLock);
+ mInputStream->Close();
+ mClosed = true;
+ return NS_BASE_STREAM_CLOSED;
+ }
+
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+NonBlockingAsyncInputStream::StreamStatus() {
+ return mInputStream->StreamStatus();
+}
+
+NS_IMETHODIMP
+NonBlockingAsyncInputStream::Read(char* aBuffer, uint32_t aCount,
+ uint32_t* aReadCount) {
+ return mInputStream->Read(aBuffer, aCount, aReadCount);
+}
+
+namespace {
+
+class MOZ_RAII ReadSegmentsData {
+ public:
+ ReadSegmentsData(NonBlockingAsyncInputStream* aStream,
+ nsWriteSegmentFun aFunc, void* aClosure)
+ : mStream(aStream), mFunc(aFunc), mClosure(aClosure) {}
+
+ NonBlockingAsyncInputStream* mStream;
+ nsWriteSegmentFun mFunc;
+ void* mClosure;
+};
+
+nsresult ReadSegmentsWriter(nsIInputStream* aInStream, void* aClosure,
+ const char* aFromSegment, uint32_t aToOffset,
+ uint32_t aCount, uint32_t* aWriteCount) {
+ ReadSegmentsData* data = static_cast<ReadSegmentsData*>(aClosure);
+ return data->mFunc(data->mStream, data->mClosure, aFromSegment, aToOffset,
+ aCount, aWriteCount);
+}
+
+} // namespace
+
+NS_IMETHODIMP
+NonBlockingAsyncInputStream::ReadSegments(nsWriteSegmentFun aWriter,
+ void* aClosure, uint32_t aCount,
+ uint32_t* aResult) {
+ ReadSegmentsData data(this, aWriter, aClosure);
+ return mInputStream->ReadSegments(ReadSegmentsWriter, &data, aCount, aResult);
+}
+
+NS_IMETHODIMP
+NonBlockingAsyncInputStream::IsNonBlocking(bool* aNonBlocking) {
+ *aNonBlocking = true;
+ return NS_OK;
+}
+
+// nsICloneableInputStream interface
+
+NS_IMETHODIMP
+NonBlockingAsyncInputStream::GetCloneable(bool* aCloneable) {
+ NS_ENSURE_STATE(mWeakCloneableInputStream);
+ return mWeakCloneableInputStream->GetCloneable(aCloneable);
+}
+
+NS_IMETHODIMP
+NonBlockingAsyncInputStream::Clone(nsIInputStream** aResult) {
+ NS_ENSURE_STATE(mWeakCloneableInputStream);
+
+ nsCOMPtr<nsIInputStream> clonedStream;
+ nsresult rv = mWeakCloneableInputStream->Clone(getter_AddRefs(clonedStream));
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ nsCOMPtr<nsIAsyncInputStream> asyncStream;
+ rv = Create(clonedStream.forget(), getter_AddRefs(asyncStream));
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ asyncStream.forget(aResult);
+ return NS_OK;
+}
+
+// nsIAsyncInputStream interface
+
+NS_IMETHODIMP
+NonBlockingAsyncInputStream::CloseWithStatus(nsresult aStatus) {
+ return Close();
+}
+
+NS_IMETHODIMP
+NonBlockingAsyncInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
+ uint32_t aFlags,
+ uint32_t aRequestedCount,
+ nsIEventTarget* aEventTarget) {
+ RefPtr<AsyncWaitRunnable> runnable;
+ {
+ MutexAutoLock lock(mLock);
+
+ mWaitClosureOnly.reset();
+ mAsyncWaitCallback = nullptr;
+
+ if (!aCallback) {
+ // Canceling previous callbacks, which is done above.
+ return NS_OK;
+ }
+
+ // Maybe the stream is already closed.
+ if (!mClosed) {
+ uint64_t length;
+ nsresult rv = mInputStream->Available(&length);
+ if (NS_SUCCEEDED(rv) && length == 0) {
+ mInputStream->Close();
+ mClosed = true;
+ }
+ }
+
+ runnable = new AsyncWaitRunnable(this, aCallback);
+ if ((aFlags & nsIAsyncInputStream::WAIT_CLOSURE_ONLY) && !mClosed) {
+ mWaitClosureOnly.emplace(runnable, aEventTarget);
+ return NS_OK;
+ }
+
+ mAsyncWaitCallback = runnable;
+ }
+
+ MOZ_ASSERT(runnable);
+
+ if (aEventTarget) {
+ return aEventTarget->Dispatch(runnable.forget());
+ }
+
+ return runnable->Run();
+}
+
+// nsIIPCSerializableInputStream
+
+void NonBlockingAsyncInputStream::SerializedComplexity(
+ uint32_t aMaxSize, uint32_t* aSizeUsed, uint32_t* aPipes,
+ uint32_t* aTransferables) {
+ InputStreamHelper::SerializedComplexity(mInputStream, aMaxSize, aSizeUsed,
+ aPipes, aTransferables);
+}
+
+void NonBlockingAsyncInputStream::Serialize(
+ mozilla::ipc::InputStreamParams& aParams, uint32_t aMaxSize,
+ uint32_t* aSizeUsed) {
+ MOZ_ASSERT(mWeakIPCSerializableInputStream);
+ InputStreamHelper::SerializeInputStream(mInputStream, aParams, aMaxSize,
+ aSizeUsed);
+}
+
+bool NonBlockingAsyncInputStream::Deserialize(
+ const mozilla::ipc::InputStreamParams& aParams) {
+ MOZ_CRASH("NonBlockingAsyncInputStream cannot be deserialized!");
+ return true;
+}
+
+// nsISeekableStream
+
+NS_IMETHODIMP
+NonBlockingAsyncInputStream::Seek(int32_t aWhence, int64_t aOffset) {
+ NS_ENSURE_STATE(mWeakSeekableInputStream);
+ return mWeakSeekableInputStream->Seek(aWhence, aOffset);
+}
+
+NS_IMETHODIMP
+NonBlockingAsyncInputStream::SetEOF() {
+ NS_ENSURE_STATE(mWeakSeekableInputStream);
+ return NS_ERROR_NOT_IMPLEMENTED;
+}
+
+// nsITellableStream
+
+NS_IMETHODIMP
+NonBlockingAsyncInputStream::Tell(int64_t* aResult) {
+ NS_ENSURE_STATE(mWeakTellableInputStream);
+ return mWeakTellableInputStream->Tell(aResult);
+}
+
+void NonBlockingAsyncInputStream::RunAsyncWaitCallback(
+ NonBlockingAsyncInputStream::AsyncWaitRunnable* aRunnable,
+ already_AddRefed<nsIInputStreamCallback> aCallback) {
+ nsCOMPtr<nsIInputStreamCallback> callback = std::move(aCallback);
+
+ {
+ MutexAutoLock lock(mLock);
+ if (mAsyncWaitCallback != aRunnable) {
+ // The callback has been canceled in the meantime.
+ return;
+ }
+
+ mAsyncWaitCallback = nullptr;
+ }
+
+ callback->OnInputStreamReady(this);
+}
+
+} // namespace mozilla