summaryrefslogtreecommitdiffstats
path: root/src/aclk/mqtt_websockets/mqtt_ng.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/aclk/mqtt_websockets/mqtt_ng.c')
-rw-r--r--src/aclk/mqtt_websockets/mqtt_ng.c541
1 files changed, 231 insertions, 310 deletions
diff --git a/src/aclk/mqtt_websockets/mqtt_ng.c b/src/aclk/mqtt_websockets/mqtt_ng.c
index 8ad6bd5c9..9abe77b5f 100644
--- a/src/aclk/mqtt_websockets/mqtt_ng.c
+++ b/src/aclk/mqtt_websockets/mqtt_ng.c
@@ -1,35 +1,19 @@
-// Copyright: SPDX-License-Identifier: GPL-3.0-only
+// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
-#include <stdint.h>
-#include <stdlib.h>
-#include <string.h>
-#include <pthread.h>
-#include <inttypes.h>
-
-#include "c_rhash/c_rhash.h"
+#include "libnetdata/libnetdata.h"
#include "common_internal.h"
#include "mqtt_constants.h"
-#include "mqtt_wss_log.h"
#include "mqtt_ng.h"
-#define UNIT_LOG_PREFIX "mqtt_client: "
-#define FATAL(fmt, ...) mws_fatal(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-#define ERROR(fmt, ...) mws_error(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-#define WARN(fmt, ...) mws_warn (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-#define INFO(fmt, ...) mws_info (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-#define DEBUG(fmt, ...) mws_debug(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
-
#define SMALL_STRING_DONT_FRAGMENT_LIMIT 128
-#define MIN(a,b) (((a)<(b))?(a):(b))
-
-#define LOCK_HDR_BUFFER(buffer) pthread_mutex_lock(&((buffer)->mutex))
-#define UNLOCK_HDR_BUFFER(buffer) pthread_mutex_unlock(&((buffer)->mutex))
+#define LOCK_HDR_BUFFER(buffer) spinlock_lock(&((buffer)->spinlock))
+#define UNLOCK_HDR_BUFFER(buffer) spinlock_unlock(&((buffer)->spinlock))
#define BUFFER_FRAG_GARBAGE_COLLECT 0x01
// some packets can be marked for garbage collection
@@ -75,17 +59,17 @@ struct transaction_buffer {
// to be able to revert state easily
// in case of error mid processing
struct header_buffer state_backup;
- pthread_mutex_t mutex;
+ SPINLOCK spinlock;
struct buffer_fragment *sending_frag;
};
enum mqtt_client_state {
- RAW = 0,
- CONNECT_PENDING,
- CONNECTING,
- CONNECTED,
- ERROR,
- DISCONNECTED
+ MQTT_STATE_RAW = 0,
+ MQTT_STATE_CONNECT_PENDING,
+ MQTT_STATE_CONNECTING,
+ MQTT_STATE_CONNECTED,
+ MQTT_STATE_ERROR,
+ MQTT_STATE_DISCONNECTED
};
enum parser_state {
@@ -224,7 +208,7 @@ struct topic_aliases_data {
c_rhash stoi_dict;
uint32_t idx_max;
uint32_t idx_assigned;
- pthread_rwlock_t rwlock;
+ SPINLOCK spinlock;
};
struct mqtt_ng_client {
@@ -234,8 +218,6 @@ struct mqtt_ng_client {
mqtt_msg_data connect_msg;
- mqtt_wss_log_ctx_t log;
-
mqtt_ng_send_fnc_t send_fnc_ptr;
void *user_ctx;
@@ -253,7 +235,7 @@ struct mqtt_ng_client {
unsigned int ping_pending:1;
struct mqtt_ng_stats stats;
- pthread_mutex_t stats_mutex;
+ SPINLOCK stats_spinlock;
struct topic_aliases_data tx_topic_aliases;
c_rhash rx_aliases;
@@ -407,7 +389,7 @@ enum memory_mode {
CALLER_RESPONSIBLE
};
-static inline enum memory_mode ptr2memory_mode(void * ptr) {
+static enum memory_mode ptr2memory_mode(void * ptr) {
if (ptr == NULL)
return MEMCPY;
if (ptr == CALLER_RESPONSIBILITY)
@@ -492,15 +474,8 @@ static void buffer_rebuild(struct header_buffer *buf)
} while(frag);
}
-static void buffer_garbage_collect(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx)
+static void buffer_garbage_collect(struct header_buffer *buf)
{
-#if !defined(MQTT_DEBUG_VERBOSE) && !defined(ADDITIONAL_CHECKS)
- (void) log_ctx;
-#endif
-#ifdef MQTT_DEBUG_VERBOSE
- mws_debug(log_ctx, "Buffer Garbage Collection!");
-#endif
-
struct buffer_fragment *frag = BUFFER_FIRST_FRAG(buf);
while (frag) {
if (!frag_is_marked_for_gc(frag))
@@ -511,12 +486,8 @@ static void buffer_garbage_collect(struct header_buffer *buf, mqtt_wss_log_ctx_t
frag = frag->next;
}
- if (frag == BUFFER_FIRST_FRAG(buf)) {
-#ifdef MQTT_DEBUG_VERBOSE
- mws_debug(log_ctx, "Buffer Garbage Collection! No Space Reclaimed!");
-#endif
+ if (frag == BUFFER_FIRST_FRAG(buf))
return;
- }
if (!frag) {
buf->tail_frag = NULL;
@@ -535,21 +506,17 @@ static void buffer_garbage_collect(struct header_buffer *buf, mqtt_wss_log_ctx_t
buffer_rebuild(buf);
}
-static void transaction_buffer_garbage_collect(struct transaction_buffer *buf, mqtt_wss_log_ctx_t log_ctx)
+static void transaction_buffer_garbage_collect(struct transaction_buffer *buf)
{
-#ifdef MQTT_DEBUG_VERBOSE
- mws_debug(log_ctx, "Transaction Buffer Garbage Collection! %s", buf->sending_frag == NULL ? "NULL" : "in flight message");
-#endif
-
// Invalidate the cached sending fragment
// as we will move data around
if (buf->sending_frag != &ping_frag)
buf->sending_frag = NULL;
- buffer_garbage_collect(&buf->hdr_buffer, log_ctx);
+ buffer_garbage_collect(&buf->hdr_buffer);
}
-static int transaction_buffer_grow(struct transaction_buffer *buf, mqtt_wss_log_ctx_t log_ctx, float rate, size_t max)
+static int transaction_buffer_grow(struct transaction_buffer *buf, float rate, size_t max)
{
if (buf->hdr_buffer.size >= max)
return 0;
@@ -565,35 +532,30 @@ static int transaction_buffer_grow(struct transaction_buffer *buf, mqtt_wss_log_
void *ret = reallocz(buf->hdr_buffer.data, buf->hdr_buffer.size);
if (ret == NULL) {
- mws_warn(log_ctx, "Buffer growth failed (realloc)");
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "Buffer growth failed (realloc)");
return 1;
}
- mws_debug(log_ctx, "Message metadata buffer was grown");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Message metadata buffer was grown");
buf->hdr_buffer.data = ret;
buffer_rebuild(&buf->hdr_buffer);
return 0;
}
-inline static int transaction_buffer_init(struct transaction_buffer *to_init, size_t size)
+inline static void transaction_buffer_init(struct transaction_buffer *to_init, size_t size)
{
- pthread_mutex_init(&to_init->mutex, NULL);
+ spinlock_init(&to_init->spinlock);
to_init->hdr_buffer.size = size;
to_init->hdr_buffer.data = mallocz(size);
- if (to_init->hdr_buffer.data == NULL)
- return 1;
-
to_init->hdr_buffer.tail = to_init->hdr_buffer.data;
to_init->hdr_buffer.tail_frag = NULL;
- return 0;
}
static void transaction_buffer_destroy(struct transaction_buffer *to_init)
{
buffer_purge(&to_init->hdr_buffer);
- pthread_mutex_destroy(&to_init->mutex);
freez(to_init->hdr_buffer.data);
}
@@ -629,54 +591,30 @@ void transaction_buffer_transaction_rollback(struct transaction_buffer *buf, str
struct mqtt_ng_client *mqtt_ng_init(struct mqtt_ng_init *settings)
{
struct mqtt_ng_client *client = callocz(1, sizeof(struct mqtt_ng_client));
- if (client == NULL)
- return NULL;
- if (transaction_buffer_init(&client->main_buffer, HEADER_BUFFER_SIZE))
- goto err_free_client;
+ transaction_buffer_init(&client->main_buffer, HEADER_BUFFER_SIZE);
client->rx_aliases = RX_ALIASES_INITIALIZE();
- if (client->rx_aliases == NULL)
- goto err_free_trx_buf;
- if (pthread_mutex_init(&client->stats_mutex, NULL))
- goto err_free_rx_alias;
+ spinlock_init(&client->stats_spinlock);
+ spinlock_init(&client->tx_topic_aliases.spinlock);
client->tx_topic_aliases.stoi_dict = TX_ALIASES_INITIALIZE();
- if (client->tx_topic_aliases.stoi_dict == NULL)
- goto err_free_stats_mutex;
client->tx_topic_aliases.idx_max = UINT16_MAX;
- if (pthread_rwlock_init(&client->tx_topic_aliases.rwlock, NULL))
- goto err_free_tx_alias;
-
// TODO just embed the struct into mqtt_ng_client
client->parser.received_data = settings->data_in;
client->send_fnc_ptr = settings->data_out_fnc;
client->user_ctx = settings->user_ctx;
- client->log = settings->log;
-
client->puback_callback = settings->puback_callback;
client->connack_callback = settings->connack_callback;
client->msg_callback = settings->msg_callback;
return client;
-
-err_free_tx_alias:
- c_rhash_destroy(client->tx_topic_aliases.stoi_dict);
-err_free_stats_mutex:
- pthread_mutex_destroy(&client->stats_mutex);
-err_free_rx_alias:
- c_rhash_destroy(client->rx_aliases);
-err_free_trx_buf:
- transaction_buffer_destroy(&client->main_buffer);
-err_free_client:
- freez(client);
- return NULL;
}
-static inline uint8_t get_control_packet_type(uint8_t first_hdr_byte)
+static uint8_t get_control_packet_type(uint8_t first_hdr_byte)
{
return first_hdr_byte >> 4;
}
@@ -708,33 +646,27 @@ static void mqtt_ng_destroy_tx_alias_hash(c_rhash hash)
void mqtt_ng_destroy(struct mqtt_ng_client *client)
{
transaction_buffer_destroy(&client->main_buffer);
- pthread_mutex_destroy(&client->stats_mutex);
mqtt_ng_destroy_tx_alias_hash(client->tx_topic_aliases.stoi_dict);
- pthread_rwlock_destroy(&client->tx_topic_aliases.rwlock);
mqtt_ng_destroy_rx_alias_hash(client->rx_aliases);
freez(client);
}
-int frag_set_external_data(mqtt_wss_log_ctx_t log, struct buffer_fragment *frag, void *data, size_t data_len, free_fnc_t data_free_fnc)
+int frag_set_external_data(struct buffer_fragment *frag, void *data, size_t data_len, free_fnc_t data_free_fnc)
{
if (frag->len) {
// TODO?: This could potentially be done in future if we set rule
// external data always follows in buffer data
// could help reduce fragmentation in some messages but
// currently not worth it considering time is tight
- mws_fatal(log, UNIT_LOG_PREFIX "INTERNAL ERROR: Cannot set external data to fragment already containing in buffer data!");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "INTERNAL ERROR: Cannot set external data to fragment already containing in buffer data!");
return 1;
}
switch (ptr2memory_mode(data_free_fnc)) {
case MEMCPY:
frag->data = mallocz(data_len);
- if (frag->data == NULL) {
- mws_error(log, UNIT_LOG_PREFIX "OOM while malloc @_optimized_add");
- return 1;
- }
memcpy(frag->data, data, data_len);
break;
case EXTERNAL_FREE_AFTER_USE:
@@ -816,18 +748,18 @@ static size_t mqtt_ng_connect_size(struct mqtt_auth_properties *auth,
#define PACK_2B_INT(buffer, integer, frag) { *(uint16_t *)WRITE_POS(frag) = htobe16((integer)); \
DATA_ADVANCE(buffer, sizeof(uint16_t), frag); }
-static int _optimized_add(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx, void *data, size_t data_len, free_fnc_t data_free_fnc, struct buffer_fragment **frag)
+static int _optimized_add(struct header_buffer *buf, void *data, size_t data_len, free_fnc_t data_free_fnc, struct buffer_fragment **frag)
{
if (data_len > SMALL_STRING_DONT_FRAGMENT_LIMIT) {
buffer_frag_flag_t flags = BUFFER_FRAG_DATA_EXTERNAL;
if ((*frag)->flags & BUFFER_FRAG_GARBAGE_COLLECT_ON_SEND)
flags |= BUFFER_FRAG_GARBAGE_COLLECT_ON_SEND;
if( (*frag = buffer_new_frag(buf, flags)) == NULL ) {
- mws_error(log_ctx, "Out of buffer space while generating the message");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Out of buffer space while generating the message");
return 1;
}
- if (frag_set_external_data(log_ctx, *frag, data, data_len, data_free_fnc)) {
- mws_error(log_ctx, "Error adding external data to newly created fragment");
+ if (frag_set_external_data(*frag, data, data_len, data_free_fnc)) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Error adding external data to newly created fragment");
return 1;
}
// we dont want to write to this fragment anymore
@@ -842,31 +774,30 @@ static int _optimized_add(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx,
return 0;
}
-#define TRY_GENERATE_MESSAGE(generator_function, client, ...) \
- int rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \
+#define TRY_GENERATE_MESSAGE(generator_function, ...) \
+ int rc = generator_function(&client->main_buffer, ##__VA_ARGS__); \
if (rc == MQTT_NG_MSGGEN_BUFFER_OOM) { \
LOCK_HDR_BUFFER(&client->main_buffer); \
- transaction_buffer_garbage_collect((&client->main_buffer), client->log); \
+ transaction_buffer_garbage_collect((&client->main_buffer)); \
UNLOCK_HDR_BUFFER(&client->main_buffer); \
- rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \
+ rc = generator_function(&client->main_buffer, ##__VA_ARGS__); \
if (rc == MQTT_NG_MSGGEN_BUFFER_OOM && client->max_mem_bytes) { \
LOCK_HDR_BUFFER(&client->main_buffer); \
- transaction_buffer_grow((&client->main_buffer), client->log, GROWTH_FACTOR, client->max_mem_bytes); \
+ transaction_buffer_grow((&client->main_buffer),GROWTH_FACTOR, client->max_mem_bytes); \
UNLOCK_HDR_BUFFER(&client->main_buffer); \
- rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \
+ rc = generator_function(&client->main_buffer, ##__VA_ARGS__); \
} \
if (rc == MQTT_NG_MSGGEN_BUFFER_OOM) \
- mws_error(client->log, "%s failed to generate message due to insufficient buffer space (line %d)", __FUNCTION__, __LINE__); \
+ nd_log(NDLS_DAEMON, NDLP_ERR, "%s failed to generate message due to insufficient buffer space (line %d)", __FUNCTION__, __LINE__); \
} \
if (rc == MQTT_NG_MSGGEN_OK) { \
- pthread_mutex_lock(&client->stats_mutex); \
+ spinlock_lock(&client->stats_spinlock); \
client->stats.tx_messages_queued++; \
- pthread_mutex_unlock(&client->stats_mutex); \
+ spinlock_unlock(&client->stats_spinlock); \
} \
return rc;
mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
- mqtt_wss_log_ctx_t log_ctx,
struct mqtt_auth_properties *auth,
struct mqtt_lwt_properties *lwt,
uint8_t clean_start,
@@ -874,7 +805,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
{
// Sanity Checks First (are given parameters correct and up to MQTT spec)
if (!auth->client_id) {
- mws_error(log_ctx, "ClientID must be set. [MQTT-3.1.3-3]");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "ClientID must be set. [MQTT-3.1.3-3]");
return NULL;
}
@@ -885,29 +816,29 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
// however server MUST allow ClientIDs between 1-23 bytes [MQTT-3.1.3-5]
// so we will warn client server might not like this and he is using it
// at his own risk!
- mws_warn(log_ctx, "client_id provided is empty string. This might not be allowed by server [MQTT-3.1.3-6]");
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "client_id provided is empty string. This might not be allowed by server [MQTT-3.1.3-6]");
}
if(len > MQTT_MAX_CLIENT_ID) {
// [MQTT-3.1.3-5] server MUST allow client_id length 1-32
// server MAY allow longer client_id, if user provides longer client_id
// warn them he is doing so at his own risk!
- mws_warn(log_ctx, "client_id provided is longer than 23 bytes, server might not allow that [MQTT-3.1.3-5]");
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "client_id provided is longer than 23 bytes, server might not allow that [MQTT-3.1.3-5]");
}
if (lwt) {
if (lwt->will_message && lwt->will_message_size > 65535) {
- mws_error(log_ctx, "Will message cannot be longer than 65535 bytes due to MQTT protocol limitations [MQTT-3.1.3-4] and [MQTT-1.5.6]");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Will message cannot be longer than 65535 bytes due to MQTT protocol limitations [MQTT-3.1.3-4] and [MQTT-1.5.6]");
return NULL;
}
if (!lwt->will_topic) { //TODO topic given with strlen==0 ? check specs
- mws_error(log_ctx, "If will message is given will topic must also be given [MQTT-3.1.3.3]");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "If will message is given will topic must also be given [MQTT-3.1.3.3]");
return NULL;
}
if (lwt->will_qos > MQTT_MAX_QOS) {
// refer to [MQTT-3-1.2-12]
- mws_error(log_ctx, "QOS for LWT message is bigger than max");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "QOS for LWT message is bigger than max");
return NULL;
}
}
@@ -941,8 +872,10 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
*connect_flags = 0;
if (auth->username)
*connect_flags |= MQTT_CONNECT_FLAG_USERNAME;
+
if (auth->password)
*connect_flags |= MQTT_CONNECT_FLAG_PASSWORD;
+
if (lwt) {
*connect_flags |= MQTT_CONNECT_FLAG_LWT;
*connect_flags |= lwt->will_qos << MQTT_CONNECT_FLAG_QOS_BITSHIFT;
@@ -966,7 +899,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
// [MQTT-3.1.3.1] Client identifier
CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback);
PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->client_id), frag);
- if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, auth->client_id, strlen(auth->client_id), auth->client_id_free, &frag))
+ if (_optimized_add(&trx_buf->hdr_buffer, auth->client_id, strlen(auth->client_id), auth->client_id_free, &frag))
goto fail_rollback;
if (lwt != NULL) {
@@ -980,7 +913,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
// Will Topic [MQTT-3.1.3.3]
CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback);
PACK_2B_INT(&trx_buf->hdr_buffer, strlen(lwt->will_topic), frag);
- if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, lwt->will_topic, strlen(lwt->will_topic), lwt->will_topic_free, &frag))
+ if (_optimized_add(&trx_buf->hdr_buffer, lwt->will_topic, strlen(lwt->will_topic), lwt->will_topic_free, &frag))
goto fail_rollback;
// Will Payload [MQTT-3.1.3.4]
@@ -988,7 +921,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback);
CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback);
PACK_2B_INT(&trx_buf->hdr_buffer, lwt->will_message_size, frag);
- if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, lwt->will_message, lwt->will_message_size, lwt->will_topic_free, &frag))
+ if (_optimized_add(&trx_buf->hdr_buffer, lwt->will_message, lwt->will_message_size, lwt->will_topic_free, &frag))
goto fail_rollback;
}
}
@@ -998,7 +931,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback);
CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback);
PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->username), frag);
- if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, auth->username, strlen(auth->username), auth->username_free, &frag))
+ if (_optimized_add(&trx_buf->hdr_buffer, auth->username, strlen(auth->username), auth->username_free, &frag))
goto fail_rollback;
}
@@ -1007,7 +940,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback);
CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback);
PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->password), frag);
- if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, auth->password, strlen(auth->password), auth->password_free, &frag))
+ if (_optimized_add(&trx_buf->hdr_buffer, auth->password, strlen(auth->password), auth->password_free, &frag))
goto fail_rollback;
}
trx_buf->hdr_buffer.tail_frag->flags |= BUFFER_FRAG_MQTT_PACKET_TAIL;
@@ -1024,7 +957,7 @@ int mqtt_ng_connect(struct mqtt_ng_client *client,
uint8_t clean_start,
uint16_t keep_alive)
{
- client->client_state = RAW;
+ client->client_state = MQTT_STATE_RAW;
client->parser.state = MQTT_PARSE_FIXED_HEADER_PACKET_TYPE;
LOCK_HDR_BUFFER(&client->main_buffer);
@@ -1033,28 +966,23 @@ int mqtt_ng_connect(struct mqtt_ng_client *client,
buffer_purge(&client->main_buffer.hdr_buffer);
UNLOCK_HDR_BUFFER(&client->main_buffer);
- pthread_rwlock_wrlock(&client->tx_topic_aliases.rwlock);
+ spinlock_lock(&client->tx_topic_aliases.spinlock);
// according to MQTT spec topic aliases should not be persisted
// even if clean session is true
mqtt_ng_destroy_tx_alias_hash(client->tx_topic_aliases.stoi_dict);
+
client->tx_topic_aliases.stoi_dict = TX_ALIASES_INITIALIZE();
- if (client->tx_topic_aliases.stoi_dict == NULL) {
- pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock);
- return 1;
- }
client->tx_topic_aliases.idx_assigned = 0;
- pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock);
+ spinlock_unlock(&client->tx_topic_aliases.spinlock);
mqtt_ng_destroy_rx_alias_hash(client->rx_aliases);
client->rx_aliases = RX_ALIASES_INITIALIZE();
- if (client->rx_aliases == NULL)
- return 1;
- client->connect_msg = mqtt_ng_generate_connect(&client->main_buffer, client->log, auth, lwt, clean_start, keep_alive);
+ client->connect_msg = mqtt_ng_generate_connect(&client->main_buffer, auth, lwt, clean_start, keep_alive);
if (client->connect_msg == NULL)
return 1;
- pthread_mutex_lock(&client->stats_mutex);
+ spinlock_lock(&client->stats_spinlock);
if (clean_start)
client->stats.tx_messages_queued = 1;
else
@@ -1062,9 +990,9 @@ int mqtt_ng_connect(struct mqtt_ng_client *client,
client->stats.tx_messages_sent = 0;
client->stats.rx_messages_rcvd = 0;
- pthread_mutex_unlock(&client->stats_mutex);
+ spinlock_unlock(&client->stats_spinlock);
- client->client_state = CONNECT_PENDING;
+ client->client_state = MQTT_STATE_CONNECT_PENDING;
return 0;
}
@@ -1074,15 +1002,16 @@ uint16_t get_unused_packet_id() {
return packet_id ? packet_id : ++packet_id;
}
-static inline size_t mqtt_ng_publish_size(const char *topic,
- size_t msg_len,
- uint16_t topic_id)
+static size_t mqtt_ng_publish_size(
+ const char *topic,
+ size_t msg_len,
+ uint16_t topic_id)
{
- size_t retval = 2 /* Topic Name Length */
- + (topic == NULL ? 0 : strlen(topic))
- + 2 /* Packet identifier */
- + 1 /* Properties Length TODO for now fixed to 1 property */
- + msg_len;
+ size_t retval = 2
+ + (topic == NULL ? 0 : strlen(topic)) /* Topic Name Length */
+ + 2 /* Packet identifier */
+ + 1 /* Properties Length for now fixed to 1 property */
+ + msg_len;
if (topic_id)
retval += 3;
@@ -1091,7 +1020,6 @@ static inline size_t mqtt_ng_publish_size(const char *topic,
}
int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf,
- mqtt_wss_log_ctx_t log_ctx,
char *topic,
free_fnc_t topic_free,
void *msg,
@@ -1130,7 +1058,7 @@ int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf,
// [MQTT-3.3.2.1]
PACK_2B_INT(&trx_buf->hdr_buffer, topic == NULL ? 0 : strlen(topic), frag);
if (topic != NULL) {
- if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, topic, strlen(topic), topic_free, &frag))
+ if (_optimized_add(&trx_buf->hdr_buffer, topic, strlen(topic), topic_free, &frag))
goto fail_rollback;
BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback);
}
@@ -1154,7 +1082,7 @@ int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf,
if( (frag = buffer_new_frag(&trx_buf->hdr_buffer, BUFFER_FRAG_DATA_EXTERNAL)) == NULL )
goto fail_rollback;
- if (frag_set_external_data(log_ctx, frag, msg, msg_len, msg_free))
+ if (frag_set_external_data(frag, msg, msg_len, msg_free))
goto fail_rollback;
trx_buf->hdr_buffer.tail_frag->flags |= BUFFER_FRAG_MQTT_PACKET_TAIL;
@@ -1178,9 +1106,9 @@ int mqtt_ng_publish(struct mqtt_ng_client *client,
uint16_t *packet_id)
{
struct topic_alias_data *alias = NULL;
- pthread_rwlock_rdlock(&client->tx_topic_aliases.rwlock);
+ spinlock_lock(&client->tx_topic_aliases.spinlock);
c_rhash_get_ptr_by_str(client->tx_topic_aliases.stoi_dict, topic, (void**)&alias);
- pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock);
+ spinlock_unlock(&client->tx_topic_aliases.spinlock);
uint16_t topic_id = 0;
@@ -1194,14 +1122,14 @@ int mqtt_ng_publish(struct mqtt_ng_client *client,
}
if (client->max_msg_size && PUBLISH_SP_SIZE + mqtt_ng_publish_size(topic, msg_len, topic_id) > client->max_msg_size) {
- mws_error(client->log, "Message too big for server: %zu", msg_len);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Message too big for server: %zu", msg_len);
return MQTT_NG_MSGGEN_MSG_TOO_BIG;
}
- TRY_GENERATE_MESSAGE(mqtt_ng_generate_publish, client, topic, topic_free, msg, msg_free, msg_len, publish_flags, packet_id, topic_id);
+ TRY_GENERATE_MESSAGE(mqtt_ng_generate_publish, topic, topic_free, msg, msg_free, msg_len, publish_flags, packet_id, topic_id);
}
-static inline size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_count)
+static size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_count)
{
size_t len = 2 /* Packet Identifier */ + 1 /* Properties Length TODO for now fixed 0 */;
len += sub_count * (2 /* topic filter string length */ + 1 /* [MQTT-3.8.3.1] Subscription Options Byte */);
@@ -1212,7 +1140,7 @@ static inline size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_co
return len;
}
-int mqtt_ng_generate_subscribe(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, struct mqtt_sub *subs, size_t sub_count)
+int mqtt_ng_generate_subscribe(struct transaction_buffer *trx_buf, struct mqtt_sub *subs, size_t sub_count)
{
// >> START THE RODEO <<
transaction_buffer_transaction_start(trx_buf);
@@ -1247,7 +1175,7 @@ int mqtt_ng_generate_subscribe(struct transaction_buffer *trx_buf, mqtt_wss_log_
for (size_t i = 0; i < sub_count; i++) {
BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback);
PACK_2B_INT(&trx_buf->hdr_buffer, strlen(subs[i].topic), frag);
- if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, subs[i].topic, strlen(subs[i].topic), subs[i].topic_free, &frag))
+ if (_optimized_add(&trx_buf->hdr_buffer, subs[i].topic, strlen(subs[i].topic), subs[i].topic_free, &frag))
goto fail_rollback;
BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback);
*WRITE_POS(frag) = subs[i].options;
@@ -1264,12 +1192,11 @@ fail_rollback:
int mqtt_ng_subscribe(struct mqtt_ng_client *client, struct mqtt_sub *subs, size_t sub_count)
{
- TRY_GENERATE_MESSAGE(mqtt_ng_generate_subscribe, client, subs, sub_count);
+ TRY_GENERATE_MESSAGE(mqtt_ng_generate_subscribe, subs, sub_count);
}
-int mqtt_ng_generate_disconnect(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, uint8_t reason_code)
+int mqtt_ng_generate_disconnect(struct transaction_buffer *trx_buf, uint8_t reason_code)
{
- (void) log_ctx;
// >> START THE RODEO <<
transaction_buffer_transaction_start(trx_buf);
@@ -1308,12 +1235,11 @@ fail_rollback:
int mqtt_ng_disconnect(struct mqtt_ng_client *client, uint8_t reason_code)
{
- TRY_GENERATE_MESSAGE(mqtt_ng_generate_disconnect, client, reason_code);
+ TRY_GENERATE_MESSAGE(mqtt_ng_generate_disconnect, reason_code);
}
-static int mqtt_generate_puback(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, uint16_t packet_id, uint8_t reason_code)
+static int mqtt_generate_puback(struct transaction_buffer *trx_buf, uint16_t packet_id, uint8_t reason_code)
{
- (void) log_ctx;
// >> START THE RODEO <<
transaction_buffer_transaction_start(trx_buf);
@@ -1353,7 +1279,7 @@ fail_rollback:
static int mqtt_ng_puback(struct mqtt_ng_client *client, uint16_t packet_id, uint8_t reason_code)
{
- TRY_GENERATE_MESSAGE(mqtt_generate_puback, client, packet_id, reason_code);
+ TRY_GENERATE_MESSAGE(mqtt_generate_puback, packet_id, reason_code);
}
int mqtt_ng_ping(struct mqtt_ng_client *client)
@@ -1370,7 +1296,6 @@ int mqtt_ng_ping(struct mqtt_ng_client *client)
#define MQTT_NG_CLIENT_PROTOCOL_ERROR -1
#define MQTT_NG_CLIENT_SERVER_RETURNED_ERROR -2
#define MQTT_NG_CLIENT_NOT_IMPL_YET -3
-#define MQTT_NG_CLIENT_OOM -4
#define MQTT_NG_CLIENT_INTERNAL_ERROR -5
#define BUF_READ_CHECK_AT_LEAST(buf, x) \
@@ -1379,10 +1304,10 @@ int mqtt_ng_ping(struct mqtt_ng_client *client)
#define vbi_parser_reset_ctx(ctx) memset(ctx, 0, sizeof(struct mqtt_vbi_parser_ctx))
-static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data, mqtt_wss_log_ctx_t log)
+static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data)
{
if (ctx->bytes > MQTT_VBI_MAXBYTES - 1) {
- mws_error(log, "MQTT Variable Byte Integer can't be longer than %d bytes", MQTT_VBI_MAXBYTES);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT Variable Byte Integer can't be longer than %d bytes", MQTT_VBI_MAXBYTES);
return MQTT_NG_CLIENT_PROTOCOL_ERROR;
}
if (!ctx->bytes || ctx->data[ctx->bytes-1] & MQTT_VBI_CONTINUATION_FLAG) {
@@ -1394,7 +1319,7 @@ static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data, mqtt_w
}
if (mqtt_vbi_to_uint32(ctx->data, &ctx->result)) {
- mws_error(log, "MQTT Variable Byte Integer failed to be parsed.");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT Variable Byte Integer failed to be parsed.");
return MQTT_NG_CLIENT_PROTOCOL_ERROR;
}
@@ -1480,12 +1405,12 @@ struct mqtt_property *get_property_by_id(struct mqtt_property *props, uint8_t pr
}
// Parses [MQTT-2.2.2]
-static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t data, mqtt_wss_log_ctx_t log)
+static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t data)
{
int rc;
switch (ctx->state) {
case PROPERTIES_LENGTH:
- rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data, log);
+ rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data);
if (rc == MQTT_NG_CLIENT_PARSE_DONE) {
ctx->properties_length = ctx->vbi_parser_ctx.result;
ctx->bytes_consumed += ctx->vbi_parser_ctx.bytes;
@@ -1534,7 +1459,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t
ctx->state = PROPERTY_TYPE_STR_BIN_LEN;
break;
default:
- mws_error(log, "Unsupported property type %d for property id %d.", (int)ctx->tail->type, (int)ctx->tail->id);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Unsupported property type %d for property id %d.", (int)ctx->tail->type, (int)ctx->tail->id);
return MQTT_NG_CLIENT_PROTOCOL_ERROR;
}
break;
@@ -1552,7 +1477,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t
ctx->state = PROPERTY_TYPE_STR;
break;
default:
- mws_error(log, "Unexpected datatype in PROPERTY_TYPE_STR_BIN_LEN %d", (int)ctx->tail->type);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Unexpected datatype in PROPERTY_TYPE_STR_BIN_LEN %d", (int)ctx->tail->type);
return MQTT_NG_CLIENT_INTERNAL_ERROR;
}
break;
@@ -1577,7 +1502,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t
ctx->state = PROPERTY_NEXT;
break;
case PROPERTY_TYPE_VBI:
- rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data, log);
+ rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data);
if (rc == MQTT_NG_CLIENT_PARSE_DONE) {
ctx->tail->data.uint32 = ctx->vbi_parser_ctx.result;
ctx->bytes_consumed += ctx->vbi_parser_ctx.bytes;
@@ -1627,9 +1552,9 @@ static int parse_connack_varhdr(struct mqtt_ng_client *client)
mqtt_properties_parser_ctx_reset(&parser->properties_parser);
break;
case MQTT_PARSE_VARHDR_PROPS:
- return parse_properties_array(&parser->properties_parser, parser->received_data, client->log);
+ return parse_properties_array(&parser->properties_parser, parser->received_data);
default:
- ERROR("invalid state for connack varhdr parser");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for connack varhdr parser");
return MQTT_NG_CLIENT_INTERNAL_ERROR;
}
return MQTT_NG_CLIENT_OK_CALL_AGAIN;
@@ -1653,9 +1578,9 @@ static int parse_disconnect_varhdr(struct mqtt_ng_client *client)
mqtt_properties_parser_ctx_reset(&parser->properties_parser);
break;
case MQTT_PARSE_VARHDR_PROPS:
- return parse_properties_array(&parser->properties_parser, parser->received_data, client->log);
+ return parse_properties_array(&parser->properties_parser, parser->received_data);
default:
- ERROR("invalid state for connack varhdr parser");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for connack varhdr parser");
return MQTT_NG_CLIENT_INTERNAL_ERROR;
}
return MQTT_NG_CLIENT_OK_CALL_AGAIN;
@@ -1691,9 +1616,9 @@ static int parse_puback_varhdr(struct mqtt_ng_client *client)
mqtt_properties_parser_ctx_reset(&parser->properties_parser);
/* FALLTHROUGH */
case MQTT_PARSE_VARHDR_PROPS:
- return parse_properties_array(&parser->properties_parser, parser->received_data, client->log);
+ return parse_properties_array(&parser->properties_parser, parser->received_data);
default:
- ERROR("invalid state for puback varhdr parser");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for puback varhdr parser");
return MQTT_NG_CLIENT_INTERNAL_ERROR;
}
return MQTT_NG_CLIENT_OK_CALL_AGAIN;
@@ -1716,7 +1641,7 @@ static int parse_suback_varhdr(struct mqtt_ng_client *client)
mqtt_properties_parser_ctx_reset(&parser->properties_parser);
/* FALLTHROUGH */
case MQTT_PARSE_VARHDR_PROPS:
- rc = parse_properties_array(&parser->properties_parser, parser->received_data, client->log);
+ rc = parse_properties_array(&parser->properties_parser, parser->received_data);
if (rc != MQTT_NG_CLIENT_PARSE_DONE)
return rc;
parser->mqtt_parsed_len += parser->properties_parser.bytes_consumed;
@@ -1737,7 +1662,7 @@ static int parse_suback_varhdr(struct mqtt_ng_client *client)
return MQTT_NG_CLIENT_NEED_MORE_BYTES;
default:
- ERROR("invalid state for suback varhdr parser");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for suback varhdr parser");
return MQTT_NG_CLIENT_INTERNAL_ERROR;
}
return MQTT_NG_CLIENT_OK_CALL_AGAIN;
@@ -1761,8 +1686,6 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client)
break;
}
publish->topic = callocz(1, publish->topic_len + 1 /* add 0x00 */);
- if (publish->topic == NULL)
- return MQTT_NG_CLIENT_OOM;
parser->varhdr_state = MQTT_PARSE_VARHDR_TOPICNAME;
/* FALLTHROUGH */
case MQTT_PARSE_VARHDR_TOPICNAME:
@@ -1788,7 +1711,7 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client)
parser->mqtt_parsed_len += 2;
/* FALLTHROUGH */
case MQTT_PARSE_VARHDR_PROPS:
- rc = parse_properties_array(&parser->properties_parser, parser->received_data, client->log);
+ rc = parse_properties_array(&parser->properties_parser, parser->received_data);
if (rc != MQTT_NG_CLIENT_PARSE_DONE)
return rc;
parser->mqtt_parsed_len += parser->properties_parser.bytes_consumed;
@@ -1798,7 +1721,7 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client)
if (parser->mqtt_fixed_hdr_remaining_length < parser->mqtt_parsed_len) {
freez(publish->topic);
publish->topic = NULL;
- ERROR("Error parsing PUBLISH message");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Error parsing PUBLISH message");
return MQTT_NG_CLIENT_PROTOCOL_ERROR;
}
publish->data_len = parser->mqtt_fixed_hdr_remaining_length - parser->mqtt_parsed_len;
@@ -1809,18 +1732,12 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client)
BUF_READ_CHECK_AT_LEAST(parser->received_data, publish->data_len);
publish->data = mallocz(publish->data_len);
- if (publish->data == NULL) {
- freez(publish->topic);
- publish->topic = NULL;
- return MQTT_NG_CLIENT_OOM;
- }
-
rbuf_pop(parser->received_data, publish->data, publish->data_len);
parser->mqtt_parsed_len += publish->data_len;
return MQTT_NG_CLIENT_PARSE_DONE;
default:
- ERROR("invalid state for publish varhdr parser");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for publish varhdr parser");
return MQTT_NG_CLIENT_INTERNAL_ERROR;
}
return MQTT_NG_CLIENT_OK_CALL_AGAIN;
@@ -1840,7 +1757,7 @@ static int parse_data(struct mqtt_ng_client *client)
parser->state = MQTT_PARSE_FIXED_HEADER_LEN;
break;
case MQTT_PARSE_FIXED_HEADER_LEN:
- rc = vbi_parser_parse(&parser->vbi_parser, parser->received_data, client->log);
+ rc = vbi_parser_parse(&parser->vbi_parser, parser->received_data);
if (rc == MQTT_NG_CLIENT_PARSE_DONE) {
parser->mqtt_fixed_hdr_remaining_length = parser->vbi_parser.result;
parser->state = MQTT_PARSE_VARIABLE_HEADER;
@@ -1883,10 +1800,11 @@ static int parse_data(struct mqtt_ng_client *client)
return rc;
case MQTT_CPT_PINGRESP:
if (parser->mqtt_fixed_hdr_remaining_length) {
- ERROR ("PINGRESP has to be 0 Remaining Length."); // [MQTT-3.13.1]
+ nd_log(NDLS_DAEMON, NDLP_ERR, "PINGRESP has to be 0 Remaining Length."); // [MQTT-3.13.1]
return MQTT_NG_CLIENT_PROTOCOL_ERROR;
}
parser->state = MQTT_PARSE_MQTT_PACKET_DONE;
+ ping_timeout = 0;
break;
case MQTT_CPT_DISCONNECT:
rc = parse_disconnect_varhdr(client);
@@ -1896,7 +1814,7 @@ static int parse_data(struct mqtt_ng_client *client)
}
return rc;
default:
- ERROR("Parsing Control Packet Type %" PRIu8 " not implemented yet.", get_control_packet_type(parser->mqtt_control_packet_type));
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Parsing Control Packet Type %" PRIu8 " not implemented yet.", get_control_packet_type(parser->mqtt_control_packet_type));
rbuf_bump_tail(parser->received_data, parser->mqtt_fixed_hdr_remaining_length);
parser->state = MQTT_PARSE_MQTT_PACKET_DONE;
return MQTT_NG_CLIENT_NOT_IMPL_YET;
@@ -1916,12 +1834,12 @@ static int parse_data(struct mqtt_ng_client *client)
// return -1 on error
// return 0 if there is fragment set
static int mqtt_ng_next_to_send(struct mqtt_ng_client *client) {
- if (client->client_state == CONNECT_PENDING) {
+ if (client->client_state == MQTT_STATE_CONNECT_PENDING) {
client->main_buffer.sending_frag = client->connect_msg;
- client->client_state = CONNECTING;
+ client->client_state = MQTT_STATE_CONNECTING;
return 0;
}
- if (client->client_state != CONNECTED)
+ if (client->client_state != MQTT_STATE_CONNECTED)
return -1;
struct buffer_fragment *frag = BUFFER_FIRST_FRAG(&client->main_buffer.hdr_buffer);
@@ -1959,7 +1877,7 @@ static int send_fragment(struct mqtt_ng_client *client) {
if (bytes)
processed = client->send_fnc_ptr(client->user_ctx, ptr, bytes);
else
- WARN("This fragment was fully sent already. This should not happen!");
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "This fragment was fully sent already. This should not happen!");
frag->sent += processed;
if (frag->sent != frag->len)
@@ -1967,11 +1885,11 @@ static int send_fragment(struct mqtt_ng_client *client) {
if (frag->flags & BUFFER_FRAG_MQTT_PACKET_TAIL) {
client->time_of_last_send = time(NULL);
- pthread_mutex_lock(&client->stats_mutex);
+ spinlock_lock(&client->stats_spinlock);
if (client->main_buffer.sending_frag != &ping_frag)
client->stats.tx_messages_queued--;
client->stats.tx_messages_sent++;
- pthread_mutex_unlock(&client->stats_mutex);
+ spinlock_unlock(&client->stats_spinlock);
client->main_buffer.sending_frag = NULL;
return 1;
}
@@ -1995,7 +1913,7 @@ static void try_send_all(struct mqtt_ng_client *client) {
} while(send_all_message_fragments(client) >= 0);
}
-static inline void mark_message_for_gc(struct buffer_fragment *frag)
+static void mark_message_for_gc(struct buffer_fragment *frag)
{
while (frag) {
frag->flags |= BUFFER_FRAG_GARBAGE_COLLECT;
@@ -2013,7 +1931,7 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id)
while (frag) {
if ( (frag->flags & BUFFER_FRAG_MQTT_PACKET_HEAD) && frag->packet_id == packet_id) {
if (!frag->sent) {
- ERROR("Received packet_id (%" PRIu16 ") belongs to MQTT packet which was not yet sent!", packet_id);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Received packet_id (%" PRIu16 ") belongs to MQTT packet which was not yet sent!", packet_id);
UNLOCK_HDR_BUFFER(&client->main_buffer);
return 1;
}
@@ -2023,7 +1941,7 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id)
}
frag = frag->next;
}
- ERROR("Received packet_id (%" PRIu16 ") is unknown!", packet_id);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Received packet_id (%" PRIu16 ") is unknown!", packet_id);
UNLOCK_HDR_BUFFER(&client->main_buffer);
return 1;
}
@@ -2031,110 +1949,113 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id)
int handle_incoming_traffic(struct mqtt_ng_client *client)
{
int rc;
+ while ((rc = parse_data(client)) == MQTT_NG_CLIENT_OK_CALL_AGAIN) {
+ ;
+ }
+ if (rc != MQTT_NG_CLIENT_MQTT_PACKET_DONE)
+ return rc;
+
struct mqtt_publish *pub;
- while( (rc = parse_data(client)) == MQTT_NG_CLIENT_OK_CALL_AGAIN );
- if ( rc == MQTT_NG_CLIENT_MQTT_PACKET_DONE ) {
- struct mqtt_property *prop;
-#ifdef MQTT_DEBUG_VERBOSE
- DEBUG("MQTT Packet Parsed Successfully!");
-#endif
- pthread_mutex_lock(&client->stats_mutex);
- client->stats.rx_messages_rcvd++;
- pthread_mutex_unlock(&client->stats_mutex);
-
- switch (get_control_packet_type(client->parser.mqtt_control_packet_type)) {
- case MQTT_CPT_CONNACK:
-#ifdef MQTT_DEBUG_VERBOSE
- DEBUG("Received CONNACK");
-#endif
- LOCK_HDR_BUFFER(&client->main_buffer);
- mark_message_for_gc(client->connect_msg);
- UNLOCK_HDR_BUFFER(&client->main_buffer);
- client->connect_msg = NULL;
- if (client->client_state != CONNECTING) {
- ERROR("Received unexpected CONNACK");
- client->client_state = ERROR;
- return MQTT_NG_CLIENT_PROTOCOL_ERROR;
- }
- if ((prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_MAX_PKT_SIZE)) != NULL) {
- INFO("MQTT server limits message size to %" PRIu32, prop->data.uint32);
- client->max_msg_size = prop->data.uint32;
- }
- if (client->connack_callback)
- client->connack_callback(client->user_ctx, client->parser.mqtt_packet.connack.reason_code);
- if (!client->parser.mqtt_packet.connack.reason_code) {
- INFO("MQTT Connection Accepted By Server");
- client->client_state = CONNECTED;
- break;
- }
- client->client_state = ERROR;
- return MQTT_NG_CLIENT_SERVER_RETURNED_ERROR;
- case MQTT_CPT_PUBACK:
-#ifdef MQTT_DEBUG_VERBOSE
- DEBUG("Received PUBACK %" PRIu16, client->parser.mqtt_packet.puback.packet_id);
-#endif
- if (mark_packet_acked(client, client->parser.mqtt_packet.puback.packet_id))
- return MQTT_NG_CLIENT_PROTOCOL_ERROR;
- if (client->puback_callback)
- client->puback_callback(client->parser.mqtt_packet.puback.packet_id);
- break;
- case MQTT_CPT_PINGRESP:
-#ifdef MQTT_DEBUG_VERBOSE
- DEBUG("Received PINGRESP");
-#endif
- break;
- case MQTT_CPT_SUBACK:
-#ifdef MQTT_DEBUG_VERBOSE
- DEBUG("Received SUBACK %" PRIu16, client->parser.mqtt_packet.suback.packet_id);
-#endif
- if (mark_packet_acked(client, client->parser.mqtt_packet.suback.packet_id))
- return MQTT_NG_CLIENT_PROTOCOL_ERROR;
+ struct mqtt_property *prop;
+ spinlock_lock(&client->stats_spinlock);
+ client->stats.rx_messages_rcvd++;
+ spinlock_unlock(&client->stats_spinlock);
+
+ uint8_t ctrl_packet_type = get_control_packet_type(client->parser.mqtt_control_packet_type);
+ switch (ctrl_packet_type) {
+ case MQTT_CPT_CONNACK:
+ LOCK_HDR_BUFFER(&client->main_buffer);
+ mark_message_for_gc(client->connect_msg);
+ UNLOCK_HDR_BUFFER(&client->main_buffer);
+
+ client->connect_msg = NULL;
+
+ if (client->client_state != MQTT_STATE_CONNECTING) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Received unexpected CONNACK");
+ client->client_state = MQTT_STATE_ERROR;
+ return MQTT_NG_CLIENT_PROTOCOL_ERROR;
+ }
+
+ if ((prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_MAX_PKT_SIZE)) != NULL) {
+ nd_log(NDLS_DAEMON, NDLP_INFO, "MQTT server limits message size to %" PRIu32, prop->data.uint32);
+ client->max_msg_size = prop->data.uint32;
+ }
+
+ if (client->connack_callback)
+ client->connack_callback(client->user_ctx, client->parser.mqtt_packet.connack.reason_code);
+ if (!client->parser.mqtt_packet.connack.reason_code) {
+ nd_log(NDLS_DAEMON, NDLP_INFO, "MQTT Connection Accepted By Server");
+ client->client_state = MQTT_STATE_CONNECTED;
break;
- case MQTT_CPT_PUBLISH:
-#ifdef MQTT_DEBUG_VERBOSE
- DEBUG("Recevied PUBLISH");
-#endif
- pub = &client->parser.mqtt_packet.publish;
- if (pub->qos > 1) {
- freez(pub->topic);
- freez(pub->data);
- return MQTT_NG_CLIENT_NOT_IMPL_YET;
- }
- if ( pub->qos == 1 && (rc = mqtt_ng_puback(client, pub->packet_id, 0)) ) {
- client->client_state = ERROR;
- ERROR("Error generating PUBACK reply for PUBLISH");
- return rc;
- }
- if ( (prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_TOPIC_ALIAS)) != NULL ) {
- // Topic Alias property was sent from server
- void *topic_ptr;
- if (!c_rhash_get_ptr_by_uint64(client->rx_aliases, prop->data.uint8, &topic_ptr)) {
- if (pub->topic != NULL) {
- ERROR("We do not yet support topic alias reassignment");
- return MQTT_NG_CLIENT_NOT_IMPL_YET;
- }
- pub->topic = topic_ptr;
- } else {
- if (pub->topic == NULL) {
- ERROR("Topic alias with id %d unknown and topic not set by server!", prop->data.uint8);
- return MQTT_NG_CLIENT_PROTOCOL_ERROR;
- }
- c_rhash_insert_uint64_ptr(client->rx_aliases, prop->data.uint8, pub->topic);
+ }
+ client->client_state = MQTT_STATE_ERROR;
+ return MQTT_NG_CLIENT_SERVER_RETURNED_ERROR;
+
+ case MQTT_CPT_PUBACK:
+ if (mark_packet_acked(client, client->parser.mqtt_packet.puback.packet_id))
+ return MQTT_NG_CLIENT_PROTOCOL_ERROR;
+ if (client->puback_callback)
+ client->puback_callback(client->parser.mqtt_packet.puback.packet_id);
+ break;
+
+ case MQTT_CPT_PINGRESP:
+ break;
+
+ case MQTT_CPT_SUBACK:
+ if (mark_packet_acked(client, client->parser.mqtt_packet.suback.packet_id))
+ return MQTT_NG_CLIENT_PROTOCOL_ERROR;
+ break;
+
+ case MQTT_CPT_PUBLISH:
+ pub = &client->parser.mqtt_packet.publish;
+
+ if (pub->qos > 1) {
+ freez(pub->topic);
+ freez(pub->data);
+ return MQTT_NG_CLIENT_NOT_IMPL_YET;
+ }
+
+ if ( pub->qos == 1 && ((rc = mqtt_ng_puback(client, pub->packet_id, 0))) ) {
+ client->client_state = MQTT_STATE_ERROR;
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Error generating PUBACK reply for PUBLISH");
+ return rc;
+ }
+
+ if ( (prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_TOPIC_ALIAS)) != NULL ) {
+ // Topic Alias property was sent from server
+ void *topic_ptr;
+ if (!c_rhash_get_ptr_by_uint64(client->rx_aliases, prop->data.uint8, &topic_ptr)) {
+ if (pub->topic != NULL) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "We do not yet support topic alias reassignment");
+ return MQTT_NG_CLIENT_NOT_IMPL_YET;
}
+ pub->topic = topic_ptr;
+ } else {
+ if (pub->topic == NULL) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Topic alias with id %d unknown and topic not set by server!", prop->data.uint8);
+ return MQTT_NG_CLIENT_PROTOCOL_ERROR;
+ }
+ c_rhash_insert_uint64_ptr(client->rx_aliases, prop->data.uint8, pub->topic);
}
- if (client->msg_callback)
- client->msg_callback(pub->topic, pub->data, pub->data_len, pub->qos);
- // in case we have property topic alias and we have topic we take over the string
- // and add pointer to it into topic alias list
- if (prop == NULL)
- freez(pub->topic);
- freez(pub->data);
- return MQTT_NG_CLIENT_WANT_WRITE;
- case MQTT_CPT_DISCONNECT:
- INFO ("Got MQTT DISCONNECT control packet from server. Reason code: %d", (int)client->parser.mqtt_packet.disconnect.reason_code);
- client->client_state = DISCONNECTED;
- break;
- }
+ }
+
+ if (client->msg_callback)
+ client->msg_callback(pub->topic, pub->data, pub->data_len, pub->qos);
+ // in case we have property topic alias and we have topic we take over the string
+ // and add pointer to it into topic alias list
+ if (prop == NULL)
+ freez(pub->topic);
+ freez(pub->data);
+ return MQTT_NG_CLIENT_WANT_WRITE;
+
+ case MQTT_CPT_DISCONNECT:
+ nd_log(NDLS_DAEMON, NDLP_INFO, "Got MQTT DISCONNECT control packet from server. Reason code: %d", (int)client->parser.mqtt_packet.disconnect.reason_code);
+ client->client_state = MQTT_STATE_DISCONNECTED;
+ break;
+
+ default:
+ nd_log(NDLS_DAEMON, NDLP_INFO, "Got unknown control packet %u from server", ctrl_packet_type);
+ break;
}
return rc;
@@ -2142,10 +2063,10 @@ int handle_incoming_traffic(struct mqtt_ng_client *client)
int mqtt_ng_sync(struct mqtt_ng_client *client)
{
- if (client->client_state == RAW || client->client_state == DISCONNECTED)
+ if (client->client_state == MQTT_STATE_RAW || client->client_state == MQTT_STATE_DISCONNECTED)
return 0;
- if (client->client_state == ERROR)
+ if (client->client_state == MQTT_STATE_ERROR)
return 1;
LOCK_HDR_BUFFER(&client->main_buffer);
@@ -2182,9 +2103,9 @@ void mqtt_ng_set_max_mem(struct mqtt_ng_client *client, size_t bytes)
void mqtt_ng_get_stats(struct mqtt_ng_client *client, struct mqtt_ng_stats *stats)
{
- pthread_mutex_lock(&client->stats_mutex);
+ spinlock_lock(&client->stats_spinlock);
memcpy(stats, &client->stats, sizeof(struct mqtt_ng_stats));
- pthread_mutex_unlock(&client->stats_mutex);
+ spinlock_unlock(&client->stats_spinlock);
stats->tx_bytes_queued = 0;
stats->tx_buffer_reclaimable = 0;
@@ -2207,11 +2128,11 @@ void mqtt_ng_get_stats(struct mqtt_ng_client *client, struct mqtt_ng_stats *stat
int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic)
{
uint16_t idx;
- pthread_rwlock_wrlock(&client->tx_topic_aliases.rwlock);
+ spinlock_lock(&client->tx_topic_aliases.spinlock);
if (client->tx_topic_aliases.idx_assigned >= client->tx_topic_aliases.idx_max) {
- pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock);
- mws_error(client->log, "Tx topic alias indexes were exhausted (current version of the library doesn't support reassigning yet. Feel free to contribute.");
+ spinlock_unlock(&client->tx_topic_aliases.spinlock);
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Tx topic alias indexes were exhausted (current version of the library doesn't support reassigning yet. Feel free to contribute.");
return 0; //0 is not a valid topic alias
}
@@ -2220,8 +2141,8 @@ int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic)
// this is not a problem for library but might be helpful to warn user
// as it might indicate bug in their program (but also might be expected)
idx = alias->idx;
- pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock);
- mws_debug(client->log, "%s topic \"%s\" already has alias set. Ignoring.", __FUNCTION__, topic);
+ spinlock_unlock(&client->tx_topic_aliases.spinlock);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "%s topic \"%s\" already has alias set. Ignoring.", __FUNCTION__, topic);
return idx;
}
@@ -2232,6 +2153,6 @@ int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic)
c_rhash_insert_str_ptr(client->tx_topic_aliases.stoi_dict, topic, (void*)alias);
- pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock);
+ spinlock_unlock(&client->tx_topic_aliases.spinlock);
return idx;
}