summaryrefslogtreecommitdiffstats
path: root/package/source/zipapi/XBufferedThreadedStream.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'package/source/zipapi/XBufferedThreadedStream.cxx')
-rw-r--r--package/source/zipapi/XBufferedThreadedStream.cxx189
1 files changed, 189 insertions, 0 deletions
diff --git a/package/source/zipapi/XBufferedThreadedStream.cxx b/package/source/zipapi/XBufferedThreadedStream.cxx
new file mode 100644
index 0000000000..d3bf995d90
--- /dev/null
+++ b/package/source/zipapi/XBufferedThreadedStream.cxx
@@ -0,0 +1,189 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#include "XBufferedThreadedStream.hxx"
+
+using namespace css::uno;
+
+namespace {
+
+class UnzippingThread: public salhelper::Thread
+{
+ XBufferedThreadedStream &mxStream;
+public:
+ explicit UnzippingThread(XBufferedThreadedStream &xStream): Thread("Unzipping"), mxStream(xStream) {}
+private:
+ virtual void execute() override
+ {
+ try
+ {
+ mxStream.produce();
+ }
+ catch (...)
+ {
+ mxStream.saveException(std::current_exception());
+ }
+
+ mxStream.setTerminateThread();
+ }
+};
+
+}
+
+XBufferedThreadedStream::XBufferedThreadedStream(
+ const Reference<XInputStream>& xSrcStream,
+ sal_Int64 nStreamSize)
+: mxSrcStream( xSrcStream )
+, mnPos(0)
+, mnStreamSize( nStreamSize )
+, mnOffset( 0 )
+, mxUnzippingThread( new UnzippingThread(*this) )
+, mbTerminateThread( false )
+{
+ mxUnzippingThread->launch();
+}
+
+XBufferedThreadedStream::~XBufferedThreadedStream()
+{
+ setTerminateThread();
+ mxUnzippingThread->join();
+}
+
+/**
+ * Reads from UnbufferedStream in a separate thread and stores the buffer blocks
+ * in maPendingBuffers queue for further use.
+ */
+void XBufferedThreadedStream::produce()
+{
+ Buffer pProducedBuffer;
+ sal_Int64 nTotalBytesRead(0);
+ std::unique_lock<std::mutex> aGuard( maBufferProtector );
+ do
+ {
+ if( !maUsedBuffers.empty() )
+ {
+ pProducedBuffer = maUsedBuffers.front();
+ maUsedBuffers.pop();
+ }
+
+ aGuard.unlock();
+ nTotalBytesRead += mxSrcStream->readBytes( pProducedBuffer, nBufferSize );
+
+ aGuard.lock();
+ maPendingBuffers.push( pProducedBuffer );
+ maBufferConsumeResume.notify_one();
+
+ if (!mbTerminateThread)
+ maBufferProduceResume.wait( aGuard, [&]{return canProduce(); } );
+
+ } while( !mbTerminateThread && nTotalBytesRead < mnStreamSize );
+}
+
+/**
+ * Fetches next available block from maPendingBuffers for use in Reading thread.
+ */
+const Buffer& XBufferedThreadedStream::getNextBlock()
+{
+ std::unique_lock<std::mutex> aGuard( maBufferProtector );
+ const sal_Int32 nBufSize = maInUseBuffer.getLength();
+ if( nBufSize <= 0 || mnOffset >= nBufSize )
+ {
+ if( mnOffset >= nBufSize )
+ maUsedBuffers.push( maInUseBuffer );
+
+ maBufferConsumeResume.wait( aGuard, [&]{return canConsume(); } );
+
+ if( maPendingBuffers.empty() )
+ {
+ maInUseBuffer = Buffer();
+ if (maSavedException)
+ std::rethrow_exception(maSavedException);
+ }
+ else
+ {
+ maInUseBuffer = maPendingBuffers.front();
+ maPendingBuffers.pop();
+ mnOffset = 0;
+
+ if( maPendingBuffers.size() <= nBufferLowWater )
+ maBufferProduceResume.notify_one();
+ }
+ }
+
+ return maInUseBuffer;
+}
+
+void XBufferedThreadedStream::setTerminateThread()
+{
+ std::scoped_lock<std::mutex> aGuard( maBufferProtector );
+ mbTerminateThread = true;
+ maBufferProduceResume.notify_one();
+ maBufferConsumeResume.notify_one();
+}
+
+sal_Int32 SAL_CALL XBufferedThreadedStream::readBytes( Sequence< sal_Int8 >& rData, sal_Int32 nBytesToRead )
+{
+ if( !hasBytes() )
+ return 0;
+
+ const sal_Int32 nAvailableSize = static_cast< sal_Int32 > ( std::min< sal_Int64 >( nBytesToRead, remainingSize() ) );
+ rData.realloc( nAvailableSize );
+ auto pData = rData.getArray();
+ sal_Int32 i = 0, nPendingBytes = nAvailableSize;
+
+ while( nPendingBytes )
+ {
+ const Buffer &pBuffer = getNextBlock();
+ if( !pBuffer.hasElements() )
+ {
+ rData.realloc( nAvailableSize - nPendingBytes );
+ return nAvailableSize - nPendingBytes;
+ }
+ const sal_Int32 limit = std::min<sal_Int32>( nPendingBytes, pBuffer.getLength() - mnOffset );
+
+ memcpy( &pData[i], &pBuffer[mnOffset], limit );
+
+ nPendingBytes -= limit;
+ mnOffset += limit;
+ mnPos += limit;
+ i += limit;
+ }
+
+ return nAvailableSize;
+}
+
+sal_Int32 SAL_CALL XBufferedThreadedStream::readSomeBytes( Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead )
+{
+ return readBytes( aData, nMaxBytesToRead );
+}
+void SAL_CALL XBufferedThreadedStream::skipBytes( sal_Int32 nBytesToSkip )
+{
+ if( nBytesToSkip )
+ {
+ Sequence < sal_Int8 > aSequence( nBytesToSkip );
+ readBytes( aSequence, nBytesToSkip );
+ }
+}
+
+sal_Int32 SAL_CALL XBufferedThreadedStream::available()
+{
+ if( !hasBytes() )
+ return 0;
+
+ return static_cast< sal_Int32 > ( std::min< sal_Int64 >( SAL_MAX_INT32, remainingSize() ) );
+}
+
+void SAL_CALL XBufferedThreadedStream::closeInput()
+{
+ setTerminateThread();
+ mxUnzippingThread->join();
+ mxSrcStream->closeInput();
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */