summaryrefslogtreecommitdiffstats
path: root/xpcom/io/SnappyUncompressInputStream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'xpcom/io/SnappyUncompressInputStream.cpp')
-rw-r--r--xpcom/io/SnappyUncompressInputStream.cpp386
1 files changed, 386 insertions, 0 deletions
diff --git a/xpcom/io/SnappyUncompressInputStream.cpp b/xpcom/io/SnappyUncompressInputStream.cpp
new file mode 100644
index 0000000000..2872c8c7a2
--- /dev/null
+++ b/xpcom/io/SnappyUncompressInputStream.cpp
@@ -0,0 +1,386 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "mozilla/SnappyUncompressInputStream.h"
+
+#include <algorithm>
+#include "nsIAsyncInputStream.h"
+#include "nsStreamUtils.h"
+#include "snappy/snappy.h"
+
+namespace mozilla {
+
+NS_IMPL_ISUPPORTS(SnappyUncompressInputStream, nsIInputStream);
+
+// Putting kCompressedBufferLength inside a function avoids a static
+// constructor.
+static size_t CompressedBufferLength() {
+ static size_t kCompressedBufferLength =
+ detail::SnappyFrameUtils::MaxCompressedBufferLength(snappy::kBlockSize);
+
+ MOZ_ASSERT(kCompressedBufferLength > 0);
+ return kCompressedBufferLength;
+}
+
+SnappyUncompressInputStream::SnappyUncompressInputStream(
+ nsIInputStream* aBaseStream)
+ : mBaseStream(aBaseStream),
+ mUncompressedBytes(0),
+ mNextByte(0),
+ mNextChunkType(Unknown),
+ mNextChunkDataLength(0),
+ mNeedFirstStreamIdentifier(true) {
+ // This implementation only supports sync base streams. Verify this in debug
+ // builds. Note, this is a bit complicated because the streams we support
+ // advertise different capabilities:
+ // - nsFileInputStream - blocking and sync
+ // - nsStringInputStream - non-blocking and sync
+ // - nsPipeInputStream - can be blocking, but provides async interface
+#ifdef DEBUG
+ bool baseNonBlocking;
+ nsresult rv = mBaseStream->IsNonBlocking(&baseNonBlocking);
+ MOZ_ASSERT(NS_SUCCEEDED(rv));
+ if (baseNonBlocking) {
+ nsCOMPtr<nsIAsyncInputStream> async = do_QueryInterface(mBaseStream);
+ MOZ_ASSERT(!async);
+ }
+#endif
+}
+
+NS_IMETHODIMP
+SnappyUncompressInputStream::Close() {
+ if (!mBaseStream) {
+ return NS_OK;
+ }
+
+ mBaseStream->Close();
+ mBaseStream = nullptr;
+
+ mUncompressedBuffer = nullptr;
+ mCompressedBuffer = nullptr;
+
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+SnappyUncompressInputStream::Available(uint64_t* aLengthOut) {
+ if (!mBaseStream) {
+ return NS_BASE_STREAM_CLOSED;
+ }
+
+ // If we have uncompressed bytes, then we are done.
+ *aLengthOut = UncompressedLength();
+ if (*aLengthOut > 0) {
+ return NS_OK;
+ }
+
+ // Otherwise, attempt to uncompress bytes until we get something or the
+ // underlying stream is drained. We loop here because some chunks can
+ // be StreamIdentifiers, padding, etc with no data.
+ uint32_t bytesRead;
+ do {
+ nsresult rv = ParseNextChunk(&bytesRead);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+ *aLengthOut = UncompressedLength();
+ } while (*aLengthOut == 0 && bytesRead);
+
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+SnappyUncompressInputStream::StreamStatus() {
+ if (!mBaseStream) {
+ return NS_BASE_STREAM_CLOSED;
+ }
+
+ // If we have uncompressed bytes, then we're still open.
+ if (UncompressedLength() > 0) {
+ return NS_OK;
+ }
+
+ // Otherwise we'll need to read from the underlying stream, so check it
+ return mBaseStream->StreamStatus();
+}
+
+NS_IMETHODIMP
+SnappyUncompressInputStream::Read(char* aBuf, uint32_t aCount,
+ uint32_t* aBytesReadOut) {
+ return ReadSegments(NS_CopySegmentToBuffer, aBuf, aCount, aBytesReadOut);
+}
+
+NS_IMETHODIMP
+SnappyUncompressInputStream::ReadSegments(nsWriteSegmentFun aWriter,
+ void* aClosure, uint32_t aCount,
+ uint32_t* aBytesReadOut) {
+ *aBytesReadOut = 0;
+
+ if (!mBaseStream) {
+ return NS_BASE_STREAM_CLOSED;
+ }
+
+ nsresult rv;
+
+ // Do not try to use the base stream's ReadSegements here. Its very
+ // unlikely we will get a single buffer that contains all of the compressed
+ // data and therefore would have to copy into our own buffer anyways.
+ // Instead, focus on making efficient use of the Read() interface.
+
+ while (aCount > 0) {
+ // We have some decompressed data in our buffer. Provide it to the
+ // callers writer function.
+ if (mUncompressedBytes > 0) {
+ MOZ_ASSERT(mUncompressedBuffer);
+ uint32_t remaining = UncompressedLength();
+ uint32_t numToWrite = std::min(aCount, remaining);
+ uint32_t numWritten;
+ rv = aWriter(this, aClosure, &mUncompressedBuffer[mNextByte],
+ *aBytesReadOut, numToWrite, &numWritten);
+
+ // As defined in nsIInputputStream.idl, do not pass writer func errors.
+ if (NS_FAILED(rv)) {
+ return NS_OK;
+ }
+
+ // End-of-file
+ if (numWritten == 0) {
+ return NS_OK;
+ }
+
+ *aBytesReadOut += numWritten;
+ mNextByte += numWritten;
+ MOZ_ASSERT(mNextByte <= mUncompressedBytes);
+
+ if (mNextByte == mUncompressedBytes) {
+ mNextByte = 0;
+ mUncompressedBytes = 0;
+ }
+
+ aCount -= numWritten;
+
+ continue;
+ }
+
+ // Otherwise uncompress the next chunk and loop. Any resulting data
+ // will set mUncompressedBytes which we check at the top of the loop.
+ uint32_t bytesRead;
+ rv = ParseNextChunk(&bytesRead);
+ if (NS_FAILED(rv)) {
+ return rv;
+ }
+
+ // If we couldn't read anything and there is no more data to provide
+ // to the caller, then this is eof.
+ if (bytesRead == 0 && mUncompressedBytes == 0) {
+ return NS_OK;
+ }
+ }
+
+ return NS_OK;
+}
+
+NS_IMETHODIMP
+SnappyUncompressInputStream::IsNonBlocking(bool* aNonBlockingOut) {
+ *aNonBlockingOut = false;
+ return NS_OK;
+}
+
+SnappyUncompressInputStream::~SnappyUncompressInputStream() { Close(); }
+
+nsresult SnappyUncompressInputStream::ParseNextChunk(uint32_t* aBytesReadOut) {
+ // There must not be any uncompressed data already in mUncompressedBuffer.
+ MOZ_ASSERT(mUncompressedBytes == 0);
+ MOZ_ASSERT(mNextByte == 0);
+
+ nsresult rv;
+ *aBytesReadOut = 0;
+
+ // Lazily create our two buffers so we can report OOM during stream
+ // operation. These allocations only happens once. The buffers are reused
+ // until the stream is closed.
+ if (!mUncompressedBuffer) {
+ mUncompressedBuffer.reset(new (fallible) char[snappy::kBlockSize]);
+ if (NS_WARN_IF(!mUncompressedBuffer)) {
+ return NS_ERROR_OUT_OF_MEMORY;
+ }
+ }
+
+ if (!mCompressedBuffer) {
+ mCompressedBuffer.reset(new (fallible) char[CompressedBufferLength()]);
+ if (NS_WARN_IF(!mCompressedBuffer)) {
+ return NS_ERROR_OUT_OF_MEMORY;
+ }
+ }
+
+ // We have no decompressed data and we also have not seen the start of stream
+ // yet. Read and validate the StreamIdentifier chunk. Also read the next
+ // header to determine the size of the first real data chunk.
+ if (mNeedFirstStreamIdentifier) {
+ const uint32_t firstReadLength =
+ kHeaderLength + kStreamIdentifierDataLength + kHeaderLength;
+ MOZ_ASSERT(firstReadLength <= CompressedBufferLength());
+
+ rv = ReadAll(mCompressedBuffer.get(), firstReadLength, firstReadLength,
+ aBytesReadOut);
+ if (NS_WARN_IF(NS_FAILED(rv)) || *aBytesReadOut == 0) {
+ return rv;
+ }
+
+ rv = ParseHeader(mCompressedBuffer.get(), kHeaderLength, &mNextChunkType,
+ &mNextChunkDataLength);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+ if (NS_WARN_IF(mNextChunkType != StreamIdentifier ||
+ mNextChunkDataLength != kStreamIdentifierDataLength)) {
+ return NS_ERROR_CORRUPTED_CONTENT;
+ }
+ size_t offset = kHeaderLength;
+
+ mNeedFirstStreamIdentifier = false;
+
+ size_t numRead;
+ size_t numWritten;
+ rv = ParseData(mUncompressedBuffer.get(), snappy::kBlockSize,
+ mNextChunkType, &mCompressedBuffer[offset],
+ mNextChunkDataLength, &numWritten, &numRead);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+ MOZ_ASSERT(numWritten == 0);
+ MOZ_ASSERT(numRead == mNextChunkDataLength);
+ offset += numRead;
+
+ rv = ParseHeader(&mCompressedBuffer[offset], *aBytesReadOut - offset,
+ &mNextChunkType, &mNextChunkDataLength);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ return NS_OK;
+ }
+
+ // We have no compressed data and we don't know how big the next chunk is.
+ // This happens when we get an EOF pause in the middle of a stream and also
+ // at the end of the stream. Simply read the next header and return. The
+ // chunk body will be read on the next entry into this method.
+ if (mNextChunkType == Unknown) {
+ rv = ReadAll(mCompressedBuffer.get(), kHeaderLength, kHeaderLength,
+ aBytesReadOut);
+ if (NS_WARN_IF(NS_FAILED(rv)) || *aBytesReadOut == 0) {
+ return rv;
+ }
+
+ rv = ParseHeader(mCompressedBuffer.get(), kHeaderLength, &mNextChunkType,
+ &mNextChunkDataLength);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ return NS_OK;
+ }
+
+ // We have no decompressed data, but we do know the size of the next chunk.
+ // Read at least that much from the base stream.
+ uint32_t readLength = mNextChunkDataLength;
+ MOZ_ASSERT(readLength <= CompressedBufferLength());
+
+ // However, if there is enough data in the base stream, also read the next
+ // chunk header. This helps optimize the stream by avoiding many small reads.
+ uint64_t avail;
+ rv = mBaseStream->Available(&avail);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+ if (avail >= (readLength + kHeaderLength)) {
+ readLength += kHeaderLength;
+ MOZ_ASSERT(readLength <= CompressedBufferLength());
+ }
+
+ rv = ReadAll(mCompressedBuffer.get(), readLength, mNextChunkDataLength,
+ aBytesReadOut);
+ if (NS_WARN_IF(NS_FAILED(rv)) || *aBytesReadOut == 0) {
+ return rv;
+ }
+
+ size_t numRead;
+ size_t numWritten;
+ rv = ParseData(mUncompressedBuffer.get(), snappy::kBlockSize, mNextChunkType,
+ mCompressedBuffer.get(), mNextChunkDataLength, &numWritten,
+ &numRead);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+ MOZ_ASSERT(numRead == mNextChunkDataLength);
+
+ mUncompressedBytes = numWritten;
+
+ // If we were unable to directly read the next chunk header, then clear
+ // our internal state. We will have to perform a small read to get the
+ // header the next time we enter this method.
+ if (*aBytesReadOut <= mNextChunkDataLength) {
+ mNextChunkType = Unknown;
+ mNextChunkDataLength = 0;
+ return NS_OK;
+ }
+
+ // We got the next chunk header. Parse it so that we are ready to for the
+ // next call into this method.
+ rv = ParseHeader(&mCompressedBuffer[numRead], *aBytesReadOut - numRead,
+ &mNextChunkType, &mNextChunkDataLength);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ return NS_OK;
+}
+
+nsresult SnappyUncompressInputStream::ReadAll(char* aBuf, uint32_t aCount,
+ uint32_t aMinValidCount,
+ uint32_t* aBytesReadOut) {
+ MOZ_ASSERT(aCount >= aMinValidCount);
+
+ *aBytesReadOut = 0;
+
+ if (!mBaseStream) {
+ return NS_BASE_STREAM_CLOSED;
+ }
+
+ uint32_t offset = 0;
+ while (aCount > 0) {
+ uint32_t bytesRead = 0;
+ nsresult rv = mBaseStream->Read(aBuf + offset, aCount, &bytesRead);
+ if (NS_WARN_IF(NS_FAILED(rv))) {
+ return rv;
+ }
+
+ // EOF, but don't immediately return. We need to validate min read bytes
+ // below.
+ if (bytesRead == 0) {
+ break;
+ }
+
+ *aBytesReadOut += bytesRead;
+ offset += bytesRead;
+ aCount -= bytesRead;
+ }
+
+ // Reading zero bytes is not an error. Its the expected EOF condition.
+ // Only compare to the minimum valid count if we read at least one byte.
+ if (*aBytesReadOut != 0 && *aBytesReadOut < aMinValidCount) {
+ return NS_ERROR_CORRUPTED_CONTENT;
+ }
+
+ return NS_OK;
+}
+
+size_t SnappyUncompressInputStream::UncompressedLength() const {
+ MOZ_ASSERT(mNextByte <= mUncompressedBytes);
+ return mUncompressedBytes - mNextByte;
+}
+
+} // namespace mozilla