diff options
Diffstat (limited to 'src/test/rgw/amqp_mock.cc')
-rw-r--r-- | src/test/rgw/amqp_mock.cc | 391 |
1 files changed, 391 insertions, 0 deletions
diff --git a/src/test/rgw/amqp_mock.cc b/src/test/rgw/amqp_mock.cc new file mode 100644 index 000000000..8674e5026 --- /dev/null +++ b/src/test/rgw/amqp_mock.cc @@ -0,0 +1,391 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "amqp_mock.h" +#include <amqp.h> +#include <amqp_ssl_socket.h> +#include <amqp_tcp_socket.h> +#include <string> +#include <stdarg.h> +#include <mutex> +#include <boost/lockfree/queue.hpp> +#include <openssl/ssl.h> + +namespace amqp_mock { + +std::mutex set_valid_lock; +int VALID_PORT(5672); +std::string VALID_HOST("localhost"); +std::string VALID_VHOST("/"); +std::string VALID_USER("guest"); +std::string VALID_PASSWORD("guest"); + +void set_valid_port(int port) { + std::lock_guard<std::mutex> lock(set_valid_lock); + VALID_PORT = port; +} + +void set_valid_host(const std::string& host) { + std::lock_guard<std::mutex> lock(set_valid_lock); + VALID_HOST = host; +} + +void set_valid_vhost(const std::string& vhost) { + std::lock_guard<std::mutex> lock(set_valid_lock); + VALID_VHOST = vhost; +} + +void set_valid_user(const std::string& user, const std::string& password) { + std::lock_guard<std::mutex> lock(set_valid_lock); + VALID_USER = user; + VALID_PASSWORD = password; +} + +std::atomic<unsigned> g_tag_skip = 0; +std::atomic<int> g_multiple = 0; + +void set_multiple(unsigned tag_skip) { + g_multiple = 1; + g_tag_skip = tag_skip; +} + +void reset_multiple() { + g_multiple = 0; + g_tag_skip = 0; +} + +bool FAIL_NEXT_WRITE(false); +bool FAIL_NEXT_READ(false); +bool REPLY_ACK(true); +} + +using namespace amqp_mock; + +struct amqp_connection_state_t_ { + amqp_socket_t* socket; + amqp_channel_open_ok_t* channel1; + amqp_channel_open_ok_t* channel2; + amqp_exchange_declare_ok_t* exchange; + amqp_queue_declare_ok_t* queue; + amqp_confirm_select_ok_t* confirm; + amqp_basic_consume_ok_t* consume; + bool login_called; + boost::lockfree::queue<amqp_basic_ack_t> ack_list; + boost::lockfree::queue<amqp_basic_nack_t> nack_list; + std::atomic<uint64_t> delivery_tag; + amqp_rpc_reply_t reply; + amqp_basic_ack_t ack; + amqp_basic_nack_t nack; + bool use_ssl; + // ctor + amqp_connection_state_t_() : + socket(nullptr), + channel1(nullptr), + channel2(nullptr), + exchange(nullptr), + queue(nullptr), + confirm(nullptr), + consume(nullptr), + login_called(false), + ack_list(1024), + nack_list(1024), + delivery_tag(1), + use_ssl(false) { + reply.reply_type = AMQP_RESPONSE_NONE; + } +}; + +struct amqp_socket_t_ { + void *klass; + void *ssl_ctx; + bool open_called; + // ctor + amqp_socket_t_() : klass(nullptr), ssl_ctx(nullptr), open_called(false) { + } +}; + +extern "C" { + +amqp_connection_state_t AMQP_CALL amqp_new_connection(void) { + auto s = new amqp_connection_state_t_; + return s; +} + +int amqp_destroy_connection(amqp_connection_state_t state) { + delete state->socket; + delete state->channel1; + delete state->channel2; + delete state->exchange; + delete state->queue; + delete state->confirm; + delete state->consume; + delete state; + return 0; +} + +amqp_socket_t* amqp_tcp_socket_new(amqp_connection_state_t state) { + state->socket = new amqp_socket_t; + return state->socket; +} + +amqp_socket_t* amqp_ssl_socket_new(amqp_connection_state_t state) { + state->socket = new amqp_socket_t; + state->use_ssl = true; + return state->socket; +} + +int amqp_ssl_socket_set_cacert(amqp_socket_t *self, const char *cacert) { + // do nothing + return AMQP_STATUS_OK; +} + +void amqp_ssl_socket_set_verify_peer(amqp_socket_t *self, amqp_boolean_t verify) { + // do nothing +} + +void amqp_ssl_socket_set_verify_hostname(amqp_socket_t *self, amqp_boolean_t verify) { + // do nothing +} + +#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 10, 0, 1) +void* amqp_ssl_socket_get_context(amqp_socket_t *self) { + return nullptr; +} +#endif + +int SSL_CTX_set_default_verify_paths(SSL_CTX *ctx) { + return 1; +} + +int amqp_socket_open(amqp_socket_t *self, const char *host, int port) { + if (!self) { + return -1; + } + { + std::lock_guard<std::mutex> lock(set_valid_lock); + if (std::string(host) != VALID_HOST) { + return -2; + } + if (port != VALID_PORT) { + return -3; + } + } + self->open_called = true; + return 0; +} + +amqp_rpc_reply_t amqp_login( + amqp_connection_state_t state, + char const *vhost, + int channel_max, + int frame_max, + int heartbeat, + amqp_sasl_method_enum sasl_method, ...) { + state->reply.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION; + state->reply.library_error = 0; + state->reply.reply.decoded = nullptr; + state->reply.reply.id = 0; + if (std::string(vhost) != VALID_VHOST) { + return state->reply; + } + if (sasl_method != AMQP_SASL_METHOD_PLAIN) { + return state->reply; + } + va_list args; + va_start(args, sasl_method); + char* user = va_arg(args, char*); + char* password = va_arg(args, char*); + va_end(args); + if (std::string(user) != VALID_USER) { + return state->reply; + } + if (std::string(password) != VALID_PASSWORD) { + return state->reply; + } + state->reply.reply_type = AMQP_RESPONSE_NORMAL; + state->login_called = true; + return state->reply; +} + +amqp_channel_open_ok_t* amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel) { + state->reply.reply_type = AMQP_RESPONSE_NORMAL; + if (state->channel1 == nullptr) { + state->channel1 = new amqp_channel_open_ok_t; + return state->channel1; + } + + state->channel2 = new amqp_channel_open_ok_t; + return state->channel2; +} + +amqp_exchange_declare_ok_t* amqp_exchange_declare( + amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t exchange, + amqp_bytes_t type, + amqp_boolean_t passive, + amqp_boolean_t durable, + amqp_boolean_t auto_delete, + amqp_boolean_t internal, + amqp_table_t arguments) { + state->exchange = new amqp_exchange_declare_ok_t; + state->reply.reply_type = AMQP_RESPONSE_NORMAL; + return state->exchange; +} + +amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) { + return state->reply; +} + +int amqp_basic_publish( + amqp_connection_state_t state, + amqp_channel_t channel, + amqp_bytes_t exchange, + amqp_bytes_t routing_key, + amqp_boolean_t mandatory, + amqp_boolean_t immediate, + struct amqp_basic_properties_t_ const *properties, + amqp_bytes_t body) { + // make sure that all calls happened before publish + if (state->socket && state->socket->open_called && + state->login_called && state->channel1 && state->channel2 && state->exchange && + !FAIL_NEXT_WRITE) { + state->reply.reply_type = AMQP_RESPONSE_NORMAL; + if (properties) { + if (REPLY_ACK) { + state->ack_list.push(amqp_basic_ack_t{state->delivery_tag++, 0}); + } else { + state->nack_list.push(amqp_basic_nack_t{state->delivery_tag++, 0}); + } + } + return AMQP_STATUS_OK; + } + return AMQP_STATUS_CONNECTION_CLOSED; +} + +const amqp_table_t amqp_empty_table = {0, NULL}; +const amqp_bytes_t amqp_empty_bytes = {0, NULL}; + +const char* amqp_error_string2(int code) { + static const char* str = "mock error"; + return str; +} + +char const* amqp_method_name(amqp_method_number_t methodNumber) { + static const char* str = "mock method"; + return str; +} + +amqp_queue_declare_ok_t* amqp_queue_declare( + amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, + amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive, + amqp_boolean_t auto_delete, amqp_table_t arguments) { + state->queue = new amqp_queue_declare_ok_t; + static const char* str = "tmp-queue"; + state->queue->queue = amqp_cstring_bytes(str); + state->reply.reply_type = AMQP_RESPONSE_NORMAL; + return state->queue; +} + +amqp_confirm_select_ok_t* amqp_confirm_select(amqp_connection_state_t state, amqp_channel_t channel) { + state->confirm = new amqp_confirm_select_ok_t; + state->reply.reply_type = AMQP_RESPONSE_NORMAL; + return state->confirm; +} + +#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 11, 0, 1) +int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, amqp_frame_t *decoded_frame, const struct timeval* tv) { +#else +int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, amqp_frame_t *decoded_frame, struct timeval* tv) { +#endif + if (state->socket && state->socket->open_called && + state->login_called && state->channel1 && state->channel2 && state->exchange && + state->queue && state->consume && state->confirm && !FAIL_NEXT_READ) { + // "wait" for queue + usleep(tv->tv_sec*1000000+tv->tv_usec); + // read from queue + if (g_multiple) { + // pop multiples and reply once at the end + for (auto i = 0U; i < g_tag_skip; ++i) { + if (REPLY_ACK && !state->ack_list.pop(state->ack)) { + // queue is empty + return AMQP_STATUS_TIMEOUT; + } else if (!REPLY_ACK && !state->nack_list.pop(state->nack)) { + // queue is empty + return AMQP_STATUS_TIMEOUT; + } + } + if (REPLY_ACK) { + state->ack.multiple = g_multiple; + decoded_frame->payload.method.id = AMQP_BASIC_ACK_METHOD; + decoded_frame->payload.method.decoded = &state->ack; + } else { + state->nack.multiple = g_multiple; + decoded_frame->payload.method.id = AMQP_BASIC_NACK_METHOD; + decoded_frame->payload.method.decoded = &state->nack; + } + decoded_frame->frame_type = AMQP_FRAME_METHOD; + state->reply.reply_type = AMQP_RESPONSE_NORMAL; + reset_multiple(); + return AMQP_STATUS_OK; + } + // pop replies one by one + if (REPLY_ACK && state->ack_list.pop(state->ack)) { + state->ack.multiple = g_multiple; + decoded_frame->frame_type = AMQP_FRAME_METHOD; + decoded_frame->payload.method.id = AMQP_BASIC_ACK_METHOD; + decoded_frame->payload.method.decoded = &state->ack; + state->reply.reply_type = AMQP_RESPONSE_NORMAL; + return AMQP_STATUS_OK; + } else if (!REPLY_ACK && state->nack_list.pop(state->nack)) { + state->nack.multiple = g_multiple; + decoded_frame->frame_type = AMQP_FRAME_METHOD; + decoded_frame->payload.method.id = AMQP_BASIC_NACK_METHOD; + decoded_frame->payload.method.decoded = &state->nack; + state->reply.reply_type = AMQP_RESPONSE_NORMAL; + return AMQP_STATUS_OK; + } else { + // queue is empty + return AMQP_STATUS_TIMEOUT; + } + } + return AMQP_STATUS_CONNECTION_CLOSED; +} + +amqp_basic_consume_ok_t* amqp_basic_consume( + amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, + amqp_bytes_t consumer_tag, amqp_boolean_t no_local, amqp_boolean_t no_ack, + amqp_boolean_t exclusive, amqp_table_t arguments) { + state->consume = new amqp_basic_consume_ok_t; + state->reply.reply_type = AMQP_RESPONSE_NORMAL; + return state->consume; +} + +} // extern "C" + +// amqp_parse_url() is linked via the actual rabbitmq-c library code. see: amqp_url.c + +// following functions are the actual implementation copied from rabbitmq-c library + +#include <string.h> + +amqp_bytes_t amqp_cstring_bytes(const char* cstr) { + amqp_bytes_t result; + result.len = strlen(cstr); + result.bytes = (void *)cstr; + return result; +} + +void amqp_bytes_free(amqp_bytes_t bytes) { free(bytes.bytes); } + +amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src) { + amqp_bytes_t result; + result.len = src.len; + result.bytes = malloc(src.len); + if (result.bytes != NULL) { + memcpy(result.bytes, src.bytes, src.len); + } + return result; +} + + |