474 lines
13 KiB
C++
474 lines
13 KiB
C++
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
|
|
/*
|
|
* This file is part of the LibreOffice project.
|
|
*
|
|
* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
*
|
|
* This file incorporates work covered by the following license notice:
|
|
*
|
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
* contributor license agreements. See the NOTICE file distributed
|
|
* with this work for additional information regarding copyright
|
|
* ownership. The ASF licenses this file to you under the Apache
|
|
* License, Version 2.0 (the "License"); you may not use this file
|
|
* except in compliance with the License. You may obtain a copy of
|
|
* the License at http://www.apache.org/licenses/LICENSE-2.0 .
|
|
*/
|
|
|
|
#include <sal/config.h>
|
|
|
|
#include <cassert>
|
|
#include <chrono>
|
|
#include <algorithm>
|
|
#include <utility>
|
|
#include <unordered_map>
|
|
|
|
#include <osl/diagnose.h>
|
|
#include <sal/log.hxx>
|
|
|
|
#include <uno/threadpool.h>
|
|
|
|
#include "threadpool.hxx"
|
|
#include "thread.hxx"
|
|
|
|
using namespace ::osl;
|
|
using namespace ::rtl;
|
|
|
|
namespace cppu_threadpool
|
|
{
|
|
WaitingThread::WaitingThread(
|
|
rtl::Reference<ORequestThread> theThread): thread(std::move(theThread))
|
|
{}
|
|
|
|
DisposedCallerAdminHolder const & DisposedCallerAdmin::getInstance()
|
|
{
|
|
static DisposedCallerAdminHolder theDisposedCallerAdmin = std::make_shared<DisposedCallerAdmin>();
|
|
return theDisposedCallerAdmin;
|
|
}
|
|
|
|
DisposedCallerAdmin::~DisposedCallerAdmin()
|
|
{
|
|
SAL_WARN_IF( !m_vector.empty(), "cppu.threadpool", "DisposedCallerList : " << m_vector.size() << " left");
|
|
}
|
|
|
|
void DisposedCallerAdmin::dispose( void const * nDisposeId )
|
|
{
|
|
std::scoped_lock guard( m_mutex );
|
|
m_vector.push_back( nDisposeId );
|
|
}
|
|
|
|
void DisposedCallerAdmin::destroy( void const * nDisposeId )
|
|
{
|
|
std::scoped_lock guard( m_mutex );
|
|
std::erase(m_vector, nDisposeId);
|
|
}
|
|
|
|
bool DisposedCallerAdmin::isDisposed( void const * nDisposeId )
|
|
{
|
|
std::scoped_lock guard( m_mutex );
|
|
return (std::find(m_vector.begin(), m_vector.end(), nDisposeId) != m_vector.end());
|
|
}
|
|
|
|
|
|
ThreadPool::ThreadPool() :
|
|
m_DisposedCallerAdmin( DisposedCallerAdmin::getInstance() )
|
|
{
|
|
}
|
|
|
|
ThreadPool::~ThreadPool()
|
|
{
|
|
SAL_WARN_IF( m_mapQueue.size(), "cppu.threadpool", "ThreadIdHashMap: " << m_mapQueue.size() << " left");
|
|
}
|
|
|
|
void ThreadPool::dispose( void const * nDisposeId )
|
|
{
|
|
m_DisposedCallerAdmin->dispose( nDisposeId );
|
|
|
|
std::scoped_lock guard( m_mutex );
|
|
for (auto const& item : m_mapQueue)
|
|
{
|
|
if( item.second.first )
|
|
{
|
|
item.second.first->dispose( nDisposeId );
|
|
}
|
|
if( item.second.second )
|
|
{
|
|
item.second.second->dispose( nDisposeId );
|
|
}
|
|
}
|
|
}
|
|
|
|
void ThreadPool::destroy( void const * nDisposeId )
|
|
{
|
|
m_DisposedCallerAdmin->destroy( nDisposeId );
|
|
}
|
|
|
|
/******************
|
|
* This method lets the thread wait a certain amount of time. If within this timespan
|
|
* a new request comes in, this thread is reused. This is done only to improve performance,
|
|
* it is not required for threadpool functionality.
|
|
******************/
|
|
void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread )
|
|
{
|
|
WaitingThread waitingThread(pThread);
|
|
{
|
|
std::scoped_lock guard( m_mutexWaitingThreadList );
|
|
m_dequeThreads.push_front( &waitingThread );
|
|
}
|
|
|
|
// let the thread wait 2 seconds
|
|
waitingThread.condition.wait( std::chrono::seconds(2) );
|
|
|
|
{
|
|
std::scoped_lock guard ( m_mutexWaitingThreadList );
|
|
if( waitingThread.thread.is() )
|
|
{
|
|
// thread wasn't reused, remove it from the list
|
|
WaitingThreadDeque::iterator ii = find(
|
|
m_dequeThreads.begin(), m_dequeThreads.end(), &waitingThread );
|
|
OSL_ASSERT( ii != m_dequeThreads.end() );
|
|
m_dequeThreads.erase( ii );
|
|
}
|
|
}
|
|
}
|
|
|
|
void ThreadPool::joinWorkers()
|
|
{
|
|
{
|
|
std::scoped_lock guard( m_mutexWaitingThreadList );
|
|
for (auto const& thread : m_dequeThreads)
|
|
{
|
|
// wake the threads up
|
|
thread->condition.set();
|
|
}
|
|
}
|
|
m_aThreadAdmin.join();
|
|
}
|
|
|
|
bool ThreadPool::createThread( JobQueue *pQueue ,
|
|
const ByteSequence &aThreadId,
|
|
bool bAsynchron )
|
|
{
|
|
{
|
|
// Can a thread be reused ?
|
|
std::scoped_lock guard( m_mutexWaitingThreadList );
|
|
if( ! m_dequeThreads.empty() )
|
|
{
|
|
// inform the thread and let it go
|
|
struct WaitingThread *pWaitingThread = m_dequeThreads.back();
|
|
pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
|
|
pWaitingThread->thread = nullptr;
|
|
|
|
// remove from list
|
|
m_dequeThreads.pop_back();
|
|
|
|
// let the thread go
|
|
pWaitingThread->condition.set();
|
|
return true;
|
|
}
|
|
}
|
|
|
|
rtl::Reference pThread(
|
|
new ORequestThread( this, pQueue , aThreadId, bAsynchron) );
|
|
return pThread->launch();
|
|
}
|
|
|
|
bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, bool bAsynchron )
|
|
{
|
|
std::scoped_lock guard( m_mutex );
|
|
|
|
ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
|
|
OSL_ASSERT( ii != m_mapQueue.end() );
|
|
|
|
if( bAsynchron )
|
|
{
|
|
if( ! (*ii).second.second->isEmpty() )
|
|
{
|
|
// another thread has put something into the queue
|
|
return false;
|
|
}
|
|
|
|
(*ii).second.second = nullptr;
|
|
if( (*ii).second.first )
|
|
{
|
|
// all oneway request have been processed, now
|
|
// synchronous requests may go on
|
|
(*ii).second.first->resume();
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if( ! (*ii).second.first->isEmpty() )
|
|
{
|
|
// another thread has put something into the queue
|
|
return false;
|
|
}
|
|
(*ii).second.first = nullptr;
|
|
}
|
|
|
|
if( nullptr == (*ii).second.first && nullptr == (*ii).second.second )
|
|
{
|
|
m_mapQueue.erase( ii );
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
bool ThreadPool::addJob(
|
|
const ByteSequence &aThreadId ,
|
|
bool bAsynchron,
|
|
void *pThreadSpecificData,
|
|
RequestFun * doRequest,
|
|
void const * disposeId )
|
|
{
|
|
bool bCreateThread = false;
|
|
JobQueue *pQueue = nullptr;
|
|
{
|
|
std::scoped_lock guard( m_mutex );
|
|
if (m_DisposedCallerAdmin->isDisposed(disposeId)) {
|
|
return true;
|
|
}
|
|
|
|
ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
|
|
|
|
if( ii == m_mapQueue.end() )
|
|
{
|
|
m_mapQueue[ aThreadId ] = std::pair < JobQueue * , JobQueue * > ( nullptr , nullptr );
|
|
ii = m_mapQueue.find( aThreadId );
|
|
OSL_ASSERT( ii != m_mapQueue.end() );
|
|
}
|
|
|
|
if( bAsynchron )
|
|
{
|
|
if( ! (*ii).second.second )
|
|
{
|
|
(*ii).second.second = new JobQueue();
|
|
bCreateThread = true;
|
|
}
|
|
pQueue = (*ii).second.second;
|
|
}
|
|
else
|
|
{
|
|
if( ! (*ii).second.first )
|
|
{
|
|
(*ii).second.first = new JobQueue();
|
|
bCreateThread = true;
|
|
}
|
|
pQueue = (*ii).second.first;
|
|
|
|
if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
|
|
{
|
|
pQueue->suspend();
|
|
}
|
|
}
|
|
pQueue->add( pThreadSpecificData , doRequest );
|
|
}
|
|
|
|
return !bCreateThread || createThread( pQueue , aThreadId , bAsynchron);
|
|
}
|
|
|
|
void ThreadPool::prepare( const ByteSequence &aThreadId )
|
|
{
|
|
std::scoped_lock guard( m_mutex );
|
|
|
|
ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
|
|
|
|
if( ii == m_mapQueue.end() )
|
|
{
|
|
JobQueue *p = new JobQueue();
|
|
m_mapQueue[ aThreadId ] = std::pair< JobQueue * , JobQueue * > ( p , nullptr );
|
|
}
|
|
else if( nullptr == (*ii).second.first )
|
|
{
|
|
(*ii).second.first = new JobQueue();
|
|
}
|
|
}
|
|
|
|
void * ThreadPool::enter( const ByteSequence & aThreadId , void const * nDisposeId )
|
|
{
|
|
JobQueue *pQueue = nullptr;
|
|
{
|
|
std::scoped_lock guard( m_mutex );
|
|
|
|
ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
|
|
|
|
assert(ii != m_mapQueue.end());
|
|
pQueue = (*ii).second.first;
|
|
}
|
|
|
|
assert(pQueue);
|
|
void *pReturn = pQueue->enter( nDisposeId );
|
|
|
|
if( pQueue->isCallstackEmpty() )
|
|
{
|
|
if( revokeQueue( aThreadId , false) )
|
|
{
|
|
// remove queue
|
|
delete pQueue;
|
|
}
|
|
}
|
|
return pReturn;
|
|
}
|
|
}
|
|
|
|
// All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
|
|
// spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty
|
|
// (within the last uno_threadpool_destroy) all worker threads spawned by that
|
|
// ThreadPool instance are joined (which implies that uno_threadpool_destroy
|
|
// must never be called from a worker thread); afterwards, the next call to
|
|
// uno_threadpool_create (if any) will lead to a new ThreadPool instance.
|
|
|
|
using namespace cppu_threadpool;
|
|
|
|
namespace {
|
|
|
|
struct uno_ThreadPool_Equal
|
|
{
|
|
bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
|
|
{
|
|
return a == b;
|
|
}
|
|
};
|
|
|
|
struct uno_ThreadPool_Hash
|
|
{
|
|
std::size_t operator () ( const uno_ThreadPool &a ) const
|
|
{
|
|
return reinterpret_cast<std::size_t>( a );
|
|
}
|
|
};
|
|
|
|
}
|
|
|
|
typedef std::unordered_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
|
|
|
|
static ThreadpoolHashSet *g_pThreadpoolHashSet;
|
|
|
|
struct _uno_ThreadPool
|
|
{
|
|
sal_Int32 dummy;
|
|
};
|
|
|
|
namespace {
|
|
|
|
ThreadPoolHolder getThreadPool( uno_ThreadPool hPool )
|
|
{
|
|
MutexGuard guard( Mutex::getGlobalMutex() );
|
|
assert( g_pThreadpoolHashSet != nullptr );
|
|
ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) );
|
|
assert( i != g_pThreadpoolHashSet->end() );
|
|
return i->second;
|
|
}
|
|
|
|
}
|
|
|
|
extern "C" uno_ThreadPool SAL_CALL
|
|
uno_threadpool_create() noexcept
|
|
{
|
|
MutexGuard guard( Mutex::getGlobalMutex() );
|
|
ThreadPoolHolder p;
|
|
if( ! g_pThreadpoolHashSet )
|
|
{
|
|
g_pThreadpoolHashSet = new ThreadpoolHashSet;
|
|
p = new ThreadPool;
|
|
}
|
|
else
|
|
{
|
|
assert( !g_pThreadpoolHashSet->empty() );
|
|
p = g_pThreadpoolHashSet->begin()->second;
|
|
}
|
|
|
|
// Just ensure that the handle is unique in the process (via heap)
|
|
uno_ThreadPool h = new struct _uno_ThreadPool;
|
|
g_pThreadpoolHashSet->emplace( h, p );
|
|
return h;
|
|
}
|
|
|
|
extern "C" void SAL_CALL
|
|
uno_threadpool_attach( uno_ThreadPool hPool ) noexcept
|
|
{
|
|
sal_Sequence *pThreadId = nullptr;
|
|
uno_getIdOfCurrentThread( &pThreadId );
|
|
getThreadPool( hPool )->prepare( pThreadId );
|
|
rtl_byte_sequence_release( pThreadId );
|
|
uno_releaseIdFromCurrentThread();
|
|
}
|
|
|
|
extern "C" void SAL_CALL
|
|
uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob ) noexcept
|
|
{
|
|
sal_Sequence *pThreadId = nullptr;
|
|
uno_getIdOfCurrentThread( &pThreadId );
|
|
*ppJob =
|
|
getThreadPool( hPool )->enter(
|
|
pThreadId,
|
|
hPool );
|
|
rtl_byte_sequence_release( pThreadId );
|
|
uno_releaseIdFromCurrentThread();
|
|
}
|
|
|
|
extern "C" void SAL_CALL
|
|
uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) noexcept
|
|
{
|
|
// we might do here some tidying up in case a thread called attach but never detach
|
|
}
|
|
|
|
extern "C" void SAL_CALL
|
|
uno_threadpool_putJob(
|
|
uno_ThreadPool hPool,
|
|
sal_Sequence *pThreadId,
|
|
void *pJob,
|
|
void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
|
|
sal_Bool bIsOneway ) noexcept
|
|
{
|
|
if (!getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest, hPool ))
|
|
{
|
|
SAL_WARN(
|
|
"cppu.threadpool",
|
|
"uno_threadpool_putJob in parallel with uno_threadpool_destroy");
|
|
}
|
|
}
|
|
|
|
extern "C" void SAL_CALL
|
|
uno_threadpool_dispose( uno_ThreadPool hPool ) noexcept
|
|
{
|
|
getThreadPool(hPool)->dispose(
|
|
hPool );
|
|
}
|
|
|
|
extern "C" void SAL_CALL
|
|
uno_threadpool_destroy( uno_ThreadPool hPool ) noexcept
|
|
{
|
|
ThreadPoolHolder p( getThreadPool(hPool) );
|
|
p->destroy(
|
|
hPool );
|
|
|
|
bool empty;
|
|
{
|
|
assert(g_pThreadpoolHashSet);
|
|
|
|
MutexGuard guard( Mutex::getGlobalMutex() );
|
|
|
|
ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
|
|
OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
|
|
g_pThreadpoolHashSet->erase( ii );
|
|
delete hPool;
|
|
|
|
empty = g_pThreadpoolHashSet->empty();
|
|
if( empty )
|
|
{
|
|
delete g_pThreadpoolHashSet;
|
|
g_pThreadpoolHashSet = nullptr;
|
|
}
|
|
}
|
|
|
|
if( empty )
|
|
{
|
|
p->joinWorkers();
|
|
}
|
|
}
|
|
|
|
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
|