summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/Interceptor.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/net/Interceptor.h')
-rw-r--r--src/crimson/net/Interceptor.h186
1 files changed, 186 insertions, 0 deletions
diff --git a/src/crimson/net/Interceptor.h b/src/crimson/net/Interceptor.h
new file mode 100644
index 000000000..35b74e243
--- /dev/null
+++ b/src/crimson/net/Interceptor.h
@@ -0,0 +1,186 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <variant>
+#include <seastar/core/sharded.hh>
+#include <seastar/core/sleep.hh>
+
+#include "Fwd.h"
+#include "msg/async/frames_v2.h"
+
+namespace crimson::net {
+
+enum class custom_bp_t : uint8_t {
+ BANNER_WRITE = 0,
+ BANNER_READ,
+ BANNER_PAYLOAD_READ,
+ SOCKET_CONNECTING,
+ SOCKET_ACCEPTED
+};
+inline const char* get_bp_name(custom_bp_t bp) {
+ uint8_t index = static_cast<uint8_t>(bp);
+ static const char *const bp_names[] = {"BANNER_WRITE",
+ "BANNER_READ",
+ "BANNER_PAYLOAD_READ",
+ "SOCKET_CONNECTING",
+ "SOCKET_ACCEPTED"};
+ assert(index < std::size(bp_names));
+ return bp_names[index];
+}
+
+enum class bp_type_t {
+ READ = 0,
+ WRITE
+};
+
+enum class bp_action_t {
+ CONTINUE = 0,
+ FAULT,
+ BLOCK,
+ STALL
+};
+
+class socket_blocker {
+ std::optional<seastar::abort_source> p_blocked;
+ std::optional<seastar::abort_source> p_unblocked;
+ const seastar::shard_id primary_sid;
+
+ public:
+ socket_blocker() : primary_sid{seastar::this_shard_id()} {}
+
+ seastar::future<> wait_blocked() {
+ ceph_assert(seastar::this_shard_id() == primary_sid);
+ ceph_assert(!p_blocked);
+ if (p_unblocked) {
+ return seastar::make_ready_future<>();
+ } else {
+ p_blocked = seastar::abort_source();
+ return seastar::sleep_abortable(
+ std::chrono::seconds(10), *p_blocked
+ ).then([] {
+ throw std::runtime_error(
+ "Timeout (10s) in socket_blocker::wait_blocked()");
+ }).handle_exception_type([] (const seastar::sleep_aborted& e) {
+ // wait done!
+ });
+ }
+ }
+
+ seastar::future<> block() {
+ return seastar::smp::submit_to(primary_sid, [this] {
+ if (p_blocked) {
+ p_blocked->request_abort();
+ p_blocked = std::nullopt;
+ }
+ ceph_assert(!p_unblocked);
+ p_unblocked = seastar::abort_source();
+ return seastar::sleep_abortable(
+ std::chrono::seconds(10), *p_unblocked
+ ).then([] {
+ ceph_abort("Timeout (10s) in socket_blocker::block()");
+ }).handle_exception_type([] (const seastar::sleep_aborted& e) {
+ // wait done!
+ });
+ });
+ }
+
+ void unblock() {
+ ceph_assert(seastar::this_shard_id() == primary_sid);
+ ceph_assert(!p_blocked);
+ ceph_assert(p_unblocked);
+ p_unblocked->request_abort();
+ p_unblocked = std::nullopt;
+ }
+};
+
+struct tag_bp_t {
+ ceph::msgr::v2::Tag tag;
+ bp_type_t type;
+ bool operator==(const tag_bp_t& x) const {
+ return tag == x.tag && type == x.type;
+ }
+ bool operator!=(const tag_bp_t& x) const { return !operator==(x); }
+ bool operator<(const tag_bp_t& x) const {
+ return std::tie(tag, type) < std::tie(x.tag, x.type);
+ }
+};
+
+struct Breakpoint {
+ using var_t = std::variant<custom_bp_t, tag_bp_t>;
+ var_t bp;
+ Breakpoint(custom_bp_t bp) : bp(bp) { }
+ Breakpoint(ceph::msgr::v2::Tag tag, bp_type_t type)
+ : bp(tag_bp_t{tag, type}) { }
+ bool operator==(const Breakpoint& x) const { return bp == x.bp; }
+ bool operator!=(const Breakpoint& x) const { return !operator==(x); }
+ bool operator==(const custom_bp_t& x) const { return bp == var_t(x); }
+ bool operator!=(const custom_bp_t& x) const { return !operator==(x); }
+ bool operator==(const tag_bp_t& x) const { return bp == var_t(x); }
+ bool operator!=(const tag_bp_t& x) const { return !operator==(x); }
+ bool operator<(const Breakpoint& x) const { return bp < x.bp; }
+};
+
+struct Interceptor {
+ socket_blocker blocker;
+ virtual ~Interceptor() {}
+ virtual void register_conn(ConnectionRef) = 0;
+ virtual void register_conn_ready(ConnectionRef) = 0;
+ virtual void register_conn_closed(ConnectionRef) = 0;
+ virtual void register_conn_replaced(ConnectionRef) = 0;
+
+ virtual seastar::future<bp_action_t>
+ intercept(Connection&, std::vector<Breakpoint> bp) = 0;
+};
+
+} // namespace crimson::net
+
+template<>
+struct fmt::formatter<crimson::net::bp_action_t> : fmt::formatter<std::string_view> {
+ template <typename FormatContext>
+ auto format(const crimson::net::bp_action_t& action, FormatContext& ctx) const {
+ static const char *const action_names[] = {"CONTINUE",
+ "FAULT",
+ "BLOCK",
+ "STALL"};
+ assert(static_cast<size_t>(action) < std::size(action_names));
+ return formatter<std::string_view>::format(action_names[static_cast<size_t>(action)], ctx);
+ }
+};
+
+template<>
+struct fmt::formatter<crimson::net::Breakpoint> : fmt::formatter<std::string_view> {
+ template <typename FormatContext>
+ auto format(const crimson::net::Breakpoint& bp, FormatContext& ctx) const {
+ if (auto custom_bp = std::get_if<crimson::net::custom_bp_t>(&bp.bp)) {
+ return formatter<std::string_view>::format(crimson::net::get_bp_name(*custom_bp), ctx);
+ }
+ auto tag_bp = std::get<crimson::net::tag_bp_t>(bp.bp);
+ static const char *const tag_names[] = {"NONE",
+ "HELLO",
+ "AUTH_REQUEST",
+ "AUTH_BAD_METHOD",
+ "AUTH_REPLY_MORE",
+ "AUTH_REQUEST_MORE",
+ "AUTH_DONE",
+ "AUTH_SIGNATURE",
+ "CLIENT_IDENT",
+ "SERVER_IDENT",
+ "IDENT_MISSING_FEATURES",
+ "SESSION_RECONNECT",
+ "SESSION_RESET",
+ "SESSION_RETRY",
+ "SESSION_RETRY_GLOBAL",
+ "SESSION_RECONNECT_OK",
+ "WAIT",
+ "MESSAGE",
+ "KEEPALIVE2",
+ "KEEPALIVE2_ACK",
+ "ACK"};
+ assert(static_cast<size_t>(tag_bp.tag) < std::size(tag_names));
+ return fmt::format_to(ctx.out(), "{}_{}",
+ tag_names[static_cast<size_t>(tag_bp.tag)],
+ tag_bp.type == crimson::net::bp_type_t::WRITE ? "WRITE" : "READ");
+ }
+};