blob: f878ff8a6d18d7b5c66f5597043888206328b39c (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2019 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
#include <atomic>
#include "common/Thread.h"
#include "common/ceph_mutex.h"
#include "include/common_fwd.h"
#define dout_subsys ceph_subsys_rgw
class RGWRados;
class RGWRadosThread {
class Worker : public Thread, public DoutPrefixProvider {
CephContext *cct;
RGWRadosThread *processor;
ceph::mutex lock = ceph::make_mutex("RGWRadosThread::Worker");
ceph::condition_variable cond;
void wait() {
std::unique_lock l{lock};
cond.wait(l);
};
void wait_interval(const ceph::real_clock::duration& wait_time) {
std::unique_lock l{lock};
cond.wait_for(l, wait_time);
}
public:
Worker(CephContext *_cct, RGWRadosThread *_p) : cct(_cct), processor(_p) {}
void *entry() override;
void signal() {
std::lock_guard l{lock};
cond.notify_all();
}
CephContext *get_cct() const { return cct; }
unsigned get_subsys() const { return dout_subsys; }
std::ostream& gen_prefix(std::ostream& out) const { return out << "rgw rados thread: "; }
};
Worker *worker;
protected:
CephContext *cct;
RGWRados *store;
std::atomic<bool> down_flag = { false };
string thread_name;
virtual uint64_t interval_msec() = 0;
virtual void stop_process() {}
public:
RGWRadosThread(RGWRados *_store, const string& thread_name = "radosgw")
: worker(NULL), cct(_store->ctx()), store(_store), thread_name(thread_name) {}
virtual ~RGWRadosThread() {
stop();
}
virtual int init(const DoutPrefixProvider *dpp) { return 0; }
virtual int process(const DoutPrefixProvider *dpp) = 0;
bool going_down() { return down_flag; }
void start();
void stop();
void signal() {
if (worker) {
worker->signal();
}
}
};
|