diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/crimson/net/SocketMessenger.cc | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/crimson/net/SocketMessenger.cc | 283 |
1 files changed, 283 insertions, 0 deletions
diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc new file mode 100644 index 00000000..46a38ff7 --- /dev/null +++ b/src/crimson/net/SocketMessenger.cc @@ -0,0 +1,283 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "SocketMessenger.h" + +#include <tuple> +#include <boost/functional/hash.hpp> + +#include "auth/Auth.h" +#include "Errors.h" +#include "Dispatcher.h" +#include "Socket.h" + +using namespace ceph::net; + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_ms); + } +} + +SocketMessenger::SocketMessenger(const entity_name_t& myname, + const std::string& logic_name, + uint32_t nonce, + int master_sid) + : Messenger{myname}, + master_sid{master_sid}, + sid{seastar::engine().cpu_id()}, + logic_name{logic_name}, + nonce{nonce} +{} + +seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs) +{ + auto my_addrs = addrs; + for (auto& addr : my_addrs.v) { + addr.nonce = nonce; + } + return container().invoke_on_all([my_addrs](auto& msgr) { + return msgr.Messenger::set_myaddrs(my_addrs); + }); +} + +seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs) +{ + ceph_assert(addrs.legacy_addr().get_family() == AF_INET); + auto my_addrs = addrs; + for (auto& addr : my_addrs.v) { + addr.nonce = nonce; + } + logger().info("listening on {}", my_addrs.legacy_addr().in4_addr()); + return container().invoke_on_all([my_addrs](auto& msgr) { + msgr.do_bind(my_addrs); + }); +} + +seastar::future<> +SocketMessenger::try_bind(const entity_addrvec_t& addrs, + uint32_t min_port, uint32_t max_port) +{ + auto addr = addrs.legacy_or_front_addr(); + if (addr.get_port() != 0) { + return bind(addrs); + } + ceph_assert(min_port <= max_port); + return seastar::do_with(uint32_t(min_port), + [this, max_port, addr] (auto& port) { + return seastar::repeat([this, max_port, addr, &port] { + auto to_bind = addr; + to_bind.set_port(port); + return bind(entity_addrvec_t{to_bind}) + .then([this] { + logger().info("{}: try_bind: done", *this); + return stop_t::yes; + }).handle_exception_type([this, max_port, &port] (const std::system_error& e) { + logger().debug("{}: try_bind: {} already used", *this, port); + if (port == max_port) { + throw e; + } + ++port; + return stop_t::no; + }); + }); + }); +} + +seastar::future<> SocketMessenger::start(Dispatcher *disp) { + return container().invoke_on_all([disp](auto& msgr) { + return msgr.do_start(disp->get_local_shard()); + }); +} + +seastar::future<ceph::net::ConnectionXRef> +SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) +{ + auto shard = locate_shard(peer_addr); + return container().invoke_on(shard, [peer_addr, peer_type](auto& msgr) { + return msgr.do_connect(peer_addr, peer_type); + }).then([](seastar::foreign_ptr<ConnectionRef>&& conn) { + return seastar::make_lw_shared<seastar::foreign_ptr<ConnectionRef>>(std::move(conn)); + }); +} + +seastar::future<> SocketMessenger::shutdown() +{ + return container().invoke_on_all([](auto& msgr) { + return msgr.do_shutdown(); + }).finally([this] { + return container().invoke_on_all([](auto& msgr) { + msgr.shutdown_promise.set_value(); + }); + }); +} + +void SocketMessenger::do_bind(const entity_addrvec_t& addrs) +{ + Messenger::set_myaddrs(addrs); + + // TODO: v2: listen on multiple addresses + seastar::socket_address address(addrs.legacy_addr().in4_addr()); + seastar::listen_options lo; + lo.reuse_address = true; + listener = seastar::listen(address, lo); +} + +seastar::future<> SocketMessenger::do_start(Dispatcher *disp) +{ + dispatcher = disp; + + // start listening if bind() was called + if (listener) { + seastar::keep_doing([this] { + return listener->accept() + .then([this] (seastar::connected_socket socket, + seastar::socket_address paddr) { + // allocate the connection + entity_addr_t peer_addr; + peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); + auto shard = locate_shard(peer_addr); +#warning fixme + // we currently do dangerous i/o from a Connection core, different from the Socket core. + auto sock = seastar::make_foreign(std::make_unique<Socket>(std::move(socket))); + // don't wait before accepting another + container().invoke_on(shard, [sock = std::move(sock), peer_addr, this](auto& msgr) mutable { + SocketConnectionRef conn = seastar::make_shared<SocketConnection>(msgr, *msgr.dispatcher); + conn->start_accept(std::move(sock), peer_addr); + }); + }); + }).handle_exception_type([this] (const std::system_error& e) { + // stop gracefully on connection_aborted + if (e.code() != error::connection_aborted) { + logger().error("{} unexpected error during accept: {}", *this, e); + } + }); + } + + return seastar::now(); +} + +seastar::foreign_ptr<ceph::net::ConnectionRef> +SocketMessenger::do_connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) +{ + if (auto found = lookup_conn(peer_addr); found) { + return seastar::make_foreign(found->shared_from_this()); + } + SocketConnectionRef conn = seastar::make_shared<SocketConnection>(*this, *dispatcher); + conn->start_connect(peer_addr, peer_type); + return seastar::make_foreign(conn->shared_from_this()); +} + +seastar::future<> SocketMessenger::do_shutdown() +{ + if (listener) { + listener->abort_accept(); + } + // close all connections + return seastar::parallel_for_each(accepting_conns, [] (auto conn) { + return conn->close(); + }).then([this] { + ceph_assert(accepting_conns.empty()); + return seastar::parallel_for_each(connections, [] (auto conn) { + return conn.second->close(); + }); + }).finally([this] { + ceph_assert(connections.empty()); + }); +} + +seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) +{ + if (!get_myaddr().is_blank_ip()) { + // already learned or binded + return seastar::now(); + } + + // Only learn IP address if blank. + entity_addr_t addr = get_myaddr(); + addr.u = peer_addr_for_me.u; + addr.set_type(peer_addr_for_me.get_type()); + addr.set_port(get_myaddr().get_port()); + return set_myaddrs(entity_addrvec_t{addr}); +} + +void SocketMessenger::set_default_policy(const SocketPolicy& p) +{ + policy_set.set_default(p); +} + +void SocketMessenger::set_policy(entity_type_t peer_type, + const SocketPolicy& p) +{ + policy_set.set(peer_type, p); +} + +void SocketMessenger::set_policy_throttler(entity_type_t peer_type, + Throttle* throttle) +{ + // only byte throttler is used in OSD + policy_set.set_throttlers(peer_type, throttle, nullptr); +} + +seastar::shard_id SocketMessenger::locate_shard(const entity_addr_t& addr) +{ + ceph_assert(addr.get_family() == AF_INET); + if (master_sid >= 0) { + return master_sid; + } + std::size_t seed = 0; + boost::hash_combine(seed, addr.u.sin.sin_addr.s_addr); + //boost::hash_combine(seed, addr.u.sin.sin_port); + //boost::hash_combine(seed, addr.nonce); + return seed % seastar::smp::count; +} + +ceph::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) +{ + if (auto found = connections.find(addr); + found != connections.end()) { + return found->second; + } else { + return nullptr; + } +} + +void SocketMessenger::accept_conn(SocketConnectionRef conn) +{ + accepting_conns.insert(conn); +} + +void SocketMessenger::unaccept_conn(SocketConnectionRef conn) +{ + accepting_conns.erase(conn); +} + +void SocketMessenger::register_conn(SocketConnectionRef conn) +{ + if (master_sid >= 0) { + ceph_assert(static_cast<int>(sid) == master_sid); + } + auto [i, added] = connections.emplace(conn->get_peer_addr(), conn); + std::ignore = i; + ceph_assert(added); +} + +void SocketMessenger::unregister_conn(SocketConnectionRef conn) +{ + ceph_assert(conn); + auto found = connections.find(conn->get_peer_addr()); + ceph_assert(found != connections.end()); + ceph_assert(found->second == conn); + connections.erase(found); +} |