/*
* Copyright (C) 2018 Codership Oy <info@codership.com>
*
* This file is part of wsrep-lib.
*
* Wsrep-lib is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* Wsrep-lib is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
*/
/** @file transaction.hpp */
#ifndef WSREP_TRANSACTION_HPP
#define WSREP_TRANSACTION_HPP
#include "provider.hpp"
#include "server_state.hpp"
#include "transaction_id.hpp"
#include "streaming_context.hpp"
#include "lock.hpp"
#include "sr_key_set.hpp"
#include "buffer.hpp"
#include "client_service.hpp"
#include "xid.hpp"
#include <cassert>
#include <vector>
namespace wsrep
{
// Forward declarations of types that the declarations below use only
// by reference (constructor argument, reference members and method
// parameters). Note that client_service.hpp is also included above.
class client_service;
class client_state;
class key;
class const_buffer;
class transaction
{
public:
enum state
{
s_executing,
s_preparing,
s_prepared,
s_certifying,
s_committing,
s_ordered_commit,
s_committed,
s_cert_failed,
s_must_abort,
s_aborting,
s_aborted,
s_must_replay,
s_replaying
};
static const int n_states = s_replaying + 1;
enum state state() const
{ return state_; }
transaction(wsrep::client_state& client_state);
~transaction();
// Accessors
wsrep::transaction_id id() const
{ return id_; }
const wsrep::id& server_id() const
{ return server_id_; }
bool active() const
{ return (id_ != wsrep::transaction_id::undefined()); }
void state(wsrep::unique_lock<wsrep::mutex>&, enum state);
// Return true if the certification of the last
// fragment succeeded
bool certified() const { return certified_; }
wsrep::seqno seqno() const
{
return ws_meta_.seqno();
}
// Return true if the last fragment was ordered by the
// provider
bool ordered() const
{ return (ws_meta_.seqno().is_undefined() == false); }
/**
* Return true if any fragments have been successfully certified
* for the transaction.
*/
bool is_streaming() const
{
return (streaming_context_.fragments_certified() > 0);
}
/**
* Return number of fragments certified for current statement.
*
* This counts fragments which have been successfully certified
* since the construction of object or last after_statement()
* call.
*
* @return Number of fragments certified for current statement.
*/
size_t fragments_certified_for_statement() const
{
return fragments_certified_for_statement_;
}
/**
* Return true if transaction has not generated any changes.
*/
bool is_empty() const
{
return sr_keys_.empty();
}
bool is_xa() const
{
return !xid_.is_null();
}
void assign_xid(const wsrep::xid& xid);
const wsrep::xid& xid() const
{
return xid_;
}
int restore_to_prepared_state(const wsrep::xid& xid);
int commit_or_rollback_by_xid(const wsrep::xid& xid, bool commit);
void xa_detach();
int xa_replay(wsrep::unique_lock<wsrep::mutex>&);
bool pa_unsafe() const { return (flags() & wsrep::provider::flag::pa_unsafe); }
void pa_unsafe(bool pa_unsafe) {
if (pa_unsafe) {
flags(flags() | wsrep::provider::flag::pa_unsafe);
} else {
flags(flags() & ~wsrep::provider::flag::pa_unsafe);
}
}
bool implicit_deps() const { return implicit_deps_; }
void implicit_deps(bool implicit) { implicit_deps_ = implicit; }
int start_transaction(const wsrep::transaction_id& id);
int start_transaction(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta);
int next_fragment(const wsrep::ws_meta& ws_meta);
void adopt(const transaction& transaction);
void fragment_applied(wsrep::seqno seqno);
int prepare_for_ordering(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
bool is_commit);
int assign_read_view(const wsrep::gtid* gtid);
int append_key(const wsrep::key&);
int append_data(const wsrep::const_buffer&);
int after_row();
int before_prepare(wsrep::unique_lock<wsrep::mutex>&);
int after_prepare(wsrep::unique_lock<wsrep::mutex>&);
int before_commit();
int ordered_commit();
int after_commit();
int before_rollback();
int after_rollback();
int before_statement();
int after_statement();
void after_command_must_abort(wsrep::unique_lock<wsrep::mutex>&);
void after_applying();
bool bf_abort(wsrep::unique_lock<wsrep::mutex>& lock,
wsrep::seqno bf_seqno);
bool total_order_bf_abort(wsrep::unique_lock<wsrep::mutex>&,
wsrep::seqno bf_seqno);
void clone_for_replay(const wsrep::transaction& other);
bool bf_aborted() const
{
return (bf_abort_client_state_ != 0);
}
bool bf_aborted_in_total_order() const
{
return bf_aborted_in_total_order_;
}
int flags() const
{
return flags_;
}
// wsrep::mutex& mutex();
wsrep::ws_handle& ws_handle() { return ws_handle_; }
const wsrep::ws_handle& ws_handle() const { return ws_handle_; }
const wsrep::ws_meta& ws_meta() const { return ws_meta_; }
const wsrep::streaming_context& streaming_context() const
{ return streaming_context_; }
wsrep::streaming_context& streaming_context()
{ return streaming_context_; }
void adopt_apply_error(wsrep::mutable_buffer& buf)
{
apply_error_buf_ = std::move(buf);
}
const wsrep::mutable_buffer& apply_error() const
{ return apply_error_buf_; }
private:
transaction(const transaction&);
transaction operator=(const transaction&);
wsrep::provider& provider();
void flags(int flags) { flags_ = flags; }
// Return true if the transaction must abort, is aborting,
// or has been aborted, or has been interrupted by DBMS
// as indicated by client_service::interrupted() call.
// The call will adjust transaction state and set client_state
// error status accordingly.
bool abort_or_interrupt(wsrep::unique_lock<wsrep::mutex>&);
int streaming_step(wsrep::unique_lock<wsrep::mutex>&, bool force = false);
int certify_fragment(wsrep::unique_lock<wsrep::mutex>&);
int certify_commit(wsrep::unique_lock<wsrep::mutex>&);
int append_sr_keys_for_commit();
int release_commit_order(wsrep::unique_lock<wsrep::mutex>&);
void streaming_rollback(wsrep::unique_lock<wsrep::mutex>&);
int replay(wsrep::unique_lock<wsrep::mutex>&);
void xa_replay_common(wsrep::unique_lock<wsrep::mutex>&);
int xa_replay_commit(wsrep::unique_lock<wsrep::mutex>&);
void cleanup();
void debug_log_state(const char*) const;
void debug_log_key_append(const wsrep::key& key) const;
wsrep::server_service& server_service_;
wsrep::client_service& client_service_;
wsrep::client_state& client_state_;
wsrep::id server_id_;
wsrep::transaction_id id_;
enum state state_;
std::vector<enum state> state_hist_;
enum state bf_abort_state_;
enum wsrep::provider::status bf_abort_provider_status_;
int bf_abort_client_state_;
bool bf_aborted_in_total_order_;
wsrep::ws_handle ws_handle_;
wsrep::ws_meta ws_meta_;
int flags_;
bool implicit_deps_;
bool certified_;
size_t fragments_certified_for_statement_;
wsrep::streaming_context streaming_context_;
wsrep::sr_key_set sr_keys_;
wsrep::mutable_buffer apply_error_buf_;
wsrep::xid xid_;
bool streaming_rollback_in_progress_;
};
/**
 * Return a constant C string naming the given transaction state.
 *
 * The switch has no default label. A value which does not match any
 * of the enumerators yields "unknown".
 *
 * @param state Transaction state.
 *
 * @return Pointer to a string literal.
 */
static inline const char* to_c_string(enum wsrep::transaction::state state)
{
    const char* name = "unknown";
    switch (state)
    {
    case wsrep::transaction::s_executing:
        name = "executing";
        break;
    case wsrep::transaction::s_preparing:
        name = "preparing";
        break;
    case wsrep::transaction::s_prepared:
        name = "prepared";
        break;
    case wsrep::transaction::s_certifying:
        name = "certifying";
        break;
    case wsrep::transaction::s_committing:
        name = "committing";
        break;
    case wsrep::transaction::s_ordered_commit:
        name = "ordered_commit";
        break;
    case wsrep::transaction::s_committed:
        name = "committed";
        break;
    case wsrep::transaction::s_cert_failed:
        name = "cert_failed";
        break;
    case wsrep::transaction::s_must_abort:
        name = "must_abort";
        break;
    case wsrep::transaction::s_aborting:
        name = "aborting";
        break;
    case wsrep::transaction::s_aborted:
        name = "aborted";
        break;
    case wsrep::transaction::s_must_replay:
        name = "must_replay";
        break;
    case wsrep::transaction::s_replaying:
        name = "replaying";
        break;
    }
    return name;
}
/**
 * Return the name of the given transaction state as std::string.
 *
 * The name is the one produced by to_c_string().
 *
 * @param state Transaction state.
 */
static inline std::string to_string(enum wsrep::transaction::state state)
{
    const char* const name = to_c_string(state);
    return std::string(name);
}
}
#endif // WSREP_TRANSACTION_HPP