summary refs log tree commit diff stats
path: root/src/crimson/net/chained_dispatchers.cc
blob: 1e4af3baa7dfb52f1e3df0091c3ae4fed79bedba (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#include "crimson/common/log.h"
#include "crimson/net/chained_dispatchers.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Dispatcher.h"
#include "msg/Message.h"

namespace {
  // File-local shortcut: hands back whatever logger crimson::get_logger()
  // yields for the ceph_subsys_ms subsystem.
  seastar::logger& logger() {
    auto& ms_logger = crimson::get_logger(ceph_subsys_ms);
    return ms_logger;
  }
}

namespace crimson::net {

// Offer message `m`, received on `conn`, to each chained dispatcher in order.
//
// The first dispatcher whose ms_dispatch() returns an engaged value claims the
// message: the contained future is moved out and returned with a handler
// attached that logs the exception and calls ceph_abort() if that future
// carries one. Anything thrown while walking the chain is likewise logged and
// followed by ceph_abort().
//
// When no dispatcher claims the message, an error is logged (unless the chain
// is empty) and seastar::now() is returned.
seastar::future<>
ChainedDispatchers::ms_dispatch(ConnectionRef conn,
                                MessageRef m) {
  try {
    for (auto& handler : dispatchers) {
      auto claimed = handler->ms_dispatch(conn, m);
      if (!claimed.has_value()) {
        // This dispatcher did not take the message; try the next one.
        continue;
      }
      // `conn` is captured by value so it is still available when the
      // handler runs.
      auto on_failure = [conn] (std::exception_ptr eptr) {
        logger().error("{} got unexpected exception in ms_dispatch() throttling {}",
                       *conn, eptr);
        ceph_abort();
      };
      return std::move(*claimed).handle_exception(std::move(on_failure));
    }
  } catch (...) {
    auto eptr = std::current_exception();
    logger().error("{} got unexpected exception in ms_dispatch() {}",
                   *conn, eptr);
    ceph_abort();
  }

  // Reached only when no dispatcher returned a value for this message.
  const bool chain_is_empty = dispatchers.empty();
  if (!chain_is_empty) {
    logger().error("ms_dispatch unhandled message {}", *m);
  }
  return seastar::now();
}

// Relay a shard-change notification for `conn` to every chained dispatcher,
// in chain order, passing `new_shard` and `ac` through unchanged.
// If any dispatcher throws, the exception is logged and ceph_abort() is called.
void
ChainedDispatchers::ms_handle_shard_change(
    ConnectionRef conn,
    seastar::shard_id new_shard,
    bool ac) {
  try {
    for (auto& handler : dispatchers) {
      handler->ms_handle_shard_change(conn, new_shard, ac);
    }
  } catch (...) {
    auto eptr = std::current_exception();
    logger().error("{} got unexpected exception in ms_handle_shard_change() {}",
                   *conn, eptr);
    ceph_abort();
  }
}

// Relay an accept notification for `conn` to every chained dispatcher, in
// chain order, passing `prv_shard` and `is_replace` through unchanged.
// If any dispatcher throws, the exception is logged and ceph_abort() is called.
void
ChainedDispatchers::ms_handle_accept(
    ConnectionRef conn,
    seastar::shard_id prv_shard,
    bool is_replace) {
  try {
    for (auto& handler : dispatchers) {
      handler->ms_handle_accept(conn, prv_shard, is_replace);
    }
  } catch (...) {
    auto eptr = std::current_exception();
    logger().error("{} got unexpected exception in ms_handle_accept() {}",
                   *conn, eptr);
    ceph_abort();
  }
}

// Relay a connect notification for `conn` to every chained dispatcher, in
// chain order, passing `prv_shard` through unchanged.
// If any dispatcher throws, the exception is logged and ceph_abort() is called.
void
ChainedDispatchers::ms_handle_connect(
    ConnectionRef conn,
    seastar::shard_id prv_shard) {
  try {
    for (auto& handler : dispatchers) {
      handler->ms_handle_connect(conn, prv_shard);
    }
  } catch (...) {
    auto eptr = std::current_exception();
    logger().error("{} got unexpected exception in ms_handle_connect() {}",
                   *conn, eptr);
    ceph_abort();
  }
}

// Relay a reset notification for `conn` to every chained dispatcher, in chain
// order, passing `is_replace` through unchanged.
// If any dispatcher throws, the exception is logged and ceph_abort() is called.
void
ChainedDispatchers::ms_handle_reset(ConnectionRef conn, bool is_replace) {
  try {
    for (auto& handler : dispatchers) {
      handler->ms_handle_reset(conn, is_replace);
    }
  } catch (...) {
    auto eptr = std::current_exception();
    logger().error("{} got unexpected exception in ms_handle_reset() {}",
                   *conn, eptr);
    ceph_abort();
  }
}

// Relay a remote-reset notification for `conn` to every chained dispatcher,
// in chain order.
// If any dispatcher throws, the exception is logged and ceph_abort() is called.
void
ChainedDispatchers::ms_handle_remote_reset(ConnectionRef conn) {
  try {
    for (auto& handler : dispatchers) {
      handler->ms_handle_remote_reset(conn);
    }
  } catch (...) {
    auto eptr = std::current_exception();
    logger().error("{} got unexpected exception in ms_handle_remote_reset() {}",
                   *conn, eptr);
    ceph_abort();
  }
}

}