summaryrefslogtreecommitdiffstats
path: root/src/common/WorkQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/WorkQueue.h')
-rw-r--r--src/common/WorkQueue.h680
1 files changed, 680 insertions, 0 deletions
diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h
new file mode 100644
index 000000000..bb9e1b66b
--- /dev/null
+++ b/src/common/WorkQueue.h
@@ -0,0 +1,680 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_WORKQUEUE_H
+#define CEPH_WORKQUEUE_H
+
+#if defined(WITH_SEASTAR) && !defined(WITH_ALIEN)
+// for ObjectStore.h
+struct ThreadPool {
+ struct TPHandle {
+ };
+};
+
+#else
+
+#include <atomic>
+#include <list>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "common/ceph_mutex.h"
+#include "include/unordered_map.h"
+#include "common/config_obs.h"
+#include "common/HeartbeatMap.h"
+#include "common/Thread.h"
+#include "include/common_fwd.h"
+#include "include/Context.h"
+#include "common/HBHandle.h"
+
+
+/// Pool of threads that share work submitted to multiple work queues.
+class ThreadPool : public md_config_obs_t {
+protected:
+ CephContext *cct;
+ std::string name;
+ std::string thread_name;
+ std::string lockname;
+ ceph::mutex _lock;
+ ceph::condition_variable _cond;
+ bool _stop;
+ int _pause;
+ int _draining;
+ ceph::condition_variable _wait_cond;
+
+public:
+ class TPHandle : public HBHandle {
+ friend class ThreadPool;
+ CephContext *cct;
+ ceph::heartbeat_handle_d *hb;
+ ceph::timespan grace;
+ ceph::timespan suicide_grace;
+ public:
+ TPHandle(
+ CephContext *cct,
+ ceph::heartbeat_handle_d *hb,
+ ceph::timespan grace,
+ ceph::timespan suicide_grace)
+ : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
+ void reset_tp_timeout() override final;
+ void suspend_tp_timeout() override final;
+ };
+protected:
+
+ /// Basic interface to a work queue used by the worker threads.
+ struct WorkQueue_ {
+ std::string name;
+ ceph::timespan timeout_interval;
+ ceph::timespan suicide_interval;
+ WorkQueue_(std::string n, ceph::timespan ti, ceph::timespan sti)
+ : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
+ { }
+ virtual ~WorkQueue_() {}
+ /// Remove all work items from the queue.
+ virtual void _clear() = 0;
+ /// Check whether there is anything to do.
+ virtual bool _empty() = 0;
+ /// Get the next work item to process.
+ virtual void *_void_dequeue() = 0;
+ /** @brief Process the work item.
+ * This function will be called several times in parallel
+ * and must therefore be thread-safe. */
+ virtual void _void_process(void *item, TPHandle &handle) = 0;
+ /** @brief Synchronously finish processing a work item.
+ * This function is called after _void_process with the global thread pool lock held,
+ * so at most one copy will execute simultaneously for a given thread pool.
+ * It can be used for non-thread-safe finalization. */
+ virtual void _void_process_finish(void *) = 0;
+ void set_timeout(time_t ti){
+ timeout_interval = ceph::make_timespan(ti);
+ }
+ void set_suicide_timeout(time_t sti){
+ suicide_interval = ceph::make_timespan(sti);
+ }
+ };
+
+ // track thread pool size changes
+ unsigned _num_threads;
+ std::string _thread_num_option;
+ const char **_conf_keys;
+
+ const char **get_tracked_conf_keys() const override {
+ return _conf_keys;
+ }
+ void handle_conf_change(const ConfigProxy& conf,
+ const std::set <std::string> &changed) override;
+
+public:
+ /** @brief Templated by-value work queue.
+ * Skeleton implementation of a queue that processes items submitted by value.
+ * This is useful if the items are single primitive values or very small objects
+ * (a few bytes). The queue will automatically add itself to the thread pool on
+ * construction and remove itself on destruction. */
+ template<typename T, typename U = T>
+ class WorkQueueVal : public WorkQueue_ {
+ ceph::mutex _lock = ceph::make_mutex("WorkQueueVal::_lock");
+ ThreadPool *pool;
+ std::list<U> to_process;
+ std::list<U> to_finish;
+ virtual void _enqueue(T) = 0;
+ virtual void _enqueue_front(T) = 0;
+ bool _empty() override = 0;
+ virtual U _dequeue() = 0;
+ virtual void _process_finish(U) {}
+
+ void *_void_dequeue() override {
+ {
+ std::lock_guard l(_lock);
+ if (_empty())
+ return 0;
+ U u = _dequeue();
+ to_process.push_back(u);
+ }
+ return ((void*)1); // Not used
+ }
+ void _void_process(void *, TPHandle &handle) override {
+ _lock.lock();
+ ceph_assert(!to_process.empty());
+ U u = to_process.front();
+ to_process.pop_front();
+ _lock.unlock();
+
+ _process(u, handle);
+
+ _lock.lock();
+ to_finish.push_back(u);
+ _lock.unlock();
+ }
+
+ void _void_process_finish(void *) override {
+ _lock.lock();
+ ceph_assert(!to_finish.empty());
+ U u = to_finish.front();
+ to_finish.pop_front();
+ _lock.unlock();
+
+ _process_finish(u);
+ }
+
+ void _clear() override {}
+
+ public:
+ WorkQueueVal(std::string n,
+ ceph::timespan ti,
+ ceph::timespan sti,
+ ThreadPool *p)
+ : WorkQueue_(std::move(n), ti, sti), pool(p) {
+ pool->add_work_queue(this);
+ }
+ ~WorkQueueVal() override {
+ pool->remove_work_queue(this);
+ }
+ void queue(T item) {
+ std::lock_guard l(pool->_lock);
+ _enqueue(item);
+ pool->_cond.notify_one();
+ }
+ void queue_front(T item) {
+ std::lock_guard l(pool->_lock);
+ _enqueue_front(item);
+ pool->_cond.notify_one();
+ }
+ void drain() {
+ pool->drain(this);
+ }
+ protected:
+ void lock() {
+ pool->lock();
+ }
+ void unlock() {
+ pool->unlock();
+ }
+ virtual void _process(U u, TPHandle &) = 0;
+ };
+
+ /** @brief Template by-pointer work queue.
+ * Skeleton implementation of a queue that processes items of a given type submitted as pointers.
+ * This is useful when the work item are large or include dynamically allocated memory. The queue
+ * will automatically add itself to the thread pool on construction and remove itself on
+ * destruction. */
+ template<class T>
+ class WorkQueue : public WorkQueue_ {
+ ThreadPool *pool;
+
+ /// Add a work item to the queue.
+ virtual bool _enqueue(T *) = 0;
+ /// Dequeue a previously submitted work item.
+ virtual void _dequeue(T *) = 0;
+ /// Dequeue a work item and return the original submitted pointer.
+ virtual T *_dequeue() = 0;
+ virtual void _process_finish(T *) {}
+
+ // implementation of virtual methods from WorkQueue_
+ void *_void_dequeue() override {
+ return (void *)_dequeue();
+ }
+ void _void_process(void *p, TPHandle &handle) override {
+ _process(static_cast<T *>(p), handle);
+ }
+ void _void_process_finish(void *p) override {
+ _process_finish(static_cast<T *>(p));
+ }
+
+ protected:
+ /// Process a work item. Called from the worker threads.
+ virtual void _process(T *t, TPHandle &) = 0;
+
+ public:
+ WorkQueue(std::string n,
+ ceph::timespan ti, ceph::timespan sti,
+ ThreadPool* p)
+ : WorkQueue_(std::move(n), ti, sti), pool(p) {
+ pool->add_work_queue(this);
+ }
+ ~WorkQueue() override {
+ pool->remove_work_queue(this);
+ }
+
+ bool queue(T *item) {
+ pool->_lock.lock();
+ bool r = _enqueue(item);
+ pool->_cond.notify_one();
+ pool->_lock.unlock();
+ return r;
+ }
+ void dequeue(T *item) {
+ pool->_lock.lock();
+ _dequeue(item);
+ pool->_lock.unlock();
+ }
+ void clear() {
+ pool->_lock.lock();
+ _clear();
+ pool->_lock.unlock();
+ }
+
+ void lock() {
+ pool->lock();
+ }
+ void unlock() {
+ pool->unlock();
+ }
+ /// wake up the thread pool (without lock held)
+ void wake() {
+ pool->wake();
+ }
+ /// wake up the thread pool (with lock already held)
+ void _wake() {
+ pool->_wake();
+ }
+ void _wait() {
+ pool->_wait();
+ }
+ void drain() {
+ pool->drain(this);
+ }
+
+ };
+
+ template<typename T>
+ class PointerWQ : public WorkQueue_ {
+ public:
+ ~PointerWQ() override {
+ m_pool->remove_work_queue(this);
+ ceph_assert(m_processing == 0);
+ }
+ void drain() {
+ {
+ // if this queue is empty and not processing, don't wait for other
+ // queues to finish processing
+ std::lock_guard l(m_pool->_lock);
+ if (m_processing == 0 && m_items.empty()) {
+ return;
+ }
+ }
+ m_pool->drain(this);
+ }
+ void queue(T *item) {
+ std::lock_guard l(m_pool->_lock);
+ m_items.push_back(item);
+ m_pool->_cond.notify_one();
+ }
+ bool empty() {
+ std::lock_guard l(m_pool->_lock);
+ return _empty();
+ }
+ protected:
+ PointerWQ(std::string n,
+ ceph::timespan ti, ceph::timespan sti,
+ ThreadPool* p)
+ : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
+ }
+ void register_work_queue() {
+ m_pool->add_work_queue(this);
+ }
+ void _clear() override {
+ ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
+ m_items.clear();
+ }
+ bool _empty() override {
+ ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
+ return m_items.empty();
+ }
+ void *_void_dequeue() override {
+ ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
+ if (m_items.empty()) {
+ return NULL;
+ }
+
+ ++m_processing;
+ T *item = m_items.front();
+ m_items.pop_front();
+ return item;
+ }
+ void _void_process(void *item, ThreadPool::TPHandle &handle) override {
+ process(reinterpret_cast<T *>(item));
+ }
+ void _void_process_finish(void *item) override {
+ ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
+ ceph_assert(m_processing > 0);
+ --m_processing;
+ }
+
+ virtual void process(T *item) = 0;
+ void process_finish() {
+ std::lock_guard locker(m_pool->_lock);
+ _void_process_finish(nullptr);
+ }
+
+ T *front() {
+ ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
+ if (m_items.empty()) {
+ return NULL;
+ }
+ return m_items.front();
+ }
+ void requeue_front(T *item) {
+ std::lock_guard pool_locker(m_pool->_lock);
+ _void_process_finish(nullptr);
+ m_items.push_front(item);
+ }
+ void requeue_back(T *item) {
+ std::lock_guard pool_locker(m_pool->_lock);
+ _void_process_finish(nullptr);
+ m_items.push_back(item);
+ }
+ void signal() {
+ std::lock_guard pool_locker(m_pool->_lock);
+ m_pool->_cond.notify_one();
+ }
+ ceph::mutex &get_pool_lock() {
+ return m_pool->_lock;
+ }
+ private:
+ ThreadPool *m_pool;
+ std::list<T *> m_items;
+ uint32_t m_processing;
+ };
+protected:
+ std::vector<WorkQueue_*> work_queues;
+ int next_work_queue = 0;
+
+
+ // threads
+ struct WorkThread : public Thread {
+ ThreadPool *pool;
+ // cppcheck-suppress noExplicitConstructor
+ WorkThread(ThreadPool *p) : pool(p) {}
+ void *entry() override {
+ pool->worker(this);
+ return 0;
+ }
+ };
+
+ std::set<WorkThread*> _threads;
+ std::list<WorkThread*> _old_threads; ///< need to be joined
+ int processing;
+
+ void start_threads();
+ void join_old_threads();
+ virtual void worker(WorkThread *wt);
+
+public:
+ ThreadPool(CephContext *cct_, std::string nm, std::string tn, int n, const char *option = NULL);
+ ~ThreadPool() override;
+
+ /// return number of threads currently running
+ int get_num_threads() {
+ std::lock_guard l(_lock);
+ return _num_threads;
+ }
+
+ /// assign a work queue to this thread pool
+ void add_work_queue(WorkQueue_* wq) {
+ std::lock_guard l(_lock);
+ work_queues.push_back(wq);
+ }
+ /// remove a work queue from this thread pool
+ void remove_work_queue(WorkQueue_* wq) {
+ std::lock_guard l(_lock);
+ unsigned i = 0;
+ while (work_queues[i] != wq)
+ i++;
+ for (i++; i < work_queues.size(); i++)
+ work_queues[i-1] = work_queues[i];
+ ceph_assert(i == work_queues.size());
+ work_queues.resize(i-1);
+ }
+
+ /// take thread pool lock
+ void lock() {
+ _lock.lock();
+ }
+ /// release thread pool lock
+ void unlock() {
+ _lock.unlock();
+ }
+
+ /// wait for a kick on this thread pool
+ void wait(ceph::condition_variable &c) {
+ std::unique_lock l(_lock, std::adopt_lock);
+ c.wait(l);
+ }
+
+ /// wake up a waiter (with lock already held)
+ void _wake() {
+ _cond.notify_all();
+ }
+ /// wake up a waiter (without lock held)
+ void wake() {
+ std::lock_guard l(_lock);
+ _cond.notify_all();
+ }
+ void _wait() {
+ std::unique_lock l(_lock, std::adopt_lock);
+ _cond.wait(l);
+ }
+
+ /// start thread pool thread
+ void start();
+ /// stop thread pool thread
+ void stop(bool clear_after=true);
+ /// pause thread pool (if it not already paused)
+ void pause();
+ /// pause initiation of new work
+ void pause_new();
+ /// resume work in thread pool. must match each pause() call 1:1 to resume.
+ void unpause();
+ /** @brief Wait until work completes.
+ * If the parameter is NULL, blocks until all threads are idle.
+ * If it is not NULL, blocks until the given work queue does not have
+ * any items left to process. */
+ void drain(WorkQueue_* wq = 0);
+};
+
+class GenContextWQ :
+ public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
+ std::list<GenContext<ThreadPool::TPHandle&>*> _queue;
+public:
+ GenContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
+ : ThreadPool::WorkQueueVal<
+ GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}
+
+ void _enqueue(GenContext<ThreadPool::TPHandle&> *c) override {
+ _queue.push_back(c);
+ }
+ void _enqueue_front(GenContext<ThreadPool::TPHandle&> *c) override {
+ _queue.push_front(c);
+ }
+ bool _empty() override {
+ return _queue.empty();
+ }
+ GenContext<ThreadPool::TPHandle&> *_dequeue() override {
+ ceph_assert(!_queue.empty());
+ GenContext<ThreadPool::TPHandle&> *c = _queue.front();
+ _queue.pop_front();
+ return c;
+ }
+ void _process(GenContext<ThreadPool::TPHandle&> *c,
+ ThreadPool::TPHandle &tp) override {
+ c->complete(tp);
+ }
+};
+
+class C_QueueInWQ : public Context {
+ GenContextWQ *wq;
+ GenContext<ThreadPool::TPHandle&> *c;
+public:
+ C_QueueInWQ(GenContextWQ *wq, GenContext<ThreadPool::TPHandle &> *c)
+ : wq(wq), c(c) {}
+ void finish(int) override {
+ wq->queue(c);
+ }
+};
+
+/// Work queue that asynchronously completes contexts (executes callbacks).
+/// @see Finisher
+class ContextWQ : public ThreadPool::PointerWQ<Context> {
+public:
+ ContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
+ : ThreadPool::PointerWQ<Context>(name, ti, ceph::timespan::zero(), tp) {
+ this->register_work_queue();
+ }
+
+ void queue(Context *ctx, int result = 0) {
+ if (result != 0) {
+ std::lock_guard locker(m_lock);
+ m_context_results[ctx] = result;
+ }
+ ThreadPool::PointerWQ<Context>::queue(ctx);
+ }
+protected:
+ void _clear() override {
+ ThreadPool::PointerWQ<Context>::_clear();
+
+ std::lock_guard locker(m_lock);
+ m_context_results.clear();
+ }
+
+ void process(Context *ctx) override {
+ int result = 0;
+ {
+ std::lock_guard locker(m_lock);
+ ceph::unordered_map<Context *, int>::iterator it =
+ m_context_results.find(ctx);
+ if (it != m_context_results.end()) {
+ result = it->second;
+ m_context_results.erase(it);
+ }
+ }
+ ctx->complete(result);
+ }
+private:
+ ceph::mutex m_lock = ceph::make_mutex("ContextWQ::m_lock");
+ ceph::unordered_map<Context*, int> m_context_results;
+};
+
+class ShardedThreadPool {
+
+ CephContext *cct;
+ std::string name;
+ std::string thread_name;
+ std::string lockname;
+ ceph::mutex shardedpool_lock;
+ ceph::condition_variable shardedpool_cond;
+ ceph::condition_variable wait_cond;
+ uint32_t num_threads;
+
+ std::atomic<bool> stop_threads = { false };
+ std::atomic<bool> pause_threads = { false };
+ std::atomic<bool> drain_threads = { false };
+
+ uint32_t num_paused;
+ uint32_t num_drained;
+
+public:
+
+ class BaseShardedWQ {
+
+ public:
+ ceph::timespan timeout_interval, suicide_interval;
+ BaseShardedWQ(ceph::timespan ti, ceph::timespan sti)
+ :timeout_interval(ti), suicide_interval(sti) {}
+ virtual ~BaseShardedWQ() {}
+
+ virtual void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb ) = 0;
+ virtual void return_waiting_threads() = 0;
+ virtual void stop_return_waiting_threads() = 0;
+ virtual bool is_shard_empty(uint32_t thread_index) = 0;
+ };
+
+ template <typename T>
+ class ShardedWQ: public BaseShardedWQ {
+
+ ShardedThreadPool* sharded_pool;
+
+ protected:
+ virtual void _enqueue(T&&) = 0;
+ virtual void _enqueue_front(T&&) = 0;
+
+
+ public:
+ ShardedWQ(ceph::timespan ti,
+ ceph::timespan sti, ShardedThreadPool* tp)
+ : BaseShardedWQ(ti, sti), sharded_pool(tp) {
+ tp->set_wq(this);
+ }
+ ~ShardedWQ() override {}
+
+ void queue(T&& item) {
+ _enqueue(std::move(item));
+ }
+ void queue_front(T&& item) {
+ _enqueue_front(std::move(item));
+ }
+ void drain() {
+ sharded_pool->drain();
+ }
+
+ };
+
+private:
+
+ BaseShardedWQ* wq;
+ // threads
+ struct WorkThreadSharded : public Thread {
+ ShardedThreadPool *pool;
+ uint32_t thread_index;
+ WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),
+ thread_index(pthread_index) {}
+ void *entry() override {
+ pool->shardedthreadpool_worker(thread_index);
+ return 0;
+ }
+ };
+
+ std::vector<WorkThreadSharded*> threads_shardedpool;
+ void start_threads();
+ void shardedthreadpool_worker(uint32_t thread_index);
+ void set_wq(BaseShardedWQ* swq) {
+ wq = swq;
+ }
+
+
+
+public:
+
+ ShardedThreadPool(CephContext *cct_, std::string nm, std::string tn, uint32_t pnum_threads);
+
+ ~ShardedThreadPool(){};
+
+ /// start thread pool thread
+ void start();
+ /// stop thread pool thread
+ void stop();
+ /// pause thread pool (if it not already paused)
+ void pause();
+ /// pause initiation of new work
+ void pause_new();
+ /// resume work in thread pool. must match each pause() call 1:1 to resume.
+ void unpause();
+ /// wait for all work to complete
+ void drain();
+
+};
+
+#endif
+
+#endif