// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2017 Red Hat, Inc
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
#include <map>
#include <optional>
#include <ostream>
#include <set>
#include <string>
#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sharded.hh>
#include "Messenger.h"
#include "SocketConnection.h"
namespace ceph::net {
/**
 * Socket-based implementation of the Messenger interface.
 *
 * The class is also a seastar::peering_sharded_service, which is what lets
 * get_local_shard() reach the instance living on the calling shard through
 * container().local().
 */
class SocketMessenger final
  : public Messenger,
    public seastar::peering_sharded_service<SocketMessenger> {
  // Selects the connection sharding mode, see locate_shard().
  const int master_sid;
  // Value reported by shard_id().
  const seastar::shard_id sid;
  // Source of the future returned by wait().
  seastar::promise<> shutdown_promise;

  // Empty until a listening socket exists.
  // NOTE(review): presumably populated by do_bind() -- confirm in the .cc.
  std::optional<seastar::server_socket> listener;
  // Null until a dispatcher is supplied.
  // NOTE(review): presumably set by start()/do_start() -- confirm in the .cc.
  Dispatcher *dispatcher = nullptr;
  // Connections keyed by address.
  // NOTE(review): presumably maintained by register_conn()/unregister_conn()
  // and queried by lookup_conn() -- confirm in the .cc.
  std::map<entity_addr_t, SocketConnectionRef> connections;
  // NOTE(review): presumably maintained by accept_conn()/unaccept_conn().
  std::set<SocketConnectionRef> accepting_conns;
  ceph::net::PolicySet<Throttle> policy_set;
  // Distinguish messengers with meaningful names for debugging
  const std::string logic_name;
  const uint32_t nonce;

  // Handles one incoming connection.
  // NOTE(review): paddr is presumably the peer's socket address -- confirm.
  seastar::future<> accept(seastar::connected_socket socket,
                           seastar::socket_address paddr);

  // NOTE(review): the do_*() helpers are presumably the per-shard halves of
  // bind()/start()/connect()/shutdown(); their definitions are not part of
  // this header.
  void do_bind(const entity_addrvec_t& addr);
  seastar::future<> do_start(Dispatcher *disp);
  seastar::foreign_ptr<ConnectionRef> do_connect(const entity_addr_t& peer_addr,
                                                 const entity_type_t& peer_type);
  seastar::future<> do_shutdown();

  // conn sharding options:
  // 0. Compatible (master_sid >= 0): place all connections to one master shard
  // 1. Simplest (master_sid < 0): sharded by ip only
  // 2. Balanced (not implemented): sharded by ip + port + nonce,
  //        but, need to move SocketConnection between cores.
  seastar::shard_id locate_shard(const entity_addr_t& addr);

 public:
  /**
   * @param myname      entity name of this messenger
   * @param logic_name  human readable name, only used to tell messengers
   *                    apart when debugging (it is printed by print())
   * @param nonce       nonce of this messenger
   * @param master_sid  sharding mode selector, see locate_shard()
   */
  SocketMessenger(const entity_name_t& myname,
                  const std::string& logic_name,
                  uint32_t nonce,
                  int master_sid);

  seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override;

  // Messenger interfaces are assumed to be called from its own shard, but its
  // behavior should be symmetric when called from any shard.
  seastar::future<> bind(const entity_addrvec_t& addr) override;

  seastar::future<> try_bind(const entity_addrvec_t& addr,
                             uint32_t min_port, uint32_t max_port) override;

  seastar::future<> start(Dispatcher *dispatcher) override;

  seastar::future<ConnectionXRef> connect(const entity_addr_t& peer_addr,
                                          const entity_type_t& peer_type) override;

  // can only wait once: the future is taken out of shutdown_promise, and a
  // promise gives its future away a single time.
  seastar::future<> wait() override {
    return shutdown_promise.get_future();
  }

  seastar::future<> shutdown() override;

  // Returns the instance of this sharded service on the calling shard.
  Messenger* get_local_shard() override {
    return &container().local();
  }

  // Writes "<myname>(<logic_name>) <myaddr>" to out.
  // The stream type is spelled std::ostream so that this header does not
  // depend on a using-directive leaking out of another include.
  void print(std::ostream& out) const override {
    out << get_myname()
        << "(" << logic_name
        << ") " << get_myaddr();
  }

  void set_default_policy(const SocketPolicy& p) override;

  void set_policy(entity_type_t peer_type, const SocketPolicy& p) override;

  void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;

  // The members below are public but are not Messenger overrides.
  // NOTE(review): presumably they exist for SocketConnection -- confirm.
  seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me);

  SocketConnectionRef lookup_conn(const entity_addr_t& addr);
  void accept_conn(SocketConnectionRef);
  void unaccept_conn(SocketConnectionRef);
  void register_conn(SocketConnectionRef);
  void unregister_conn(SocketConnectionRef);

  // required by sharded<>
  seastar::future<> stop() {
    return seastar::make_ready_future<>();
  }

  // Shard this instance was assigned at construction (value of sid).
  seastar::shard_id shard_id() const {
    return sid;
  }
};
} // namespace ceph::net