summary refs log tree commit diff stats
path: root/src/crimson/mon/MonClient.h
blob: ef9bcb690561eb32925facc22874fae4356735ae (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-

#pragma once

#include <memory>

#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/timer.hh>

#include "auth/KeyRing.h"

#include "crimson/net/Dispatcher.h"
#include "crimson/net/Fwd.h"

#include "mon/MonMap.h"

#include "mon/MonSub.h"

// Shorthand for boost::intrusive_ptr<T>; this header uses it for the typed
// message arguments of the handle_*() methods (e.g. Ref<MMonMap>).
// NOTE(review): no boost header is included directly in this file, so the
// template is presumably provided by one of the project includes above --
// confirm.
template <class T>
using Ref = boost::intrusive_ptr<T>;
// The messenger is only referred to by reference in this header, so its
// name alone is enough here.
namespace ceph::net { class Messenger; }

// Types that this header only names inside smart pointers
// (std::unique_ptr<AuthMethodList>, Ref<M...>), listed alphabetically.
// The class-keys (class vs. struct) are kept exactly as they were.
class AuthMethodList;
class MAuthReply;
struct MConfig;
struct MLogAck;
struct MMonCommandAck;
struct MMonGetVersionReply;
struct MMonMap;
struct MMonSubscribeAck;

namespace ceph::mon {

// Not defined in this header. Client only stores it inside a
// std::unique_ptr and a std::vector member, so the complete type is needed
// only where Client's constructors and destructor are defined.
class Connection;

class Client : public ceph::net::Dispatcher {
  EntityName entity_name;
  KeyRing keyring;
  std::unique_ptr<AuthMethodList> auth_methods;
  const uint32_t want_keys;

  MonMap monmap;
  seastar::promise<MessageRef> reply;
  std::unique_ptr<Connection> active_con;
  std::vector<Connection> pending_conns;
  seastar::timer<seastar::lowres_clock> timer;
  seastar::gate tick_gate;

  ceph::net::Messenger& msgr;

  // commands
  using get_version_t = seastar::future<version_t, version_t>;

  ceph_tid_t last_version_req_id = 0;
  std::map<ceph_tid_t, typename get_version_t::promise_type> version_reqs;

  ceph_tid_t last_mon_command_id = 0;
  using command_result_t =
    seastar::future<std::int32_t, string, ceph::bufferlist>;
  std::map<ceph_tid_t, typename command_result_t::promise_type> mon_commands;

  MonSub sub;

public:
  Client(ceph::net::Messenger& messenger);
  Client(Client&&);
  ~Client();
  seastar::future<> start();
  seastar::future<> stop();

  const uuid_d& get_fsid() const {
    return monmap.fsid;
  }
  get_version_t get_version(const std::string& map);
  command_result_t run_command(const std::vector<std::string>& cmd,
			       const bufferlist& bl);
  seastar::future<> send_message(MessageRef);
  bool sub_want(const std::string& what, version_t start, unsigned flags);
  void sub_got(const std::string& what, version_t have);
  void sub_unwant(const std::string& what);
  bool sub_want_increment(const std::string& what, version_t start, unsigned flags);
  seastar::future<> renew_subs();

private:
  void tick();

  seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn,
				MessageRef m) override;
  seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override;

  seastar::future<> handle_monmap(ceph::net::ConnectionRef conn,
				  Ref<MMonMap> m);
  seastar::future<> handle_auth_reply(ceph::net::ConnectionRef conn,
				      Ref<MAuthReply> m);
  seastar::future<> handle_subscribe_ack(Ref<MMonSubscribeAck> m);
  seastar::future<> handle_get_version_reply(Ref<MMonGetVersionReply> m);
  seastar::future<> handle_mon_command_ack(Ref<MMonCommandAck> m);
  seastar::future<> handle_log_ack(Ref<MLogAck> m);
  seastar::future<> handle_config(Ref<MConfig> m);

private:
  seastar::future<> load_keyring();
  seastar::future<> authenticate();

  bool is_hunting() const;
  seastar::future<> reopen_session(int rank);
  std::vector<unsigned> get_random_mons(unsigned n) const;
  seastar::future<> _add_conn(unsigned rank, uint64_t global_id);
};

} // namespace ceph::mon