summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/SocketMessenger.h
blob: 44c1d3c21375fd27087fcd5d1d417d2d518c1120 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2017 Red Hat, Inc
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#pragma once

#include <map>
#include <set>
#include <vector>
#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_future.hh>

#include "crimson/net/chained_dispatchers.h"
#include "Messenger.h"
#include "SocketConnection.h"

namespace crimson::net {

class FixedCPUServerSocket;

class SocketMessenger final : public Messenger {
  const seastar::shard_id master_sid;
  seastar::promise<> shutdown_promise;

  FixedCPUServerSocket* listener = nullptr;
  ChainedDispatchers dispatchers;
  std::map<entity_addr_t, SocketConnectionRef> connections;
  std::set<SocketConnectionRef> accepting_conns;
  std::vector<SocketConnectionRef> closing_conns;
  ceph::net::PolicySet<Throttle> policy_set;
  // Distinguish messengers with meaningful names for debugging
  const std::string logic_name;
  const uint32_t nonce;
  // specifying we haven't learned our addr; set false when we find it.
  bool need_addr = true;
  uint32_t global_seq = 0;
  bool started = false;

  bind_ertr::future<> do_bind(const entity_addrvec_t& addr);

 public:
  SocketMessenger(const entity_name_t& myname,
                  const std::string& logic_name,
                  uint32_t nonce);
  ~SocketMessenger() override { ceph_assert(!listener); }

  seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override;

  // Messenger interfaces are assumed to be called from its own shard, but its
  // behavior should be symmetric when called from any shard.
  bind_ertr::future<> bind(const entity_addrvec_t& addr) override;

  bind_ertr::future<> try_bind(const entity_addrvec_t& addr,
                               uint32_t min_port, uint32_t max_port) override;

  seastar::future<> start(const dispatchers_t& dispatchers) override;

  ConnectionRef connect(const entity_addr_t& peer_addr,
                        const entity_name_t& peer_name) override;
  // can only wait once
  seastar::future<> wait() override {
    assert(seastar::this_shard_id() == master_sid);
    return shutdown_promise.get_future();
  }

  void stop() override {
    dispatchers.clear();
  }

  bool is_started() const override {
    return !dispatchers.empty();
  }

  seastar::future<> shutdown() override;

  void print(ostream& out) const override {
    out << get_myname()
        << "(" << logic_name
        << ") " << get_myaddr();
  }

  SocketPolicy get_policy(entity_type_t peer_type) const override;

  SocketPolicy get_default_policy() const override;

  void set_default_policy(const SocketPolicy& p) override;

  void set_policy(entity_type_t peer_type, const SocketPolicy& p) override;

  void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;

 public:
  seastar::future<uint32_t> get_global_seq(uint32_t old=0);
  seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me,
                                 const SocketConnection& conn);

  SocketConnectionRef lookup_conn(const entity_addr_t& addr);
  void accept_conn(SocketConnectionRef);
  void unaccept_conn(SocketConnectionRef);
  void register_conn(SocketConnectionRef);
  void unregister_conn(SocketConnectionRef);
  void closing_conn(SocketConnectionRef);
  void closed_conn(SocketConnectionRef);
  seastar::shard_id shard_id() const {
    assert(seastar::this_shard_id() == master_sid);
    return master_sid;
  }
};

} // namespace crimson::net