From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/msg/async/Stack.cc | 206 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 206 insertions(+) create mode 100644 src/msg/async/Stack.cc (limited to 'src/msg/async/Stack.cc') diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc new file mode 100644 index 000000000..37e15634d --- /dev/null +++ b/src/msg/async/Stack.cc @@ -0,0 +1,206 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSky + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include + +#include "include/compat.h" +#include "common/Cond.h" +#include "common/errno.h" +#include "PosixStack.h" +#ifdef HAVE_RDMA +#include "rdma/RDMAStack.h" +#endif +#ifdef HAVE_DPDK +#include "dpdk/DPDKStack.h" +#endif + +#include "common/dout.h" +#include "include/ceph_assert.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << "stack " + +std::function NetworkStack::add_thread(unsigned worker_id) +{ + Worker *w = workers[worker_id]; + return [this, w]() { + char tp_name[16]; + sprintf(tp_name, "msgr-worker-%u", w->id); + ceph_pthread_setname(pthread_self(), tp_name); + const unsigned EventMaxWaitUs = 30000000; + w->center.set_owner(); + ldout(cct, 10) << __func__ << " starting" << dendl; + w->initialize(); + w->init_done(); + while (!w->done) { + ldout(cct, 30) << __func__ << " calling event process" << dendl; + + ceph::timespan dur; + int r = w->center.process_events(EventMaxWaitUs, &dur); + if (r < 0) { + ldout(cct, 20) << __func__ << " process events failed: " + << cpp_strerror(errno) << dendl; + // TODO do something? + } + w->perf_logger->tinc(l_msgr_running_total_time, dur); + } + w->reset(); + w->destroy(); + }; +} + +std::shared_ptr NetworkStack::create(CephContext *c, + const std::string &t) +{ + std::shared_ptr stack = nullptr; + + if (t == "posix") + stack.reset(new PosixNetworkStack(c)); +#ifdef HAVE_RDMA + else if (t == "rdma") + stack.reset(new RDMAStack(c)); +#endif +#ifdef HAVE_DPDK + else if (t == "dpdk") + stack.reset(new DPDKStack(c)); +#endif + + if (stack == nullptr) { + lderr(c) << __func__ << " ms_async_transport_type " << t << + " is not supported! " << dendl; + ceph_abort(); + return nullptr; + } + + const int InitEventNumber = 5000; + for (unsigned worker_id = 0; worker_id < stack->num_workers; ++worker_id) { + Worker *w = stack->create_worker(c, worker_id); + int ret = w->center.init(InitEventNumber, worker_id, t); + if (ret) + throw std::system_error(-ret, std::generic_category()); + stack->workers.push_back(w); + } + + return stack; +} + +NetworkStack::NetworkStack(CephContext *c) + : cct(c) +{ + ceph_assert(cct->_conf->ms_async_op_threads > 0); + + num_workers = cct->_conf->ms_async_op_threads; + if (num_workers >= EventCenter::MAX_EVENTCENTER) { + ldout(cct, 0) << __func__ << " max thread limit is " + << EventCenter::MAX_EVENTCENTER << ", switching to this now. " + << "Higher thread values are unnecessary and currently unsupported." + << dendl; + num_workers = EventCenter::MAX_EVENTCENTER; + } +} + +void NetworkStack::start() +{ + std::unique_lock lk(pool_spin); + + if (started) { + return ; + } + + for (unsigned i = 0; i < num_workers; ++i) { + if (workers[i]->is_init()) + continue; + std::function thread = add_thread(i); + spawn_worker(i, std::move(thread)); + } + started = true; + lk.unlock(); + + for (unsigned i = 0; i < num_workers; ++i) + workers[i]->wait_for_init(); +} + +Worker* NetworkStack::get_worker() +{ + ldout(cct, 30) << __func__ << dendl; + + // start with some reasonably large number + unsigned min_load = std::numeric_limits::max(); + Worker* current_best = nullptr; + + pool_spin.lock(); + // find worker with least references + // tempting case is returning on references == 0, but in reality + // this will happen so rarely that there's no need for special case. + for (unsigned i = 0; i < num_workers; ++i) { + unsigned worker_load = workers[i]->references.load(); + if (worker_load < min_load) { + current_best = workers[i]; + min_load = worker_load; + } + } + + pool_spin.unlock(); + ceph_assert(current_best); + ++current_best->references; + return current_best; +} + +void NetworkStack::stop() +{ + std::lock_guard lk(pool_spin); + for (unsigned i = 0; i < num_workers; ++i) { + workers[i]->done = true; + workers[i]->center.wakeup(); + join_worker(i); + } + started = false; +} + +class C_drain : public EventCallback { + ceph::mutex drain_lock = ceph::make_mutex("C_drain::drain_lock"); + ceph::condition_variable drain_cond; + unsigned drain_count; + + public: + explicit C_drain(size_t c) + : drain_count(c) {} + void do_request(uint64_t id) override { + std::lock_guard l{drain_lock}; + drain_count--; + if (drain_count == 0) drain_cond.notify_all(); + } + void wait() { + std::unique_lock l{drain_lock}; + drain_cond.wait(l, [this] { return drain_count == 0; }); + } +}; + +void NetworkStack::drain() +{ + ldout(cct, 30) << __func__ << " started." << dendl; + pthread_t cur = pthread_self(); + pool_spin.lock(); + C_drain drain(num_workers); + for (unsigned i = 0; i < num_workers; ++i) { + ceph_assert(cur != workers[i]->center.get_owner()); + workers[i]->center.dispatch_event_external(EventCallbackRef(&drain)); + } + pool_spin.unlock(); + drain.wait(); + ldout(cct, 30) << __func__ << " end." << dendl; +} -- cgit v1.2.3