diff options
Diffstat (limited to 'io/source')
-rw-r--r-- | io/source/TextInputStream/TextInputStream.cxx | 399 | ||||
-rw-r--r-- | io/source/TextOutputStream/TextOutputStream.cxx | 239 | ||||
-rw-r--r-- | io/source/acceptor/acc_pipe.cxx | 190 | ||||
-rw-r--r-- | io/source/acceptor/acc_socket.cxx | 349 | ||||
-rw-r--r-- | io/source/acceptor/acceptor.cxx | 253 | ||||
-rw-r--r-- | io/source/acceptor/acceptor.hxx | 74 | ||||
-rw-r--r-- | io/source/connector/connector.cxx | 178 | ||||
-rw-r--r-- | io/source/connector/connector.hxx | 94 | ||||
-rw-r--r-- | io/source/connector/ctr_pipe.cxx | 97 | ||||
-rw-r--r-- | io/source/connector/ctr_socket.cxx | 225 | ||||
-rw-r--r-- | io/source/io.component | 70 | ||||
-rw-r--r-- | io/source/stm/odata.cxx | 1231 | ||||
-rw-r--r-- | io/source/stm/omark.cxx | 766 | ||||
-rw-r--r-- | io/source/stm/opipe.cxx | 357 | ||||
-rw-r--r-- | io/source/stm/opump.cxx | 455 | ||||
-rw-r--r-- | io/source/stm/streamhelper.cxx | 173 | ||||
-rw-r--r-- | io/source/stm/streamhelper.hxx | 85 |
17 files changed, 5235 insertions, 0 deletions
diff --git a/io/source/TextInputStream/TextInputStream.cxx b/io/source/TextInputStream/TextInputStream.cxx new file mode 100644 index 000000000..9f7c88472 --- /dev/null +++ b/io/source/TextInputStream/TextInputStream.cxx @@ -0,0 +1,399 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <string.h> + +#include <comphelper/sequence.hxx> +#include <cppuhelper/implbase.hxx> +#include <cppuhelper/supportsservice.hxx> + +#include <rtl/textenc.h> +#include <rtl/tencinfo.h> + +#include <com/sun/star/io/BufferSizeExceededException.hpp> +#include <com/sun/star/io/IOException.hpp> +#include <com/sun/star/io/NotConnectedException.hpp> +#include <com/sun/star/io/XTextInputStream2.hpp> +#include <com/sun/star/lang/XServiceInfo.hpp> + +#include <vector> + +namespace com::sun::star::uno { class XComponentContext; } + +using namespace ::osl; +using namespace ::cppu; +using namespace ::com::sun::star::uno; +using namespace ::com::sun::star::lang; +using namespace ::com::sun::star::io; + + +// Implementation XTextInputStream + +#define INITIAL_UNICODE_BUFFER_CAPACITY 0x100 +#define READ_BYTE_COUNT 0x100 + +namespace { + +class OTextInputStream : public WeakImplHelper< XTextInputStream2, XServiceInfo > +{ + Reference< XInputStream > mxStream; + + // Encoding + bool mbEncodingInitialized; + rtl_TextToUnicodeConverter mConvText2Unicode; + rtl_TextToUnicodeContext mContextText2Unicode; + Sequence<sal_Int8> mSeqSource; + + // Internal buffer for characters that are already converted successfully + std::vector<sal_Unicode> mvBuffer; + sal_Int32 mnCharsInBuffer; + bool mbReachedEOF; + + /// @throws IOException + /// @throws RuntimeException + OUString implReadString( const Sequence< sal_Unicode >& Delimiters, + bool bRemoveDelimiter, bool bFindLineEnd ); + /// @throws IOException + /// @throws RuntimeException + sal_Int32 implReadNext(); + +public: + OTextInputStream(); + virtual ~OTextInputStream() override; + + // Methods XTextInputStream + virtual OUString SAL_CALL readLine( ) override; + virtual OUString SAL_CALL readString( const Sequence< sal_Unicode >& Delimiters, sal_Bool bRemoveDelimiter ) override; + virtual sal_Bool SAL_CALL isEOF( ) override; + virtual void SAL_CALL setEncoding( const OUString& Encoding ) override; + + // Methods XInputStream + virtual sal_Int32 SAL_CALL readBytes( Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead ) override; + virtual sal_Int32 SAL_CALL readSomeBytes( Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead ) override; + virtual void SAL_CALL skipBytes( sal_Int32 nBytesToSkip ) override; + virtual sal_Int32 SAL_CALL available( ) override; + virtual void SAL_CALL closeInput( ) override; + + // Methods XActiveDataSink + virtual void SAL_CALL setInputStream( const Reference< XInputStream >& aStream ) override; + virtual Reference< XInputStream > SAL_CALL getInputStream() override; + + // Methods XServiceInfo + virtual OUString SAL_CALL getImplementationName() override; + virtual Sequence< OUString > SAL_CALL getSupportedServiceNames() override; + virtual sal_Bool SAL_CALL supportsService(const OUString& ServiceName) override; +}; + +} + +OTextInputStream::OTextInputStream() + : mbEncodingInitialized(false) + , mConvText2Unicode(nullptr) + , mContextText2Unicode(nullptr) + , mSeqSource(READ_BYTE_COUNT) + , mvBuffer(INITIAL_UNICODE_BUFFER_CAPACITY, 0) + , mnCharsInBuffer(0) + , mbReachedEOF(false) +{ +} + +OTextInputStream::~OTextInputStream() +{ + if( mbEncodingInitialized ) + { + rtl_destroyTextToUnicodeContext( mConvText2Unicode, mContextText2Unicode ); + rtl_destroyTextToUnicodeConverter( mConvText2Unicode ); + } +} + + +// XTextInputStream + +OUString OTextInputStream::readLine( ) +{ + static Sequence< sal_Unicode > aDummySeq; + return implReadString( aDummySeq, true, true ); +} + +OUString OTextInputStream::readString( const Sequence< sal_Unicode >& Delimiters, sal_Bool bRemoveDelimiter ) +{ + return implReadString( Delimiters, bRemoveDelimiter, false ); +} + +sal_Bool OTextInputStream::isEOF() +{ + bool bRet = false; + if( mnCharsInBuffer == 0 && mbReachedEOF ) + bRet = true; + return bRet; +} + + +OUString OTextInputStream::implReadString( const Sequence< sal_Unicode >& Delimiters, + bool bRemoveDelimiter, bool bFindLineEnd ) +{ + OUString aRetStr; + if( !mbEncodingInitialized ) + { + setEncoding( "utf8" ); + } + if( !mbEncodingInitialized ) + return aRetStr; + + // Only for bFindLineEnd + sal_Unicode cLineEndChar1 = 0x0D; + sal_Unicode cLineEndChar2 = 0x0A; + + sal_Int32 nBufferReadPos = 0; + sal_Int32 nCopyLen = 0; + bool bFound = false; + bool bFoundFirstLineEndChar = false; + sal_Unicode cFirstLineEndChar = 0; + while( !bFound ) + { + // Still characters available? + if( nBufferReadPos == mnCharsInBuffer ) + { + // Already reached EOF? Then we can't read any more + if( mbReachedEOF ) + break; + + // No, so read new characters + if( !implReadNext() ) + break; + } + + // Now there should be characters available + // (otherwise the loop should have been broken before) + sal_Unicode c = mvBuffer[ nBufferReadPos++ ]; + + if( bFindLineEnd ) + { + if( bFoundFirstLineEndChar ) + { + bFound = true; + nCopyLen = nBufferReadPos - 2; + if( c == cLineEndChar1 || c == cLineEndChar2 ) + { + // Same line end char -> new line break + if( c == cFirstLineEndChar ) + { + nBufferReadPos--; + } + } + else + { + // No second line end char + nBufferReadPos--; + } + } + else if( c == cLineEndChar1 || c == cLineEndChar2 ) + { + bFoundFirstLineEndChar = true; + cFirstLineEndChar = c; + } + } + else if( comphelper::findValue(Delimiters, c) != -1 ) + { + bFound = true; + nCopyLen = nBufferReadPos; + if( bRemoveDelimiter ) + nCopyLen--; + } + } + + // Nothing found? Return all + if( !nCopyLen && !bFound && mbReachedEOF ) + nCopyLen = nBufferReadPos; + + // Create string + if( nCopyLen ) + aRetStr = OUString( mvBuffer.data(), nCopyLen ); + + // Copy rest of buffer + memmove( mvBuffer.data(), mvBuffer.data() + nBufferReadPos, + (mnCharsInBuffer - nBufferReadPos) * sizeof( sal_Unicode ) ); + mnCharsInBuffer -= nBufferReadPos; + + return aRetStr; +} + + +sal_Int32 OTextInputStream::implReadNext() +{ + sal_Int32 nFreeBufferSize = mvBuffer.size() - mnCharsInBuffer; + if( nFreeBufferSize < READ_BYTE_COUNT ) + mvBuffer.resize(mvBuffer.size() * 2); + nFreeBufferSize = mvBuffer.size() - mnCharsInBuffer; + + try + { + sal_Int32 nRead = mxStream->readSomeBytes( mSeqSource, READ_BYTE_COUNT ); + sal_Int32 nTotalRead = nRead; + if( nRead == 0 ) + mbReachedEOF = true; + + // Try to convert + sal_uInt32 uiInfo; + sal_Size nSrcCvtBytes = 0; + sal_Size nTargetCount = 0; + sal_Size nSourceCount = 0; + while( true ) + { + const sal_Int8 *pbSource = mSeqSource.getConstArray(); + + // All invalid characters are transformed to the unicode undefined char + nTargetCount += rtl_convertTextToUnicode( + mConvText2Unicode, + mContextText2Unicode, + reinterpret_cast<const char*>(&( pbSource[nSourceCount] )), + nTotalRead - nSourceCount, + mvBuffer.data() + mnCharsInBuffer + nTargetCount, + nFreeBufferSize - nTargetCount, + RTL_TEXTTOUNICODE_FLAGS_UNDEFINED_DEFAULT | + RTL_TEXTTOUNICODE_FLAGS_MBUNDEFINED_DEFAULT | + RTL_TEXTTOUNICODE_FLAGS_INVALID_DEFAULT, + &uiInfo, + &nSrcCvtBytes ); + nSourceCount += nSrcCvtBytes; + + bool bCont = false; + if( uiInfo & RTL_TEXTTOUNICODE_INFO_DESTBUFFERTOOSMALL ) + { + mvBuffer.resize(mvBuffer.size() * 2); + bCont = true; + } + + if( uiInfo & RTL_TEXTTOUNICODE_INFO_SRCBUFFERTOOSMALL ) + { + // read next byte + static Sequence< sal_Int8 > aOneByteSeq( 1 ); + nRead = mxStream->readSomeBytes( aOneByteSeq, 1 ); + if( nRead == 0 ) + { + mbReachedEOF = true; + break; + } + + sal_Int32 nOldLen = mSeqSource.getLength(); + nTotalRead++; + if( nTotalRead > nOldLen ) + { + mSeqSource.realloc( nTotalRead ); + } + mSeqSource.getArray()[ nOldLen ] = aOneByteSeq.getConstArray()[ 0 ]; + bCont = true; + } + + if( bCont ) + continue; + break; + } + + mnCharsInBuffer += nTargetCount; + return nTargetCount; + } + catch( NotConnectedException& ) + { + throw IOException("Not connected"); + //throw IOException( L"OTextInputStream::implReadString failed" ); + } + catch( BufferSizeExceededException& ) + { + throw IOException("Buffer size exceeded"); + } +} + +void OTextInputStream::setEncoding( const OUString& Encoding ) +{ + OString aOEncodingStr = OUStringToOString( Encoding, RTL_TEXTENCODING_ASCII_US ); + rtl_TextEncoding encoding = rtl_getTextEncodingFromMimeCharset( aOEncodingStr.getStr() ); + if( RTL_TEXTENCODING_DONTKNOW == encoding ) + return; + + mbEncodingInitialized = true; + mConvText2Unicode = rtl_createTextToUnicodeConverter( encoding ); + mContextText2Unicode = rtl_createTextToUnicodeContext( mConvText2Unicode ); +} + + +// XInputStream + +sal_Int32 OTextInputStream::readBytes( Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead ) +{ + return mxStream->readBytes( aData, nBytesToRead ); +} + +sal_Int32 OTextInputStream::readSomeBytes( Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead ) +{ + return mxStream->readSomeBytes( aData, nMaxBytesToRead ); +} + +void OTextInputStream::skipBytes( sal_Int32 nBytesToSkip ) +{ + mxStream->skipBytes( nBytesToSkip ); +} + +sal_Int32 OTextInputStream::available( ) +{ + return mxStream->available(); +} + +void OTextInputStream::closeInput( ) +{ + mxStream->closeInput(); +} + + +// XActiveDataSink + +void OTextInputStream::setInputStream( const Reference< XInputStream >& aStream ) +{ + mxStream = aStream; +} + +Reference< XInputStream > OTextInputStream::getInputStream() +{ + return mxStream; +} + +OUString OTextInputStream::getImplementationName() +{ + return "com.sun.star.comp.io.TextInputStream"; +} + +sal_Bool OTextInputStream::supportsService(const OUString& ServiceName) +{ + return cppu::supportsService(this, ServiceName); +} + +Sequence< OUString > OTextInputStream::getSupportedServiceNames() +{ + return { "com.sun.star.io.TextInputStream" }; +} + +extern "C" SAL_DLLPUBLIC_EXPORT css::uno::XInterface* +io_OTextInputStream_get_implementation( + css::uno::XComponentContext* , css::uno::Sequence<css::uno::Any> const&) +{ + return cppu::acquire(new OTextInputStream()); +} + + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/io/source/TextOutputStream/TextOutputStream.cxx b/io/source/TextOutputStream/TextOutputStream.cxx new file mode 100644 index 000000000..1271c4d09 --- /dev/null +++ b/io/source/TextOutputStream/TextOutputStream.cxx @@ -0,0 +1,239 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + + + +#include <cppuhelper/implbase.hxx> +#include <cppuhelper/supportsservice.hxx> + +#include <rtl/textenc.h> +#include <rtl/tencinfo.h> + +#include <com/sun/star/io/IOException.hpp> +#include <com/sun/star/io/XTextOutputStream2.hpp> +#include <com/sun/star/lang/XServiceInfo.hpp> + +namespace com::sun::star::uno { class XComponentContext; } + +using namespace ::osl; +using namespace ::cppu; +using namespace ::com::sun::star::uno; +using namespace ::com::sun::star::lang; +using namespace ::com::sun::star::io; + +// Implementation XTextOutputStream + +namespace { + +class OTextOutputStream : public WeakImplHelper< XTextOutputStream2, XServiceInfo > +{ + Reference< XOutputStream > mxStream; + + // Encoding + bool mbEncodingInitialized; + rtl_UnicodeToTextConverter mConvUnicode2Text; + rtl_UnicodeToTextContext mContextUnicode2Text; + + Sequence<sal_Int8> implConvert( const OUString& rSource ); + /// @throws IOException + void checkOutputStream() const; + +public: + OTextOutputStream(); + virtual ~OTextOutputStream() override; + + // Methods XTextOutputStream + virtual void SAL_CALL writeString( const OUString& aString ) override; + virtual void SAL_CALL setEncoding( const OUString& Encoding ) override; + + // Methods XOutputStream + virtual void SAL_CALL writeBytes( const Sequence< sal_Int8 >& aData ) override; + virtual void SAL_CALL flush( ) override; + virtual void SAL_CALL closeOutput( ) override; + + // Methods XActiveDataSource + virtual void SAL_CALL setOutputStream( const Reference< XOutputStream >& aStream ) override; + virtual Reference< XOutputStream > SAL_CALL getOutputStream( ) override; + + // Methods XServiceInfo + virtual OUString SAL_CALL getImplementationName() override; + virtual Sequence< OUString > SAL_CALL getSupportedServiceNames() override; + virtual sal_Bool SAL_CALL supportsService(const OUString& ServiceName) override; +}; + +} + +OTextOutputStream::OTextOutputStream() + : mbEncodingInitialized(false) + , mConvUnicode2Text(nullptr) + , mContextUnicode2Text(nullptr) +{ +} + +OTextOutputStream::~OTextOutputStream() +{ + if( mbEncodingInitialized ) + { + rtl_destroyUnicodeToTextContext( mConvUnicode2Text, mContextUnicode2Text ); + rtl_destroyUnicodeToTextConverter( mConvUnicode2Text ); + } +} + +Sequence<sal_Int8> OTextOutputStream::implConvert( const OUString& rSource ) +{ + const sal_Unicode *puSource = rSource.getStr(); + sal_Int32 nSourceSize = rSource.getLength(); + + sal_Size nTargetCount = 0; + sal_Size nSourceCount = 0; + + sal_uInt32 uiInfo; + sal_Size nSrcCvtChars; + + // take nSourceSize * 3 as preference + // this is an upper boundary for converting to utf8, + // which most often used as the target. + sal_Int32 nSeqSize = nSourceSize * 3; + + Sequence<sal_Int8> seqText( nSeqSize ); + char *pTarget = reinterpret_cast<char *>(seqText.getArray()); + while( true ) + { + nTargetCount += rtl_convertUnicodeToText( + mConvUnicode2Text, + mContextUnicode2Text, + &( puSource[nSourceCount] ), + nSourceSize - nSourceCount , + &( pTarget[nTargetCount] ), + nSeqSize - nTargetCount, + RTL_UNICODETOTEXT_FLAGS_UNDEFINED_DEFAULT | + RTL_UNICODETOTEXT_FLAGS_INVALID_DEFAULT , + &uiInfo, + &nSrcCvtChars); + nSourceCount += nSrcCvtChars; + + if( uiInfo & RTL_UNICODETOTEXT_INFO_DESTBUFFERTOSMALL ) + { + nSeqSize *= 2; + seqText.realloc( nSeqSize ); // double array size + pTarget = reinterpret_cast<char*>(seqText.getArray()); + continue; + } + break; + } + + // reduce the size of the buffer (fast, no copy necessary) + seqText.realloc( nTargetCount ); + return seqText; +} + + +// XTextOutputStream + +void OTextOutputStream::writeString( const OUString& aString ) +{ + checkOutputStream(); + if( !mbEncodingInitialized ) + { + setEncoding( "utf8" ); + } + if( !mbEncodingInitialized ) + return; + + Sequence<sal_Int8> aByteSeq = implConvert( aString ); + mxStream->writeBytes( aByteSeq ); +} + +void OTextOutputStream::setEncoding( const OUString& Encoding ) +{ + OString aOEncodingStr = OUStringToOString( Encoding, RTL_TEXTENCODING_ASCII_US ); + rtl_TextEncoding encoding = rtl_getTextEncodingFromMimeCharset( aOEncodingStr.getStr() ); + if( RTL_TEXTENCODING_DONTKNOW == encoding ) + return; + + mbEncodingInitialized = true; + mConvUnicode2Text = rtl_createUnicodeToTextConverter( encoding ); + mContextUnicode2Text = rtl_createUnicodeToTextContext( mConvUnicode2Text ); +} + + +// XOutputStream +void OTextOutputStream::writeBytes( const Sequence< sal_Int8 >& aData ) +{ + checkOutputStream(); + mxStream->writeBytes( aData ); +} + +void OTextOutputStream::flush( ) +{ + checkOutputStream(); + mxStream->flush(); +} + +void OTextOutputStream::closeOutput( ) +{ + checkOutputStream(); + mxStream->closeOutput(); +} + + +void OTextOutputStream::checkOutputStream() const +{ + if (! mxStream.is() ) + throw IOException("output stream is not initialized, you have to use setOutputStream first"); +} + + +// XActiveDataSource + +void OTextOutputStream::setOutputStream( const Reference< XOutputStream >& aStream ) +{ + mxStream = aStream; +} + +Reference< XOutputStream > OTextOutputStream::getOutputStream() +{ + return mxStream; +} + +OUString OTextOutputStream::getImplementationName() +{ + return "com.sun.star.comp.io.TextOutputStream"; +} + +sal_Bool OTextOutputStream::supportsService(const OUString& ServiceName) +{ + return cppu::supportsService(this, ServiceName); +} + +Sequence< OUString > OTextOutputStream::getSupportedServiceNames() +{ + return { "com.sun.star.io.TextOutputStream" }; +} + + + +extern "C" SAL_DLLPUBLIC_EXPORT css::uno::XInterface* +io_OTextOutputStream_get_implementation( + css::uno::XComponentContext* , css::uno::Sequence<css::uno::Any> const&) +{ + return cppu::acquire(new OTextOutputStream()); +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/io/source/acceptor/acc_pipe.cxx b/io/source/acceptor/acc_pipe.cxx new file mode 100644 index 000000000..db43c58db --- /dev/null +++ b/io/source/acceptor/acc_pipe.cxx @@ -0,0 +1,190 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <osl/security.hxx> +#include "acceptor.hxx" +#include <com/sun/star/connection/XConnection.hpp> +#include <com/sun/star/connection/ConnectionSetupException.hpp> +#include <com/sun/star/io/IOException.hpp> + +#include <osl/diagnose.h> +#include <cppuhelper/implbase.hxx> +#include <rtl/ref.hxx> + +using namespace ::osl; +using namespace ::cppu; +using namespace ::com::sun::star::uno; +using namespace ::com::sun::star::lang; +using namespace ::com::sun::star::connection; +using namespace ::com::sun::star::io; + + +namespace io_acceptor +{ + namespace { + + class PipeConnection : + public WeakImplHelper< XConnection > + { + public: + explicit PipeConnection( const OUString &sConnectionDescription); + + virtual sal_Int32 SAL_CALL read( Sequence< sal_Int8 >& aReadBytes, sal_Int32 nBytesToRead ) override; + virtual void SAL_CALL write( const Sequence< sal_Int8 >& aData ) override; + virtual void SAL_CALL flush( ) override; + virtual void SAL_CALL close( ) override; + virtual OUString SAL_CALL getDescription( ) override; + public: + ::osl::StreamPipe m_pipe; + oslInterlockedCount m_nStatus; + OUString m_sDescription; + }; + + } + + PipeConnection::PipeConnection( const OUString &sConnectionDescription) : + m_nStatus( 0 ), + m_sDescription( sConnectionDescription ) + { + // make it unique + m_sDescription += ",uniqueValue="; + m_sDescription += OUString::number( + sal::static_int_cast<sal_Int64 >( + reinterpret_cast< sal_IntPtr >(&m_pipe)) ); + } + + sal_Int32 PipeConnection::read( Sequence < sal_Int8 > & aReadBytes , sal_Int32 nBytesToRead ) + { + if( m_nStatus ) + { + throw IOException("pipe already closed"); + } + if( aReadBytes.getLength() < nBytesToRead ) + { + aReadBytes.realloc( nBytesToRead ); + } + sal_Int32 n = m_pipe.read( aReadBytes.getArray(), nBytesToRead ); + OSL_ASSERT( n >= 0 && n <= aReadBytes.getLength() ); + if( n < aReadBytes.getLength() ) + { + aReadBytes.realloc( n ); + } + return n; + + } + + void PipeConnection::write( const Sequence < sal_Int8 > &seq ) + { + if( m_nStatus ) + { + throw IOException("pipe already closed"); + } + if( m_pipe.write( seq.getConstArray() , seq.getLength() ) != seq.getLength() ) + { + throw IOException("short write"); + } + } + + void PipeConnection::flush( ) + { + } + + void PipeConnection::close() + { + if( 1 == osl_atomic_increment( (&m_nStatus) ) ) + { + m_pipe.close(); + } + } + + OUString PipeConnection::getDescription() + { + return m_sDescription; + } + + /*************** + * PipeAcceptor + **************/ + PipeAcceptor::PipeAcceptor( const OUString &sPipeName , const OUString & sConnectionDescription) : + m_sPipeName( sPipeName ), + m_sConnectionDescription( sConnectionDescription ), + m_bClosed( false ) + { + } + + + void PipeAcceptor::init() + { + m_pipe = Pipe( m_sPipeName.pData , osl_Pipe_CREATE , osl::Security() ); + if( ! m_pipe.is() ) + { + OUString error = "io.acceptor: Couldn't setup pipe " + m_sPipeName; + throw ConnectionSetupException( error ); + } + } + + Reference< XConnection > PipeAcceptor::accept( ) + { + Pipe pipe; + { + std::unique_lock guard( m_mutex ); + pipe = m_pipe; + } + if( ! pipe.is() ) + { + OUString error = "io.acceptor: pipe already closed" + m_sPipeName; + throw ConnectionSetupException( error ); + } + rtl::Reference<PipeConnection> pConn(new PipeConnection( m_sConnectionDescription )); + + oslPipeError status = pipe.accept( pConn->m_pipe ); + + if( m_bClosed ) + { + // stopAccepting was called ! + return Reference < XConnection >(); + } + else if( osl_Pipe_E_None == status ) + { + return pConn; + } + else + { + OUString error = "io.acceptor: Couldn't setup pipe " + m_sPipeName; + throw ConnectionSetupException( error ); + } + } + + void PipeAcceptor::stopAccepting() + { + m_bClosed = true; + Pipe pipe; + { + std::unique_lock guard( m_mutex ); + pipe = m_pipe; + m_pipe.clear(); + } + if( pipe.is() ) + { + pipe.close(); + } + } +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/io/source/acceptor/acc_socket.cxx b/io/source/acceptor/acc_socket.cxx new file mode 100644 index 000000000..d52fdc423 --- /dev/null +++ b/io/source/acceptor/acc_socket.cxx @@ -0,0 +1,349 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include "acceptor.hxx" + +#include <unordered_set> + +#include <mutex> +#include <rtl/ref.hxx> +#include <com/sun/star/connection/XConnection.hpp> +#include <com/sun/star/connection/XConnectionBroadcaster.hpp> +#include <com/sun/star/connection/ConnectionSetupException.hpp> +#include <com/sun/star/io/IOException.hpp> +#include <cppuhelper/implbase.hxx> + +using namespace ::osl; +using namespace ::cppu; +using namespace ::com::sun::star::uno; +using namespace ::com::sun::star::io; +using namespace ::com::sun::star::connection; + + +namespace io_acceptor { + + typedef std::unordered_set< css::uno::Reference< css::io::XStreamListener> > + XStreamListener_hash_set; + + namespace { + + class SocketConnection : public ::cppu::WeakImplHelper< + css::connection::XConnection, + css::connection::XConnectionBroadcaster> + + { + public: + explicit SocketConnection( const OUString & sConnectionDescription ); + + virtual sal_Int32 SAL_CALL read( css::uno::Sequence< sal_Int8 >& aReadBytes, + sal_Int32 nBytesToRead ) override; + virtual void SAL_CALL write( const css::uno::Sequence< sal_Int8 >& aData ) override; + virtual void SAL_CALL flush( ) override; + virtual void SAL_CALL close( ) override; + virtual OUString SAL_CALL getDescription( ) override; + + // XConnectionBroadcaster + virtual void SAL_CALL addStreamListener(const css::uno::Reference< css::io::XStreamListener>& aListener) override; + virtual void SAL_CALL removeStreamListener(const css::uno::Reference< css::io::XStreamListener>& aListener) override; + + public: + void completeConnectionString(); + + ::osl::StreamSocket m_socket; + oslInterlockedCount m_nStatus; + OUString m_sDescription; + + std::mutex _mutex; + bool _started; + bool _closed; + bool _error; + XStreamListener_hash_set _listeners; + }; + + } + + template<class T> + static void notifyListeners(SocketConnection * pCon, bool * notified, T t) + { + XStreamListener_hash_set listeners; + + { + std::unique_lock guard(pCon->_mutex); + if(!*notified) + { + *notified = true; + listeners = pCon->_listeners; + } + } + + for(auto& listener : listeners) + t(listener); + } + + static void callStarted(const Reference<XStreamListener>& xStreamListener) + { + xStreamListener->started(); + } + + namespace { + + struct callError { + const Any & any; + + explicit callError(const Any & any); + + void operator () (const Reference<XStreamListener>& xStreamListener); + }; + + } + + callError::callError(const Any & aAny) + : any(aAny) + { + } + + void callError::operator () (const Reference<XStreamListener>& xStreamListener) + { + xStreamListener->error(any); + } + + static void callClosed(const Reference<XStreamListener>& xStreamListener) + { + xStreamListener->closed(); + } + + + SocketConnection::SocketConnection( const OUString &sConnectionDescription) : + m_nStatus( 0 ), + m_sDescription( sConnectionDescription ), + _started(false), + _closed(false), + _error(false) + { + // make it unique + m_sDescription += ",uniqueValue=" ; + m_sDescription += OUString::number( + sal::static_int_cast< sal_Int64 >( + reinterpret_cast< sal_IntPtr >(&m_socket)) ); + } + + void SocketConnection::completeConnectionString() + { + m_sDescription += + ",peerPort=" + OUString::number(m_socket.getPeerPort()) + + ",peerHost=" + m_socket.getPeerHost( ) + + ",localPort=" + OUString::number( m_socket.getLocalPort() ) + + ",localHost=" + m_socket.getLocalHost(); + } + + sal_Int32 SocketConnection::read( Sequence < sal_Int8 > & aReadBytes , sal_Int32 nBytesToRead ) + { + if( ! m_nStatus ) + { + notifyListeners(this, &_started, callStarted); + + if( aReadBytes.getLength() != nBytesToRead ) + { + aReadBytes.realloc( nBytesToRead ); + } + + sal_Int32 i = m_socket.read( + aReadBytes.getArray(), aReadBytes.getLength()); + + if(i != nBytesToRead) + { + OUString message = "acc_socket.cxx:SocketConnection::read: error - " + + m_socket.getErrorAsString(); + + IOException ioException(message, static_cast<XConnection *>(this)); + + Any any; + any <<= ioException; + + notifyListeners(this, &_error, callError(any)); + + throw ioException; + } + + return i; + } + else + { + IOException ioException("acc_socket.cxx:SocketConnection::read: error - connection already closed", static_cast<XConnection *>(this)); + + Any any; + any <<= ioException; + + notifyListeners(this, &_error, callError(any)); + + throw ioException; + } + } + + void SocketConnection::write( const Sequence < sal_Int8 > &seq ) + { + if( ! m_nStatus ) + { + if( m_socket.write( seq.getConstArray() , seq.getLength() ) != seq.getLength() ) + { + OUString message = "acc_socket.cxx:SocketConnection::write: error - " + + m_socket.getErrorAsString(); + + IOException ioException(message, static_cast<XConnection *>(this)); + + Any any; + any <<= ioException; + + notifyListeners(this, &_error, callError(any)); + + throw ioException; + } + } + else + { + IOException ioException("acc_socket.cxx:SocketConnection::write: error - connection already closed", static_cast<XConnection *>(this)); + + Any any; + any <<= ioException; + + notifyListeners(this, &_error, callError(any)); + + throw ioException; + } + } + + void SocketConnection::flush( ) + { + + } + + void SocketConnection::close() + { + // ensure close is called only once + if( 1 == osl_atomic_increment( (&m_nStatus) ) ) + { + m_socket.shutdown(); + notifyListeners(this, &_closed, callClosed); + } + } + + OUString SocketConnection::getDescription() + { + return m_sDescription; + } + + + // XConnectionBroadcaster + void SAL_CALL SocketConnection::addStreamListener(const Reference<XStreamListener> & aListener) + { + std::unique_lock guard(_mutex); + + _listeners.insert(aListener); + } + + void SAL_CALL SocketConnection::removeStreamListener(const Reference<XStreamListener> & aListener) + { + std::unique_lock guard(_mutex); + + _listeners.erase(aListener); + } + + SocketAcceptor::SocketAcceptor( const OUString &sSocketName, + sal_uInt16 nPort, + bool bTcpNoDelay, + const OUString &sConnectionDescription) : + m_sSocketName( sSocketName ), + m_sConnectionDescription( sConnectionDescription ), + m_nPort( nPort ), + m_bTcpNoDelay( bTcpNoDelay ), + m_bClosed( false ) + { + } + + + void SocketAcceptor::init() + { + if( ! m_addr.setPort( m_nPort ) ) + { + throw ConnectionSetupException( + "acc_socket.cxx:SocketAcceptor::init - error - invalid tcp/ip port " + + OUString::number( m_nPort )); + } + if( ! m_addr.setHostname( m_sSocketName.pData ) ) + { + throw ConnectionSetupException( + "acc_socket.cxx:SocketAcceptor::init - error - invalid host " + m_sSocketName ); + } + m_socket.setOption( osl_Socket_OptionReuseAddr, 1); + + if(! m_socket.bind(m_addr) ) + { + throw ConnectionSetupException( + "acc_socket.cxx:SocketAcceptor::init - error - couldn't bind on " + + m_sSocketName + ":" + OUString::number(m_nPort)); + } + + if(! m_socket.listen() ) + { + throw ConnectionSetupException( + "acc_socket.cxx:SocketAcceptor::init - error - can't listen on " + + m_sSocketName + ":" + OUString::number(m_nPort) ); + } + } + + Reference< XConnection > SocketAcceptor::accept( ) + { + rtl::Reference<SocketConnection> pConn(new SocketConnection( m_sConnectionDescription )); + + if( m_socket.acceptConnection( pConn->m_socket )!= osl_Socket_Ok ) + { + // stopAccepting was called + return Reference < XConnection > (); + } + if( m_bClosed ) + { + return Reference < XConnection > (); + } + + pConn->completeConnectionString(); + ::osl::SocketAddr remoteAddr; + pConn->m_socket.getPeerAddr(remoteAddr); + OUString remoteHostname = remoteAddr.getHostname(); + // we enable tcpNoDelay for loopback connections because + // it can make a significant speed difference on linux boxes. + if( m_bTcpNoDelay || remoteHostname == "localhost" || + remoteHostname.startsWith("127.0.0.") ) + { + sal_Int32 nTcpNoDelay = sal_Int32(true); + pConn->m_socket.setOption( osl_Socket_OptionTcpNoDelay , &nTcpNoDelay, + sizeof( nTcpNoDelay ) , osl_Socket_LevelTcp ); + } + + return pConn; + } + + void SocketAcceptor::stopAccepting() + { + m_bClosed = true; + m_socket.close(); + } +} + + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/io/source/acceptor/acceptor.cxx b/io/source/acceptor/acceptor.cxx new file mode 100644 index 000000000..af0883be9 --- /dev/null +++ b/io/source/acceptor/acceptor.cxx @@ -0,0 +1,253 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <cppuhelper/implbase.hxx> +#include <cppuhelper/supportsservice.hxx> +#include <cppuhelper/unourl.hxx> +#include <rtl/malformeduriexception.hxx> + +#include <com/sun/star/connection/AlreadyAcceptingException.hpp> +#include <com/sun/star/connection/ConnectionSetupException.hpp> +#include <com/sun/star/connection/XAcceptor.hpp> +#include <com/sun/star/lang/IllegalArgumentException.hpp> +#include <com/sun/star/lang/XServiceInfo.hpp> +#include <com/sun/star/uno/XComponentContext.hpp> + +#include "acceptor.hxx" +#include <memory> +#include <mutex> +#include <string_view> + +using namespace ::osl; +using namespace ::cppu; +using namespace ::com::sun::star::uno; +using namespace ::com::sun::star::lang; +using namespace ::com::sun::star::connection; + +namespace { + + class OAcceptor : public WeakImplHelper< XAcceptor, XServiceInfo > + { + public: + explicit OAcceptor(const Reference< XComponentContext > & xCtx); + virtual ~OAcceptor() override; + public: + // Methods + virtual Reference< XConnection > SAL_CALL accept( const OUString& sConnectionDescription ) override; + virtual void SAL_CALL stopAccepting( ) override; + + public: // XServiceInfo + virtual OUString SAL_CALL getImplementationName() override; + virtual Sequence< OUString > SAL_CALL getSupportedServiceNames() override; + virtual sal_Bool SAL_CALL supportsService(const OUString& ServiceName) override; + + private: + std::unique_ptr<io_acceptor::PipeAcceptor> m_pPipe; + std::unique_ptr<io_acceptor::SocketAcceptor> m_pSocket; + std::mutex m_mutex; + OUString m_sLastDescription; + bool m_bInAccept; + + Reference< XMultiComponentFactory > _xSMgr; + Reference< XComponentContext > _xCtx; + Reference<XAcceptor> _xAcceptor; + }; + +} + +OAcceptor::OAcceptor( const Reference< XComponentContext > & xCtx ) + : m_bInAccept( false ) + , _xSMgr( xCtx->getServiceManager() ) + , _xCtx( xCtx ) +{} + +OAcceptor::~OAcceptor() +{ + m_pPipe.reset(); +} + +namespace { +struct BeingInAccept +{ + /// @throws AlreadyAcceptingException + BeingInAccept( bool *pFlag,std::u16string_view sConnectionDescription ) + : m_pFlag( pFlag ) + { + if( *m_pFlag ) + throw AlreadyAcceptingException( OUString::Concat("AlreadyAcceptingException :") + sConnectionDescription ); + *m_pFlag = true; + } + ~BeingInAccept() + { + *m_pFlag = false; + } + bool *m_pFlag; +}; +} + +Reference< XConnection > OAcceptor::accept( const OUString &sConnectionDescription ) +{ + // if there is a thread already accepting in this object, throw an exception. + struct BeingInAccept guard( &m_bInAccept, sConnectionDescription ); + + Reference< XConnection > r; + if( !m_sLastDescription.isEmpty() && + m_sLastDescription != sConnectionDescription ) + { + // instantiate another acceptor for different ports + throw ConnectionSetupException( "acceptor::accept called multiple times with different connection strings\n" ); + } + + if( m_sLastDescription.isEmpty() ) + { + // setup the acceptor + try + { + cppu::UnoUrlDescriptor aDesc(sConnectionDescription); + if ( aDesc.getName() == "pipe" ) + { + OUString aName( + aDesc.getParameter( + "name")); + + m_pPipe.reset(new io_acceptor::PipeAcceptor(aName, sConnectionDescription)); + + try + { + m_pPipe->init(); + } + catch( ... ) + { + { + std::unique_lock g( m_mutex ); + m_pPipe.reset(); + } + throw; + } + } + else if ( aDesc.getName() == "socket" ) + { + OUString aHost; + if (aDesc.hasParameter( + "host")) + aHost = aDesc.getParameter( + "host"); + else + aHost = "localhost"; + sal_uInt16 nPort = static_cast< sal_uInt16 >( + aDesc.getParameter( + "port"). + toInt32()); + bool bTcpNoDelay + = aDesc.getParameter( + "tcpnodelay").toInt32() != 0; + + m_pSocket.reset(new io_acceptor::SocketAcceptor( + aHost, nPort, bTcpNoDelay, sConnectionDescription)); + + try + { + m_pSocket->init(); + } + catch( ... ) + { + { + std::unique_lock g( m_mutex ); + m_pSocket.reset(); + } + throw; + } + } + else + { + OUString delegatee = "com.sun.star.connection.Acceptor." + aDesc.getName(); + _xAcceptor.set(_xSMgr->createInstanceWithContext(delegatee, _xCtx), UNO_QUERY); + + if(!_xAcceptor.is()) + throw ConnectionSetupException("Acceptor: unknown delegatee " + delegatee); + } + } + catch (const rtl::MalformedUriException & rEx) + { + throw IllegalArgumentException( + rEx.getMessage(), + Reference< XInterface > (), + 0 ); + } + m_sLastDescription = sConnectionDescription; + } + + if( m_pPipe ) + { + r = m_pPipe->accept(); + } + else if( m_pSocket ) + { + r = m_pSocket->accept(); + } + else + { + r = _xAcceptor->accept(sConnectionDescription); + } + + return r; +} + +void SAL_CALL OAcceptor::stopAccepting( ) +{ + std::unique_lock guard( m_mutex ); + + if( m_pPipe ) + { + m_pPipe->stopAccepting(); + } + else if ( m_pSocket ) + { + m_pSocket->stopAccepting(); + } + else if( _xAcceptor.is() ) + { + _xAcceptor->stopAccepting(); + } + +} + +OUString OAcceptor::getImplementationName() +{ + return "com.sun.star.comp.io.Acceptor"; +} + +sal_Bool OAcceptor::supportsService(const OUString& ServiceName) +{ + return cppu::supportsService(this, ServiceName); +} + +Sequence< OUString > OAcceptor::getSupportedServiceNames() +{ + return { "com.sun.star.connection.Acceptor" }; +} + +extern "C" SAL_DLLPUBLIC_EXPORT css::uno::XInterface* +io_OAcceptor_get_implementation( + css::uno::XComponentContext* context, css::uno::Sequence<css::uno::Any> const&) +{ + return cppu::acquire(new OAcceptor(context)); +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/io/source/acceptor/acceptor.hxx b/io/source/acceptor/acceptor.hxx new file mode 100644 index 000000000..9214a10b7 --- /dev/null +++ b/io/source/acceptor/acceptor.hxx @@ -0,0 +1,74 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#pragma once + +#include <osl/pipe.hxx> +#include <osl/socket.hxx> +#include <mutex> + +#include <com/sun/star/uno/Reference.hxx> + +namespace com::sun::star::connection { class XConnection; } + +namespace io_acceptor { + + class PipeAcceptor + { + public: + PipeAcceptor( const OUString &sPipeName , const OUString &sConnectionDescription ); + + void init(); + css::uno::Reference < css::connection::XConnection > accept( ); + + void stopAccepting(); + + std::mutex m_mutex; + ::osl::Pipe m_pipe; + OUString m_sPipeName; + OUString m_sConnectionDescription; + bool m_bClosed; + }; + + class SocketAcceptor + { + public: + SocketAcceptor( const OUString & sSocketName , + sal_uInt16 nPort, + bool bTcpNoDelay, + const OUString &sConnectionDescription ); + + void init(); + css::uno::Reference < css::connection::XConnection > accept(); + + void stopAccepting(); + + ::osl::SocketAddr m_addr; + ::osl::AcceptorSocket m_socket; + OUString m_sSocketName; + OUString m_sConnectionDescription; + sal_uInt16 m_nPort; + bool m_bTcpNoDelay; + bool m_bClosed; + }; + +} + + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/io/source/connector/connector.cxx b/io/source/connector/connector.cxx new file mode 100644 index 000000000..15720b242 --- /dev/null +++ b/io/source/connector/connector.cxx @@ -0,0 +1,178 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <osl/security.hxx> +#include <sal/log.hxx> + +#include <cppuhelper/implbase.hxx> +#include <cppuhelper/supportsservice.hxx> +#include <cppuhelper/unourl.hxx> +#include <rtl/malformeduriexception.hxx> +#include <rtl/ref.hxx> +#include <o3tl/string_view.hxx> + +#include <com/sun/star/lang/XServiceInfo.hpp> +#include <com/sun/star/connection/ConnectionSetupException.hpp> +#include <com/sun/star/connection/NoConnectException.hpp> +#include <com/sun/star/connection/XConnector.hpp> +#include <com/sun/star/uno/XComponentContext.hpp> + +#include "connector.hxx" + +using namespace ::osl; +using namespace ::cppu; +using namespace ::com::sun::star::uno; +using namespace ::com::sun::star::lang; +using namespace ::com::sun::star::connection; + +namespace { + + class OConnector : public WeakImplHelper< XConnector, XServiceInfo > + { + Reference< XMultiComponentFactory > _xSMgr; + Reference< XComponentContext > _xCtx; + public: + explicit OConnector(const Reference< XComponentContext > &xCtx); + + // Methods + virtual Reference< XConnection > SAL_CALL connect( + const OUString& sConnectionDescription ) override; + + public: // XServiceInfo + virtual OUString SAL_CALL getImplementationName() override; + virtual Sequence< OUString > SAL_CALL getSupportedServiceNames() override; + virtual sal_Bool SAL_CALL supportsService(const OUString& ServiceName) override; + }; + +} + +OConnector::OConnector(const Reference< XComponentContext > &xCtx) + : _xSMgr( xCtx->getServiceManager() ) + , _xCtx( xCtx ) +{} + +Reference< XConnection > SAL_CALL OConnector::connect( const OUString& sConnectionDescription ) +{ + // split string into tokens + try + { + cppu::UnoUrlDescriptor aDesc(sConnectionDescription); + + Reference< XConnection > r; + if ( aDesc.getName() == "pipe" ) + { + OUString aName(aDesc.getParameter("name")); + + rtl::Reference<stoc_connector::PipeConnection> pConn(new stoc_connector::PipeConnection( sConnectionDescription )); + + if( pConn->m_pipe.create( aName.pData, osl_Pipe_OPEN, osl::Security() ) ) + { + r = pConn; + } + else + { + OUString const sMessage( + "Connector : couldn't connect to pipe \"" + aName + "\": " + + OUString::number(pConn->m_pipe.getError())); + SAL_WARN("io.connector", sMessage); + throw NoConnectException( sMessage ); + } + } + else if ( aDesc.getName() == "socket" ) + { + OUString aHost; + if (aDesc.hasParameter("host")) + aHost = aDesc.getParameter("host"); + else + aHost = "localhost"; + sal_uInt16 nPort = static_cast< sal_uInt16 >( + aDesc.getParameter("port"). + toInt32()); + bool bTcpNoDelay + = aDesc.getParameter("tcpnodelay").toInt32() != 0; + + rtl::Reference<stoc_connector::SocketConnection> pConn(new stoc_connector::SocketConnection( sConnectionDescription)); + + SocketAddr AddrTarget( aHost.pData, nPort ); + if(pConn->m_socket.connect(AddrTarget) != osl_Socket_Ok) + { + OUString sMessage("Connector : couldn't connect to socket ("); + OUString sError = pConn->m_socket.getErrorAsString(); + sMessage += sError + ")"; + throw NoConnectException( sMessage ); + } + // we enable tcpNoDelay for loopback connections because + // it can make a significant speed difference on linux boxes. + if( bTcpNoDelay || aHost == "localhost" || aHost.startsWith("127.0.0.") ) + { + sal_Int32 nTcpNoDelay = sal_Int32(true); + pConn->m_socket.setOption( osl_Socket_OptionTcpNoDelay , &nTcpNoDelay, + sizeof( nTcpNoDelay ) , osl_Socket_LevelTcp ); + } + pConn->completeConnectionString(); + r = pConn; + } + else + { + OUString delegatee= "com.sun.star.connection.Connector." + aDesc.getName(); + + Reference<XConnector> xConnector( + _xSMgr->createInstanceWithContext(delegatee, _xCtx), UNO_QUERY ); + + if(!xConnector.is()) + throw ConnectionSetupException("Connector: unknown delegatee " + delegatee); + + sal_Int32 index = sConnectionDescription.indexOf(','); + + r = xConnector->connect(OUString(o3tl::trim(sConnectionDescription.subView(index + 1)))); + } + return r; + } + catch (const rtl::MalformedUriException & rEx) + { + throw ConnectionSetupException(rEx.getMessage()); + } +} + +OUString OConnector::getImplementationName() +{ + return "com.sun.star.comp.io.Connector"; +} + +sal_Bool OConnector::supportsService(const OUString& ServiceName) +{ + return cppu::supportsService(this, ServiceName); +} + +Sequence< OUString > OConnector::getSupportedServiceNames() +{ + return { "com.sun.star.connection.Connector" }; +} + +extern "C" SAL_DLLPUBLIC_EXPORT css::uno::XInterface* +io_OConnector_get_implementation( + css::uno::XComponentContext* context, css::uno::Sequence<css::uno::Any> const&) +{ + return cppu::acquire(new OConnector(context)); +} + + + + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/io/source/connector/connector.hxx b/io/source/connector/connector.hxx new file mode 100644 index 000000000..691fc7a88 --- /dev/null +++ b/io/source/connector/connector.hxx @@ -0,0 +1,94 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#pragma once + +#include <cppuhelper/implbase.hxx> + +#include <com/sun/star/connection/XConnection.hpp> +#include <com/sun/star/connection/XConnectionBroadcaster.hpp> + +#include <unordered_set> +#include <osl/socket.hxx> +#include <osl/pipe.hxx> +#include <mutex> + +namespace stoc_connector +{ + typedef std::unordered_set< css::uno::Reference< css::io::XStreamListener> > + XStreamListener_hash_set; + + class PipeConnection : + public ::cppu::WeakImplHelper< css::connection::XConnection > + + { + public: + explicit PipeConnection( const OUString &sConnectionDescription ); + virtual ~PipeConnection() override; + + virtual sal_Int32 SAL_CALL read( css::uno::Sequence< sal_Int8 >& aReadBytes, + sal_Int32 nBytesToRead ) override; + virtual void SAL_CALL write( const css::uno::Sequence< sal_Int8 >& aData ) override; + virtual void SAL_CALL flush( ) override; + virtual void SAL_CALL close( ) override; + virtual OUString SAL_CALL getDescription( ) override; + public: + ::osl::StreamPipe m_pipe; + oslInterlockedCount m_nStatus; + OUString m_sDescription; + }; + + class SocketConnection : + public ::cppu::WeakImplHelper< css::connection::XConnection, css::connection::XConnectionBroadcaster > + + { + public: + explicit SocketConnection( const OUString & sConnectionDescription ); + virtual ~SocketConnection() override; + + virtual sal_Int32 SAL_CALL read( css::uno::Sequence< sal_Int8 >& aReadBytes, + sal_Int32 nBytesToRead ) override; + virtual void SAL_CALL write( const css::uno::Sequence< sal_Int8 >& aData ) override; + virtual void SAL_CALL flush( ) override; + virtual void SAL_CALL close( ) override; + virtual OUString SAL_CALL getDescription( ) override; + + + // XConnectionBroadcaster + virtual void SAL_CALL addStreamListener(const css::uno::Reference< css::io::XStreamListener>& aListener) override; + virtual void SAL_CALL removeStreamListener(const css::uno::Reference< css::io::XStreamListener>& aListener) override; + + public: + void completeConnectionString(); + + ::osl::ConnectorSocket m_socket; + oslInterlockedCount m_nStatus; + OUString m_sDescription; + + std::mutex _mutex; + bool _started; + bool _closed; + bool _error; + + XStreamListener_hash_set _listeners; + }; +} + + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/io/source/connector/ctr_pipe.cxx b/io/source/connector/ctr_pipe.cxx new file mode 100644 index 000000000..5ee0b70bc --- /dev/null +++ b/io/source/connector/ctr_pipe.cxx @@ -0,0 +1,97 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <sal/config.h> + +#include <com/sun/star/io/IOException.hpp> + +#include "connector.hxx" +#include <osl/pipe.hxx> + +using namespace ::osl; +using namespace ::com::sun::star::uno; +using namespace ::com::sun::star::io; +using namespace ::com::sun::star::connection; + + +namespace stoc_connector { + + PipeConnection::PipeConnection( const OUString & sConnectionDescription ) : + m_nStatus( 0 ), + m_sDescription( sConnectionDescription ) + { + // make it unique + m_sDescription += ",uniqueValue="; + m_sDescription += OUString::number( + sal::static_int_cast< sal_Int64 >( + reinterpret_cast< sal_IntPtr >(&m_pipe)) ); + } + + PipeConnection::~PipeConnection() + { + } + + sal_Int32 PipeConnection::read( Sequence < sal_Int8 > & aReadBytes , sal_Int32 nBytesToRead ) + { + if( m_nStatus ) + { + throw IOException("pipe already closed"); + } + if( aReadBytes.getLength() != nBytesToRead ) + { + aReadBytes.realloc( nBytesToRead ); + } + return m_pipe.read( aReadBytes.getArray() , aReadBytes.getLength() ); + + } + + void PipeConnection::write( const Sequence < sal_Int8 > &seq ) + { + if( m_nStatus ) + { + throw IOException("pipe already closed"); + } + if( m_pipe.write( seq.getConstArray() , seq.getLength() ) != seq.getLength() ) + { + throw IOException("short write"); + } + } + + void PipeConnection::flush( ) + { + + } + + void PipeConnection::close() + { + // ensure that close is called only once + if(1 == osl_atomic_increment( (&m_nStatus) ) ) + { + m_pipe.close(); + } + } + + OUString PipeConnection::getDescription() + { + return m_sDescription; + } + +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/io/source/connector/ctr_socket.cxx b/io/source/connector/ctr_socket.cxx new file mode 100644 index 000000000..dcdeef07f --- /dev/null +++ b/io/source/connector/ctr_socket.cxx @@ -0,0 +1,225 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + + +#include "connector.hxx" +#include <com/sun/star/io/IOException.hpp> + +using namespace ::osl; +using namespace ::com::sun::star::uno; +using namespace ::com::sun::star::io; +using namespace ::com::sun::star::connection; + + +namespace stoc_connector { + template<class T> + static void notifyListeners(SocketConnection * pCon, bool * notified, T t) + { + XStreamListener_hash_set listeners; + + { + std::unique_lock guard(pCon->_mutex); + if(!*notified) + { + *notified = true; + listeners = pCon->_listeners; + } + } + + for(auto& listener : listeners) + t(listener); + } + + + static void callStarted(const Reference<XStreamListener>& xStreamListener) + { + xStreamListener->started(); + } + + namespace { + + struct callError { + const Any & any; + + explicit callError(const Any & any); + + void operator () (const Reference<XStreamListener>& xStreamListener); + }; + + } + + callError::callError(const Any & aAny) + : any(aAny) + { + } + + void callError::operator () (const Reference<XStreamListener>& xStreamListener) + { + xStreamListener->error(any); + } + + static void callClosed(const Reference<XStreamListener>& xStreamListener) + { + xStreamListener->closed(); + } + + + SocketConnection::SocketConnection( const OUString &sConnectionDescription ) : + m_nStatus( 0 ), + m_sDescription( sConnectionDescription ), + _started(false), + _closed(false), + _error(false) + { + // make it unique + m_sDescription += ",uniqueValue="; + m_sDescription += OUString::number( + sal::static_int_cast< sal_Int64 >( + reinterpret_cast< sal_IntPtr >(&m_socket)) ); + } + + SocketConnection::~SocketConnection() + { + } + + void SocketConnection::completeConnectionString() + { + sal_Int32 nPort; + + nPort = m_socket.getPeerPort(); + + m_sDescription += + ",peerPort=" + OUString::number( nPort ) + + ",peerHost=" + m_socket.getPeerHost() + + ",localPort=" + OUString::number( nPort ) + + ",localHost=" + m_socket.getLocalHost( ); + } + + sal_Int32 SocketConnection::read( Sequence < sal_Int8 > & aReadBytes , sal_Int32 nBytesToRead ) + { + if( ! m_nStatus ) + { + notifyListeners(this, &_started, callStarted); + + if( aReadBytes.getLength() != nBytesToRead ) + { + aReadBytes.realloc( nBytesToRead ); + } + sal_Int32 i = m_socket.read( aReadBytes.getArray() , aReadBytes.getLength() ); + + if(i != nBytesToRead && m_socket.getError() != osl_Socket_E_None) + { + OUString message = "ctr_socket.cxx:SocketConnection::read: error - " + + m_socket.getErrorAsString(); + + IOException ioException(message, static_cast<XConnection *>(this)); + + Any any; + any <<= ioException; + + notifyListeners(this, &_error, callError(any)); + + throw ioException; + } + + return i; + } + else + { + IOException ioException("ctr_socket.cxx:SocketConnection::read: error - connection already closed", static_cast<XConnection *>(this)); + + Any any; + any <<= ioException; + + notifyListeners(this, &_error, callError(any)); + + throw ioException; + } + } + + void SocketConnection::write( const Sequence < sal_Int8 > &seq ) + { + if( ! m_nStatus ) + { + if( m_socket.write( seq.getConstArray() , seq.getLength() ) != seq.getLength() ) + { + OUString message = "ctr_socket.cxx:SocketConnection::write: error - " + + m_socket.getErrorAsString(); + + IOException ioException(message, static_cast<XConnection *>(this)); + + Any any; + any <<= ioException; + + notifyListeners(this, &_error, callError(any)); + + throw ioException; + } + } + else + { + IOException ioException("ctr_socket.cxx:SocketConnection::write: error - connection already closed", static_cast<XConnection *>(this)); + + Any any; + any <<= ioException; + + notifyListeners(this, &_error, callError(any)); + + throw ioException; + } + } + + void SocketConnection::flush( ) + { + + } + + void SocketConnection::close() + { + // ensure that close is called only once + if( 1 == osl_atomic_increment( (&m_nStatus) ) ) + { + m_socket.shutdown(); + notifyListeners(this, &_closed, callClosed); + } + } + + OUString SocketConnection::getDescription() + { + return m_sDescription; + } + + + // XConnectionBroadcaster + void SAL_CALL SocketConnection::addStreamListener(const Reference<XStreamListener> & aListener) + { + std::unique_lock guard(_mutex); + + _listeners.insert(aListener); + } + + void SAL_CALL SocketConnection::removeStreamListener(const Reference<XStreamListener> & aListener) + { + std::unique_lock guard(_mutex); + + _listeners.erase(aListener); + } +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/io/source/io.component b/io/source/io.component new file mode 100644 index 000000000..a5712c0f6 --- /dev/null +++ b/io/source/io.component @@ -0,0 +1,70 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + --> + +<component loader="com.sun.star.loader.SharedLibrary" environment="@CPPU_ENV@" + xmlns="http://openoffice.org/2010/uno-components"> + <implementation name="com.sun.star.comp.io.Pump" + constructor="io_Pump_get_implementation"> + <service name="com.sun.star.io.Pump"/> + </implementation> + <implementation name="com.sun.star.comp.io.stm.DataInputStream" + constructor="io_ODataInputStream_get_implementation"> + <service name="com.sun.star.io.DataInputStream"/> + </implementation> + <implementation name="com.sun.star.comp.io.stm.DataOutputStream" + constructor="io_ODataOutputStream_get_implementation"> + <service name="com.sun.star.io.DataOutputStream"/> + </implementation> + <implementation name="com.sun.star.comp.io.stm.MarkableInputStream" + constructor="io_OMarkableInputStream_get_implementation"> + <service name="com.sun.star.io.MarkableInputStream"/> + </implementation> + <implementation name="com.sun.star.comp.io.stm.MarkableOutputStream" + constructor="io_OMarkableOutputStream_get_implementation"> + <service name="com.sun.star.io.MarkableOutputStream"/> + </implementation> + <implementation name="com.sun.star.comp.io.stm.ObjectInputStream" + constructor="io_OObjectInputStream_get_implementation"> + <service name="com.sun.star.io.ObjectInputStream"/> + </implementation> + <implementation name="com.sun.star.comp.io.stm.ObjectOutputStream" + constructor="io_OObjectOutputStream_get_implementation"> + <service name="com.sun.star.io.ObjectOutputStream"/> + </implementation> + <implementation name="com.sun.star.comp.io.stm.Pipe" + constructor="io_OPipeImpl_get_implementation"> + <service name="com.sun.star.io.Pipe"/> + </implementation> + <implementation name="com.sun.star.comp.io.Acceptor" + constructor="io_OAcceptor_get_implementation"> + <service name="com.sun.star.connection.Acceptor"/> + </implementation> + <implementation name="com.sun.star.comp.io.Connector" + constructor="io_OConnector_get_implementation"> + <service name="com.sun.star.connection.Connector"/> + </implementation> + <implementation name="com.sun.star.comp.io.TextInputStream" + constructor="io_OTextInputStream_get_implementation"> + <service name="com.sun.star.io.TextInputStream"/> + </implementation> + <implementation name="com.sun.star.comp.io.TextOutputStream" + constructor="io_OTextOutputStream_get_implementation"> + <service name="com.sun.star.io.TextOutputStream"/> + </implementation> +</component> diff --git a/io/source/stm/odata.cxx b/io/source/stm/odata.cxx new file mode 100644 index 000000000..754cda450 --- /dev/null +++ b/io/source/stm/odata.cxx @@ -0,0 +1,1231 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <string.h> +#include <unordered_map> +#include <vector> + +#include <cppuhelper/weak.hxx> +#include <cppuhelper/implbase.hxx> +#include <cppuhelper/supportsservice.hxx> +#include <osl/endian.h> +#include <tools/long.hxx> + +#include <com/sun/star/io/NotConnectedException.hpp> +#include <com/sun/star/io/XObjectInputStream.hpp> +#include <com/sun/star/io/XObjectOutputStream.hpp> +#include <com/sun/star/io/XActiveDataSource.hpp> +#include <com/sun/star/io/XActiveDataSink.hpp> +#include <com/sun/star/io/XMarkableStream.hpp> +#include <com/sun/star/io/XConnectable.hpp> +#include <com/sun/star/io/UnexpectedEOFException.hpp> +#include <com/sun/star/io/WrongFormatException.hpp> +#include <com/sun/star/lang/XServiceInfo.hpp> +#include <com/sun/star/uno/XComponentContext.hpp> + +using namespace ::cppu; +using namespace ::osl; +using namespace ::std; +using namespace ::com::sun::star::io; +using namespace ::com::sun::star::uno; +using namespace ::com::sun::star::lang; + +namespace io_stm { + +namespace { + +class ODataInputStream : + public WeakImplHelper < + XDataInputStream, + XActiveDataSink, + XConnectable, + XServiceInfo + > +{ +public: + ODataInputStream( ) + : m_bValidStream( false ) + { + } + +public: // XInputStream + virtual sal_Int32 SAL_CALL readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead) override; + virtual sal_Int32 SAL_CALL readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead) override; + virtual void SAL_CALL skipBytes(sal_Int32 nBytesToSkip) override; + virtual sal_Int32 SAL_CALL available() override; + virtual void SAL_CALL closeInput() override; + +public: // XDataInputStream + virtual sal_Int8 SAL_CALL readBoolean() override; + virtual sal_Int8 SAL_CALL readByte() override; + virtual sal_Unicode SAL_CALL readChar() override; + virtual sal_Int16 SAL_CALL readShort() override; + virtual sal_Int32 SAL_CALL readLong() override; + virtual sal_Int64 SAL_CALL readHyper() override; + virtual float SAL_CALL readFloat() override; + virtual double SAL_CALL readDouble() override; + virtual OUString SAL_CALL readUTF() override; + + +public: // XActiveDataSink + virtual void SAL_CALL setInputStream(const Reference< XInputStream > & aStream) override; + virtual Reference< XInputStream > SAL_CALL getInputStream() override; + +public: // XConnectable + virtual void SAL_CALL setPredecessor(const Reference < XConnectable >& aPredecessor) override; + virtual Reference < XConnectable > SAL_CALL getPredecessor() override; + virtual void SAL_CALL setSuccessor(const Reference < XConnectable >& aSuccessor) override; + virtual Reference < XConnectable > SAL_CALL getSuccessor() override ; + + +public: // XServiceInfo + OUString SAL_CALL getImplementationName() override; + Sequence< OUString > SAL_CALL getSupportedServiceNames() override; + sal_Bool SAL_CALL supportsService(const OUString& ServiceName) override; + +protected: + + Reference < XConnectable > m_pred; + Reference < XConnectable > m_succ; + Reference < XInputStream > m_input; + bool m_bValidStream; +}; + +} + +// XInputStream +sal_Int32 ODataInputStream::readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead) +{ + if( !m_bValidStream ) + { + throw NotConnectedException( ); + } + sal_Int32 nRead = m_input->readBytes( aData , nBytesToRead ); + return nRead; +} + +sal_Int32 ODataInputStream::readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead) +{ + if( !m_bValidStream ) + throw NotConnectedException( ); + sal_Int32 nRead = m_input->readSomeBytes( aData , nMaxBytesToRead ); + return nRead; +} +void ODataInputStream::skipBytes(sal_Int32 nBytesToSkip) +{ + if( !m_bValidStream ) + throw NotConnectedException( ); + m_input->skipBytes( nBytesToSkip ); +} + + +sal_Int32 ODataInputStream::available() +{ + if( !m_bValidStream ) + throw NotConnectedException( ); + sal_Int32 nAvail = m_input->available( ); + return nAvail; +} + +void ODataInputStream::closeInput() +{ + if( !m_bValidStream ) + throw NotConnectedException( ); + m_input->closeInput( ); + setInputStream( Reference< XInputStream > () ); + setPredecessor( Reference < XConnectable >() ); + setSuccessor( Reference < XConnectable >() ); + m_bValidStream = false; +} + + +//== XDataInputStream =========================================== + +// XDataInputStream +sal_Int8 ODataInputStream::readBoolean() +{ + return readByte(); +} + +sal_Int8 ODataInputStream::readByte() +{ + Sequence<sal_Int8> aTmp(1); + if( 1 != readBytes( aTmp, 1 ) ) + { + throw UnexpectedEOFException(); + } + return aTmp.getConstArray()[0]; +} + +sal_Unicode ODataInputStream::readChar() +{ + Sequence<sal_Int8> aTmp(2); + if( 2 != readBytes( aTmp, 2 ) ) + { + throw UnexpectedEOFException(); + } + + const sal_uInt8 * pBytes = reinterpret_cast<const sal_uInt8 *>(aTmp.getConstArray()); + return (static_cast<sal_Unicode>(pBytes[0]) << 8) + pBytes[1]; +} + +sal_Int16 ODataInputStream::readShort() +{ + Sequence<sal_Int8> aTmp(2); + if( 2 != readBytes( aTmp, 2 ) ) + { + throw UnexpectedEOFException(); + } + + const sal_uInt8 * pBytes = reinterpret_cast<const sal_uInt8 *>(aTmp.getConstArray()); + return (static_cast<sal_Int16>(pBytes[0]) << 8) + pBytes[1]; +} + + +sal_Int32 ODataInputStream::readLong() +{ + Sequence<sal_Int8> aTmp(4); + if( 4 != readBytes( aTmp, 4 ) ) + { + throw UnexpectedEOFException( ); + } + + const sal_uInt8 * pBytes = reinterpret_cast<const sal_uInt8 *>(aTmp.getConstArray()); + return (static_cast<sal_Int32>(pBytes[0]) << 24) + (static_cast<sal_Int32>(pBytes[1]) << 16) + (static_cast<sal_Int32>(pBytes[2]) << 8) + pBytes[3]; +} + + +sal_Int64 ODataInputStream::readHyper() +{ + Sequence<sal_Int8> aTmp(8); + if( 8 != readBytes( aTmp, 8 ) ) + { + throw UnexpectedEOFException( ); + } + + const sal_uInt8 * pBytes = reinterpret_cast<const sal_uInt8 *>(aTmp.getConstArray()); + return + (static_cast<sal_Int64>(pBytes[0]) << 56) + + (static_cast<sal_Int64>(pBytes[1]) << 48) + + (static_cast<sal_Int64>(pBytes[2]) << 40) + + (static_cast<sal_Int64>(pBytes[3]) << 32) + + (static_cast<sal_Int64>(pBytes[4]) << 24) + + (static_cast<sal_Int64>(pBytes[5]) << 16) + + (static_cast<sal_Int64>(pBytes[6]) << 8) + + pBytes[7]; +} + +float ODataInputStream::readFloat() +{ + union { float f; sal_uInt32 n; } a; + a.n = readLong(); + return a.f; +} + +double ODataInputStream::readDouble() +{ + union { double d; struct { sal_uInt32 n1; sal_uInt32 n2; } ad; } a; +#if defined OSL_LITENDIAN + a.ad.n2 = readLong(); + a.ad.n1 = readLong(); +#else + a.ad.n1 = readLong(); + a.ad.n2 = readLong(); +#endif + return a.d; +} + +OUString ODataInputStream::readUTF() +{ + sal_uInt16 nShortLen = static_cast<sal_uInt16>(readShort()); + sal_Int32 nUTFLen; + + if( (sal_uInt16(0xffff)) == nShortLen ) + { + // is interpreted as a sign, that string is longer than 64k + // incompatible to older XDataInputStream-routines, when strings are exactly 64k + nUTFLen = readLong(); + } + else + { + nUTFLen = static_cast<sal_Int32>(nShortLen); + } + + Sequence<sal_Unicode> aBuffer( nUTFLen ); + sal_Unicode * pStr = aBuffer.getArray(); + + sal_Int32 nCount = 0; + sal_Int32 nStrLen = 0; + while( nCount < nUTFLen ) + { + sal_uInt8 c = static_cast<sal_uInt8>(readByte()); + sal_uInt8 char2, char3; + switch( c >> 4 ) + { + case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7: + // 0xxxxxxx + nCount++; + pStr[nStrLen++] = c; + break; + + case 12: case 13: + // 110x xxxx 10xx xxxx + nCount += 2; + if( nCount > nUTFLen ) + { + throw WrongFormatException( ); + } + + char2 = static_cast<sal_uInt8>(readByte()); + if( (char2 & 0xC0) != 0x80 ) + { + throw WrongFormatException( ); + } + + pStr[nStrLen++] = (sal_Unicode(c & 0x1F) << 6) | (char2 & 0x3F); + break; + + case 14: + // 1110 xxxx 10xx xxxx 10xx xxxx + nCount += 3; + if( nCount > nUTFLen ) + { + throw WrongFormatException( ); + } + + char2 = static_cast<sal_uInt8>(readByte()); + char3 = static_cast<sal_uInt8>(readByte()); + + if( ((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80) ) { + throw WrongFormatException( ); + } + pStr[nStrLen++] = (sal_Unicode(c & 0x0F) << 12) | + (sal_Unicode(char2 & 0x3F) << 6) | + (char3 & 0x3F); + break; + + default: + // 10xx xxxx, 1111 xxxx + throw WrongFormatException(); + //throw new UTFDataFormatException(); + } + } + return OUString( pStr, nStrLen ); +} + + +// XActiveDataSource +void ODataInputStream::setInputStream(const Reference< XInputStream > & aStream) +{ + + if( m_input != aStream ) { + m_input = aStream; + + Reference < XConnectable > pred( m_input , UNO_QUERY ); + setPredecessor( pred ); + } + + m_bValidStream = m_input.is(); +} + +Reference< XInputStream > ODataInputStream::getInputStream() +{ + return m_input; +} + + +// XDataSink +void ODataInputStream::setSuccessor( const Reference < XConnectable > &r ) +{ + /// if the references match, nothing needs to be done + if( m_succ != r ) { + /// store the reference for later use + m_succ = r; + + if( m_succ.is() ) { + /// set this instance as the sink ! + m_succ->setPredecessor( Reference< XConnectable > ( + static_cast< XConnectable * >(this) ) ); + } + } +} + +Reference < XConnectable > ODataInputStream::getSuccessor() +{ + return m_succ; +} + + +// XDataSource +void ODataInputStream::setPredecessor( const Reference < XConnectable > &r ) +{ + if( r != m_pred ) { + m_pred = r; + if( m_pred.is() ) { + m_pred->setSuccessor( Reference< XConnectable > ( + static_cast< XConnectable * >(this) ) ); + } + } +} +Reference < XConnectable > ODataInputStream::getPredecessor() +{ + return m_pred; +} + +// XServiceInfo +OUString ODataInputStream::getImplementationName() +{ + return "com.sun.star.comp.io.stm.DataInputStream"; +} + +// XServiceInfo +sal_Bool ODataInputStream::supportsService(const OUString& ServiceName) +{ + return cppu::supportsService(this, ServiceName); +} + +// XServiceInfo +Sequence< OUString > ODataInputStream::getSupportedServiceNames() +{ + return { "com.sun.star.io.DataInputStream" }; +} + +extern "C" SAL_DLLPUBLIC_EXPORT css::uno::XInterface* +io_ODataInputStream_get_implementation( + css::uno::XComponentContext* , css::uno::Sequence<css::uno::Any> const&) +{ + return cppu::acquire(new ODataInputStream()); +} + +namespace { + +class ODataOutputStream : + public WeakImplHelper < + XDataOutputStream, + XActiveDataSource, + XConnectable, + XServiceInfo > +{ +public: + ODataOutputStream() + : m_bValidStream( false ) + { + } + +public: // XOutputStream + virtual void SAL_CALL writeBytes(const Sequence< sal_Int8 >& aData) override; + virtual void SAL_CALL flush() override; + virtual void SAL_CALL closeOutput() override; + +public: // XDataOutputStream + virtual void SAL_CALL writeBoolean(sal_Bool Value) override; + virtual void SAL_CALL writeByte(sal_Int8 Value) override; + virtual void SAL_CALL writeChar(sal_Unicode Value) override; + virtual void SAL_CALL writeShort(sal_Int16 Value) override; + virtual void SAL_CALL writeLong(sal_Int32 Value) override; + virtual void SAL_CALL writeHyper(sal_Int64 Value) override; + virtual void SAL_CALL writeFloat(float Value) override; + virtual void SAL_CALL writeDouble(double Value) override; + virtual void SAL_CALL writeUTF(const OUString& Value) override; + +public: // XActiveDataSource + virtual void SAL_CALL setOutputStream(const Reference< XOutputStream > & aStream) override; + virtual Reference < XOutputStream > SAL_CALL getOutputStream() override; + +public: // XConnectable + virtual void SAL_CALL setPredecessor(const Reference < XConnectable >& aPredecessor) override; + virtual Reference < XConnectable > SAL_CALL getPredecessor() override; + virtual void SAL_CALL setSuccessor(const Reference < XConnectable >& aSuccessor) override; + virtual Reference < XConnectable > SAL_CALL getSuccessor() override; + +public: // XServiceInfo + OUString SAL_CALL getImplementationName() override; + Sequence< OUString > SAL_CALL getSupportedServiceNames() override; + sal_Bool SAL_CALL supportsService(const OUString& ServiceName) override; + +protected: + Reference < XConnectable > m_succ; + Reference < XConnectable > m_pred; + Reference< XOutputStream > m_output; + bool m_bValidStream; +}; + +} + +// XOutputStream +void ODataOutputStream::writeBytes(const Sequence< sal_Int8 >& aData) +{ + if( !m_bValidStream ) + throw NotConnectedException( ); + m_output->writeBytes( aData ); +} + +void ODataOutputStream::flush() +{ + if( !m_bValidStream ) + throw NotConnectedException(); + m_output->flush(); +} + + +void ODataOutputStream::closeOutput() +{ + if( !m_bValidStream ) + throw NotConnectedException(); + m_output->closeOutput(); + setOutputStream( Reference< XOutputStream > () ); + setPredecessor( Reference < XConnectable >() ); + setSuccessor( Reference < XConnectable >() ); +} + +// XDataOutputStream +void ODataOutputStream::writeBoolean(sal_Bool Value) +{ + if( Value ) + { + writeByte( 1 ); + } + else + { + writeByte( 0 ); + } +} + + +void ODataOutputStream::writeByte(sal_Int8 Value) +{ + writeBytes( { Value } ); +} + +void ODataOutputStream::writeChar(sal_Unicode Value) +{ + writeBytes( { sal_Int8(Value >> 8), + sal_Int8(Value) } ); +} + + +void ODataOutputStream::writeShort(sal_Int16 Value) +{ + writeBytes( { sal_Int8(Value >> 8), + sal_Int8(Value) } ); +} + +void ODataOutputStream::writeLong(sal_Int32 Value) +{ + writeBytes( { sal_Int8(Value >> 24), + sal_Int8(Value >> 16), + sal_Int8(Value >> 8), + sal_Int8(Value) } ); +} + +void ODataOutputStream::writeHyper(sal_Int64 Value) +{ + writeBytes( { sal_Int8(Value >> 56), + sal_Int8(Value >> 48), + sal_Int8(Value >> 40), + sal_Int8(Value >> 32), + sal_Int8(Value >> 24), + sal_Int8(Value >> 16), + sal_Int8(Value >> 8), + sal_Int8(Value) } ); +} + + +void ODataOutputStream::writeFloat(float Value) +{ + union { float f; sal_uInt32 n; } a; + a.f = Value; + writeLong( a.n ); +} + +void ODataOutputStream::writeDouble(double Value) +{ + union { double d; struct { sal_uInt32 n1; sal_uInt32 n2; } ad; } a; + a.d = Value; +#if defined OSL_LITENDIAN + writeLong( a.ad.n2 ); + writeLong( a.ad.n1 ); +#else + writeLong( a.ad.n1 ); + writeLong( a.ad.n2 ); +#endif +} + +void ODataOutputStream::writeUTF(const OUString& Value) +{ + sal_Int32 nStrLen = Value.getLength(); + const sal_Unicode * pStr = Value.getStr(); + sal_Int32 nUTFLen = 0; + sal_Int32 i; + + for( i = 0 ; i < nStrLen ; i++ ) + { + sal_uInt16 c = pStr[i]; + if( (c >= 0x0001) && (c <= 0x007F) ) + { + nUTFLen++; + } + else if( c > 0x07FF ) + { + nUTFLen += 3; + } + else + { + nUTFLen += 2; + } + } + + + // compatibility mode for older implementations, where it was not possible + // to write blocks bigger than 64 k. Note that there is a tradeoff. Blocks, + // that are exactly 64k long can not be read by older routines when written + // with these routines and the other way round !!!!! + if( nUTFLen >= 0xFFFF ) { + writeShort( sal_Int16(-1) ); + writeLong( nUTFLen ); + } + else { + writeShort( static_cast<sal_uInt16>(nUTFLen) ); + } + for( i = 0 ; i < nStrLen ; i++ ) + { + sal_uInt16 c = pStr[i]; + if( (c >= 0x0001) && (c <= 0x007F) ) + { + writeByte(sal_Int8(c)); + } + else if( c > 0x07FF ) + { + writeByte(sal_Int8(0xE0 | ((c >> 12) & 0x0F))); + writeByte(sal_Int8(0x80 | ((c >> 6) & 0x3F))); + writeByte(sal_Int8(0x80 | ((c >> 0) & 0x3F))); + } + else + { + writeByte(sal_Int8(0xC0 | ((c >> 6) & 0x1F))); + writeByte(sal_Int8(0x80 | ((c >> 0) & 0x3F))); + } + } +} + +// XActiveDataSource +void ODataOutputStream::setOutputStream(const Reference< XOutputStream > & aStream) +{ + if( m_output != aStream ) { + m_output = aStream; + m_bValidStream = m_output.is(); + + Reference < XConnectable > succ( m_output , UNO_QUERY ); + setSuccessor( succ ); + } +} + +Reference< XOutputStream > ODataOutputStream::getOutputStream() +{ + return m_output; +} + + +// XDataSink +void ODataOutputStream::setSuccessor( const Reference < XConnectable > &r ) +{ + /// if the references match, nothing needs to be done + if( m_succ != r ) + { + /// store the reference for later use + m_succ = r; + + if( m_succ.is() ) + { + /// set this instance as the sink ! + m_succ->setPredecessor( Reference < XConnectable > ( + static_cast< XConnectable * >(this) )); + } + } +} +Reference < XConnectable > ODataOutputStream::getSuccessor() +{ + return m_succ; +} + + +// XDataSource +void ODataOutputStream::setPredecessor( const Reference < XConnectable > &r ) +{ + if( r != m_pred ) { + m_pred = r; + if( m_pred.is() ) { + m_pred->setSuccessor( Reference< XConnectable > ( + static_cast< XConnectable * >(this) )); + } + } +} +Reference < XConnectable > ODataOutputStream::getPredecessor() +{ + return m_pred; +} + + +// XServiceInfo +OUString ODataOutputStream::getImplementationName() +{ + return "com.sun.star.comp.io.stm.DataOutputStream"; +} + +// XServiceInfo +sal_Bool ODataOutputStream::supportsService(const OUString& ServiceName) +{ + return cppu::supportsService(this, ServiceName); +} + +// XServiceInfo +Sequence< OUString > ODataOutputStream::getSupportedServiceNames() +{ + return { "com.sun.star.io.DataOutputStream" }; +} + +extern "C" SAL_DLLPUBLIC_EXPORT css::uno::XInterface* +io_ODataOutputStream_get_implementation( + css::uno::XComponentContext* , css::uno::Sequence<css::uno::Any> const&) +{ + return cppu::acquire(new ODataOutputStream()); +} + +namespace { + +struct equalObjectContainer_Impl +{ + bool operator()(const Reference< XInterface > & s1, + const Reference< XInterface > & s2) const + { + return s1 == s2; + } +}; + + +struct hashObjectContainer_Impl +{ + size_t operator()(const Reference< XInterface > & xRef) const + { + return reinterpret_cast<size_t>(xRef.get()); + } +}; + +} + +typedef std::unordered_map +< + Reference< XInterface >, + sal_Int32, + hashObjectContainer_Impl, + equalObjectContainer_Impl +> ObjectContainer_Impl; + +namespace { + +class OObjectOutputStream: + public ImplInheritanceHelper< + ODataOutputStream, /* parent */ + XObjectOutputStream, XMarkableStream > +{ +public: + OObjectOutputStream() + : m_nMaxId(0) , + m_bValidMarkable(false) + { + } + +public: + // XOutputStream + virtual void SAL_CALL writeBytes(const Sequence< sal_Int8 >& aData) override + { ODataOutputStream::writeBytes( aData ); } + + virtual void SAL_CALL flush() override + { ODataOutputStream::flush(); } + + virtual void SAL_CALL closeOutput() override + { ODataOutputStream::closeOutput(); } + +public: + // XDataOutputStream + virtual void SAL_CALL writeBoolean(sal_Bool Value) override + { ODataOutputStream::writeBoolean( Value ); } + virtual void SAL_CALL writeByte(sal_Int8 Value) override + { ODataOutputStream::writeByte( Value ); } + virtual void SAL_CALL writeChar(sal_Unicode Value) override + { ODataOutputStream::writeChar( Value ); } + virtual void SAL_CALL writeShort(sal_Int16 Value) override + { ODataOutputStream::writeShort( Value ); } + virtual void SAL_CALL writeLong(sal_Int32 Value) override + { ODataOutputStream::writeLong( Value ); } + virtual void SAL_CALL writeHyper(sal_Int64 Value) override + { ODataOutputStream::writeHyper( Value ); } + virtual void SAL_CALL writeFloat(float Value) override + { ODataOutputStream::writeFloat( Value ); } + virtual void SAL_CALL writeDouble(double Value) override + { ODataOutputStream::writeDouble( Value ); } + virtual void SAL_CALL writeUTF(const OUString& Value) override + { ODataOutputStream::writeUTF( Value );} + + // XObjectOutputStream + virtual void SAL_CALL writeObject( const Reference< XPersistObject > & r ) override; + +public: // XMarkableStream + virtual sal_Int32 SAL_CALL createMark() override; + virtual void SAL_CALL deleteMark(sal_Int32 Mark) override; + virtual void SAL_CALL jumpToMark(sal_Int32 nMark) override; + virtual void SAL_CALL jumpToFurthest() override; + virtual sal_Int32 SAL_CALL offsetToMark(sal_Int32 nMark) override; + +public: // XServiceInfo + OUString SAL_CALL getImplementationName() override; + Sequence< OUString > SAL_CALL getSupportedServiceNames() override; + sal_Bool SAL_CALL supportsService(const OUString& ServiceName) override; + +private: + void connectToMarkable(); +private: + ObjectContainer_Impl m_mapObject; + sal_Int32 m_nMaxId; + Reference< XMarkableStream > m_rMarkable; + bool m_bValidMarkable; +}; + +} + +void OObjectOutputStream::writeObject( const Reference< XPersistObject > & xPObj ) +{ + + connectToMarkable(); + bool bWriteObj = false; + // create Mark to write length of info + sal_uInt32 nInfoLenMark = m_rMarkable->createMark(); + + // length of the info data (is later rewritten) + OObjectOutputStream::writeShort( 0 ); + + // write the object identifier + if( xPObj.is() ) + { + Reference< XInterface > rX( xPObj , UNO_QUERY ); + + ObjectContainer_Impl::const_iterator aIt + = m_mapObject.find( rX ); + if( aIt == m_mapObject.end() ) + { + // insert new object in hash table + m_mapObject[ rX ] = ++m_nMaxId; + ODataOutputStream::writeLong( m_nMaxId ); + ODataOutputStream::writeUTF( xPObj->getServiceName() ); + bWriteObj = true; + } + else + { + ODataOutputStream::writeLong( (*aIt).second ); + ODataOutputStream::writeUTF( OUString() ); + } + } + else + { + ODataOutputStream::writeLong( 0 ); + ODataOutputStream::writeUTF( OUString() ); + } + + sal_uInt32 nObjLenMark = m_rMarkable->createMark(); + ODataOutputStream::writeLong( 0 ); + + sal_Int32 nInfoLen = m_rMarkable->offsetToMark( nInfoLenMark ); + m_rMarkable->jumpToMark( nInfoLenMark ); + // write length of the info data + ODataOutputStream::writeShort( static_cast<sal_Int16>(nInfoLen) ); + // jump to the end of the stream + m_rMarkable->jumpToFurthest(); + + if( bWriteObj ) + xPObj->write( Reference< XObjectOutputStream > ( + static_cast< XObjectOutputStream * >(this) ) ); + + sal_Int32 nObjLen = m_rMarkable->offsetToMark( nObjLenMark ) -4; + m_rMarkable->jumpToMark( nObjLenMark ); + // write length of the info data + ODataOutputStream::writeLong( nObjLen ); + // jump to the end of the stream + m_rMarkable->jumpToFurthest(); + + m_rMarkable->deleteMark( nObjLenMark ); + m_rMarkable->deleteMark( nInfoLenMark ); +} + + +void OObjectOutputStream::connectToMarkable() +{ + if( m_bValidMarkable ) + return; + + if( ! m_bValidStream ) + throw NotConnectedException(); + + // find the markable stream ! + Reference< XInterface > rTry(m_output); + while( true ) { + if( ! rTry.is() ) + { + throw NotConnectedException(); + } + Reference < XMarkableStream > markable( rTry , UNO_QUERY ); + if( markable.is() ) + { + m_rMarkable = markable; + break; + } + Reference < XActiveDataSource > source( rTry , UNO_QUERY ); + rTry = source; + } + m_bValidMarkable = true; +} + + +sal_Int32 OObjectOutputStream::createMark() +{ + connectToMarkable(); // throws an exception, if a markable is not connected ! + + return m_rMarkable->createMark(); +} + +void OObjectOutputStream::deleteMark(sal_Int32 Mark) +{ + if( ! m_bValidMarkable ) + { + throw NotConnectedException(); + } + m_rMarkable->deleteMark( Mark ); +} + +void OObjectOutputStream::jumpToMark(sal_Int32 nMark) +{ + if( ! m_bValidMarkable ) + { + throw NotConnectedException(); + } + m_rMarkable->jumpToMark( nMark ); +} + + +void OObjectOutputStream::jumpToFurthest() +{ + connectToMarkable(); + m_rMarkable->jumpToFurthest(); +} + +sal_Int32 OObjectOutputStream::offsetToMark(sal_Int32 nMark) +{ + if( ! m_bValidMarkable ) + { + throw NotConnectedException(); + } + return m_rMarkable->offsetToMark( nMark ); +} + +// XServiceInfo +OUString OObjectOutputStream::getImplementationName() +{ + return "com.sun.star.comp.io.stm.ObjectOutputStream"; +} + +// XServiceInfo +sal_Bool OObjectOutputStream::supportsService(const OUString& ServiceName) +{ + return cppu::supportsService(this, ServiceName); +} + +// XServiceInfo +Sequence< OUString > OObjectOutputStream::getSupportedServiceNames() +{ + return { "com.sun.star.io.ObjectOutputStream" }; +} + +extern "C" SAL_DLLPUBLIC_EXPORT css::uno::XInterface* +io_OObjectOutputStream_get_implementation( + css::uno::XComponentContext* , css::uno::Sequence<css::uno::Any> const&) +{ + return cppu::acquire(new OObjectOutputStream()); +} + +namespace { + +class OObjectInputStream: + public ImplInheritanceHelper< + ODataInputStream, /* parent */ + XObjectInputStream, XMarkableStream > +{ +public: + explicit OObjectInputStream( const Reference < XComponentContext > &r) + : m_rSMgr( r->getServiceManager() ) + , m_rCxt( r ) + , m_bValidMarkable(false) + { + } + +public: // XInputStream + virtual sal_Int32 SAL_CALL readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead) override + { return ODataInputStream::readBytes( aData , nBytesToRead ); } + + virtual sal_Int32 SAL_CALL readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead) override + { return ODataInputStream::readSomeBytes( aData, nMaxBytesToRead ); } + + virtual void SAL_CALL skipBytes(sal_Int32 nBytesToSkip) override + { ODataInputStream::skipBytes( nBytesToSkip ); } + + virtual sal_Int32 SAL_CALL available() override + { return ODataInputStream::available(); } + + virtual void SAL_CALL closeInput() override + { ODataInputStream::closeInput(); } + +public: // XDataInputStream + virtual sal_Int8 SAL_CALL readBoolean() override + { return ODataInputStream::readBoolean(); } + virtual sal_Int8 SAL_CALL readByte() override + { return ODataInputStream::readByte(); } + virtual sal_Unicode SAL_CALL readChar() override + { return ODataInputStream::readChar(); } + virtual sal_Int16 SAL_CALL readShort() override + { return ODataInputStream::readShort(); } + virtual sal_Int32 SAL_CALL readLong() override + { return ODataInputStream::readLong(); } + virtual sal_Int64 SAL_CALL readHyper() override + { return ODataInputStream::readHyper(); } + virtual float SAL_CALL readFloat() override + { return ODataInputStream::readFloat(); } + virtual double SAL_CALL readDouble() override + { return ODataInputStream::readDouble(); } + virtual OUString SAL_CALL readUTF() override + { return ODataInputStream::readUTF(); } + +public: // XObjectInputStream + virtual Reference< XPersistObject > SAL_CALL readObject( ) override; + +public: // XMarkableStream + virtual sal_Int32 SAL_CALL createMark() override; + virtual void SAL_CALL deleteMark(sal_Int32 Mark) override; + virtual void SAL_CALL jumpToMark(sal_Int32 nMark) override; + virtual void SAL_CALL jumpToFurthest() override; + virtual sal_Int32 SAL_CALL offsetToMark(sal_Int32 nMark) override; + +public: // XServiceInfo + OUString SAL_CALL getImplementationName() override; + Sequence< OUString > SAL_CALL getSupportedServiceNames() override; + sal_Bool SAL_CALL supportsService(const OUString& ServiceName) override; + +private: + void connectToMarkable(); +private: + Reference < XMultiComponentFactory > m_rSMgr; + Reference < XComponentContext > m_rCxt; + bool m_bValidMarkable; + Reference < XMarkableStream > m_rMarkable; + vector < Reference< XPersistObject > > m_aPersistVector; + +}; + +} + +Reference< XPersistObject > OObjectInputStream::readObject() +{ + // check if chain contains a XMarkableStream + connectToMarkable(); + + Reference< XPersistObject > xLoadedObj; + + // create Mark to skip newer versions + sal_uInt32 nMark = m_rMarkable->createMark(); + // length of the data + sal_Int32 nLen = static_cast<sal_uInt16>(ODataInputStream::readShort()); + if( nLen < 0xc ) + { + throw WrongFormatException(); + } + + // read the object identifier + sal_uInt32 nId = readLong(); + + // the name of the persist model + // MM ??? + OUString aName = readUTF(); + + // Read the length of the object + sal_Int32 nObjLen = readLong(); + if( 0 == nId && 0 != nObjLen ) + { + throw WrongFormatException(); + } + + // skip data of new version + skipBytes( nLen - m_rMarkable->offsetToMark( nMark ) ); + + bool bLoadSuccessful = true; + if( nId ) + { + if( !aName.isEmpty() ) + { + // load the object + Reference< XInterface > x = m_rSMgr->createInstanceWithContext( aName, m_rCxt ); + xLoadedObj.set( x, UNO_QUERY ); + if( xLoadedObj.is() ) + { + sal_uInt32 nSize = m_aPersistVector.size(); + if( nSize <= nId ) + { + // grow to the right size + Reference< XPersistObject > xEmpty; + m_aPersistVector.insert( m_aPersistVector.end(), static_cast<tools::Long>(nId - nSize + 1), xEmpty ); + } + + m_aPersistVector[nId] = xLoadedObj; + xLoadedObj->read( Reference< XObjectInputStream >( + static_cast< XObjectInputStream * >(this) ) ); + } + else + { + // no service with this name could be instantiated + bLoadSuccessful = false; + } + } + else { + if (nId >= m_aPersistVector.size()) + { + // id unknown, load failure ! + bLoadSuccessful = false; + } + else + { + // Object has already been read, + xLoadedObj = m_aPersistVector[nId]; + } + } + } + + // skip to the position behind the object + skipBytes( nObjLen + nLen - m_rMarkable->offsetToMark( nMark ) ); + m_rMarkable->deleteMark( nMark ); + + if( ! bLoadSuccessful ) + { + throw WrongFormatException(); + } + return xLoadedObj; +} + + +void OObjectInputStream::connectToMarkable() +{ + if( m_bValidMarkable ) return; + + if( ! m_bValidStream ) + { + throw NotConnectedException( ); + } + + // find the markable stream ! + Reference< XInterface > rTry(m_input); + while( true ) { + if( ! rTry.is() ) + { + throw NotConnectedException( ); + } + Reference< XMarkableStream > markable( rTry , UNO_QUERY ); + if( markable.is() ) + { + m_rMarkable = markable; + break; + } + Reference < XActiveDataSink > sink( rTry , UNO_QUERY ); + rTry = sink; + } + m_bValidMarkable = true; +} + +sal_Int32 OObjectInputStream::createMark() +{ + connectToMarkable(); // throws an exception, if a markable is not connected ! + + return m_rMarkable->createMark(); +} + +void OObjectInputStream::deleteMark(sal_Int32 Mark) +{ + if( ! m_bValidMarkable ) + { + throw NotConnectedException(); + } + m_rMarkable->deleteMark( Mark ); +} + +void OObjectInputStream::jumpToMark(sal_Int32 nMark) +{ + if( ! m_bValidMarkable ) + { + throw NotConnectedException(); + } + m_rMarkable->jumpToMark( nMark ); +} +void OObjectInputStream::jumpToFurthest() +{ + connectToMarkable(); + m_rMarkable->jumpToFurthest(); +} + +sal_Int32 OObjectInputStream::offsetToMark(sal_Int32 nMark) +{ + if( ! m_bValidMarkable ) + { + throw NotConnectedException(); + } + return m_rMarkable->offsetToMark( nMark ); +} + +// XServiceInfo +OUString OObjectInputStream::getImplementationName() +{ + return "com.sun.star.comp.io.stm.ObjectInputStream"; +} + +// XServiceInfo +sal_Bool OObjectInputStream::supportsService(const OUString& ServiceName) +{ + return cppu::supportsService(this, ServiceName); +} + +// XServiceInfo +Sequence< OUString > OObjectInputStream::getSupportedServiceNames() +{ + return { "com.sun.star.io.ObjectInputStream" }; +} + +extern "C" SAL_DLLPUBLIC_EXPORT css::uno::XInterface* +io_OObjectInputStream_get_implementation( + css::uno::XComponentContext* context, css::uno::Sequence<css::uno::Any> const&) +{ + return cppu::acquire(new OObjectInputStream(context)); +} + +} + + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/io/source/stm/omark.cxx b/io/source/stm/omark.cxx new file mode 100644 index 000000000..3991e2c9d --- /dev/null +++ b/io/source/stm/omark.cxx @@ -0,0 +1,766 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + + +#include <map> +#include <memory> + +#include <com/sun/star/io/BufferSizeExceededException.hpp> +#include <com/sun/star/io/NotConnectedException.hpp> +#include <com/sun/star/io/XMarkableStream.hpp> +#include <com/sun/star/io/XOutputStream.hpp> +#include <com/sun/star/io/XInputStream.hpp> +#include <com/sun/star/io/XActiveDataSource.hpp> +#include <com/sun/star/io/XActiveDataSink.hpp> +#include <com/sun/star/io/XConnectable.hpp> +#include <com/sun/star/lang/IllegalArgumentException.hpp> +#include <com/sun/star/lang/XServiceInfo.hpp> +#include <com/sun/star/uno/XComponentContext.hpp> + +#include <cppuhelper/weak.hxx> +#include <cppuhelper/implbase.hxx> +#include <cppuhelper/supportsservice.hxx> + +#include <osl/diagnose.h> +#include <mutex> + +using namespace ::std; +using namespace ::cppu; +using namespace ::osl; +using namespace ::com::sun::star::io; +using namespace ::com::sun::star::uno; +using namespace ::com::sun::star::lang; + +#include "streamhelper.hxx" + +namespace io_stm { + +namespace { + +/*********************** +* +* OMarkableOutputStream. +* +* This object allows to set marks in an outputstream. It is allowed to jump back to the marks and +* rewrite the same bytes. +* +* The object must buffer the data since the last mark set. Flush will not +* have any effect. As soon as the last mark has been removed, the object may write the data +* through to the chained object. +* +**********************/ +class OMarkableOutputStream : + public WeakImplHelper< XOutputStream , + XActiveDataSource , + XMarkableStream , + XConnectable, + XServiceInfo + > +{ +public: + OMarkableOutputStream( ); + +public: // XOutputStream + virtual void SAL_CALL writeBytes(const Sequence< sal_Int8 >& aData) override; + virtual void SAL_CALL flush() override; + virtual void SAL_CALL closeOutput() override; + +public: // XMarkable + virtual sal_Int32 SAL_CALL createMark() override; + virtual void SAL_CALL deleteMark(sal_Int32 Mark) override; + virtual void SAL_CALL jumpToMark(sal_Int32 nMark) override; + virtual void SAL_CALL jumpToFurthest() override; + virtual sal_Int32 SAL_CALL offsetToMark(sal_Int32 nMark) override; + +public: // XActiveDataSource + virtual void SAL_CALL setOutputStream(const Reference < XOutputStream > & aStream) override; + virtual Reference < XOutputStream > SAL_CALL getOutputStream() override; + +public: // XConnectable + virtual void SAL_CALL setPredecessor(const Reference < XConnectable > & aPredecessor) override; + virtual Reference < XConnectable > SAL_CALL getPredecessor() override; + virtual void SAL_CALL setSuccessor(const Reference < XConnectable >& aSuccessor) override; + virtual Reference< XConnectable > SAL_CALL getSuccessor() override; + +public: // XServiceInfo + OUString SAL_CALL getImplementationName() override; + Sequence< OUString > SAL_CALL getSupportedServiceNames() override; + sal_Bool SAL_CALL supportsService(const OUString& ServiceName) override; + +private: + // helper methods + /// @throws NotConnectedException + /// @throws BufferSizeExceededException + void checkMarksAndFlush(); + + Reference< XConnectable > m_succ; + Reference< XConnectable > m_pred; + + Reference< XOutputStream > m_output; + bool m_bValidStream; + + std::unique_ptr<MemRingBuffer> m_pBuffer; + map<sal_Int32,sal_Int32,less< sal_Int32 > > m_mapMarks; + sal_Int32 m_nCurrentPos; + sal_Int32 m_nCurrentMark; + + std::mutex m_mutex; +}; + +} + +OMarkableOutputStream::OMarkableOutputStream( ) + : m_bValidStream(false) + , m_pBuffer( new MemRingBuffer ) + , m_nCurrentPos(0) + , m_nCurrentMark(0) +{ +} + +// XOutputStream +void OMarkableOutputStream::writeBytes(const Sequence< sal_Int8 >& aData) +{ + if( !m_bValidStream ) { + throw NotConnectedException(); + } + if( m_mapMarks.empty() && ( m_pBuffer->getSize() == 0 ) ) { + // no mark and buffer active, simple write through + m_output->writeBytes( aData ); + } + else { + std::unique_lock guard( m_mutex ); + // new data must be buffered + m_pBuffer->writeAt( m_nCurrentPos , aData ); + m_nCurrentPos += aData.getLength(); + checkMarksAndFlush(); + } + +} + +void OMarkableOutputStream::flush() +{ + Reference< XOutputStream > output; + { + std::unique_lock guard( m_mutex ); + output = m_output; + } + + // Markable cannot flush buffered data, because the data may get rewritten, + // however one can forward the flush to the chained stream to give it + // a chance to write data buffered in the chained stream. + if( output.is() ) + { + output->flush(); + } +} + +void OMarkableOutputStream::closeOutput() +{ + if( !m_bValidStream ) { + throw NotConnectedException(); + } + std::unique_lock guard( m_mutex ); + // all marks must be cleared and all + + m_mapMarks.clear(); + m_nCurrentPos = m_pBuffer->getSize(); + checkMarksAndFlush(); + + m_output->closeOutput(); + + setOutputStream( Reference< XOutputStream > () ); + setPredecessor( Reference < XConnectable >() ); + setSuccessor( Reference< XConnectable > () ); + +} + + +sal_Int32 OMarkableOutputStream::createMark() +{ + std::unique_lock guard( m_mutex ); + sal_Int32 nMark = m_nCurrentMark; + + m_mapMarks[nMark] = m_nCurrentPos; + + m_nCurrentMark ++; + return nMark; +} + +void OMarkableOutputStream::deleteMark(sal_Int32 Mark) +{ + std::unique_lock guard( m_mutex ); + map<sal_Int32,sal_Int32,less<sal_Int32> >::iterator ii = m_mapMarks.find( Mark ); + + if( ii == m_mapMarks.end() ) { + throw IllegalArgumentException( + "MarkableOutputStream::deleteMark unknown mark (" + OUString::number(Mark) + ")", + *this, 0); + } + m_mapMarks.erase( ii ); + checkMarksAndFlush(); +} + +void OMarkableOutputStream::jumpToMark(sal_Int32 nMark) +{ + std::unique_lock guard( m_mutex ); + map<sal_Int32,sal_Int32,less<sal_Int32> >::iterator ii = m_mapMarks.find( nMark ); + + if( ii == m_mapMarks.end() ) { + throw IllegalArgumentException( + "MarkableOutputStream::jumpToMark unknown mark (" + OUString::number(nMark) + ")", + *this, 0); + } + m_nCurrentPos = (*ii).second; +} + +void OMarkableOutputStream::jumpToFurthest() +{ + std::unique_lock guard( m_mutex ); + m_nCurrentPos = m_pBuffer->getSize(); + checkMarksAndFlush(); +} + +sal_Int32 OMarkableOutputStream::offsetToMark(sal_Int32 nMark) +{ + + std::unique_lock guard( m_mutex ); + map<sal_Int32,sal_Int32,less<sal_Int32> >::const_iterator ii = m_mapMarks.find( nMark ); + + if( ii == m_mapMarks.end() ) + { + throw IllegalArgumentException( + "MarkableOutputStream::offsetToMark unknown mark (" + OUString::number(nMark) + ")", + *this, 0); + } + return m_nCurrentPos - (*ii).second; +} + + +// XActiveDataSource2 +void OMarkableOutputStream::setOutputStream(const Reference < XOutputStream >& aStream) +{ + if( m_output != aStream ) { + m_output = aStream; + + Reference < XConnectable > succ( m_output , UNO_QUERY ); + setSuccessor( succ ); + } + m_bValidStream = m_output.is(); +} + +Reference< XOutputStream > OMarkableOutputStream::getOutputStream() +{ + return m_output; +} + + +void OMarkableOutputStream::setSuccessor( const Reference< XConnectable > &r ) +{ + /// if the references match, nothing needs to be done + if( m_succ != r ) { + /// store the reference for later use + m_succ = r; + + if( m_succ.is() ) { + m_succ->setPredecessor( Reference < XConnectable > ( + static_cast< XConnectable * >(this) ) ); + } + } +} +Reference <XConnectable > OMarkableOutputStream::getSuccessor() +{ + return m_succ; +} + + +// XDataSource +void OMarkableOutputStream::setPredecessor( const Reference< XConnectable > &r ) +{ + if( r != m_pred ) { + m_pred = r; + if( m_pred.is() ) { + m_pred->setSuccessor( Reference < XConnectable > ( + static_cast< XConnectable * >(this ) ) ); + } + } +} +Reference < XConnectable > OMarkableOutputStream::getPredecessor() +{ + return m_pred; +} + + +// private methods + +void OMarkableOutputStream::checkMarksAndFlush() +{ + // find the smallest mark + sal_Int32 nNextFound = m_nCurrentPos; + for (auto const& mark : m_mapMarks) + { + if( mark.second <= nNextFound ) { + nNextFound = mark.second; + } + } + + if( nNextFound ) { + // some data must be released ! + m_nCurrentPos -= nNextFound; + for (auto & mark : m_mapMarks) + { + mark.second -= nNextFound; + } + + Sequence<sal_Int8> seq(nNextFound); + m_pBuffer->readAt( 0 , seq , nNextFound ); + m_pBuffer->forgetFromStart( nNextFound ); + + // now write data through to streams + m_output->writeBytes( seq ); + } + else { + // nothing to do. There is a mark or the current cursor position, that prevents + // releasing data ! + } +} + + +// XServiceInfo +OUString OMarkableOutputStream::getImplementationName() +{ + return "com.sun.star.comp.io.stm.MarkableOutputStream"; +} + +// XServiceInfo +sal_Bool OMarkableOutputStream::supportsService(const OUString& ServiceName) +{ + return cppu::supportsService(this, ServiceName); +} + +// XServiceInfo +Sequence< OUString > OMarkableOutputStream::getSupportedServiceNames() +{ + return { "com.sun.star.io.MarkableOutputStream" }; +} + +extern "C" SAL_DLLPUBLIC_EXPORT css::uno::XInterface* +io_OMarkableOutputStream_get_implementation( + css::uno::XComponentContext* , css::uno::Sequence<css::uno::Any> const&) +{ + return cppu::acquire(new OMarkableOutputStream()); +} + + +// XMarkableInputStream + +namespace { + +class OMarkableInputStream : + public WeakImplHelper + < + XInputStream, + XActiveDataSink, + XMarkableStream, + XConnectable, + XServiceInfo + > +{ +public: + OMarkableInputStream( ); + + +public: // XInputStream + virtual sal_Int32 SAL_CALL readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead) override ; + virtual sal_Int32 SAL_CALL readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead) override; + virtual void SAL_CALL skipBytes(sal_Int32 nBytesToSkip) override; + + virtual sal_Int32 SAL_CALL available() override; + virtual void SAL_CALL closeInput() override; + +public: // XMarkable + virtual sal_Int32 SAL_CALL createMark() override; + virtual void SAL_CALL deleteMark(sal_Int32 Mark) override; + virtual void SAL_CALL jumpToMark(sal_Int32 nMark) override; + virtual void SAL_CALL jumpToFurthest() override; + virtual sal_Int32 SAL_CALL offsetToMark(sal_Int32 nMark) override; + +public: // XActiveDataSink + virtual void SAL_CALL setInputStream(const Reference < XInputStream > & aStream) override; + virtual Reference < XInputStream > SAL_CALL getInputStream() override; + +public: // XConnectable + virtual void SAL_CALL setPredecessor(const Reference < XConnectable > & aPredecessor) override; + virtual Reference < XConnectable > SAL_CALL getPredecessor() override; + virtual void SAL_CALL setSuccessor(const Reference < XConnectable > & aSuccessor) override; + virtual Reference < XConnectable > SAL_CALL getSuccessor() override; + +public: // XServiceInfo + OUString SAL_CALL getImplementationName() override; + Sequence< OUString > SAL_CALL getSupportedServiceNames() override; + sal_Bool SAL_CALL supportsService(const OUString& ServiceName) override; + +private: + void checkMarksAndFlush(); + + Reference < XConnectable > m_succ; + Reference < XConnectable > m_pred; + + Reference< XInputStream > m_input; + bool m_bValidStream; + + std::unique_ptr<MemRingBuffer> m_pBuffer; + map<sal_Int32,sal_Int32,less< sal_Int32 > > m_mapMarks; + sal_Int32 m_nCurrentPos; + sal_Int32 m_nCurrentMark; + + std::mutex m_mutex; +}; + +} + +OMarkableInputStream::OMarkableInputStream() + : m_bValidStream(false) + , m_nCurrentPos(0) + , m_nCurrentMark(0) +{ + m_pBuffer.reset( new MemRingBuffer ); +} + + +// XInputStream + +sal_Int32 OMarkableInputStream::readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead) +{ + sal_Int32 nBytesRead; + + if( !m_bValidStream ) { + throw NotConnectedException( + "MarkableInputStream::readBytes NotConnectedException", + *this ); + } + std::unique_lock guard( m_mutex ); + if( m_mapMarks.empty() && ! m_pBuffer->getSize() ) { + // normal read ! + nBytesRead = m_input->readBytes( aData, nBytesToRead ); + } + else { + // read from buffer + sal_Int32 nRead; + + // read enough bytes into buffer + if( m_pBuffer->getSize() - m_nCurrentPos < nBytesToRead ) { + sal_Int32 nToRead = nBytesToRead - ( m_pBuffer->getSize() - m_nCurrentPos ); + nRead = m_input->readBytes( aData , nToRead ); + + OSL_ASSERT( aData.getLength() == nRead ); + + m_pBuffer->writeAt( m_pBuffer->getSize() , aData ); + + if( nRead < nToRead ) { + nBytesToRead = nBytesToRead - (nToRead-nRead); + } + } + + OSL_ASSERT( m_pBuffer->getSize() - m_nCurrentPos >= nBytesToRead ); + + m_pBuffer->readAt( m_nCurrentPos , aData , nBytesToRead ); + + m_nCurrentPos += nBytesToRead; + nBytesRead = nBytesToRead; + } + + return nBytesRead; +} + + +sal_Int32 OMarkableInputStream::readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead) +{ + + sal_Int32 nBytesRead; + if( !m_bValidStream ) { + throw NotConnectedException( + "MarkableInputStream::readSomeBytes NotConnectedException", + *this ); + } + + std::unique_lock guard( m_mutex ); + if( m_mapMarks.empty() && ! m_pBuffer->getSize() ) { + // normal read ! + nBytesRead = m_input->readSomeBytes( aData, nMaxBytesToRead ); + } + else { + // read from buffer + sal_Int32 nRead = 0; + sal_Int32 nInBuffer = m_pBuffer->getSize() - m_nCurrentPos; + sal_Int32 nAdditionalBytesToRead = std::min<sal_Int32>(nMaxBytesToRead-nInBuffer,m_input->available()); + nAdditionalBytesToRead = std::max<sal_Int32>(0 , nAdditionalBytesToRead ); + + // read enough bytes into buffer + if( 0 == nInBuffer ) { + nRead = m_input->readSomeBytes( aData , nMaxBytesToRead ); + } + else if( nAdditionalBytesToRead ) { + nRead = m_input->readBytes( aData , nAdditionalBytesToRead ); + } + + if( nRead ) { + aData.realloc( nRead ); + m_pBuffer->writeAt( m_pBuffer->getSize() , aData ); + } + + nBytesRead = std::min( nMaxBytesToRead , nInBuffer + nRead ); + + // now take everything from buffer ! + m_pBuffer->readAt( m_nCurrentPos , aData , nBytesRead ); + + m_nCurrentPos += nBytesRead; + } + + return nBytesRead; + + +} + + +void OMarkableInputStream::skipBytes(sal_Int32 nBytesToSkip) +{ + if ( nBytesToSkip < 0 ) + throw BufferSizeExceededException( + "precondition not met: XInputStream::skipBytes: non-negative integer required!", + *this + ); + + // this method is blocking + Sequence<sal_Int8> seqDummy( nBytesToSkip ); + readBytes( seqDummy , nBytesToSkip ); +} + +sal_Int32 OMarkableInputStream::available() +{ + if( !m_bValidStream ) { + throw NotConnectedException( + "MarkableInputStream::available NotConnectedException", + *this ); + } + + std::unique_lock guard( m_mutex ); + sal_Int32 nAvail = m_input->available() + ( m_pBuffer->getSize() - m_nCurrentPos ); + return nAvail; +} + + +void OMarkableInputStream::closeInput() +{ + if( !m_bValidStream ) { + throw NotConnectedException( + "MarkableInputStream::closeInput NotConnectedException", + *this ); + } + std::unique_lock guard( m_mutex ); + + m_input->closeInput(); + + setInputStream( Reference< XInputStream > () ); + setPredecessor( Reference< XConnectable > () ); + setSuccessor( Reference< XConnectable >() ); + + m_pBuffer.reset(); + m_nCurrentPos = 0; + m_nCurrentMark = 0; +} + +// XMarkable + +sal_Int32 OMarkableInputStream::createMark() +{ + std::unique_lock guard( m_mutex ); + sal_Int32 nMark = m_nCurrentMark; + + m_mapMarks[nMark] = m_nCurrentPos; + + m_nCurrentMark ++; + return nMark; +} + +void OMarkableInputStream::deleteMark(sal_Int32 Mark) +{ + std::unique_lock guard( m_mutex ); + map<sal_Int32,sal_Int32,less<sal_Int32> >::iterator ii = m_mapMarks.find( Mark ); + + if( ii == m_mapMarks.end() ) { + throw IllegalArgumentException( + "MarkableInputStream::deleteMark unknown mark (" + OUString::number(Mark) + ")", + *this , 0 ); + } + m_mapMarks.erase( ii ); + checkMarksAndFlush(); +} + +void OMarkableInputStream::jumpToMark(sal_Int32 nMark) +{ + std::unique_lock guard( m_mutex ); + map<sal_Int32,sal_Int32,less<sal_Int32> >::iterator ii = m_mapMarks.find( nMark ); + + if( ii == m_mapMarks.end() ) + { + throw IllegalArgumentException( + "MarkableInputStream::jumpToMark unknown mark (" + OUString::number(nMark) + ")", + *this , 0 ); + } + m_nCurrentPos = (*ii).second; +} + +void OMarkableInputStream::jumpToFurthest() +{ + std::unique_lock guard( m_mutex ); + m_nCurrentPos = m_pBuffer->getSize(); + checkMarksAndFlush(); +} + +sal_Int32 OMarkableInputStream::offsetToMark(sal_Int32 nMark) +{ + std::unique_lock guard( m_mutex ); + map<sal_Int32,sal_Int32,less<sal_Int32> >::const_iterator ii = m_mapMarks.find( nMark ); + + if( ii == m_mapMarks.end() ) + { + throw IllegalArgumentException( + "MarkableInputStream::offsetToMark unknown mark (" + OUString::number(nMark) + ")", + *this, 0 ); + } + return m_nCurrentPos - (*ii).second; +} + + +// XActiveDataSource +void OMarkableInputStream::setInputStream(const Reference< XInputStream > & aStream) +{ + + if( m_input != aStream ) { + m_input = aStream; + + Reference < XConnectable > pred( m_input , UNO_QUERY ); + setPredecessor( pred ); + } + + m_bValidStream = m_input.is(); + +} + +Reference< XInputStream > OMarkableInputStream::getInputStream() +{ + return m_input; +} + + +// XDataSink +void OMarkableInputStream::setSuccessor( const Reference< XConnectable > &r ) +{ + /// if the references match, nothing needs to be done + if( m_succ != r ) { + /// store the reference for later use + m_succ = r; + + if( m_succ.is() ) { + /// set this instance as the sink ! + m_succ->setPredecessor( Reference< XConnectable > ( + static_cast< XConnectable * >(this) ) ); + } + } +} + +Reference < XConnectable > OMarkableInputStream::getSuccessor() +{ + return m_succ; +} + + +// XDataSource +void OMarkableInputStream::setPredecessor( const Reference < XConnectable > &r ) +{ + if( r != m_pred ) { + m_pred = r; + if( m_pred.is() ) { + m_pred->setSuccessor( Reference< XConnectable > ( + static_cast< XConnectable * >(this) ) ); + } + } +} +Reference< XConnectable > OMarkableInputStream::getPredecessor() +{ + return m_pred; +} + + +void OMarkableInputStream::checkMarksAndFlush() +{ + // find the smallest mark + sal_Int32 nNextFound = m_nCurrentPos; + for (auto const& mark : m_mapMarks) + { + if( mark.second <= nNextFound ) { + nNextFound = mark.second; + } + } + + if( nNextFound ) { + // some data must be released ! + m_nCurrentPos -= nNextFound; + for (auto & mark : m_mapMarks) + { + mark.second -= nNextFound; + } + + m_pBuffer->forgetFromStart( nNextFound ); + + } + else { + // nothing to do. There is a mark or the current cursor position, that prevents + // releasing data ! + } +} + +// XServiceInfo +OUString OMarkableInputStream::getImplementationName() +{ + return "com.sun.star.comp.io.stm.MarkableInputStream"; +} + +// XServiceInfo +sal_Bool OMarkableInputStream::supportsService(const OUString& ServiceName) +{ + return cppu::supportsService(this, ServiceName); +} + +// XServiceInfo +Sequence< OUString > OMarkableInputStream::getSupportedServiceNames() +{ + return { "com.sun.star.io.MarkableInputStream" }; +} + +extern "C" SAL_DLLPUBLIC_EXPORT css::uno::XInterface* +io_OMarkableInputStream_get_implementation( + css::uno::XComponentContext* , css::uno::Sequence<css::uno::Any> const&) +{ + return cppu::acquire(new OMarkableInputStream()); +} + +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/io/source/stm/opipe.cxx b/io/source/stm/opipe.cxx new file mode 100644 index 000000000..07e203bf0 --- /dev/null +++ b/io/source/stm/opipe.cxx @@ -0,0 +1,357 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <sal/config.h> + +#include <com/sun/star/io/BufferSizeExceededException.hpp> +#include <com/sun/star/io/NotConnectedException.hpp> +#include <com/sun/star/io/XPipe.hpp> +#include <com/sun/star/io/XConnectable.hpp> + +#include <com/sun/star/lang/XServiceInfo.hpp> + +#include <cppuhelper/implbase.hxx> +#include <cppuhelper/supportsservice.hxx> + +#include <osl/conditn.hxx> +#include <osl/mutex.hxx> + +#include <limits> +#include <memory> +#include <string.h> + +using namespace ::osl; +using namespace ::cppu; +using namespace ::com::sun::star::uno; +using namespace ::com::sun::star::io; +using namespace ::com::sun::star::lang; + +#include "streamhelper.hxx" + +namespace com::sun::star::uno { class XComponentContext; } + +namespace io_stm{ + +namespace { + +class OPipeImpl : + public WeakImplHelper< XPipe , XConnectable , XServiceInfo > +{ +public: + OPipeImpl( ); + +public: // XInputStream + virtual sal_Int32 SAL_CALL readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead) override; + virtual sal_Int32 SAL_CALL readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead) override; + virtual void SAL_CALL skipBytes(sal_Int32 nBytesToSkip) override; + virtual sal_Int32 SAL_CALL available() override; + virtual void SAL_CALL closeInput() override; + +public: // XOutputStream + + virtual void SAL_CALL writeBytes(const Sequence< sal_Int8 >& aData) override; + virtual void SAL_CALL flush() override; + virtual void SAL_CALL closeOutput() override; + +public: // XConnectable + virtual void SAL_CALL setPredecessor(const Reference< XConnectable >& aPredecessor) override; + virtual Reference< XConnectable > SAL_CALL getPredecessor() override; + virtual void SAL_CALL setSuccessor(const Reference < XConnectable > & aSuccessor) override; + virtual Reference < XConnectable > SAL_CALL getSuccessor() override ; + + +public: // XServiceInfo + OUString SAL_CALL getImplementationName() override; + Sequence< OUString > SAL_CALL getSupportedServiceNames() override; + sal_Bool SAL_CALL supportsService(const OUString& ServiceName) override; + +private: + + Reference < XConnectable > m_succ; + Reference < XConnectable > m_pred; + + sal_Int32 m_nBytesToSkip; + + bool m_bOutputStreamClosed; + bool m_bInputStreamClosed; + + osl::Condition m_conditionBytesAvail; + Mutex m_mutexAccess; + std::unique_ptr<MemFIFO> m_pFIFO; +}; + +} + +OPipeImpl::OPipeImpl() + : m_nBytesToSkip(0 ) + , m_bOutputStreamClosed(false ) + , m_bInputStreamClosed( false ) + , m_pFIFO( new MemFIFO ) +{ +} + + + +sal_Int32 OPipeImpl::readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead) +{ + while( true ) + { + { // start guarded section + MutexGuard guard( m_mutexAccess ); + if( m_bInputStreamClosed ) + { + throw NotConnectedException( + "Pipe::readBytes NotConnectedException", + *this ); + } + sal_Int32 nOccupiedBufferLen = m_pFIFO->getSize(); + + if( m_bOutputStreamClosed && nBytesToRead > nOccupiedBufferLen ) + { + nBytesToRead = nOccupiedBufferLen; + } + + if( nOccupiedBufferLen < nBytesToRead ) + { + // wait outside guarded section + m_conditionBytesAvail.reset(); + } + else { + // necessary bytes are available + m_pFIFO->read( aData , nBytesToRead ); + return nBytesToRead; + } + } // end guarded section + + // wait for new data outside guarded section! + m_conditionBytesAvail.wait(); + } +} + + +sal_Int32 OPipeImpl::readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead) +{ + while( true ) { + { + MutexGuard guard( m_mutexAccess ); + if( m_bInputStreamClosed ) + { + throw NotConnectedException( + "Pipe::readSomeBytes NotConnectedException", + *this ); + } + if( m_pFIFO->getSize() ) + { + sal_Int32 nSize = std::min( nMaxBytesToRead , m_pFIFO->getSize() ); + aData.realloc( nSize ); + m_pFIFO->read( aData , nSize ); + return nSize; + } + + if( m_bOutputStreamClosed ) + { + // no bytes in buffer anymore + return 0; + } + } + + m_conditionBytesAvail.wait(); + } +} + + +void OPipeImpl::skipBytes(sal_Int32 nBytesToSkip) +{ + MutexGuard guard( m_mutexAccess ); + if( m_bInputStreamClosed ) + { + throw NotConnectedException( + "Pipe::skipBytes NotConnectedException", + *this ); + } + + if( nBytesToSkip < 0 + || (nBytesToSkip + > std::numeric_limits< sal_Int32 >::max() - m_nBytesToSkip) ) + { + throw BufferSizeExceededException( + "Pipe::skipBytes BufferSizeExceededException", + *this ); + } + m_nBytesToSkip += nBytesToSkip; + + nBytesToSkip = std::min( m_pFIFO->getSize() , m_nBytesToSkip ); + m_pFIFO->skip( nBytesToSkip ); + m_nBytesToSkip -= nBytesToSkip; +} + + +sal_Int32 OPipeImpl::available() + { + MutexGuard guard( m_mutexAccess ); + if( m_bInputStreamClosed ) + { + throw NotConnectedException( + "Pipe::available NotConnectedException", + *this ); + } + return m_pFIFO->getSize(); +} + +void OPipeImpl::closeInput() +{ + MutexGuard guard( m_mutexAccess ); + + m_bInputStreamClosed = true; + + m_pFIFO.reset(); + + // readBytes may throw an exception + m_conditionBytesAvail.set(); + + setSuccessor( Reference< XConnectable > () ); +} + + +void OPipeImpl::writeBytes(const Sequence< sal_Int8 >& aData) +{ + MutexGuard guard( m_mutexAccess ); + + if( m_bOutputStreamClosed ) + { + throw NotConnectedException( + "Pipe::writeBytes NotConnectedException (outputstream)", + *this ); + } + + if( m_bInputStreamClosed ) + { + throw NotConnectedException( + "Pipe::writeBytes NotConnectedException (inputstream)", + *this ); + } + + // check skipping + sal_Int32 nLen = aData.getLength(); + if( m_nBytesToSkip && m_nBytesToSkip >= nLen ) { + // all must be skipped - forget whole call + m_nBytesToSkip -= nLen; + return; + } + + // adjust buffersize if necessary + if( m_nBytesToSkip ) + { + Sequence< sal_Int8 > seqCopy( nLen - m_nBytesToSkip ); + memcpy( seqCopy.getArray() , &( aData.getConstArray()[m_nBytesToSkip] ) , nLen-m_nBytesToSkip ); + m_pFIFO->write( seqCopy ); + } + else + { + m_pFIFO->write( aData ); + } + m_nBytesToSkip = 0; + + // readBytes may check again if enough bytes are available + m_conditionBytesAvail.set(); +} + + +void OPipeImpl::flush() +{ + // nothing to do for a pipe +} + +void OPipeImpl::closeOutput() +{ + MutexGuard guard( m_mutexAccess ); + + m_bOutputStreamClosed = true; + m_conditionBytesAvail.set(); + setPredecessor( Reference < XConnectable > () ); +} + + +void OPipeImpl::setSuccessor( const Reference < XConnectable > &r ) +{ + /// if the references match, nothing needs to be done + if( m_succ != r ) { + /// store the reference for later use + m_succ = r; + + if( m_succ.is() ) + { + m_succ->setPredecessor( + Reference< XConnectable > ( static_cast< XConnectable * >(this) ) ); + } + } +} + +Reference < XConnectable > OPipeImpl::getSuccessor() +{ + return m_succ; +} + + +// XDataSource +void OPipeImpl::setPredecessor( const Reference < XConnectable > &r ) +{ + if( r != m_pred ) { + m_pred = r; + if( m_pred.is() ) { + m_pred->setSuccessor( + Reference < XConnectable > ( static_cast< XConnectable * >(this) ) ); + } + } +} + +Reference < XConnectable > OPipeImpl::getPredecessor() +{ + return m_pred; +} + + +// XServiceInfo +OUString OPipeImpl::getImplementationName() +{ + return "com.sun.star.comp.io.stm.Pipe"; +} + +// XServiceInfo +sal_Bool OPipeImpl::supportsService(const OUString& ServiceName) +{ + return cppu::supportsService(this, ServiceName); +} + +// XServiceInfo +Sequence< OUString > OPipeImpl::getSupportedServiceNames() +{ + return { "com.sun.star.io.Pipe" }; +} + +} + +extern "C" SAL_DLLPUBLIC_EXPORT css::uno::XInterface* +io_OPipeImpl_get_implementation( + css::uno::XComponentContext* , css::uno::Sequence<css::uno::Any> const&) +{ + return cppu::acquire(new io_stm::OPipeImpl()); +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/io/source/stm/opump.cxx b/io/source/stm/opump.cxx new file mode 100644 index 000000000..fc751b677 --- /dev/null +++ b/io/source/stm/opump.cxx @@ -0,0 +1,455 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + + +#include <sal/log.hxx> + +#include <com/sun/star/io/IOException.hpp> +#include <com/sun/star/io/NotConnectedException.hpp> +#include <com/sun/star/io/XActiveDataSource.hpp> +#include <com/sun/star/io/XActiveDataSink.hpp> +#include <com/sun/star/io/XActiveDataControl.hpp> +#include <com/sun/star/io/XConnectable.hpp> +#include <com/sun/star/lang/XServiceInfo.hpp> +#include <com/sun/star/uno/XComponentContext.hpp> + +#include <cppuhelper/implbase.hxx> +#include <comphelper/interfacecontainer4.hxx> +#include <cppuhelper/supportsservice.hxx> +#include <osl/thread.h> +#include <mutex> + +using namespace osl; +using namespace cppu; +using namespace com::sun::star::uno; +using namespace com::sun::star::lang; +using namespace com::sun::star::io; + +namespace io_stm { + + namespace { + + class Pump : public WeakImplHelper< + XActiveDataSource, XActiveDataSink, XActiveDataControl, XConnectable, XServiceInfo > + { + std::mutex m_aMutex; + oslThread m_aThread; + + Reference< XConnectable > m_xPred; + Reference< XConnectable > m_xSucc; + Reference< XInputStream > m_xInput; + Reference< XOutputStream > m_xOutput; + comphelper::OInterfaceContainerHelper4<XStreamListener> m_cnt; + bool m_closeFired; + + void run(); + static void static_run( void* pObject ); + + void close(); + void fireClose(); + void fireStarted(); + void fireTerminated(); + void fireError( const Any &a ); + + public: + Pump(); + virtual ~Pump() override; + + // XActiveDataSource + virtual void SAL_CALL setOutputStream( const Reference< css::io::XOutputStream >& xOutput ) override; + virtual Reference< css::io::XOutputStream > SAL_CALL getOutputStream() override; + + // XActiveDataSink + virtual void SAL_CALL setInputStream( const Reference< css::io::XInputStream >& xStream ) override; + virtual Reference< css::io::XInputStream > SAL_CALL getInputStream() override; + + // XActiveDataControl + virtual void SAL_CALL addListener( const Reference< css::io::XStreamListener >& xListener ) override; + virtual void SAL_CALL removeListener( const Reference< css::io::XStreamListener >& xListener ) override; + virtual void SAL_CALL start() override; + virtual void SAL_CALL terminate() override; + + // XConnectable + virtual void SAL_CALL setPredecessor( const Reference< css::io::XConnectable >& xPred ) override; + virtual Reference< css::io::XConnectable > SAL_CALL getPredecessor() override; + virtual void SAL_CALL setSuccessor( const Reference< css::io::XConnectable >& xSucc ) override; + virtual Reference< css::io::XConnectable > SAL_CALL getSuccessor() override; + + public: // XServiceInfo + virtual OUString SAL_CALL getImplementationName() override; + virtual Sequence< OUString > SAL_CALL getSupportedServiceNames() override; + virtual sal_Bool SAL_CALL supportsService(const OUString& ServiceName) override; + }; + + } + +Pump::Pump() : m_aThread( nullptr ), + m_closeFired( false ) +{ +} + +Pump::~Pump() +{ + // exit gracefully + if( m_aThread ) + { + osl_joinWithThread( m_aThread ); + osl_destroyThread( m_aThread ); + } +} + +void Pump::fireError( const Any & exception ) +{ + std::unique_lock guard( m_aMutex ); + comphelper::OInterfaceIteratorHelper4<XStreamListener> iter( guard, m_cnt ); + guard.unlock(); + while( iter.hasMoreElements() ) + { + try + { + iter.next()->error( exception ); + } + catch ( const RuntimeException &e ) + { + SAL_WARN("io.streams","com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners" << e); + } + } +} + +void Pump::fireClose() +{ + bool bFire = false; + { + std::unique_lock guard( m_aMutex ); + if( ! m_closeFired ) + { + m_closeFired = true; + bFire = true; + } + } + + if( !bFire ) + return; + + std::unique_lock guard( m_aMutex ); + comphelper::OInterfaceIteratorHelper4<XStreamListener> iter( guard, m_cnt ); + guard.unlock(); + while( iter.hasMoreElements() ) + { + try + { + iter.next()->closed( ); + } + catch ( const RuntimeException &e ) + { + SAL_WARN("io.streams","com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners" << e); + } + } +} + +void Pump::fireStarted() +{ + std::unique_lock guard( m_aMutex ); + comphelper::OInterfaceIteratorHelper4<XStreamListener> iter( guard, m_cnt ); + guard.unlock(); + while( iter.hasMoreElements() ) + { + try + { + iter.next()->started( ); + } + catch ( const RuntimeException &e ) + { + SAL_WARN("io.streams","com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners" << e); + } + } +} + +void Pump::fireTerminated() +{ + std::unique_lock guard( m_aMutex ); + comphelper::OInterfaceIteratorHelper4<XStreamListener> iter( guard, m_cnt ); + guard.unlock(); + while( iter.hasMoreElements() ) + { + try + { + iter.next()->terminated(); + } + catch ( const RuntimeException &e ) + { + SAL_WARN("io.streams","com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners" << e); + } + } +} + + +void Pump::close() +{ + // close streams and release references + Reference< XInputStream > rInput; + Reference< XOutputStream > rOutput; + { + std::unique_lock guard( m_aMutex ); + rInput = m_xInput; + m_xInput.clear(); + + rOutput = m_xOutput; + m_xOutput.clear(); + m_xSucc.clear(); + m_xPred.clear(); + } + if( rInput.is() ) + { + try + { + rInput->closeInput(); + } + catch( Exception & ) + { + // go down calm + } + } + if( rOutput.is() ) + { + try + { + rOutput->closeOutput(); + } + catch( Exception & ) + { + // go down calm + } + } +} + +void Pump::static_run( void* pObject ) +{ + osl_setThreadName("io_stm::Pump::run()"); + static_cast<Pump*>(pObject)->run(); + static_cast<Pump*>(pObject)->release(); +} + +void Pump::run() +{ + try + { + fireStarted(); + try + { + Reference< XInputStream > rInput; + Reference< XOutputStream > rOutput; + { + std::unique_lock aGuard( m_aMutex ); + rInput = m_xInput; + rOutput = m_xOutput; + } + + if( ! rInput.is() ) + { + throw NotConnectedException( "no input stream set", static_cast<OWeakObject*>(this) ); + } + Sequence< sal_Int8 > aData; + while( rInput->readSomeBytes( aData, 65536 ) ) + { + if( ! rOutput.is() ) + { + throw NotConnectedException( "no output stream set", static_cast<OWeakObject*>(this) ); + } + rOutput->writeBytes( aData ); + osl_yieldThread(); + } + } + catch ( const IOException & e ) + { + fireError( Any( e ) ); + } + catch ( const RuntimeException & e ) + { + fireError( Any( e ) ); + } + catch ( const Exception & e ) + { + fireError( Any( e ) ); + } + + close(); + fireClose(); + } + catch ( const css::uno::Exception &e ) + { + // we are the last on the stack. + // this is to avoid crashing the program, when e.g. a bridge crashes + SAL_WARN("io.streams","com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners" << e); + } +} + + +/* + * XConnectable + */ + +void Pump::setPredecessor( const Reference< XConnectable >& xPred ) +{ + std::unique_lock aGuard( m_aMutex ); + m_xPred = xPred; +} + + +Reference< XConnectable > Pump::getPredecessor() +{ + std::unique_lock aGuard( m_aMutex ); + return m_xPred; +} + + +void Pump::setSuccessor( const Reference< XConnectable >& xSucc ) +{ + std::unique_lock aGuard( m_aMutex ); + m_xSucc = xSucc; +} + + +Reference< XConnectable > Pump::getSuccessor() +{ + std::unique_lock aGuard( m_aMutex ); + return m_xSucc; +} + + +/* + * XActiveDataControl + */ + +void Pump::addListener( const Reference< XStreamListener >& xListener ) +{ + std::unique_lock aGuard( m_aMutex ); + m_cnt.addInterface( aGuard, xListener ); +} + + +void Pump::removeListener( const Reference< XStreamListener >& xListener ) +{ + std::unique_lock aGuard( m_aMutex ); + m_cnt.removeInterface( aGuard, xListener ); +} + + +void Pump::start() +{ + std::unique_lock aGuard( m_aMutex ); + m_aThread = osl_createSuspendedThread(Pump::static_run,this); + if( !m_aThread ) + { + throw RuntimeException( + "Pump::start Couldn't create worker thread", + *this); + } + + // will be released by OPump::static_run + acquire(); + osl_resumeThread( m_aThread ); + +} + + +void Pump::terminate() +{ + close(); + + // wait for the worker to die + if( m_aThread ) + osl_joinWithThread( m_aThread ); + + fireTerminated(); + fireClose(); +} + + +/* + * XActiveDataSink + */ + +void Pump::setInputStream( const Reference< XInputStream >& xStream ) +{ + std::unique_lock aGuard( m_aMutex ); + m_xInput = xStream; + Reference< XConnectable > xConnect( xStream, UNO_QUERY ); + if( xConnect.is() ) + xConnect->setSuccessor( this ); + // data transfer starts in XActiveDataControl::start +} + + +Reference< XInputStream > Pump::getInputStream() +{ + std::unique_lock aGuard( m_aMutex ); + return m_xInput; +} + + +/* + * XActiveDataSource + */ + +void Pump::setOutputStream( const Reference< XOutputStream >& xOut ) +{ + std::unique_lock aGuard( m_aMutex ); + m_xOutput = xOut; + Reference< XConnectable > xConnect( xOut, UNO_QUERY ); + if( xConnect.is() ) + xConnect->setPredecessor( this ); + // data transfer starts in XActiveDataControl::start +} + +Reference< XOutputStream > Pump::getOutputStream() +{ + std::unique_lock aGuard( m_aMutex ); + return m_xOutput; +} + +// XServiceInfo +OUString Pump::getImplementationName() +{ + return "com.sun.star.comp.io.Pump"; +} + +// XServiceInfo +sal_Bool Pump::supportsService(const OUString& ServiceName) +{ + return cppu::supportsService(this, ServiceName); +} + +// XServiceInfo +Sequence< OUString > Pump::getSupportedServiceNames() +{ + return { "com.sun.star.io.Pump" }; +} + +} + +extern "C" SAL_DLLPUBLIC_EXPORT css::uno::XInterface* +io_Pump_get_implementation( + css::uno::XComponentContext* , css::uno::Sequence<css::uno::Any> const&) +{ + return cppu::acquire(new io_stm::Pump()); +} + + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/io/source/stm/streamhelper.cxx b/io/source/stm/streamhelper.cxx new file mode 100644 index 000000000..0933ac966 --- /dev/null +++ b/io/source/stm/streamhelper.cxx @@ -0,0 +1,173 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <limits> +#include <string.h> + +#include <com/sun/star/uno/Sequence.hxx> + +#include <com/sun/star/io/BufferSizeExceededException.hpp> + +using namespace ::com::sun::star::uno; + +#include "streamhelper.hxx" + +namespace io_stm { + +void MemFIFO::write( const Sequence< sal_Int8 > &seq ) +{ + writeAt(getSize(), seq); +} + +void MemFIFO::read( Sequence<sal_Int8> &seq , sal_Int32 nBufferLen ) +{ + readAt(0, seq , nBufferLen); + forgetFromStart( nBufferLen ); +} + +void MemFIFO::skip( sal_Int32 nBytesToSkip ) +{ + forgetFromStart( nBytesToSkip ); +} + +MemRingBuffer::MemRingBuffer() : m_p(nullptr), m_nBufferLen(0), m_nStart(0), m_nOccupiedBuffer(0) +{ +} + +MemRingBuffer::~MemRingBuffer() +{ + std::free( m_p ); +} + +void MemRingBuffer::resizeBuffer( sal_Int32 nMinSize ) +{ + sal_Int32 nNewLen = 1; + + while( nMinSize > nNewLen ) { + nNewLen = nNewLen << 1; + } + + // buffer never shrinks ! + if( nNewLen < m_nBufferLen ) { + nNewLen = m_nBufferLen; + } + + if( nNewLen == m_nBufferLen ) + return; + + auto p = static_cast<sal_Int8*>(std::realloc(m_p, nNewLen)); + if (!p) + throw css::io::BufferSizeExceededException( + "MemRingBuffer::resizeBuffer BufferSizeExceededException"); + + m_p = p; + + + if( m_nStart + m_nOccupiedBuffer > m_nBufferLen ) { + memmove( &( m_p[m_nStart+(nNewLen-m_nBufferLen)]) , &(m_p[m_nStart]) , m_nBufferLen - m_nStart ); + m_nStart += nNewLen - m_nBufferLen; + } + m_nBufferLen = nNewLen; +} + + +void MemRingBuffer::readAt( sal_Int32 nPos, Sequence<sal_Int8> &seq , sal_Int32 nBytesToRead ) const +{ + if( nPos + nBytesToRead > m_nOccupiedBuffer ) { + throw css::io::BufferSizeExceededException( + "MemRingBuffer::readAt BufferSizeExceededException"); + } + + sal_Int32 nStartReadingPos = nPos + m_nStart; + if( nStartReadingPos >= m_nBufferLen ) { + nStartReadingPos -= m_nBufferLen; + } + + seq.realloc( nBytesToRead ); + + if( nStartReadingPos + nBytesToRead > m_nBufferLen ) { + sal_Int32 nDeltaLen = m_nBufferLen - nStartReadingPos; + memcpy( seq.getArray() , &(m_p[nStartReadingPos]) , nDeltaLen ); + memcpy( &(seq.getArray()[nDeltaLen]), m_p , nBytesToRead - nDeltaLen ); + } + else { + memcpy( seq.getArray() , &(m_p[nStartReadingPos]) , nBytesToRead ); + } +} + + +void MemRingBuffer::writeAt( sal_Int32 nPos, const Sequence<sal_Int8> &seq ) +{ + checkInvariants(); + const sal_Int32 nLen = seq.getLength(); + + if( nPos < 0 || nPos > std::numeric_limits< sal_Int32 >::max() - nLen ) + { + throw css::io::BufferSizeExceededException( + "MemRingBuffer::writeAt BufferSizeExceededException"); + } + + if( nPos + nLen - m_nOccupiedBuffer > 0 ) { + resizeBuffer( nPos + nLen ); + m_nOccupiedBuffer = nPos + nLen; + } + + sal_Int32 nStartWritingIndex = m_nStart + nPos; + if( nStartWritingIndex >= m_nBufferLen ) { + nStartWritingIndex -= m_nBufferLen; + } + + if( const sal_Int32 nBufferRestLen = m_nBufferLen-nStartWritingIndex; nLen > nBufferRestLen ) { + // two area copy + memcpy( &(m_p[nStartWritingIndex]) , seq.getConstArray(), nBufferRestLen ); + memcpy( m_p , &( seq.getConstArray()[nBufferRestLen] ), nLen - nBufferRestLen ); + + } + else { + // one area copy + memcpy( &( m_p[nStartWritingIndex]), seq.getConstArray() , nLen ); + } + checkInvariants(); +} + + +sal_Int32 MemRingBuffer::getSize() const noexcept +{ + return m_nOccupiedBuffer; +} + +void MemRingBuffer::forgetFromStart( sal_Int32 nBytesToForget ) +{ + checkInvariants(); + if( nBytesToForget > m_nOccupiedBuffer ) { + throw css::io::BufferSizeExceededException( + "MemRingBuffer::forgetFromStart BufferSizeExceededException"); + } + m_nStart += nBytesToForget; + if( m_nStart >= m_nBufferLen ) { + m_nStart = m_nStart - m_nBufferLen; + } + m_nOccupiedBuffer -= nBytesToForget; + checkInvariants(); +} + + +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/io/source/stm/streamhelper.hxx b/io/source/stm/streamhelper.hxx new file mode 100644 index 000000000..8cacaf1f2 --- /dev/null +++ b/io/source/stm/streamhelper.hxx @@ -0,0 +1,85 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#pragma once + +#include <com/sun/star/uno/Sequence.hxx> + +#include <assert.h> + +using namespace com::sun::star::uno; + +namespace io_stm +{ + +class MemRingBuffer +{ +public: + MemRingBuffer(); + virtual ~MemRingBuffer(); + + /*** + * overwrites data at given position. Size is automatically extended, when + * data is written beyond end. + ***/ + /// @throws css::io::BufferSizeExceededException + void writeAt( sal_Int32 nPos, const Sequence<sal_Int8> &); + /// @throws css::io::BufferSizeExceededException + void readAt( sal_Int32 nPos, Sequence<sal_Int8> & , sal_Int32 nBytesToRead ) const; + sal_Int32 getSize() const noexcept; + /// @throws css::io::BufferSizeExceededException + void forgetFromStart(sal_Int32 nBytesToForget); + +private: + /// @throws css::io::BufferSizeExceededException + void resizeBuffer(sal_Int32 nMinSize); + void checkInvariants() const { + assert( m_nBufferLen >= 0 ); + assert( m_nOccupiedBuffer >= 0 ); + assert( m_nOccupiedBuffer <= m_nBufferLen ); + assert( m_nStart >= 0 ); + assert( 0 == m_nStart || m_nStart < m_nBufferLen ); + (void) this; // avoid loplugin:staticmethods + } + + sal_Int8 *m_p; + sal_Int32 m_nBufferLen; + sal_Int32 m_nStart; + sal_Int32 m_nOccupiedBuffer; +}; + + +class MemFIFO : + private MemRingBuffer +{ +public: + /// @throws css::io::BufferSizeExceededException + void write( const Sequence<sal_Int8> &); + /// @throws css::io::BufferSizeExceededException + void read( Sequence<sal_Int8> & , sal_Int32 nBytesToRead ); + /// @throws css::io::BufferSizeExceededException + void skip( sal_Int32 nBytesToSkip ); + sal_Int32 getSize() const noexcept + { return MemRingBuffer::getSize(); } + +}; + +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ |