// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef CEPH_WORKQUEUE_H
#define CEPH_WORKQUEUE_H

#if defined(WITH_SEASTAR) && !defined(WITH_ALIEN)
// for ObjectStore.h
struct ThreadPool {
  struct TPHandle {
  };
};

#else

#include <atomic>
#include <list>
#include <set>
#include <string>
#include <vector>

#include "common/ceph_mutex.h"
#include "include/unordered_map.h"
#include "common/config_obs.h"
#include "common/HeartbeatMap.h"
#include "common/Thread.h"
#include "include/common_fwd.h"
#include "include/Context.h"
#include "common/HBHandle.h"

/// Pool of threads that share work submitted to multiple work queues.
class ThreadPool : public md_config_obs_t {
protected:
  CephContext *cct;
  std::string name;
  std::string thread_name;
  std::string lockname;
  ceph::mutex _lock;
  ceph::condition_variable _cond;
  bool _stop;
  int _pause;
  int _draining;
  ceph::condition_variable _wait_cond;

public:
  class TPHandle : public HBHandle {
    friend class ThreadPool;
    CephContext *cct;
    ceph::heartbeat_handle_d *hb;
    ceph::timespan grace;
    ceph::timespan suicide_grace;
  public:
    TPHandle(
      CephContext *cct,
      ceph::heartbeat_handle_d *hb,
      ceph::timespan grace,
      ceph::timespan suicide_grace)
      : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
    void reset_tp_timeout() override final;
    void suspend_tp_timeout() override final;
  };

protected:
  /// Basic interface to a work queue used by the worker threads.
  struct WorkQueue_ {
    std::string name;
    ceph::timespan timeout_interval;
    ceph::timespan suicide_interval;
    WorkQueue_(std::string n, ceph::timespan ti, ceph::timespan sti)
      : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
    { }
    virtual ~WorkQueue_() {}
    /// Remove all work items from the queue.
    virtual void _clear() = 0;
    /// Check whether there is anything to do.
    virtual bool _empty() = 0;
    /// Get the next work item to process.
    virtual void *_void_dequeue() = 0;
    /** @brief Process the work item.
     * This function will be called several times in parallel
     * and must therefore be thread-safe. */
    virtual void _void_process(void *item, TPHandle &handle) = 0;
    /** @brief Synchronously finish processing a work item.
     * This function is called after _void_process with the global thread pool lock held,
     * so at most one copy will execute simultaneously for a given thread pool.
     * It can be used for non-thread-safe finalization. */
    virtual void _void_process_finish(void *) = 0;
  };

  // track thread pool size changes
  unsigned _num_threads;
  std::string _thread_num_option;
  const char **_conf_keys;

  const char **get_tracked_conf_keys() const override {
    return _conf_keys;
  }
  void handle_conf_change(const ConfigProxy& conf,
                          const std::set<std::string> &changed) override;

public:
  /** @brief Templated by-value work queue.
   * Skeleton implementation of a queue that processes items submitted by value.
   * This is useful if the items are single primitive values or very small objects
   * (a few bytes). The queue will automatically add itself to the thread pool on
   * construction and remove itself on destruction.
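   *
   * A minimal usage sketch of a possible subclass. The name SquareWQ, the
   * intervals, and the use of ceph::make_timespan are illustrative
   * assumptions only; they are not part of this header.
   * @code
   * struct SquareWQ : public ThreadPool::WorkQueueVal<int> {
   *   std::list<int> jobs;
   *   SquareWQ(ThreadPool *tp)
   *     : ThreadPool::WorkQueueVal<int>("SquareWQ", ceph::make_timespan(60),
   *                                     ceph::make_timespan(0), tp) {}
   *   void _enqueue(int i) override { jobs.push_back(i); }
   *   void _enqueue_front(int i) override { jobs.push_front(i); }
   *   bool _empty() override { return jobs.empty(); }
   *   int _dequeue() override {
   *     int i = jobs.front();
   *     jobs.pop_front();
   *     return i;
   *   }
   *   void _process(int i, TPHandle &) override {
   *     // do the actual work for item i here (runs in a worker thread)
   *   }
   * };
   * @endcode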
   */
  template<typename T, typename U = T>
  class WorkQueueVal : public WorkQueue_ {
    ceph::mutex _lock = ceph::make_mutex("WorkQueueVal::_lock");
    ThreadPool *pool;
    std::list<U> to_process;
    std::list<U> to_finish;
    virtual void _enqueue(T) = 0;
    virtual void _enqueue_front(T) = 0;
    bool _empty() override = 0;
    virtual U _dequeue() = 0;
    virtual void _process_finish(U) {}

    void *_void_dequeue() override {
      {
        std::lock_guard l(_lock);
        if (_empty())
          return 0;
        U u = _dequeue();
        to_process.push_back(u);
      }
      return ((void*)1); // Not used
    }
    void _void_process(void *, TPHandle &handle) override {
      _lock.lock();
      ceph_assert(!to_process.empty());
      U u = to_process.front();
      to_process.pop_front();
      _lock.unlock();

      _process(u, handle);

      _lock.lock();
      to_finish.push_back(u);
      _lock.unlock();
    }

    void _void_process_finish(void *) override {
      _lock.lock();
      ceph_assert(!to_finish.empty());
      U u = to_finish.front();
      to_finish.pop_front();
      _lock.unlock();

      _process_finish(u);
    }

    void _clear() override {}

  public:
    WorkQueueVal(std::string n,
                 ceph::timespan ti,
                 ceph::timespan sti,
                 ThreadPool *p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueueVal() override {
      pool->remove_work_queue(this);
    }
    void queue(T item) {
      std::lock_guard l(pool->_lock);
      _enqueue(item);
      pool->_cond.notify_one();
    }
    void queue_front(T item) {
      std::lock_guard l(pool->_lock);
      _enqueue_front(item);
      pool->_cond.notify_one();
    }
    void drain() {
      pool->drain(this);
    }
  protected:
    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    virtual void _process(U u, TPHandle &) = 0;
  };

  /** @brief Templated by-pointer work queue.
   * Skeleton implementation of a queue that processes items of a given type submitted as pointers.
   * This is useful when the work items are large or include dynamically allocated memory. The queue
   * will automatically add itself to the thread pool on construction and remove itself on
   * destruction.
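   *
   * A rough sketch of a possible subclass. The names Job and JobWQ, the
   * intervals, and the use of ceph::make_timespan are illustrative
   * assumptions only; they are not part of this header.
   * @code
   * struct Job { int id; };
   * struct JobWQ : public ThreadPool::WorkQueue<Job> {
   *   std::list<Job*> jobs;
   *   JobWQ(ThreadPool *tp)
   *     : ThreadPool::WorkQueue<Job>("JobWQ", ceph::make_timespan(60),
   *                                  ceph::make_timespan(0), tp) {}
   *   bool _enqueue(Job *j) override { jobs.push_back(j); return true; }
   *   void _dequeue(Job *j) override { jobs.remove(j); }
   *   Job *_dequeue() override {
   *     if (jobs.empty())
   *       return nullptr;
   *     Job *j = jobs.front();
   *     jobs.pop_front();
   *     return j;
   *   }
   *   bool _empty() override { return jobs.empty(); }
   *   void _clear() override { jobs.clear(); }
   *   void _process(Job *j, TPHandle &) override {
   *     // do the work described by *j here (runs in a worker thread)
   *   }
   * };
   * @endcode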
   */
  template<class T>
  class WorkQueue : public WorkQueue_ {
    ThreadPool *pool;

    /// Add a work item to the queue.
    virtual bool _enqueue(T *) = 0;
    /// Dequeue a previously submitted work item.
    virtual void _dequeue(T *) = 0;
    /// Dequeue a work item and return the original submitted pointer.
    virtual T *_dequeue() = 0;
    virtual void _process_finish(T *) {}

    // implementation of virtual methods from WorkQueue_
    void *_void_dequeue() override {
      return (void *)_dequeue();
    }
    void _void_process(void *p, TPHandle &handle) override {
      _process(static_cast<T *>(p), handle);
    }
    void _void_process_finish(void *p) override {
      _process_finish(static_cast<T *>(p));
    }

  protected:
    /// Process a work item. Called from the worker threads.
    virtual void _process(T *t, TPHandle &) = 0;

  public:
    WorkQueue(std::string n,
              ceph::timespan ti, ceph::timespan sti,
              ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueue() override {
      pool->remove_work_queue(this);
    }

    bool queue(T *item) {
      pool->_lock.lock();
      bool r = _enqueue(item);
      pool->_cond.notify_one();
      pool->_lock.unlock();
      return r;
    }
    void dequeue(T *item) {
      pool->_lock.lock();
      _dequeue(item);
      pool->_lock.unlock();
    }
    void clear() {
      pool->_lock.lock();
      _clear();
      pool->_lock.unlock();
    }

    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    /// wake up the thread pool (without lock held)
    void wake() {
      pool->wake();
    }
    /// wake up the thread pool (with lock already held)
    void _wake() {
      pool->_wake();
    }
    void _wait() {
      pool->_wait();
    }
    void drain() {
      pool->drain(this);
    }
  };

  template<typename T>
  class PointerWQ : public WorkQueue_ {
  public:
    ~PointerWQ() override {
      m_pool->remove_work_queue(this);
      ceph_assert(m_processing == 0);
    }
    void drain() {
      {
        // if this queue is empty and not processing, don't wait for other
        // queues to finish processing
        std::lock_guard l(m_pool->_lock);
        if (m_processing == 0 && m_items.empty()) {
          return;
        }
      }
      m_pool->drain(this);
    }
    void queue(T *item) {
      std::lock_guard l(m_pool->_lock);
      m_items.push_back(item);
      m_pool->_cond.notify_one();
    }
    bool empty() {
      std::lock_guard l(m_pool->_lock);
      return _empty();
    }
  protected:
    PointerWQ(std::string n,
              ceph::timespan ti, ceph::timespan sti,
              ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
    }
    void register_work_queue() {
      m_pool->add_work_queue(this);
    }
    void _clear() override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      m_items.clear();
    }
    bool _empty() override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      return m_items.empty();
    }
    void *_void_dequeue() override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      if (m_items.empty()) {
        return NULL;
      }

      ++m_processing;
      T *item = m_items.front();
      m_items.pop_front();
      return item;
    }
    void _void_process(void *item, ThreadPool::TPHandle &handle) override {
      process(reinterpret_cast<T *>(item));
    }
    void _void_process_finish(void *item) override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      ceph_assert(m_processing > 0);
      --m_processing;
    }

    virtual void process(T *item) = 0;
    void process_finish() {
      std::lock_guard locker(m_pool->_lock);
      _void_process_finish(nullptr);
    }

    T *front() {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      if (m_items.empty()) {
        return NULL;
      }
      return m_items.front();
    }
    void requeue_front(T *item) {
      std::lock_guard pool_locker(m_pool->_lock);
      _void_process_finish(nullptr);
      m_items.push_front(item);
    }
    void requeue_back(T *item) {
      std::lock_guard pool_locker(m_pool->_lock);
      _void_process_finish(nullptr);
      m_items.push_back(item);
    }
    void signal() {
      std::lock_guard pool_locker(m_pool->_lock);
      m_pool->_cond.notify_one();
    }
    ceph::mutex &get_pool_lock() {
      return m_pool->_lock;
    }
  private:
    ThreadPool *m_pool;
    std::list<T *> m_items;
    uint32_t m_processing;
  };
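
  /* Unlike WorkQueue and WorkQueueVal, PointerWQ does not add itself to the
   * thread pool in its constructor; a subclass must call register_work_queue()
   * once it is ready to receive work (ContextWQ below does this).
   * A rough sketch of a possible subclass (LogEntry, LogWQ, the intervals, and
   * the use of ceph::make_timespan are illustrative assumptions only):
   *
   *   struct LogEntry { std::string msg; };
   *   class LogWQ : public ThreadPool::PointerWQ<LogEntry> {
   *   public:
   *     LogWQ(ThreadPool *tp)
   *       : ThreadPool::PointerWQ<LogEntry>("LogWQ", ceph::make_timespan(60),
   *                                         ceph::make_timespan(0), tp) {
   *       register_work_queue();  // required: PointerWQ does not self-register
   *     }
   *   protected:
   *     void process(LogEntry *e) override {
   *       // handle *e here (runs in a worker thread)
   *       delete e;
   *     }
   *   };
   */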
protected:
  std::vector<WorkQueue_*> work_queues;
  int next_work_queue = 0;

  // threads
  struct WorkThread : public Thread {
    ThreadPool *pool;
    // cppcheck-suppress noExplicitConstructor
    WorkThread(ThreadPool *p) : pool(p) {}
    void *entry() override {
      pool->worker(this);
      return 0;
    }
  };

  std::set<WorkThread*> _threads;
  std::list<WorkThread*> _old_threads;  ///< need to be joined
  int processing;

  void start_threads();
  void join_old_threads();
  virtual void worker(WorkThread *wt);

public:
  ThreadPool(CephContext *cct_, std::string nm, std::string tn, int n,
             const char *option = NULL);
  ~ThreadPool() override;

  /// return number of threads currently running
  int get_num_threads() {
    std::lock_guard l(_lock);
    return _num_threads;
  }

  /// assign a work queue to this thread pool
  void add_work_queue(WorkQueue_* wq) {
    std::lock_guard l(_lock);
    work_queues.push_back(wq);
  }
  /// remove a work queue from this thread pool
  void remove_work_queue(WorkQueue_* wq) {
    std::lock_guard l(_lock);
    unsigned i = 0;
    while (work_queues[i] != wq)
      i++;
    for (i++; i < work_queues.size(); i++)
      work_queues[i-1] = work_queues[i];
    ceph_assert(i == work_queues.size());
    work_queues.resize(i-1);
  }

  /// take thread pool lock
  void lock() {
    _lock.lock();
  }
  /// release thread pool lock
  void unlock() {
    _lock.unlock();
  }

  /// wait for a kick on this thread pool
  void wait(ceph::condition_variable &c) {
    std::unique_lock l(_lock, std::adopt_lock);
    c.wait(l);
  }

  /// wake up a waiter (with lock already held)
  void _wake() {
    _cond.notify_all();
  }
  /// wake up a waiter (without lock held)
  void wake() {
    std::lock_guard l(_lock);
    _cond.notify_all();
  }
  void _wait() {
    std::unique_lock l(_lock, std::adopt_lock);
    _cond.wait(l);
  }

  /// start thread pool thread
  void start();
  /// stop thread pool thread
  void stop(bool clear_after=true);
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool. must match each pause() call 1:1 to resume.
  void unpause();
  /** @brief Wait until work completes.
   * If the parameter is NULL, blocks until all threads are idle.
   * If it is not NULL, blocks until the given work queue does not have
   * any items left to process. */
  void drain(WorkQueue_* wq = 0);
};

class GenContextWQ :
  public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
  std::list<GenContext<ThreadPool::TPHandle&>*> _queue;
public:
  GenContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
    : ThreadPool::WorkQueueVal<
        GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}

  void _enqueue(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_back(c);
  }
  void _enqueue_front(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_front(c);
  }
  bool _empty() override {
    return _queue.empty();
  }
  GenContext<ThreadPool::TPHandle&> *_dequeue() override {
    ceph_assert(!_queue.empty());
    GenContext<ThreadPool::TPHandle&> *c = _queue.front();
    _queue.pop_front();
    return c;
  }
  void _process(GenContext<ThreadPool::TPHandle&> *c,
                ThreadPool::TPHandle &tp) override {
    c->complete(tp);
  }
};

class C_QueueInWQ : public Context {
  GenContextWQ *wq;
  GenContext<ThreadPool::TPHandle&> *c;
public:
  C_QueueInWQ(GenContextWQ *wq, GenContext<ThreadPool::TPHandle&> *c)
    : wq(wq), c(c) {}
  void finish(int) override {
    wq->queue(c);
  }
};

/// Work queue that asynchronously completes contexts (executes callbacks).
/// @see Finisher
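///
/// A rough usage sketch. The pool and queue names, the intervals, the cct
/// pointer, and the use of ceph::make_timespan and LambdaContext (from
/// include/Context.h) are illustrative assumptions only:
/// @code
/// ThreadPool tp(cct, "my_tp", "tp_thread", 4);
/// ContextWQ wq("my_wq", ceph::make_timespan(60), &tp);
/// tp.start();
/// wq.queue(new LambdaContext([](int r) { /* runs in a worker thread */ }));
/// wq.drain();   // wait for queued contexts to complete
/// tp.stop();
/// @endcode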
class ContextWQ : public ThreadPool::PointerWQ<Context> {
public:
  ContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
    : ThreadPool::PointerWQ<Context>(name, ti, ceph::timespan::zero(), tp) {
    this->register_work_queue();
  }

  void queue(Context *ctx, int result = 0) {
    if (result != 0) {
      std::lock_guard locker(m_lock);
      m_context_results[ctx] = result;
    }
    ThreadPool::PointerWQ<Context>::queue(ctx);
  }
protected:
  void _clear() override {
    ThreadPool::PointerWQ<Context>::_clear();

    std::lock_guard locker(m_lock);
    m_context_results.clear();
  }

  void process(Context *ctx) override {
    int result = 0;
    {
      std::lock_guard locker(m_lock);
      ceph::unordered_map<Context *, int>::iterator it =
        m_context_results.find(ctx);
      if (it != m_context_results.end()) {
        result = it->second;
        m_context_results.erase(it);
      }
    }
    ctx->complete(result);
  }
private:
  ceph::mutex m_lock = ceph::make_mutex("ContextWQ::m_lock");
  ceph::unordered_map<Context*, int> m_context_results;
};

class ShardedThreadPool {

  CephContext *cct;
  std::string name;
  std::string thread_name;
  std::string lockname;
  ceph::mutex shardedpool_lock;
  ceph::condition_variable shardedpool_cond;
  ceph::condition_variable wait_cond;
  uint32_t num_threads;

  std::atomic<bool> stop_threads = { false };
  std::atomic<bool> pause_threads = { false };
  std::atomic<bool> drain_threads = { false };

  uint32_t num_paused;
  uint32_t num_drained;

public:

  class BaseShardedWQ {
  public:
    ceph::timespan timeout_interval, suicide_interval;
    BaseShardedWQ(ceph::timespan ti, ceph::timespan sti)
      : timeout_interval(ti), suicide_interval(sti) {}
    virtual ~BaseShardedWQ() {}

    virtual void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb) = 0;
    virtual void return_waiting_threads() = 0;
    virtual void stop_return_waiting_threads() = 0;
    virtual bool is_shard_empty(uint32_t thread_index) = 0;
  };

  template <typename T>
  class ShardedWQ : public BaseShardedWQ {

    ShardedThreadPool* sharded_pool;

  protected:
    virtual void _enqueue(T&&) = 0;
    virtual void _enqueue_front(T&&) = 0;

  public:
    ShardedWQ(ceph::timespan ti,
              ceph::timespan sti, ShardedThreadPool* tp)
      : BaseShardedWQ(ti, sti), sharded_pool(tp) {
      tp->set_wq(this);
    }
    ~ShardedWQ() override {}

    void queue(T&& item) {
      _enqueue(std::move(item));
    }
    void queue_front(T&& item) {
      _enqueue_front(std::move(item));
    }
    void drain() {
      sharded_pool->drain();
    }
  };

private:

  BaseShardedWQ* wq;
  // threads
  struct WorkThreadSharded : public Thread {
    ShardedThreadPool *pool;
    uint32_t thread_index;
    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index)
      : pool(p), thread_index(pthread_index) {}
    void *entry() override {
      pool->shardedthreadpool_worker(thread_index);
      return 0;
    }
  };

  std::vector<WorkThreadSharded*> threads_shardedpool;
  void start_threads();
  void shardedthreadpool_worker(uint32_t thread_index);
  void set_wq(BaseShardedWQ* swq) {
    wq = swq;
  }

public:

  ShardedThreadPool(CephContext *cct_, std::string nm, std::string tn,
                    uint32_t pnum_threads);

  ~ShardedThreadPool() {}

  /// start thread pool thread
  void start();
  /// stop thread pool thread
  void stop();
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool. must match each pause() call 1:1 to resume.
  void unpause();
  /// wait for all work to complete
  void drain();

};

#endif

#endif