summaryrefslogtreecommitdiffstats
path: root/package/source/zipapi/XBufferedThreadedStream.hxx
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:06:44 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:06:44 +0000
commited5640d8b587fbcfed7dd7967f3de04b37a76f26 (patch)
tree7a5f7c6c9d02226d7471cb3cc8fbbf631b415303 /package/source/zipapi/XBufferedThreadedStream.hxx
parentInitial commit. (diff)
downloadlibreoffice-upstream.tar.xz
libreoffice-upstream.zip
Adding upstream version 4:7.4.7.upstream/4%7.4.7upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--package/source/zipapi/XBufferedThreadedStream.hxx83
1 files changed, 83 insertions, 0 deletions
diff --git a/package/source/zipapi/XBufferedThreadedStream.hxx b/package/source/zipapi/XBufferedThreadedStream.hxx
new file mode 100644
index 000000000..ad5d3b0ce
--- /dev/null
+++ b/package/source/zipapi/XBufferedThreadedStream.hxx
@@ -0,0 +1,83 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#ifndef INCLUDED_PACKAGE_SOURCE_ZIPAPI_XBUFFEREDTHREADEDSTREAM_HXX
+#define INCLUDED_PACKAGE_SOURCE_ZIPAPI_XBUFFEREDTHREADEDSTREAM_HXX
+
+#include <com/sun/star/io/XInputStream.hpp>
+
+#include <cppuhelper/implbase.hxx>
+#include <rtl/ref.hxx>
+#include <salhelper/thread.hxx>
+
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+
+typedef css::uno::Sequence< sal_Int8 > Buffer;
+
+class XBufferedThreadedStream : public cppu::WeakImplHelper< css::io::XInputStream >
+{
+private:
+ const css::uno::Reference<XInputStream> mxSrcStream;
+ sal_Int64 mnPos; /// position in stream
+ sal_Int64 mnStreamSize; /// available size of stream
+
+ Buffer maInUseBuffer; /// Buffer block in use
+ int mnOffset; /// position in maInUseBuffer
+ std::queue < Buffer > maPendingBuffers; /// Buffers that are available for use
+ std::queue < Buffer > maUsedBuffers;
+
+ rtl::Reference< salhelper::Thread > mxUnzippingThread;
+ std::mutex maBufferProtector; /// mutex protecting Buffer queues.
+ std::condition_variable maBufferConsumeResume;
+ std::condition_variable maBufferProduceResume;
+ bool mbTerminateThread; /// indicates the failure of one of the threads
+
+ std::exception_ptr maSavedException; /// exception caught during unzipping is saved to be thrown during reading
+
+ static const size_t nBufferLowWater = 2;
+ static const size_t nBufferHighWater = 4;
+ static const size_t nBufferSize = 32 * 1024;
+
+ const Buffer& getNextBlock();
+ sal_Int64 remainingSize() const { return mnStreamSize - mnPos; }
+ bool hasBytes() const { return mnPos < mnStreamSize; }
+
+ bool canProduce() const
+ {
+ return( mbTerminateThread || maPendingBuffers.size() < nBufferHighWater );
+ }
+
+ bool canConsume() const
+ {
+ return( mbTerminateThread || !maPendingBuffers.empty() );
+ }
+
+public:
+ XBufferedThreadedStream(
+ const css::uno::Reference<XInputStream>& xSrcStream,
+ sal_Int64 nStreamSize /* cf. sal_Int32 available(); */ );
+
+ virtual ~XBufferedThreadedStream() override;
+
+ void produce();
+ void setTerminateThread();
+ void saveException(const std::exception_ptr& exception) { maSavedException = exception; }
+
+ // XInputStream
+ virtual sal_Int32 SAL_CALL readBytes( css::uno::Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead ) override;
+ virtual sal_Int32 SAL_CALL readSomeBytes( css::uno::Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead ) override;
+ virtual void SAL_CALL skipBytes( sal_Int32 nBytesToSkip ) override;
+ virtual sal_Int32 SAL_CALL available( ) override;
+ virtual void SAL_CALL closeInput( ) override;
+};
+#endif
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */