diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 05:54:39 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 05:54:39 +0000 |
commit | 267c6f2ac71f92999e969232431ba04678e7437e (patch) | |
tree | 358c9467650e1d0a1d7227a21dac2e3d08b622b2 /cppu/source/threadpool | |
parent | Initial commit. (diff) | |
download | libreoffice-267c6f2ac71f92999e969232431ba04678e7437e.tar.xz libreoffice-267c6f2ac71f92999e969232431ba04678e7437e.zip |
Adding upstream version 4:24.2.0.upstream/4%24.2.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'cppu/source/threadpool')
-rw-r--r-- | cppu/source/threadpool/current.cxx | 210 | ||||
-rw-r--r-- | cppu/source/threadpool/current.hxx | 47 | ||||
-rw-r--r-- | cppu/source/threadpool/jobqueue.cxx | 177 | ||||
-rw-r--r-- | cppu/source/threadpool/jobqueue.hxx | 72 | ||||
-rw-r--r-- | cppu/source/threadpool/thread.cxx | 198 | ||||
-rw-r--r-- | cppu/source/threadpool/thread.hxx | 67 | ||||
-rw-r--r-- | cppu/source/threadpool/threadident.cxx | 119 | ||||
-rw-r--r-- | cppu/source/threadpool/threadpool.cxx | 475 | ||||
-rw-r--r-- | cppu/source/threadpool/threadpool.hxx | 162 |
9 files changed, 1527 insertions, 0 deletions
diff --git a/cppu/source/threadpool/current.cxx b/cppu/source/threadpool/current.cxx new file mode 100644 index 0000000000..64e6bfb8f6 --- /dev/null +++ b/cppu/source/threadpool/current.cxx @@ -0,0 +1,210 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <sal/config.h> + +#include <rtl/byteseq.h> +#include <osl/mutex.hxx> + +#include <uno/current_context.h> +#include <uno/environment.hxx> +#include <uno/mapping.hxx> +#include <typelib/typedescription.h> + +#include "current.hxx" + + +using namespace ::osl; +using namespace ::rtl; +using namespace ::cppu; +using namespace ::com::sun::star::uno; + +namespace cppu +{ + +static typelib_InterfaceTypeDescription * get_type_XCurrentContext() +{ + static typelib_InterfaceTypeDescription* s_type_XCurrentContext = []() { + OUString sTypeName("com.sun.star.uno.XCurrentContext"); + typelib_InterfaceTypeDescription* pTD = nullptr; + typelib_TypeDescriptionReference* pMembers[1] = { nullptr }; + OUString sMethodName0("com.sun.star.uno.XCurrentContext::getValueByName"); + typelib_typedescriptionreference_new(&pMembers[0], typelib_TypeClass_INTERFACE_METHOD, + sMethodName0.pData); + typelib_typedescription_newInterface( + &pTD, sTypeName.pData, 0, 0, 0, 0, 0, + *typelib_static_type_getByTypeClass(typelib_TypeClass_INTERFACE), 1, pMembers); + + typelib_typedescription_register(reinterpret_cast<typelib_TypeDescription**>(&pTD)); + typelib_typedescriptionreference_release(pMembers[0]); + + typelib_InterfaceMethodTypeDescription* pMethod = nullptr; + typelib_Parameter_Init aParameters[1]; + OUString sParamName0("Name"); + OUString sParamType0("string"); + aParameters[0].pParamName = sParamName0.pData; + aParameters[0].eTypeClass = typelib_TypeClass_STRING; + aParameters[0].pTypeName = sParamType0.pData; + aParameters[0].bIn = true; + aParameters[0].bOut = false; + rtl_uString* pExceptions[1]; + OUString sExceptionName0("com.sun.star.uno.RuntimeException"); + pExceptions[0] = sExceptionName0.pData; + OUString sReturnType0("any"); + typelib_typedescription_newInterfaceMethod(&pMethod, 3, false, sMethodName0.pData, + typelib_TypeClass_ANY, sReturnType0.pData, 1, + aParameters, 1, pExceptions); + typelib_typedescription_register(reinterpret_cast<typelib_TypeDescription**>(&pMethod)); + typelib_typedescription_release(&pMethod->aBase.aBase); + // another static ref: + ++reinterpret_cast<typelib_TypeDescription*>(pTD)->nStaticRefCount; + return pTD; + }(); + + return s_type_XCurrentContext; +} + +IdContainer::IdContainer() + : pCurrentContext(nullptr) + , pCurrentContextEnv(nullptr) + , pLocalThreadId(nullptr) + , pCurrentId(nullptr) + , nRefCountOfCurrentId(0) + , bInit(false) +{ +} + +IdContainer::~IdContainer() +{ + if (pCurrentContext) + { + (*pCurrentContextEnv->releaseInterface)( + pCurrentContextEnv, pCurrentContext ); + (*pCurrentContextEnv->aBase.release)( + &pCurrentContextEnv->aBase ); + } + if (bInit) + { + ::rtl_byte_sequence_release( pLocalThreadId ); + ::rtl_byte_sequence_release( pCurrentId ); + } +} + +IdContainer& getIdContainer() +{ + static thread_local IdContainer aId; + return aId; +} + +} + +extern "C" sal_Bool SAL_CALL uno_setCurrentContext( + void * pCurrentContext, + rtl_uString * pEnvTypeName, void * pEnvContext ) + SAL_THROW_EXTERN_C() +{ + IdContainer& id = getIdContainer(); + + // free old one + if (id.pCurrentContext) + { + (*id.pCurrentContextEnv->releaseInterface)( + id.pCurrentContextEnv, id.pCurrentContext ); + (*id.pCurrentContextEnv->aBase.release)( + &id.pCurrentContextEnv->aBase ); + id.pCurrentContextEnv = nullptr; + + id.pCurrentContext = nullptr; + } + + if (!pCurrentContext) + return true; + + uno_Environment * pEnv = nullptr; + ::uno_getEnvironment( &pEnv, pEnvTypeName, pEnvContext ); + OSL_ASSERT( pEnv && pEnv->pExtEnv ); + if (pEnv) + { + if (pEnv->pExtEnv) + { + id.pCurrentContextEnv = pEnv->pExtEnv; + (*id.pCurrentContextEnv->acquireInterface)( + id.pCurrentContextEnv, pCurrentContext ); + id.pCurrentContext = pCurrentContext; + } + else + { + (*pEnv->release)( pEnv ); + return false; + } + } + else + { + return false; + } + return true; +} + +extern "C" sal_Bool SAL_CALL uno_getCurrentContext( + void ** ppCurrentContext, rtl_uString * pEnvTypeName, void * pEnvContext ) + SAL_THROW_EXTERN_C() +{ + IdContainer& id = getIdContainer(); + + Environment target_env; + + // release inout parameter + if (*ppCurrentContext) + { + target_env = Environment(OUString(pEnvTypeName), pEnvContext); + OSL_ASSERT( target_env.is() ); + if (! target_env.is()) + return false; + uno_ExtEnvironment * pEnv = target_env.get()->pExtEnv; + OSL_ASSERT( nullptr != pEnv ); + if (nullptr == pEnv) + return false; + (*pEnv->releaseInterface)( pEnv, *ppCurrentContext ); + + *ppCurrentContext = nullptr; + } + + // case: null-ref + if (nullptr == id.pCurrentContext) + return true; + + if (! target_env.is()) + { + target_env = Environment(OUString(pEnvTypeName), pEnvContext); + OSL_ASSERT( target_env.is() ); + if (! target_env.is()) + return false; + } + + Mapping mapping(&id.pCurrentContextEnv->aBase, target_env.get()); + OSL_ASSERT( mapping.is() ); + if (! mapping.is()) + return false; + + mapping.mapInterface(ppCurrentContext, id.pCurrentContext, ::cppu::get_type_XCurrentContext()); + + return true; +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/source/threadpool/current.hxx b/cppu/source/threadpool/current.hxx new file mode 100644 index 0000000000..1f6ce66427 --- /dev/null +++ b/cppu/source/threadpool/current.hxx @@ -0,0 +1,47 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#pragma once + +#include <sal/config.h> + +#include <sal/types.h> + +struct _uno_ExtEnvironment; + +namespace cppu +{ +struct IdContainer +{ + void * pCurrentContext; + _uno_ExtEnvironment * pCurrentContextEnv; + + sal_Sequence * pLocalThreadId; + sal_Sequence * pCurrentId; + sal_Int32 nRefCountOfCurrentId; + bool bInit; + + IdContainer(); + ~IdContainer(); +}; + +IdContainer& getIdContainer(); +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/source/threadpool/jobqueue.cxx b/cppu/source/threadpool/jobqueue.cxx new file mode 100644 index 0000000000..1be424024d --- /dev/null +++ b/cppu/source/threadpool/jobqueue.cxx @@ -0,0 +1,177 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <sal/config.h> + +#include <cassert> + +#include "jobqueue.hxx" +#include "threadpool.hxx" + +namespace cppu_threadpool { + + JobQueue::JobQueue() : + m_nToDo( 0 ), + m_bSuspended( false ), + m_DisposedCallerAdmin( DisposedCallerAdmin::getInstance() ) + { + } + + void JobQueue::add( void *pThreadSpecificData, RequestFun * doRequest ) + { + std::scoped_lock guard( m_mutex ); + Job job = { pThreadSpecificData , doRequest }; + m_lstJob.push_back( job ); + if( ! m_bSuspended ) + { + m_cndWait.notify_all(); + } + m_nToDo ++; + } + + void *JobQueue::enter( void const * nDisposeId , bool bReturnWhenNoJob ) + { + void *pReturn = nullptr; + { + // synchronize with the dispose calls + std::scoped_lock guard( m_mutex ); + if( m_DisposedCallerAdmin->isDisposed( nDisposeId ) ) + { + return nullptr; + } + m_lstCallstack.push_front( nDisposeId ); + } + + + while( true ) + { + struct Job job={nullptr,nullptr}; + { + std::unique_lock guard( m_mutex ); + + while (m_bSuspended + || (m_lstCallstack.front() != nullptr && !bReturnWhenNoJob + && m_lstJob.empty())) + { + m_cndWait.wait(guard); + } + + if( nullptr == m_lstCallstack.front() ) + { + // disposed ! + if (!m_lstJob.empty() && m_lstJob.front().doRequest == nullptr) { + // If this thread was waiting for a remote response, that response may or + // may not have been enqueued; if it has not been enqueued, there cannot be + // another enqueued response, so it is always correct to remove any enqueued + // response here: + m_lstJob.pop_front(); + } + break; + } + + if( m_lstJob.empty() ) + { + assert(bReturnWhenNoJob); + break; + } + + job = m_lstJob.front(); + m_lstJob.pop_front(); + } + + if( job.doRequest ) + { + job.doRequest( job.pThreadSpecificData ); + std::scoped_lock guard( m_mutex ); + m_nToDo --; + } + else + { + pReturn = job.pThreadSpecificData; + std::scoped_lock guard( m_mutex ); + m_nToDo --; + break; + } + } + + { + // synchronize with the dispose calls + std::scoped_lock guard( m_mutex ); + m_lstCallstack.pop_front(); + } + + return pReturn; + } + + void JobQueue::dispose( void const * nDisposeId ) + { + std::scoped_lock guard( m_mutex ); + for( auto& rId : m_lstCallstack ) + { + if( rId == nDisposeId ) + { + rId = nullptr; + } + } + + if( !m_lstCallstack.empty() && ! m_lstCallstack.front() ) + { + // The thread is waiting for a disposed pCallerId, let it go + m_cndWait.notify_all(); + } + } + + void JobQueue::suspend() + { + std::scoped_lock guard( m_mutex ); + m_bSuspended = true; + } + + void JobQueue::resume() + { + std::scoped_lock guard( m_mutex ); + m_bSuspended = false; + if( ! m_lstJob.empty() ) + { + m_cndWait.notify_all(); + } + } + + bool JobQueue::isEmpty() const + { + std::scoped_lock guard( m_mutex ); + return m_lstJob.empty(); + } + + bool JobQueue::isCallstackEmpty() const + { + std::scoped_lock guard( m_mutex ); + return m_lstCallstack.empty(); + } + + bool JobQueue::isBusy() const + { + std::scoped_lock guard( m_mutex ); + return m_nToDo > 0; + } + + +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/source/threadpool/jobqueue.hxx b/cppu/source/threadpool/jobqueue.hxx new file mode 100644 index 0000000000..5b92f2476e --- /dev/null +++ b/cppu/source/threadpool/jobqueue.hxx @@ -0,0 +1,72 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#pragma once + +#include <sal/config.h> + +#include <condition_variable> +#include <deque> +#include <memory> +#include <mutex> + +#include <sal/types.h> + +namespace cppu_threadpool +{ + extern "C" typedef void (RequestFun)(void *); + + struct Job + { + void *pThreadSpecificData; + RequestFun * doRequest; + }; + + class DisposedCallerAdmin; + typedef std::shared_ptr<DisposedCallerAdmin> DisposedCallerAdminHolder; + + class JobQueue + { + public: + JobQueue(); + + void add( void *pThreadSpecificData, RequestFun * doRequest ); + + void *enter( void const * nDisposeId , bool bReturnWhenNoJob = false ); + void dispose( void const * nDisposeId ); + + void suspend(); + void resume(); + + bool isEmpty() const; + bool isCallstackEmpty() const; + bool isBusy() const; + + private: + mutable std::mutex m_mutex; + std::deque < struct Job > m_lstJob; + std::deque<void const *> m_lstCallstack; + sal_Int32 m_nToDo; + bool m_bSuspended; + std::condition_variable m_cndWait; + DisposedCallerAdminHolder m_DisposedCallerAdmin; + }; +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/source/threadpool/thread.cxx b/cppu/source/threadpool/thread.cxx new file mode 100644 index 0000000000..56e52838be --- /dev/null +++ b/cppu/source/threadpool/thread.cxx @@ -0,0 +1,198 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <sal/config.h> + +#include <algorithm> +#include <cstdlib> +#include <osl/diagnose.h> +#include <uno/threadpool.h> +#include <sal/log.hxx> +#include <utility> + +#include "thread.hxx" +#include "jobqueue.hxx" +#include "threadpool.hxx" + +using namespace osl; +using namespace rtl; + +namespace cppu_threadpool { + + + ThreadAdmin::ThreadAdmin(): m_disposed(false) {} + + ThreadAdmin::~ThreadAdmin() + { + SAL_WARN_IF(m_deque.size(), "cppu.threadpool", m_deque.size() << "Threads left"); + } + + bool ThreadAdmin::add_locked( rtl::Reference< ORequestThread > const & p ) + { + if( m_disposed ) + { + return false; + } + m_deque.push_back( p ); + return true; + } + + void ThreadAdmin::remove_locked( rtl::Reference< ORequestThread > const & p ) + { + m_deque.erase(std::find( m_deque.begin(), m_deque.end(), p ), m_deque.end()); + } + + void ThreadAdmin::remove( rtl::Reference< ORequestThread > const & p ) + { + std::scoped_lock aGuard( m_mutex ); + remove_locked( p ); + } + + void ThreadAdmin::join() + { + { + std::scoped_lock aGuard( m_mutex ); + m_disposed = true; + } + for (;;) + { + rtl::Reference< ORequestThread > pCurrent; + { + std::scoped_lock aGuard( m_mutex ); + if( m_deque.empty() ) + { + break; + } + pCurrent = m_deque.front(); + m_deque.pop_front(); + } + if (pCurrent->getIdentifier() + != osl::Thread::getCurrentIdentifier()) + { + pCurrent->join(); + } + } + } + + + ORequestThread::ORequestThread( ThreadPoolHolder aThreadPool, + JobQueue *pQueue, + ByteSequence aThreadId, + bool bAsynchron ) + : m_aThreadPool(std::move( aThreadPool )) + , m_pQueue( pQueue ) + , m_aThreadId(std::move( aThreadId )) + , m_bAsynchron( bAsynchron ) + {} + + ORequestThread::~ORequestThread() {} + + void ORequestThread::setTask( JobQueue *pQueue, + const ByteSequence &aThreadId, + bool bAsynchron ) + { + m_pQueue = pQueue; + m_aThreadId = aThreadId; + m_bAsynchron = bAsynchron; + } + + bool ORequestThread::launch() + { + // Assumption is that osl::Thread::create returns normally with a true + // return value iff it causes osl::Thread::run to start executing: + acquire(); + ThreadAdmin & rThreadAdmin = m_aThreadPool->getThreadAdmin(); + std::unique_lock g(rThreadAdmin.m_mutex); + if (!rThreadAdmin.add_locked( this )) { + return false; + } + try { + if (!create()) { + std::abort(); + } + } catch (...) { + rThreadAdmin.remove_locked( this ); + g.release(); + release(); + throw; + } + return true; + } + + void ORequestThread::onTerminated() + { + m_aThreadPool->getThreadAdmin().remove( this ); + release(); + } + + void ORequestThread::run() + { + osl_setThreadName("cppu_threadpool::ORequestThread"); + + try + { + while ( m_pQueue ) + { + if( ! m_bAsynchron ) + { + if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) ) + { + OSL_ASSERT( false ); + } + } + + while( ! m_pQueue->isEmpty() ) + { + // Note : Oneways should not get a disposable disposeid, + // It does not make sense to dispose a call in this state. + // That's way we put it a disposeid, that can't be used otherwise. + m_pQueue->enter( + this, + true ); + + if( m_pQueue->isEmpty() ) + { + m_aThreadPool->revokeQueue( m_aThreadId , m_bAsynchron ); + // Note : revokeQueue might have failed because m_pQueue.isEmpty() + // may be false (race). + } + } + + delete m_pQueue; + m_pQueue = nullptr; + + if( ! m_bAsynchron ) + { + uno_releaseIdFromCurrentThread(); + } + + m_aThreadPool->waitInPool( this ); + } + } + catch (...) + { + // Work around the problem that onTerminated is not called if run + // throws an exception: + onTerminated(); + throw; + } + } +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/source/threadpool/thread.hxx b/cppu/source/threadpool/thread.hxx new file mode 100644 index 0000000000..5d03de88e1 --- /dev/null +++ b/cppu/source/threadpool/thread.hxx @@ -0,0 +1,67 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ +#pragma once + +#include <osl/thread.hxx> +#include <sal/types.h> +#include <salhelper/simplereferenceobject.hxx> + +#include "threadpool.hxx" + +namespace cppu_threadpool { + + class JobQueue; + + + // private thread class for the threadpool + + class ORequestThread: + public salhelper::SimpleReferenceObject, public osl::Thread + { + public: + ORequestThread( ThreadPoolHolder aThreadPool, + JobQueue * , + ::rtl::ByteSequence aThreadId, + bool bAsynchron ); + virtual ~ORequestThread() override; + + void setTask( JobQueue * , const ::rtl::ByteSequence & aThreadId , bool bAsynchron ); + + bool launch(); + + static void * operator new(std::size_t size) + { return SimpleReferenceObject::operator new(size); } + + static void operator delete(void * pointer) + { SimpleReferenceObject::operator delete(pointer); } + + private: + virtual void SAL_CALL run() override; + virtual void SAL_CALL onTerminated() override; + + ThreadPoolHolder m_aThreadPool; + JobQueue *m_pQueue; + ::rtl::ByteSequence m_aThreadId; + bool m_bAsynchron; + }; + +} // end cppu_threadpool + + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/source/threadpool/threadident.cxx b/cppu/source/threadpool/threadident.cxx new file mode 100644 index 0000000000..f2b9511e6b --- /dev/null +++ b/cppu/source/threadpool/threadident.cxx @@ -0,0 +1,119 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <osl/thread.hxx> +#include <osl/diagnose.h> + +#include <rtl/process.h> +#include <rtl/byteseq.hxx> + +#include <uno/threadpool.h> + +#include "current.hxx" + +using namespace ::osl; +using namespace ::cppu; + +static void createLocalId( sal_Sequence **ppThreadId ) +{ + rtl_byte_sequence_constructNoDefault( ppThreadId , 4 + 16 ); + sal_uInt32 id = osl::Thread::getCurrentIdentifier(); + (*ppThreadId)->elements[0] = id & 0xFF; + (*ppThreadId)->elements[1] = (id >> 8) & 0xFF; + (*ppThreadId)->elements[2] = (id >> 16) & 0xFF; + (*ppThreadId)->elements[3] = (id >> 24) & 0xFF; + rtl_getGlobalProcessId( reinterpret_cast<sal_uInt8 *>(&(*ppThreadId)->elements[4]) ); +} + +extern "C" void SAL_CALL +uno_getIdOfCurrentThread( sal_Sequence **ppThreadId ) + SAL_THROW_EXTERN_C() +{ + IdContainer& id = getIdContainer(); + if (!id.bInit) + { + // first time, that the thread enters the bridge + createLocalId( ppThreadId ); + + // TODO + // note : this is a leak ! + id.pLocalThreadId = *ppThreadId; + id.pCurrentId = *ppThreadId; + id.nRefCountOfCurrentId = 1; + rtl_byte_sequence_acquire( id.pLocalThreadId ); + rtl_byte_sequence_acquire( id.pCurrentId ); + id.bInit = true; + } + else + { + id.nRefCountOfCurrentId ++; + if( *ppThreadId ) + { + rtl_byte_sequence_release( *ppThreadId ); + } + *ppThreadId = id.pCurrentId; + rtl_byte_sequence_acquire( *ppThreadId ); + } +} + +extern "C" void SAL_CALL uno_releaseIdFromCurrentThread() + SAL_THROW_EXTERN_C() +{ + IdContainer& id = getIdContainer(); + OSL_ASSERT( id.bInit ); + OSL_ASSERT( id.nRefCountOfCurrentId ); + + id.nRefCountOfCurrentId --; + if( ! id.nRefCountOfCurrentId && (id.pLocalThreadId != id.pCurrentId) ) + { + rtl_byte_sequence_assign( &(id.pCurrentId) , id.pLocalThreadId ); + } +} + +extern "C" sal_Bool SAL_CALL uno_bindIdToCurrentThread( sal_Sequence *pThreadId ) + SAL_THROW_EXTERN_C() +{ + IdContainer& id = getIdContainer(); + if (!id.bInit) + { + id.pLocalThreadId = nullptr; + createLocalId( &(id.pLocalThreadId) ); + id.nRefCountOfCurrentId = 1; + id.pCurrentId = pThreadId; + rtl_byte_sequence_acquire(id.pCurrentId); + id.bInit = true; + } + else + { + OSL_ASSERT( 0 == id.nRefCountOfCurrentId ); + if( 0 == id.nRefCountOfCurrentId ) + { + rtl_byte_sequence_assign(&( id.pCurrentId ), pThreadId ); + id.nRefCountOfCurrentId ++; + } + else + { + return false; + } + + } + return true; +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/source/threadpool/threadpool.cxx b/cppu/source/threadpool/threadpool.cxx new file mode 100644 index 0000000000..a74d8678d9 --- /dev/null +++ b/cppu/source/threadpool/threadpool.cxx @@ -0,0 +1,475 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <sal/config.h> + +#include <cassert> +#include <chrono> +#include <algorithm> +#include <utility> +#include <unordered_map> + +#include <osl/diagnose.h> +#include <sal/log.hxx> + +#include <uno/threadpool.h> + +#include "threadpool.hxx" +#include "thread.hxx" + +using namespace ::osl; +using namespace ::rtl; + +namespace cppu_threadpool +{ + WaitingThread::WaitingThread( + rtl::Reference<ORequestThread> theThread): thread(std::move(theThread)) + {} + + DisposedCallerAdminHolder const & DisposedCallerAdmin::getInstance() + { + static DisposedCallerAdminHolder theDisposedCallerAdmin = std::make_shared<DisposedCallerAdmin>(); + return theDisposedCallerAdmin; + } + + DisposedCallerAdmin::~DisposedCallerAdmin() + { + SAL_WARN_IF( !m_vector.empty(), "cppu.threadpool", "DisposedCallerList : " << m_vector.size() << " left"); + } + + void DisposedCallerAdmin::dispose( void const * nDisposeId ) + { + std::scoped_lock guard( m_mutex ); + m_vector.push_back( nDisposeId ); + } + + void DisposedCallerAdmin::destroy( void const * nDisposeId ) + { + std::scoped_lock guard( m_mutex ); + std::erase(m_vector, nDisposeId); + } + + bool DisposedCallerAdmin::isDisposed( void const * nDisposeId ) + { + std::scoped_lock guard( m_mutex ); + return (std::find(m_vector.begin(), m_vector.end(), nDisposeId) != m_vector.end()); + } + + + ThreadPool::ThreadPool() : + m_DisposedCallerAdmin( DisposedCallerAdmin::getInstance() ) + { + } + + ThreadPool::~ThreadPool() + { + SAL_WARN_IF( m_mapQueue.size(), "cppu.threadpool", "ThreadIdHashMap: " << m_mapQueue.size() << " left"); + } + + void ThreadPool::dispose( void const * nDisposeId ) + { + m_DisposedCallerAdmin->dispose( nDisposeId ); + + std::scoped_lock guard( m_mutex ); + for (auto const& item : m_mapQueue) + { + if( item.second.first ) + { + item.second.first->dispose( nDisposeId ); + } + if( item.second.second ) + { + item.second.second->dispose( nDisposeId ); + } + } + } + + void ThreadPool::destroy( void const * nDisposeId ) + { + m_DisposedCallerAdmin->destroy( nDisposeId ); + } + + /****************** + * This methods lets the thread wait a certain amount of time. If within this timespan + * a new request comes in, this thread is reused. This is done only to improve performance, + * it is not required for threadpool functionality. + ******************/ + void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread ) + { + WaitingThread waitingThread(pThread); + { + std::scoped_lock guard( m_mutexWaitingThreadList ); + m_dequeThreads.push_front( &waitingThread ); + } + + // let the thread wait 2 seconds + waitingThread.condition.wait( std::chrono::seconds(2) ); + + { + std::scoped_lock guard ( m_mutexWaitingThreadList ); + if( waitingThread.thread.is() ) + { + // thread wasn't reused, remove it from the list + WaitingThreadDeque::iterator ii = find( + m_dequeThreads.begin(), m_dequeThreads.end(), &waitingThread ); + OSL_ASSERT( ii != m_dequeThreads.end() ); + m_dequeThreads.erase( ii ); + } + } + } + + void ThreadPool::joinWorkers() + { + { + std::scoped_lock guard( m_mutexWaitingThreadList ); + for (auto const& thread : m_dequeThreads) + { + // wake the threads up + thread->condition.set(); + } + } + m_aThreadAdmin.join(); + } + + bool ThreadPool::createThread( JobQueue *pQueue , + const ByteSequence &aThreadId, + bool bAsynchron ) + { + { + // Can a thread be reused ? + std::scoped_lock guard( m_mutexWaitingThreadList ); + if( ! m_dequeThreads.empty() ) + { + // inform the thread and let it go + struct WaitingThread *pWaitingThread = m_dequeThreads.back(); + pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron ); + pWaitingThread->thread = nullptr; + + // remove from list + m_dequeThreads.pop_back(); + + // let the thread go + pWaitingThread->condition.set(); + return true; + } + } + + rtl::Reference pThread( + new ORequestThread( this, pQueue , aThreadId, bAsynchron) ); + return pThread->launch(); + } + + bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, bool bAsynchron ) + { + std::scoped_lock guard( m_mutex ); + + ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); + OSL_ASSERT( ii != m_mapQueue.end() ); + + if( bAsynchron ) + { + if( ! (*ii).second.second->isEmpty() ) + { + // another thread has put something into the queue + return false; + } + + (*ii).second.second = nullptr; + if( (*ii).second.first ) + { + // all oneway request have been processed, now + // synchronous requests may go on + (*ii).second.first->resume(); + } + } + else + { + if( ! (*ii).second.first->isEmpty() ) + { + // another thread has put something into the queue + return false; + } + (*ii).second.first = nullptr; + } + + if( nullptr == (*ii).second.first && nullptr == (*ii).second.second ) + { + m_mapQueue.erase( ii ); + } + + return true; + } + + + bool ThreadPool::addJob( + const ByteSequence &aThreadId , + bool bAsynchron, + void *pThreadSpecificData, + RequestFun * doRequest, + void const * disposeId ) + { + bool bCreateThread = false; + JobQueue *pQueue = nullptr; + { + std::scoped_lock guard( m_mutex ); + if (m_DisposedCallerAdmin->isDisposed(disposeId)) { + return true; + } + + ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); + + if( ii == m_mapQueue.end() ) + { + m_mapQueue[ aThreadId ] = std::pair < JobQueue * , JobQueue * > ( nullptr , nullptr ); + ii = m_mapQueue.find( aThreadId ); + OSL_ASSERT( ii != m_mapQueue.end() ); + } + + if( bAsynchron ) + { + if( ! (*ii).second.second ) + { + (*ii).second.second = new JobQueue(); + bCreateThread = true; + } + pQueue = (*ii).second.second; + } + else + { + if( ! (*ii).second.first ) + { + (*ii).second.first = new JobQueue(); + bCreateThread = true; + } + pQueue = (*ii).second.first; + + if( (*ii).second.second && ( (*ii).second.second->isBusy() ) ) + { + pQueue->suspend(); + } + } + pQueue->add( pThreadSpecificData , doRequest ); + } + + return !bCreateThread || createThread( pQueue , aThreadId , bAsynchron); + } + + void ThreadPool::prepare( const ByteSequence &aThreadId ) + { + std::scoped_lock guard( m_mutex ); + + ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); + + if( ii == m_mapQueue.end() ) + { + JobQueue *p = new JobQueue(); + m_mapQueue[ aThreadId ] = std::pair< JobQueue * , JobQueue * > ( p , nullptr ); + } + else if( nullptr == (*ii).second.first ) + { + (*ii).second.first = new JobQueue(); + } + } + + void * ThreadPool::enter( const ByteSequence & aThreadId , void const * nDisposeId ) + { + JobQueue *pQueue = nullptr; + { + std::scoped_lock guard( m_mutex ); + + ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); + + OSL_ASSERT( ii != m_mapQueue.end() ); + pQueue = (*ii).second.first; + } + + OSL_ASSERT( pQueue ); + void *pReturn = pQueue->enter( nDisposeId ); + + if( pQueue->isCallstackEmpty() ) + { + if( revokeQueue( aThreadId , false) ) + { + // remove queue + delete pQueue; + } + } + return pReturn; + } +} + +// All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life +// spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty +// (within the last uno_threadpool_destroy) all worker threads spawned by that +// ThreadPool instance are joined (which implies that uno_threadpool_destroy +// must never be called from a worker thread); afterwards, the next call to +// uno_threadpool_create (if any) will lead to a new ThreadPool instance. + +using namespace cppu_threadpool; + +namespace { + +struct uno_ThreadPool_Equal +{ + bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const + { + return a == b; + } +}; + +struct uno_ThreadPool_Hash +{ + std::size_t operator () ( const uno_ThreadPool &a ) const + { + return reinterpret_cast<std::size_t>( a ); + } +}; + +} + +typedef std::unordered_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet; + +static ThreadpoolHashSet *g_pThreadpoolHashSet; + +struct _uno_ThreadPool +{ + sal_Int32 dummy; +}; + +namespace { + +ThreadPoolHolder getThreadPool( uno_ThreadPool hPool ) +{ + MutexGuard guard( Mutex::getGlobalMutex() ); + assert( g_pThreadpoolHashSet != nullptr ); + ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) ); + assert( i != g_pThreadpoolHashSet->end() ); + return i->second; +} + +} + +extern "C" uno_ThreadPool SAL_CALL +uno_threadpool_create() SAL_THROW_EXTERN_C() +{ + MutexGuard guard( Mutex::getGlobalMutex() ); + ThreadPoolHolder p; + if( ! g_pThreadpoolHashSet ) + { + g_pThreadpoolHashSet = new ThreadpoolHashSet; + p = new ThreadPool; + } + else + { + assert( !g_pThreadpoolHashSet->empty() ); + p = g_pThreadpoolHashSet->begin()->second; + } + + // Just ensure that the handle is unique in the process (via heap) + uno_ThreadPool h = new struct _uno_ThreadPool; + g_pThreadpoolHashSet->emplace( h, p ); + return h; +} + +extern "C" void SAL_CALL +uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() +{ + sal_Sequence *pThreadId = nullptr; + uno_getIdOfCurrentThread( &pThreadId ); + getThreadPool( hPool )->prepare( pThreadId ); + rtl_byte_sequence_release( pThreadId ); + uno_releaseIdFromCurrentThread(); +} + +extern "C" void SAL_CALL +uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob ) + SAL_THROW_EXTERN_C() +{ + sal_Sequence *pThreadId = nullptr; + uno_getIdOfCurrentThread( &pThreadId ); + *ppJob = + getThreadPool( hPool )->enter( + pThreadId, + hPool ); + rtl_byte_sequence_release( pThreadId ); + uno_releaseIdFromCurrentThread(); +} + +extern "C" void SAL_CALL +uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C() +{ + // we might do here some tidying up in case a thread called attach but never detach +} + +extern "C" void SAL_CALL +uno_threadpool_putJob( + uno_ThreadPool hPool, + sal_Sequence *pThreadId, + void *pJob, + void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ), + sal_Bool bIsOneway ) SAL_THROW_EXTERN_C() +{ + if (!getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest, hPool )) + { + SAL_WARN( + "cppu.threadpool", + "uno_threadpool_putJob in parallel with uno_threadpool_destroy"); + } +} + +extern "C" void SAL_CALL +uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() +{ + getThreadPool(hPool)->dispose( + hPool ); +} + +extern "C" void SAL_CALL +uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() +{ + ThreadPoolHolder p( getThreadPool(hPool) ); + p->destroy( + hPool ); + + bool empty; + { + OSL_ASSERT( g_pThreadpoolHashSet ); + + MutexGuard guard( Mutex::getGlobalMutex() ); + + ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool ); + OSL_ASSERT( ii != g_pThreadpoolHashSet->end() ); + g_pThreadpoolHashSet->erase( ii ); + delete hPool; + + empty = g_pThreadpoolHashSet->empty(); + if( empty ) + { + delete g_pThreadpoolHashSet; + g_pThreadpoolHashSet = nullptr; + } + } + + if( empty ) + { + p->joinWorkers(); + } +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/source/threadpool/threadpool.hxx b/cppu/source/threadpool/threadpool.hxx new file mode 100644 index 0000000000..afcae7a7e3 --- /dev/null +++ b/cppu/source/threadpool/threadpool.hxx @@ -0,0 +1,162 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#pragma once + +#include <mutex> +#include <vector> +#include <unordered_map> + +#include <osl/conditn.hxx> +#include <osl/mutex.hxx> +#include <rtl/byteseq.hxx> +#include <rtl/ref.hxx> +#include <salhelper/simplereferenceobject.hxx> + +#include "jobqueue.hxx" + + +namespace cppu_threadpool { + class ORequestThread; + + struct EqualThreadId + { + bool operator () ( const ::rtl::ByteSequence &a , const ::rtl::ByteSequence &b ) const + { + return a == b; + } + }; + + struct HashThreadId + { + sal_Int32 operator () ( const ::rtl::ByteSequence &a ) const + { + if( a.getLength() >= 4 ) + { + return *reinterpret_cast<sal_Int32 const *>(a.getConstArray()); + } + return 0; + } + }; + + typedef std::unordered_map + < + ::rtl::ByteSequence, // ThreadID + std::pair < JobQueue * , JobQueue * >, + HashThreadId, + EqualThreadId + > ThreadIdHashMap; + + struct WaitingThread + { + osl::Condition condition; + rtl::Reference< ORequestThread > thread; + + explicit WaitingThread( + rtl::Reference<ORequestThread> theThread); + }; + + typedef std::deque< struct ::cppu_threadpool::WaitingThread * > WaitingThreadDeque; + + class DisposedCallerAdmin; + typedef std::shared_ptr<DisposedCallerAdmin> DisposedCallerAdminHolder; + + class DisposedCallerAdmin + { + public: + ~DisposedCallerAdmin(); + + static DisposedCallerAdminHolder const & getInstance(); + + void dispose( void const * nDisposeId ); + void destroy( void const * nDisposeId ); + bool isDisposed( void const * nDisposeId ); + + private: + std::mutex m_mutex; + std::vector< void const * > m_vector; + }; + + class ThreadAdmin + { + public: + ThreadAdmin(); + ~ThreadAdmin (); + + void remove( rtl::Reference< ORequestThread > const & ); + void join(); + + bool add_locked( rtl::Reference< ORequestThread > const & ); + void remove_locked( rtl::Reference< ORequestThread > const & ); + std::mutex m_mutex; + + private: + std::deque< rtl::Reference< ORequestThread > > m_deque; + bool m_disposed; + }; + + class ThreadPool; + typedef rtl::Reference<ThreadPool> ThreadPoolHolder; + + class ThreadPool: public salhelper::SimpleReferenceObject + { + public: + ThreadPool(); + virtual ~ThreadPool() override; + + void dispose( void const * nDisposeId ); + void destroy( void const * nDisposeId ); + + bool addJob( const ::rtl::ByteSequence &aThreadId, + bool bAsynchron, + void *pThreadSpecificData, + RequestFun * doRequest, + void const * disposeId ); + + void prepare( const ::rtl::ByteSequence &aThreadId ); + void * enter( const ::rtl::ByteSequence &aThreadId, void const * nDisposeId ); + + /******** + * @return true, if queue could be successfully revoked. + ********/ + bool revokeQueue( const ::rtl::ByteSequence & aThreadId , bool bAsynchron ); + + void waitInPool( rtl::Reference< ORequestThread > const & pThread ); + + void joinWorkers(); + + ThreadAdmin & getThreadAdmin() { return m_aThreadAdmin; } + + private: + bool createThread( JobQueue *pQueue, const ::rtl::ByteSequence &aThreadId, bool bAsynchron); + + + ThreadIdHashMap m_mapQueue; + std::mutex m_mutex; + + std::mutex m_mutexWaitingThreadList; + WaitingThreadDeque m_dequeThreads; + + DisposedCallerAdminHolder m_DisposedCallerAdmin; + ThreadAdmin m_aThreadAdmin; + }; + +} // end namespace cppu_threadpool + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ |