diff options
Diffstat (limited to '')
-rw-r--r-- | xpcom/io/SlicedInputStream.cpp | 653 |
1 files changed, 653 insertions, 0 deletions
diff --git a/xpcom/io/SlicedInputStream.cpp b/xpcom/io/SlicedInputStream.cpp new file mode 100644 index 0000000000..86b78e112f --- /dev/null +++ b/xpcom/io/SlicedInputStream.cpp @@ -0,0 +1,653 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=8 sts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "SlicedInputStream.h" +#include "mozilla/ipc/InputStreamUtils.h" +#include "mozilla/CheckedInt.h" +#include "mozilla/ScopeExit.h" +#include "nsISeekableStream.h" +#include "nsStreamUtils.h" + +namespace mozilla { + +using namespace ipc; + +NS_IMPL_ADDREF(SlicedInputStream); +NS_IMPL_RELEASE(SlicedInputStream); + +NS_INTERFACE_MAP_BEGIN(SlicedInputStream) + NS_INTERFACE_MAP_ENTRY(nsIInputStream) + NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream, + mWeakCloneableInputStream || !mInputStream) + NS_INTERFACE_MAP_ENTRY_CONDITIONAL( + nsIIPCSerializableInputStream, + mWeakIPCSerializableInputStream || !mInputStream) + NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream, + mWeakSeekableInputStream || !mInputStream) + NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsITellableStream, + mWeakTellableInputStream || !mInputStream) + NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream, + mWeakAsyncInputStream || !mInputStream) + NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback, + mWeakAsyncInputStream || !mInputStream) + NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLength, + mWeakInputStreamLength || !mInputStream) + NS_INTERFACE_MAP_ENTRY_CONDITIONAL( + nsIAsyncInputStreamLength, mWeakAsyncInputStreamLength || !mInputStream) + NS_INTERFACE_MAP_ENTRY_CONDITIONAL( + nsIInputStreamLengthCallback, + mWeakAsyncInputStreamLength || !mInputStream) + NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStream) +NS_INTERFACE_MAP_END + +SlicedInputStream::SlicedInputStream( + already_AddRefed<nsIInputStream> aInputStream, uint64_t aStart, + uint64_t aLength) + : mWeakCloneableInputStream(nullptr), + mWeakIPCSerializableInputStream(nullptr), + mWeakSeekableInputStream(nullptr), + mWeakTellableInputStream(nullptr), + mWeakAsyncInputStream(nullptr), + mWeakInputStreamLength(nullptr), + mWeakAsyncInputStreamLength(nullptr), + mStart(aStart), + mLength(aLength), + mCurPos(0), + mClosed(false), + mAsyncWaitFlags(0), + mAsyncWaitRequestedCount(0), + mMutex("SlicedInputStream::mMutex") { + nsCOMPtr<nsIInputStream> inputStream = std::move(aInputStream); + SetSourceStream(inputStream.forget()); +} + +SlicedInputStream::SlicedInputStream() + : mWeakCloneableInputStream(nullptr), + mWeakIPCSerializableInputStream(nullptr), + mWeakSeekableInputStream(nullptr), + mWeakTellableInputStream(nullptr), + mWeakAsyncInputStream(nullptr), + mWeakInputStreamLength(nullptr), + mWeakAsyncInputStreamLength(nullptr), + mStart(0), + mLength(0), + mCurPos(0), + mClosed(false), + mAsyncWaitFlags(0), + mAsyncWaitRequestedCount(0), + mMutex("SlicedInputStream::mMutex") {} + +SlicedInputStream::~SlicedInputStream() = default; + +void SlicedInputStream::SetSourceStream( + already_AddRefed<nsIInputStream> aInputStream) { + MOZ_ASSERT(!mInputStream); + + mInputStream = std::move(aInputStream); + + nsCOMPtr<nsICloneableInputStream> cloneableStream = + do_QueryInterface(mInputStream); + if (cloneableStream && SameCOMIdentity(mInputStream, cloneableStream)) { + mWeakCloneableInputStream = cloneableStream; + } + + nsCOMPtr<nsIIPCSerializableInputStream> serializableStream = + do_QueryInterface(mInputStream); + if (serializableStream && SameCOMIdentity(mInputStream, serializableStream)) { + mWeakIPCSerializableInputStream = serializableStream; + } + + nsCOMPtr<nsISeekableStream> seekableStream = do_QueryInterface(mInputStream); + if (seekableStream && SameCOMIdentity(mInputStream, seekableStream)) { + mWeakSeekableInputStream = seekableStream; + } + + nsCOMPtr<nsITellableStream> tellableStream = do_QueryInterface(mInputStream); + if (tellableStream && SameCOMIdentity(mInputStream, tellableStream)) { + mWeakTellableInputStream = tellableStream; + } + + nsCOMPtr<nsIAsyncInputStream> asyncInputStream = + do_QueryInterface(mInputStream); + if (asyncInputStream && SameCOMIdentity(mInputStream, asyncInputStream)) { + mWeakAsyncInputStream = asyncInputStream; + } + + nsCOMPtr<nsIInputStreamLength> streamLength = do_QueryInterface(mInputStream); + if (streamLength && SameCOMIdentity(mInputStream, streamLength)) { + mWeakInputStreamLength = streamLength; + } + + nsCOMPtr<nsIAsyncInputStreamLength> asyncStreamLength = + do_QueryInterface(mInputStream); + if (asyncStreamLength && SameCOMIdentity(mInputStream, asyncStreamLength)) { + mWeakAsyncInputStreamLength = asyncStreamLength; + } +} + +uint64_t SlicedInputStream::AdjustRange(uint64_t aRange) { + CheckedUint64 range(aRange); + range += mCurPos; + + // Let's remove extra length from the end. + if (range.isValid() && range.value() > mStart + mLength) { + aRange -= XPCOM_MIN((uint64_t)aRange, range.value() - (mStart + mLength)); + } + + // Let's remove extra length from the begin. + if (mCurPos < mStart) { + aRange -= XPCOM_MIN((uint64_t)aRange, mStart - mCurPos); + } + + return aRange; +} + +// nsIInputStream interface + +NS_IMETHODIMP +SlicedInputStream::Close() { + NS_ENSURE_STATE(mInputStream); + + mClosed = true; + return mInputStream->Close(); +} + +NS_IMETHODIMP +SlicedInputStream::Available(uint64_t* aLength) { + NS_ENSURE_STATE(mInputStream); + + if (mClosed) { + return NS_BASE_STREAM_CLOSED; + } + + nsresult rv = mInputStream->Available(aLength); + if (rv == NS_BASE_STREAM_CLOSED) { + mClosed = true; + return rv; + } + + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + *aLength = AdjustRange(*aLength); + return NS_OK; +} + +NS_IMETHODIMP +SlicedInputStream::Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) { + *aReadCount = 0; + + if (mClosed) { + return NS_OK; + } + + if (mCurPos < mStart) { + nsCOMPtr<nsISeekableStream> seekableStream = + do_QueryInterface(mInputStream); + if (seekableStream) { + nsresult rv = + seekableStream->Seek(nsISeekableStream::NS_SEEK_SET, mStart); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + mCurPos = mStart; + } else { + char buf[4096]; + while (mCurPos < mStart) { + uint32_t bytesRead; + uint64_t bufCount = XPCOM_MIN(mStart - mCurPos, (uint64_t)sizeof(buf)); + nsresult rv = mInputStream->Read(buf, bufCount, &bytesRead); + if (NS_SUCCEEDED(rv) && bytesRead == 0) { + mClosed = true; + return rv; + } + + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + mCurPos += bytesRead; + } + } + } + + // Let's reduce aCount in case it's too big. + if (mCurPos + aCount > mStart + mLength) { + aCount = mStart + mLength - mCurPos; + } + + // Nothing else to read. + if (!aCount) { + return NS_OK; + } + + nsresult rv = mInputStream->Read(aBuffer, aCount, aReadCount); + if (NS_SUCCEEDED(rv) && *aReadCount == 0) { + mClosed = true; + return rv; + } + + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + mCurPos += *aReadCount; + return NS_OK; +} + +NS_IMETHODIMP +SlicedInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, + uint32_t aCount, uint32_t* aResult) { + return NS_ERROR_NOT_IMPLEMENTED; +} + +NS_IMETHODIMP +SlicedInputStream::IsNonBlocking(bool* aNonBlocking) { + NS_ENSURE_STATE(mInputStream); + return mInputStream->IsNonBlocking(aNonBlocking); +} + +// nsICloneableInputStream interface + +NS_IMETHODIMP +SlicedInputStream::GetCloneable(bool* aCloneable) { + NS_ENSURE_STATE(mInputStream); + NS_ENSURE_STATE(mWeakCloneableInputStream); + + *aCloneable = true; + return NS_OK; +} + +NS_IMETHODIMP +SlicedInputStream::Clone(nsIInputStream** aResult) { + NS_ENSURE_STATE(mInputStream); + NS_ENSURE_STATE(mWeakCloneableInputStream); + + nsCOMPtr<nsIInputStream> clonedStream; + nsresult rv = mWeakCloneableInputStream->Clone(getter_AddRefs(clonedStream)); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + nsCOMPtr<nsIInputStream> sis = + new SlicedInputStream(clonedStream.forget(), mStart, mLength); + + sis.forget(aResult); + return NS_OK; +} + +// nsIAsyncInputStream interface + +NS_IMETHODIMP +SlicedInputStream::CloseWithStatus(nsresult aStatus) { + NS_ENSURE_STATE(mInputStream); + NS_ENSURE_STATE(mWeakAsyncInputStream); + + mClosed = true; + return mWeakAsyncInputStream->CloseWithStatus(aStatus); +} + +NS_IMETHODIMP +SlicedInputStream::AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags, + uint32_t aRequestedCount, + nsIEventTarget* aEventTarget) { + NS_ENSURE_STATE(mInputStream); + NS_ENSURE_STATE(mWeakAsyncInputStream); + + nsCOMPtr<nsIInputStreamCallback> callback = aCallback ? this : nullptr; + + uint32_t flags = aFlags; + uint32_t requestedCount = aRequestedCount; + + { + MutexAutoLock lock(mMutex); + + if (NS_WARN_IF(mAsyncWaitCallback && aCallback && + mAsyncWaitCallback != aCallback)) { + return NS_ERROR_FAILURE; + } + + mAsyncWaitCallback = aCallback; + + // If we haven't started retrieving data, let's see if we can seek. + // If we cannot seek, we will do consecutive reads. + if (mCurPos < mStart && mWeakSeekableInputStream) { + nsresult rv = mWeakSeekableInputStream->Seek( + nsISeekableStream::NS_SEEK_SET, mStart); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + mCurPos = mStart; + } + + mAsyncWaitFlags = aFlags; + mAsyncWaitRequestedCount = aRequestedCount; + mAsyncWaitEventTarget = aEventTarget; + + // If we are not at the right position, let's do an asyncWait just internal. + if (mCurPos < mStart) { + flags = 0; + requestedCount = mStart - mCurPos; + } + } + + return mWeakAsyncInputStream->AsyncWait(callback, flags, requestedCount, + aEventTarget); +} + +// nsIInputStreamCallback + +NS_IMETHODIMP +SlicedInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream) { + MOZ_ASSERT(mInputStream); + MOZ_ASSERT(mWeakAsyncInputStream); + MOZ_ASSERT(mWeakAsyncInputStream == aStream); + + nsCOMPtr<nsIInputStreamCallback> callback; + uint32_t asyncWaitFlags = 0; + uint32_t asyncWaitRequestedCount = 0; + nsCOMPtr<nsIEventTarget> asyncWaitEventTarget; + + { + MutexAutoLock lock(mMutex); + + // We have been canceled in the meanwhile. + if (!mAsyncWaitCallback) { + return NS_OK; + } + + auto raii = MakeScopeExit([&] { + mMutex.AssertCurrentThreadOwns(); + mAsyncWaitCallback = nullptr; + mAsyncWaitEventTarget = nullptr; + }); + + asyncWaitFlags = mAsyncWaitFlags; + asyncWaitRequestedCount = mAsyncWaitRequestedCount; + asyncWaitEventTarget = mAsyncWaitEventTarget; + + // If at the end of this locked block, the callback is not null, it will be + // executed, otherwise, we are going to exec another AsyncWait(). + callback = mAsyncWaitCallback; + + if (mCurPos < mStart) { + char buf[4096]; + nsresult rv = NS_OK; + while (mCurPos < mStart) { + uint32_t bytesRead; + uint64_t bufCount = XPCOM_MIN(mStart - mCurPos, (uint64_t)sizeof(buf)); + rv = mInputStream->Read(buf, bufCount, &bytesRead); + if (NS_SUCCEEDED(rv) && bytesRead == 0) { + mClosed = true; + break; + } + + if (rv == NS_BASE_STREAM_WOULD_BLOCK) { + asyncWaitFlags = 0; + asyncWaitRequestedCount = mStart - mCurPos; + // Here we want to exec another AsyncWait(). + callback = nullptr; + break; + } + + if (NS_WARN_IF(NS_FAILED(rv))) { + break; + } + + mCurPos += bytesRead; + } + + // Now we are ready to do the 'real' asyncWait. + if (mCurPos >= mStart) { + // We don't want to nullify the callback now, because it will be needed + // at the next ::OnInputStreamReady. + raii.release(); + callback = nullptr; + } + } + } + + if (callback) { + return callback->OnInputStreamReady(this); + } + + return mWeakAsyncInputStream->AsyncWait( + this, asyncWaitFlags, asyncWaitRequestedCount, asyncWaitEventTarget); +} + +// nsIIPCSerializableInputStream + +void SlicedInputStream::SerializedComplexity(uint32_t aMaxSize, + uint32_t* aSizeUsed, + uint32_t* aPipes, + uint32_t* aTransferables) { + InputStreamHelper::SerializedComplexity(mInputStream, aMaxSize, aSizeUsed, + aPipes, aTransferables); + + // If we're going to be serializing a pipe to transfer the sliced data, and we + // are getting no efficiency improvements from transferables, stream this + // sliced input stream directly as a pipe to avoid streaming data which will + // be sliced off anyway. + if (*aPipes > 0 && *aTransferables == 0) { + *aSizeUsed = 0; + *aPipes = 1; + *aTransferables = 0; + } +} + +void SlicedInputStream::Serialize(mozilla::ipc::InputStreamParams& aParams, + uint32_t aMaxSize, uint32_t* aSizeUsed) { + MOZ_ASSERT(mInputStream); + MOZ_ASSERT(mWeakIPCSerializableInputStream); + + uint32_t sizeUsed = 0, pipes = 0, transferables = 0; + SerializedComplexity(aMaxSize, &sizeUsed, &pipes, &transferables); + if (pipes > 0 && transferables == 0) { + InputStreamHelper::SerializeInputStreamAsPipe(mInputStream, aParams); + return; + } + + SlicedInputStreamParams params; + InputStreamHelper::SerializeInputStream(mInputStream, params.stream(), + aMaxSize, aSizeUsed); + params.start() = mStart; + params.length() = mLength; + params.curPos() = mCurPos; + params.closed() = mClosed; + + aParams = params; +} + +bool SlicedInputStream::Deserialize( + const mozilla::ipc::InputStreamParams& aParams) { + MOZ_ASSERT(!mInputStream); + MOZ_ASSERT(!mWeakIPCSerializableInputStream); + + if (aParams.type() != InputStreamParams::TSlicedInputStreamParams) { + NS_ERROR("Received unknown parameters from the other process!"); + return false; + } + + const SlicedInputStreamParams& params = aParams.get_SlicedInputStreamParams(); + + nsCOMPtr<nsIInputStream> stream = + InputStreamHelper::DeserializeInputStream(params.stream()); + if (!stream) { + NS_WARNING("Deserialize failed!"); + return false; + } + + SetSourceStream(stream.forget()); + + mStart = params.start(); + mLength = params.length(); + mCurPos = params.curPos(); + mClosed = params.closed(); + + return true; +} + +// nsISeekableStream + +NS_IMETHODIMP +SlicedInputStream::Seek(int32_t aWhence, int64_t aOffset) { + NS_ENSURE_STATE(mInputStream); + NS_ENSURE_STATE(mWeakSeekableInputStream); + + int64_t offset; + nsresult rv; + + switch (aWhence) { + case NS_SEEK_SET: + offset = mStart + aOffset; + break; + case NS_SEEK_CUR: + // mCurPos could be lower than mStart if the reading has not started yet. + offset = XPCOM_MAX(mStart, mCurPos) + aOffset; + break; + case NS_SEEK_END: { + uint64_t available; + rv = mInputStream->Available(&available); + if (rv == NS_BASE_STREAM_CLOSED) { + mClosed = true; + return rv; + } + + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + offset = XPCOM_MIN(mStart + mLength, available) + aOffset; + break; + } + default: + return NS_ERROR_ILLEGAL_VALUE; + } + + if (offset < (int64_t)mStart || offset > (int64_t)(mStart + mLength)) { + return NS_ERROR_INVALID_ARG; + } + + rv = mWeakSeekableInputStream->Seek(NS_SEEK_SET, offset); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + mCurPos = offset; + return NS_OK; +} + +NS_IMETHODIMP +SlicedInputStream::SetEOF() { + NS_ENSURE_STATE(mInputStream); + NS_ENSURE_STATE(mWeakSeekableInputStream); + + mClosed = true; + return mWeakSeekableInputStream->SetEOF(); +} + +// nsITellableStream + +NS_IMETHODIMP +SlicedInputStream::Tell(int64_t* aResult) { + NS_ENSURE_STATE(mInputStream); + NS_ENSURE_STATE(mWeakTellableInputStream); + + int64_t tell = 0; + + nsresult rv = mWeakTellableInputStream->Tell(&tell); + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + if (tell < (int64_t)mStart) { + *aResult = 0; + return NS_OK; + } + + *aResult = tell - mStart; + if (*aResult > (int64_t)mLength) { + *aResult = mLength; + } + + return NS_OK; +} + +// nsIInputStreamLength + +NS_IMETHODIMP +SlicedInputStream::Length(int64_t* aLength) { + NS_ENSURE_STATE(mInputStream); + NS_ENSURE_STATE(mWeakInputStreamLength); + + nsresult rv = mWeakInputStreamLength->Length(aLength); + if (rv == NS_BASE_STREAM_CLOSED) { + mClosed = true; + return rv; + } + + if (NS_WARN_IF(NS_FAILED(rv))) { + return rv; + } + + if (*aLength == -1) { + return NS_OK; + } + + *aLength = (int64_t)AdjustRange((uint64_t)*aLength); + return NS_OK; +} + +// nsIAsyncInputStreamLength + +NS_IMETHODIMP +SlicedInputStream::AsyncLengthWait(nsIInputStreamLengthCallback* aCallback, + nsIEventTarget* aEventTarget) { + NS_ENSURE_STATE(mInputStream); + NS_ENSURE_STATE(mWeakAsyncInputStreamLength); + + nsCOMPtr<nsIInputStreamLengthCallback> callback = aCallback ? this : nullptr; + { + MutexAutoLock lock(mMutex); + mAsyncWaitLengthCallback = aCallback; + } + + return mWeakAsyncInputStreamLength->AsyncLengthWait(callback, aEventTarget); +} + +// nsIInputStreamLengthCallback + +NS_IMETHODIMP +SlicedInputStream::OnInputStreamLengthReady(nsIAsyncInputStreamLength* aStream, + int64_t aLength) { + MOZ_ASSERT(mInputStream); + MOZ_ASSERT(mWeakAsyncInputStreamLength); + MOZ_ASSERT(mWeakAsyncInputStreamLength == aStream); + + nsCOMPtr<nsIInputStreamLengthCallback> callback; + { + MutexAutoLock lock(mMutex); + + // We have been canceled in the meanwhile. + if (!mAsyncWaitLengthCallback) { + return NS_OK; + } + + callback.swap(mAsyncWaitLengthCallback); + } + + if (aLength != -1) { + aLength = (int64_t)AdjustRange((uint64_t)aLength); + } + + MOZ_ASSERT(callback); + return callback->OnInputStreamLengthReady(this, aLength); +} + +} // namespace mozilla |