diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 16:51:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 16:51:28 +0000 |
commit | 940b4d1848e8c70ab7642901a68594e8016caffc (patch) | |
tree | eb72f344ee6c3d9b80a7ecc079ea79e9fba8676d /cppu/source/threadpool | |
parent | Initial commit. (diff) | |
download | libreoffice-940b4d1848e8c70ab7642901a68594e8016caffc.tar.xz libreoffice-940b4d1848e8c70ab7642901a68594e8016caffc.zip |
Adding upstream version 1:7.0.4.upstream/1%7.0.4upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'cppu/source/threadpool')
-rw-r--r-- | cppu/source/threadpool/current.cxx | 265 | ||||
-rw-r--r-- | cppu/source/threadpool/current.hxx | 47 | ||||
-rw-r--r-- | cppu/source/threadpool/jobqueue.cxx | 191 | ||||
-rw-r--r-- | cppu/source/threadpool/jobqueue.hxx | 73 | ||||
-rw-r--r-- | cppu/source/threadpool/thread.cxx | 199 | ||||
-rw-r--r-- | cppu/source/threadpool/thread.hxx | 70 | ||||
-rw-r--r-- | cppu/source/threadpool/threadident.cxx | 125 | ||||
-rw-r--r-- | cppu/source/threadpool/threadpool.cxx | 487 | ||||
-rw-r--r-- | cppu/source/threadpool/threadpool.hxx | 163 |
9 files changed, 1620 insertions, 0 deletions
diff --git a/cppu/source/threadpool/current.cxx b/cppu/source/threadpool/current.cxx new file mode 100644 index 000000000..7dbfc9050 --- /dev/null +++ b/cppu/source/threadpool/current.cxx @@ -0,0 +1,265 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <sal/config.h> + +#include <rtl/byteseq.h> +#include <osl/thread.h> +#include <osl/mutex.hxx> + +#include <uno/current_context.h> +#include <uno/environment.hxx> +#include <uno/mapping.hxx> +#include <typelib/typedescription.h> + +#include "current.hxx" + + +using namespace ::osl; +using namespace ::rtl; +using namespace ::cppu; +using namespace ::com::sun::star::uno; + +namespace cppu +{ + +static typelib_InterfaceTypeDescription * get_type_XCurrentContext() +{ + static typelib_InterfaceTypeDescription* s_type_XCurrentContext = []() { + OUString sTypeName("com.sun.star.uno.XCurrentContext"); + typelib_InterfaceTypeDescription* pTD = nullptr; + typelib_TypeDescriptionReference* pMembers[1] = { nullptr }; + OUString sMethodName0("com.sun.star.uno.XCurrentContext::getValueByName"); + typelib_typedescriptionreference_new(&pMembers[0], typelib_TypeClass_INTERFACE_METHOD, + sMethodName0.pData); + typelib_typedescription_newInterface( + &pTD, sTypeName.pData, 0, 0, 0, 0, 0, + *typelib_static_type_getByTypeClass(typelib_TypeClass_INTERFACE), 1, pMembers); + + typelib_typedescription_register(reinterpret_cast<typelib_TypeDescription**>(&pTD)); + typelib_typedescriptionreference_release(pMembers[0]); + + typelib_InterfaceMethodTypeDescription* pMethod = nullptr; + typelib_Parameter_Init aParameters[1]; + OUString sParamName0("Name"); + OUString sParamType0("string"); + aParameters[0].pParamName = sParamName0.pData; + aParameters[0].eTypeClass = typelib_TypeClass_STRING; + aParameters[0].pTypeName = sParamType0.pData; + aParameters[0].bIn = true; + aParameters[0].bOut = false; + rtl_uString* pExceptions[1]; + OUString sExceptionName0("com.sun.star.uno.RuntimeException"); + pExceptions[0] = sExceptionName0.pData; + OUString sReturnType0("any"); + typelib_typedescription_newInterfaceMethod(&pMethod, 3, false, sMethodName0.pData, + typelib_TypeClass_ANY, sReturnType0.pData, 1, + aParameters, 1, pExceptions); + typelib_typedescription_register(reinterpret_cast<typelib_TypeDescription**>(&pMethod)); + typelib_typedescription_release(&pMethod->aBase.aBase); + // another static ref: + ++reinterpret_cast<typelib_TypeDescription*>(pTD)->nStaticRefCount; + return pTD; + }(); + + return s_type_XCurrentContext; +} + +namespace { + +class ThreadKey +{ + bool _bInit; + oslThreadKey _hThreadKey; + oslThreadKeyCallbackFunction _pCallback; + +public: + oslThreadKey getThreadKey() + { + if (! _bInit) + { + MutexGuard aGuard( Mutex::getGlobalMutex() ); + if (! _bInit) + { + _hThreadKey = ::osl_createThreadKey( _pCallback ); + _bInit = true; + } + } + return _hThreadKey; + } + + explicit ThreadKey( oslThreadKeyCallbackFunction pCallback ) + : _bInit(false) + , _hThreadKey(nullptr) + , _pCallback(pCallback) + { + } + + ~ThreadKey() + { + if (_bInit) + { + ::osl_destroyThreadKey( _hThreadKey ); + } + } +}; + +} + +extern "C" { + +static void delete_IdContainer( void * p ) +{ + if (!p) + return; + + IdContainer * pId = static_cast< IdContainer * >( p ); + if (pId->pCurrentContext) + { + (*pId->pCurrentContextEnv->releaseInterface)( + pId->pCurrentContextEnv, pId->pCurrentContext ); + (*pId->pCurrentContextEnv->aBase.release)( + &pId->pCurrentContextEnv->aBase ); + } + if (pId->bInit) + { + ::rtl_byte_sequence_release( pId->pLocalThreadId ); + ::rtl_byte_sequence_release( pId->pCurrentId ); + } + delete pId; +} + +} + +IdContainer * getIdContainer() +{ + static ThreadKey s_key( delete_IdContainer ); + oslThreadKey aKey = s_key.getThreadKey(); + + IdContainer * pId = static_cast< IdContainer * >( ::osl_getThreadKeyData( aKey ) ); + if (! pId) + { + pId = new IdContainer; + pId->pCurrentContext = nullptr; + pId->pCurrentContextEnv = nullptr; + pId->bInit = false; + ::osl_setThreadKeyData( aKey, pId ); + } + return pId; +} + +} + + +extern "C" sal_Bool SAL_CALL uno_setCurrentContext( + void * pCurrentContext, + rtl_uString * pEnvTypeName, void * pEnvContext ) + SAL_THROW_EXTERN_C() +{ + IdContainer * pId = getIdContainer(); + OSL_ASSERT( pId ); + + // free old one + if (pId->pCurrentContext) + { + (*pId->pCurrentContextEnv->releaseInterface)( + pId->pCurrentContextEnv, pId->pCurrentContext ); + (*pId->pCurrentContextEnv->aBase.release)( + &pId->pCurrentContextEnv->aBase ); + pId->pCurrentContextEnv = nullptr; + + pId->pCurrentContext = nullptr; + } + + if (pCurrentContext) + { + uno_Environment * pEnv = nullptr; + ::uno_getEnvironment( &pEnv, pEnvTypeName, pEnvContext ); + OSL_ASSERT( pEnv && pEnv->pExtEnv ); + if (pEnv) + { + if (pEnv->pExtEnv) + { + pId->pCurrentContextEnv = pEnv->pExtEnv; + (*pId->pCurrentContextEnv->acquireInterface)( + pId->pCurrentContextEnv, pCurrentContext ); + pId->pCurrentContext = pCurrentContext; + } + else + { + (*pEnv->release)( pEnv ); + return false; + } + } + else + { + return false; + } + } + return true; +} + +extern "C" sal_Bool SAL_CALL uno_getCurrentContext( + void ** ppCurrentContext, rtl_uString * pEnvTypeName, void * pEnvContext ) + SAL_THROW_EXTERN_C() +{ + IdContainer * pId = getIdContainer(); + OSL_ASSERT( pId ); + + Environment target_env; + + // release inout parameter + if (*ppCurrentContext) + { + target_env = Environment(OUString(pEnvTypeName), pEnvContext); + OSL_ASSERT( target_env.is() ); + if (! target_env.is()) + return false; + uno_ExtEnvironment * pEnv = target_env.get()->pExtEnv; + OSL_ASSERT( nullptr != pEnv ); + if (nullptr == pEnv) + return false; + (*pEnv->releaseInterface)( pEnv, *ppCurrentContext ); + + *ppCurrentContext = nullptr; + } + + // case: null-ref + if (nullptr == pId->pCurrentContext) + return true; + + if (! target_env.is()) + { + target_env = Environment(OUString(pEnvTypeName), pEnvContext); + OSL_ASSERT( target_env.is() ); + if (! target_env.is()) + return false; + } + + Mapping mapping(&pId->pCurrentContextEnv->aBase, target_env.get()); + OSL_ASSERT( mapping.is() ); + if (! mapping.is()) + return false; + + mapping.mapInterface(ppCurrentContext, pId->pCurrentContext, ::cppu::get_type_XCurrentContext() ); + + return true; +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/source/threadpool/current.hxx b/cppu/source/threadpool/current.hxx new file mode 100644 index 000000000..ea26851dd --- /dev/null +++ b/cppu/source/threadpool/current.hxx @@ -0,0 +1,47 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#ifndef INCLUDED_CPPU_SOURCE_THREADPOOL_CURRENT_HXX +#define INCLUDED_CPPU_SOURCE_THREADPOOL_CURRENT_HXX + +#include <sal/config.h> + +#include <sal/types.h> + +struct _uno_ExtEnvironment; + +namespace cppu +{ +struct IdContainer +{ + void * pCurrentContext; + _uno_ExtEnvironment * pCurrentContextEnv; + + bool bInit; + sal_Sequence * pLocalThreadId; + sal_Int32 nRefCountOfCurrentId; + sal_Sequence * pCurrentId; +}; + +IdContainer * getIdContainer(); +} + +#endif + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/source/threadpool/jobqueue.cxx b/cppu/source/threadpool/jobqueue.cxx new file mode 100644 index 000000000..b4974fdc1 --- /dev/null +++ b/cppu/source/threadpool/jobqueue.cxx @@ -0,0 +1,191 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include "jobqueue.hxx" +#include "threadpool.hxx" + +#include <osl/diagnose.h> + +using namespace ::osl; + +namespace cppu_threadpool { + + JobQueue::JobQueue() : + m_nToDo( 0 ), + m_bSuspended( false ), + m_DisposedCallerAdmin( DisposedCallerAdmin::getInstance() ) + { + } + + void JobQueue::add( void *pThreadSpecificData, RequestFun * doRequest ) + { + MutexGuard guard( m_mutex ); + Job job = { pThreadSpecificData , doRequest }; + m_lstJob.push_back( job ); + if( ! m_bSuspended ) + { + m_cndWait.set(); + } + m_nToDo ++; + } + + void *JobQueue::enter( sal_Int64 nDisposeId , bool bReturnWhenNoJob ) + { + void *pReturn = nullptr; + { + // synchronize with the dispose calls + MutexGuard guard( m_mutex ); + if( m_DisposedCallerAdmin->isDisposed( nDisposeId ) ) + { + return nullptr; + } + m_lstCallstack.push_front( nDisposeId ); + } + + + while( true ) + { + if( bReturnWhenNoJob ) + { + MutexGuard guard( m_mutex ); + if( m_lstJob.empty() ) + { + break; + } + } + + m_cndWait.wait(); + + struct Job job={nullptr,nullptr}; + { + // synchronize with add and dispose calls + MutexGuard guard( m_mutex ); + + if( 0 == m_lstCallstack.front() ) + { + // disposed ! + if (!m_lstJob.empty() && m_lstJob.front().doRequest == nullptr) { + // If this thread was waiting for a remote response, that response may or + // may not have been enqueued; if it has not been enqueued, there cannot be + // another enqueued response, so it is always correct to remove any enqueued + // response here: + m_lstJob.pop_front(); + } + if( m_lstJob.empty() + && (m_lstCallstack.empty() + || m_lstCallstack.front() != 0) ) + { + m_cndWait.reset(); + } + break; + } + + OSL_ASSERT( ! m_lstJob.empty() ); + if( ! m_lstJob.empty() ) + { + job = m_lstJob.front(); + m_lstJob.pop_front(); + } + if( m_lstJob.empty() + && (m_lstCallstack.empty() || m_lstCallstack.front() != 0) ) + { + m_cndWait.reset(); + } + } + + if( job.doRequest ) + { + job.doRequest( job.pThreadSpecificData ); + MutexGuard guard( m_mutex ); + m_nToDo --; + } + else + { + pReturn = job.pThreadSpecificData; + MutexGuard guard( m_mutex ); + m_nToDo --; + break; + } + } + + { + // synchronize with the dispose calls + MutexGuard guard( m_mutex ); + m_lstCallstack.pop_front(); + } + + return pReturn; + } + + void JobQueue::dispose( sal_Int64 nDisposeId ) + { + MutexGuard guard( m_mutex ); + for( auto& rId : m_lstCallstack ) + { + if( rId == nDisposeId ) + { + rId = 0; + } + } + + if( !m_lstCallstack.empty() && ! m_lstCallstack.front() ) + { + // The thread is waiting for a disposed pCallerId, let it go + m_cndWait.set(); + } + } + + void JobQueue::suspend() + { + MutexGuard guard( m_mutex ); + m_bSuspended = true; + } + + void JobQueue::resume() + { + MutexGuard guard( m_mutex ); + m_bSuspended = false; + if( ! m_lstJob.empty() ) + { + m_cndWait.set(); + } + } + + bool JobQueue::isEmpty() const + { + MutexGuard guard( m_mutex ); + return m_lstJob.empty(); + } + + bool JobQueue::isCallstackEmpty() const + { + MutexGuard guard( m_mutex ); + return m_lstCallstack.empty(); + } + + bool JobQueue::isBusy() const + { + MutexGuard guard( m_mutex ); + return m_nToDo > 0; + } + + +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/source/threadpool/jobqueue.hxx b/cppu/source/threadpool/jobqueue.hxx new file mode 100644 index 000000000..abfdfa19d --- /dev/null +++ b/cppu/source/threadpool/jobqueue.hxx @@ -0,0 +1,73 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#ifndef INCLUDED_CPPU_SOURCE_THREADPOOL_JOBQUEUE_HXX +#define INCLUDED_CPPU_SOURCE_THREADPOOL_JOBQUEUE_HXX + +#include <deque> +#include <memory> +#include <sal/types.h> + +#include <osl/conditn.hxx> +#include <osl/mutex.hxx> + +namespace cppu_threadpool +{ + extern "C" typedef void (RequestFun)(void *); + + struct Job + { + void *pThreadSpecificData; + RequestFun * doRequest; + }; + + class DisposedCallerAdmin; + typedef std::shared_ptr<DisposedCallerAdmin> DisposedCallerAdminHolder; + + class JobQueue + { + public: + JobQueue(); + + void add( void *pThreadSpecificData, RequestFun * doRequest ); + + void *enter( sal_Int64 nDisposeId , bool bReturnWhenNoJob = false ); + void dispose( sal_Int64 nDisposeId ); + + void suspend(); + void resume(); + + bool isEmpty() const; + bool isCallstackEmpty() const; + bool isBusy() const; + + private: + mutable ::osl::Mutex m_mutex; + std::deque < struct Job > m_lstJob; + std::deque<sal_Int64> m_lstCallstack; + sal_Int32 m_nToDo; + bool m_bSuspended; + osl::Condition m_cndWait; + DisposedCallerAdminHolder m_DisposedCallerAdmin; + }; +} + +#endif + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/source/threadpool/thread.cxx b/cppu/source/threadpool/thread.cxx new file mode 100644 index 000000000..2af33eff7 --- /dev/null +++ b/cppu/source/threadpool/thread.cxx @@ -0,0 +1,199 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <sal/config.h> + +#include <algorithm> +#include <cstdlib> +#include <osl/diagnose.h> +#include <uno/threadpool.h> +#include <sal/log.hxx> + +#include "thread.hxx" +#include "jobqueue.hxx" +#include "threadpool.hxx" + +using namespace osl; +using namespace rtl; + +namespace cppu_threadpool { + + + ThreadAdmin::ThreadAdmin(): m_disposed(false) {} + + ThreadAdmin::~ThreadAdmin() + { + SAL_WARN_IF(m_deque.size(), "cppu.threadpool", m_deque.size() << "Threads left"); + } + + bool ThreadAdmin::add( rtl::Reference< ORequestThread > const & p ) + { + MutexGuard aGuard( m_mutex ); + if( m_disposed ) + { + return false; + } + m_deque.push_back( p ); + return true; + } + + void ThreadAdmin::remove_locked( rtl::Reference< ORequestThread > const & p ) + { + m_deque.erase(std::find( m_deque.begin(), m_deque.end(), p ), m_deque.end()); + } + + void ThreadAdmin::remove( rtl::Reference< ORequestThread > const & p ) + { + MutexGuard aGuard( m_mutex ); + remove_locked( p ); + } + + void ThreadAdmin::join() + { + { + MutexGuard aGuard( m_mutex ); + m_disposed = true; + } + for (;;) + { + rtl::Reference< ORequestThread > pCurrent; + { + MutexGuard aGuard( m_mutex ); + if( m_deque.empty() ) + { + break; + } + pCurrent = m_deque.front(); + m_deque.pop_front(); + } + if (pCurrent->getIdentifier() + != osl::Thread::getCurrentIdentifier()) + { + pCurrent->join(); + } + } + } + + + ORequestThread::ORequestThread( ThreadPoolHolder const &aThreadPool, + JobQueue *pQueue, + const ByteSequence &aThreadId, + bool bAsynchron ) + : m_aThreadPool( aThreadPool ) + , m_pQueue( pQueue ) + , m_aThreadId( aThreadId ) + , m_bAsynchron( bAsynchron ) + {} + + ORequestThread::~ORequestThread() {} + + void ORequestThread::setTask( JobQueue *pQueue, + const ByteSequence &aThreadId, + bool bAsynchron ) + { + m_pQueue = pQueue; + m_aThreadId = aThreadId; + m_bAsynchron = bAsynchron; + } + + bool ORequestThread::launch() + { + // Assumption is that osl::Thread::create returns normally with a true + // return value iff it causes osl::Thread::run to start executing: + acquire(); + ThreadAdmin & rThreadAdmin = m_aThreadPool->getThreadAdmin(); + osl::ClearableMutexGuard g(rThreadAdmin.m_mutex); + if (!rThreadAdmin.add( this )) { + return false; + } + try { + if (!create()) { + std::abort(); + } + } catch (...) { + rThreadAdmin.remove_locked( this ); + g.clear(); + release(); + throw; + } + return true; + } + + void ORequestThread::onTerminated() + { + m_aThreadPool->getThreadAdmin().remove( this ); + release(); + } + + void ORequestThread::run() + { + osl_setThreadName("cppu_threadpool::ORequestThread"); + + try + { + while ( m_pQueue ) + { + if( ! m_bAsynchron ) + { + if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) ) + { + OSL_ASSERT( false ); + } + } + + while( ! m_pQueue->isEmpty() ) + { + // Note : Oneways should not get a disposable disposeid, + // It does not make sense to dispose a call in this state. + // That's way we put it a disposeid, that can't be used otherwise. + m_pQueue->enter( + sal::static_int_cast< sal_Int64 >( + reinterpret_cast< sal_IntPtr >(this)), + true ); + + if( m_pQueue->isEmpty() ) + { + m_aThreadPool->revokeQueue( m_aThreadId , m_bAsynchron ); + // Note : revokeQueue might have failed because m_pQueue.isEmpty() + // may be false (race). + } + } + + delete m_pQueue; + m_pQueue = nullptr; + + if( ! m_bAsynchron ) + { + uno_releaseIdFromCurrentThread(); + } + + m_aThreadPool->waitInPool( this ); + } + } + catch (...) + { + // Work around the problem that onTerminated is not called if run + // throws an exception: + onTerminated(); + throw; + } + } +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/source/threadpool/thread.hxx b/cppu/source/threadpool/thread.hxx new file mode 100644 index 000000000..e9258a97b --- /dev/null +++ b/cppu/source/threadpool/thread.hxx @@ -0,0 +1,70 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ +#ifndef INCLUDED_CPPU_SOURCE_THREADPOOL_THREAD_HXX +#define INCLUDED_CPPU_SOURCE_THREADPOOL_THREAD_HXX + +#include <osl/thread.hxx> +#include <sal/types.h> +#include <salhelper/simplereferenceobject.hxx> + +#include "threadpool.hxx" + +namespace cppu_threadpool { + + class JobQueue; + + + // private thread class for the threadpool + + class ORequestThread: + public salhelper::SimpleReferenceObject, public osl::Thread + { + public: + ORequestThread( ThreadPoolHolder const &aThreadPool, + JobQueue * , + const ::rtl::ByteSequence &aThreadId, + bool bAsynchron ); + virtual ~ORequestThread() override; + + void setTask( JobQueue * , const ::rtl::ByteSequence & aThreadId , bool bAsynchron ); + + bool launch(); + + static void * operator new(std::size_t size) + { return SimpleReferenceObject::operator new(size); } + + static void operator delete(void * pointer) + { SimpleReferenceObject::operator delete(pointer); } + + private: + virtual void SAL_CALL run() override; + virtual void SAL_CALL onTerminated() override; + + ThreadPoolHolder m_aThreadPool; + JobQueue *m_pQueue; + ::rtl::ByteSequence m_aThreadId; + bool m_bAsynchron; + }; + +} // end cppu_threadpool + + +#endif + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/source/threadpool/threadident.cxx b/cppu/source/threadpool/threadident.cxx new file mode 100644 index 000000000..544acc7af --- /dev/null +++ b/cppu/source/threadpool/threadident.cxx @@ -0,0 +1,125 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <osl/thread.hxx> +#include <osl/diagnose.h> + +#include <rtl/process.h> +#include <rtl/byteseq.hxx> + +#include <uno/threadpool.h> + +#include "current.hxx" + + +using namespace ::std; +using namespace ::osl; +using namespace ::cppu; + + +static void createLocalId( sal_Sequence **ppThreadId ) +{ + rtl_byte_sequence_constructNoDefault( ppThreadId , 4 + 16 ); + sal_uInt32 id = osl::Thread::getCurrentIdentifier(); + (*ppThreadId)->elements[0] = id & 0xFF; + (*ppThreadId)->elements[1] = (id >> 8) & 0xFF; + (*ppThreadId)->elements[2] = (id >> 16) & 0xFF; + (*ppThreadId)->elements[3] = (id >> 24) & 0xFF; + rtl_getGlobalProcessId( reinterpret_cast<sal_uInt8 *>(&(*ppThreadId)->elements[4]) ); +} + + +extern "C" void SAL_CALL +uno_getIdOfCurrentThread( sal_Sequence **ppThreadId ) + SAL_THROW_EXTERN_C() +{ + IdContainer * p = getIdContainer(); + if( ! p->bInit ) + { + // first time, that the thread enters the bridge + createLocalId( ppThreadId ); + + // TODO + // note : this is a leak ! + p->pLocalThreadId = *ppThreadId; + p->pCurrentId = *ppThreadId; + p->nRefCountOfCurrentId = 1; + rtl_byte_sequence_acquire( p->pLocalThreadId ); + rtl_byte_sequence_acquire( p->pCurrentId ); + p->bInit = true; + } + else + { + p->nRefCountOfCurrentId ++; + if( *ppThreadId ) + { + rtl_byte_sequence_release( *ppThreadId ); + } + *ppThreadId = p->pCurrentId; + rtl_byte_sequence_acquire( *ppThreadId ); + } +} + + +extern "C" void SAL_CALL uno_releaseIdFromCurrentThread() + SAL_THROW_EXTERN_C() +{ + IdContainer *p = getIdContainer(); + OSL_ASSERT( p ); + OSL_ASSERT( p->bInit ); + OSL_ASSERT( p->nRefCountOfCurrentId ); + + p->nRefCountOfCurrentId --; + if( ! p->nRefCountOfCurrentId && (p->pLocalThreadId != p->pCurrentId) ) + { + rtl_byte_sequence_assign( &(p->pCurrentId) , p->pLocalThreadId ); + } +} + +extern "C" sal_Bool SAL_CALL uno_bindIdToCurrentThread( sal_Sequence *pThreadId ) + SAL_THROW_EXTERN_C() +{ + IdContainer *p = getIdContainer(); + if( ! p->bInit ) + { + p->pLocalThreadId = nullptr; + createLocalId( &(p->pLocalThreadId) ); + p->nRefCountOfCurrentId = 1; + p->pCurrentId = pThreadId; + rtl_byte_sequence_acquire( p->pCurrentId ); + p->bInit = true; + } + else + { + OSL_ASSERT( 0 == p->nRefCountOfCurrentId ); + if( 0 == p->nRefCountOfCurrentId ) + { + rtl_byte_sequence_assign(&( p->pCurrentId ), pThreadId ); + p->nRefCountOfCurrentId ++; + } + else + { + return false; + } + + } + return true; +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/source/threadpool/threadpool.cxx b/cppu/source/threadpool/threadpool.cxx new file mode 100644 index 000000000..d11268b85 --- /dev/null +++ b/cppu/source/threadpool/threadpool.cxx @@ -0,0 +1,487 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <sal/config.h> + +#include <cassert> +#include <chrono> +#include <algorithm> +#include <unordered_map> + +#include <osl/diagnose.h> +#include <osl/mutex.hxx> +#include <rtl/instance.hxx> +#include <sal/log.hxx> + +#include <uno/threadpool.h> + +#include "threadpool.hxx" +#include "thread.hxx" + +using namespace ::std; +using namespace ::osl; +using namespace ::rtl; + +namespace cppu_threadpool +{ + WaitingThread::WaitingThread( + rtl::Reference<ORequestThread> const & theThread): thread(theThread) + {} + + namespace { + + struct theDisposedCallerAdmin : + public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin > + { + DisposedCallerAdminHolder operator () () { + return std::make_shared<DisposedCallerAdmin>(); + } + }; + + } + + DisposedCallerAdminHolder const & DisposedCallerAdmin::getInstance() + { + return theDisposedCallerAdmin::get(); + } + + DisposedCallerAdmin::~DisposedCallerAdmin() + { + SAL_WARN_IF( !m_vector.empty(), "cppu.threadpool", "DisposedCallerList : " << m_vector.size() << " left"); + } + + void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId ) + { + MutexGuard guard( m_mutex ); + m_vector.push_back( nDisposeId ); + } + + void DisposedCallerAdmin::destroy( sal_Int64 nDisposeId ) + { + MutexGuard guard( m_mutex ); + m_vector.erase(std::remove(m_vector.begin(), m_vector.end(), nDisposeId), m_vector.end()); + } + + bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId ) + { + MutexGuard guard( m_mutex ); + return (std::find(m_vector.begin(), m_vector.end(), nDisposeId) != m_vector.end()); + } + + + ThreadPool::ThreadPool() : + m_DisposedCallerAdmin( DisposedCallerAdmin::getInstance() ) + { + } + + ThreadPool::~ThreadPool() + { + SAL_WARN_IF( m_mapQueue.size(), "cppu.threadpool", "ThreadIdHashMap: " << m_mapQueue.size() << " left"); + } + + void ThreadPool::dispose( sal_Int64 nDisposeId ) + { + m_DisposedCallerAdmin->dispose( nDisposeId ); + + MutexGuard guard( m_mutex ); + for (auto const& item : m_mapQueue) + { + if( item.second.first ) + { + item.second.first->dispose( nDisposeId ); + } + if( item.second.second ) + { + item.second.second->dispose( nDisposeId ); + } + } + } + + void ThreadPool::destroy( sal_Int64 nDisposeId ) + { + m_DisposedCallerAdmin->destroy( nDisposeId ); + } + + /****************** + * This methods lets the thread wait a certain amount of time. If within this timespan + * a new request comes in, this thread is reused. This is done only to improve performance, + * it is not required for threadpool functionality. + ******************/ + void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread ) + { + WaitingThread waitingThread(pThread); + { + MutexGuard guard( m_mutexWaitingThreadList ); + m_dequeThreads.push_front( &waitingThread ); + } + + // let the thread wait 2 seconds + waitingThread.condition.wait( std::chrono::seconds(2) ); + + { + MutexGuard guard ( m_mutexWaitingThreadList ); + if( waitingThread.thread.is() ) + { + // thread wasn't reused, remove it from the list + WaitingThreadDeque::iterator ii = find( + m_dequeThreads.begin(), m_dequeThreads.end(), &waitingThread ); + OSL_ASSERT( ii != m_dequeThreads.end() ); + m_dequeThreads.erase( ii ); + } + } + } + + void ThreadPool::joinWorkers() + { + { + MutexGuard guard( m_mutexWaitingThreadList ); + for (auto const& thread : m_dequeThreads) + { + // wake the threads up + thread->condition.set(); + } + } + m_aThreadAdmin.join(); + } + + bool ThreadPool::createThread( JobQueue *pQueue , + const ByteSequence &aThreadId, + bool bAsynchron ) + { + { + // Can a thread be reused ? + MutexGuard guard( m_mutexWaitingThreadList ); + if( ! m_dequeThreads.empty() ) + { + // inform the thread and let it go + struct WaitingThread *pWaitingThread = m_dequeThreads.back(); + pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron ); + pWaitingThread->thread = nullptr; + + // remove from list + m_dequeThreads.pop_back(); + + // let the thread go + pWaitingThread->condition.set(); + return true; + } + } + + rtl::Reference pThread( + new ORequestThread( this, pQueue , aThreadId, bAsynchron) ); + return pThread->launch(); + } + + bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, bool bAsynchron ) + { + MutexGuard guard( m_mutex ); + + ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); + OSL_ASSERT( ii != m_mapQueue.end() ); + + if( bAsynchron ) + { + if( ! (*ii).second.second->isEmpty() ) + { + // another thread has put something into the queue + return false; + } + + (*ii).second.second = nullptr; + if( (*ii).second.first ) + { + // all oneway request have been processed, now + // synchronous requests may go on + (*ii).second.first->resume(); + } + } + else + { + if( ! (*ii).second.first->isEmpty() ) + { + // another thread has put something into the queue + return false; + } + (*ii).second.first = nullptr; + } + + if( nullptr == (*ii).second.first && nullptr == (*ii).second.second ) + { + m_mapQueue.erase( ii ); + } + + return true; + } + + + bool ThreadPool::addJob( + const ByteSequence &aThreadId , + bool bAsynchron, + void *pThreadSpecificData, + RequestFun * doRequest ) + { + bool bCreateThread = false; + JobQueue *pQueue = nullptr; + { + MutexGuard guard( m_mutex ); + + ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); + + if( ii == m_mapQueue.end() ) + { + m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( nullptr , nullptr ); + ii = m_mapQueue.find( aThreadId ); + OSL_ASSERT( ii != m_mapQueue.end() ); + } + + if( bAsynchron ) + { + if( ! (*ii).second.second ) + { + (*ii).second.second = new JobQueue(); + bCreateThread = true; + } + pQueue = (*ii).second.second; + } + else + { + if( ! (*ii).second.first ) + { + (*ii).second.first = new JobQueue(); + bCreateThread = true; + } + pQueue = (*ii).second.first; + + if( (*ii).second.second && ( (*ii).second.second->isBusy() ) ) + { + pQueue->suspend(); + } + } + pQueue->add( pThreadSpecificData , doRequest ); + } + + return !bCreateThread || createThread( pQueue , aThreadId , bAsynchron); + } + + void ThreadPool::prepare( const ByteSequence &aThreadId ) + { + MutexGuard guard( m_mutex ); + + ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); + + if( ii == m_mapQueue.end() ) + { + JobQueue *p = new JobQueue(); + m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , nullptr ); + } + else if( nullptr == (*ii).second.first ) + { + (*ii).second.first = new JobQueue(); + } + } + + void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId ) + { + JobQueue *pQueue = nullptr; + { + MutexGuard guard( m_mutex ); + + ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); + + OSL_ASSERT( ii != m_mapQueue.end() ); + pQueue = (*ii).second.first; + } + + OSL_ASSERT( pQueue ); + void *pReturn = pQueue->enter( nDisposeId ); + + if( pQueue->isCallstackEmpty() ) + { + if( revokeQueue( aThreadId , false) ) + { + // remove queue + delete pQueue; + } + } + return pReturn; + } +} + +// All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life +// spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty +// (within the last uno_threadpool_destroy) all worker threads spawned by that +// ThreadPool instance are joined (which implies that uno_threadpool_destroy +// must never be called from a worker thread); afterwards, the next call to +// uno_threadpool_create (if any) will lead to a new ThreadPool instance. + +using namespace cppu_threadpool; + +namespace { + +struct uno_ThreadPool_Equal +{ + bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const + { + return a == b; + } +}; + +struct uno_ThreadPool_Hash +{ + std::size_t operator () ( const uno_ThreadPool &a ) const + { + return reinterpret_cast<std::size_t>( a ); + } +}; + +} + +typedef std::unordered_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet; + +static ThreadpoolHashSet *g_pThreadpoolHashSet; + +struct _uno_ThreadPool +{ + sal_Int32 dummy; +}; + +namespace { + +ThreadPoolHolder getThreadPool( uno_ThreadPool hPool ) +{ + MutexGuard guard( Mutex::getGlobalMutex() ); + assert( g_pThreadpoolHashSet != nullptr ); + ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) ); + assert( i != g_pThreadpoolHashSet->end() ); + return i->second; +} + +} + +extern "C" uno_ThreadPool SAL_CALL +uno_threadpool_create() SAL_THROW_EXTERN_C() +{ + MutexGuard guard( Mutex::getGlobalMutex() ); + ThreadPoolHolder p; + if( ! g_pThreadpoolHashSet ) + { + g_pThreadpoolHashSet = new ThreadpoolHashSet; + p = new ThreadPool; + } + else + { + assert( !g_pThreadpoolHashSet->empty() ); + p = g_pThreadpoolHashSet->begin()->second; + } + + // Just ensure that the handle is unique in the process (via heap) + uno_ThreadPool h = new struct _uno_ThreadPool; + g_pThreadpoolHashSet->emplace( h, p ); + return h; +} + +extern "C" void SAL_CALL +uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() +{ + sal_Sequence *pThreadId = nullptr; + uno_getIdOfCurrentThread( &pThreadId ); + getThreadPool( hPool )->prepare( pThreadId ); + rtl_byte_sequence_release( pThreadId ); + uno_releaseIdFromCurrentThread(); +} + +extern "C" void SAL_CALL +uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob ) + SAL_THROW_EXTERN_C() +{ + sal_Sequence *pThreadId = nullptr; + uno_getIdOfCurrentThread( &pThreadId ); + *ppJob = + getThreadPool( hPool )->enter( + pThreadId, + sal::static_int_cast< sal_Int64 >( + reinterpret_cast< sal_IntPtr >(hPool)) ); + rtl_byte_sequence_release( pThreadId ); + uno_releaseIdFromCurrentThread(); +} + +extern "C" void SAL_CALL +uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C() +{ + // we might do here some tidying up in case a thread called attach but never detach +} + +extern "C" void SAL_CALL +uno_threadpool_putJob( + uno_ThreadPool hPool, + sal_Sequence *pThreadId, + void *pJob, + void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ), + sal_Bool bIsOneway ) SAL_THROW_EXTERN_C() +{ + if (!getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest )) + { + SAL_WARN( + "cppu.threadpool", + "uno_threadpool_putJob in parallel with uno_threadpool_destroy"); + } +} + +extern "C" void SAL_CALL +uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() +{ + getThreadPool(hPool)->dispose( + sal::static_int_cast< sal_Int64 >( + reinterpret_cast< sal_IntPtr >(hPool)) ); +} + +extern "C" void SAL_CALL +uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() +{ + ThreadPoolHolder p( getThreadPool(hPool) ); + p->destroy( + sal::static_int_cast< sal_Int64 >( + reinterpret_cast< sal_IntPtr >(hPool)) ); + + bool empty; + { + OSL_ASSERT( g_pThreadpoolHashSet ); + + MutexGuard guard( Mutex::getGlobalMutex() ); + + ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool ); + OSL_ASSERT( ii != g_pThreadpoolHashSet->end() ); + g_pThreadpoolHashSet->erase( ii ); + delete hPool; + + empty = g_pThreadpoolHashSet->empty(); + if( empty ) + { + delete g_pThreadpoolHashSet; + g_pThreadpoolHashSet = nullptr; + } + } + + if( empty ) + { + p->joinWorkers(); + } +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/source/threadpool/threadpool.hxx b/cppu/source/threadpool/threadpool.hxx new file mode 100644 index 000000000..85188fc52 --- /dev/null +++ b/cppu/source/threadpool/threadpool.hxx @@ -0,0 +1,163 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#ifndef INCLUDED_CPPU_SOURCE_THREADPOOL_THREADPOOL_HXX +#define INCLUDED_CPPU_SOURCE_THREADPOOL_THREADPOOL_HXX + +#include <vector> +#include <unordered_map> + +#include <osl/conditn.hxx> + +#include <rtl/byteseq.hxx> +#include <rtl/ref.hxx> +#include <salhelper/simplereferenceobject.hxx> + +#include "jobqueue.hxx" + + +namespace cppu_threadpool { + class ORequestThread; + + struct EqualThreadId + { + bool operator () ( const ::rtl::ByteSequence &a , const ::rtl::ByteSequence &b ) const + { + return a == b; + } + }; + + struct HashThreadId + { + sal_Int32 operator () ( const ::rtl::ByteSequence &a ) const + { + if( a.getLength() >= 4 ) + { + return *reinterpret_cast<sal_Int32 const *>(a.getConstArray()); + } + return 0; + } + }; + + typedef std::unordered_map + < + ::rtl::ByteSequence, // ThreadID + std::pair < JobQueue * , JobQueue * >, + HashThreadId, + EqualThreadId + > ThreadIdHashMap; + + struct WaitingThread + { + osl::Condition condition; + rtl::Reference< ORequestThread > thread; + + explicit WaitingThread( + rtl::Reference<ORequestThread> const & theThread); + }; + + typedef std::deque< struct ::cppu_threadpool::WaitingThread * > WaitingThreadDeque; + + class DisposedCallerAdmin; + typedef std::shared_ptr<DisposedCallerAdmin> DisposedCallerAdminHolder; + + class DisposedCallerAdmin + { + public: + ~DisposedCallerAdmin(); + + static DisposedCallerAdminHolder const & getInstance(); + + void dispose( sal_Int64 nDisposeId ); + void destroy( sal_Int64 nDisposeId ); + bool isDisposed( sal_Int64 nDisposeId ); + + private: + ::osl::Mutex m_mutex; + std::vector< sal_Int64 > m_vector; + }; + + class ThreadAdmin + { + public: + ThreadAdmin(); + ~ThreadAdmin (); + + bool add( rtl::Reference< ORequestThread > const & ); + void remove( rtl::Reference< ORequestThread > const & ); + void join(); + + void remove_locked( rtl::Reference< ORequestThread > const & ); + ::osl::Mutex m_mutex; + + private: + std::deque< rtl::Reference< ORequestThread > > m_deque; + bool m_disposed; + }; + + class ThreadPool; + typedef rtl::Reference<ThreadPool> ThreadPoolHolder; + + class ThreadPool: public salhelper::SimpleReferenceObject + { + public: + ThreadPool(); + virtual ~ThreadPool() override; + + void dispose( sal_Int64 nDisposeId ); + void destroy( sal_Int64 nDisposeId ); + + bool addJob( const ::rtl::ByteSequence &aThreadId, + bool bAsynchron, + void *pThreadSpecificData, + RequestFun * doRequest ); + + void prepare( const ::rtl::ByteSequence &aThreadId ); + void * enter( const ::rtl::ByteSequence &aThreadId, sal_Int64 nDisposeId ); + + /******** + * @return true, if queue could be successfully revoked. + ********/ + bool revokeQueue( const ::rtl::ByteSequence & aThreadId , bool bAsynchron ); + + void waitInPool( rtl::Reference< ORequestThread > const & pThread ); + + void joinWorkers(); + + ThreadAdmin & getThreadAdmin() { return m_aThreadAdmin; } + + private: + bool createThread( JobQueue *pQueue, const ::rtl::ByteSequence &aThreadId, bool bAsynchron); + + + ThreadIdHashMap m_mapQueue; + ::osl::Mutex m_mutex; + + ::osl::Mutex m_mutexWaitingThreadList; + WaitingThreadDeque m_dequeThreads; + + DisposedCallerAdminHolder m_DisposedCallerAdmin; + ThreadAdmin m_aThreadAdmin; + }; + +} // end namespace cppu_threadpool + +#endif + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ |