// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #pragma once #include #include #include #include #include #include #include #include #include #include #include "Condition.h" namespace ceph::thread { struct WorkItem { virtual ~WorkItem() {} virtual void process() = 0; }; template> struct Task final : WorkItem { Func func; seastar::future_state state; ceph::thread::Condition on_done; public: explicit Task(Func&& f) : func(std::move(f)) {} void process() override { try { state.set(func()); } catch (...) { state.set_exception(std::current_exception()); } on_done.notify(); } seastar::future get_future() { return on_done.wait().then([this] { return seastar::make_ready_future(state.get0(std::move(state).get())); }); } }; struct SubmitQueue { seastar::semaphore free_slots; seastar::gate pending_tasks; explicit SubmitQueue(size_t num_free_slots) : free_slots(num_free_slots) {} seastar::future<> stop() { return pending_tasks.close(); } }; /// an engine for scheduling non-seastar tasks from seastar fibers class ThreadPool { std::atomic stopping = false; std::mutex mutex; std::condition_variable cond; std::vector threads; seastar::sharded submit_queue; const size_t queue_size; boost::lockfree::queue pending; void loop(); bool is_stopping() const { return stopping.load(std::memory_order_relaxed); } static void pin(unsigned cpu_id); seastar::semaphore& local_free_slots() { return submit_queue.local().free_slots; } ThreadPool(const ThreadPool&) = delete; ThreadPool& operator=(const ThreadPool&) = delete; public: /** * @param queue_sz the depth of pending queue. before a task is scheduled, * it waits in this queue. we will round this number to * multiple of the number of cores. * @param n_threads the number of threads in this thread pool. * @param cpu the CPU core to which this thread pool is assigned * @note each @c Task has its own ceph::thread::Condition, which possesses * possesses an fd, so we should keep the size of queue under a reasonable * limit. */ ThreadPool(size_t n_threads, size_t queue_sz, unsigned cpu); ~ThreadPool(); seastar::future<> start(); seastar::future<> stop(); template auto submit(Func&& func, Args&&... args) { auto packaged = [func=std::move(func), args=std::forward_as_tuple(args...)] { return std::apply(std::move(func), std::move(args)); }; return seastar::with_gate(submit_queue.local().pending_tasks, [packaged=std::move(packaged), this] { return local_free_slots().wait() .then([packaged=std::move(packaged), this] { auto task = new Task{std::move(packaged)}; auto fut = task->get_future(); pending.push(task); cond.notify_one(); return fut.finally([task, this] { local_free_slots().signal(); delete task; }); }); }); } }; } // namespace ceph::thread