summaryrefslogtreecommitdiffstats
path: root/src/test/direct_messenger/DirectMessenger.cc
blob: 076f5fc39a61c3f3bcbbea8c01c613607b09b995 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include "DirectMessenger.h"
#include "msg/DispatchStrategy.h"


class DirectConnection : public Connection {
  /// sent messages are dispatched here
  DispatchStrategy *const dispatchers;

  /// the connection that will be attached to outgoing messages, so that replies
  /// can be dispatched back to the sender. the pointer is atomic for
  /// thread-safety between mark_down() and send_message(). no reference is held
  /// on this Connection to avoid cyclical refs. we don't need a reference
  /// because its owning DirectMessenger will mark both connections down (and
  /// clear this pointer) before dropping its own reference
  std::atomic<Connection*> reply_connection{nullptr};

 public:
  DirectConnection(CephContext *cct, DirectMessenger *m,
                   DispatchStrategy *dispatchers)
    : Connection(cct, m),
      dispatchers(dispatchers)
  {}

  /// sets the Connection that will receive replies to outgoing messages
  void set_direct_reply_connection(ConnectionRef conn);

  /// return true if a peer connection exists
  bool is_connected() override;

  /// pass the given message directly to our dispatchers
  int send_message(Message *m) override;

  /// release our pointer to the peer connection. later calls to is_connected()
  /// will return false, and send_message() will fail with -ENOTCONN
  void mark_down() override;

  /// noop - keepalive messages are not needed within a process
  void send_keepalive() override {}

  /// noop - reconnect/recovery semantics are not needed within a process
  void mark_disposable() override {}
};

void DirectConnection::set_direct_reply_connection(ConnectionRef conn)
{
  reply_connection.store(conn.get());
}

bool DirectConnection::is_connected()
{
  // true between calls to set_direct_reply_connection() and mark_down()
  return reply_connection.load() != nullptr;
}

int DirectConnection::send_message(Message *m)
{
  // read reply_connection atomically and take a reference
  ConnectionRef conn = reply_connection.load();
  if (!conn) {
    m->put();
    return -ENOTCONN;
  }
  // attach reply_connection to the Message, so that calls to
  // m->get_connection()->send_message() can be dispatched back to the sender
  m->set_connection(conn);

  dispatchers->ds_dispatch(m);
  return 0;
}

void DirectConnection::mark_down()
{
  Connection *conn = reply_connection.load();
  if (!conn) {
    return; // already marked down
  }
  if (!reply_connection.compare_exchange_weak(conn, nullptr)) {
    return; // lost the race to mark down
  }
  // called only once to avoid loops
  conn->mark_down();
}


static ConnectionRef create_loopback(DirectMessenger *m,
                                     entity_name_t name,
                                     DispatchStrategy *dispatchers)
{
  auto loopback = boost::intrusive_ptr<DirectConnection>(
      new DirectConnection(m->cct, m, dispatchers));
  // loopback replies go to itself
  loopback->set_direct_reply_connection(loopback);
  loopback->set_peer_type(name.type());
  loopback->set_features(CEPH_FEATURES_ALL);
  return loopback;
}

DirectMessenger::DirectMessenger(CephContext *cct, entity_name_t name,
                                 string mname, uint64_t nonce,
                                 DispatchStrategy *dispatchers)
  : SimplePolicyMessenger(cct, name, mname, nonce),
    dispatchers(dispatchers),
    loopback_connection(create_loopback(this, name, dispatchers))
{
  dispatchers->set_messenger(this);
}

DirectMessenger::~DirectMessenger()
{
}

int DirectMessenger::set_direct_peer(DirectMessenger *peer)
{
  if (get_myinst() == peer->get_myinst()) {
    return -EADDRINUSE; // must have a different entity instance
  }
  peer_inst = peer->get_myinst();

  // allocate a Connection that dispatches to the peer messenger
  auto direct_connection = boost::intrusive_ptr<DirectConnection>(
      new DirectConnection(cct, peer, peer->dispatchers.get()));

  direct_connection->set_peer_addr(peer_inst.addr);
  direct_connection->set_peer_type(peer_inst.name.type());
  direct_connection->set_features(CEPH_FEATURES_ALL);

  // if set_direct_peer() was already called on the peer messenger, we can
  // finish by attaching their connections. if not, the later call to
  // peer->set_direct_peer() will attach their connection to ours
  auto connection = peer->get_connection(get_myinst());
  if (connection) {
    auto p = static_cast<DirectConnection*>(connection.get());

    p->set_direct_reply_connection(direct_connection);
    direct_connection->set_direct_reply_connection(p);
  }

  peer_connection = std::move(direct_connection);
  return 0;
}

int DirectMessenger::bind(const entity_addr_t &bind_addr)
{
  if (peer_connection) {
    return -EINVAL; // can't change address after sharing it with the peer
  }
  set_myaddr(bind_addr);
  loopback_connection->set_peer_addr(bind_addr);
  return 0;
}

int DirectMessenger::client_bind(const entity_addr_t &bind_addr)
{
  // same as bind
  return bind(bind_addr);
}

int DirectMessenger::start()
{
  if (!peer_connection) {
    return -EINVAL; // did not connect to a peer
  }
  if (started) {
    return -EINVAL; // already started
  }

  dispatchers->start();
  return SimplePolicyMessenger::start();
}

int DirectMessenger::shutdown()
{
  if (!started) {
    return -EINVAL; // not started
  }

  mark_down_all();
  peer_connection.reset();
  loopback_connection.reset();

  dispatchers->shutdown();
  SimplePolicyMessenger::shutdown();
  sem.Put(); // signal wait()
  return 0;
}

void DirectMessenger::wait()
{
  sem.Get(); // wait on signal from shutdown()
  dispatchers->wait();
}

ConnectionRef DirectMessenger::get_connection(const entity_inst_t& dst)
{
  if (dst == peer_inst) {
    return peer_connection;
  }
  if (dst == get_myinst()) {
    return loopback_connection;
  }
  return nullptr;
}

ConnectionRef DirectMessenger::get_loopback_connection()
{
  return loopback_connection;
}

int DirectMessenger::send_message(Message *m, const entity_inst_t& dst)
{
  auto conn = get_connection(dst);
  if (!conn) {
    m->put();
    return -ENOTCONN;
  }
  return conn->send_message(m);
}

void DirectMessenger::mark_down(const entity_addr_t& addr)
{
  ConnectionRef conn;
  if (addr == peer_inst.addr) {
    conn = peer_connection;
  } else if (addr == get_myaddr_legacy()) {
    conn = loopback_connection;
  }
  if (conn) {
    conn->mark_down();
  }
}

void DirectMessenger::mark_down_all()
{
  if (peer_connection) {
    peer_connection->mark_down();
  }
  loopback_connection->mark_down();
}