1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef CEPH_JOURNALINGOBJECTSTORE_H
#define CEPH_JOURNALINGOBJECTSTORE_H
#include "os/ObjectStore.h"
#include "Journal.h"
#include "FileJournal.h"
#include "common/RWLock.h"
#include "osd/OpRequest.h"
/// ObjectStore base class that carries a Journal alongside the store.
///
/// It owns a Finisher plus two helper objects -- SubmitManager and
/// ApplyManager -- that keep track of op sequence numbers. Concrete stores
/// have to provide do_transactions().
class JournalingObjectStore : public ObjectStore {
protected:
  // apply_manager (declared further down) keeps references to both of the
  // following members.
  Journal *journal;      ///< null until something assigns it (see constructor)
  Finisher finisher;

  /// Sequence counters used on the op submission side.
  class SubmitManager {
    CephContext* cct;
    // NOTE(review): the three booleans are positional Mutex constructor
    // options -- confirm their meaning against the Mutex declaration.
    Mutex lock;              ///< taken by set_op_seq()
    uint64_t op_seq;
    uint64_t op_submitted;

  public:
    SubmitManager(CephContext* cct)
      : cct(cct),
        lock("JOS::SubmitManager::lock", false, true, false),
        op_seq(0),
        op_submitted(0)
    {}

    // Both defined out of line; presumably they bracket the submission of a
    // single op -- verify against the implementation file.
    uint64_t op_submit_start();
    void op_submit_finish(uint64_t op);

    /// Set op_seq and op_submitted to @p seq while holding lock.
    void set_op_seq(uint64_t seq) {
      Mutex::Locker guard(lock);
      op_seq = seq;
      op_submitted = seq;
    }

    /// Return op_seq. This read does not take lock.
    uint64_t get_op_seq() {
      return op_seq;
    }
  } submit_manager;

  /// Sequence bookkeeping for applying ops and for commits.
  ///
  /// Works on the enclosing store's journal pointer and finisher through
  /// references handed to the constructor.
  class ApplyManager {
    CephContext* cct;
    Journal *&journal;       ///< reference to the store's journal pointer
    Finisher &finisher;      ///< reference to the store's finisher

    // Apply-side state. init_seq() writes max_applied_seq under apply_lock;
    // presumably blocked/open_ops are guarded by it too -- TODO confirm.
    Mutex apply_lock;
    bool blocked;
    Cond blocked_cond;
    int open_ops;
    uint64_t max_applied_seq;

    // Commit-side state. The inline accessors and init_seq() touch the two
    // sequence numbers under com_lock; commit_waiters is presumably guarded
    // the same way -- TODO confirm.
    Mutex com_lock;
    map<version_t, vector<Context*> > commit_waiters;
    uint64_t committing_seq, committed_seq;

  public:
    ApplyManager(CephContext* cct, Journal *&j, Finisher &f)
      : cct(cct),
        journal(j),
        finisher(f),
        apply_lock("JOS::ApplyManager::apply_lock", false, true, false),
        blocked(false),
        open_ops(0),
        max_applied_seq(0),
        com_lock("JOS::ApplyManager::com_lock", false, true, false),
        committing_seq(0),
        committed_seq(0)
    {}

    /// Zero the three sequence numbers.
    ///
    /// Asserts that no ops are open and that the manager is not blocked.
    /// No lock is taken here.
    void reset() {
      ceph_assert(open_ops == 0);
      ceph_assert(blocked == false);
      max_applied_seq = 0;
      committing_seq = 0;
      committed_seq = 0;
    }

    // Defined out of line.
    void add_waiter(uint64_t, Context*);
    uint64_t op_apply_start(uint64_t op);
    void op_apply_finish(uint64_t op);
    bool commit_start();
    void commit_started();
    void commit_finish();

    /// True when committing_seq and committed_seq differ (read under com_lock).
    bool is_committing() {
      Mutex::Locker guard(com_lock);
      return !(committing_seq == committed_seq);
    }

    /// committed_seq, read under com_lock.
    uint64_t get_committed_seq() {
      Mutex::Locker guard(com_lock);
      const uint64_t seq = committed_seq;
      return seq;
    }

    /// committing_seq, read under com_lock.
    uint64_t get_committing_seq() {
      Mutex::Locker guard(com_lock);
      const uint64_t seq = committing_seq;
      return seq;
    }

    /// Set committed, committing and max-applied sequence to @p fs_op_seq.
    ///
    /// com_lock and apply_lock are taken one after the other in separate
    /// scopes, so they are never held at the same time here.
    void init_seq(uint64_t fs_op_seq) {
      {
        Mutex::Locker commit_guard(com_lock);
        committing_seq = committed_seq = fs_op_seq;
      }
      {
        Mutex::Locker apply_guard(apply_lock);
        max_applied_seq = fs_op_seq;
      }
    }
  } apply_manager;

  bool replaying;        ///< false after construction

  // Journal handling helpers; all implemented out of line.
  void journal_start();
  void journal_stop();
  void journal_write_close();
  int journal_replay(uint64_t fs_op_seq);
  void _op_journal_transactions(bufferlist& tls, uint32_t orig_len, uint64_t op,
                                Context *onjournal, TrackedOpRef osd_op);

  /// Must be supplied by the concrete store.
  virtual int do_transactions(vector<ObjectStore::Transaction>& tls, uint64_t op_seq) = 0;

public:
  /// Forwards to ApplyManager::is_committing().
  bool is_committing() {
    return apply_manager.is_committing();
  }

  /// Forwards to ApplyManager::get_committed_seq().
  uint64_t get_committed_seq() {
    return apply_manager.get_committed_seq();
  }

  /// Build the store with no journal attached and replaying set to false.
  JournalingObjectStore(CephContext* cct, const std::string& path)
    : ObjectStore(cct, path),
      journal(nullptr),
      finisher(cct, "JournalObjectStore", "fn_jrn_objstore"),
      submit_manager(cct),
      apply_manager(cct, journal, finisher),
      replaying(false)
  {}

  ~JournalingObjectStore() override {}
};
#endif
|