From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/msg/async/AsyncMessenger.cc | 963 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 963 insertions(+) create mode 100644 src/msg/async/AsyncMessenger.cc (limited to 'src/msg/async/AsyncMessenger.cc') diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc new file mode 100644 index 000000000..3971e91ff --- /dev/null +++ b/src/msg/async/AsyncMessenger.cc @@ -0,0 +1,963 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 UnitedStack + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "acconfig.h" + +#include +#include + +#include "AsyncMessenger.h" + +#include "common/config.h" +#include "common/Timer.h" +#include "common/errno.h" + +#include "messages/MOSDOp.h" +#include "messages/MOSDOpReply.h" +#include "common/EventTrace.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) +static std::ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) { + return *_dout << "-- " << m->get_myaddrs() << " "; +} + +static std::ostream& _prefix(std::ostream *_dout, Processor *p) { + return *_dout << " Processor -- "; +} + + +/******************* + * Processor + */ + +class Processor::C_processor_accept : public EventCallback { + Processor *pro; + + public: + explicit C_processor_accept(Processor *p): pro(p) {} + void do_request(uint64_t id) override { + pro->accept(); + } +}; + +Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c) + : msgr(r), net(c), worker(w), + listen_handler(new C_processor_accept(this)) {} + +int Processor::bind(const entity_addrvec_t &bind_addrs, + const std::set& avoid_ports, + entity_addrvec_t* bound_addrs) +{ + const auto& conf = msgr->cct->_conf; + // bind to socket(s) + ldout(msgr->cct, 10) << __func__ << " " << bind_addrs << dendl; + + SocketOptions opts; + opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay; + opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf; + + listen_sockets.resize(bind_addrs.v.size()); + *bound_addrs = bind_addrs; + + for (unsigned k = 0; k < bind_addrs.v.size(); ++k) { + auto& listen_addr = bound_addrs->v[k]; + + /* bind to port */ + int r = -1; + + for (int i = 0; i < conf->ms_bind_retry_count; i++) { + if (i > 0) { + lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in " + << conf->ms_bind_retry_delay << " seconds " << dendl; + sleep(conf->ms_bind_retry_delay); + } + + if (listen_addr.get_port()) { + worker->center.submit_to( + worker->center.get_id(), + [this, k, &listen_addr, &opts, &r]() { + r = worker->listen(listen_addr, k, opts, &listen_sockets[k]); + }, false); + if (r < 0) { + lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr + << ": " << cpp_strerror(r) << dendl; + continue; + } + } else { + // try a range of ports + for (int port = msgr->cct->_conf->ms_bind_port_min; + port <= msgr->cct->_conf->ms_bind_port_max; + port++) { + if (avoid_ports.count(port)) + continue; + + listen_addr.set_port(port); + worker->center.submit_to( + worker->center.get_id(), + [this, k, &listen_addr, &opts, &r]() { + r = worker->listen(listen_addr, k, opts, &listen_sockets[k]); + }, false); + if (r == 0) + break; + } + if (r < 0) { + lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr + << " on any port in range " + << msgr->cct->_conf->ms_bind_port_min + << "-" << msgr->cct->_conf->ms_bind_port_max << ": " + << cpp_strerror(r) << dendl; + listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again. + continue; + } + ldout(msgr->cct, 10) << __func__ << " bound on random port " + << listen_addr << dendl; + } + if (r == 0) { + break; + } + } + + // It seems that binding completely failed, return with that exit status + if (r < 0) { + lderr(msgr->cct) << __func__ << " was unable to bind after " + << conf->ms_bind_retry_count + << " attempts: " << cpp_strerror(r) << dendl; + for (unsigned j = 0; j < k; ++j) { + // clean up previous bind + listen_sockets[j].abort_accept(); + } + return r; + } + } + + ldout(msgr->cct, 10) << __func__ << " bound to " << *bound_addrs << dendl; + return 0; +} + +void Processor::start() +{ + ldout(msgr->cct, 1) << __func__ << dendl; + + // start thread + worker->center.submit_to(worker->center.get_id(), [this]() { + for (auto& listen_socket : listen_sockets) { + if (listen_socket) { + if (listen_socket.fd() == -1) { + ldout(msgr->cct, 1) << __func__ + << " Error: processor restart after listen_socket.fd closed. " + << this << dendl; + return; + } + worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, + listen_handler); } + } + }, false); +} + +void Processor::accept() +{ + SocketOptions opts; + opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay; + opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf; + opts.priority = msgr->get_socket_priority(); + + for (auto& listen_socket : listen_sockets) { + ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd() + << dendl; + unsigned accept_error_num = 0; + + while (true) { + entity_addr_t addr; + ConnectedSocket cli_socket; + Worker *w = worker; + if (!msgr->get_stack()->support_local_listen_table()) + w = msgr->get_stack()->get_worker(); + else + ++w->references; + int r = listen_socket.accept(&cli_socket, opts, &addr, w); + if (r == 0) { + ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " + << cli_socket.fd() << dendl; + + msgr->add_accept( + w, std::move(cli_socket), + msgr->get_myaddrs().v[listen_socket.get_addr_slot()], + addr); + accept_error_num = 0; + continue; + } else { + --w->references; + if (r == -EINTR) { + continue; + } else if (r == -EAGAIN) { + break; + } else if (r == -EMFILE || r == -ENFILE) { + lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd() + << " errno " << r << " " << cpp_strerror(r) << dendl; + if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) { + lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl; + ceph_abort(); + } + continue; + } else if (r == -ECONNABORTED) { + ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd() + << " errno " << r << " " << cpp_strerror(r) << dendl; + continue; + } else { + lderr(msgr->cct) << __func__ << " no incoming connection?" + << " errno " << r << " " << cpp_strerror(r) << dendl; + if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) { + lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl; + ceph_abort(); + } + continue; + } + } + } + } +} + +void Processor::stop() +{ + ldout(msgr->cct,10) << __func__ << dendl; + + worker->center.submit_to(worker->center.get_id(), [this]() { + for (auto& listen_socket : listen_sockets) { + if (listen_socket) { + worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE); + listen_socket.abort_accept(); + } + } + }, false); +} + + +struct StackSingleton { + CephContext *cct; + std::shared_ptr stack; + + explicit StackSingleton(CephContext *c): cct(c) {} + void ready(std::string &type) { + if (!stack) + stack = NetworkStack::create(cct, type); + } + ~StackSingleton() { + stack->stop(); + } +}; + + +class C_handle_reap : public EventCallback { + AsyncMessenger *msgr; + + public: + explicit C_handle_reap(AsyncMessenger *m): msgr(m) {} + void do_request(uint64_t id) override { + // judge whether is a time event + msgr->reap_dead(); + } +}; + +/******************* + * AsyncMessenger + */ + +AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, + const std::string &type, std::string mname, uint64_t _nonce) + : SimplePolicyMessenger(cct, name), + dispatch_queue(cct, this, mname), + nonce(_nonce) +{ + std::string transport_type = "posix"; + if (type.find("rdma") != std::string::npos) + transport_type = "rdma"; + else if (type.find("dpdk") != std::string::npos) + transport_type = "dpdk"; + + auto single = &cct->lookup_or_create_singleton_object( + "AsyncMessenger::NetworkStack::" + transport_type, true, cct); + single->ready(transport_type); + stack = single->stack.get(); + stack->start(); + local_worker = stack->get_worker(); + local_connection = ceph::make_ref(cct, this, &dispatch_queue, + local_worker, true, true); + init_local_connection(); + reap_handler = new C_handle_reap(this); + unsigned processor_num = 1; + if (stack->support_local_listen_table()) + processor_num = stack->get_num_worker(); + for (unsigned i = 0; i < processor_num; ++i) + processors.push_back(new Processor(this, stack->get_worker(i), cct)); +} + +/** + * Destroy the AsyncMessenger. Pretty simple since all the work is done + * elsewhere. + */ +AsyncMessenger::~AsyncMessenger() +{ + delete reap_handler; + ceph_assert(!did_bind); // either we didn't bind or we shut down the Processor + for (auto &&p : processors) + delete p; +} + +void AsyncMessenger::ready() +{ + ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl; + + stack->ready(); + if (pending_bind) { + int err = bindv(pending_bind_addrs, saved_public_addrs); + if (err) { + lderr(cct) << __func__ << " postponed bind failed" << dendl; + ceph_abort(); + } + } + + std::lock_guard l{lock}; + for (auto &&p : processors) + p->start(); + dispatch_queue.start(); +} + +int AsyncMessenger::shutdown() +{ + ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl; + + // done! clean up. + for (auto &&p : processors) + p->stop(); + mark_down_all(); + // break ref cycles on the loopback connection + local_connection->clear_priv(); + local_connection->mark_down(); + did_bind = false; + lock.lock(); + stop_cond.notify_all(); + stopped = true; + lock.unlock(); + stack->drain(); + return 0; +} + +int AsyncMessenger::bind(const entity_addr_t &bind_addr, + std::optional public_addrs) +{ + ldout(cct, 10) << __func__ << " " << bind_addr + << " public " << public_addrs << dendl; + // old bind() can take entity_addr_t(). new bindv() can take a + // 0.0.0.0-like address but needs type and family to be set. + auto a = bind_addr; + if (a == entity_addr_t()) { + a.set_type(entity_addr_t::TYPE_LEGACY); + if (cct->_conf->ms_bind_ipv6) { + a.set_family(AF_INET6); + } else { + a.set_family(AF_INET); + } + } + return bindv(entity_addrvec_t(a), public_addrs); +} + +int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs, + std::optional public_addrs) +{ + lock.lock(); + + if (!pending_bind && started) { + ldout(cct,10) << __func__ << " already started" << dendl; + lock.unlock(); + return -1; + } + + ldout(cct, 10) << __func__ << " " << bind_addrs + << " public " << public_addrs << dendl; + if (public_addrs && bind_addrs != public_addrs) { + // for the sake of rebind() and the is-not-ready case let's + // store public_addrs. there is no point in that if public + // addrs are indifferent from bind_addrs. + saved_public_addrs = std::move(public_addrs); + } + + if (!stack->is_ready()) { + ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl; + pending_bind_addrs = bind_addrs; + pending_bind = true; + lock.unlock(); + return 0; + } + + lock.unlock(); + + // bind to a socket + std::set avoid_ports; + entity_addrvec_t bound_addrs; + unsigned i = 0; + for (auto &&p : processors) { + int r = p->bind(bind_addrs, avoid_ports, &bound_addrs); + if (r) { + // Note: this is related to local tcp listen table problem. + // Posix(default kernel implementation) backend shares listen table + // in the kernel, so all threads can use the same listen table naturally + // and only one thread need to bind. But other backends(like dpdk) uses local + // listen table, we need to bind/listen tcp port for each worker. So if the + // first worker failed to bind, it could be think the normal error then handle + // it, like port is used case. But if the first worker successfully to bind + // but the second worker failed, it's not expected and we need to assert + // here + ceph_assert(i == 0); + return r; + } + ++i; + } + _finish_bind(bind_addrs, bound_addrs); + return 0; +} + +int AsyncMessenger::rebind(const std::set& avoid_ports) +{ + ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl; + ceph_assert(did_bind); + + for (auto &&p : processors) + p->stop(); + mark_down_all(); + + // adjust the nonce; we want our entity_addr_t to be truly unique. + nonce += 1000000; + ldout(cct, 10) << __func__ << " new nonce " << nonce + << " and addr " << get_myaddrs() << dendl; + + entity_addrvec_t bound_addrs; + entity_addrvec_t bind_addrs = get_myaddrs(); + std::set new_avoid(avoid_ports); + for (auto& a : bind_addrs.v) { + new_avoid.insert(a.get_port()); + a.set_port(0); + } + ldout(cct, 10) << __func__ << " will try " << bind_addrs + << " and avoid ports " << new_avoid << dendl; + unsigned i = 0; + for (auto &&p : processors) { + int r = p->bind(bind_addrs, avoid_ports, &bound_addrs); + if (r) { + ceph_assert(i == 0); + return r; + } + ++i; + } + _finish_bind(bind_addrs, bound_addrs); + for (auto &&p : processors) { + p->start(); + } + return 0; +} + +int AsyncMessenger::client_bind(const entity_addr_t &bind_addr) +{ + if (!cct->_conf->ms_bind_before_connect) + return 0; + std::lock_guard l{lock}; + if (did_bind) { + return 0; + } + if (started) { + ldout(cct, 10) << __func__ << " already started" << dendl; + return -1; + } + ldout(cct, 10) << __func__ << " " << bind_addr << dendl; + + set_myaddrs(entity_addrvec_t(bind_addr)); + return 0; +} + +void AsyncMessenger::_finish_bind(const entity_addrvec_t& bind_addrs, + const entity_addrvec_t& listen_addrs) +{ + set_myaddrs(bind_addrs); + for (auto& a : bind_addrs.v) { + if (!a.is_blank_ip()) { + learned_addr(a); + } + } + + if (get_myaddrs().front().get_port() == 0) { + set_myaddrs(listen_addrs); + } + + entity_addrvec_t newaddrs; + if (saved_public_addrs) { + newaddrs = *saved_public_addrs; + for (auto& public_addr : newaddrs.v) { + public_addr.set_nonce(nonce); + if (public_addr.is_ip() && public_addr.get_port() == 0) { + // port is not explicitly set. This is fine as it can be figured + // out by msgr. For instance, the low-level `Processor::bind` + // scans for free ports in a range controlled by ms_bind_port_min + // and ms_bind_port_max. + for (const auto& a : my_addrs->v) { + if (public_addr.get_type() == a.get_type() && a.is_ip()) { + public_addr.set_port(a.get_port()); + } + } + } + } + } else { + newaddrs = *my_addrs; + for (auto& a : newaddrs.v) { + a.set_nonce(nonce); + } + } + set_myaddrs(newaddrs); + + init_local_connection(); + + ldout(cct,1) << __func__ << " bind my_addrs is " << get_myaddrs() << dendl; + did_bind = true; +} + +int AsyncMessenger::client_reset() +{ + mark_down_all(); + + std::scoped_lock l{lock}; + // adjust the nonce; we want our entity_addr_t to be truly unique. + nonce += 1000000; + ldout(cct, 10) << __func__ << " new nonce " << nonce << dendl; + + entity_addrvec_t newaddrs = *my_addrs; + for (auto& a : newaddrs.v) { + a.set_nonce(nonce); + } + set_myaddrs(newaddrs); + _init_local_connection(); + return 0; +} + +int AsyncMessenger::start() +{ + std::scoped_lock l{lock}; + ldout(cct,1) << __func__ << " start" << dendl; + + // register at least one entity, first! + ceph_assert(my_name.type() >= 0); + + ceph_assert(!started); + started = true; + stopped = false; + + if (!did_bind) { + entity_addrvec_t newaddrs = *my_addrs; + for (auto& a : newaddrs.v) { + a.nonce = nonce; + } + set_myaddrs(newaddrs); + _init_local_connection(); + } + + return 0; +} + +void AsyncMessenger::wait() +{ + { + std::unique_lock locker{lock}; + if (!started) { + return; + } + if (!stopped) + stop_cond.wait(locker); + } + dispatch_queue.shutdown(); + if (dispatch_queue.is_started()) { + ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl; + dispatch_queue.wait(); + dispatch_queue.discard_local(); + ldout(cct, 10) << __func__ << ": dispatch queue is stopped" << dendl; + } + + // close all connections + shutdown_connections(false); + stack->drain(); + + ldout(cct, 10) << __func__ << ": done." << dendl; + ldout(cct, 1) << __func__ << " complete." << dendl; + started = false; +} + +void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, + const entity_addr_t &listen_addr, + const entity_addr_t &peer_addr) +{ + std::lock_guard l{lock}; + auto conn = ceph::make_ref(cct, this, &dispatch_queue, w, + listen_addr.is_msgr2(), false); + conn->accept(std::move(cli_socket), listen_addr, peer_addr); + accepting_conns.insert(conn); +} + +AsyncConnectionRef AsyncMessenger::create_connect( + const entity_addrvec_t& addrs, int type, bool anon) +{ + ceph_assert(ceph_mutex_is_locked(lock)); + + ldout(cct, 10) << __func__ << " " << addrs + << ", creating connection and registering" << dendl; + + // here is where we decide which of the addrs to connect to. always prefer + // the first one, if we support it. + entity_addr_t target; + for (auto& a : addrs.v) { + if (!a.is_msgr2() && !a.is_legacy()) { + continue; + } + // FIXME: for ipv4 vs ipv6, check whether local host can handle ipv6 before + // trying it? for now, just pick whichever is listed first. + target = a; + break; + } + + // create connection + Worker *w = stack->get_worker(); + auto conn = ceph::make_ref(cct, this, &dispatch_queue, w, + target.is_msgr2(), false); + conn->anon = anon; + conn->connect(addrs, type, target); + if (anon) { + anon_conns.insert(conn); + } else { + ceph_assert(!conns.count(addrs)); + ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " " + << *conn->peer_addrs << dendl; + conns[addrs] = conn; + } + w->get_perf_counter()->inc(l_msgr_active_connections); + + return conn; +} + + +ConnectionRef AsyncMessenger::get_loopback_connection() +{ + return local_connection; +} + +bool AsyncMessenger::should_use_msgr2() +{ + // if we are bound to v1 only, and we are connecting to a v2 peer, + // we cannot use the peer's v2 address. otherwise the connection + // is assymetrical, because they would have to use v1 to connect + // to us, and we would use v2, and connection race detection etc + // would totally break down (among other things). or, the other + // end will be confused that we advertise ourselve with a v1 + // address only (that we bound to) but connected with protocol v2. + return !did_bind || get_myaddrs().has_msgr2(); +} + +entity_addrvec_t AsyncMessenger::_filter_addrs(const entity_addrvec_t& addrs) +{ + if (!should_use_msgr2()) { + ldout(cct, 10) << __func__ << " " << addrs << " limiting to v1 ()" << dendl; + entity_addrvec_t r; + for (auto& i : addrs.v) { + if (i.is_msgr2()) { + continue; + } + r.v.push_back(i); + } + return r; + } else { + return addrs; + } +} + +int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs) +{ + FUNCTRACE(cct); + ceph_assert(m); + +#if defined(WITH_EVENTTRACE) + if (m->get_type() == CEPH_MSG_OSD_OP) + OID_EVENT_TRACE(((MOSDOp *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP"); + else if (m->get_type() == CEPH_MSG_OSD_OPREPLY) + OID_EVENT_TRACE(((MOSDOpReply *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP_REPLY"); +#endif + + ldout(cct, 1) << __func__ << "--> " << ceph_entity_type_name(type) << " " + << addrs << " -- " << *m << " -- ?+" + << m->get_data().length() << " " << m << dendl; + + if (addrs.empty()) { + ldout(cct,0) << __func__ << " message " << *m + << " with empty dest " << addrs << dendl; + m->put(); + return -EINVAL; + } + + if (cct->_conf->ms_dump_on_send) { + m->encode(-1, MSG_CRC_ALL); + ldout(cct, 0) << __func__ << " submit_message " << *m << "\n"; + m->get_payload().hexdump(*_dout); + if (m->get_data().length() > 0) { + *_dout << " data:\n"; + m->get_data().hexdump(*_dout); + } + *_dout << dendl; + m->clear_payload(); + } + + connect_to(type, addrs, false)->send_message(m); + return 0; +} + +ConnectionRef AsyncMessenger::connect_to(int type, + const entity_addrvec_t& addrs, + bool anon, bool not_local_dest) +{ + if (!not_local_dest) { + if (*my_addrs == addrs || + (addrs.v.size() == 1 && + my_addrs->contains(addrs.front()))) { + // local + return local_connection; + } + } + + auto av = _filter_addrs(addrs); + std::lock_guard l{lock}; + if (anon) { + return create_connect(av, type, anon); + } + + AsyncConnectionRef conn = _lookup_conn(av); + if (conn) { + ldout(cct, 10) << __func__ << " " << av << " existing " << conn << dendl; + } else { + conn = create_connect(av, type, false); + ldout(cct, 10) << __func__ << " " << av << " new " << conn << dendl; + } + + return conn; +} + +/** + * If my_addr doesn't have an IP set, this function + * will fill it in from the passed addr. Otherwise it does nothing and returns. + */ +bool AsyncMessenger::set_addr_unknowns(const entity_addrvec_t &addrs) +{ + ldout(cct,1) << __func__ << " " << addrs << dendl; + bool ret = false; + std::lock_guard l{lock}; + + entity_addrvec_t newaddrs = *my_addrs; + for (auto& a : newaddrs.v) { + if (a.is_blank_ip()) { + int type = a.get_type(); + int port = a.get_port(); + uint32_t nonce = a.get_nonce(); + for (auto& b : addrs.v) { + if (a.get_family() == b.get_family()) { + ldout(cct,1) << __func__ << " assuming my addr " << a + << " matches provided addr " << b << dendl; + a = b; + a.set_nonce(nonce); + a.set_type(type); + a.set_port(port); + ret = true; + break; + } + } + } + } + set_myaddrs(newaddrs); + if (ret) { + _init_local_connection(); + } + ldout(cct,1) << __func__ << " now " << *my_addrs << dendl; + return ret; +} + +void AsyncMessenger::shutdown_connections(bool queue_reset) +{ + ldout(cct,1) << __func__ << " " << dendl; + std::lock_guard l{lock}; + for (const auto& c : accepting_conns) { + ldout(cct, 5) << __func__ << " accepting_conn " << c << dendl; + c->stop(queue_reset); + } + accepting_conns.clear(); + + for (const auto& [e, c] : conns) { + ldout(cct, 5) << __func__ << " mark down " << e << " " << c << dendl; + c->stop(queue_reset); + } + conns.clear(); + + for (const auto& c : anon_conns) { + ldout(cct, 5) << __func__ << " mark down " << c << dendl; + c->stop(queue_reset); + } + anon_conns.clear(); + + { + std::lock_guard l{deleted_lock}; + for (const auto& c : deleted_conns) { + ldout(cct, 5) << __func__ << " delete " << c << dendl; + c->get_perf_counter()->dec(l_msgr_active_connections); + } + deleted_conns.clear(); + } +} + +void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs) +{ + std::lock_guard l{lock}; + const AsyncConnectionRef& conn = _lookup_conn(addrs); + if (conn) { + ldout(cct, 1) << __func__ << " " << addrs << " -- " << conn << dendl; + conn->stop(true); + } else { + ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl; + } +} + +int AsyncMessenger::get_proto_version(int peer_type, bool connect) const +{ + int my_type = my_name.type(); + + // set reply protocol version + if (peer_type == my_type) { + // internal + return cluster_protocol; + } else { + // public + switch (connect ? peer_type : my_type) { + case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; + case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; + case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; + } + } + return 0; +} + +int AsyncMessenger::accept_conn(const AsyncConnectionRef& conn) +{ + std::lock_guard l{lock}; + if (conn->policy.server && + conn->policy.lossy && + !conn->policy.register_lossy_clients) { + anon_conns.insert(conn); + conn->get_perf_counter()->inc(l_msgr_active_connections); + return 0; + } + auto it = conns.find(*conn->peer_addrs); + if (it != conns.end()) { + auto& existing = it->second; + + // lazy delete, see "deleted_conns" + // If conn already in, we will return 0 + std::lock_guard l{deleted_lock}; + if (deleted_conns.erase(existing)) { + it->second->get_perf_counter()->dec(l_msgr_active_connections); + conns.erase(it); + } else if (conn != existing) { + return -1; + } + } + ldout(cct, 10) << __func__ << " " << conn << " " << *conn->peer_addrs << dendl; + conns[*conn->peer_addrs] = conn; + conn->get_perf_counter()->inc(l_msgr_active_connections); + accepting_conns.erase(conn); + return 0; +} + + +bool AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) +{ + // be careful here: multiple threads may block here, and readers of + // my_addr do NOT hold any lock. + + // this always goes from true -> false under the protection of the + // mutex. if it is already false, we need not retake the mutex at + // all. + if (!need_addr) + return false; + std::lock_guard l(lock); + if (need_addr) { + if (my_addrs->empty()) { + auto a = peer_addr_for_me; + a.set_type(entity_addr_t::TYPE_ANY); + a.set_nonce(nonce); + if (!did_bind) { + a.set_port(0); + } + set_myaddrs(entity_addrvec_t(a)); + ldout(cct,10) << __func__ << " had no addrs" << dendl; + } else { + // fix all addrs of the same family, regardless of type (msgr2 vs legacy) + entity_addrvec_t newaddrs = *my_addrs; + for (auto& a : newaddrs.v) { + if (a.is_blank_ip() && + a.get_family() == peer_addr_for_me.get_family()) { + entity_addr_t t = peer_addr_for_me; + if (!did_bind) { + t.set_type(entity_addr_t::TYPE_ANY); + t.set_port(0); + } else { + t.set_type(a.get_type()); + t.set_port(a.get_port()); + } + t.set_nonce(a.get_nonce()); + ldout(cct,10) << __func__ << " " << a << " -> " << t << dendl; + a = t; + } + } + set_myaddrs(newaddrs); + } + ldout(cct, 1) << __func__ << " learned my addr " << *my_addrs + << " (peer_addr_for_me " << peer_addr_for_me << ")" << dendl; + _init_local_connection(); + need_addr = false; + return true; + } + return false; +} + +void AsyncMessenger::reap_dead() +{ + ldout(cct, 1) << __func__ << " start" << dendl; + + std::lock_guard l1{lock}; + + { + std::lock_guard l2{deleted_lock}; + for (auto& c : deleted_conns) { + ldout(cct, 5) << __func__ << " delete " << c << dendl; + auto conns_it = conns.find(*c->peer_addrs); + if (conns_it != conns.end() && conns_it->second == c) + conns.erase(conns_it); + accepting_conns.erase(c); + anon_conns.erase(c); + c->get_perf_counter()->dec(l_msgr_active_connections); + } + deleted_conns.clear(); + } +} -- cgit v1.2.3