1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
* Portions Copyright (C) 2013 CohortFS, LLC
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef XIO_MESSENGER_H
#define XIO_MESSENGER_H
#include "msg/SimplePolicyMessenger.h"
#include <atomic>
extern "C" {
#include "libxio.h"
}
#include "XioConnection.h"
#include "XioPortal.h"
#include "QueueStrategy.h"
#include "common/Thread.h"
#include "common/Mutex.h"
#include "include/spinlock.h"
/*
 * Small base class whose only job is to run package_init() with the
 * caller's CephContext during construction.  The constructor is protected,
 * so an XioInit can only be created as a base sub-object of a derived class.
 */
class XioInit {
  /* safe to be called multiple times */
  void package_init(CephContext *cct);

protected:
  explicit XioInit(CephContext *cct)
  {
    package_init(cct);
  }
};
/**
 * Messenger whose transport hooks (new_session / session_event) take
 * libxio `xio_session` types.
 *
 * Derives publicly from SimplePolicyMessenger and privately from XioInit;
 * the XioInit base runs package_init() when it is constructed.
 */
class XioMessenger : public SimplePolicyMessenger, XioInit
{
private:
  /*
   * Shared by every XioMessenger.  Declaration only: a non-const static
   * data member cannot carry an in-class initializer unless it is declared
   * `inline` (C++17) or constexpr, so exactly one translation unit must
   * provide the definition.
   * NOTE(review): confirm the implementation file defines
   * `std::atomic<uint64_t> XioMessenger::nInstances` (static storage is
   * zero-initialized, matching the previous `{ 0 }`).
   */
  static std::atomic<uint64_t> nInstances;
  std::atomic<uint64_t> nsessions = { 0 };
  std::atomic<bool> shutdown_called = { false };

  /* NOTE(review): conns_sp presumably guards the two connection
   * containers below -- confirm against the implementation. */
  ceph::spinlock conns_sp;
  XioConnection::ConnList conns_list;
  XioConnection::EntitySet conns_entity_map;

  XioPortals portals;
  DispatchStrategy* dispatch_strategy;
  XioLoopbackConnectionRef loop_con;
  uint32_t special_handling;
  Mutex sh_mtx;
  Cond sh_cond;
  bool need_addr;
  bool did_bind;

  /// approximately unique ID set by the Constructor for use in entity_addr_t
  uint64_t nonce;

  friend class XioConnection;

public:
  /*
   * Note that the default for `ds` allocates a fresh QueueStrategy(1) on
   * every call that omits the argument.
   */
  XioMessenger(CephContext *cct, entity_name_t name,
	       string mname, uint64_t nonce,
	       uint64_t cflags = 0,
	       DispatchStrategy* ds = new QueueStrategy(1));

  virtual ~XioMessenger();

  /* Returns whatever portals.get_next_portal() selects. */
  XioPortal* get_portal() { return portals.get_next_portal(); }

  /* Records the address in the base Messenger and mirrors it onto the
   * loopback connection as its peer address. */
  virtual void set_myaddr(const entity_addr_t& a) {
    Messenger::set_myaddr(a);
    loop_con->set_peer_addr(a);
  }

  int _send_message(Message *m, const entity_inst_t &dest);
  int _send_message(Message *m, Connection *con);
  int _send_message_impl(Message *m, XioConnection *xcon);

  uint32_t get_special_handling() { return special_handling; }
  /* The int argument is stored into a uint32_t member (implicit
   * conversion). */
  void set_special_handling(int n) { special_handling = n; }
  int pool_hint(uint32_t size);
  void try_insert(XioConnection *xcon);

  /* xio hooks */
  int new_session(struct xio_session *session,
		  struct xio_new_session_req *req,
		  void *cb_user_context);

  int session_event(struct xio_session *session,
		    struct xio_session_event_data *event_data,
		    void *cb_user_context);

  /* Messenger interface */

  /*
   * No-op stub.  It must still return a value: flowing off the end of a
   * bool-returning function is undefined behaviour.
   * NOTE(review): false is presumably "no address was updated" -- confirm
   * against the base-class contract.
   */
  virtual bool set_addr_unknowns(const entity_addrvec_t &addr) override
    { return false; } /* XXX applicable? */

  virtual void set_addr(const entity_addr_t &addr) override
    { } /* XXX applicable? */

  virtual int get_dispatch_queue_len()
    { return 0; } /* XXX bogus? */

  virtual double get_dispatch_queue_max_age(utime_t now)
    { return 0; } /* XXX bogus? */

  virtual void set_cluster_protocol(int p)
    { }

  virtual int bind(const entity_addr_t& addr);

  virtual int rebind(const set<int>& avoid_ports);

  virtual int start();

  virtual void wait();

  virtual int shutdown();

  /* Thin forwarder to the entity_inst_t overload of _send_message(). */
  virtual int send_message(Message *m, const entity_inst_t &dest) {
    return _send_message(m, dest);
  }

  /* Both lazy_send_message() overloads do nothing and return EINVAL
   * (the positive errno value, not -EINVAL). */
  virtual int lazy_send_message(Message *m, const entity_inst_t& dest)
    { return EINVAL; }

  virtual int lazy_send_message(Message *m, Connection *con)
    { return EINVAL; }

  virtual ConnectionRef get_connection(const entity_inst_t& dest);

  // compat hack
  /* Builds an entity_inst_t from (type, -1) and the legacy address of
   * `dest`, then defers to get_connection(). */
  ConnectionRef connect_to(
    int type, const entity_addrvec_t& dest) override {
    return get_connection(entity_inst_t(entity_name_t(type, -1),
					dest.legacy_addr()));
  }

  virtual ConnectionRef get_loopback_connection();

  void unregister_xcon(XioConnection *xcon);
  virtual void mark_down(const entity_addr_t& a);
  virtual void mark_down(Connection *con);
  virtual void mark_down_all();
  virtual void mark_down_on_empty(Connection *con);
  virtual void mark_disposable(Connection *con);

  /* Hands the message to the configured dispatch strategy. */
  void ds_dispatch(Message *m)
    { dispatch_strategy->ds_dispatch(m); }

  /**
   * Tell the XioMessenger its full IP address.
   *
   * This is used by clients when connecting to other endpoints, and
   * probably shouldn't be called by anybody else.
   */
  void learned_addr(const entity_addr_t& peer_addr_for_me);

private:
  int get_nconns_per_portal(uint64_t cflags);
  int get_nportals(uint64_t cflags);

protected:
  /* Intentionally empty. */
  virtual void ready()
    { }
};
/*
 * Free function declared here; its definition is not in this header.
 * NOTE(review): judging by the name, this presumably hands out an
 * XioCommand from a pool associated with `xcon` -- confirm ownership and
 * failure behaviour against the definition.
 */
XioCommand* pool_alloc_xio_command(XioConnection *xcon);
#endif /* XIO_MESSENGER_H */
|