1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
* Portions Copyright (C) 2013 CohortFS, LLC
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef XIO_CONNECTION_H
#define XIO_CONNECTION_H
#include <atomic>
#include <boost/intrusive/avl_set.hpp>
#include <boost/intrusive/list.hpp>
extern "C" {
#include "libxio.h"
}
#include "XioInSeq.h"
#include "XioSubmit.h"
#include "msg/Connection.h"
#include "msg/Messenger.h"
#include "auth/AuthSessionHandler.h"
/* Feature mask advertised by Xio connections: expands to CEPH_FEATURES_ALL. */
#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL)
/* Tag value 0x0001. NOTE(review): by its name this marks the no-op message
 * used while marking a connection down -- confirm against the users. */
#define XIO_NOP_TAG_MARKDOWN 0x0001
/* Short alias for the Boost.Intrusive namespace used by the hooks below. */
namespace bi = boost::intrusive;
/* Forward declarations; XioConnection stores a pointer to XioPortal and
 * refers to the other two in signatures/friend declarations only. */
class XioPortal;
class XioMessenger;
class XioSend;
class XioConnection : public Connection
{
public:
enum type { ACTIVE, PASSIVE };
enum class session_states : unsigned {
INIT = 0,
START,
UP,
FLOW_CONTROLLED,
DISCONNECTED,
DELETED,
BARRIER
};
enum class session_startup_states : unsigned {
IDLE = 0,
CONNECTING,
ACCEPTING,
READY,
FAIL
};
private:
XioConnection::type xio_conn_type;
XioPortal *portal;
std::atomic<bool> connected = { false };
entity_inst_t peer;
struct xio_session *session;
struct xio_connection *conn;
ceph::util::spinlock sp;
std::atomic<int64_t> send = { 0 };
std::atomic<int64_t> recv = { 0 };
uint32_t n_reqs; // Accelio-initiated reqs in progress (!counting partials)
uint32_t magic;
uint32_t special_handling;
uint64_t scount;
uint32_t send_ctr;
int q_high_mark;
int q_low_mark;
struct lifecycle {
// different from Pipe states?
enum lf_state {
INIT,
LOCAL_DISCON,
REMOTE_DISCON,
RECONNECTING,
UP,
DEAD } state;
/* XXX */
uint32_t reconnects;
uint32_t connect_seq, peer_global_seq;
uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
std::atomic<int64_t> out_seq = { 0 };
lifecycle() : state(lifecycle::INIT), reconnects(0), connect_seq(0),
peer_global_seq(0), in_seq(0), out_seq_acked(0)
{}
void set_in_seq(uint64_t seq) {
in_seq = seq;
}
uint64_t next_out_seq() {
return ++out_seq;
}
} state;
/* batching */
XioInSeq in_seq;
class CState
{
public:
static const int FLAG_NONE = 0x0000;
static const int FLAG_BAD_AUTH = 0x0001;
static const int FLAG_MAPPED = 0x0002;
static const int FLAG_RESET = 0x0004;
static const int OP_FLAG_NONE = 0x0000;
static const int OP_FLAG_LOCKED = 0x0001;
static const int OP_FLAG_LRU = 0x0002;
uint64_t features;
Messenger::Policy policy;
CryptoKey session_key;
std::shared_ptr<AuthSessionHandler> session_security;
AuthAuthorizer *authorizer;
XioConnection *xcon;
uint32_t protocol_version;
std::atomic<session_states> session_state = { 0 };
std::atomic<session_startup_state> startup_state = { 0 };
uint32_t reconnects;
uint32_t connect_seq, global_seq, peer_global_seq;
uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
std::atomic<uint64_t> out_seq = { 0 };
uint32_t flags;
explicit CState(XioConnection* _xcon)
: features(0),
authorizer(NULL),
xcon(_xcon),
protocol_version(0),
session_state(INIT),
startup_state(IDLE),
reconnects(0),
connect_seq(0),
global_seq(0),
peer_global_seq(0),
in_seq(0),
out_seq_acked(0),
flags(FLAG_NONE) {}
uint64_t get_session_state() {
return session_state;
}
uint64_t get_startup_state() {
return startup_state;
}
void set_in_seq(uint64_t seq) {
in_seq = seq;
}
uint64_t next_out_seq() {
return ++out_seq;
};
// state machine
int init_state();
int next_state(Message* m);
#if 0 // future (session startup)
int msg_connect(MConnect *m);
int msg_connect_reply(MConnectReply *m);
int msg_connect_reply(MConnectAuthReply *m);
int msg_connect_auth(MConnectAuth *m);
int msg_connect_auth_reply(MConnectAuthReply *m);
#endif
int state_up_ready(uint32_t flags);
int state_flow_controlled(uint32_t flags);
int state_discon();
int state_fail(Message* m, uint32_t flags);
} cstate; /* CState */
// message submission queue
struct SendQ {
bool keepalive;
bool ack;
utime_t ack_time;
Message::Queue mqueue; // deferred
XioSubmit::Queue requeue;
SendQ():keepalive(false), ack(false){}
} outgoing;
// conns_entity_map comparison functor
struct EntityComp
{
// for internal ordering
bool operator()(const XioConnection &lhs, const XioConnection &rhs) const
{ return lhs.get_peer() < rhs.get_peer(); }
// for external search by entity_inst_t(peer)
bool operator()(const entity_inst_t &peer, const XioConnection &c) const
{ return peer < c.get_peer(); }
bool operator()(const XioConnection &c, const entity_inst_t &peer) const
{ return c.get_peer() < peer; }
};
bi::list_member_hook<> conns_hook;
bi::avl_set_member_hook<> conns_entity_map_hook;
typedef bi::list< XioConnection,
bi::member_hook<XioConnection, bi::list_member_hook<>,
&XioConnection::conns_hook > > ConnList;
typedef bi::member_hook<XioConnection, bi::avl_set_member_hook<>,
&XioConnection::conns_entity_map_hook> EntityHook;
typedef bi::avl_set< XioConnection, EntityHook,
bi::compare<EntityComp> > EntitySet;
friend class XioPortal;
friend class XioMessenger;
friend class XioDispatchHook;
friend class XioMarkDownHook;
friend class XioSend;
int on_disconnect_event() {
std::lock_guard<ceph::spinlock> lg(sp);
connected = false;
discard_out_queues(CState::OP_FLAG_LOCKED);
return 0;
}
int on_teardown_event() {
{
std::lock_guard<ceph::spinlock> lg(sp);
if (conn)
xio_connection_destroy(conn);
conn = NULL;
}
this->put();
return 0;
}
int xio_qdepth_high_mark() {
return q_high_mark;
}
int xio_qdepth_low_mark() {
return q_low_mark;
}
public:
XioConnection(XioMessenger *m, XioConnection::type _type,
const entity_inst_t& peer);
~XioConnection() {
if (conn)
xio_connection_destroy(conn);
}
ostream& conn_prefix(std::ostream *_dout);
bool is_connected() override { return connected; }
int send_message(Message *m) override;
void send_keepalive() override {send_keepalive_or_ack();}
void send_keepalive_or_ack(bool ack = false, const utime_t *tp = nullptr);
void mark_down() override;
int _mark_down(uint32_t flags);
void mark_disposable() override;
int _mark_disposable(uint32_t flags);
const entity_inst_t& get_peer() const { return peer; }
XioConnection* get() {
#if 0
cout << "XioConnection::get " << this << " " << nref.load() << std::endl;
#endif
RefCountedObject::get();
return this;
}
void put() {
RefCountedObject::put();
#if 0
cout << "XioConnection::put " << this << " " << nref.load() << std::endl;
#endif
}
void disconnect() {
if (is_connected()) {
connected = false;
xio_disconnect(conn); // normal teardown will clean up conn
}
}
uint32_t get_magic() { return magic; }
void set_magic(int _magic) { magic = _magic; }
uint32_t get_special_handling() { return special_handling; }
void set_special_handling(int n) { special_handling = n; }
uint64_t get_scount() { return scount; }
int passive_setup(); /* XXX */
int handle_data_msg(struct xio_session *session, struct xio_msg *msg,
int more_in_batch, void *cb_user_context);
int on_msg(struct xio_session *session, struct xio_msg *msg,
int more_in_batch, void *cb_user_context);
int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg,
void *conn_user_context);
int on_msg_error(struct xio_session *session, enum xio_status error,
struct xio_msg *msg, void *conn_user_context);
void msg_send_fail(XioSend *xsend, int code);
void msg_release_fail(struct xio_msg *msg, int code);
private:
void send_keepalive_or_ack_internal(bool ack = false, const utime_t *tp = nullptr);
int flush_out_queues(uint32_t flags);
int discard_out_queues(uint32_t flags);
int adjust_clru(uint32_t flags);
};
/* Intrusive smart-pointer handle to an XioConnection. */
using XioConnectionRef = boost::intrusive_ptr<XioConnection>;
/**
 * Connection whose peer is the owning messenger itself: the constructor
 * copies the messenger's own address and entity type into the peer fields.
 * It always reports itself as connected, and mark_down()/mark_disposable()
 * are no-ops.  send_message() and send_keepalive() are defined elsewhere.
 */
class XioLoopbackConnection : public Connection
{
  /* Sequence counter, starting at 0; advanced by next_seq(). */
  std::atomic<uint64_t> seq{0};

public:
  /* Build a loopback connection for messenger m. */
  explicit XioLoopbackConnection(Messenger *m)
    : Connection(m->cct, m)
  {
    const entity_inst_t& self_inst = m->get_myinst();
    peer_addr = self_inst.addr;
    peer_type = self_inst.name.type();
    set_features(XIO_ALL_FEATURES); /* XXXX set to ours */
  }

  /* Take a reference and return it typed as XioLoopbackConnection. */
  XioLoopbackConnection* get() {
    auto *base = RefCountedObject::get();
    return static_cast<XioLoopbackConnection*>(base);
  }

  bool is_connected() override {
    return true;
  }

  int send_message(Message *m) override;
  void send_keepalive() override;

  void mark_down() override {
  }

  void mark_disposable() override {
  }

  /* Current value of the sequence counter. */
  uint64_t get_seq() {
    return seq.load();
  }

  /* Atomically advance the counter and return the new value. */
  uint64_t next_seq() {
    return seq.fetch_add(1) + 1;
  }
};
/* Intrusive smart-pointer handle to an XioLoopbackConnection. */
using XioLoopbackConnectionRef = boost::intrusive_ptr<XioLoopbackConnection>;
#endif /* XIO_CONNECTION_H */
|