summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/Messenger.h
blob: 9d766cb061e4f74505b3a68063294abfd03cbd69 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2017 Red Hat, Inc
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#pragma once

#include <seastar/core/future.hh>

#include "Fwd.h"
#include "crimson/thread/Throttle.h"
#include "msg/Policy.h"

class AuthAuthorizer;

namespace ceph::net {

using Throttle = ceph::thread::Throttle;
using SocketPolicy = ceph::net::Policy<Throttle>;

class Messenger {
  entity_name_t my_name;
  entity_addrvec_t my_addrs;
  uint32_t global_seq = 0;
  uint32_t crc_flags = 0;

 public:
  Messenger(const entity_name_t& name)
    : my_name(name)
  {}
  virtual ~Messenger() {}

  const entity_name_t& get_myname() const { return my_name; }
  const entity_addrvec_t& get_myaddrs() const { return my_addrs; }
  entity_addr_t get_myaddr() const { return my_addrs.front(); }
  virtual seastar::future<> set_myaddrs(const entity_addrvec_t& addrs) {
    my_addrs = addrs;
    return seastar::now();
  }

  /// bind to the given address
  virtual seastar::future<> bind(const entity_addrvec_t& addr) = 0;

  /// try to bind to the first unused port of given address
  virtual seastar::future<> try_bind(const entity_addrvec_t& addr,
                                     uint32_t min_port, uint32_t max_port) = 0;

  /// start the messenger
  virtual seastar::future<> start(Dispatcher *dispatcher) = 0;

  /// either return an existing connection to the peer,
  /// or a new pending connection
  virtual seastar::future<ConnectionXRef>
  connect(const entity_addr_t& peer_addr,
          const entity_type_t& peer_type) = 0;

  // wait for messenger shutdown
  virtual seastar::future<> wait() = 0;

  /// stop listenening and wait for all connections to close. safe to destruct
  /// after this future becomes available
  virtual seastar::future<> shutdown() = 0;

  uint32_t get_global_seq(uint32_t old=0) {
    if (old > global_seq) {
      global_seq = old;
    }
    return ++global_seq;
  }

  uint32_t get_crc_flags() const {
    return crc_flags;
  }
  void set_crc_data() {
    crc_flags |= MSG_CRC_DATA;
  }
  void set_crc_header() {
    crc_flags |= MSG_CRC_HEADER;
  }

  // get the local messenger shard if it is accessed by another core
  virtual Messenger* get_local_shard() {
    return this;
  }

  virtual void print(ostream& out) const = 0;

  virtual void set_default_policy(const SocketPolicy& p) = 0;

  virtual void set_policy(entity_type_t peer_type, const SocketPolicy& p) = 0;

  virtual void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) = 0;

  static seastar::future<Messenger*>
  create(const entity_name_t& name,
         const std::string& lname,
         const uint64_t nonce,
         const int master_sid=-1);
};

inline ostream& operator<<(ostream& out, const Messenger& msgr) {
  out << "[";
  msgr.print(out);
  out << "]";
  return out;
}

} // namespace ceph::net