1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_JOURNAL_OBJECT_PLAYER_H
#define CEPH_JOURNAL_OBJECT_PLAYER_H
#include "include/Context.h"
#include "include/interval_set.h"
#include "include/rados/librados.hpp"
#include "common/ceph_mutex.h"
#include "common/Timer.h"
#include "common/RefCountedObj.h"
#include "journal/Entry.h"
#include <list>
#include <string>
#include <boost/noncopyable.hpp>
#include <boost/unordered_map.hpp>
#include "include/ceph_assert.h"
namespace journal {
class ObjectPlayer : public RefCountedObject {
public:
typedef std::list<Entry> Entries;
typedef interval_set<uint64_t> InvalidRanges;
enum RefetchState {
REFETCH_STATE_NONE,
REFETCH_STATE_REQUIRED,
REFETCH_STATE_IMMEDIATE
};
inline const std::string &get_oid() const {
return m_oid;
}
inline uint64_t get_object_number() const {
return m_object_num;
}
void fetch(Context *on_finish);
void watch(Context *on_fetch, double interval);
void unwatch();
void front(Entry *entry) const;
void pop_front();
inline bool empty() const {
std::lock_guard locker{m_lock};
return m_entries.empty();
}
inline void get_entries(Entries *entries) {
std::lock_guard locker{m_lock};
*entries = m_entries;
}
inline void get_invalid_ranges(InvalidRanges *invalid_ranges) {
std::lock_guard locker{m_lock};
*invalid_ranges = m_invalid_ranges;
}
inline bool refetch_required() const {
return (get_refetch_state() != REFETCH_STATE_NONE);
}
inline RefetchState get_refetch_state() const {
return m_refetch_state;
}
inline void set_refetch_state(RefetchState refetch_state) {
m_refetch_state = refetch_state;
}
inline void set_max_fetch_bytes(uint64_t max_fetch_bytes) {
std::lock_guard locker{m_lock};
m_max_fetch_bytes = max_fetch_bytes;
}
private:
FRIEND_MAKE_REF(ObjectPlayer);
ObjectPlayer(librados::IoCtx &ioctx, const std::string& object_oid_prefix,
uint64_t object_num, SafeTimer &timer, ceph::mutex &timer_lock,
uint8_t order, uint64_t max_fetch_bytes);
~ObjectPlayer() override;
typedef std::pair<uint64_t, uint64_t> EntryKey;
typedef boost::unordered_map<EntryKey, Entries::iterator> EntryKeys;
struct C_Fetch : public Context {
ceph::ref_t<ObjectPlayer> object_player;
Context *on_finish;
bufferlist read_bl;
C_Fetch(ObjectPlayer *o, Context *ctx) : object_player(o), on_finish(ctx) {
}
void finish(int r) override;
};
struct C_WatchFetch : public Context {
ceph::ref_t<ObjectPlayer> object_player;
C_WatchFetch(ObjectPlayer *o) : object_player(o) {
}
void finish(int r) override;
};
librados::IoCtx m_ioctx;
uint64_t m_object_num;
std::string m_oid;
CephContext *m_cct = nullptr;
SafeTimer &m_timer;
ceph::mutex &m_timer_lock;
uint8_t m_order;
uint64_t m_max_fetch_bytes;
double m_watch_interval = 0;
Context *m_watch_task = nullptr;
mutable ceph::mutex m_lock;
bool m_fetch_in_progress = false;
bufferlist m_read_bl;
uint32_t m_read_off = 0;
uint32_t m_read_bl_off = 0;
Entries m_entries;
EntryKeys m_entry_keys;
InvalidRanges m_invalid_ranges;
Context *m_watch_ctx = nullptr;
bool m_unwatched = false;
RefetchState m_refetch_state = REFETCH_STATE_IMMEDIATE;
int handle_fetch_complete(int r, const bufferlist &bl, bool *refetch);
void clear_invalid_range(uint32_t off, uint32_t len);
void schedule_watch();
bool cancel_watch();
void handle_watch_task();
void handle_watch_fetched(int r);
};
} // namespace journal
#endif // CEPH_JOURNAL_OBJECT_PLAYER_H
|