summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_amqp.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/rgw_amqp.cc')
-rw-r--r--src/rgw/rgw_amqp.cc1051
1 files changed, 1051 insertions, 0 deletions
diff --git a/src/rgw/rgw_amqp.cc b/src/rgw/rgw_amqp.cc
new file mode 100644
index 000000000..3014edd1d
--- /dev/null
+++ b/src/rgw/rgw_amqp.cc
@@ -0,0 +1,1051 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "rgw_amqp.h"
+#include <amqp.h>
+#include <amqp_ssl_socket.h>
+#include <amqp_tcp_socket.h>
+#include <amqp_framing.h>
+#include "include/ceph_assert.h"
+#include <sstream>
+#include <cstring>
+#include <unordered_map>
+#include <string>
+#include <vector>
+#include <thread>
+#include <atomic>
+#include <mutex>
+#include <boost/lockfree/queue.hpp>
+#include <boost/functional/hash.hpp>
+#include "common/dout.h"
+#include <openssl/ssl.h>
+
+#define dout_subsys ceph_subsys_rgw
+
+// TODO investigation, not necessarily issues:
+// (1) in case of single threaded writer context use spsc_queue
+// (2) support multiple channels
+// (3) check performance of emptying queue to local list, and go over the list and publish
+// (4) use std::shared_mutex (c++17) or equivalent for the connections lock
+
+namespace rgw::amqp {
+
+// RGW AMQP status codes for publishing
+static const int RGW_AMQP_STATUS_BROKER_NACK = -0x1001;
+static const int RGW_AMQP_STATUS_CONNECTION_CLOSED = -0x1002;
+static const int RGW_AMQP_STATUS_QUEUE_FULL = -0x1003;
+static const int RGW_AMQP_STATUS_MAX_INFLIGHT = -0x1004;
+static const int RGW_AMQP_STATUS_MANAGER_STOPPED = -0x1005;
+// RGW AMQP status code for connection opening
+static const int RGW_AMQP_STATUS_CONN_ALLOC_FAILED = -0x2001;
+static const int RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED = -0x2002;
+static const int RGW_AMQP_STATUS_SOCKET_OPEN_FAILED = -0x2003;
+static const int RGW_AMQP_STATUS_LOGIN_FAILED = -0x2004;
+static const int RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED = -0x2005;
+static const int RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED = -0x2006;
+static const int RGW_AMQP_STATUS_Q_DECLARE_FAILED = -0x2007;
+static const int RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED = -0x2008;
+static const int RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED = -0x2009;
+static const int RGW_AMQP_STATUS_SOCKET_CACERT_FAILED = -0x2010;
+
+static const int RGW_AMQP_RESPONSE_SOCKET_ERROR = -0x3008;
+static const int RGW_AMQP_NO_REPLY_CODE = 0x0;
+
+// the amqp_connection_info struct does not hold any memory and just points to the URL string
+// so, strings are copied into connection_id_t
+connection_id_t::connection_id_t(const amqp_connection_info& info, const std::string& _exchange)
+ : host(info.host), port(info.port), vhost(info.vhost), exchange(_exchange), ssl(info.ssl) {}
+
+// equality operator and hasher functor are needed
+// so that connection_id_t could be used as key in unordered_map
+bool operator==(const connection_id_t& lhs, const connection_id_t& rhs) {
+ return lhs.host == rhs.host && lhs.port == rhs.port &&
+ lhs.vhost == rhs.vhost && lhs.exchange == rhs.exchange;
+}
+
+struct connection_id_hasher {
+ std::size_t operator()(const connection_id_t& k) const {
+ std::size_t h = 0;
+ boost::hash_combine(h, k.host);
+ boost::hash_combine(h, k.port);
+ boost::hash_combine(h, k.vhost);
+ boost::hash_combine(h, k.exchange);
+ return h;
+ }
+};
+
+std::string to_string(const connection_id_t& id) {
+ return fmt::format("{}://{}:{}{}?exchange={}",
+ id.ssl ? "amqps" : "amqp",
+ id.host, id.port, id.vhost, id.exchange);
+}
+
+// automatically cleans amqp state when gets out of scope
+class ConnectionCleaner {
+ private:
+ amqp_connection_state_t state;
+ public:
+ ConnectionCleaner(amqp_connection_state_t _state) : state(_state) {}
+ ~ConnectionCleaner() {
+ if (state) {
+ amqp_destroy_connection(state);
+ }
+ }
+ // call reset() if cleanup is not needed anymore
+ void reset() {
+ state = nullptr;
+ }
+};
+
+// struct for holding the callback and its tag in the callback list
+struct reply_callback_with_tag_t {
+ uint64_t tag;
+ reply_callback_t cb;
+
+ reply_callback_with_tag_t(uint64_t _tag, reply_callback_t _cb) : tag(_tag), cb(_cb) {}
+
+ bool operator==(uint64_t rhs) {
+ return tag == rhs;
+ }
+};
+
+typedef std::vector<reply_callback_with_tag_t> CallbackList;
+
+// struct for holding the connection state object as well as the exchange
+struct connection_t {
+ CephContext* cct = nullptr;
+ amqp_connection_state_t state = nullptr;
+ amqp_bytes_t reply_to_queue = amqp_empty_bytes;
+ uint64_t delivery_tag = 1;
+ int status = AMQP_STATUS_OK;
+ int reply_type = AMQP_RESPONSE_NORMAL;
+ int reply_code = RGW_AMQP_NO_REPLY_CODE;
+ CallbackList callbacks;
+ ceph::coarse_real_clock::time_point next_reconnect = ceph::coarse_real_clock::now();
+ bool mandatory = false;
+ const bool use_ssl = false;
+ std::string user;
+ std::string password;
+ bool verify_ssl = true;
+ boost::optional<std::string> ca_location;
+ utime_t timestamp = ceph_clock_now();
+
+ connection_t(CephContext* _cct, const amqp_connection_info& info, bool _verify_ssl, boost::optional<const std::string&> _ca_location) :
+ cct(_cct), use_ssl(info.ssl), user(info.user), password(info.password), verify_ssl(_verify_ssl), ca_location(_ca_location) {}
+
+ // cleanup of all internal connection resource
+ // the object can still remain, and internal connection
+ // resources created again on successful reconnection
+ void destroy(int s) {
+ status = s;
+ ConnectionCleaner clean_state(state);
+ state = nullptr;
+ amqp_bytes_free(reply_to_queue);
+ reply_to_queue = amqp_empty_bytes;
+ // fire all remaining callbacks
+ std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
+ cb_tag.cb(status);
+ ldout(cct, 20) << "AMQP destroy: invoking callback with tag=" << cb_tag.tag << dendl;
+ });
+ callbacks.clear();
+ delivery_tag = 1;
+ }
+
+ bool is_ok() const {
+ return (state != nullptr);
+ }
+
+ // dtor also destroys the internals
+ ~connection_t() {
+ destroy(RGW_AMQP_STATUS_CONNECTION_CLOSED);
+ }
+};
+
+// convert connection info to string
+std::string to_string(const amqp_connection_info& info) {
+ std::stringstream ss;
+ ss << "connection info:" <<
+ "\nHost: " << info.host <<
+ "\nPort: " << info.port <<
+ "\nUser: " << info.user <<
+ "\nPassword: " << info.password <<
+ "\nvhost: " << info.vhost <<
+ "\nSSL support: " << info.ssl << std::endl;
+ return ss.str();
+}
+
+// convert reply to error code
+int reply_to_code(const amqp_rpc_reply_t& reply) {
+ switch (reply.reply_type) {
+ case AMQP_RESPONSE_NONE:
+ case AMQP_RESPONSE_NORMAL:
+ return RGW_AMQP_NO_REPLY_CODE;
+ case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+ return reply.library_error;
+ case AMQP_RESPONSE_SERVER_EXCEPTION:
+ if (reply.reply.decoded) {
+ const amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded;
+ return m->reply_code;
+ }
+ return reply.reply.id;
+ }
+ return RGW_AMQP_NO_REPLY_CODE;
+}
+
+// convert reply to string
+std::string to_string(const amqp_rpc_reply_t& reply) {
+ std::stringstream ss;
+ switch (reply.reply_type) {
+ case AMQP_RESPONSE_NORMAL:
+ return "";
+ case AMQP_RESPONSE_NONE:
+ return "missing RPC reply type";
+ case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+ return amqp_error_string2(reply.library_error);
+ case AMQP_RESPONSE_SERVER_EXCEPTION:
+ {
+ switch (reply.reply.id) {
+ case AMQP_CONNECTION_CLOSE_METHOD:
+ ss << "server connection error: ";
+ break;
+ case AMQP_CHANNEL_CLOSE_METHOD:
+ ss << "server channel error: ";
+ break;
+ default:
+ ss << "server unknown error: ";
+ break;
+ }
+ if (reply.reply.decoded) {
+ amqp_connection_close_t* m = (amqp_connection_close_t*)reply.reply.decoded;
+ ss << m->reply_code << " text: " << std::string((char*)m->reply_text.bytes, m->reply_text.len);
+ }
+ return ss.str();
+ }
+ default:
+ ss << "unknown error, method id: " << reply.reply.id;
+ return ss.str();
+ }
+}
+
+// convert status enum to string
+std::string to_string(amqp_status_enum s) {
+ switch (s) {
+ case AMQP_STATUS_OK:
+ return "AMQP_STATUS_OK";
+ case AMQP_STATUS_NO_MEMORY:
+ return "AMQP_STATUS_NO_MEMORY";
+ case AMQP_STATUS_BAD_AMQP_DATA:
+ return "AMQP_STATUS_BAD_AMQP_DATA";
+ case AMQP_STATUS_UNKNOWN_CLASS:
+ return "AMQP_STATUS_UNKNOWN_CLASS";
+ case AMQP_STATUS_UNKNOWN_METHOD:
+ return "AMQP_STATUS_UNKNOWN_METHOD";
+ case AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED:
+ return "AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED";
+ case AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION:
+ return "AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION";
+ case AMQP_STATUS_CONNECTION_CLOSED:
+ return "AMQP_STATUS_CONNECTION_CLOSED";
+ case AMQP_STATUS_BAD_URL:
+ return "AMQP_STATUS_BAD_URL";
+ case AMQP_STATUS_SOCKET_ERROR:
+ return "AMQP_STATUS_SOCKET_ERROR";
+ case AMQP_STATUS_INVALID_PARAMETER:
+ return "AMQP_STATUS_INVALID_PARAMETER";
+ case AMQP_STATUS_TABLE_TOO_BIG:
+ return "AMQP_STATUS_TABLE_TOO_BIG";
+ case AMQP_STATUS_WRONG_METHOD:
+ return "AMQP_STATUS_WRONG_METHOD";
+ case AMQP_STATUS_TIMEOUT:
+ return "AMQP_STATUS_TIMEOUT";
+ case AMQP_STATUS_TIMER_FAILURE:
+ return "AMQP_STATUS_TIMER_FAILURE";
+ case AMQP_STATUS_HEARTBEAT_TIMEOUT:
+ return "AMQP_STATUS_HEARTBEAT_TIMEOUT";
+ case AMQP_STATUS_UNEXPECTED_STATE:
+ return "AMQP_STATUS_UNEXPECTED_STATE";
+ case AMQP_STATUS_SOCKET_CLOSED:
+ return "AMQP_STATUS_SOCKET_CLOSED";
+ case AMQP_STATUS_SOCKET_INUSE:
+ return "AMQP_STATUS_SOCKET_INUSE";
+ case AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD:
+ return "AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD";
+#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 8, 0, 0)
+ case AMQP_STATUS_UNSUPPORTED:
+ return "AMQP_STATUS_UNSUPPORTED";
+#endif
+ case _AMQP_STATUS_NEXT_VALUE:
+ return "AMQP_STATUS_INTERNAL";
+ case AMQP_STATUS_TCP_ERROR:
+ return "AMQP_STATUS_TCP_ERROR";
+ case AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR:
+ return "AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR";
+ case _AMQP_STATUS_TCP_NEXT_VALUE:
+ return "AMQP_STATUS_INTERNAL";
+ case AMQP_STATUS_SSL_ERROR:
+ return "AMQP_STATUS_SSL_ERROR";
+ case AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED:
+ return "AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED";
+ case AMQP_STATUS_SSL_PEER_VERIFY_FAILED:
+ return "AMQP_STATUS_SSL_PEER_VERIFY_FAILED";
+ case AMQP_STATUS_SSL_CONNECTION_FAILED:
+ return "AMQP_STATUS_SSL_CONNECTION_FAILED";
+ case _AMQP_STATUS_SSL_NEXT_VALUE:
+ return "AMQP_STATUS_INTERNAL";
+#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 11, 0, 0)
+ case AMQP_STATUS_SSL_SET_ENGINE_FAILED:
+ return "AMQP_STATUS_SSL_SET_ENGINE_FAILED";
+#endif
+ default:
+ return "AMQP_STATUS_UNKNOWN";
+ }
+}
+
+// TODO: add status_to_string on the connection object to prinf full status
+
+// convert int status to string - including RGW specific values
+std::string status_to_string(int s) {
+ switch (s) {
+ case RGW_AMQP_STATUS_BROKER_NACK:
+ return "RGW_AMQP_STATUS_BROKER_NACK";
+ case RGW_AMQP_STATUS_CONNECTION_CLOSED:
+ return "RGW_AMQP_STATUS_CONNECTION_CLOSED";
+ case RGW_AMQP_STATUS_QUEUE_FULL:
+ return "RGW_AMQP_STATUS_QUEUE_FULL";
+ case RGW_AMQP_STATUS_MAX_INFLIGHT:
+ return "RGW_AMQP_STATUS_MAX_INFLIGHT";
+ case RGW_AMQP_STATUS_MANAGER_STOPPED:
+ return "RGW_AMQP_STATUS_MANAGER_STOPPED";
+ case RGW_AMQP_STATUS_CONN_ALLOC_FAILED:
+ return "RGW_AMQP_STATUS_CONN_ALLOC_FAILED";
+ case RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED:
+ return "RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED";
+ case RGW_AMQP_STATUS_SOCKET_OPEN_FAILED:
+ return "RGW_AMQP_STATUS_SOCKET_OPEN_FAILED";
+ case RGW_AMQP_STATUS_LOGIN_FAILED:
+ return "RGW_AMQP_STATUS_LOGIN_FAILED";
+ case RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED:
+ return "RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED";
+ case RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED:
+ return "RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED";
+ case RGW_AMQP_STATUS_Q_DECLARE_FAILED:
+ return "RGW_AMQP_STATUS_Q_DECLARE_FAILED";
+ case RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED:
+ return "RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED";
+ case RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED:
+ return "RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED";
+ case RGW_AMQP_STATUS_SOCKET_CACERT_FAILED:
+ return "RGW_AMQP_STATUS_SOCKET_CACERT_FAILED";
+ }
+ return to_string((amqp_status_enum)s);
+}
+
+// check the result from calls and return if error (=null)
+#define RETURN_ON_ERROR(C, S, OK) \
+ if (!OK) { \
+ C->status = S; \
+ return false; \
+ }
+
+// in case of RPC calls, getting the RPC reply and return if an error is detected
+#define RETURN_ON_REPLY_ERROR(C, ST, S) { \
+ const auto reply = amqp_get_rpc_reply(ST); \
+ if (reply.reply_type != AMQP_RESPONSE_NORMAL) { \
+ C->status = S; \
+ C->reply_type = reply.reply_type; \
+ C->reply_code = reply_to_code(reply); \
+ return false; \
+ } \
+ }
+
+static const amqp_channel_t CHANNEL_ID = 1;
+static const amqp_channel_t CONFIRMING_CHANNEL_ID = 2;
+
+// utility function to create a connection, when the connection object already exists
+bool new_state(connection_t* conn, const connection_id_t& conn_id) {
+ // state must be null at this point
+ ceph_assert(!conn->state);
+ // reset all status codes
+ conn->status = AMQP_STATUS_OK;
+ conn->reply_type = AMQP_RESPONSE_NORMAL;
+ conn->reply_code = RGW_AMQP_NO_REPLY_CODE;
+
+ auto state = amqp_new_connection();
+ if (!state) {
+ conn->status = RGW_AMQP_STATUS_CONN_ALLOC_FAILED;
+ return false;
+ }
+ // make sure that the connection state is cleaned up in case of error
+ ConnectionCleaner state_guard(state);
+
+ // create and open socket
+ amqp_socket_t *socket = nullptr;
+ if (conn->use_ssl) {
+ socket = amqp_ssl_socket_new(state);
+#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1)
+ SSL_CTX* ssl_ctx = reinterpret_cast<SSL_CTX*>(amqp_ssl_socket_get_context(socket));
+#else
+ // taken from https://github.com/alanxz/rabbitmq-c/pull/560
+ struct hack {
+ const struct amqp_socket_class_t *klass;
+ SSL_CTX *ctx;
+ };
+
+ struct hack *h = reinterpret_cast<struct hack*>(socket);
+ SSL_CTX* ssl_ctx = h->ctx;
+#endif
+ // ensure system CA certificates get loaded
+ SSL_CTX_set_default_verify_paths(ssl_ctx);
+ }
+ else {
+ socket = amqp_tcp_socket_new(state);
+ }
+
+ if (!socket) {
+ conn->status = RGW_AMQP_STATUS_SOCKET_ALLOC_FAILED;
+ return false;
+ }
+ if (conn->use_ssl) {
+ if (!conn->verify_ssl) {
+ amqp_ssl_socket_set_verify_peer(socket, 0);
+ amqp_ssl_socket_set_verify_hostname(socket, 0);
+ }
+ if (conn->ca_location.has_value()) {
+ const auto s = amqp_ssl_socket_set_cacert(socket, conn->ca_location.get().c_str());
+ if (s != AMQP_STATUS_OK) {
+ conn->status = RGW_AMQP_STATUS_SOCKET_CACERT_FAILED;
+ conn->reply_code = s;
+ return false;
+ }
+ }
+ }
+ const auto s = amqp_socket_open(socket, conn_id.host.c_str(), conn_id.port);
+ if (s < 0) {
+ conn->status = RGW_AMQP_STATUS_SOCKET_OPEN_FAILED;
+ conn->reply_type = RGW_AMQP_RESPONSE_SOCKET_ERROR;
+ conn->reply_code = s;
+ return false;
+ }
+
+ // login to broker
+ const auto reply = amqp_login(state,
+ conn_id.vhost.c_str(),
+ AMQP_DEFAULT_MAX_CHANNELS,
+ AMQP_DEFAULT_FRAME_SIZE,
+ 0, // no heartbeat TODO: add conf
+ AMQP_SASL_METHOD_PLAIN, // TODO: add other types of security
+ conn->user.c_str(),
+ conn->password.c_str());
+ if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
+ conn->status = RGW_AMQP_STATUS_LOGIN_FAILED;
+ conn->reply_type = reply.reply_type;
+ conn->reply_code = reply_to_code(reply);
+ return false;
+ }
+
+ // open channels
+ {
+ const auto ok = amqp_channel_open(state, CHANNEL_ID);
+ RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED, ok);
+ RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED);
+ }
+ {
+ const auto ok = amqp_channel_open(state, CONFIRMING_CHANNEL_ID);
+ RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED, ok);
+ RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CHANNEL_OPEN_FAILED);
+ }
+ {
+ const auto ok = amqp_confirm_select(state, CONFIRMING_CHANNEL_ID);
+ RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED, ok);
+ RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONFIRM_DECLARE_FAILED);
+ }
+
+ // verify that the topic exchange is there
+ // TODO: make this step optional
+ {
+ const auto ok = amqp_exchange_declare(state,
+ CHANNEL_ID,
+ amqp_cstring_bytes(conn_id.exchange.c_str()),
+ amqp_cstring_bytes("topic"),
+ 1, // passive - exchange must already exist on broker
+ 1, // durable
+ 0, // dont auto-delete
+ 0, // not internal
+ amqp_empty_table);
+ RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED, ok);
+ RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED);
+ }
+ {
+ // create queue for confirmations
+ const auto queue_ok = amqp_queue_declare(state,
+ CHANNEL_ID, // use the regular channel for this call
+ amqp_empty_bytes, // let broker allocate queue name
+ 0, // not passive - create the queue
+ 0, // not durable
+ 1, // exclusive
+ 1, // auto-delete
+ amqp_empty_table // not args TODO add args from conf: TTL, max length etc.
+ );
+ RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_Q_DECLARE_FAILED, queue_ok);
+ RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_Q_DECLARE_FAILED);
+
+ // define consumption for connection
+ const auto consume_ok = amqp_basic_consume(state,
+ CONFIRMING_CHANNEL_ID,
+ queue_ok->queue,
+ amqp_empty_bytes, // broker will generate consumer tag
+ 1, // messages sent from client are never routed back
+ 1, // client does not ack thr acks
+ 1, // exclusive access to queue
+ amqp_empty_table // no parameters
+ );
+
+ RETURN_ON_ERROR(conn, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED, consume_ok);
+ RETURN_ON_REPLY_ERROR(conn, state, RGW_AMQP_STATUS_CONSUME_DECLARE_FAILED);
+ // broker generated consumer_tag could be used to cancel sending of n/acks from broker - not needed
+
+ state_guard.reset();
+ conn->state = state;
+ conn->reply_to_queue = amqp_bytes_malloc_dup(queue_ok->queue);
+ }
+ return true;
+}
+
+/// struct used for holding messages in the message queue
+struct message_wrapper_t {
+ connection_id_t conn_id;
+ std::string topic;
+ std::string message;
+ reply_callback_t cb;
+
+ message_wrapper_t(const connection_id_t& _conn_id,
+ const std::string& _topic,
+ const std::string& _message,
+ reply_callback_t _cb) : conn_id(_conn_id), topic(_topic), message(_message), cb(_cb) {}
+};
+
+using connection_t_ptr = std::unique_ptr<connection_t>;
+
+typedef std::unordered_map<connection_id_t, connection_t_ptr, connection_id_hasher> ConnectionList;
+typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
+
+// macros used inside a loop where an iterator is either incremented or erased
+#define INCREMENT_AND_CONTINUE(IT) \
+ ++IT; \
+ continue;
+
+#define ERASE_AND_CONTINUE(IT,CONTAINER) \
+ IT=CONTAINER.erase(IT); \
+ --connection_count; \
+ continue;
+
+class Manager {
+public:
+ const size_t max_connections;
+ const size_t max_inflight;
+ const size_t max_queue;
+ const size_t max_idle_time;
+private:
+ std::atomic<size_t> connection_count;
+ std::atomic<bool> stopped;
+ struct timeval read_timeout;
+ ConnectionList connections;
+ MessageQueue messages;
+ std::atomic<size_t> queued;
+ std::atomic<size_t> dequeued;
+ CephContext* const cct;
+ mutable std::mutex connections_lock;
+ const ceph::coarse_real_clock::duration idle_time;
+ const ceph::coarse_real_clock::duration reconnect_time;
+ std::thread runner;
+
+ void publish_internal(message_wrapper_t* message) {
+ const std::unique_ptr<message_wrapper_t> msg_owner(message);
+ const auto& conn_id = message->conn_id;
+ auto conn_it = connections.find(conn_id);
+ if (conn_it == connections.end()) {
+ ldout(cct, 1) << "AMQP publish: connection '" << to_string(conn_id) << "' not found" << dendl;
+ if (message->cb) {
+ message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED);
+ }
+ return;
+ }
+
+ auto& conn = conn_it->second;
+
+ conn->timestamp = ceph_clock_now();
+
+ if (!conn->is_ok()) {
+ // connection had an issue while message was in the queue
+ ldout(cct, 1) << "AMQP publish: connection '" << to_string(conn_id) << "' is closed" << dendl;
+ if (message->cb) {
+ message->cb(RGW_AMQP_STATUS_CONNECTION_CLOSED);
+ }
+ return;
+ }
+
+ if (message->cb == nullptr) {
+ const auto rc = amqp_basic_publish(conn->state,
+ CHANNEL_ID,
+ amqp_cstring_bytes(conn_id.exchange.c_str()),
+ amqp_cstring_bytes(message->topic.c_str()),
+ 0, // does not have to be routable
+ 0, // not immediate
+ nullptr, // no properties needed
+ amqp_cstring_bytes(message->message.c_str()));
+ if (rc == AMQP_STATUS_OK) {
+ ldout(cct, 20) << "AMQP publish (no callback): OK" << dendl;
+ return;
+ }
+ ldout(cct, 1) << "AMQP publish (no callback): failed with error " << status_to_string(rc) << dendl;
+ // an error occurred, close connection
+ // it will be retied by the main loop
+ conn->destroy(rc);
+ return;
+ }
+
+ amqp_basic_properties_t props;
+ props._flags =
+ AMQP_BASIC_DELIVERY_MODE_FLAG |
+ AMQP_BASIC_REPLY_TO_FLAG;
+ props.delivery_mode = 2; // persistent delivery TODO take from conf
+ props.reply_to = conn->reply_to_queue;
+
+ const auto rc = amqp_basic_publish(conn->state,
+ CONFIRMING_CHANNEL_ID,
+ amqp_cstring_bytes(conn_id.exchange.c_str()),
+ amqp_cstring_bytes(message->topic.c_str()),
+ conn->mandatory,
+ 0, // not immediate
+ &props,
+ amqp_cstring_bytes(message->message.c_str()));
+
+ if (rc == AMQP_STATUS_OK) {
+ auto const q_len = conn->callbacks.size();
+ if (q_len < max_inflight) {
+ ldout(cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
+ conn->callbacks.emplace_back(conn->delivery_tag++, message->cb);
+ } else {
+ // immediately invoke callback with error
+ ldout(cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl;
+ message->cb(RGW_AMQP_STATUS_MAX_INFLIGHT);
+ }
+ } else {
+ // an error occurred, close connection
+ // it will be retied by the main loop
+ ldout(cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl;
+ conn->destroy(rc);
+ // immediately invoke callback with error
+ message->cb(rc);
+ }
+ }
+
+ // the managers thread:
+ // (1) empty the queue of messages to be published
+ // (2) loop over all connections and read acks
+ // (3) manages deleted connections
+ // (4) TODO reconnect on connection errors
+ // (5) TODO cleanup timedout callbacks
+ void run() noexcept {
+ amqp_frame_t frame;
+ while (!stopped) {
+
+ // publish all messages in the queue
+ const auto count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
+ dequeued += count;
+ ConnectionList::iterator conn_it;
+ ConnectionList::const_iterator end_it;
+ {
+ // thread safe access to the connection list
+ // once the iterators are fetched they are guaranteed to remain valid
+ std::lock_guard lock(connections_lock);
+ conn_it = connections.begin();
+ end_it = connections.end();
+ }
+ auto incoming_message = false;
+ // loop over all connections to read acks
+ for (;conn_it != end_it;) {
+
+ const auto& conn_id = conn_it->first;
+ auto& conn = conn_it->second;
+
+ if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
+ ldout(cct, 20) << "AMQP run: Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
+ ERASE_AND_CONTINUE(conn_it, connections);
+ }
+
+ // try to reconnect the connection if it has an error
+ if (!conn->is_ok()) {
+ const auto now = ceph::coarse_real_clock::now();
+ if (now >= conn->next_reconnect) {
+ // pointers are used temporarily inside the amqp_connection_info object
+ // as read-only values, hence the assignment, and const_cast are safe here
+ ldout(cct, 20) << "AMQP run: retry connection" << dendl;
+ if (!new_state(conn.get(), conn_id)) {
+ ldout(cct, 10) << "AMQP run: connection '" << to_string(conn_id) << "' retry failed. error: " <<
+ status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl;
+ // TODO: add error counter for failed retries
+ // TODO: add exponential backoff for retries
+ conn->next_reconnect = now + reconnect_time;
+ } else {
+ ldout(cct, 10) << "AMQP run: connection '" << to_string(conn_id) << "' retry successfull" << dendl;
+ }
+ }
+ INCREMENT_AND_CONTINUE(conn_it);
+ }
+
+ const auto rc = amqp_simple_wait_frame_noblock(conn->state, &frame, &read_timeout);
+
+ if (rc == AMQP_STATUS_TIMEOUT) {
+ // TODO mark connection as idle
+ INCREMENT_AND_CONTINUE(conn_it);
+ }
+
+ // this is just to prevent spinning idle, does not indicate that a message
+ // was successfully processed or not
+ incoming_message = true;
+
+ // check if error occurred that require reopening the connection
+ if (rc != AMQP_STATUS_OK) {
+ // an error occurred, close connection
+ // it will be retied by the main loop
+ ldout(cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl;
+ conn->destroy(rc);
+ INCREMENT_AND_CONTINUE(conn_it);
+ }
+
+ if (frame.frame_type != AMQP_FRAME_METHOD) {
+ ldout(cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: "
+ << unsigned(frame.frame_type) << dendl;
+ // handler is for publish confirmation only - handle only method frames
+ INCREMENT_AND_CONTINUE(conn_it);
+ }
+
+ uint64_t tag;
+ bool multiple;
+ int result;
+
+ switch (frame.payload.method.id) {
+ case AMQP_BASIC_ACK_METHOD:
+ {
+ result = AMQP_STATUS_OK;
+ const auto ack = (amqp_basic_ack_t*)frame.payload.method.decoded;
+ ceph_assert(ack);
+ tag = ack->delivery_tag;
+ multiple = ack->multiple;
+ break;
+ }
+ case AMQP_BASIC_NACK_METHOD:
+ {
+ result = RGW_AMQP_STATUS_BROKER_NACK;
+ const auto nack = (amqp_basic_nack_t*)frame.payload.method.decoded;
+ ceph_assert(nack);
+ tag = nack->delivery_tag;
+ multiple = nack->multiple;
+ break;
+ }
+ case AMQP_BASIC_REJECT_METHOD:
+ {
+ result = RGW_AMQP_STATUS_BROKER_NACK;
+ const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded;
+ tag = reject->delivery_tag;
+ multiple = false;
+ break;
+ }
+ case AMQP_CONNECTION_CLOSE_METHOD:
+ // TODO on channel close, no need to reopen the connection
+ case AMQP_CHANNEL_CLOSE_METHOD:
+ {
+ // other side closed the connection, no need to continue
+ ldout(cct, 10) << "AMQP run: connection was closed by broker" << dendl;
+ conn->destroy(rc);
+ INCREMENT_AND_CONTINUE(conn_it);
+ }
+ case AMQP_BASIC_RETURN_METHOD:
+ // message was not delivered, returned to sender
+ ldout(cct, 10) << "AMQP run: message was not routable" << dendl;
+ INCREMENT_AND_CONTINUE(conn_it);
+ break;
+ default:
+ // unexpected method
+ ldout(cct, 10) << "AMQP run: unexpected message" << dendl;
+ INCREMENT_AND_CONTINUE(conn_it);
+ }
+
+ const auto tag_it = std::find(conn->callbacks.begin(), conn->callbacks.end(), tag);
+ if (tag_it != conn->callbacks.end()) {
+ if (multiple) {
+ // n/ack all up to (and including) the tag
+ ldout(cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl;
+ auto it = conn->callbacks.begin();
+ while (it->tag <= tag && it != conn->callbacks.end()) {
+ ldout(cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl;
+ it->cb(result);
+ it = conn->callbacks.erase(it);
+ }
+ } else {
+ // n/ack a specific tag
+ ldout(cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl;
+ tag_it->cb(result);
+ conn->callbacks.erase(tag_it);
+ }
+ } else {
+ ldout(cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
+ }
+ // just increment the iterator
+ ++conn_it;
+ }
+ // if no messages were received or published, sleep for 100ms
+ if (count == 0 && !incoming_message) {
+ std::this_thread::sleep_for(idle_time);
+ }
+ }
+ }
+
+ // used in the dtor for message cleanup
+ static void delete_message(const message_wrapper_t* message) {
+ delete message;
+ }
+
+public:
+ Manager(size_t _max_connections,
+ size_t _max_inflight,
+ size_t _max_queue,
+ long _usec_timeout,
+ unsigned reconnect_time_ms,
+ unsigned idle_time_ms,
+ CephContext* _cct) :
+ max_connections(_max_connections),
+ max_inflight(_max_inflight),
+ max_queue(_max_queue),
+ max_idle_time(30),
+ connection_count(0),
+ stopped(false),
+ read_timeout{0, _usec_timeout},
+ connections(_max_connections),
+ messages(max_queue),
+ queued(0),
+ dequeued(0),
+ cct(_cct),
+ idle_time(std::chrono::milliseconds(idle_time_ms)),
+ reconnect_time(std::chrono::milliseconds(reconnect_time_ms)),
+ runner(&Manager::run, this) {
+ // The hashmap has "max connections" as the initial number of buckets,
+ // and allows for 10 collisions per bucket before rehash.
+ // This is to prevent rehashing so that iterators are not invalidated
+ // when a new connection is added.
+ connections.max_load_factor(10.0);
+ // give the runner thread a name for easier debugging
+ const auto rc = ceph_pthread_setname(runner.native_handle(), "amqp_manager");
+ ceph_assert(rc==0);
+ }
+
+ // non copyable
+ Manager(const Manager&) = delete;
+ const Manager& operator=(const Manager&) = delete;
+
+ // stop the main thread
+ void stop() {
+ stopped = true;
+ }
+
+ // connect to a broker, or reuse an existing connection if already connected
+ bool connect(connection_id_t& id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+ boost::optional<const std::string&> ca_location) {
+ if (stopped) {
+ ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
+ return false;
+ }
+
+ amqp_connection_info info;
+ // cache the URL so that parsing could happen in-place
+ std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
+ const auto retcode = amqp_parse_url(url_cache.data(), &info);
+ if (AMQP_STATUS_OK != retcode) {
+ ldout(cct, 1) << "AMQP connect: URL parsing failed. error: " << retcode << dendl;
+ return false;
+ }
+ connection_id_t tmp_id(info, exchange);
+
+ std::lock_guard lock(connections_lock);
+ const auto it = connections.find(tmp_id);
+ if (it != connections.end()) {
+ // connection found - return even if non-ok
+ ldout(cct, 20) << "AMQP connect: connection found" << dendl;
+ id = it->first;
+ return true;
+ }
+
+ // connection not found, creating a new one
+ if (connection_count >= max_connections) {
+ ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
+ return false;
+ }
+ // if error occurred during creation the creation will be retried in the main thread
+ ++connection_count;
+ auto conn = connections.emplace(tmp_id, std::make_unique<connection_t>(cct, info, verify_ssl, ca_location)).first->second.get();
+ ldout(cct, 10) << "AMQP connect: new connection is created. Total connections: " << connection_count << dendl;
+ if (!new_state(conn, tmp_id)) {
+ ldout(cct, 1) << "AMQP connect: new connection '" << to_string(tmp_id) << "' is created. but state creation failed (will retry). error: " <<
+ status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl;
+ }
+ id = std::move(tmp_id);
+ return true;
+ }
+
+ // TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
+ int publish(const connection_id_t& conn_id,
+ const std::string& topic,
+ const std::string& message) {
+ if (stopped) {
+ ldout(cct, 1) << "AMQP publish: manager is not running" << dendl;
+ return RGW_AMQP_STATUS_MANAGER_STOPPED;
+ }
+ auto wrapper = std::make_unique<message_wrapper_t>(conn_id, topic, message, nullptr);
+ if (messages.push(wrapper.get())) {
+ std::ignore = wrapper.release();
+ ++queued;
+ return AMQP_STATUS_OK;
+ }
+ ldout(cct, 1) << "AMQP publish: queue is full" << dendl;
+ return RGW_AMQP_STATUS_QUEUE_FULL;
+ }
+
+ int publish_with_confirm(const connection_id_t& conn_id,
+ const std::string& topic,
+ const std::string& message,
+ reply_callback_t cb) {
+ if (stopped) {
+ ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl;
+ return RGW_AMQP_STATUS_MANAGER_STOPPED;
+ }
+ auto wrapper = std::make_unique<message_wrapper_t>(conn_id, topic, message, cb);
+ if (messages.push(wrapper.get())) {
+ std::ignore = wrapper.release();
+ ++queued;
+ return AMQP_STATUS_OK;
+ }
+ ldout(cct, 1) << "AMQP publish_with_confirm: queue is full" << dendl;
+ return RGW_AMQP_STATUS_QUEUE_FULL;
+ }
+
+ // dtor wait for thread to stop
+ // then connection are cleaned-up
+ ~Manager() {
+ stopped = true;
+ runner.join();
+ messages.consume_all(delete_message);
+ }
+
+ // get the number of connections
+ size_t get_connection_count() const {
+ return connection_count;
+ }
+
+ // get the number of in-flight messages
+ size_t get_inflight() const {
+ size_t sum = 0;
+ std::lock_guard lock(connections_lock);
+ std::for_each(connections.begin(), connections.end(), [&sum](auto& conn_pair) {
+ // concurrent access to the callback vector is safe without locking
+ sum += conn_pair.second->callbacks.size();
+ });
+ return sum;
+ }
+
+ // running counter of the queued messages
+ size_t get_queued() const {
+ return queued;
+ }
+
+ // running counter of the dequeued messages
+ size_t get_dequeued() const {
+ return dequeued;
+ }
+};
+
+// singleton manager
+// note that the manager itself is not a singleton, and multiple instances may co-exist
+// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
+static Manager* s_manager = nullptr;
+
+static const size_t MAX_CONNECTIONS_DEFAULT = 256;
+static const size_t MAX_INFLIGHT_DEFAULT = 8192;
+static const size_t MAX_QUEUE_DEFAULT = 8192;
+static const long READ_TIMEOUT_USEC = 100;
+static const unsigned IDLE_TIME_MS = 100;
+static const unsigned RECONNECT_TIME_MS = 100;
+
+bool init(CephContext* cct) {
+ if (s_manager) {
+ return false;
+ }
+ // TODO: take conf from CephContext
+ s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT,
+ READ_TIMEOUT_USEC, IDLE_TIME_MS, RECONNECT_TIME_MS, cct);
+ return true;
+}
+
+void shutdown() {
+ delete s_manager;
+ s_manager = nullptr;
+}
+
+bool connect(connection_id_t& conn_id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
+ boost::optional<const std::string&> ca_location) {
+ if (!s_manager) return false;
+ return s_manager->connect(conn_id, url, exchange, mandatory_delivery, verify_ssl, ca_location);
+}
+
+int publish(const connection_id_t& conn_id,
+ const std::string& topic,
+ const std::string& message) {
+ if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
+ return s_manager->publish(conn_id, topic, message);
+}
+
+int publish_with_confirm(const connection_id_t& conn_id,
+ const std::string& topic,
+ const std::string& message,
+ reply_callback_t cb) {
+ if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
+ return s_manager->publish_with_confirm(conn_id, topic, message, cb);
+}
+
+size_t get_connection_count() {
+ if (!s_manager) return 0;
+ return s_manager->get_connection_count();
+}
+
+size_t get_inflight() {
+ if (!s_manager) return 0;
+ return s_manager->get_inflight();
+}
+
+size_t get_queued() {
+ if (!s_manager) return 0;
+ return s_manager->get_queued();
+}
+
+size_t get_dequeued() {
+ if (!s_manager) return 0;
+ return s_manager->get_dequeued();
+}
+
+size_t get_max_connections() {
+ if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
+ return s_manager->max_connections;
+}
+
+size_t get_max_inflight() {
+ if (!s_manager) return MAX_INFLIGHT_DEFAULT;
+ return s_manager->max_inflight;
+}
+
+size_t get_max_queue() {
+ if (!s_manager) return MAX_QUEUE_DEFAULT;
+ return s_manager->max_queue;
+}
+
+} // namespace amqp
+