diff options
Diffstat (limited to 'src/crimson/os/alienstore/thread_pool.cc')
-rw-r--r-- | src/crimson/os/alienstore/thread_pool.cc | 80 |
1 files changed, 80 insertions, 0 deletions
diff --git a/src/crimson/os/alienstore/thread_pool.cc b/src/crimson/os/alienstore/thread_pool.cc new file mode 100644 index 000000000..e127d87d5 --- /dev/null +++ b/src/crimson/os/alienstore/thread_pool.cc @@ -0,0 +1,80 @@ +#include "thread_pool.h" + +#include <chrono> +#include <pthread.h> + +#include "include/ceph_assert.h" +#include "crimson/common/config_proxy.h" + +using crimson::common::local_conf; + +namespace crimson::os { + +ThreadPool::ThreadPool(size_t n_threads, + size_t queue_sz, + long cpu_id) + : queue_size{round_up_to(queue_sz, seastar::smp::count)}, + pending{queue_size} +{ + auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait); + for (size_t i = 0; i < n_threads; i++) { + threads.emplace_back([this, cpu_id, queue_max_wait] { + if (cpu_id >= 0) { + pin(cpu_id); + } + loop(queue_max_wait); + }); + } +} + +ThreadPool::~ThreadPool() +{ + for (auto& thread : threads) { + thread.join(); + } +} + +void ThreadPool::pin(unsigned cpu_id) +{ + cpu_set_t cs; + CPU_ZERO(&cs); + CPU_SET(cpu_id, &cs); + [[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(), + sizeof(cs), &cs); + ceph_assert(r == 0); +} + +void ThreadPool::loop(std::chrono::milliseconds queue_max_wait) +{ + for (;;) { + WorkItem* work_item = nullptr; + { + std::unique_lock lock{mutex}; + cond.wait_for(lock, queue_max_wait, + [this, &work_item] { + return pending.pop(work_item) || is_stopping(); + }); + } + if (work_item) { + work_item->process(); + } else if (is_stopping()) { + break; + } + } +} + +seastar::future<> ThreadPool::start() +{ + auto slots_per_shard = queue_size / seastar::smp::count; + return submit_queue.start(slots_per_shard); +} + +seastar::future<> ThreadPool::stop() +{ + return submit_queue.stop().then([this] { + stopping = true; + cond.notify_all(); + }); +} + +} // namespace crimson::os |