#include "thread_pool.h"
#include <chrono>
#include <pthread.h>
#include "include/ceph_assert.h"
#include "crimson/common/config_proxy.h"
using crimson::common::local_conf;
namespace crimson::os {
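
// Spawn n_threads workers, optionally pinned to cpu_id.  The pending queue is
// sized to a multiple of the seastar shard count so that start() can hand each
// shard an equal number of submit slots.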
ThreadPool::ThreadPool(size_t n_threads,
                       size_t queue_sz,
                       long cpu_id)
  : queue_size{round_up_to(queue_sz, seastar::smp::count)},
    pending{queue_size}
{
  auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait);
  for (size_t i = 0; i < n_threads; i++) {
    threads.emplace_back([this, cpu_id, queue_max_wait] {
      if (cpu_id >= 0) {
        pin(cpu_id);
      }
      loop(queue_max_wait);
    });
  }
}
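
// Join all worker threads; they only leave loop() once stop() has set the
// stopping flag, so stop() is expected to complete before destruction.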
ThreadPool::~ThreadPool()
{
  for (auto& thread : threads) {
    thread.join();
  }
}
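
// Pin the calling worker thread to the given CPU core.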
void ThreadPool::pin(unsigned cpu_id)
{
  cpu_set_t cs;
  CPU_ZERO(&cs);
  CPU_SET(cpu_id, &cs);
  [[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(),
                                                   sizeof(cs), &cs);
  ceph_assert(r == 0);
}
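
// Worker main loop: wait up to queue_max_wait for an item to appear in the
// pending queue, process it outside the lock, and exit once the pool is
// stopping and nothing was dequeued.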
void ThreadPool::loop(std::chrono::milliseconds queue_max_wait)
{
  for (;;) {
    WorkItem* work_item = nullptr;
    {
      std::unique_lock lock{mutex};
      cond.wait_for(lock, queue_max_wait,
                    [this, &work_item] {
                      return pending.pop(work_item) || is_stopping();
                    });
    }
    if (work_item) {
      work_item->process();
    } else if (is_stopping()) {
      break;
    }
  }
}
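
// Start the sharded submit queue, giving each seastar shard
// queue_size / smp::count slots.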
seastar::future<> ThreadPool::start()
{
  auto slots_per_shard = queue_size / seastar::smp::count;
  return submit_queue.start(slots_per_shard);
}
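
// Stop the sharded submit queue, then signal the workers to shut down.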
seastar::future<> ThreadPool::stop()
{
  return submit_queue.stop().then([this] {
    stopping = true;
    cond.notify_all();
  });
}

} // namespace crimson::os