From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/test/rgw/test_rgw_amqp.cc | 529 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 529 insertions(+) create mode 100644 src/test/rgw/test_rgw_amqp.cc (limited to 'src/test/rgw/test_rgw_amqp.cc') diff --git a/src/test/rgw/test_rgw_amqp.cc b/src/test/rgw/test_rgw_amqp.cc new file mode 100644 index 000000000..f49d309c7 --- /dev/null +++ b/src/test/rgw/test_rgw_amqp.cc @@ -0,0 +1,529 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "rgw_amqp.h" +#include "common/ceph_context.h" +#include "amqp_mock.h" +#include +#include +#include +#include + +using namespace rgw; + +const std::chrono::milliseconds wait_time(10); +const std::chrono::milliseconds long_wait_time = wait_time*50; +const std::chrono::seconds idle_time(35); + + +class CctCleaner { + CephContext* cct; +public: + CctCleaner(CephContext* _cct) : cct(_cct) {} + ~CctCleaner() { +#ifdef WITH_SEASTAR + delete cct; +#else + cct->put(); +#endif + } +}; + +auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT); + +CctCleaner cleaner(cct); + +class TestAMQP : public ::testing::Test { +protected: + amqp::connection_id_t conn_id; + unsigned current_dequeued = 0U; + + void SetUp() override { + ASSERT_TRUE(amqp::init(cct)); + } + + void TearDown() override { + amqp::shutdown(); + } + + // wait for at least one new (since last drain) message to be dequeueud + // and then wait for all pending answers to be received + void wait_until_drained() { + while (amqp::get_dequeued() == current_dequeued) { + std::this_thread::sleep_for(wait_time); + } + while (amqp::get_inflight() > 0) { + std::this_thread::sleep_for(wait_time); + } + current_dequeued = amqp::get_dequeued(); + } +}; + +std::atomic callback_invoked = false; + +std::atomic callbacks_invoked = 0; + +// note: because these callback are shared among different "publish" calls +// they should be used on different connections + +void my_callback_expect_ack(int rc) { + EXPECT_EQ(0, rc); + callback_invoked = true; +} + +void my_callback_expect_nack(int rc) { + EXPECT_LT(rc, 0); + callback_invoked = true; +} + +void my_callback_expect_multiple_acks(int rc) { + EXPECT_EQ(0, rc); + ++callbacks_invoked; +} + +class dynamic_callback_wrapper { + dynamic_callback_wrapper() = default; +public: + static dynamic_callback_wrapper* create() { + return new dynamic_callback_wrapper; + } + void callback(int rc) { + EXPECT_EQ(0, rc); + ++callbacks_invoked; + delete this; + } +}; + +void my_callback_expect_close_or_ack(int rc) { + // deleting the connection should trigger the callback with -4098 + // but due to race conditions, some my get an ack + EXPECT_TRUE(-4098 == rc || 0 == rc); +} + +TEST_F(TestAMQP, ConnectionOK) +{ + const auto connection_number = amqp::get_connection_count(); + auto rc = amqp::connect(conn_id, "amqp://localhost", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + rc = amqp::publish(conn_id, "topic", "message"); + EXPECT_EQ(rc, 0); +} + +TEST_F(TestAMQP, SSLConnectionOK) +{ + const int port = 5671; + const auto connection_number = amqp::get_connection_count(); + amqp_mock::set_valid_port(port); + auto rc = amqp::connect(conn_id, "amqps://localhost", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + rc = amqp::publish(conn_id, "topic", "message"); + EXPECT_EQ(rc, 0); + amqp_mock::set_valid_port(5672); +} + +TEST_F(TestAMQP, PlainAndSSLConnectionsOK) +{ + const int port = 5671; + const auto connection_number = amqp::get_connection_count(); + amqp_mock::set_valid_port(port); + amqp::connection_id_t conn_id1; + auto rc = amqp::connect(conn_id1, "amqps://localhost", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + rc = amqp::publish(conn_id1, "topic", "message"); + EXPECT_EQ(rc, 0); + EXPECT_EQ(amqp::to_string(conn_id1), "amqps://localhost:5671/?exchange=ex1"); + amqp_mock::set_valid_port(5672); + amqp::connection_id_t conn_id2; + rc = amqp::connect(conn_id2, "amqp://localhost", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + EXPECT_EQ(amqp::to_string(conn_id2), "amqp://localhost:5672/?exchange=ex1"); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 2); + rc = amqp::publish(conn_id2, "topic", "message"); + EXPECT_EQ(rc, 0); +} + +TEST_F(TestAMQP, ConnectionReuse) +{ + amqp::connection_id_t conn_id1; + auto rc = amqp::connect(conn_id1, "amqp://localhost", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + const auto connection_number = amqp::get_connection_count(); + amqp::connection_id_t conn_id2; + rc = amqp::connect(conn_id2, "amqp://localhost", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + EXPECT_EQ(amqp::get_connection_count(), connection_number); + rc = amqp::publish(conn_id1, "topic", "message"); + EXPECT_EQ(rc, 0); +} + +TEST_F(TestAMQP, NameResolutionFail) +{ + callback_invoked = false; + const auto connection_number = amqp::get_connection_count(); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://kaboom", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); +} + +TEST_F(TestAMQP, InvalidPort) +{ + callback_invoked = false; + const auto connection_number = amqp::get_connection_count(); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://localhost:1234", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); +} + +TEST_F(TestAMQP, InvalidHost) +{ + callback_invoked = false; + const auto connection_number = amqp::get_connection_count(); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://0.0.0.1", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); +} + +TEST_F(TestAMQP, InvalidVhost) +{ + callback_invoked = false; + const auto connection_number = amqp::get_connection_count(); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://localhost/kaboom", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); +} + +TEST_F(TestAMQP, UserPassword) +{ + amqp_mock::set_valid_host("127.0.0.1"); + { + callback_invoked = false; + const auto connection_number = amqp::get_connection_count(); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://foo:bar@127.0.0.1", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); + } + // now try the same connection with default user/password + amqp_mock::set_valid_host("127.0.0.2"); + { + callback_invoked = false; + const auto connection_number = amqp::get_connection_count(); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://guest:guest@127.0.0.2", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); + } + amqp_mock::set_valid_host("localhost"); +} + +TEST_F(TestAMQP, URLParseError) +{ + callback_invoked = false; + const auto connection_number = amqp::get_connection_count(); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "http://localhost", "ex1", false, false, boost::none); + EXPECT_FALSE(rc); + EXPECT_EQ(amqp::get_connection_count(), connection_number); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); +} + +TEST_F(TestAMQP, ExchangeMismatch) +{ + callback_invoked = false; + const auto connection_number = amqp::get_connection_count(); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "http://localhost", "ex2", false, false, boost::none); + EXPECT_FALSE(rc); + EXPECT_EQ(amqp::get_connection_count(), connection_number); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); +} + +TEST_F(TestAMQP, MaxConnections) +{ + // fill up all connections + std::vector connections; + auto remaining_connections = amqp::get_max_connections() - amqp::get_connection_count(); + while (remaining_connections > 0) { + const auto host = "127.10.0." + std::to_string(remaining_connections); + amqp_mock::set_valid_host(host); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack); + EXPECT_EQ(rc, 0); + --remaining_connections; + connections.push_back(conn_id); + } + EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections()); + wait_until_drained(); + // try to add another connection + { + const std::string host = "toomany"; + amqp_mock::set_valid_host(host); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_FALSE(rc); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + } + EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections()); + amqp_mock::set_valid_host("localhost"); +} + + +TEST_F(TestAMQP, ReceiveAck) +{ + callback_invoked = false; + const std::string host("localhost1"); + amqp_mock::set_valid_host(host); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); + amqp_mock::set_valid_host("localhost"); +} + +TEST_F(TestAMQP, ImplicitConnectionClose) +{ + callback_invoked = false; + const std::string host("localhost1"); + amqp_mock::set_valid_host(host); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + const auto NUMBER_OF_CALLS = 2000; + for (auto i = 0; i < NUMBER_OF_CALLS; ++i) { + auto rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_close_or_ack); + EXPECT_EQ(rc, 0); + } + wait_until_drained(); + amqp_mock::set_valid_host("localhost"); +} + +TEST_F(TestAMQP, ReceiveMultipleAck) +{ + callbacks_invoked = 0; + const std::string host("localhost1"); + amqp_mock::set_valid_host(host); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + const auto NUMBER_OF_CALLS = 100; + for (auto i=0; i < NUMBER_OF_CALLS; ++i) { + auto rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_multiple_acks); + EXPECT_EQ(rc, 0); + } + wait_until_drained(); + EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS); + callbacks_invoked = 0; + amqp_mock::set_valid_host("localhost"); +} + +TEST_F(TestAMQP, ReceiveAckForMultiple) +{ + callbacks_invoked = 0; + const std::string host("localhost1"); + amqp_mock::set_valid_host(host); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + amqp_mock::set_multiple(59); + const auto NUMBER_OF_CALLS = 100; + for (auto i=0; i < NUMBER_OF_CALLS; ++i) { + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_multiple_acks); + EXPECT_EQ(rc, 0); + } + wait_until_drained(); + EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS); + callbacks_invoked = 0; + amqp_mock::set_valid_host("localhost"); +} + +TEST_F(TestAMQP, DynamicCallback) +{ + callbacks_invoked = 0; + const std::string host("localhost1"); + amqp_mock::set_valid_host(host); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + amqp_mock::set_multiple(59); + const auto NUMBER_OF_CALLS = 100; + for (auto i=0; i < NUMBER_OF_CALLS; ++i) { + rc = publish_with_confirm(conn_id, "topic", "message", + std::bind(&dynamic_callback_wrapper::callback, dynamic_callback_wrapper::create(), std::placeholders::_1)); + EXPECT_EQ(rc, 0); + } + wait_until_drained(); + EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS); + callbacks_invoked = 0; + amqp_mock::set_valid_host("localhost"); +} + +TEST_F(TestAMQP, ReceiveNack) +{ + callback_invoked = false; + amqp_mock::REPLY_ACK = false; + const std::string host("localhost2"); + amqp_mock::set_valid_host(host); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); + amqp_mock::REPLY_ACK = true; + callback_invoked = false; + amqp_mock::set_valid_host("localhost"); +} + +TEST_F(TestAMQP, FailWrite) +{ + callback_invoked = false; + amqp_mock::FAIL_NEXT_WRITE = true; + const std::string host("localhost2"); + amqp_mock::set_valid_host(host); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); + amqp_mock::FAIL_NEXT_WRITE = false; + callback_invoked = false; + amqp_mock::set_valid_host("localhost"); +} + +TEST_F(TestAMQP, RetryInvalidHost) +{ + callback_invoked = false; + const std::string host = "192.168.0.1"; + const auto connection_number = amqp::get_connection_count(); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://"+host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); + // now next retry should be ok + callback_invoked = false; + amqp_mock::set_valid_host(host); + std::this_thread::sleep_for(long_wait_time); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); + amqp_mock::set_valid_host("localhost"); +} + +TEST_F(TestAMQP, RetryInvalidPort) +{ + callback_invoked = false; + const int port = 9999; + const auto connection_number = amqp::get_connection_count(); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://localhost:" + std::to_string(port), "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); + // now next retry should be ok + callback_invoked = false; + amqp_mock::set_valid_port(port); + std::this_thread::sleep_for(long_wait_time); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); + amqp_mock::set_valid_port(5672); +} + +TEST_F(TestAMQP, RetryFailWrite) +{ + callback_invoked = false; + amqp_mock::FAIL_NEXT_WRITE = true; + const std::string host("localhost2"); + amqp_mock::set_valid_host(host); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); + // now next retry should be ok + amqp_mock::FAIL_NEXT_WRITE = false; + callback_invoked = false; + std::this_thread::sleep_for(long_wait_time); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); + amqp_mock::set_valid_host("localhost"); +} + +TEST_F(TestAMQP, IdleConnection) +{ + // this test is skipped since it takes 30seconds + //GTEST_SKIP(); + const auto connection_number = amqp::get_connection_count(); + amqp::connection_id_t conn_id; + auto rc = amqp::connect(conn_id, "amqp://localhost", "ex1", false, false, boost::none); + EXPECT_TRUE(rc); + EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); + std::this_thread::sleep_for(idle_time); + EXPECT_EQ(amqp::get_connection_count(), connection_number); + rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack); + EXPECT_EQ(rc, 0); + wait_until_drained(); + EXPECT_TRUE(callback_invoked); +} + -- cgit v1.2.3