blob: 241a09709ffebaf5df9e74d666d890719db4dc66 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_JOURNAL_FUTURE_IMPL_H
#define CEPH_JOURNAL_FUTURE_IMPL_H
#include "include/int_types.h"
#include "common/RefCountedObj.h"
#include "include/Context.h"
#include "journal/Future.h"
#include <list>
#include <map>
#include <boost/noncopyable.hpp>
#include "include/ceph_assert.h"
class Context;
namespace journal {
class FutureImpl : public RefCountedObject, boost::noncopyable {
public:
struct FlushHandler {
using ref = std::shared_ptr<FlushHandler>;
virtual void flush(const ceph::ref_t<FutureImpl> &future) = 0;
virtual ~FlushHandler() = default;
};
void init(const ceph::ref_t<FutureImpl> &prev_future);
inline uint64_t get_tag_tid() const {
return m_tag_tid;
}
inline uint64_t get_entry_tid() const {
return m_entry_tid;
}
inline uint64_t get_commit_tid() const {
return m_commit_tid;
}
void flush(Context *on_safe = NULL);
void wait(Context *on_safe);
bool is_complete() const;
int get_return_value() const;
inline bool is_flush_in_progress() const {
std::lock_guard locker{m_lock};
return (m_flush_state == FLUSH_STATE_IN_PROGRESS);
}
inline void set_flush_in_progress() {
auto h = std::move(m_flush_handler);
ceph_assert(h);
std::lock_guard locker{m_lock};
m_flush_state = FLUSH_STATE_IN_PROGRESS;
}
bool attach(FlushHandler::ref flush_handler);
inline void detach() {
m_flush_handler.reset();
}
inline FlushHandler::ref get_flush_handler() const {
return m_flush_handler;
}
void safe(int r);
private:
friend std::ostream &operator<<(std::ostream &, const FutureImpl &);
typedef std::map<FlushHandler::ref, ceph::ref_t<FutureImpl>> FlushHandlers;
typedef std::list<Context *> Contexts;
enum FlushState {
FLUSH_STATE_NONE,
FLUSH_STATE_REQUESTED,
FLUSH_STATE_IN_PROGRESS
};
struct C_ConsistentAck : public Context {
ceph::ref_t<FutureImpl> future;
C_ConsistentAck(ceph::ref_t<FutureImpl> _future) : future(std::move(_future)) {}
void complete(int r) override {
future->consistent(r);
future.reset();
}
void finish(int r) override {}
};
FRIEND_MAKE_REF(FutureImpl);
FutureImpl(uint64_t tag_tid, uint64_t entry_tid, uint64_t commit_tid);
~FutureImpl() override = default;
uint64_t m_tag_tid;
uint64_t m_entry_tid;
uint64_t m_commit_tid;
mutable ceph::mutex m_lock = ceph::make_mutex("FutureImpl::m_lock", false);
ceph::ref_t<FutureImpl> m_prev_future;
bool m_safe = false;
bool m_consistent = false;
int m_return_value = 0;
FlushHandler::ref m_flush_handler;
FlushState m_flush_state = FLUSH_STATE_NONE;
C_ConsistentAck m_consistent_ack;
Contexts m_contexts;
ceph::ref_t<FutureImpl> prepare_flush(FlushHandlers *flush_handlers);
ceph::ref_t<FutureImpl> prepare_flush(FlushHandlers *flush_handlers, ceph::mutex &lock);
void consistent(int r);
void finish_unlock();
};
std::ostream &operator<<(std::ostream &os, const FutureImpl &future);
} // namespace journal
using journal::operator<<;
#endif // CEPH_JOURNAL_FUTURE_IMPL_H
|