blob: 9d8ea6c10f3d924a73dc78a72b6d7b82ace440b7 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H
#define CEPH_JOURNAL_JOURNAL_RECORDER_H
#include "include/int_types.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "common/ceph_mutex.h"
#include "common/containers.h"
#include "common/Timer.h"
#include "journal/Future.h"
#include "journal/FutureImpl.h"
#include "journal/JournalMetadata.h"
#include "journal/ObjectRecorder.h"
#include <map>
#include <string>
namespace journal {
class JournalRecorder {
public:
JournalRecorder(librados::IoCtx &ioctx, std::string_view object_oid_prefix,
ceph::ref_t<JournalMetadata> journal_metadata,
uint64_t max_in_flight_appends);
~JournalRecorder();
void shut_down(Context *on_safe);
void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
double flush_age);
Future append(uint64_t tag_tid, const bufferlist &bl);
void flush(Context *on_safe);
ceph::ref_t<ObjectRecorder> get_object(uint8_t splay_offset);
private:
typedef std::map<uint8_t, ceph::ref_t<ObjectRecorder>> ObjectRecorderPtrs;
typedef std::vector<std::unique_lock<ceph::mutex>> Lockers;
struct Listener : public JournalMetadataListener {
JournalRecorder *journal_recorder;
Listener(JournalRecorder *_journal_recorder)
: journal_recorder(_journal_recorder) {}
void handle_update(JournalMetadata *) override {
journal_recorder->handle_update();
}
};
struct ObjectHandler : public ObjectRecorder::Handler {
JournalRecorder *journal_recorder;
ObjectHandler(JournalRecorder *_journal_recorder)
: journal_recorder(_journal_recorder) {
}
void closed(ObjectRecorder *object_recorder) override {
journal_recorder->handle_closed(object_recorder);
}
void overflow(ObjectRecorder *object_recorder) override {
journal_recorder->handle_overflow(object_recorder);
}
};
struct C_AdvanceObjectSet : public Context {
JournalRecorder *journal_recorder;
C_AdvanceObjectSet(JournalRecorder *_journal_recorder)
: journal_recorder(_journal_recorder) {
}
void finish(int r) override {
journal_recorder->handle_advance_object_set(r);
}
};
librados::IoCtx m_ioctx;
CephContext *m_cct = nullptr;
std::string m_object_oid_prefix;
ceph::ref_t<JournalMetadata> m_journal_metadata;
uint32_t m_flush_interval = 0;
uint64_t m_flush_bytes = 0;
double m_flush_age = 0;
uint64_t m_max_in_flight_appends;
Listener m_listener;
ObjectHandler m_object_handler;
ceph::mutex m_lock = ceph::make_mutex("JournalerRecorder::m_lock");
uint32_t m_in_flight_advance_sets = 0;
uint32_t m_in_flight_object_closes = 0;
uint64_t m_current_set;
ObjectRecorderPtrs m_object_ptrs;
ceph::containers::tiny_vector<ceph::mutex> m_object_locks;
ceph::ref_t<FutureImpl> m_prev_future;
Context *m_on_object_set_advanced = nullptr;
void open_object_set();
bool close_object_set(uint64_t active_set);
void advance_object_set();
void handle_advance_object_set(int r);
void close_and_advance_object_set(uint64_t object_set);
ceph::ref_t<ObjectRecorder> create_object_recorder(uint64_t object_number,
ceph::mutex* lock);
bool create_next_object_recorder(ceph::ref_t<ObjectRecorder> object_recorder);
void handle_update();
void handle_closed(ObjectRecorder *object_recorder);
void handle_overflow(ObjectRecorder *object_recorder);
Lockers lock_object_recorders();
};
} // namespace journal
#endif // CEPH_JOURNAL_JOURNAL_RECORDER_H
|