summaryrefslogtreecommitdiffstats
path: root/src/test/crimson
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/test/crimson
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/crimson')
-rw-r--r--src/test/crimson/CMakeLists.txt42
-rw-r--r--src/test/crimson/perf_crimson_msgr.cc357
-rw-r--r--src/test/crimson/test_alien_echo.cc281
-rw-r--r--src/test/crimson/test_async_echo.cc238
-rw-r--r--src/test/crimson/test_buffer.cc49
-rw-r--r--src/test/crimson/test_config.cc110
-rw-r--r--src/test/crimson/test_denc.cc53
-rw-r--r--src/test/crimson/test_lru.cc213
-rw-r--r--src/test/crimson/test_messenger.cc420
-rw-r--r--src/test/crimson/test_monc.cc78
-rw-r--r--src/test/crimson/test_perfcounters.cc62
-rw-r--r--src/test/crimson/test_thread_pool.cc49
12 files changed, 1952 insertions, 0 deletions
diff --git a/src/test/crimson/CMakeLists.txt b/src/test/crimson/CMakeLists.txt
new file mode 100644
index 00000000..625610c2
--- /dev/null
+++ b/src/test/crimson/CMakeLists.txt
@@ -0,0 +1,42 @@
+add_executable(unittest_seastar_buffer
+ test_buffer.cc)
+target_link_libraries(unittest_seastar_buffer ceph-common crimson)
+
+add_executable(unittest_seastar_denc
+ test_denc.cc)
+target_link_libraries(unittest_seastar_denc crimson GTest::Main)
+
+add_executable(unittest_seastar_messenger test_messenger.cc)
+target_link_libraries(unittest_seastar_messenger ceph-common crimson)
+
+add_executable(perf_crimson_msgr perf_crimson_msgr.cc)
+target_link_libraries(perf_crimson_msgr ceph-common crimson)
+
+add_executable(unittest_seastar_echo
+ test_alien_echo.cc)
+target_link_libraries(unittest_seastar_echo crimson)
+
+add_executable(unittest_async_echo
+ test_async_echo.cc)
+target_link_libraries(unittest_async_echo ceph-common global)
+
+add_executable(unittest_seastar_thread_pool
+ test_thread_pool.cc)
+target_link_libraries(unittest_seastar_thread_pool crimson)
+
+add_executable(unittest_seastar_config
+ test_config.cc)
+target_link_libraries(unittest_seastar_config crimson)
+
+add_executable(unittest_seastar_monc
+ test_monc.cc)
+target_link_libraries(unittest_seastar_monc crimson)
+
+add_executable(unittest_seastar_perfcounters
+ test_perfcounters.cc)
+target_link_libraries(unittest_seastar_perfcounters crimson)
+
+add_executable(unittest_seastar_lru
+ test_lru.cc)
+target_link_libraries(unittest_seastar_lru crimson GTest::Main)
+
diff --git a/src/test/crimson/perf_crimson_msgr.cc b/src/test/crimson/perf_crimson_msgr.cc
new file mode 100644
index 00000000..f1973845
--- /dev/null
+++ b/src/test/crimson/perf_crimson_msgr.cc
@@ -0,0 +1,357 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <map>
+#include <random>
+#include <boost/program_options.hpp>
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/do_with.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/semaphore.hh>
+
+#include "common/ceph_time.h"
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+
+#include "crimson/common/log.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Dispatcher.h"
+#include "crimson/net/Messenger.h"
+
+namespace bpo = boost::program_options;
+
+namespace {
+
+template<typename Message>
+using Ref = boost::intrusive_ptr<Message>;
+
+seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_ms);
+}
+
+
+enum class perf_mode_t {
+ both,
+ client,
+ server
+};
+
+static std::random_device rd;
+static std::default_random_engine rng{rd()};
+
+static seastar::future<> run(unsigned rounds,
+ double keepalive_ratio,
+ int bs,
+ int depth,
+ std::string addr,
+ perf_mode_t mode)
+{
+ struct test_state {
+ struct Server final
+ : public ceph::net::Dispatcher,
+ public seastar::peering_sharded_service<Server> {
+ ceph::net::Messenger *msgr = nullptr;
+
+ Dispatcher* get_local_shard() override {
+ return &(container().local());
+ }
+ seastar::future<> stop() {
+ return seastar::make_ready_future<>();
+ }
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ MessageRef m) override {
+ ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
+ // reply
+ Ref<MOSDOp> req = boost::static_pointer_cast<MOSDOp>(m);
+ req->finish_decode();
+ return c->send(MessageRef{ new MOSDOpReply(req.get(), 0, 0, 0, false), false });
+ }
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce,
+ const entity_addr_t& addr) {
+ auto&& fut = ceph::net::Messenger::create(name, lname, nonce, 1);
+ return fut.then([this, addr](ceph::net::Messenger *messenger) {
+ return container().invoke_on_all([messenger](auto& server) {
+ server.msgr = messenger->get_local_shard();
+ server.msgr->set_crc_header();
+ }).then([messenger, addr] {
+ return messenger->bind(entity_addrvec_t{addr});
+ }).then([this, messenger] {
+ return messenger->start(this);
+ });
+ });
+ }
+ seastar::future<> shutdown() {
+ ceph_assert(msgr);
+ return msgr->shutdown();
+ }
+ };
+
+ struct Client final
+ : public ceph::net::Dispatcher,
+ public seastar::peering_sharded_service<Client> {
+
+ struct PingSession : public seastar::enable_shared_from_this<PingSession> {
+ unsigned count = 0u;
+ mono_time connected_time;
+ mono_time finish_time;
+ };
+ using PingSessionRef = seastar::shared_ptr<PingSession>;
+
+ unsigned rounds;
+ std::bernoulli_distribution keepalive_dist;
+ ceph::net::Messenger *msgr = nullptr;
+ std::map<ceph::net::Connection*, seastar::promise<>> pending_conns;
+ std::map<ceph::net::ConnectionRef, PingSessionRef> sessions;
+ int msg_len;
+ bufferlist msg_data;
+ seastar::semaphore depth;
+
+ Client(unsigned rounds, double keepalive_ratio, int msg_len, int depth)
+ : rounds(rounds),
+ keepalive_dist(std::bernoulli_distribution{keepalive_ratio}),
+ depth(depth) {
+ bufferptr ptr(msg_len);
+ memset(ptr.c_str(), 0, msg_len);
+ msg_data.append(ptr);
+ }
+
+ PingSessionRef find_session(ceph::net::ConnectionRef c) {
+ auto found = sessions.find(c);
+ if (found == sessions.end()) {
+ ceph_assert(false);
+ }
+ return found->second;
+ }
+
+ Dispatcher* get_local_shard() override {
+ return &(container().local());
+ }
+ seastar::future<> stop() {
+ return seastar::now();
+ }
+ seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
+ logger().info("{}: connected to {}", *conn, conn->get_peer_addr());
+ auto session = seastar::make_shared<PingSession>();
+ auto [i, added] = sessions.emplace(conn, session);
+ std::ignore = i;
+ ceph_assert(added);
+ session->connected_time = mono_clock::now();
+ return seastar::now();
+ }
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ MessageRef m) override {
+ ceph_assert(m->get_type() == CEPH_MSG_OSD_OPREPLY);
+ depth.signal(1);
+ auto session = find_session(c);
+ ++(session->count);
+
+ if (session->count == rounds) {
+ logger().info("{}: finished receiving {} OPREPLYs", *c.get(), session->count);
+ session->finish_time = mono_clock::now();
+ return container().invoke_on_all([conn = c.get()](auto &client) {
+ auto found = client.pending_conns.find(conn);
+ ceph_assert(found != client.pending_conns.end());
+ found->second.set_value();
+ });
+ } else {
+ return seastar::now();
+ }
+ }
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce) {
+ return ceph::net::Messenger::create(name, lname, nonce, 2)
+ .then([this](ceph::net::Messenger *messenger) {
+ return container().invoke_on_all([messenger](auto& client) {
+ client.msgr = messenger->get_local_shard();
+ client.msgr->set_crc_header();
+ }).then([this, messenger] {
+ return messenger->start(this);
+ });
+ });
+ }
+
+ seastar::future<> shutdown() {
+ ceph_assert(msgr);
+ return msgr->shutdown();
+ }
+
+ seastar::future<> dispatch_messages(const entity_addr_t& peer_addr, bool foreign_dispatch=true) {
+ mono_time start_time = mono_clock::now();
+ return msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
+ .then([this, foreign_dispatch, start_time](auto conn) {
+ return seastar::futurize_apply([this, conn, foreign_dispatch] {
+ if (foreign_dispatch) {
+ return do_dispatch_messages(&**conn);
+ } else {
+ // NOTE: this could be faster if we don't switch cores in do_dispatch_messages().
+ return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) {
+ return client.do_dispatch_messages(conn);
+ });
+ }
+ }).finally([this, conn, start_time] {
+ return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) {
+ auto session = client.find_session((*conn)->shared_from_this());
+ std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
+ std::chrono::duration<double> dur_messaging = session->finish_time - session->connected_time;
+ logger().info("{}: handshake {}, messaging {}",
+ **conn, dur_handshake.count(), dur_messaging.count());
+ });
+ });
+ });
+ }
+
+ private:
+ seastar::future<> send_msg(ceph::net::Connection* conn) {
+ return depth.wait(1).then([this, conn] {
+ const static pg_t pgid;
+ const static object_locator_t oloc;
+ const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
+ pgid.pool(), oloc.nspace);
+ static spg_t spgid(pgid);
+ MOSDOp *m = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
+ bufferlist data(msg_data);
+ m->write(0, msg_len, data);
+ MessageRef msg = {m, false};
+ return conn->send(msg);
+ });
+ }
+
+ seastar::future<> do_dispatch_messages(ceph::net::Connection* conn) {
+ return container().invoke_on_all([conn](auto& client) {
+ auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>());
+ std::ignore = i;
+ ceph_assert(added);
+ }).then([this, conn] {
+ return seastar::do_with(0u, 0u,
+ [this, conn](auto &count_ping, auto &count_keepalive) {
+ return seastar::do_until(
+ [this, conn, &count_ping, &count_keepalive] {
+ bool stop = (count_ping == rounds);
+ if (stop) {
+ logger().info("{}: finished sending {} OSDOPs with {} keepalives",
+ *conn, count_ping, count_keepalive);
+ }
+ return stop;
+ },
+ [this, conn, &count_ping, &count_keepalive] {
+ return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
+ if (keepalive_dist(rng)) {
+ return conn->keepalive()
+ .then([&count_keepalive] {
+ count_keepalive += 1;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ });
+ } else {
+ return send_msg(conn)
+ .then([&count_ping] {
+ count_ping += 1;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ });
+ }
+ });
+ }).then([this, conn] {
+ auto found = pending_conns.find(conn);
+ return found->second.get_future();
+ });
+ });
+ });
+ }
+ };
+ };
+
+ return seastar::when_all_succeed(
+ ceph::net::create_sharded<test_state::Server>(),
+ ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio, bs, depth))
+ .then([rounds, keepalive_ratio, addr, mode](test_state::Server *server,
+ test_state::Client *client) {
+ entity_addr_t target_addr;
+ target_addr.parse(addr.c_str(), nullptr);
+ target_addr.set_type(entity_addr_t::TYPE_LEGACY);
+ if (mode == perf_mode_t::both) {
+ return seastar::when_all_succeed(
+ server->init(entity_name_t::OSD(0), "server", 0, target_addr),
+ client->init(entity_name_t::OSD(1), "client", 0))
+ // dispatch pingpoing
+ .then([client, target_addr] {
+ return client->dispatch_messages(target_addr, false);
+ // shutdown
+ }).finally([client] {
+ logger().info("client shutdown...");
+ return client->shutdown();
+ }).finally([server] {
+ logger().info("server shutdown...");
+ return server->shutdown();
+ });
+ } else if (mode == perf_mode_t::client) {
+ return client->init(entity_name_t::OSD(1), "client", 0)
+ // dispatch pingpoing
+ .then([client, target_addr] {
+ return client->dispatch_messages(target_addr, false);
+ // shutdown
+ }).finally([client] {
+ logger().info("client shutdown...");
+ return client->shutdown();
+ });
+ } else { // mode == perf_mode_t::server
+ return server->init(entity_name_t::OSD(0), "server", 0, target_addr)
+ // dispatch pingpoing
+ .then([server] {
+ return server->msgr->wait();
+ // shutdown
+ }).finally([server] {
+ logger().info("server shutdown...");
+ return server->shutdown();
+ });
+ }
+ });
+}
+
+}
+
+int main(int argc, char** argv)
+{
+ seastar::app_template app;
+ app.add_options()
+ ("addr", bpo::value<std::string>()->default_value("0.0.0.0:9010"),
+ "start server")
+ ("mode", bpo::value<int>()->default_value(0),
+ "0: both, 1:client, 2:server")
+ ("rounds", bpo::value<unsigned>()->default_value(65536),
+ "number of messaging rounds")
+ ("keepalive-ratio", bpo::value<double>()->default_value(0),
+ "ratio of keepalive in ping messages")
+ ("bs", bpo::value<int>()->default_value(4096),
+ "block size")
+ ("depth", bpo::value<int>()->default_value(512),
+ "io depth");
+ return app.run(argc, argv, [&app] {
+ auto&& config = app.configuration();
+ auto rounds = config["rounds"].as<unsigned>();
+ auto keepalive_ratio = config["keepalive-ratio"].as<double>();
+ auto bs = config["bs"].as<int>();
+ auto depth = config["depth"].as<int>();
+ auto addr = config["addr"].as<std::string>();
+ auto mode = config["mode"].as<int>();
+ logger().info("\nsettings:\n addr={}\n mode={}\n rounds={}\n keepalive-ratio={}\n bs={}\n depth={}",
+ addr, mode, rounds, keepalive_ratio, bs, depth);
+ ceph_assert(mode >= 0 && mode <= 2);
+ auto _mode = static_cast<perf_mode_t>(mode);
+ return run(rounds, keepalive_ratio, bs, depth, addr, _mode)
+ .then([] {
+ std::cout << "successful" << std::endl;
+ }).handle_exception([] (auto eptr) {
+ std::cout << "failed" << std::endl;
+ return seastar::make_exception_future<>(eptr);
+ });
+ });
+}
diff --git a/src/test/crimson/test_alien_echo.cc b/src/test/crimson/test_alien_echo.cc
new file mode 100644
index 00000000..1dbe8133
--- /dev/null
+++ b/src/test/crimson/test_alien_echo.cc
@@ -0,0 +1,281 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+
+#include "auth/Auth.h"
+#include "messages/MPing.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Dispatcher.h"
+#include "crimson/net/Messenger.h"
+#include "crimson/net/Config.h"
+#include "crimson/thread/Condition.h"
+#include "crimson/thread/Throttle.h"
+
+#include <seastar/core/alien.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/reactor.hh>
+
+
+enum class echo_role {
+ as_server,
+ as_client,
+};
+
+namespace seastar_pingpong {
+struct DummyAuthAuthorizer : public AuthAuthorizer {
+ DummyAuthAuthorizer()
+ : AuthAuthorizer(CEPH_AUTH_CEPHX)
+ {}
+ bool verify_reply(bufferlist::const_iterator&,
+ std::string *connection_secret) override {
+ return true;
+ }
+ bool add_challenge(CephContext*, const bufferlist&) override {
+ return true;
+ }
+};
+
+struct Server {
+ ceph::thread::Throttle byte_throttler;
+ ceph::net::Messenger& msgr;
+ struct ServerDispatcher : ceph::net::Dispatcher {
+ unsigned count = 0;
+ seastar::condition_variable on_reply;
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ MessageRef m) override {
+ std::cout << "server got ping " << *m << std::endl;
+ // reply with a pong
+ return c->send(MessageRef{new MPing(), false}).then([this] {
+ ++count;
+ on_reply.signal();
+ });
+ }
+ seastar::future<ceph::net::msgr_tag_t, bufferlist>
+ ms_verify_authorizer(peer_type_t peer_type,
+ auth_proto_t protocol,
+ bufferlist& auth) override {
+ return seastar::make_ready_future<ceph::net::msgr_tag_t, bufferlist>(
+ 0, bufferlist{});
+ }
+ seastar::future<std::unique_ptr<AuthAuthorizer>>
+ ms_get_authorizer(peer_type_t) override {
+ return seastar::make_ready_future<std::unique_ptr<AuthAuthorizer>>(
+ new DummyAuthAuthorizer{});
+ }
+ } dispatcher;
+ Server(ceph::net::Messenger& msgr)
+ : byte_throttler(ceph::net::conf.osd_client_message_size_cap),
+ msgr{msgr}
+ {
+ msgr.set_crc_header();
+ msgr.set_crc_data();
+ }
+};
+
+struct Client {
+ ceph::thread::Throttle byte_throttler;
+ ceph::net::Messenger& msgr;
+ struct ClientDispatcher : ceph::net::Dispatcher {
+ unsigned count = 0;
+ seastar::condition_variable on_reply;
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ MessageRef m) override {
+ std::cout << "client got pong " << *m << std::endl;
+ ++count;
+ on_reply.signal();
+ return seastar::now();
+ }
+ } dispatcher;
+ Client(ceph::net::Messenger& msgr)
+ : byte_throttler(ceph::net::conf.osd_client_message_size_cap),
+ msgr{msgr}
+ {
+ msgr.set_crc_header();
+ msgr.set_crc_data();
+ }
+};
+} // namespace seastar_pingpong
+
+class SeastarContext {
+ seastar::file_desc begin_fd;
+ ceph::thread::Condition on_end;
+
+public:
+ SeastarContext()
+ : begin_fd{seastar::file_desc::eventfd(0, 0)}
+ {}
+
+ template<class Func>
+ std::thread with_seastar(Func&& func) {
+ return std::thread{[this, func = std::forward<Func>(func)] {
+ // alien: are you ready?
+ wait_for_seastar();
+ // alien: could you help me apply(func)?
+ func();
+ // alien: i've sent my request. have you replied it?
+ // wait_for_seastar();
+ // alien: you are free to go!
+ on_end.notify();
+ }};
+ }
+
+ void run(seastar::app_template& app, int argc, char** argv) {
+ app.run(argc, argv, [this] {
+ return seastar::now().then([this] {
+ return set_seastar_ready();
+ }).then([this] {
+ // seastar: let me know once i am free to leave.
+ return on_end.wait();
+ }).handle_exception([](auto ep) {
+ std::cerr << "Error: " << ep << std::endl;
+ }).finally([] {
+ seastar::engine().exit(0);
+ });
+ });
+ }
+
+ seastar::future<> set_seastar_ready() {
+ // seastar: i am ready to serve!
+ ::eventfd_write(begin_fd.get(), 1);
+ return seastar::now();
+ }
+
+private:
+ void wait_for_seastar() {
+ eventfd_t result = 0;
+ if (int r = ::eventfd_read(begin_fd.get(), &result); r < 0) {
+ std::cerr << "unable to eventfd_read():" << errno << std::endl;
+ }
+ }
+};
+
+static seastar::future<>
+seastar_echo(const entity_addr_t addr, echo_role role, unsigned count)
+{
+ std::cout << "seastar/";
+ if (role == echo_role::as_server) {
+ return ceph::net::Messenger::create(entity_name_t::OSD(0), "server",
+ addr.get_nonce(), 0)
+ .then([addr, count] (auto msgr) {
+ return seastar::do_with(seastar_pingpong::Server{*msgr},
+ [addr, count](auto& server) mutable {
+ std::cout << "server listening at " << addr << std::endl;
+ // bind the server
+ server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
+ &server.byte_throttler);
+ return server.msgr.bind(entity_addrvec_t{addr})
+ .then([&server] {
+ return server.msgr.start(&server.dispatcher);
+ }).then([&dispatcher=server.dispatcher, count] {
+ return dispatcher.on_reply.wait([&dispatcher, count] {
+ return dispatcher.count >= count;
+ });
+ }).finally([&server] {
+ std::cout << "server shutting down" << std::endl;
+ return server.msgr.shutdown();
+ });
+ });
+ });
+ } else {
+ return ceph::net::Messenger::create(entity_name_t::OSD(1), "client",
+ addr.get_nonce(), 0)
+ .then([addr, count] (auto msgr) {
+ return seastar::do_with(seastar_pingpong::Client{*msgr},
+ [addr, count](auto& client) {
+ std::cout << "client sending to " << addr << std::endl;
+ client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
+ &client.byte_throttler);
+ return client.msgr.start(&client.dispatcher)
+ .then([addr, &client] {
+ return client.msgr.connect(addr, entity_name_t::TYPE_OSD);
+ }).then([&disp=client.dispatcher, count](ceph::net::ConnectionXRef conn) {
+ return seastar::do_until(
+ [&disp,count] { return disp.count >= count; },
+ [&disp,conn] { return (*conn)->send(MessageRef{new MPing(), false})
+ .then([&] { return disp.on_reply.wait(); });
+ });
+ }).finally([&client] {
+ std::cout << "client shutting down" << std::endl;
+ return client.msgr.shutdown();
+ });
+ });
+ });
+ }
+}
+
+int main(int argc, char** argv)
+{
+ namespace po = boost::program_options;
+ po::options_description desc{"Allowed options"};
+ desc.add_options()
+ ("help,h", "show help message")
+ ("role", po::value<std::string>()->default_value("pong"),
+ "role to play (ping | pong)")
+ ("port", po::value<uint16_t>()->default_value(9010),
+ "port #")
+ ("nonce", po::value<uint32_t>()->default_value(42),
+ "a unique number to identify the pong server")
+ ("count", po::value<unsigned>()->default_value(10),
+ "stop after sending/echoing <count> MPing messages")
+ ("v2", po::value<bool>()->default_value(false),
+ "using msgr v2 protocol");
+ po::variables_map vm;
+ std::vector<std::string> unrecognized_options;
+ try {
+ auto parsed = po::command_line_parser(argc, argv)
+ .options(desc)
+ .allow_unregistered()
+ .run();
+ po::store(parsed, vm);
+ if (vm.count("help")) {
+ std::cout << desc << std::endl;
+ return 0;
+ }
+ po::notify(vm);
+ unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
+ } catch(const po::error& e) {
+ std::cerr << "error: " << e.what() << std::endl;
+ return 1;
+ }
+
+ entity_addr_t addr;
+ if (vm["v2"].as<bool>()) {
+ addr.set_type(entity_addr_t::TYPE_MSGR2);
+ } else {
+ addr.set_type(entity_addr_t::TYPE_LEGACY);
+ }
+ addr.set_family(AF_INET);
+ addr.set_port(vm["port"].as<std::uint16_t>());
+ addr.set_nonce(vm["nonce"].as<std::uint32_t>());
+
+ echo_role role = echo_role::as_server;
+ if (vm["role"].as<std::string>() == "ping") {
+ role = echo_role::as_client;
+ }
+
+ auto count = vm["count"].as<unsigned>();
+ seastar::app_template app;
+ SeastarContext sc;
+ auto job = sc.with_seastar([&] {
+ auto fut = seastar::alien::submit_to(0, [addr, role, count] {
+ return seastar_echo(addr, role, count);
+ });
+ fut.wait();
+ });
+ std::vector<char*> av{argv[0]};
+ std::transform(begin(unrecognized_options),
+ end(unrecognized_options),
+ std::back_inserter(av),
+ [](auto& s) {
+ return const_cast<char*>(s.c_str());
+ });
+ sc.run(app, av.size(), av.data());
+ job.join();
+}
+
+/*
+ * Local Variables:
+ * compile-command: "make -j4 \
+ * -C ../../../build \
+ * unittest_seastar_echo"
+ * End:
+ */
diff --git a/src/test/crimson/test_async_echo.cc b/src/test/crimson/test_async_echo.cc
new file mode 100644
index 00000000..7a7323f1
--- /dev/null
+++ b/src/test/crimson/test_async_echo.cc
@@ -0,0 +1,238 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+
+#include <boost/program_options/variables_map.hpp>
+#include <boost/program_options/parsers.hpp>
+
+#include "auth/Auth.h"
+#include "global/global_init.h"
+#include "messages/MPing.h"
+#include "msg/Dispatcher.h"
+#include "msg/Messenger.h"
+
+#include "auth/DummyAuth.h"
+
+enum class echo_role {
+ as_server,
+ as_client,
+};
+
+namespace native_pingpong {
+
+constexpr int CEPH_OSD_PROTOCOL = 10;
+
+struct Server {
+ Server(CephContext* cct, const entity_inst_t& entity)
+ : dummy_auth(cct), dispatcher(cct)
+ {
+ msgr.reset(Messenger::create(cct, "async",
+ entity.name, "pong", entity.addr.get_nonce(), 0));
+ dummy_auth.auth_registry.refresh_config();
+ msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
+ msgr->set_default_policy(Messenger::Policy::stateless_server(0));
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+ dispatcher.ms_set_require_authorizer(false);
+ }
+ DummyAuthClientServer dummy_auth;
+ unique_ptr<Messenger> msgr;
+ struct ServerDispatcher : Dispatcher {
+ std::mutex mutex;
+ std::condition_variable on_reply;
+ bool replied = false;
+ ServerDispatcher(CephContext* cct)
+ : Dispatcher(cct)
+ {}
+ bool ms_can_fast_dispatch_any() const override {
+ return true;
+ }
+ bool ms_can_fast_dispatch(const Message* m) const override {
+ return m->get_type() == CEPH_MSG_PING;
+ }
+ void ms_fast_dispatch(Message* m) override {
+ m->get_connection()->send_message(new MPing);
+ m->put();
+ {
+ std::lock_guard lock{mutex};
+ replied = true;
+ }
+ on_reply.notify_one();
+ }
+ bool ms_dispatch(Message*) override {
+ ceph_abort();
+ }
+ bool ms_handle_reset(Connection*) override {
+ return true;
+ }
+ void ms_handle_remote_reset(Connection*) override {
+ }
+ bool ms_handle_refused(Connection*) override {
+ return true;
+ }
+ void echo() {
+ replied = false;
+ std::unique_lock lock{mutex};
+ return on_reply.wait(lock, [this] { return replied; });
+ }
+ } dispatcher;
+ void echo() {
+ dispatcher.echo();
+ }
+};
+
+struct Client {
+ unique_ptr<Messenger> msgr;
+ Client(CephContext *cct)
+ : dummy_auth(cct), dispatcher(cct)
+ {
+ msgr.reset(Messenger::create(cct, "async",
+ entity_name_t::CLIENT(-1), "ping",
+ getpid(), 0));
+ dummy_auth.auth_registry.refresh_config();
+ msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
+ msgr->set_default_policy(Messenger::Policy::lossy_client(0));
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+ dispatcher.ms_set_require_authorizer(false);
+ }
+ DummyAuthClientServer dummy_auth;
+ struct ClientDispatcher : Dispatcher {
+ std::mutex mutex;
+ std::condition_variable on_reply;
+ bool replied = false;
+
+ ClientDispatcher(CephContext* cct)
+ : Dispatcher(cct)
+ {}
+ bool ms_can_fast_dispatch_any() const override {
+ return true;
+ }
+ bool ms_can_fast_dispatch(const Message* m) const override {
+ return m->get_type() == CEPH_MSG_PING;
+ }
+ void ms_fast_dispatch(Message* m) override {
+ m->put();
+ {
+ std::lock_guard lock{mutex};
+ replied = true;
+ }
+ on_reply.notify_one();
+ }
+ bool ms_dispatch(Message*) override {
+ ceph_abort();
+ }
+ bool ms_handle_reset(Connection *) override {
+ return true;
+ }
+ void ms_handle_remote_reset(Connection*) override {
+ }
+ bool ms_handle_refused(Connection*) override {
+ return true;
+ }
+ bool ping(Messenger* msgr, const entity_inst_t& peer) {
+ auto conn = msgr->connect_to(peer.name.type(),
+ entity_addrvec_t{peer.addr});
+ replied = false;
+ conn->send_message(new MPing);
+ std::unique_lock lock{mutex};
+ return on_reply.wait_for(lock, 500ms, [&] {
+ return replied;
+ });
+ }
+ } dispatcher;
+ void ping(const entity_inst_t& peer) {
+ dispatcher.ping(msgr.get(), peer);
+ }
+};
+} // namespace native_pingpong
+
+static void ceph_echo(CephContext* cct,
+ entity_addr_t addr, echo_role role, unsigned count)
+{
+ std::cout << "ceph/";
+ entity_inst_t entity{entity_name_t::OSD(0), addr};
+ if (role == echo_role::as_server) {
+ std::cout << "server listening at " << addr << std::endl;
+ native_pingpong::Server server{cct, entity};
+ server.msgr->bind(addr);
+ server.msgr->add_dispatcher_head(&server.dispatcher);
+ server.msgr->start();
+ for (unsigned i = 0; i < count; i++) {
+ server.echo();
+ }
+ server.msgr->shutdown();
+ server.msgr->wait();
+ } else {
+ std::cout << "client sending to " << addr << std::endl;
+ native_pingpong::Client client{cct};
+ client.msgr->add_dispatcher_head(&client.dispatcher);
+ client.msgr->start();
+ auto conn = client.msgr->connect_to(entity.name.type(),
+ entity_addrvec_t{entity.addr});
+ for (unsigned i = 0; i < count; i++) {
+ std::cout << "seq=" << i << std::endl;
+ client.ping(entity);
+ }
+ client.msgr->shutdown();
+ client.msgr->wait();
+ }
+}
+
+int main(int argc, char** argv)
+{
+ namespace po = boost::program_options;
+ po::options_description desc{"Allowed options"};
+ desc.add_options()
+ ("help,h", "show help message")
+ ("role", po::value<std::string>()->default_value("pong"),
+ "role to play (ping | pong)")
+ ("port", po::value<uint16_t>()->default_value(9010),
+ "port #")
+ ("nonce", po::value<uint32_t>()->default_value(42),
+ "a unique number to identify the pong server")
+ ("count", po::value<unsigned>()->default_value(10),
+ "stop after sending/echoing <count> MPing messages")
+ ("v2", po::value<bool>()->default_value(false),
+ "using msgr v2 protocol");
+ po::variables_map vm;
+ std::vector<std::string> unrecognized_options;
+ try {
+ auto parsed = po::command_line_parser(argc, argv)
+ .options(desc)
+ .allow_unregistered()
+ .run();
+ po::store(parsed, vm);
+ if (vm.count("help")) {
+ std::cout << desc << std::endl;
+ return 0;
+ }
+ po::notify(vm);
+ unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
+ } catch(const po::error& e) {
+ std::cerr << "error: " << e.what() << std::endl;
+ return 1;
+ }
+
+ entity_addr_t addr;
+ if (vm["v2"].as<bool>()) {
+ addr.set_type(entity_addr_t::TYPE_MSGR2);
+ } else {
+ addr.set_type(entity_addr_t::TYPE_LEGACY);
+ }
+ addr.set_family(AF_INET);
+ addr.set_port(vm["port"].as<std::uint16_t>());
+ addr.set_nonce(vm["nonce"].as<std::uint32_t>());
+
+ echo_role role = echo_role::as_server;
+ if (vm["role"].as<std::string>() == "ping") {
+ role = echo_role::as_client;
+ }
+
+ auto count = vm["count"].as<unsigned>();
+ std::vector<const char*> args(argv, argv + argc);
+ auto cct = global_init(nullptr, args,
+ CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_MON_CONFIG);
+ common_init_finish(cct.get());
+ ceph_echo(cct.get(), addr, role, count);
+}
diff --git a/src/test/crimson/test_buffer.cc b/src/test/crimson/test_buffer.cc
new file mode 100644
index 00000000..1093b688
--- /dev/null
+++ b/src/test/crimson/test_buffer.cc
@@ -0,0 +1,49 @@
+#include "include/buffer.h"
+#include <seastar/core/app-template.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/reactor.hh>
+
+// allocate a foreign buffer on each cpu, collect them all into a bufferlist,
+// and destruct it on this cpu
+seastar::future<> test_foreign_bufferlist()
+{
+ auto make_foreign_buffer = [] (unsigned cpu) {
+ return seastar::smp::submit_to(cpu, [=] {
+ bufferlist bl;
+ seastar::temporary_buffer<char> buf("abcd", 4);
+ bl.append(buffer::create(std::move(buf)));
+ return bl;
+ });
+ };
+ auto reduce = [] (bufferlist&& lhs, bufferlist&& rhs) {
+ bufferlist bl;
+ bl.claim_append(lhs);
+ bl.claim_append(rhs);
+ return bl;
+ };
+ return seastar::map_reduce(seastar::smp::all_cpus(), make_foreign_buffer,
+ bufferlist(), reduce).then(
+ [] (bufferlist&& bl) {
+ if (bl.length() != 4 * seastar::smp::count) {
+ auto e = std::make_exception_ptr(std::runtime_error("wrong buffer size"));
+ return seastar::make_exception_future<>(e);
+ }
+ bl.clear();
+ return seastar::make_ready_future<>();
+ });
+}
+
+int main(int argc, char** argv)
+{
+ seastar::app_template app;
+ return app.run(argc, argv, [] {
+ return seastar::now().then(
+ &test_foreign_bufferlist
+ ).then([] {
+ std::cout << "All tests succeeded" << std::endl;
+ }).handle_exception([] (auto eptr) {
+ std::cout << "Test failure" << std::endl;
+ return seastar::make_exception_future<>(eptr);
+ });
+ });
+}
diff --git a/src/test/crimson/test_config.cc b/src/test/crimson/test_config.cc
new file mode 100644
index 00000000..35b258f6
--- /dev/null
+++ b/src/test/crimson/test_config.cc
@@ -0,0 +1,110 @@
+#include <chrono>
+#include <string>
+#include <numeric>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/sharded.hh>
+#include "common/ceph_argparse.h"
+#include "common/config_obs.h"
+#include "crimson/common/config_proxy.h"
+
+using Config = ceph::common::ConfigProxy;
+const std::string test_uint_option = "osd_max_pgls";
+const uint64_t INVALID_VALUE = (uint64_t)(-1);
+
+class ConfigObs : public ceph::md_config_obs_impl<Config> {
+ uint64_t last_change = INVALID_VALUE;
+ uint64_t num_changes = 0;
+
+ const char** get_tracked_conf_keys() const override {
+ static const char* keys[] = {
+ test_uint_option.c_str(),
+ nullptr,
+ };
+ return keys;
+ }
+ void handle_conf_change(const Config& conf,
+ const std::set <std::string> &changes) override{
+ if (changes.count(test_uint_option)) {
+ last_change = conf.get_val<uint64_t>(test_uint_option);
+ num_changes += 1;
+ }
+ }
+public:
+ ConfigObs() {
+ ceph::common::local_conf().add_observer(this);
+ }
+
+ uint64_t get_last_change() const { return last_change; }
+ uint64_t get_num_changes() const { return num_changes; }
+ seastar::future<> stop() {
+ ceph::common::local_conf().remove_observer(this);
+ return seastar::now();
+ }
+};
+
+seastar::sharded<ConfigObs> sharded_cobs;
+
+static seastar::future<> test_config()
+{
+ return ceph::common::sharded_conf().start(EntityName{}, string_view{"ceph"}).then([] {
+ std::vector<const char*> args;
+ std::string cluster;
+ std::string conf_file_list;
+ auto init_params = ceph_argparse_early_args(args,
+ CEPH_ENTITY_TYPE_CLIENT,
+ &cluster,
+ &conf_file_list);
+ auto& conf = ceph::common::local_conf();
+ conf->name = init_params.name;
+ conf->cluster = cluster;
+ return conf.parse_config_files(conf_file_list);
+ }).then([] {
+ return ceph::common::sharded_conf().invoke_on(0, &Config::start);
+ }).then([] {
+ return sharded_cobs.start();
+ }).then([] {
+ return ceph::common::sharded_conf().invoke_on_all([](Config& config) {
+ return config.set_val(test_uint_option,
+ std::to_string(seastar::engine().cpu_id()));
+ });
+ }).then([] {
+ auto expected = ceph::common::local_conf().get_val<uint64_t>(test_uint_option);
+ return ceph::common::sharded_conf().invoke_on_all([expected](Config& config) {
+ if (expected != config.get_val<uint64_t>(test_uint_option)) {
+ throw std::runtime_error("configurations don't match");
+ }
+ if (expected != sharded_cobs.local().get_last_change()) {
+ throw std::runtime_error("last applied changes don't match the latest config");
+ }
+ if (seastar::smp::count != sharded_cobs.local().get_num_changes()) {
+ throw std::runtime_error("num changes don't match actual changes");
+ }
+ });
+ }).finally([] {
+ return sharded_cobs.stop();
+ }).finally([] {
+ return ceph::common::sharded_conf().stop();
+ });
+}
+
+int main(int argc, char** argv)
+{
+ seastar::app_template app;
+ return app.run(argc, argv, [&] {
+ return test_config().then([] {
+ std::cout << "All tests succeeded" << std::endl;
+ }).handle_exception([] (auto eptr) {
+ std::cout << "Test failure" << std::endl;
+ return seastar::make_exception_future<>(eptr);
+ });
+ });
+}
+
+
+/*
+ * Local Variables:
+ * compile-command: "make -j4 \
+ * -C ../../../build \
+ * unittest_seastar_config"
+ * End:
+ */
diff --git a/src/test/crimson/test_denc.cc b/src/test/crimson/test_denc.cc
new file mode 100644
index 00000000..10ebd6dc
--- /dev/null
+++ b/src/test/crimson/test_denc.cc
@@ -0,0 +1,53 @@
+#include <string>
+#include <seastar/core/temporary_buffer.hh>
+#include <gtest/gtest.h>
+#include "include/denc.h"
+#include "common/buffer_seastar.h"
+
+using temporary_buffer = seastar::temporary_buffer<char>;
+using buffer_iterator = seastar_buffer_iterator;
+using const_buffer_iterator = const_seastar_buffer_iterator;
+
+template<typename T>
+void test_denc(T v) {
+ // estimate
+ size_t s = 0;
+ denc(v, s);
+ ASSERT_NE(s, 0u);
+
+ // encode
+ temporary_buffer buf{s};
+ buffer_iterator enc{buf};
+ denc(v, enc);
+ size_t len = enc.get() - buf.begin();
+ ASSERT_LE(len, s);
+
+ // decode
+ T out;
+ temporary_buffer encoded = buf.share();
+ encoded.trim(len);
+ const_buffer_iterator dec{encoded};
+ denc(out, dec);
+ ASSERT_EQ(v, out);
+ ASSERT_EQ(dec.get(), enc.get());
+}
+
+TEST(denc, simple)
+{
+ test_denc((uint8_t)4);
+ test_denc((int8_t)-5);
+ test_denc((uint16_t)6);
+ test_denc((int16_t)-7);
+ test_denc((uint32_t)8);
+ test_denc((int32_t)-9);
+ test_denc((uint64_t)10);
+ test_denc((int64_t)-11);
+}
+
+TEST(denc, string)
+{
+ std::string a, b("hi"), c("multi\nline\n");
+ test_denc(a);
+ test_denc(b);
+ test_denc(c);
+}
diff --git a/src/test/crimson/test_lru.cc b/src/test/crimson/test_lru.cc
new file mode 100644
index 00000000..40ab4153
--- /dev/null
+++ b/src/test/crimson/test_lru.cc
@@ -0,0 +1,213 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
+ *
+ * Author: Loic Dachary <loic@dachary.org>
+ * Cheng Cheng <ccheng.leo@gmail.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Library Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Library Public License for more details.
+ *
+ */
+
+#include <stdio.h>
+#include "gtest/gtest.h"
+#include "crimson/common/shared_lru.h"
+
+class LRUTest : public SharedLRU<unsigned int, int> {
+public:
+ auto add(unsigned int key, int value, bool* existed = nullptr) {
+ auto pv = new int{value};
+ auto ptr = insert(key, std::unique_ptr<int>{pv});
+ if (existed) {
+ *existed = (ptr.get() != pv);
+ }
+ return ptr;
+ }
+};
+
+TEST(LRU, add) {
+ LRUTest cache;
+ unsigned int key = 1;
+ int value1 = 2;
+ bool existed = false;
+ {
+ auto ptr = cache.add(key, value1, &existed);
+ ASSERT_TRUE(ptr);
+ ASSERT_TRUE(ptr.get());
+ ASSERT_EQ(value1, *ptr);
+ ASSERT_FALSE(existed);
+ }
+ {
+ auto ptr = cache.add(key, 3, &existed);
+ ASSERT_EQ(value1, *ptr);
+ ASSERT_TRUE(existed);
+ }
+}
+
+TEST(LRU, empty) {
+ LRUTest cache;
+ unsigned int key = 1;
+ bool existed = false;
+
+ ASSERT_TRUE(cache.empty());
+ {
+ int value1 = 2;
+ auto ptr = cache.add(key, value1, &existed);
+ ASSERT_EQ(value1, *ptr);
+ ASSERT_FALSE(existed);
+ }
+ ASSERT_FALSE(cache.empty());
+
+ cache.clear();
+ ASSERT_TRUE(cache.empty());
+}
+
+TEST(LRU, lookup) {
+ LRUTest cache;
+ unsigned int key = 1;
+ {
+ int value = 2;
+ auto ptr = cache.add(key, value);
+ ASSERT_TRUE(ptr);
+ ASSERT_TRUE(ptr.get());
+ ASSERT_TRUE(cache.find(key).get());
+ ASSERT_EQ(value, *cache.find(key));
+ }
+ ASSERT_TRUE(cache.find(key).get());
+}
+
+TEST(LRU, lookup_or_create) {
+ LRUTest cache;
+ {
+ int value = 2;
+ unsigned int key = 1;
+ ASSERT_TRUE(cache.add(key, value).get());
+ ASSERT_TRUE(cache[key].get());
+ ASSERT_EQ(value, *cache.find(key));
+ }
+ {
+ unsigned int key = 2;
+ ASSERT_TRUE(cache[key].get());
+ ASSERT_EQ(0, *cache.find(key));
+ }
+ ASSERT_TRUE(cache.find(1).get());
+ ASSERT_TRUE(cache.find(2).get());
+}
+
+TEST(LRU, lower_bound) {
+ LRUTest cache;
+
+ {
+ unsigned int key = 1;
+ ASSERT_FALSE(cache.lower_bound(key));
+ int value = 2;
+
+ ASSERT_TRUE(cache.add(key, value).get());
+ ASSERT_TRUE(cache.lower_bound(key).get());
+ EXPECT_EQ(value, *cache.lower_bound(key));
+ }
+}
+
+TEST(LRU, get_next) {
+
+ {
+ LRUTest cache;
+ const unsigned int key = 0;
+ EXPECT_FALSE(cache.upper_bound(key));
+ }
+ {
+ LRUTest cache;
+ const unsigned int key1 = 111;
+ auto ptr1 = cache[key1];
+ const unsigned int key2 = 222;
+ auto ptr2 = cache[key2];
+
+ auto i = cache.upper_bound(0);
+ ASSERT_TRUE(i);
+ EXPECT_EQ(i->first, key1);
+ auto j = cache.upper_bound(i->first);
+ ASSERT_TRUE(j);
+ EXPECT_EQ(j->first, key2);
+ }
+}
+
+TEST(LRU, clear) {
+ LRUTest cache;
+ unsigned int key = 1;
+ int value = 2;
+ cache.add(key, value);
+ {
+ auto found = cache.find(key);
+ ASSERT_TRUE(found);
+ ASSERT_EQ(value, *found);
+ }
+ ASSERT_TRUE(cache.find(key).get());
+ cache.clear();
+ ASSERT_FALSE(cache.find(key));
+ ASSERT_TRUE(cache.empty());
+}
+
+TEST(LRU, eviction) {
+ LRUTest cache{5};
+ bool existed;
+ // add a bunch of elements, some of them will be evicted
+ for (size_t i = 0; i < 2 * cache.capacity(); ++i) {
+ cache.add(i, i, &existed);
+ ASSERT_FALSE(existed);
+ }
+ size_t i = 0;
+ for (; i < cache.capacity(); ++i) {
+ ASSERT_FALSE(cache.find(i));
+ }
+ for (; i < 2 * cache.capacity(); ++i) {
+ ASSERT_TRUE(cache.find(i));
+ }
+}
+
+TEST(LRU, track_weak) {
+ constexpr int SIZE = 5;
+ LRUTest cache{SIZE};
+
+ bool existed = false;
+ // strong reference to keep 0 alive
+ auto ptr = cache.add(0, 0, &existed);
+ ASSERT_FALSE(existed);
+
+ // add a bunch of elements to get 0 evicted
+ for (size_t i = 1; i < 2 * cache.capacity(); ++i) {
+ cache.add(i, i, &existed);
+ ASSERT_FALSE(existed);
+ }
+ // 0 is still reachable via the cache
+ ASSERT_TRUE(cache.find(0));
+ ASSERT_TRUE(cache.find(0).get());
+ ASSERT_EQ(0, *cache.find(0));
+
+ // [0..SIZE) are evicted when adding [SIZE..2*SIZE)
+ // [SIZE..SIZE * 2) were still in the cache before accessing 0,
+ // but SIZE got evicted when accessing 0
+ ASSERT_FALSE(cache.find(SIZE-1));
+ ASSERT_FALSE(cache.find(SIZE));
+ ASSERT_TRUE(cache.find(SIZE+1));
+ ASSERT_TRUE(cache.find(SIZE+1).get());
+ ASSERT_EQ((int)SIZE+1, *cache.find(SIZE+1));
+
+ ptr.reset();
+ // 0 is still reachable, as it is now put back into LRU cache
+ ASSERT_TRUE(cache.find(0));
+}
+
+// Local Variables:
+// compile-command: "cmake --build ../../../build -j 8 --target unittest_seastar_lru && ctest -R unittest_seastar_lru # --gtest_filter=*.* --log-to-stderr=true"
+// End:
diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc
new file mode 100644
index 00000000..6ec23f6d
--- /dev/null
+++ b/src/test/crimson/test_messenger.cc
@@ -0,0 +1,420 @@
+#include "common/ceph_time.h"
+#include "messages/MPing.h"
+#include "crimson/common/log.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Dispatcher.h"
+#include "crimson/net/Messenger.h"
+
+#include <map>
+#include <random>
+#include <boost/program_options.hpp>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/do_with.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sleep.hh>
+
+namespace bpo = boost::program_options;
+
+namespace {
+
+seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_ms);
+}
+
+static std::random_device rd;
+static std::default_random_engine rng{rd()};
+static bool verbose = false;
+
+static seastar::future<> test_echo(unsigned rounds,
+ double keepalive_ratio)
+{
+ struct test_state {
+ struct Server final
+ : public ceph::net::Dispatcher,
+ public seastar::peering_sharded_service<Server> {
+ ceph::net::Messenger *msgr = nullptr;
+ MessageRef msg_pong{new MPing(), false};
+
+ Dispatcher* get_local_shard() override {
+ return &(container().local());
+ }
+ seastar::future<> stop() {
+ return seastar::make_ready_future<>();
+ }
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ MessageRef m) override {
+ if (verbose) {
+ logger().info("server got {}", *m);
+ }
+ // reply with a pong
+ return c->send(msg_pong);
+ }
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce,
+ const entity_addr_t& addr) {
+ auto&& fut = ceph::net::Messenger::create(name, lname, nonce);
+ return fut.then([this, addr](ceph::net::Messenger *messenger) {
+ return container().invoke_on_all([messenger](auto& server) {
+ server.msgr = messenger->get_local_shard();
+ }).then([messenger, addr] {
+ return messenger->bind(entity_addrvec_t{addr});
+ }).then([this, messenger] {
+ return messenger->start(this);
+ });
+ });
+ }
+ seastar::future<> shutdown() {
+ ceph_assert(msgr);
+ return msgr->shutdown();
+ }
+ };
+
+ struct Client final
+ : public ceph::net::Dispatcher,
+ public seastar::peering_sharded_service<Client> {
+
+ struct PingSession : public seastar::enable_shared_from_this<PingSession> {
+ unsigned count = 0u;
+ mono_time connected_time;
+ mono_time finish_time;
+ };
+ using PingSessionRef = seastar::shared_ptr<PingSession>;
+
+ unsigned rounds;
+ std::bernoulli_distribution keepalive_dist;
+ ceph::net::Messenger *msgr = nullptr;
+ std::map<ceph::net::Connection*, seastar::promise<>> pending_conns;
+ std::map<ceph::net::ConnectionRef, PingSessionRef> sessions;
+ MessageRef msg_ping{new MPing(), false};
+
+ Client(unsigned rounds, double keepalive_ratio)
+ : rounds(rounds),
+ keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {}
+
+ PingSessionRef find_session(ceph::net::ConnectionRef c) {
+ auto found = sessions.find(c);
+ if (found == sessions.end()) {
+ ceph_assert(false);
+ }
+ return found->second;
+ }
+
+ Dispatcher* get_local_shard() override {
+ return &(container().local());
+ }
+ seastar::future<> stop() {
+ return seastar::now();
+ }
+ seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
+ logger().info("{}: connected to {}", *conn, conn->get_peer_addr());
+ auto session = seastar::make_shared<PingSession>();
+ auto [i, added] = sessions.emplace(conn, session);
+ std::ignore = i;
+ ceph_assert(added);
+ session->connected_time = mono_clock::now();
+ return seastar::now();
+ }
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ MessageRef m) override {
+ auto session = find_session(c);
+ ++(session->count);
+ if (verbose) {
+ logger().info("client ms_dispatch {}", session->count);
+ }
+
+ if (session->count == rounds) {
+ logger().info("{}: finished receiving {} pongs", *c.get(), session->count);
+ session->finish_time = mono_clock::now();
+ return container().invoke_on_all([conn = c.get()](auto &client) {
+ auto found = client.pending_conns.find(conn);
+ ceph_assert(found != client.pending_conns.end());
+ found->second.set_value();
+ });
+ } else {
+ return seastar::now();
+ }
+ }
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce) {
+ return ceph::net::Messenger::create(name, lname, nonce)
+ .then([this](ceph::net::Messenger *messenger) {
+ return container().invoke_on_all([messenger](auto& client) {
+ client.msgr = messenger->get_local_shard();
+ }).then([this, messenger] {
+ return messenger->start(this);
+ });
+ });
+ }
+
+ seastar::future<> shutdown() {
+ ceph_assert(msgr);
+ return msgr->shutdown();
+ }
+
+ seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr, bool foreign_dispatch=true) {
+ mono_time start_time = mono_clock::now();
+ return msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
+ .then([this, foreign_dispatch, start_time](auto conn) {
+ return seastar::futurize_apply([this, conn, foreign_dispatch] {
+ if (foreign_dispatch) {
+ return do_dispatch_pingpong(&**conn);
+ } else {
+ // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong().
+ return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) {
+ return client.do_dispatch_pingpong(conn);
+ });
+ }
+ }).finally([this, conn, start_time] {
+ return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) {
+ auto session = client.find_session((*conn)->shared_from_this());
+ std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
+ std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
+ logger().info("{}: handshake {}, pingpong {}",
+ **conn, dur_handshake.count(), dur_pingpong.count());
+ });
+ });
+ });
+ }
+
+ private:
+ seastar::future<> do_dispatch_pingpong(ceph::net::Connection* conn) {
+ return container().invoke_on_all([conn](auto& client) {
+ auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>());
+ std::ignore = i;
+ ceph_assert(added);
+ }).then([this, conn] {
+ return seastar::do_with(0u, 0u,
+ [this, conn](auto &count_ping, auto &count_keepalive) {
+ return seastar::do_until(
+ [this, conn, &count_ping, &count_keepalive] {
+ bool stop = (count_ping == rounds);
+ if (stop) {
+ logger().info("{}: finished sending {} pings with {} keepalives",
+ *conn, count_ping, count_keepalive);
+ }
+ return stop;
+ },
+ [this, conn, &count_ping, &count_keepalive] {
+ return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
+ if (keepalive_dist(rng)) {
+ return conn->keepalive()
+ .then([&count_keepalive] {
+ count_keepalive += 1;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ });
+ } else {
+ return conn->send(msg_ping)
+ .then([&count_ping] {
+ count_ping += 1;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ });
+ }
+ });
+ }).then([this, conn] {
+ auto found = pending_conns.find(conn);
+ return found->second.get_future();
+ });
+ });
+ });
+ }
+ };
+ };
+
+ logger().info("test_echo():");
+ return seastar::when_all_succeed(
+ ceph::net::create_sharded<test_state::Server>(),
+ ceph::net::create_sharded<test_state::Server>(),
+ ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio),
+ ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio))
+ .then([rounds, keepalive_ratio](test_state::Server *server1,
+ test_state::Server *server2,
+ test_state::Client *client1,
+ test_state::Client *client2) {
+ // start servers and clients
+ entity_addr_t addr1;
+ addr1.parse("127.0.0.1:9010", nullptr);
+ addr1.set_type(entity_addr_t::TYPE_LEGACY);
+ entity_addr_t addr2;
+ addr2.parse("127.0.0.1:9011", nullptr);
+ addr2.set_type(entity_addr_t::TYPE_LEGACY);
+ return seastar::when_all_succeed(
+ server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
+ server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
+ client1->init(entity_name_t::OSD(2), "client1", 3),
+ client2->init(entity_name_t::OSD(3), "client2", 4))
+ // dispatch pingpoing
+ .then([client1, client2, server1, server2] {
+ return seastar::when_all_succeed(
+ // test connecting in parallel, accepting in parallel,
+ // and operating the connection reference from a foreign/local core
+ client1->dispatch_pingpong(server1->msgr->get_myaddr(), true),
+ client1->dispatch_pingpong(server2->msgr->get_myaddr(), false),
+ client2->dispatch_pingpong(server1->msgr->get_myaddr(), false),
+ client2->dispatch_pingpong(server2->msgr->get_myaddr(), true));
+ // shutdown
+ }).finally([client1] {
+ logger().info("client1 shutdown...");
+ return client1->shutdown();
+ }).finally([client2] {
+ logger().info("client2 shutdown...");
+ return client2->shutdown();
+ }).finally([server1] {
+ logger().info("server1 shutdown...");
+ return server1->shutdown();
+ }).finally([server2] {
+ logger().info("server2 shutdown...");
+ return server2->shutdown();
+ });
+ });
+}
+
+static seastar::future<> test_concurrent_dispatch()
+{
+ struct test_state {
+ struct Server final
+ : public ceph::net::Dispatcher,
+ public seastar::peering_sharded_service<Server> {
+ ceph::net::Messenger *msgr = nullptr;
+ int count = 0;
+ seastar::promise<> on_second; // satisfied on second dispatch
+ seastar::promise<> on_done; // satisfied when first dispatch unblocks
+
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ MessageRef m) override {
+ switch (++count) {
+ case 1:
+ // block on the first request until we reenter with the second
+ return on_second.get_future()
+ .then([this] {
+ return container().invoke_on_all([](Server& server) {
+ server.on_done.set_value();
+ });
+ });
+ case 2:
+ on_second.set_value();
+ return seastar::now();
+ default:
+ throw std::runtime_error("unexpected count");
+ }
+ }
+
+ seastar::future<> wait() { return on_done.get_future(); }
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce,
+ const entity_addr_t& addr) {
+ return ceph::net::Messenger::create(name, lname, nonce)
+ .then([this, addr](ceph::net::Messenger *messenger) {
+ return container().invoke_on_all([messenger](auto& server) {
+ server.msgr = messenger->get_local_shard();
+ }).then([messenger, addr] {
+ return messenger->bind(entity_addrvec_t{addr});
+ }).then([this, messenger] {
+ return messenger->start(this);
+ });
+ });
+ }
+
+ Dispatcher* get_local_shard() override {
+ return &(container().local());
+ }
+ seastar::future<> stop() {
+ return seastar::make_ready_future<>();
+ }
+ };
+
+ struct Client final
+ : public ceph::net::Dispatcher,
+ public seastar::peering_sharded_service<Client> {
+ ceph::net::Messenger *msgr = nullptr;
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce) {
+ return ceph::net::Messenger::create(name, lname, nonce)
+ .then([this](ceph::net::Messenger *messenger) {
+ return container().invoke_on_all([messenger](auto& client) {
+ client.msgr = messenger->get_local_shard();
+ }).then([this, messenger] {
+ return messenger->start(this);
+ });
+ });
+ }
+
+ Dispatcher* get_local_shard() override {
+ return &(container().local());
+ }
+ seastar::future<> stop() {
+ return seastar::make_ready_future<>();
+ }
+ };
+ };
+
+ logger().info("test_concurrent_dispatch():");
+ return seastar::when_all_succeed(
+ ceph::net::create_sharded<test_state::Server>(),
+ ceph::net::create_sharded<test_state::Client>())
+ .then([](test_state::Server *server,
+ test_state::Client *client) {
+ entity_addr_t addr;
+ addr.parse("127.0.0.1:9010", nullptr);
+ addr.set_type(entity_addr_t::TYPE_LEGACY);
+ addr.set_family(AF_INET);
+ return seastar::when_all_succeed(
+ server->init(entity_name_t::OSD(4), "server3", 5, addr),
+ client->init(entity_name_t::OSD(5), "client3", 6))
+ .then([server, client] {
+ return client->msgr->connect(server->msgr->get_myaddr(),
+ entity_name_t::TYPE_OSD);
+ }).then([](ceph::net::ConnectionXRef conn) {
+ // send two messages
+ (*conn)->send(MessageRef{new MPing, false});
+ (*conn)->send(MessageRef{new MPing, false});
+ }).then([server] {
+ server->wait();
+ }).finally([client] {
+ logger().info("client shutdown...");
+ return client->msgr->shutdown();
+ }).finally([server] {
+ logger().info("server shutdown...");
+ return server->msgr->shutdown();
+ });
+ });
+}
+
+}
+
+int main(int argc, char** argv)
+{
+ seastar::app_template app;
+ app.add_options()
+ ("verbose,v", bpo::value<bool>()->default_value(false),
+ "chatty if true")
+ ("rounds", bpo::value<unsigned>()->default_value(512),
+ "number of pingpong rounds")
+ ("keepalive-ratio", bpo::value<double>()->default_value(0.1),
+ "ratio of keepalive in ping messages");
+ return app.run(argc, argv, [&app] {
+ auto&& config = app.configuration();
+ verbose = config["verbose"].as<bool>();
+ auto rounds = config["rounds"].as<unsigned>();
+ auto keepalive_ratio = config["keepalive-ratio"].as<double>();
+ return test_echo(rounds, keepalive_ratio)
+ .then([] {
+ return test_concurrent_dispatch();
+ }).then([] {
+ std::cout << "All tests succeeded" << std::endl;
+ }).handle_exception([] (auto eptr) {
+ std::cout << "Test failure" << std::endl;
+ return seastar::make_exception_future<>(eptr);
+ });
+ });
+}
diff --git a/src/test/crimson/test_monc.cc b/src/test/crimson/test_monc.cc
new file mode 100644
index 00000000..671aa644
--- /dev/null
+++ b/src/test/crimson/test_monc.cc
@@ -0,0 +1,78 @@
+#include <seastar/core/app-template.hh>
+#include "common/ceph_argparse.h"
+#include "crimson/common/config_proxy.h"
+#include "crimson/mon/MonClient.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Messenger.h"
+
+using Config = ceph::common::ConfigProxy;
+using MonClient = ceph::mon::Client;
+
+static seastar::future<> test_monc()
+{
+ return ceph::common::sharded_conf().start(EntityName{}, string_view{"ceph"}).then([] {
+ std::vector<const char*> args;
+ std::string cluster;
+ std::string conf_file_list;
+ auto init_params = ceph_argparse_early_args(args,
+ CEPH_ENTITY_TYPE_CLIENT,
+ &cluster,
+ &conf_file_list);
+ auto& conf = ceph::common::local_conf();
+ conf->name = init_params.name;
+ conf->cluster = cluster;
+ return conf.parse_config_files(conf_file_list);
+ }).then([] {
+ return ceph::common::sharded_perf_coll().start();
+ }).then([] {
+ return ceph::net::Messenger::create(entity_name_t::OSD(0), "monc", 0,
+ seastar::engine().cpu_id())
+ .then([] (ceph::net::Messenger *msgr) {
+ auto& conf = ceph::common::local_conf();
+ if (conf->ms_crc_data) {
+ msgr->set_crc_data();
+ }
+ if (conf->ms_crc_header) {
+ msgr->set_crc_header();
+ }
+ return seastar::do_with(MonClient{*msgr},
+ [msgr](auto& monc) {
+ return msgr->start(&monc).then([&monc] {
+ return seastar::with_timeout(
+ seastar::lowres_clock::now() + std::chrono::seconds{10},
+ monc.start());
+ }).then([&monc] {
+ return monc.stop();
+ });
+ }).finally([msgr] {
+ return msgr->shutdown();
+ });
+ });
+ }).finally([] {
+ return ceph::common::sharded_perf_coll().stop().then([] {
+ return ceph::common::sharded_conf().stop();
+ });
+ });
+}
+
+int main(int argc, char** argv)
+{
+ seastar::app_template app;
+ return app.run(argc, argv, [&] {
+ return test_monc().then([] {
+ std::cout << "All tests succeeded" << std::endl;
+ }).handle_exception([] (auto eptr) {
+ std::cout << "Test failure" << std::endl;
+ return seastar::make_exception_future<>(eptr);
+ });
+ });
+}
+
+
+/*
+ * Local Variables:
+ * compile-command: "make -j4 \
+ * -C ../../../build \
+ * unittest_seastar_monc"
+ * End:
+ */
diff --git a/src/test/crimson/test_perfcounters.cc b/src/test/crimson/test_perfcounters.cc
new file mode 100644
index 00000000..d5c8bb8b
--- /dev/null
+++ b/src/test/crimson/test_perfcounters.cc
@@ -0,0 +1,62 @@
+#include <pthread.h>
+#include <stdlib.h>
+#include <iostream>
+#include <fmt/format.h>
+
+#include "common/Formatter.h"
+#include "common/perf_counters.h"
+#include "crimson/common/perf_counters_collection.h"
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/sharded.hh>
+
+enum {
+ PERFTEST_FIRST = 1000000,
+ PERFTEST_INDEX,
+ PERFTEST_LAST,
+};
+
+static constexpr uint64_t PERF_VAL = 42;
+
+static seastar::future<> test_perfcounters(){
+ return ceph::common::sharded_perf_coll().start().then([] {
+ return ceph::common::sharded_perf_coll().invoke_on_all([] (auto& s){
+ std::string name =fmt::format("seastar-osd::shard-{}",seastar::engine().cpu_id());
+ PerfCountersBuilder plb(NULL, name, PERFTEST_FIRST,PERFTEST_LAST);
+ plb.add_u64_counter(PERFTEST_INDEX, "perftest_count", "count perftest");
+ auto perf_logger = plb.create_perf_counters();
+ perf_logger->inc(PERFTEST_INDEX,PERF_VAL);
+ s.get_perf_collection()->add(perf_logger);
+ });
+ }).then([]{
+ return ceph::common::sharded_perf_coll().invoke_on_all([] (auto& s){
+ auto pcc = s.get_perf_collection();
+ pcc->with_counters([](auto& by_path){
+ for (auto& perf_counter : by_path) {
+ if (PERF_VAL != perf_counter.second.perf_counters->get(PERFTEST_INDEX)) {
+ throw std::runtime_error("perf counter does not match");
+ }
+ }
+ });
+ });
+ }).finally([] {
+ return ceph::common::sharded_perf_coll().stop();
+ });
+
+}
+
+int main(int argc, char** argv)
+{
+ seastar::app_template app;
+ return app.run(argc, argv, [&] {
+ return test_perfcounters().then([] {
+ std::cout << "All tests succeeded" << std::endl;
+ }).handle_exception([] (auto eptr) {
+ std::cout << "Test failure" << std::endl;
+ return seastar::make_exception_future<>(eptr);
+ });
+ });
+
+}
+
+
diff --git a/src/test/crimson/test_thread_pool.cc b/src/test/crimson/test_thread_pool.cc
new file mode 100644
index 00000000..48be1ffe
--- /dev/null
+++ b/src/test/crimson/test_thread_pool.cc
@@ -0,0 +1,49 @@
+#include <chrono>
+#include <numeric>
+#include <seastar/core/app-template.hh>
+#include "crimson/thread/ThreadPool.h"
+
+using namespace std::chrono_literals;
+using ThreadPool = ceph::thread::ThreadPool;
+
+seastar::future<> test_accumulate(ThreadPool& tp) {
+ static constexpr auto N = 5;
+ static constexpr auto M = 1;
+ auto slow_plus = [&tp](int i) {
+ return tp.submit([=] {
+ std::this_thread::sleep_for(10ns);
+ return i + M;
+ });
+ };
+ return seastar::map_reduce(
+ boost::irange(0, N), slow_plus, 0, std::plus{}).then([] (int sum) {
+ auto r = boost::irange(0 + M, N + M);
+ if (sum != std::accumulate(r.begin(), r.end(), 0)) {
+ throw std::runtime_error("test_accumulate failed");
+ }
+ });
+}
+
+int main(int argc, char** argv)
+{
+ ThreadPool tp{2, 128, 0};
+ seastar::app_template app;
+ return app.run(argc, argv, [&tp] {
+ return tp.start().then([&tp] {
+ return test_accumulate(tp);
+ }).handle_exception([](auto e) {
+ std::cerr << "Error: " << e << std::endl;
+ seastar::engine().exit(1);
+ }).finally([&tp] {
+ return tp.stop();
+ });
+ });
+}
+
+/*
+ * Local Variables:
+ * compile-command: "make -j4 \
+ * -C ../../../build \
+ * unittest_seastar_thread_pool"
+ * End:
+ */