summaryrefslogtreecommitdiffstats
path: root/cppu/source/threadpool
diff options
context:
space:
mode:
Diffstat (limited to 'cppu/source/threadpool')
-rw-r--r--cppu/source/threadpool/current.cxx210
-rw-r--r--cppu/source/threadpool/current.hxx47
-rw-r--r--cppu/source/threadpool/jobqueue.cxx177
-rw-r--r--cppu/source/threadpool/jobqueue.hxx72
-rw-r--r--cppu/source/threadpool/thread.cxx198
-rw-r--r--cppu/source/threadpool/thread.hxx67
-rw-r--r--cppu/source/threadpool/threadident.cxx120
-rw-r--r--cppu/source/threadpool/threadpool.cxx476
-rw-r--r--cppu/source/threadpool/threadpool.hxx162
9 files changed, 1529 insertions, 0 deletions
diff --git a/cppu/source/threadpool/current.cxx b/cppu/source/threadpool/current.cxx
new file mode 100644
index 000000000..64e6bfb8f
--- /dev/null
+++ b/cppu/source/threadpool/current.cxx
@@ -0,0 +1,210 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * This file incorporates work covered by the following license notice:
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of
+ * the License at http://www.apache.org/licenses/LICENSE-2.0 .
+ */
+
+#include <sal/config.h>
+
+#include <rtl/byteseq.h>
+#include <osl/mutex.hxx>
+
+#include <uno/current_context.h>
+#include <uno/environment.hxx>
+#include <uno/mapping.hxx>
+#include <typelib/typedescription.h>
+
+#include "current.hxx"
+
+
+using namespace ::osl;
+using namespace ::rtl;
+using namespace ::cppu;
+using namespace ::com::sun::star::uno;
+
+namespace cppu
+{
+
+static typelib_InterfaceTypeDescription * get_type_XCurrentContext()
+{
+ static typelib_InterfaceTypeDescription* s_type_XCurrentContext = []() {
+ OUString sTypeName("com.sun.star.uno.XCurrentContext");
+ typelib_InterfaceTypeDescription* pTD = nullptr;
+ typelib_TypeDescriptionReference* pMembers[1] = { nullptr };
+ OUString sMethodName0("com.sun.star.uno.XCurrentContext::getValueByName");
+ typelib_typedescriptionreference_new(&pMembers[0], typelib_TypeClass_INTERFACE_METHOD,
+ sMethodName0.pData);
+ typelib_typedescription_newInterface(
+ &pTD, sTypeName.pData, 0, 0, 0, 0, 0,
+ *typelib_static_type_getByTypeClass(typelib_TypeClass_INTERFACE), 1, pMembers);
+
+ typelib_typedescription_register(reinterpret_cast<typelib_TypeDescription**>(&pTD));
+ typelib_typedescriptionreference_release(pMembers[0]);
+
+ typelib_InterfaceMethodTypeDescription* pMethod = nullptr;
+ typelib_Parameter_Init aParameters[1];
+ OUString sParamName0("Name");
+ OUString sParamType0("string");
+ aParameters[0].pParamName = sParamName0.pData;
+ aParameters[0].eTypeClass = typelib_TypeClass_STRING;
+ aParameters[0].pTypeName = sParamType0.pData;
+ aParameters[0].bIn = true;
+ aParameters[0].bOut = false;
+ rtl_uString* pExceptions[1];
+ OUString sExceptionName0("com.sun.star.uno.RuntimeException");
+ pExceptions[0] = sExceptionName0.pData;
+ OUString sReturnType0("any");
+ typelib_typedescription_newInterfaceMethod(&pMethod, 3, false, sMethodName0.pData,
+ typelib_TypeClass_ANY, sReturnType0.pData, 1,
+ aParameters, 1, pExceptions);
+ typelib_typedescription_register(reinterpret_cast<typelib_TypeDescription**>(&pMethod));
+ typelib_typedescription_release(&pMethod->aBase.aBase);
+ // another static ref:
+ ++reinterpret_cast<typelib_TypeDescription*>(pTD)->nStaticRefCount;
+ return pTD;
+ }();
+
+ return s_type_XCurrentContext;
+}
+
+IdContainer::IdContainer()
+ : pCurrentContext(nullptr)
+ , pCurrentContextEnv(nullptr)
+ , pLocalThreadId(nullptr)
+ , pCurrentId(nullptr)
+ , nRefCountOfCurrentId(0)
+ , bInit(false)
+{
+}
+
+IdContainer::~IdContainer()
+{
+ if (pCurrentContext)
+ {
+ (*pCurrentContextEnv->releaseInterface)(
+ pCurrentContextEnv, pCurrentContext );
+ (*pCurrentContextEnv->aBase.release)(
+ &pCurrentContextEnv->aBase );
+ }
+ if (bInit)
+ {
+ ::rtl_byte_sequence_release( pLocalThreadId );
+ ::rtl_byte_sequence_release( pCurrentId );
+ }
+}
+
+IdContainer& getIdContainer()
+{
+ static thread_local IdContainer aId;
+ return aId;
+}
+
+}
+
+extern "C" sal_Bool SAL_CALL uno_setCurrentContext(
+ void * pCurrentContext,
+ rtl_uString * pEnvTypeName, void * pEnvContext )
+ SAL_THROW_EXTERN_C()
+{
+ IdContainer& id = getIdContainer();
+
+ // free old one
+ if (id.pCurrentContext)
+ {
+ (*id.pCurrentContextEnv->releaseInterface)(
+ id.pCurrentContextEnv, id.pCurrentContext );
+ (*id.pCurrentContextEnv->aBase.release)(
+ &id.pCurrentContextEnv->aBase );
+ id.pCurrentContextEnv = nullptr;
+
+ id.pCurrentContext = nullptr;
+ }
+
+ if (!pCurrentContext)
+ return true;
+
+ uno_Environment * pEnv = nullptr;
+ ::uno_getEnvironment( &pEnv, pEnvTypeName, pEnvContext );
+ OSL_ASSERT( pEnv && pEnv->pExtEnv );
+ if (pEnv)
+ {
+ if (pEnv->pExtEnv)
+ {
+ id.pCurrentContextEnv = pEnv->pExtEnv;
+ (*id.pCurrentContextEnv->acquireInterface)(
+ id.pCurrentContextEnv, pCurrentContext );
+ id.pCurrentContext = pCurrentContext;
+ }
+ else
+ {
+ (*pEnv->release)( pEnv );
+ return false;
+ }
+ }
+ else
+ {
+ return false;
+ }
+ return true;
+}
+
+extern "C" sal_Bool SAL_CALL uno_getCurrentContext(
+ void ** ppCurrentContext, rtl_uString * pEnvTypeName, void * pEnvContext )
+ SAL_THROW_EXTERN_C()
+{
+ IdContainer& id = getIdContainer();
+
+ Environment target_env;
+
+ // release inout parameter
+ if (*ppCurrentContext)
+ {
+ target_env = Environment(OUString(pEnvTypeName), pEnvContext);
+ OSL_ASSERT( target_env.is() );
+ if (! target_env.is())
+ return false;
+ uno_ExtEnvironment * pEnv = target_env.get()->pExtEnv;
+ OSL_ASSERT( nullptr != pEnv );
+ if (nullptr == pEnv)
+ return false;
+ (*pEnv->releaseInterface)( pEnv, *ppCurrentContext );
+
+ *ppCurrentContext = nullptr;
+ }
+
+ // case: null-ref
+ if (nullptr == id.pCurrentContext)
+ return true;
+
+ if (! target_env.is())
+ {
+ target_env = Environment(OUString(pEnvTypeName), pEnvContext);
+ OSL_ASSERT( target_env.is() );
+ if (! target_env.is())
+ return false;
+ }
+
+ Mapping mapping(&id.pCurrentContextEnv->aBase, target_env.get());
+ OSL_ASSERT( mapping.is() );
+ if (! mapping.is())
+ return false;
+
+ mapping.mapInterface(ppCurrentContext, id.pCurrentContext, ::cppu::get_type_XCurrentContext());
+
+ return true;
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/cppu/source/threadpool/current.hxx b/cppu/source/threadpool/current.hxx
new file mode 100644
index 000000000..1f6ce6642
--- /dev/null
+++ b/cppu/source/threadpool/current.hxx
@@ -0,0 +1,47 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * This file incorporates work covered by the following license notice:
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of
+ * the License at http://www.apache.org/licenses/LICENSE-2.0 .
+ */
+
+#pragma once
+
+#include <sal/config.h>
+
+#include <sal/types.h>
+
+struct _uno_ExtEnvironment;
+
+namespace cppu
+{
+struct IdContainer
+{
+ void * pCurrentContext;
+ _uno_ExtEnvironment * pCurrentContextEnv;
+
+ sal_Sequence * pLocalThreadId;
+ sal_Sequence * pCurrentId;
+ sal_Int32 nRefCountOfCurrentId;
+ bool bInit;
+
+ IdContainer();
+ ~IdContainer();
+};
+
+IdContainer& getIdContainer();
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/cppu/source/threadpool/jobqueue.cxx b/cppu/source/threadpool/jobqueue.cxx
new file mode 100644
index 000000000..1be424024
--- /dev/null
+++ b/cppu/source/threadpool/jobqueue.cxx
@@ -0,0 +1,177 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * This file incorporates work covered by the following license notice:
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of
+ * the License at http://www.apache.org/licenses/LICENSE-2.0 .
+ */
+
+#include <sal/config.h>
+
+#include <cassert>
+
+#include "jobqueue.hxx"
+#include "threadpool.hxx"
+
+namespace cppu_threadpool {
+
+ JobQueue::JobQueue() :
+ m_nToDo( 0 ),
+ m_bSuspended( false ),
+ m_DisposedCallerAdmin( DisposedCallerAdmin::getInstance() )
+ {
+ }
+
+ void JobQueue::add( void *pThreadSpecificData, RequestFun * doRequest )
+ {
+ std::scoped_lock guard( m_mutex );
+ Job job = { pThreadSpecificData , doRequest };
+ m_lstJob.push_back( job );
+ if( ! m_bSuspended )
+ {
+ m_cndWait.notify_all();
+ }
+ m_nToDo ++;
+ }
+
+ void *JobQueue::enter( void const * nDisposeId , bool bReturnWhenNoJob )
+ {
+ void *pReturn = nullptr;
+ {
+ // synchronize with the dispose calls
+ std::scoped_lock guard( m_mutex );
+ if( m_DisposedCallerAdmin->isDisposed( nDisposeId ) )
+ {
+ return nullptr;
+ }
+ m_lstCallstack.push_front( nDisposeId );
+ }
+
+
+ while( true )
+ {
+ struct Job job={nullptr,nullptr};
+ {
+ std::unique_lock guard( m_mutex );
+
+ while (m_bSuspended
+ || (m_lstCallstack.front() != nullptr && !bReturnWhenNoJob
+ && m_lstJob.empty()))
+ {
+ m_cndWait.wait(guard);
+ }
+
+ if( nullptr == m_lstCallstack.front() )
+ {
+ // disposed !
+ if (!m_lstJob.empty() && m_lstJob.front().doRequest == nullptr) {
+ // If this thread was waiting for a remote response, that response may or
+ // may not have been enqueued; if it has not been enqueued, there cannot be
+ // another enqueued response, so it is always correct to remove any enqueued
+ // response here:
+ m_lstJob.pop_front();
+ }
+ break;
+ }
+
+ if( m_lstJob.empty() )
+ {
+ assert(bReturnWhenNoJob);
+ break;
+ }
+
+ job = m_lstJob.front();
+ m_lstJob.pop_front();
+ }
+
+ if( job.doRequest )
+ {
+ job.doRequest( job.pThreadSpecificData );
+ std::scoped_lock guard( m_mutex );
+ m_nToDo --;
+ }
+ else
+ {
+ pReturn = job.pThreadSpecificData;
+ std::scoped_lock guard( m_mutex );
+ m_nToDo --;
+ break;
+ }
+ }
+
+ {
+ // synchronize with the dispose calls
+ std::scoped_lock guard( m_mutex );
+ m_lstCallstack.pop_front();
+ }
+
+ return pReturn;
+ }
+
+ void JobQueue::dispose( void const * nDisposeId )
+ {
+ std::scoped_lock guard( m_mutex );
+ for( auto& rId : m_lstCallstack )
+ {
+ if( rId == nDisposeId )
+ {
+ rId = nullptr;
+ }
+ }
+
+ if( !m_lstCallstack.empty() && ! m_lstCallstack.front() )
+ {
+ // The thread is waiting for a disposed pCallerId, let it go
+ m_cndWait.notify_all();
+ }
+ }
+
+ void JobQueue::suspend()
+ {
+ std::scoped_lock guard( m_mutex );
+ m_bSuspended = true;
+ }
+
+ void JobQueue::resume()
+ {
+ std::scoped_lock guard( m_mutex );
+ m_bSuspended = false;
+ if( ! m_lstJob.empty() )
+ {
+ m_cndWait.notify_all();
+ }
+ }
+
+ bool JobQueue::isEmpty() const
+ {
+ std::scoped_lock guard( m_mutex );
+ return m_lstJob.empty();
+ }
+
+ bool JobQueue::isCallstackEmpty() const
+ {
+ std::scoped_lock guard( m_mutex );
+ return m_lstCallstack.empty();
+ }
+
+ bool JobQueue::isBusy() const
+ {
+ std::scoped_lock guard( m_mutex );
+ return m_nToDo > 0;
+ }
+
+
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/cppu/source/threadpool/jobqueue.hxx b/cppu/source/threadpool/jobqueue.hxx
new file mode 100644
index 000000000..5b92f2476
--- /dev/null
+++ b/cppu/source/threadpool/jobqueue.hxx
@@ -0,0 +1,72 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * This file incorporates work covered by the following license notice:
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of
+ * the License at http://www.apache.org/licenses/LICENSE-2.0 .
+ */
+
+#pragma once
+
+#include <sal/config.h>
+
+#include <condition_variable>
+#include <deque>
+#include <memory>
+#include <mutex>
+
+#include <sal/types.h>
+
+namespace cppu_threadpool
+{
+ extern "C" typedef void (RequestFun)(void *);
+
+ struct Job
+ {
+ void *pThreadSpecificData;
+ RequestFun * doRequest;
+ };
+
+ class DisposedCallerAdmin;
+ typedef std::shared_ptr<DisposedCallerAdmin> DisposedCallerAdminHolder;
+
+ class JobQueue
+ {
+ public:
+ JobQueue();
+
+ void add( void *pThreadSpecificData, RequestFun * doRequest );
+
+ void *enter( void const * nDisposeId , bool bReturnWhenNoJob = false );
+ void dispose( void const * nDisposeId );
+
+ void suspend();
+ void resume();
+
+ bool isEmpty() const;
+ bool isCallstackEmpty() const;
+ bool isBusy() const;
+
+ private:
+ mutable std::mutex m_mutex;
+ std::deque < struct Job > m_lstJob;
+ std::deque<void const *> m_lstCallstack;
+ sal_Int32 m_nToDo;
+ bool m_bSuspended;
+ std::condition_variable m_cndWait;
+ DisposedCallerAdminHolder m_DisposedCallerAdmin;
+ };
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/cppu/source/threadpool/thread.cxx b/cppu/source/threadpool/thread.cxx
new file mode 100644
index 000000000..56e52838b
--- /dev/null
+++ b/cppu/source/threadpool/thread.cxx
@@ -0,0 +1,198 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * This file incorporates work covered by the following license notice:
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of
+ * the License at http://www.apache.org/licenses/LICENSE-2.0 .
+ */
+
+#include <sal/config.h>
+
+#include <algorithm>
+#include <cstdlib>
+#include <osl/diagnose.h>
+#include <uno/threadpool.h>
+#include <sal/log.hxx>
+#include <utility>
+
+#include "thread.hxx"
+#include "jobqueue.hxx"
+#include "threadpool.hxx"
+
+using namespace osl;
+using namespace rtl;
+
+namespace cppu_threadpool {
+
+
+ ThreadAdmin::ThreadAdmin(): m_disposed(false) {}
+
+ ThreadAdmin::~ThreadAdmin()
+ {
+ SAL_WARN_IF(m_deque.size(), "cppu.threadpool", m_deque.size() << "Threads left");
+ }
+
+ bool ThreadAdmin::add_locked( rtl::Reference< ORequestThread > const & p )
+ {
+ if( m_disposed )
+ {
+ return false;
+ }
+ m_deque.push_back( p );
+ return true;
+ }
+
+ void ThreadAdmin::remove_locked( rtl::Reference< ORequestThread > const & p )
+ {
+ m_deque.erase(std::find( m_deque.begin(), m_deque.end(), p ), m_deque.end());
+ }
+
+ void ThreadAdmin::remove( rtl::Reference< ORequestThread > const & p )
+ {
+ std::scoped_lock aGuard( m_mutex );
+ remove_locked( p );
+ }
+
+ void ThreadAdmin::join()
+ {
+ {
+ std::scoped_lock aGuard( m_mutex );
+ m_disposed = true;
+ }
+ for (;;)
+ {
+ rtl::Reference< ORequestThread > pCurrent;
+ {
+ std::scoped_lock aGuard( m_mutex );
+ if( m_deque.empty() )
+ {
+ break;
+ }
+ pCurrent = m_deque.front();
+ m_deque.pop_front();
+ }
+ if (pCurrent->getIdentifier()
+ != osl::Thread::getCurrentIdentifier())
+ {
+ pCurrent->join();
+ }
+ }
+ }
+
+
+ ORequestThread::ORequestThread( ThreadPoolHolder aThreadPool,
+ JobQueue *pQueue,
+ ByteSequence aThreadId,
+ bool bAsynchron )
+ : m_aThreadPool(std::move( aThreadPool ))
+ , m_pQueue( pQueue )
+ , m_aThreadId(std::move( aThreadId ))
+ , m_bAsynchron( bAsynchron )
+ {}
+
+ ORequestThread::~ORequestThread() {}
+
+ void ORequestThread::setTask( JobQueue *pQueue,
+ const ByteSequence &aThreadId,
+ bool bAsynchron )
+ {
+ m_pQueue = pQueue;
+ m_aThreadId = aThreadId;
+ m_bAsynchron = bAsynchron;
+ }
+
+ bool ORequestThread::launch()
+ {
+ // Assumption is that osl::Thread::create returns normally with a true
+ // return value iff it causes osl::Thread::run to start executing:
+ acquire();
+ ThreadAdmin & rThreadAdmin = m_aThreadPool->getThreadAdmin();
+ std::unique_lock g(rThreadAdmin.m_mutex);
+ if (!rThreadAdmin.add_locked( this )) {
+ return false;
+ }
+ try {
+ if (!create()) {
+ std::abort();
+ }
+ } catch (...) {
+ rThreadAdmin.remove_locked( this );
+ g.release();
+ release();
+ throw;
+ }
+ return true;
+ }
+
+ void ORequestThread::onTerminated()
+ {
+ m_aThreadPool->getThreadAdmin().remove( this );
+ release();
+ }
+
+ void ORequestThread::run()
+ {
+ osl_setThreadName("cppu_threadpool::ORequestThread");
+
+ try
+ {
+ while ( m_pQueue )
+ {
+ if( ! m_bAsynchron )
+ {
+ if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) )
+ {
+ OSL_ASSERT( false );
+ }
+ }
+
+ while( ! m_pQueue->isEmpty() )
+ {
+ // Note : Oneways should not get a disposable disposeid,
+ // It does not make sense to dispose a call in this state.
+ // That's way we put it a disposeid, that can't be used otherwise.
+ m_pQueue->enter(
+ this,
+ true );
+
+ if( m_pQueue->isEmpty() )
+ {
+ m_aThreadPool->revokeQueue( m_aThreadId , m_bAsynchron );
+ // Note : revokeQueue might have failed because m_pQueue.isEmpty()
+ // may be false (race).
+ }
+ }
+
+ delete m_pQueue;
+ m_pQueue = nullptr;
+
+ if( ! m_bAsynchron )
+ {
+ uno_releaseIdFromCurrentThread();
+ }
+
+ m_aThreadPool->waitInPool( this );
+ }
+ }
+ catch (...)
+ {
+ // Work around the problem that onTerminated is not called if run
+ // throws an exception:
+ onTerminated();
+ throw;
+ }
+ }
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/cppu/source/threadpool/thread.hxx b/cppu/source/threadpool/thread.hxx
new file mode 100644
index 000000000..5d03de88e
--- /dev/null
+++ b/cppu/source/threadpool/thread.hxx
@@ -0,0 +1,67 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * This file incorporates work covered by the following license notice:
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of
+ * the License at http://www.apache.org/licenses/LICENSE-2.0 .
+ */
+#pragma once
+
+#include <osl/thread.hxx>
+#include <sal/types.h>
+#include <salhelper/simplereferenceobject.hxx>
+
+#include "threadpool.hxx"
+
+namespace cppu_threadpool {
+
+ class JobQueue;
+
+
+ // private thread class for the threadpool
+
+ class ORequestThread:
+ public salhelper::SimpleReferenceObject, public osl::Thread
+ {
+ public:
+ ORequestThread( ThreadPoolHolder aThreadPool,
+ JobQueue * ,
+ ::rtl::ByteSequence aThreadId,
+ bool bAsynchron );
+ virtual ~ORequestThread() override;
+
+ void setTask( JobQueue * , const ::rtl::ByteSequence & aThreadId , bool bAsynchron );
+
+ bool launch();
+
+ static void * operator new(std::size_t size)
+ { return SimpleReferenceObject::operator new(size); }
+
+ static void operator delete(void * pointer)
+ { SimpleReferenceObject::operator delete(pointer); }
+
+ private:
+ virtual void SAL_CALL run() override;
+ virtual void SAL_CALL onTerminated() override;
+
+ ThreadPoolHolder m_aThreadPool;
+ JobQueue *m_pQueue;
+ ::rtl::ByteSequence m_aThreadId;
+ bool m_bAsynchron;
+ };
+
+} // end cppu_threadpool
+
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/cppu/source/threadpool/threadident.cxx b/cppu/source/threadpool/threadident.cxx
new file mode 100644
index 000000000..16b7c0b27
--- /dev/null
+++ b/cppu/source/threadpool/threadident.cxx
@@ -0,0 +1,120 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * This file incorporates work covered by the following license notice:
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of
+ * the License at http://www.apache.org/licenses/LICENSE-2.0 .
+ */
+
+#include <osl/thread.hxx>
+#include <osl/diagnose.h>
+
+#include <rtl/process.h>
+#include <rtl/byteseq.hxx>
+
+#include <uno/threadpool.h>
+
+#include "current.hxx"
+
+using namespace ::std;
+using namespace ::osl;
+using namespace ::cppu;
+
+static void createLocalId( sal_Sequence **ppThreadId )
+{
+ rtl_byte_sequence_constructNoDefault( ppThreadId , 4 + 16 );
+ sal_uInt32 id = osl::Thread::getCurrentIdentifier();
+ (*ppThreadId)->elements[0] = id & 0xFF;
+ (*ppThreadId)->elements[1] = (id >> 8) & 0xFF;
+ (*ppThreadId)->elements[2] = (id >> 16) & 0xFF;
+ (*ppThreadId)->elements[3] = (id >> 24) & 0xFF;
+ rtl_getGlobalProcessId( reinterpret_cast<sal_uInt8 *>(&(*ppThreadId)->elements[4]) );
+}
+
+extern "C" void SAL_CALL
+uno_getIdOfCurrentThread( sal_Sequence **ppThreadId )
+ SAL_THROW_EXTERN_C()
+{
+ IdContainer& id = getIdContainer();
+ if (!id.bInit)
+ {
+ // first time, that the thread enters the bridge
+ createLocalId( ppThreadId );
+
+ // TODO
+ // note : this is a leak !
+ id.pLocalThreadId = *ppThreadId;
+ id.pCurrentId = *ppThreadId;
+ id.nRefCountOfCurrentId = 1;
+ rtl_byte_sequence_acquire( id.pLocalThreadId );
+ rtl_byte_sequence_acquire( id.pCurrentId );
+ id.bInit = true;
+ }
+ else
+ {
+ id.nRefCountOfCurrentId ++;
+ if( *ppThreadId )
+ {
+ rtl_byte_sequence_release( *ppThreadId );
+ }
+ *ppThreadId = id.pCurrentId;
+ rtl_byte_sequence_acquire( *ppThreadId );
+ }
+}
+
+extern "C" void SAL_CALL uno_releaseIdFromCurrentThread()
+ SAL_THROW_EXTERN_C()
+{
+ IdContainer& id = getIdContainer();
+ OSL_ASSERT( id.bInit );
+ OSL_ASSERT( id.nRefCountOfCurrentId );
+
+ id.nRefCountOfCurrentId --;
+ if( ! id.nRefCountOfCurrentId && (id.pLocalThreadId != id.pCurrentId) )
+ {
+ rtl_byte_sequence_assign( &(id.pCurrentId) , id.pLocalThreadId );
+ }
+}
+
+extern "C" sal_Bool SAL_CALL uno_bindIdToCurrentThread( sal_Sequence *pThreadId )
+ SAL_THROW_EXTERN_C()
+{
+ IdContainer& id = getIdContainer();
+ if (!id.bInit)
+ {
+ id.pLocalThreadId = nullptr;
+ createLocalId( &(id.pLocalThreadId) );
+ id.nRefCountOfCurrentId = 1;
+ id.pCurrentId = pThreadId;
+ rtl_byte_sequence_acquire(id.pCurrentId);
+ id.bInit = true;
+ }
+ else
+ {
+ OSL_ASSERT( 0 == id.nRefCountOfCurrentId );
+ if( 0 == id.nRefCountOfCurrentId )
+ {
+ rtl_byte_sequence_assign(&( id.pCurrentId ), pThreadId );
+ id.nRefCountOfCurrentId ++;
+ }
+ else
+ {
+ return false;
+ }
+
+ }
+ return true;
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/cppu/source/threadpool/threadpool.cxx b/cppu/source/threadpool/threadpool.cxx
new file mode 100644
index 000000000..897bebed9
--- /dev/null
+++ b/cppu/source/threadpool/threadpool.cxx
@@ -0,0 +1,476 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * This file incorporates work covered by the following license notice:
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of
+ * the License at http://www.apache.org/licenses/LICENSE-2.0 .
+ */
+
+#include <sal/config.h>
+
+#include <cassert>
+#include <chrono>
+#include <algorithm>
+#include <utility>
+#include <unordered_map>
+
+#include <osl/diagnose.h>
+#include <sal/log.hxx>
+
+#include <uno/threadpool.h>
+
+#include "threadpool.hxx"
+#include "thread.hxx"
+
+using namespace ::std;
+using namespace ::osl;
+using namespace ::rtl;
+
+namespace cppu_threadpool
+{
+ WaitingThread::WaitingThread(
+ rtl::Reference<ORequestThread> theThread): thread(std::move(theThread))
+ {}
+
+ DisposedCallerAdminHolder const & DisposedCallerAdmin::getInstance()
+ {
+ static DisposedCallerAdminHolder theDisposedCallerAdmin = std::make_shared<DisposedCallerAdmin>();
+ return theDisposedCallerAdmin;
+ }
+
+ DisposedCallerAdmin::~DisposedCallerAdmin()
+ {
+ SAL_WARN_IF( !m_vector.empty(), "cppu.threadpool", "DisposedCallerList : " << m_vector.size() << " left");
+ }
+
+ void DisposedCallerAdmin::dispose( void const * nDisposeId )
+ {
+ std::scoped_lock guard( m_mutex );
+ m_vector.push_back( nDisposeId );
+ }
+
+ void DisposedCallerAdmin::destroy( void const * nDisposeId )
+ {
+ std::scoped_lock guard( m_mutex );
+ m_vector.erase(std::remove(m_vector.begin(), m_vector.end(), nDisposeId), m_vector.end());
+ }
+
+ bool DisposedCallerAdmin::isDisposed( void const * nDisposeId )
+ {
+ std::scoped_lock guard( m_mutex );
+ return (std::find(m_vector.begin(), m_vector.end(), nDisposeId) != m_vector.end());
+ }
+
+
+ ThreadPool::ThreadPool() :
+ m_DisposedCallerAdmin( DisposedCallerAdmin::getInstance() )
+ {
+ }
+
+ ThreadPool::~ThreadPool()
+ {
+ SAL_WARN_IF( m_mapQueue.size(), "cppu.threadpool", "ThreadIdHashMap: " << m_mapQueue.size() << " left");
+ }
+
+ void ThreadPool::dispose( void const * nDisposeId )
+ {
+ m_DisposedCallerAdmin->dispose( nDisposeId );
+
+ std::scoped_lock guard( m_mutex );
+ for (auto const& item : m_mapQueue)
+ {
+ if( item.second.first )
+ {
+ item.second.first->dispose( nDisposeId );
+ }
+ if( item.second.second )
+ {
+ item.second.second->dispose( nDisposeId );
+ }
+ }
+ }
+
+ void ThreadPool::destroy( void const * nDisposeId )
+ {
+ m_DisposedCallerAdmin->destroy( nDisposeId );
+ }
+
+ /******************
+ * This methods lets the thread wait a certain amount of time. If within this timespan
+ * a new request comes in, this thread is reused. This is done only to improve performance,
+ * it is not required for threadpool functionality.
+ ******************/
+ void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread )
+ {
+ WaitingThread waitingThread(pThread);
+ {
+ std::scoped_lock guard( m_mutexWaitingThreadList );
+ m_dequeThreads.push_front( &waitingThread );
+ }
+
+ // let the thread wait 2 seconds
+ waitingThread.condition.wait( std::chrono::seconds(2) );
+
+ {
+ std::scoped_lock guard ( m_mutexWaitingThreadList );
+ if( waitingThread.thread.is() )
+ {
+ // thread wasn't reused, remove it from the list
+ WaitingThreadDeque::iterator ii = find(
+ m_dequeThreads.begin(), m_dequeThreads.end(), &waitingThread );
+ OSL_ASSERT( ii != m_dequeThreads.end() );
+ m_dequeThreads.erase( ii );
+ }
+ }
+ }
+
+ void ThreadPool::joinWorkers()
+ {
+ {
+ std::scoped_lock guard( m_mutexWaitingThreadList );
+ for (auto const& thread : m_dequeThreads)
+ {
+ // wake the threads up
+ thread->condition.set();
+ }
+ }
+ m_aThreadAdmin.join();
+ }
+
+ bool ThreadPool::createThread( JobQueue *pQueue ,
+ const ByteSequence &aThreadId,
+ bool bAsynchron )
+ {
+ {
+ // Can a thread be reused ?
+ std::scoped_lock guard( m_mutexWaitingThreadList );
+ if( ! m_dequeThreads.empty() )
+ {
+ // inform the thread and let it go
+ struct WaitingThread *pWaitingThread = m_dequeThreads.back();
+ pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
+ pWaitingThread->thread = nullptr;
+
+ // remove from list
+ m_dequeThreads.pop_back();
+
+ // let the thread go
+ pWaitingThread->condition.set();
+ return true;
+ }
+ }
+
+ rtl::Reference pThread(
+ new ORequestThread( this, pQueue , aThreadId, bAsynchron) );
+ return pThread->launch();
+ }
+
+ bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, bool bAsynchron )
+ {
+ std::scoped_lock guard( m_mutex );
+
+ ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
+ OSL_ASSERT( ii != m_mapQueue.end() );
+
+ if( bAsynchron )
+ {
+ if( ! (*ii).second.second->isEmpty() )
+ {
+ // another thread has put something into the queue
+ return false;
+ }
+
+ (*ii).second.second = nullptr;
+ if( (*ii).second.first )
+ {
+ // all oneway request have been processed, now
+ // synchronous requests may go on
+ (*ii).second.first->resume();
+ }
+ }
+ else
+ {
+ if( ! (*ii).second.first->isEmpty() )
+ {
+ // another thread has put something into the queue
+ return false;
+ }
+ (*ii).second.first = nullptr;
+ }
+
+ if( nullptr == (*ii).second.first && nullptr == (*ii).second.second )
+ {
+ m_mapQueue.erase( ii );
+ }
+
+ return true;
+ }
+
+
+ bool ThreadPool::addJob(
+ const ByteSequence &aThreadId ,
+ bool bAsynchron,
+ void *pThreadSpecificData,
+ RequestFun * doRequest,
+ void const * disposeId )
+ {
+ bool bCreateThread = false;
+ JobQueue *pQueue = nullptr;
+ {
+ std::scoped_lock guard( m_mutex );
+ if (m_DisposedCallerAdmin->isDisposed(disposeId)) {
+ return true;
+ }
+
+ ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
+
+ if( ii == m_mapQueue.end() )
+ {
+ m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( nullptr , nullptr );
+ ii = m_mapQueue.find( aThreadId );
+ OSL_ASSERT( ii != m_mapQueue.end() );
+ }
+
+ if( bAsynchron )
+ {
+ if( ! (*ii).second.second )
+ {
+ (*ii).second.second = new JobQueue();
+ bCreateThread = true;
+ }
+ pQueue = (*ii).second.second;
+ }
+ else
+ {
+ if( ! (*ii).second.first )
+ {
+ (*ii).second.first = new JobQueue();
+ bCreateThread = true;
+ }
+ pQueue = (*ii).second.first;
+
+ if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
+ {
+ pQueue->suspend();
+ }
+ }
+ pQueue->add( pThreadSpecificData , doRequest );
+ }
+
+ return !bCreateThread || createThread( pQueue , aThreadId , bAsynchron);
+ }
+
+ void ThreadPool::prepare( const ByteSequence &aThreadId )
+ {
+ std::scoped_lock guard( m_mutex );
+
+ ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
+
+ if( ii == m_mapQueue.end() )
+ {
+ JobQueue *p = new JobQueue();
+ m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , nullptr );
+ }
+ else if( nullptr == (*ii).second.first )
+ {
+ (*ii).second.first = new JobQueue();
+ }
+ }
+
+ void * ThreadPool::enter( const ByteSequence & aThreadId , void const * nDisposeId )
+ {
+ JobQueue *pQueue = nullptr;
+ {
+ std::scoped_lock guard( m_mutex );
+
+ ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
+
+ OSL_ASSERT( ii != m_mapQueue.end() );
+ pQueue = (*ii).second.first;
+ }
+
+ OSL_ASSERT( pQueue );
+ void *pReturn = pQueue->enter( nDisposeId );
+
+ if( pQueue->isCallstackEmpty() )
+ {
+ if( revokeQueue( aThreadId , false) )
+ {
+ // remove queue
+ delete pQueue;
+ }
+ }
+ return pReturn;
+ }
+}
+
+// All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
+// spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty
+// (within the last uno_threadpool_destroy) all worker threads spawned by that
+// ThreadPool instance are joined (which implies that uno_threadpool_destroy
+// must never be called from a worker thread); afterwards, the next call to
+// uno_threadpool_create (if any) will lead to a new ThreadPool instance.
+
+using namespace cppu_threadpool;
+
+namespace {
+
+struct uno_ThreadPool_Equal
+{
+ bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
+ {
+ return a == b;
+ }
+};
+
+struct uno_ThreadPool_Hash
+{
+ std::size_t operator () ( const uno_ThreadPool &a ) const
+ {
+ return reinterpret_cast<std::size_t>( a );
+ }
+};
+
+}
+
+typedef std::unordered_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
+
+static ThreadpoolHashSet *g_pThreadpoolHashSet;
+
+struct _uno_ThreadPool
+{
+ sal_Int32 dummy;
+};
+
+namespace {
+
+ThreadPoolHolder getThreadPool( uno_ThreadPool hPool )
+{
+ MutexGuard guard( Mutex::getGlobalMutex() );
+ assert( g_pThreadpoolHashSet != nullptr );
+ ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) );
+ assert( i != g_pThreadpoolHashSet->end() );
+ return i->second;
+}
+
+}
+
+extern "C" uno_ThreadPool SAL_CALL
+uno_threadpool_create() SAL_THROW_EXTERN_C()
+{
+ MutexGuard guard( Mutex::getGlobalMutex() );
+ ThreadPoolHolder p;
+ if( ! g_pThreadpoolHashSet )
+ {
+ g_pThreadpoolHashSet = new ThreadpoolHashSet;
+ p = new ThreadPool;
+ }
+ else
+ {
+ assert( !g_pThreadpoolHashSet->empty() );
+ p = g_pThreadpoolHashSet->begin()->second;
+ }
+
+ // Just ensure that the handle is unique in the process (via heap)
+ uno_ThreadPool h = new struct _uno_ThreadPool;
+ g_pThreadpoolHashSet->emplace( h, p );
+ return h;
+}
+
+extern "C" void SAL_CALL
+uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
+{
+ sal_Sequence *pThreadId = nullptr;
+ uno_getIdOfCurrentThread( &pThreadId );
+ getThreadPool( hPool )->prepare( pThreadId );
+ rtl_byte_sequence_release( pThreadId );
+ uno_releaseIdFromCurrentThread();
+}
+
+extern "C" void SAL_CALL
+uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
+ SAL_THROW_EXTERN_C()
+{
+ sal_Sequence *pThreadId = nullptr;
+ uno_getIdOfCurrentThread( &pThreadId );
+ *ppJob =
+ getThreadPool( hPool )->enter(
+ pThreadId,
+ hPool );
+ rtl_byte_sequence_release( pThreadId );
+ uno_releaseIdFromCurrentThread();
+}
+
+extern "C" void SAL_CALL
+uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C()
+{
+ // we might do here some tidying up in case a thread called attach but never detach
+}
+
+extern "C" void SAL_CALL
+uno_threadpool_putJob(
+ uno_ThreadPool hPool,
+ sal_Sequence *pThreadId,
+ void *pJob,
+ void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
+ sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
+{
+ if (!getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest, hPool ))
+ {
+ SAL_WARN(
+ "cppu.threadpool",
+ "uno_threadpool_putJob in parallel with uno_threadpool_destroy");
+ }
+}
+
+extern "C" void SAL_CALL
+uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
+{
+ getThreadPool(hPool)->dispose(
+ hPool );
+}
+
+extern "C" void SAL_CALL
+uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
+{
+ ThreadPoolHolder p( getThreadPool(hPool) );
+ p->destroy(
+ hPool );
+
+ bool empty;
+ {
+ OSL_ASSERT( g_pThreadpoolHashSet );
+
+ MutexGuard guard( Mutex::getGlobalMutex() );
+
+ ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
+ OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
+ g_pThreadpoolHashSet->erase( ii );
+ delete hPool;
+
+ empty = g_pThreadpoolHashSet->empty();
+ if( empty )
+ {
+ delete g_pThreadpoolHashSet;
+ g_pThreadpoolHashSet = nullptr;
+ }
+ }
+
+ if( empty )
+ {
+ p->joinWorkers();
+ }
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/cppu/source/threadpool/threadpool.hxx b/cppu/source/threadpool/threadpool.hxx
new file mode 100644
index 000000000..afcae7a7e
--- /dev/null
+++ b/cppu/source/threadpool/threadpool.hxx
@@ -0,0 +1,162 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * This file incorporates work covered by the following license notice:
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of
+ * the License at http://www.apache.org/licenses/LICENSE-2.0 .
+ */
+
+#pragma once
+
+#include <mutex>
+#include <vector>
+#include <unordered_map>
+
+#include <osl/conditn.hxx>
+#include <osl/mutex.hxx>
+#include <rtl/byteseq.hxx>
+#include <rtl/ref.hxx>
+#include <salhelper/simplereferenceobject.hxx>
+
+#include "jobqueue.hxx"
+
+
+namespace cppu_threadpool {
+ class ORequestThread;
+
+ struct EqualThreadId
+ {
+ bool operator () ( const ::rtl::ByteSequence &a , const ::rtl::ByteSequence &b ) const
+ {
+ return a == b;
+ }
+ };
+
+ struct HashThreadId
+ {
+ sal_Int32 operator () ( const ::rtl::ByteSequence &a ) const
+ {
+ if( a.getLength() >= 4 )
+ {
+ return *reinterpret_cast<sal_Int32 const *>(a.getConstArray());
+ }
+ return 0;
+ }
+ };
+
+ typedef std::unordered_map
+ <
+ ::rtl::ByteSequence, // ThreadID
+ std::pair < JobQueue * , JobQueue * >,
+ HashThreadId,
+ EqualThreadId
+ > ThreadIdHashMap;
+
+ struct WaitingThread
+ {
+ osl::Condition condition;
+ rtl::Reference< ORequestThread > thread;
+
+ explicit WaitingThread(
+ rtl::Reference<ORequestThread> theThread);
+ };
+
+ typedef std::deque< struct ::cppu_threadpool::WaitingThread * > WaitingThreadDeque;
+
+ class DisposedCallerAdmin;
+ typedef std::shared_ptr<DisposedCallerAdmin> DisposedCallerAdminHolder;
+
+ class DisposedCallerAdmin
+ {
+ public:
+ ~DisposedCallerAdmin();
+
+ static DisposedCallerAdminHolder const & getInstance();
+
+ void dispose( void const * nDisposeId );
+ void destroy( void const * nDisposeId );
+ bool isDisposed( void const * nDisposeId );
+
+ private:
+ std::mutex m_mutex;
+ std::vector< void const * > m_vector;
+ };
+
+ class ThreadAdmin
+ {
+ public:
+ ThreadAdmin();
+ ~ThreadAdmin ();
+
+ void remove( rtl::Reference< ORequestThread > const & );
+ void join();
+
+ bool add_locked( rtl::Reference< ORequestThread > const & );
+ void remove_locked( rtl::Reference< ORequestThread > const & );
+ std::mutex m_mutex;
+
+ private:
+ std::deque< rtl::Reference< ORequestThread > > m_deque;
+ bool m_disposed;
+ };
+
+ class ThreadPool;
+ typedef rtl::Reference<ThreadPool> ThreadPoolHolder;
+
+ class ThreadPool: public salhelper::SimpleReferenceObject
+ {
+ public:
+ ThreadPool();
+ virtual ~ThreadPool() override;
+
+ void dispose( void const * nDisposeId );
+ void destroy( void const * nDisposeId );
+
+ bool addJob( const ::rtl::ByteSequence &aThreadId,
+ bool bAsynchron,
+ void *pThreadSpecificData,
+ RequestFun * doRequest,
+ void const * disposeId );
+
+ void prepare( const ::rtl::ByteSequence &aThreadId );
+ void * enter( const ::rtl::ByteSequence &aThreadId, void const * nDisposeId );
+
+ /********
+ * @return true, if queue could be successfully revoked.
+ ********/
+ bool revokeQueue( const ::rtl::ByteSequence & aThreadId , bool bAsynchron );
+
+ void waitInPool( rtl::Reference< ORequestThread > const & pThread );
+
+ void joinWorkers();
+
+ ThreadAdmin & getThreadAdmin() { return m_aThreadAdmin; }
+
+ private:
+ bool createThread( JobQueue *pQueue, const ::rtl::ByteSequence &aThreadId, bool bAsynchron);
+
+
+ ThreadIdHashMap m_mapQueue;
+ std::mutex m_mutex;
+
+ std::mutex m_mutexWaitingThreadList;
+ WaitingThreadDeque m_dequeThreads;
+
+ DisposedCallerAdminHolder m_DisposedCallerAdmin;
+ ThreadAdmin m_aThreadAdmin;
+ };
+
+} // end namespace cppu_threadpool
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */