diff options
Diffstat (limited to 'src/aclk/mqtt_websockets/mqtt_ng.c')
-rw-r--r-- | src/aclk/mqtt_websockets/mqtt_ng.c | 541 |
1 files changed, 231 insertions, 310 deletions
diff --git a/src/aclk/mqtt_websockets/mqtt_ng.c b/src/aclk/mqtt_websockets/mqtt_ng.c index 8ad6bd5c9..9abe77b5f 100644 --- a/src/aclk/mqtt_websockets/mqtt_ng.c +++ b/src/aclk/mqtt_websockets/mqtt_ng.c @@ -1,35 +1,19 @@ -// Copyright: SPDX-License-Identifier: GPL-3.0-only +// SPDX-License-Identifier: GPL-3.0-or-later #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif -#include <stdint.h> -#include <stdlib.h> -#include <string.h> -#include <pthread.h> -#include <inttypes.h> - -#include "c_rhash/c_rhash.h" +#include "libnetdata/libnetdata.h" #include "common_internal.h" #include "mqtt_constants.h" -#include "mqtt_wss_log.h" #include "mqtt_ng.h" -#define UNIT_LOG_PREFIX "mqtt_client: " -#define FATAL(fmt, ...) mws_fatal(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define ERROR(fmt, ...) mws_error(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define WARN(fmt, ...) mws_warn (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define INFO(fmt, ...) mws_info (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define DEBUG(fmt, ...) mws_debug(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) - #define SMALL_STRING_DONT_FRAGMENT_LIMIT 128 -#define MIN(a,b) (((a)<(b))?(a):(b)) - -#define LOCK_HDR_BUFFER(buffer) pthread_mutex_lock(&((buffer)->mutex)) -#define UNLOCK_HDR_BUFFER(buffer) pthread_mutex_unlock(&((buffer)->mutex)) +#define LOCK_HDR_BUFFER(buffer) spinlock_lock(&((buffer)->spinlock)) +#define UNLOCK_HDR_BUFFER(buffer) spinlock_unlock(&((buffer)->spinlock)) #define BUFFER_FRAG_GARBAGE_COLLECT 0x01 // some packets can be marked for garbage collection @@ -75,17 +59,17 @@ struct transaction_buffer { // to be able to revert state easily // in case of error mid processing struct header_buffer state_backup; - pthread_mutex_t mutex; + SPINLOCK spinlock; struct buffer_fragment *sending_frag; }; enum mqtt_client_state { - RAW = 0, - CONNECT_PENDING, - CONNECTING, - CONNECTED, - ERROR, - DISCONNECTED + MQTT_STATE_RAW = 0, + MQTT_STATE_CONNECT_PENDING, + MQTT_STATE_CONNECTING, + MQTT_STATE_CONNECTED, + MQTT_STATE_ERROR, + MQTT_STATE_DISCONNECTED }; enum parser_state { @@ -224,7 +208,7 @@ struct topic_aliases_data { c_rhash stoi_dict; uint32_t idx_max; uint32_t idx_assigned; - pthread_rwlock_t rwlock; + SPINLOCK spinlock; }; struct mqtt_ng_client { @@ -234,8 +218,6 @@ struct mqtt_ng_client { mqtt_msg_data connect_msg; - mqtt_wss_log_ctx_t log; - mqtt_ng_send_fnc_t send_fnc_ptr; void *user_ctx; @@ -253,7 +235,7 @@ struct mqtt_ng_client { unsigned int ping_pending:1; struct mqtt_ng_stats stats; - pthread_mutex_t stats_mutex; + SPINLOCK stats_spinlock; struct topic_aliases_data tx_topic_aliases; c_rhash rx_aliases; @@ -407,7 +389,7 @@ enum memory_mode { CALLER_RESPONSIBLE }; -static inline enum memory_mode ptr2memory_mode(void * ptr) { +static enum memory_mode ptr2memory_mode(void * ptr) { if (ptr == NULL) return MEMCPY; if (ptr == CALLER_RESPONSIBILITY) @@ -492,15 +474,8 @@ static void buffer_rebuild(struct header_buffer *buf) } while(frag); } -static void buffer_garbage_collect(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx) +static void buffer_garbage_collect(struct header_buffer *buf) { -#if !defined(MQTT_DEBUG_VERBOSE) && !defined(ADDITIONAL_CHECKS) - (void) log_ctx; -#endif -#ifdef MQTT_DEBUG_VERBOSE - mws_debug(log_ctx, "Buffer Garbage Collection!"); -#endif - struct buffer_fragment *frag = BUFFER_FIRST_FRAG(buf); while (frag) { if (!frag_is_marked_for_gc(frag)) @@ -511,12 +486,8 @@ static void buffer_garbage_collect(struct header_buffer *buf, mqtt_wss_log_ctx_t frag = frag->next; } - if (frag == BUFFER_FIRST_FRAG(buf)) { -#ifdef MQTT_DEBUG_VERBOSE - mws_debug(log_ctx, "Buffer Garbage Collection! No Space Reclaimed!"); -#endif + if (frag == BUFFER_FIRST_FRAG(buf)) return; - } if (!frag) { buf->tail_frag = NULL; @@ -535,21 +506,17 @@ static void buffer_garbage_collect(struct header_buffer *buf, mqtt_wss_log_ctx_t buffer_rebuild(buf); } -static void transaction_buffer_garbage_collect(struct transaction_buffer *buf, mqtt_wss_log_ctx_t log_ctx) +static void transaction_buffer_garbage_collect(struct transaction_buffer *buf) { -#ifdef MQTT_DEBUG_VERBOSE - mws_debug(log_ctx, "Transaction Buffer Garbage Collection! %s", buf->sending_frag == NULL ? "NULL" : "in flight message"); -#endif - // Invalidate the cached sending fragment // as we will move data around if (buf->sending_frag != &ping_frag) buf->sending_frag = NULL; - buffer_garbage_collect(&buf->hdr_buffer, log_ctx); + buffer_garbage_collect(&buf->hdr_buffer); } -static int transaction_buffer_grow(struct transaction_buffer *buf, mqtt_wss_log_ctx_t log_ctx, float rate, size_t max) +static int transaction_buffer_grow(struct transaction_buffer *buf, float rate, size_t max) { if (buf->hdr_buffer.size >= max) return 0; @@ -565,35 +532,30 @@ static int transaction_buffer_grow(struct transaction_buffer *buf, mqtt_wss_log_ void *ret = reallocz(buf->hdr_buffer.data, buf->hdr_buffer.size); if (ret == NULL) { - mws_warn(log_ctx, "Buffer growth failed (realloc)"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "Buffer growth failed (realloc)"); return 1; } - mws_debug(log_ctx, "Message metadata buffer was grown"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Message metadata buffer was grown"); buf->hdr_buffer.data = ret; buffer_rebuild(&buf->hdr_buffer); return 0; } -inline static int transaction_buffer_init(struct transaction_buffer *to_init, size_t size) +inline static void transaction_buffer_init(struct transaction_buffer *to_init, size_t size) { - pthread_mutex_init(&to_init->mutex, NULL); + spinlock_init(&to_init->spinlock); to_init->hdr_buffer.size = size; to_init->hdr_buffer.data = mallocz(size); - if (to_init->hdr_buffer.data == NULL) - return 1; - to_init->hdr_buffer.tail = to_init->hdr_buffer.data; to_init->hdr_buffer.tail_frag = NULL; - return 0; } static void transaction_buffer_destroy(struct transaction_buffer *to_init) { buffer_purge(&to_init->hdr_buffer); - pthread_mutex_destroy(&to_init->mutex); freez(to_init->hdr_buffer.data); } @@ -629,54 +591,30 @@ void transaction_buffer_transaction_rollback(struct transaction_buffer *buf, str struct mqtt_ng_client *mqtt_ng_init(struct mqtt_ng_init *settings) { struct mqtt_ng_client *client = callocz(1, sizeof(struct mqtt_ng_client)); - if (client == NULL) - return NULL; - if (transaction_buffer_init(&client->main_buffer, HEADER_BUFFER_SIZE)) - goto err_free_client; + transaction_buffer_init(&client->main_buffer, HEADER_BUFFER_SIZE); client->rx_aliases = RX_ALIASES_INITIALIZE(); - if (client->rx_aliases == NULL) - goto err_free_trx_buf; - if (pthread_mutex_init(&client->stats_mutex, NULL)) - goto err_free_rx_alias; + spinlock_init(&client->stats_spinlock); + spinlock_init(&client->tx_topic_aliases.spinlock); client->tx_topic_aliases.stoi_dict = TX_ALIASES_INITIALIZE(); - if (client->tx_topic_aliases.stoi_dict == NULL) - goto err_free_stats_mutex; client->tx_topic_aliases.idx_max = UINT16_MAX; - if (pthread_rwlock_init(&client->tx_topic_aliases.rwlock, NULL)) - goto err_free_tx_alias; - // TODO just embed the struct into mqtt_ng_client client->parser.received_data = settings->data_in; client->send_fnc_ptr = settings->data_out_fnc; client->user_ctx = settings->user_ctx; - client->log = settings->log; - client->puback_callback = settings->puback_callback; client->connack_callback = settings->connack_callback; client->msg_callback = settings->msg_callback; return client; - -err_free_tx_alias: - c_rhash_destroy(client->tx_topic_aliases.stoi_dict); -err_free_stats_mutex: - pthread_mutex_destroy(&client->stats_mutex); -err_free_rx_alias: - c_rhash_destroy(client->rx_aliases); -err_free_trx_buf: - transaction_buffer_destroy(&client->main_buffer); -err_free_client: - freez(client); - return NULL; } -static inline uint8_t get_control_packet_type(uint8_t first_hdr_byte) +static uint8_t get_control_packet_type(uint8_t first_hdr_byte) { return first_hdr_byte >> 4; } @@ -708,33 +646,27 @@ static void mqtt_ng_destroy_tx_alias_hash(c_rhash hash) void mqtt_ng_destroy(struct mqtt_ng_client *client) { transaction_buffer_destroy(&client->main_buffer); - pthread_mutex_destroy(&client->stats_mutex); mqtt_ng_destroy_tx_alias_hash(client->tx_topic_aliases.stoi_dict); - pthread_rwlock_destroy(&client->tx_topic_aliases.rwlock); mqtt_ng_destroy_rx_alias_hash(client->rx_aliases); freez(client); } -int frag_set_external_data(mqtt_wss_log_ctx_t log, struct buffer_fragment *frag, void *data, size_t data_len, free_fnc_t data_free_fnc) +int frag_set_external_data(struct buffer_fragment *frag, void *data, size_t data_len, free_fnc_t data_free_fnc) { if (frag->len) { // TODO?: This could potentially be done in future if we set rule // external data always follows in buffer data // could help reduce fragmentation in some messages but // currently not worth it considering time is tight - mws_fatal(log, UNIT_LOG_PREFIX "INTERNAL ERROR: Cannot set external data to fragment already containing in buffer data!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "INTERNAL ERROR: Cannot set external data to fragment already containing in buffer data!"); return 1; } switch (ptr2memory_mode(data_free_fnc)) { case MEMCPY: frag->data = mallocz(data_len); - if (frag->data == NULL) { - mws_error(log, UNIT_LOG_PREFIX "OOM while malloc @_optimized_add"); - return 1; - } memcpy(frag->data, data, data_len); break; case EXTERNAL_FREE_AFTER_USE: @@ -816,18 +748,18 @@ static size_t mqtt_ng_connect_size(struct mqtt_auth_properties *auth, #define PACK_2B_INT(buffer, integer, frag) { *(uint16_t *)WRITE_POS(frag) = htobe16((integer)); \ DATA_ADVANCE(buffer, sizeof(uint16_t), frag); } -static int _optimized_add(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx, void *data, size_t data_len, free_fnc_t data_free_fnc, struct buffer_fragment **frag) +static int _optimized_add(struct header_buffer *buf, void *data, size_t data_len, free_fnc_t data_free_fnc, struct buffer_fragment **frag) { if (data_len > SMALL_STRING_DONT_FRAGMENT_LIMIT) { buffer_frag_flag_t flags = BUFFER_FRAG_DATA_EXTERNAL; if ((*frag)->flags & BUFFER_FRAG_GARBAGE_COLLECT_ON_SEND) flags |= BUFFER_FRAG_GARBAGE_COLLECT_ON_SEND; if( (*frag = buffer_new_frag(buf, flags)) == NULL ) { - mws_error(log_ctx, "Out of buffer space while generating the message"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Out of buffer space while generating the message"); return 1; } - if (frag_set_external_data(log_ctx, *frag, data, data_len, data_free_fnc)) { - mws_error(log_ctx, "Error adding external data to newly created fragment"); + if (frag_set_external_data(*frag, data, data_len, data_free_fnc)) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Error adding external data to newly created fragment"); return 1; } // we dont want to write to this fragment anymore @@ -842,31 +774,30 @@ static int _optimized_add(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx, return 0; } -#define TRY_GENERATE_MESSAGE(generator_function, client, ...) \ - int rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \ +#define TRY_GENERATE_MESSAGE(generator_function, ...) \ + int rc = generator_function(&client->main_buffer, ##__VA_ARGS__); \ if (rc == MQTT_NG_MSGGEN_BUFFER_OOM) { \ LOCK_HDR_BUFFER(&client->main_buffer); \ - transaction_buffer_garbage_collect((&client->main_buffer), client->log); \ + transaction_buffer_garbage_collect((&client->main_buffer)); \ UNLOCK_HDR_BUFFER(&client->main_buffer); \ - rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \ + rc = generator_function(&client->main_buffer, ##__VA_ARGS__); \ if (rc == MQTT_NG_MSGGEN_BUFFER_OOM && client->max_mem_bytes) { \ LOCK_HDR_BUFFER(&client->main_buffer); \ - transaction_buffer_grow((&client->main_buffer), client->log, GROWTH_FACTOR, client->max_mem_bytes); \ + transaction_buffer_grow((&client->main_buffer),GROWTH_FACTOR, client->max_mem_bytes); \ UNLOCK_HDR_BUFFER(&client->main_buffer); \ - rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \ + rc = generator_function(&client->main_buffer, ##__VA_ARGS__); \ } \ if (rc == MQTT_NG_MSGGEN_BUFFER_OOM) \ - mws_error(client->log, "%s failed to generate message due to insufficient buffer space (line %d)", __FUNCTION__, __LINE__); \ + nd_log(NDLS_DAEMON, NDLP_ERR, "%s failed to generate message due to insufficient buffer space (line %d)", __FUNCTION__, __LINE__); \ } \ if (rc == MQTT_NG_MSGGEN_OK) { \ - pthread_mutex_lock(&client->stats_mutex); \ + spinlock_lock(&client->stats_spinlock); \ client->stats.tx_messages_queued++; \ - pthread_mutex_unlock(&client->stats_mutex); \ + spinlock_unlock(&client->stats_spinlock); \ } \ return rc; mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, - mqtt_wss_log_ctx_t log_ctx, struct mqtt_auth_properties *auth, struct mqtt_lwt_properties *lwt, uint8_t clean_start, @@ -874,7 +805,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, { // Sanity Checks First (are given parameters correct and up to MQTT spec) if (!auth->client_id) { - mws_error(log_ctx, "ClientID must be set. [MQTT-3.1.3-3]"); + nd_log(NDLS_DAEMON, NDLP_ERR, "ClientID must be set. [MQTT-3.1.3-3]"); return NULL; } @@ -885,29 +816,29 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, // however server MUST allow ClientIDs between 1-23 bytes [MQTT-3.1.3-5] // so we will warn client server might not like this and he is using it // at his own risk! - mws_warn(log_ctx, "client_id provided is empty string. This might not be allowed by server [MQTT-3.1.3-6]"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "client_id provided is empty string. This might not be allowed by server [MQTT-3.1.3-6]"); } if(len > MQTT_MAX_CLIENT_ID) { // [MQTT-3.1.3-5] server MUST allow client_id length 1-32 // server MAY allow longer client_id, if user provides longer client_id // warn them he is doing so at his own risk! - mws_warn(log_ctx, "client_id provided is longer than 23 bytes, server might not allow that [MQTT-3.1.3-5]"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "client_id provided is longer than 23 bytes, server might not allow that [MQTT-3.1.3-5]"); } if (lwt) { if (lwt->will_message && lwt->will_message_size > 65535) { - mws_error(log_ctx, "Will message cannot be longer than 65535 bytes due to MQTT protocol limitations [MQTT-3.1.3-4] and [MQTT-1.5.6]"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Will message cannot be longer than 65535 bytes due to MQTT protocol limitations [MQTT-3.1.3-4] and [MQTT-1.5.6]"); return NULL; } if (!lwt->will_topic) { //TODO topic given with strlen==0 ? check specs - mws_error(log_ctx, "If will message is given will topic must also be given [MQTT-3.1.3.3]"); + nd_log(NDLS_DAEMON, NDLP_ERR, "If will message is given will topic must also be given [MQTT-3.1.3.3]"); return NULL; } if (lwt->will_qos > MQTT_MAX_QOS) { // refer to [MQTT-3-1.2-12] - mws_error(log_ctx, "QOS for LWT message is bigger than max"); + nd_log(NDLS_DAEMON, NDLP_ERR, "QOS for LWT message is bigger than max"); return NULL; } } @@ -941,8 +872,10 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, *connect_flags = 0; if (auth->username) *connect_flags |= MQTT_CONNECT_FLAG_USERNAME; + if (auth->password) *connect_flags |= MQTT_CONNECT_FLAG_PASSWORD; + if (lwt) { *connect_flags |= MQTT_CONNECT_FLAG_LWT; *connect_flags |= lwt->will_qos << MQTT_CONNECT_FLAG_QOS_BITSHIFT; @@ -966,7 +899,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, // [MQTT-3.1.3.1] Client identifier CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->client_id), frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, auth->client_id, strlen(auth->client_id), auth->client_id_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, auth->client_id, strlen(auth->client_id), auth->client_id_free, &frag)) goto fail_rollback; if (lwt != NULL) { @@ -980,7 +913,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, // Will Topic [MQTT-3.1.3.3] CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, strlen(lwt->will_topic), frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, lwt->will_topic, strlen(lwt->will_topic), lwt->will_topic_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, lwt->will_topic, strlen(lwt->will_topic), lwt->will_topic_free, &frag)) goto fail_rollback; // Will Payload [MQTT-3.1.3.4] @@ -988,7 +921,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, lwt->will_message_size, frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, lwt->will_message, lwt->will_message_size, lwt->will_topic_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, lwt->will_message, lwt->will_message_size, lwt->will_topic_free, &frag)) goto fail_rollback; } } @@ -998,7 +931,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->username), frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, auth->username, strlen(auth->username), auth->username_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, auth->username, strlen(auth->username), auth->username_free, &frag)) goto fail_rollback; } @@ -1007,7 +940,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->password), frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, auth->password, strlen(auth->password), auth->password_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, auth->password, strlen(auth->password), auth->password_free, &frag)) goto fail_rollback; } trx_buf->hdr_buffer.tail_frag->flags |= BUFFER_FRAG_MQTT_PACKET_TAIL; @@ -1024,7 +957,7 @@ int mqtt_ng_connect(struct mqtt_ng_client *client, uint8_t clean_start, uint16_t keep_alive) { - client->client_state = RAW; + client->client_state = MQTT_STATE_RAW; client->parser.state = MQTT_PARSE_FIXED_HEADER_PACKET_TYPE; LOCK_HDR_BUFFER(&client->main_buffer); @@ -1033,28 +966,23 @@ int mqtt_ng_connect(struct mqtt_ng_client *client, buffer_purge(&client->main_buffer.hdr_buffer); UNLOCK_HDR_BUFFER(&client->main_buffer); - pthread_rwlock_wrlock(&client->tx_topic_aliases.rwlock); + spinlock_lock(&client->tx_topic_aliases.spinlock); // according to MQTT spec topic aliases should not be persisted // even if clean session is true mqtt_ng_destroy_tx_alias_hash(client->tx_topic_aliases.stoi_dict); + client->tx_topic_aliases.stoi_dict = TX_ALIASES_INITIALIZE(); - if (client->tx_topic_aliases.stoi_dict == NULL) { - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); - return 1; - } client->tx_topic_aliases.idx_assigned = 0; - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); + spinlock_unlock(&client->tx_topic_aliases.spinlock); mqtt_ng_destroy_rx_alias_hash(client->rx_aliases); client->rx_aliases = RX_ALIASES_INITIALIZE(); - if (client->rx_aliases == NULL) - return 1; - client->connect_msg = mqtt_ng_generate_connect(&client->main_buffer, client->log, auth, lwt, clean_start, keep_alive); + client->connect_msg = mqtt_ng_generate_connect(&client->main_buffer, auth, lwt, clean_start, keep_alive); if (client->connect_msg == NULL) return 1; - pthread_mutex_lock(&client->stats_mutex); + spinlock_lock(&client->stats_spinlock); if (clean_start) client->stats.tx_messages_queued = 1; else @@ -1062,9 +990,9 @@ int mqtt_ng_connect(struct mqtt_ng_client *client, client->stats.tx_messages_sent = 0; client->stats.rx_messages_rcvd = 0; - pthread_mutex_unlock(&client->stats_mutex); + spinlock_unlock(&client->stats_spinlock); - client->client_state = CONNECT_PENDING; + client->client_state = MQTT_STATE_CONNECT_PENDING; return 0; } @@ -1074,15 +1002,16 @@ uint16_t get_unused_packet_id() { return packet_id ? packet_id : ++packet_id; } -static inline size_t mqtt_ng_publish_size(const char *topic, - size_t msg_len, - uint16_t topic_id) +static size_t mqtt_ng_publish_size( + const char *topic, + size_t msg_len, + uint16_t topic_id) { - size_t retval = 2 /* Topic Name Length */ - + (topic == NULL ? 0 : strlen(topic)) - + 2 /* Packet identifier */ - + 1 /* Properties Length TODO for now fixed to 1 property */ - + msg_len; + size_t retval = 2 + + (topic == NULL ? 0 : strlen(topic)) /* Topic Name Length */ + + 2 /* Packet identifier */ + + 1 /* Properties Length for now fixed to 1 property */ + + msg_len; if (topic_id) retval += 3; @@ -1091,7 +1020,6 @@ static inline size_t mqtt_ng_publish_size(const char *topic, } int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf, - mqtt_wss_log_ctx_t log_ctx, char *topic, free_fnc_t topic_free, void *msg, @@ -1130,7 +1058,7 @@ int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf, // [MQTT-3.3.2.1] PACK_2B_INT(&trx_buf->hdr_buffer, topic == NULL ? 0 : strlen(topic), frag); if (topic != NULL) { - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, topic, strlen(topic), topic_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, topic, strlen(topic), topic_free, &frag)) goto fail_rollback; BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); } @@ -1154,7 +1082,7 @@ int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf, if( (frag = buffer_new_frag(&trx_buf->hdr_buffer, BUFFER_FRAG_DATA_EXTERNAL)) == NULL ) goto fail_rollback; - if (frag_set_external_data(log_ctx, frag, msg, msg_len, msg_free)) + if (frag_set_external_data(frag, msg, msg_len, msg_free)) goto fail_rollback; trx_buf->hdr_buffer.tail_frag->flags |= BUFFER_FRAG_MQTT_PACKET_TAIL; @@ -1178,9 +1106,9 @@ int mqtt_ng_publish(struct mqtt_ng_client *client, uint16_t *packet_id) { struct topic_alias_data *alias = NULL; - pthread_rwlock_rdlock(&client->tx_topic_aliases.rwlock); + spinlock_lock(&client->tx_topic_aliases.spinlock); c_rhash_get_ptr_by_str(client->tx_topic_aliases.stoi_dict, topic, (void**)&alias); - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); + spinlock_unlock(&client->tx_topic_aliases.spinlock); uint16_t topic_id = 0; @@ -1194,14 +1122,14 @@ int mqtt_ng_publish(struct mqtt_ng_client *client, } if (client->max_msg_size && PUBLISH_SP_SIZE + mqtt_ng_publish_size(topic, msg_len, topic_id) > client->max_msg_size) { - mws_error(client->log, "Message too big for server: %zu", msg_len); + nd_log(NDLS_DAEMON, NDLP_ERR, "Message too big for server: %zu", msg_len); return MQTT_NG_MSGGEN_MSG_TOO_BIG; } - TRY_GENERATE_MESSAGE(mqtt_ng_generate_publish, client, topic, topic_free, msg, msg_free, msg_len, publish_flags, packet_id, topic_id); + TRY_GENERATE_MESSAGE(mqtt_ng_generate_publish, topic, topic_free, msg, msg_free, msg_len, publish_flags, packet_id, topic_id); } -static inline size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_count) +static size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_count) { size_t len = 2 /* Packet Identifier */ + 1 /* Properties Length TODO for now fixed 0 */; len += sub_count * (2 /* topic filter string length */ + 1 /* [MQTT-3.8.3.1] Subscription Options Byte */); @@ -1212,7 +1140,7 @@ static inline size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_co return len; } -int mqtt_ng_generate_subscribe(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, struct mqtt_sub *subs, size_t sub_count) +int mqtt_ng_generate_subscribe(struct transaction_buffer *trx_buf, struct mqtt_sub *subs, size_t sub_count) { // >> START THE RODEO << transaction_buffer_transaction_start(trx_buf); @@ -1247,7 +1175,7 @@ int mqtt_ng_generate_subscribe(struct transaction_buffer *trx_buf, mqtt_wss_log_ for (size_t i = 0; i < sub_count; i++) { BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, strlen(subs[i].topic), frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, subs[i].topic, strlen(subs[i].topic), subs[i].topic_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, subs[i].topic, strlen(subs[i].topic), subs[i].topic_free, &frag)) goto fail_rollback; BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); *WRITE_POS(frag) = subs[i].options; @@ -1264,12 +1192,11 @@ fail_rollback: int mqtt_ng_subscribe(struct mqtt_ng_client *client, struct mqtt_sub *subs, size_t sub_count) { - TRY_GENERATE_MESSAGE(mqtt_ng_generate_subscribe, client, subs, sub_count); + TRY_GENERATE_MESSAGE(mqtt_ng_generate_subscribe, subs, sub_count); } -int mqtt_ng_generate_disconnect(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, uint8_t reason_code) +int mqtt_ng_generate_disconnect(struct transaction_buffer *trx_buf, uint8_t reason_code) { - (void) log_ctx; // >> START THE RODEO << transaction_buffer_transaction_start(trx_buf); @@ -1308,12 +1235,11 @@ fail_rollback: int mqtt_ng_disconnect(struct mqtt_ng_client *client, uint8_t reason_code) { - TRY_GENERATE_MESSAGE(mqtt_ng_generate_disconnect, client, reason_code); + TRY_GENERATE_MESSAGE(mqtt_ng_generate_disconnect, reason_code); } -static int mqtt_generate_puback(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, uint16_t packet_id, uint8_t reason_code) +static int mqtt_generate_puback(struct transaction_buffer *trx_buf, uint16_t packet_id, uint8_t reason_code) { - (void) log_ctx; // >> START THE RODEO << transaction_buffer_transaction_start(trx_buf); @@ -1353,7 +1279,7 @@ fail_rollback: static int mqtt_ng_puback(struct mqtt_ng_client *client, uint16_t packet_id, uint8_t reason_code) { - TRY_GENERATE_MESSAGE(mqtt_generate_puback, client, packet_id, reason_code); + TRY_GENERATE_MESSAGE(mqtt_generate_puback, packet_id, reason_code); } int mqtt_ng_ping(struct mqtt_ng_client *client) @@ -1370,7 +1296,6 @@ int mqtt_ng_ping(struct mqtt_ng_client *client) #define MQTT_NG_CLIENT_PROTOCOL_ERROR -1 #define MQTT_NG_CLIENT_SERVER_RETURNED_ERROR -2 #define MQTT_NG_CLIENT_NOT_IMPL_YET -3 -#define MQTT_NG_CLIENT_OOM -4 #define MQTT_NG_CLIENT_INTERNAL_ERROR -5 #define BUF_READ_CHECK_AT_LEAST(buf, x) \ @@ -1379,10 +1304,10 @@ int mqtt_ng_ping(struct mqtt_ng_client *client) #define vbi_parser_reset_ctx(ctx) memset(ctx, 0, sizeof(struct mqtt_vbi_parser_ctx)) -static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data, mqtt_wss_log_ctx_t log) +static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data) { if (ctx->bytes > MQTT_VBI_MAXBYTES - 1) { - mws_error(log, "MQTT Variable Byte Integer can't be longer than %d bytes", MQTT_VBI_MAXBYTES); + nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT Variable Byte Integer can't be longer than %d bytes", MQTT_VBI_MAXBYTES); return MQTT_NG_CLIENT_PROTOCOL_ERROR; } if (!ctx->bytes || ctx->data[ctx->bytes-1] & MQTT_VBI_CONTINUATION_FLAG) { @@ -1394,7 +1319,7 @@ static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data, mqtt_w } if (mqtt_vbi_to_uint32(ctx->data, &ctx->result)) { - mws_error(log, "MQTT Variable Byte Integer failed to be parsed."); + nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT Variable Byte Integer failed to be parsed."); return MQTT_NG_CLIENT_PROTOCOL_ERROR; } @@ -1480,12 +1405,12 @@ struct mqtt_property *get_property_by_id(struct mqtt_property *props, uint8_t pr } // Parses [MQTT-2.2.2] -static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t data, mqtt_wss_log_ctx_t log) +static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t data) { int rc; switch (ctx->state) { case PROPERTIES_LENGTH: - rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data, log); + rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data); if (rc == MQTT_NG_CLIENT_PARSE_DONE) { ctx->properties_length = ctx->vbi_parser_ctx.result; ctx->bytes_consumed += ctx->vbi_parser_ctx.bytes; @@ -1534,7 +1459,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t ctx->state = PROPERTY_TYPE_STR_BIN_LEN; break; default: - mws_error(log, "Unsupported property type %d for property id %d.", (int)ctx->tail->type, (int)ctx->tail->id); + nd_log(NDLS_DAEMON, NDLP_ERR, "Unsupported property type %d for property id %d.", (int)ctx->tail->type, (int)ctx->tail->id); return MQTT_NG_CLIENT_PROTOCOL_ERROR; } break; @@ -1552,7 +1477,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t ctx->state = PROPERTY_TYPE_STR; break; default: - mws_error(log, "Unexpected datatype in PROPERTY_TYPE_STR_BIN_LEN %d", (int)ctx->tail->type); + nd_log(NDLS_DAEMON, NDLP_ERR, "Unexpected datatype in PROPERTY_TYPE_STR_BIN_LEN %d", (int)ctx->tail->type); return MQTT_NG_CLIENT_INTERNAL_ERROR; } break; @@ -1577,7 +1502,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t ctx->state = PROPERTY_NEXT; break; case PROPERTY_TYPE_VBI: - rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data, log); + rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data); if (rc == MQTT_NG_CLIENT_PARSE_DONE) { ctx->tail->data.uint32 = ctx->vbi_parser_ctx.result; ctx->bytes_consumed += ctx->vbi_parser_ctx.bytes; @@ -1627,9 +1552,9 @@ static int parse_connack_varhdr(struct mqtt_ng_client *client) mqtt_properties_parser_ctx_reset(&parser->properties_parser); break; case MQTT_PARSE_VARHDR_PROPS: - return parse_properties_array(&parser->properties_parser, parser->received_data, client->log); + return parse_properties_array(&parser->properties_parser, parser->received_data); default: - ERROR("invalid state for connack varhdr parser"); + nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for connack varhdr parser"); return MQTT_NG_CLIENT_INTERNAL_ERROR; } return MQTT_NG_CLIENT_OK_CALL_AGAIN; @@ -1653,9 +1578,9 @@ static int parse_disconnect_varhdr(struct mqtt_ng_client *client) mqtt_properties_parser_ctx_reset(&parser->properties_parser); break; case MQTT_PARSE_VARHDR_PROPS: - return parse_properties_array(&parser->properties_parser, parser->received_data, client->log); + return parse_properties_array(&parser->properties_parser, parser->received_data); default: - ERROR("invalid state for connack varhdr parser"); + nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for connack varhdr parser"); return MQTT_NG_CLIENT_INTERNAL_ERROR; } return MQTT_NG_CLIENT_OK_CALL_AGAIN; @@ -1691,9 +1616,9 @@ static int parse_puback_varhdr(struct mqtt_ng_client *client) mqtt_properties_parser_ctx_reset(&parser->properties_parser); /* FALLTHROUGH */ case MQTT_PARSE_VARHDR_PROPS: - return parse_properties_array(&parser->properties_parser, parser->received_data, client->log); + return parse_properties_array(&parser->properties_parser, parser->received_data); default: - ERROR("invalid state for puback varhdr parser"); + nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for puback varhdr parser"); return MQTT_NG_CLIENT_INTERNAL_ERROR; } return MQTT_NG_CLIENT_OK_CALL_AGAIN; @@ -1716,7 +1641,7 @@ static int parse_suback_varhdr(struct mqtt_ng_client *client) mqtt_properties_parser_ctx_reset(&parser->properties_parser); /* FALLTHROUGH */ case MQTT_PARSE_VARHDR_PROPS: - rc = parse_properties_array(&parser->properties_parser, parser->received_data, client->log); + rc = parse_properties_array(&parser->properties_parser, parser->received_data); if (rc != MQTT_NG_CLIENT_PARSE_DONE) return rc; parser->mqtt_parsed_len += parser->properties_parser.bytes_consumed; @@ -1737,7 +1662,7 @@ static int parse_suback_varhdr(struct mqtt_ng_client *client) return MQTT_NG_CLIENT_NEED_MORE_BYTES; default: - ERROR("invalid state for suback varhdr parser"); + nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for suback varhdr parser"); return MQTT_NG_CLIENT_INTERNAL_ERROR; } return MQTT_NG_CLIENT_OK_CALL_AGAIN; @@ -1761,8 +1686,6 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client) break; } publish->topic = callocz(1, publish->topic_len + 1 /* add 0x00 */); - if (publish->topic == NULL) - return MQTT_NG_CLIENT_OOM; parser->varhdr_state = MQTT_PARSE_VARHDR_TOPICNAME; /* FALLTHROUGH */ case MQTT_PARSE_VARHDR_TOPICNAME: @@ -1788,7 +1711,7 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client) parser->mqtt_parsed_len += 2; /* FALLTHROUGH */ case MQTT_PARSE_VARHDR_PROPS: - rc = parse_properties_array(&parser->properties_parser, parser->received_data, client->log); + rc = parse_properties_array(&parser->properties_parser, parser->received_data); if (rc != MQTT_NG_CLIENT_PARSE_DONE) return rc; parser->mqtt_parsed_len += parser->properties_parser.bytes_consumed; @@ -1798,7 +1721,7 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client) if (parser->mqtt_fixed_hdr_remaining_length < parser->mqtt_parsed_len) { freez(publish->topic); publish->topic = NULL; - ERROR("Error parsing PUBLISH message"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Error parsing PUBLISH message"); return MQTT_NG_CLIENT_PROTOCOL_ERROR; } publish->data_len = parser->mqtt_fixed_hdr_remaining_length - parser->mqtt_parsed_len; @@ -1809,18 +1732,12 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client) BUF_READ_CHECK_AT_LEAST(parser->received_data, publish->data_len); publish->data = mallocz(publish->data_len); - if (publish->data == NULL) { - freez(publish->topic); - publish->topic = NULL; - return MQTT_NG_CLIENT_OOM; - } - rbuf_pop(parser->received_data, publish->data, publish->data_len); parser->mqtt_parsed_len += publish->data_len; return MQTT_NG_CLIENT_PARSE_DONE; default: - ERROR("invalid state for publish varhdr parser"); + nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for publish varhdr parser"); return MQTT_NG_CLIENT_INTERNAL_ERROR; } return MQTT_NG_CLIENT_OK_CALL_AGAIN; @@ -1840,7 +1757,7 @@ static int parse_data(struct mqtt_ng_client *client) parser->state = MQTT_PARSE_FIXED_HEADER_LEN; break; case MQTT_PARSE_FIXED_HEADER_LEN: - rc = vbi_parser_parse(&parser->vbi_parser, parser->received_data, client->log); + rc = vbi_parser_parse(&parser->vbi_parser, parser->received_data); if (rc == MQTT_NG_CLIENT_PARSE_DONE) { parser->mqtt_fixed_hdr_remaining_length = parser->vbi_parser.result; parser->state = MQTT_PARSE_VARIABLE_HEADER; @@ -1883,10 +1800,11 @@ static int parse_data(struct mqtt_ng_client *client) return rc; case MQTT_CPT_PINGRESP: if (parser->mqtt_fixed_hdr_remaining_length) { - ERROR ("PINGRESP has to be 0 Remaining Length."); // [MQTT-3.13.1] + nd_log(NDLS_DAEMON, NDLP_ERR, "PINGRESP has to be 0 Remaining Length."); // [MQTT-3.13.1] return MQTT_NG_CLIENT_PROTOCOL_ERROR; } parser->state = MQTT_PARSE_MQTT_PACKET_DONE; + ping_timeout = 0; break; case MQTT_CPT_DISCONNECT: rc = parse_disconnect_varhdr(client); @@ -1896,7 +1814,7 @@ static int parse_data(struct mqtt_ng_client *client) } return rc; default: - ERROR("Parsing Control Packet Type %" PRIu8 " not implemented yet.", get_control_packet_type(parser->mqtt_control_packet_type)); + nd_log(NDLS_DAEMON, NDLP_ERR, "Parsing Control Packet Type %" PRIu8 " not implemented yet.", get_control_packet_type(parser->mqtt_control_packet_type)); rbuf_bump_tail(parser->received_data, parser->mqtt_fixed_hdr_remaining_length); parser->state = MQTT_PARSE_MQTT_PACKET_DONE; return MQTT_NG_CLIENT_NOT_IMPL_YET; @@ -1916,12 +1834,12 @@ static int parse_data(struct mqtt_ng_client *client) // return -1 on error // return 0 if there is fragment set static int mqtt_ng_next_to_send(struct mqtt_ng_client *client) { - if (client->client_state == CONNECT_PENDING) { + if (client->client_state == MQTT_STATE_CONNECT_PENDING) { client->main_buffer.sending_frag = client->connect_msg; - client->client_state = CONNECTING; + client->client_state = MQTT_STATE_CONNECTING; return 0; } - if (client->client_state != CONNECTED) + if (client->client_state != MQTT_STATE_CONNECTED) return -1; struct buffer_fragment *frag = BUFFER_FIRST_FRAG(&client->main_buffer.hdr_buffer); @@ -1959,7 +1877,7 @@ static int send_fragment(struct mqtt_ng_client *client) { if (bytes) processed = client->send_fnc_ptr(client->user_ctx, ptr, bytes); else - WARN("This fragment was fully sent already. This should not happen!"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "This fragment was fully sent already. This should not happen!"); frag->sent += processed; if (frag->sent != frag->len) @@ -1967,11 +1885,11 @@ static int send_fragment(struct mqtt_ng_client *client) { if (frag->flags & BUFFER_FRAG_MQTT_PACKET_TAIL) { client->time_of_last_send = time(NULL); - pthread_mutex_lock(&client->stats_mutex); + spinlock_lock(&client->stats_spinlock); if (client->main_buffer.sending_frag != &ping_frag) client->stats.tx_messages_queued--; client->stats.tx_messages_sent++; - pthread_mutex_unlock(&client->stats_mutex); + spinlock_unlock(&client->stats_spinlock); client->main_buffer.sending_frag = NULL; return 1; } @@ -1995,7 +1913,7 @@ static void try_send_all(struct mqtt_ng_client *client) { } while(send_all_message_fragments(client) >= 0); } -static inline void mark_message_for_gc(struct buffer_fragment *frag) +static void mark_message_for_gc(struct buffer_fragment *frag) { while (frag) { frag->flags |= BUFFER_FRAG_GARBAGE_COLLECT; @@ -2013,7 +1931,7 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id) while (frag) { if ( (frag->flags & BUFFER_FRAG_MQTT_PACKET_HEAD) && frag->packet_id == packet_id) { if (!frag->sent) { - ERROR("Received packet_id (%" PRIu16 ") belongs to MQTT packet which was not yet sent!", packet_id); + nd_log(NDLS_DAEMON, NDLP_ERR, "Received packet_id (%" PRIu16 ") belongs to MQTT packet which was not yet sent!", packet_id); UNLOCK_HDR_BUFFER(&client->main_buffer); return 1; } @@ -2023,7 +1941,7 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id) } frag = frag->next; } - ERROR("Received packet_id (%" PRIu16 ") is unknown!", packet_id); + nd_log(NDLS_DAEMON, NDLP_ERR, "Received packet_id (%" PRIu16 ") is unknown!", packet_id); UNLOCK_HDR_BUFFER(&client->main_buffer); return 1; } @@ -2031,110 +1949,113 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id) int handle_incoming_traffic(struct mqtt_ng_client *client) { int rc; + while ((rc = parse_data(client)) == MQTT_NG_CLIENT_OK_CALL_AGAIN) { + ; + } + if (rc != MQTT_NG_CLIENT_MQTT_PACKET_DONE) + return rc; + struct mqtt_publish *pub; - while( (rc = parse_data(client)) == MQTT_NG_CLIENT_OK_CALL_AGAIN ); - if ( rc == MQTT_NG_CLIENT_MQTT_PACKET_DONE ) { - struct mqtt_property *prop; -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("MQTT Packet Parsed Successfully!"); -#endif - pthread_mutex_lock(&client->stats_mutex); - client->stats.rx_messages_rcvd++; - pthread_mutex_unlock(&client->stats_mutex); - - switch (get_control_packet_type(client->parser.mqtt_control_packet_type)) { - case MQTT_CPT_CONNACK: -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("Received CONNACK"); -#endif - LOCK_HDR_BUFFER(&client->main_buffer); - mark_message_for_gc(client->connect_msg); - UNLOCK_HDR_BUFFER(&client->main_buffer); - client->connect_msg = NULL; - if (client->client_state != CONNECTING) { - ERROR("Received unexpected CONNACK"); - client->client_state = ERROR; - return MQTT_NG_CLIENT_PROTOCOL_ERROR; - } - if ((prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_MAX_PKT_SIZE)) != NULL) { - INFO("MQTT server limits message size to %" PRIu32, prop->data.uint32); - client->max_msg_size = prop->data.uint32; - } - if (client->connack_callback) - client->connack_callback(client->user_ctx, client->parser.mqtt_packet.connack.reason_code); - if (!client->parser.mqtt_packet.connack.reason_code) { - INFO("MQTT Connection Accepted By Server"); - client->client_state = CONNECTED; - break; - } - client->client_state = ERROR; - return MQTT_NG_CLIENT_SERVER_RETURNED_ERROR; - case MQTT_CPT_PUBACK: -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("Received PUBACK %" PRIu16, client->parser.mqtt_packet.puback.packet_id); -#endif - if (mark_packet_acked(client, client->parser.mqtt_packet.puback.packet_id)) - return MQTT_NG_CLIENT_PROTOCOL_ERROR; - if (client->puback_callback) - client->puback_callback(client->parser.mqtt_packet.puback.packet_id); - break; - case MQTT_CPT_PINGRESP: -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("Received PINGRESP"); -#endif - break; - case MQTT_CPT_SUBACK: -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("Received SUBACK %" PRIu16, client->parser.mqtt_packet.suback.packet_id); -#endif - if (mark_packet_acked(client, client->parser.mqtt_packet.suback.packet_id)) - return MQTT_NG_CLIENT_PROTOCOL_ERROR; + struct mqtt_property *prop; + spinlock_lock(&client->stats_spinlock); + client->stats.rx_messages_rcvd++; + spinlock_unlock(&client->stats_spinlock); + + uint8_t ctrl_packet_type = get_control_packet_type(client->parser.mqtt_control_packet_type); + switch (ctrl_packet_type) { + case MQTT_CPT_CONNACK: + LOCK_HDR_BUFFER(&client->main_buffer); + mark_message_for_gc(client->connect_msg); + UNLOCK_HDR_BUFFER(&client->main_buffer); + + client->connect_msg = NULL; + + if (client->client_state != MQTT_STATE_CONNECTING) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Received unexpected CONNACK"); + client->client_state = MQTT_STATE_ERROR; + return MQTT_NG_CLIENT_PROTOCOL_ERROR; + } + + if ((prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_MAX_PKT_SIZE)) != NULL) { + nd_log(NDLS_DAEMON, NDLP_INFO, "MQTT server limits message size to %" PRIu32, prop->data.uint32); + client->max_msg_size = prop->data.uint32; + } + + if (client->connack_callback) + client->connack_callback(client->user_ctx, client->parser.mqtt_packet.connack.reason_code); + if (!client->parser.mqtt_packet.connack.reason_code) { + nd_log(NDLS_DAEMON, NDLP_INFO, "MQTT Connection Accepted By Server"); + client->client_state = MQTT_STATE_CONNECTED; break; - case MQTT_CPT_PUBLISH: -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("Recevied PUBLISH"); -#endif - pub = &client->parser.mqtt_packet.publish; - if (pub->qos > 1) { - freez(pub->topic); - freez(pub->data); - return MQTT_NG_CLIENT_NOT_IMPL_YET; - } - if ( pub->qos == 1 && (rc = mqtt_ng_puback(client, pub->packet_id, 0)) ) { - client->client_state = ERROR; - ERROR("Error generating PUBACK reply for PUBLISH"); - return rc; - } - if ( (prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_TOPIC_ALIAS)) != NULL ) { - // Topic Alias property was sent from server - void *topic_ptr; - if (!c_rhash_get_ptr_by_uint64(client->rx_aliases, prop->data.uint8, &topic_ptr)) { - if (pub->topic != NULL) { - ERROR("We do not yet support topic alias reassignment"); - return MQTT_NG_CLIENT_NOT_IMPL_YET; - } - pub->topic = topic_ptr; - } else { - if (pub->topic == NULL) { - ERROR("Topic alias with id %d unknown and topic not set by server!", prop->data.uint8); - return MQTT_NG_CLIENT_PROTOCOL_ERROR; - } - c_rhash_insert_uint64_ptr(client->rx_aliases, prop->data.uint8, pub->topic); + } + client->client_state = MQTT_STATE_ERROR; + return MQTT_NG_CLIENT_SERVER_RETURNED_ERROR; + + case MQTT_CPT_PUBACK: + if (mark_packet_acked(client, client->parser.mqtt_packet.puback.packet_id)) + return MQTT_NG_CLIENT_PROTOCOL_ERROR; + if (client->puback_callback) + client->puback_callback(client->parser.mqtt_packet.puback.packet_id); + break; + + case MQTT_CPT_PINGRESP: + break; + + case MQTT_CPT_SUBACK: + if (mark_packet_acked(client, client->parser.mqtt_packet.suback.packet_id)) + return MQTT_NG_CLIENT_PROTOCOL_ERROR; + break; + + case MQTT_CPT_PUBLISH: + pub = &client->parser.mqtt_packet.publish; + + if (pub->qos > 1) { + freez(pub->topic); + freez(pub->data); + return MQTT_NG_CLIENT_NOT_IMPL_YET; + } + + if ( pub->qos == 1 && ((rc = mqtt_ng_puback(client, pub->packet_id, 0))) ) { + client->client_state = MQTT_STATE_ERROR; + nd_log(NDLS_DAEMON, NDLP_ERR, "Error generating PUBACK reply for PUBLISH"); + return rc; + } + + if ( (prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_TOPIC_ALIAS)) != NULL ) { + // Topic Alias property was sent from server + void *topic_ptr; + if (!c_rhash_get_ptr_by_uint64(client->rx_aliases, prop->data.uint8, &topic_ptr)) { + if (pub->topic != NULL) { + nd_log(NDLS_DAEMON, NDLP_ERR, "We do not yet support topic alias reassignment"); + return MQTT_NG_CLIENT_NOT_IMPL_YET; } + pub->topic = topic_ptr; + } else { + if (pub->topic == NULL) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Topic alias with id %d unknown and topic not set by server!", prop->data.uint8); + return MQTT_NG_CLIENT_PROTOCOL_ERROR; + } + c_rhash_insert_uint64_ptr(client->rx_aliases, prop->data.uint8, pub->topic); } - if (client->msg_callback) - client->msg_callback(pub->topic, pub->data, pub->data_len, pub->qos); - // in case we have property topic alias and we have topic we take over the string - // and add pointer to it into topic alias list - if (prop == NULL) - freez(pub->topic); - freez(pub->data); - return MQTT_NG_CLIENT_WANT_WRITE; - case MQTT_CPT_DISCONNECT: - INFO ("Got MQTT DISCONNECT control packet from server. Reason code: %d", (int)client->parser.mqtt_packet.disconnect.reason_code); - client->client_state = DISCONNECTED; - break; - } + } + + if (client->msg_callback) + client->msg_callback(pub->topic, pub->data, pub->data_len, pub->qos); + // in case we have property topic alias and we have topic we take over the string + // and add pointer to it into topic alias list + if (prop == NULL) + freez(pub->topic); + freez(pub->data); + return MQTT_NG_CLIENT_WANT_WRITE; + + case MQTT_CPT_DISCONNECT: + nd_log(NDLS_DAEMON, NDLP_INFO, "Got MQTT DISCONNECT control packet from server. Reason code: %d", (int)client->parser.mqtt_packet.disconnect.reason_code); + client->client_state = MQTT_STATE_DISCONNECTED; + break; + + default: + nd_log(NDLS_DAEMON, NDLP_INFO, "Got unknown control packet %u from server", ctrl_packet_type); + break; } return rc; @@ -2142,10 +2063,10 @@ int handle_incoming_traffic(struct mqtt_ng_client *client) int mqtt_ng_sync(struct mqtt_ng_client *client) { - if (client->client_state == RAW || client->client_state == DISCONNECTED) + if (client->client_state == MQTT_STATE_RAW || client->client_state == MQTT_STATE_DISCONNECTED) return 0; - if (client->client_state == ERROR) + if (client->client_state == MQTT_STATE_ERROR) return 1; LOCK_HDR_BUFFER(&client->main_buffer); @@ -2182,9 +2103,9 @@ void mqtt_ng_set_max_mem(struct mqtt_ng_client *client, size_t bytes) void mqtt_ng_get_stats(struct mqtt_ng_client *client, struct mqtt_ng_stats *stats) { - pthread_mutex_lock(&client->stats_mutex); + spinlock_lock(&client->stats_spinlock); memcpy(stats, &client->stats, sizeof(struct mqtt_ng_stats)); - pthread_mutex_unlock(&client->stats_mutex); + spinlock_unlock(&client->stats_spinlock); stats->tx_bytes_queued = 0; stats->tx_buffer_reclaimable = 0; @@ -2207,11 +2128,11 @@ void mqtt_ng_get_stats(struct mqtt_ng_client *client, struct mqtt_ng_stats *stat int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic) { uint16_t idx; - pthread_rwlock_wrlock(&client->tx_topic_aliases.rwlock); + spinlock_lock(&client->tx_topic_aliases.spinlock); if (client->tx_topic_aliases.idx_assigned >= client->tx_topic_aliases.idx_max) { - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); - mws_error(client->log, "Tx topic alias indexes were exhausted (current version of the library doesn't support reassigning yet. Feel free to contribute."); + spinlock_unlock(&client->tx_topic_aliases.spinlock); + nd_log(NDLS_DAEMON, NDLP_ERR, "Tx topic alias indexes were exhausted (current version of the library doesn't support reassigning yet. Feel free to contribute."); return 0; //0 is not a valid topic alias } @@ -2220,8 +2141,8 @@ int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic) // this is not a problem for library but might be helpful to warn user // as it might indicate bug in their program (but also might be expected) idx = alias->idx; - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); - mws_debug(client->log, "%s topic \"%s\" already has alias set. Ignoring.", __FUNCTION__, topic); + spinlock_unlock(&client->tx_topic_aliases.spinlock); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "%s topic \"%s\" already has alias set. Ignoring.", __FUNCTION__, topic); return idx; } @@ -2232,6 +2153,6 @@ int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic) c_rhash_insert_str_ptr(client->tx_topic_aliases.stoi_dict, topic, (void*)alias); - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); + spinlock_unlock(&client->tx_topic_aliases.spinlock); return idx; } |