diff options
Diffstat (limited to 'src/test/mon/test-mon-msg.cc')
-rw-r--r-- | src/test/mon/test-mon-msg.cc | 339 |
1 files changed, 339 insertions, 0 deletions
diff --git a/src/test/mon/test-mon-msg.cc b/src/test/mon/test-mon-msg.cc new file mode 100644 index 000000000..1ec328d1f --- /dev/null +++ b/src/test/mon/test-mon-msg.cc @@ -0,0 +1,339 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* +* Ceph - scalable distributed file system +* +* Copyright (C) 2014 Red Hat +* +* This is free software; you can redistribute it and/or +* modify it under the terms of the GNU Lesser General Public +* License version 2.1, as published by the Free Software +* Foundation. See file COPYING. +*/ +#include <stdio.h> +#include <string.h> +#include <iostream> +#include <sstream> +#include <time.h> +#include <stdlib.h> +#include <map> + +#include "global/global_init.h" +#include "global/global_context.h" +#include "common/async/context_pool.h" +#include "common/ceph_argparse.h" +#include "common/version.h" +#include "common/dout.h" +#include "common/debug.h" +#include "common/ceph_mutex.h" +#include "common/Timer.h" +#include "common/errno.h" +#include "mon/MonClient.h" +#include "msg/Dispatcher.h" +#include "include/err.h" +#include <boost/scoped_ptr.hpp> + +#include "gtest/gtest.h" + +#include "common/config.h" +#include "include/ceph_assert.h" + +#include "messages/MMonProbe.h" +#include "messages/MRoute.h" +#include "messages/MGenericMessage.h" +#include "messages/MMonJoin.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_ +#undef dout_prefix +#define dout_prefix *_dout << "test-mon-msg " + +using namespace std; + +class MonClientHelper : public Dispatcher +{ +protected: + CephContext *cct; + ceph::async::io_context_pool poolctx; + Messenger *msg; + MonClient monc; + + ceph::mutex lock = ceph::make_mutex("mon-msg-test::lock"); + + set<int> wanted; + +public: + + explicit MonClientHelper(CephContext *cct_) + : Dispatcher(cct_), + cct(cct_), + poolctx(1), + msg(NULL), + monc(cct_, poolctx) + { } + + + int post_init() { + dout(1) << __func__ << dendl; + if (!msg) + return -EINVAL; + msg->add_dispatcher_tail(this); + return 0; + } + + int init_messenger() { + dout(1) << __func__ << dendl; + + std::string public_msgr_type = cct->_conf->ms_public_type.empty() ? cct->_conf.get_val<std::string>("ms_type") : cct->_conf->ms_public_type; + msg = Messenger::create(cct, public_msgr_type, entity_name_t::CLIENT(-1), + "test-mon-msg", 0); + ceph_assert(msg != NULL); + msg->set_default_policy(Messenger::Policy::lossy_client(0)); + dout(0) << __func__ << " starting messenger at " + << msg->get_myaddrs() << dendl; + msg->start(); + return 0; + } + + int init_monc() { + dout(1) << __func__ << dendl; + ceph_assert(msg != NULL); + int err = monc.build_initial_monmap(); + if (err < 0) { + derr << __func__ << " error building monmap: " + << cpp_strerror(err) << dendl; + return err; + } + + monc.set_messenger(msg); + msg->add_dispatcher_head(&monc); + + monc.set_want_keys(CEPH_ENTITY_TYPE_MON); + err = monc.init(); + if (err < 0) { + derr << __func__ << " monc init failed: " + << cpp_strerror(err) << dendl; + goto fail; + } + + err = monc.authenticate(); + if (err < 0) { + derr << __func__ << " monc auth failed: " + << cpp_strerror(err) << dendl; + goto fail_monc; + } + monc.wait_auth_rotating(30.0); + monc.renew_subs(); + dout(0) << __func__ << " finished" << dendl; + return 0; + +fail_monc: + derr << __func__ << " failing monc" << dendl; + monc.shutdown(); +fail: + return err; + } + + void shutdown_messenger() { + dout(0) << __func__ << dendl; + msg->shutdown(); + msg->wait(); + } + + void shutdown_monc() { + dout(0) << __func__ << dendl; + monc.shutdown(); + } + + void shutdown() { + dout(0) << __func__ << dendl; + shutdown_monc(); + shutdown_messenger(); + } + + MonMap *get_monmap() { + return &monc.monmap; + } + + int init() { + int err = init_messenger(); + if (err < 0) + goto fail; + err = init_monc(); + if (err < 0) + goto fail_msgr; + err = post_init(); + if (err < 0) + goto fail_monc; + return 0; +fail_monc: + shutdown_monc(); +fail_msgr: + shutdown_messenger(); +fail: + return err; + } + + virtual void handle_wanted(Message *m) { } + + bool handle_message(Message *m) { + dout(1) << __func__ << " " << *m << dendl; + if (!is_wanted(m)) { + dout(10) << __func__ << " not wanted" << dendl; + return false; + } + handle_wanted(m); + m->put(); + + return true; + } + + bool ms_dispatch(Message *m) override { + return handle_message(m); + } + void ms_handle_connect(Connection *con) override { } + void ms_handle_remote_reset(Connection *con) override { } + bool ms_handle_reset(Connection *con) override { return false; } + bool ms_handle_refused(Connection *con) override { return false; } + + bool is_wanted(Message *m) { + dout(20) << __func__ << " " << *m << " type " << m->get_type() << dendl; + return (wanted.find(m->get_type()) != wanted.end()); + } + + void add_wanted(int t) { + dout(20) << __func__ << " type " << t << dendl; + wanted.insert(t); + } + + void rm_wanted(int t) { + dout(20) << __func__ << " type " << t << dendl; + wanted.erase(t); + } + + void send_message(Message *m) { + dout(15) << __func__ << " " << *m << dendl; + monc.send_mon_message(m); + } + + void wait() { msg->wait(); } +}; + +class MonMsgTest : public MonClientHelper, + public ::testing::Test +{ +protected: + int reply_type = 0; + Message *reply_msg = nullptr; + ceph::mutex lock = ceph::make_mutex("lock"); + ceph::condition_variable cond; + + MonMsgTest() : + MonClientHelper(g_ceph_context) { } + +public: + void SetUp() override { + reply_type = -1; + if (reply_msg) { + reply_msg->put(); + reply_msg = nullptr; + } + ASSERT_EQ(init(), 0); + } + + void TearDown() override { + shutdown(); + if (reply_msg) { + reply_msg->put(); + reply_msg = nullptr; + } + } + + void handle_wanted(Message *m) override { + std::lock_guard l{lock}; + // caller will put() after they call us, so hold on to a ref + m->get(); + reply_msg = m; + cond.notify_all(); + } + + Message *send_wait_reply(Message *m, int t, double timeout=30.0) { + std::unique_lock l{lock}; + reply_type = t; + add_wanted(t); + send_message(m); + + std::cv_status status = std::cv_status::no_timeout; + if (timeout > 0) { + utime_t s = ceph_clock_now(); + status = cond.wait_for(l, ceph::make_timespan(timeout)); + utime_t e = ceph_clock_now(); + dout(20) << __func__ << " took " << (e-s) << " seconds" << dendl; + } else { + cond.wait(l); + } + rm_wanted(t); + l.unlock(); + if (status == std::cv_status::timeout) { + dout(20) << __func__ << " error: " << cpp_strerror(ETIMEDOUT) << dendl; + return (Message*)((long)-ETIMEDOUT); + } + + if (!reply_msg) + dout(20) << __func__ << " reply_msg is nullptr" << dendl; + else + dout(20) << __func__ << " reply_msg " << *reply_msg << dendl; + return reply_msg; + } +}; + +TEST_F(MonMsgTest, MMonProbeTest) +{ + Message *m = new MMonProbe(get_monmap()->fsid, + MMonProbe::OP_PROBE, "b", false, + ceph_release()); + Message *r = send_wait_reply(m, MSG_MON_PROBE); + ASSERT_NE(IS_ERR(r), 0); + ASSERT_EQ(PTR_ERR(r), -ETIMEDOUT); +} + +TEST_F(MonMsgTest, MRouteTest) +{ + Message *payload = new MGenericMessage(CEPH_MSG_SHUTDOWN); + MRoute *m = new MRoute; + m->msg = payload; + Message *r = send_wait_reply(m, CEPH_MSG_SHUTDOWN); + // we want an error + ASSERT_NE(IS_ERR(r), 0); + ASSERT_EQ(PTR_ERR(r), -ETIMEDOUT); +} + +/* MMonScrub and MMonSync have other safeguards in place that prevent + * us from actually receiving a reply even if the message is handled + * by the monitor due to lack of cap checking. + */ +TEST_F(MonMsgTest, MMonJoin) +{ + Message *m = new MMonJoin(get_monmap()->fsid, string("client"), + msg->get_myaddrs()); + send_wait_reply(m, MSG_MON_PAXOS, 10.0); + + int r = monc.get_monmap(); + ASSERT_EQ(r, 0); + ASSERT_FALSE(monc.monmap.contains("client")); +} + +int main(int argc, char *argv[]) +{ + auto args = argv_to_vec(argc, argv); + + auto cct = global_init(nullptr, args, + CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + common_init_finish(g_ceph_context); + g_ceph_context->_conf.apply_changes(nullptr); + ::testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +} + |