summaryrefslogtreecommitdiffstats
path: root/src/aclk
diff options
context:
space:
mode:
Diffstat (limited to 'src/aclk')
-rw-r--r--src/aclk/aclk.c63
-rw-r--r--src/aclk/aclk_capas.c4
-rw-r--r--src/aclk/aclk_rx_msgs.c12
-rw-r--r--src/aclk/helpers/mqtt_wss_pal.h6
-rw-r--r--src/aclk/https_client.c2
-rw-r--r--src/aclk/mqtt_websockets/mqtt_ng.c56
-rw-r--r--src/aclk/mqtt_websockets/mqtt_wss_client.c148
-rw-r--r--src/aclk/mqtt_websockets/mqtt_wss_log.c12
-rw-r--r--src/aclk/mqtt_websockets/ws_client.c84
-rw-r--r--src/aclk/schema-wrappers/alarm_stream.cc3
-rw-r--r--src/aclk/schema-wrappers/alarm_stream.h12
11 files changed, 185 insertions, 217 deletions
diff --git a/src/aclk/aclk.c b/src/aclk/aclk.c
index 99174549..627edfc9 100644
--- a/src/aclk/aclk.c
+++ b/src/aclk/aclk.c
@@ -52,9 +52,9 @@ time_t aclk_block_until = 0;
#ifdef ENABLE_ACLK
mqtt_wss_client mqttwss_client;
-netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
-#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
-#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
+//netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
+//#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
+//#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
struct aclk_shared_state aclk_shared_state = {
.mqtt_shutdown_msg_id = -1,
@@ -1058,30 +1058,24 @@ void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, con
static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
{
- struct proto_alert_status status;
- memset(&status, 0, sizeof(status));
- if (get_proto_alert_status(host, &status)) {
- buffer_strcat(wb, "\nFailed to get alert streaming status for this host");
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
+ if (!wc)
return;
- }
+
buffer_sprintf(wb,
"\n\t\tUpdates: %d"
- "\n\t\tPending Min Seq ID: %"PRIu64
- "\n\t\tPending Max Seq ID: %"PRIu64
- "\n\t\tLast Submitted Seq ID: %"PRIu64,
- status.alert_updates,
- status.pending_min_sequence_id,
- status.pending_max_sequence_id,
- status.last_submitted_sequence_id
+ "\n\t\tCheckpoints: %d"
+ "\n\t\tAlert count: %d"
+ "\n\t\tAlert snapshot count: %d",
+ wc->stream_alerts,
+ wc->checkpoint_count,
+ wc->alert_count,
+ wc->snapshot_count
);
}
-#endif /* ENABLE_ACLK */
char *aclk_state(void)
{
-#ifndef ENABLE_ACLK
- return strdupz("ACLK Available: No");
-#else
BUFFER *wb = buffer_create(1024, &netdata_buffers_statistics.buffers_aclk);
struct tm *tmptr, tmbuf;
char *ret;
@@ -1163,28 +1157,25 @@ char *aclk_state(void)
ret = strdupz(buffer_tostring(wb));
buffer_free(wb);
return ret;
-#endif /* ENABLE_ACLK */
}
-#ifdef ENABLE_ACLK
static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
{
- struct proto_alert_status status;
- memset(&status, 0, sizeof(status));
- if (get_proto_alert_status(host, &status))
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
+ if (!wc)
return;
- json_object *tmp = json_object_new_int(status.alert_updates);
+ json_object *tmp = json_object_new_int(wc->stream_alerts);
json_object_object_add(obj, "updates", tmp);
- tmp = json_object_new_int(status.pending_min_sequence_id);
- json_object_object_add(obj, "pending-min-seq-id", tmp);
+ tmp = json_object_new_int(wc->checkpoint_count);
+ json_object_object_add(obj, "checkpoint-count", tmp);
- tmp = json_object_new_int(status.pending_max_sequence_id);
- json_object_object_add(obj, "pending-max-seq-id", tmp);
+ tmp = json_object_new_int(wc->alert_count);
+ json_object_object_add(obj, "alert-count", tmp);
- tmp = json_object_new_int(status.last_submitted_sequence_id);
- json_object_object_add(obj, "last-submitted-seq-id", tmp);
+ tmp = json_object_new_int(wc->snapshot_count);
+ json_object_object_add(obj, "alert-snapshot-count", tmp);
}
static json_object *timestamp_to_json(const time_t *t)
@@ -1197,13 +1188,9 @@ static json_object *timestamp_to_json(const time_t *t)
}
return NULL;
}
-#endif /* ENABLE_ACLK */
char *aclk_state_json(void)
{
-#ifndef ENABLE_ACLK
- return strdupz("{\"aclk-available\":false}");
-#else
json_object *tmp, *grp, *msg = json_object_new_object();
tmp = json_object_new_boolean(1);
@@ -1313,8 +1300,8 @@ char *aclk_state_json(void)
char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
json_object_put(msg);
return str;
-#endif /* ENABLE_ACLK */
}
+#endif /* ENABLE_ACLK */
void add_aclk_host_labels(void) {
RRDLABELS *labels = localhost->rrdlabels;
@@ -1347,7 +1334,7 @@ void add_aclk_host_labels(void) {
void aclk_queue_node_info(RRDHOST *host, bool immediate)
{
- struct aclk_sync_cfg_t *wc = host->aclk_config;
- if (likely(wc))
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
+ if (wc)
wc->node_info_send_time = (host == localhost || immediate) ? 1 : now_realtime_sec();
}
diff --git a/src/aclk/aclk_capas.c b/src/aclk/aclk_capas.c
index 00102ad4..0f7870fd 100644
--- a/src/aclk/aclk_capas.c
+++ b/src/aclk/aclk_capas.c
@@ -16,7 +16,7 @@ const struct capability *aclk_get_agent_capas()
{ .name = "ctx", .version = 1, .enabled = 1 },
{ .name = "funcs", .version = 1, .enabled = 1 },
{ .name = "http_api_v2", .version = HTTP_API_V2_VERSION, .enabled = 1 },
- { .name = "health", .version = 1, .enabled = 0 }, // index 7, below
+ { .name = "health", .version = 2, .enabled = 0 }, // index 7, below
{ .name = "req_cancel", .version = 1, .enabled = 1 },
{ .name = "dyncfg", .version = 2, .enabled = 1 },
{ .name = NULL, .version = 0, .enabled = 0 }
@@ -46,7 +46,7 @@ struct capability *aclk_get_node_instance_capas(RRDHOST *host)
{ .name = "ctx", .version = 1, .enabled = 1 },
{ .name = "funcs", .version = functions ? 1 : 0, .enabled = functions ? 1 : 0 },
{ .name = "http_api_v2", .version = HTTP_API_V2_VERSION, .enabled = 1 },
- { .name = "health", .version = 1, .enabled = host->health.health_enabled },
+ { .name = "health", .version = 2, .enabled = host->health.health_enabled },
{ .name = "req_cancel", .version = 1, .enabled = 1 },
{ .name = "dyncfg", .version = 2, .enabled = dyncfg },
{ .name = NULL, .version = 0, .enabled = 0 }
diff --git a/src/aclk/aclk_rx_msgs.c b/src/aclk/aclk_rx_msgs.c
index 60e42192..8db8e3f1 100644
--- a/src/aclk/aclk_rx_msgs.c
+++ b/src/aclk/aclk_rx_msgs.c
@@ -106,13 +106,13 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
else if(strncmp(payload, "DELETE /", 8) == 0)
start = payload + 7;
else {
- errno = 0;
+ errno_clear();
netdata_log_error("Only accepting requests that start with GET, POST, PUT, DELETE from CLOUD.");
return 1;
}
if(!(end = strstr(payload, HTTP_1_1 HTTP_ENDL))) {
- errno = 0;
+ errno_clear();
netdata_log_error("Doesn't look like HTTP GET request.");
return 1;
}
@@ -127,7 +127,7 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent
{
aclk_query_t query;
- errno = 0;
+ errno_clear();
if (cloud_to_agent->version < ACLK_V_COMPRESSION) {
netdata_log_error(
"This handler cannot reply to request with version older than %d, received %d.",
@@ -347,7 +347,7 @@ int start_alarm_streaming(const char *msg, size_t msg_len)
netdata_log_error("Error parsing StartAlarmStreaming");
return 1;
}
- aclk_start_alert_streaming(res.node_id, res.resets);
+ aclk_start_alert_streaming(res.node_id, res.version);
freez(res.node_id);
return 0;
}
@@ -361,7 +361,7 @@ int send_alarm_checkpoint(const char *msg, size_t msg_len)
freez(sac.claim_id);
return 1;
}
- aclk_send_alarm_checkpoint(sac.node_id, sac.claim_id);
+ aclk_alert_version_check(sac.node_id, sac.claim_id, sac.version);
freez(sac.node_id);
freez(sac.claim_id);
return 0;
@@ -375,7 +375,7 @@ int send_alarm_configuration(const char *msg, size_t msg_len)
freez(config_hash);
return 1;
}
- aclk_send_alarm_configuration(config_hash);
+ aclk_send_alert_configuration(config_hash);
freez(config_hash);
return 0;
}
diff --git a/src/aclk/helpers/mqtt_wss_pal.h b/src/aclk/helpers/mqtt_wss_pal.h
index 5c89f8bb..fe1aacf4 100644
--- a/src/aclk/helpers/mqtt_wss_pal.h
+++ b/src/aclk/helpers/mqtt_wss_pal.h
@@ -10,10 +10,4 @@
#undef OPENSSL_VERSION_110
#undef OPENSSL_VERSION_111
-#define mw_malloc(...) mallocz(__VA_ARGS__)
-#define mw_calloc(...) callocz(__VA_ARGS__)
-#define mw_free(...) freez(__VA_ARGS__)
-#define mw_strdup(...) strdupz(__VA_ARGS__)
-#define mw_realloc(...) reallocz(__VA_ARGS__)
-
#endif /* MQTT_WSS_PAL_H */
diff --git a/src/aclk/https_client.c b/src/aclk/https_client.c
index 2bc768f2..8c44f13e 100644
--- a/src/aclk/https_client.c
+++ b/src/aclk/https_client.c
@@ -696,7 +696,7 @@ int https_request(https_req_t *request, https_req_response_t *response) {
goto exit_CTX;
}
- if (!SSL_set_tlsext_host_name(ctx->ssl, connect_host)) {
+ if (!SSL_set_tlsext_host_name(ctx->ssl, request->host)) {
netdata_log_error("Error setting TLS SNI host");
goto exit_CTX;
}
diff --git a/src/aclk/mqtt_websockets/mqtt_ng.c b/src/aclk/mqtt_websockets/mqtt_ng.c
index f570fde7..8ad6bd5c 100644
--- a/src/aclk/mqtt_websockets/mqtt_ng.c
+++ b/src/aclk/mqtt_websockets/mqtt_ng.c
@@ -423,7 +423,7 @@ static void buffer_frag_free_data(struct buffer_fragment *frag)
if ( frag->flags & BUFFER_FRAG_DATA_EXTERNAL && frag->data != NULL) {
switch (ptr2memory_mode(frag->free_fnc)) {
case MEMCPY:
- mw_free(frag->data);
+ freez(frag->data);
break;
case EXTERNAL_FREE_AFTER_USE:
frag->free_fnc(frag->data);
@@ -563,7 +563,7 @@ static int transaction_buffer_grow(struct transaction_buffer *buf, mqtt_wss_log_
if (buf->hdr_buffer.size > max)
buf->hdr_buffer.size = max;
- void *ret = mw_realloc(buf->hdr_buffer.data, buf->hdr_buffer.size);
+ void *ret = reallocz(buf->hdr_buffer.data, buf->hdr_buffer.size);
if (ret == NULL) {
mws_warn(log_ctx, "Buffer growth failed (realloc)");
return 1;
@@ -581,7 +581,7 @@ inline static int transaction_buffer_init(struct transaction_buffer *to_init, si
pthread_mutex_init(&to_init->mutex, NULL);
to_init->hdr_buffer.size = size;
- to_init->hdr_buffer.data = mw_malloc(size);
+ to_init->hdr_buffer.data = mallocz(size);
if (to_init->hdr_buffer.data == NULL)
return 1;
@@ -594,7 +594,7 @@ static void transaction_buffer_destroy(struct transaction_buffer *to_init)
{
buffer_purge(&to_init->hdr_buffer);
pthread_mutex_destroy(&to_init->mutex);
- mw_free(to_init->hdr_buffer.data);
+ freez(to_init->hdr_buffer.data);
}
// Creates transaction
@@ -628,7 +628,7 @@ void transaction_buffer_transaction_rollback(struct transaction_buffer *buf, str
#define RX_ALIASES_INITIALIZE() c_rhash_new(UINT16_MAX >> 8)
struct mqtt_ng_client *mqtt_ng_init(struct mqtt_ng_init *settings)
{
- struct mqtt_ng_client *client = mw_calloc(1, sizeof(struct mqtt_ng_client));
+ struct mqtt_ng_client *client = callocz(1, sizeof(struct mqtt_ng_client));
if (client == NULL)
return NULL;
@@ -672,7 +672,7 @@ err_free_rx_alias:
err_free_trx_buf:
transaction_buffer_destroy(&client->main_buffer);
err_free_client:
- mw_free(client);
+ freez(client);
return NULL;
}
@@ -688,7 +688,7 @@ static void mqtt_ng_destroy_rx_alias_hash(c_rhash hash)
void *to_free;
while(!c_rhash_iter_uint64_keys(hash, &i, &stored_key)) {
c_rhash_get_ptr_by_uint64(hash, stored_key, &to_free);
- mw_free(to_free);
+ freez(to_free);
}
c_rhash_destroy(hash);
}
@@ -700,7 +700,7 @@ static void mqtt_ng_destroy_tx_alias_hash(c_rhash hash)
void *to_free;
while(!c_rhash_iter_str_keys(hash, &i, &stored_key)) {
c_rhash_get_ptr_by_str(hash, stored_key, &to_free);
- mw_free(to_free);
+ freez(to_free);
}
c_rhash_destroy(hash);
}
@@ -714,7 +714,7 @@ void mqtt_ng_destroy(struct mqtt_ng_client *client)
pthread_rwlock_destroy(&client->tx_topic_aliases.rwlock);
mqtt_ng_destroy_rx_alias_hash(client->rx_aliases);
- mw_free(client);
+ freez(client);
}
int frag_set_external_data(mqtt_wss_log_ctx_t log, struct buffer_fragment *frag, void *data, size_t data_len, free_fnc_t data_free_fnc)
@@ -730,7 +730,7 @@ int frag_set_external_data(mqtt_wss_log_ctx_t log, struct buffer_fragment *frag,
switch (ptr2memory_mode(data_free_fnc)) {
case MEMCPY:
- frag->data = mw_malloc(data_len);
+ frag->data = mallocz(data_len);
if (frag->data == NULL) {
mws_error(log, UNIT_LOG_PREFIX "OOM while malloc @_optimized_add");
return 1;
@@ -1408,12 +1408,12 @@ static void mqtt_properties_parser_ctx_reset(struct mqtt_properties_parser_ctx *
struct mqtt_property *f = ctx->head;
ctx->head = ctx->head->next;
if (f->type == MQTT_TYPE_STR || f->type == MQTT_TYPE_STR_PAIR)
- mw_free(f->data.strings[0]);
+ freez(f->data.strings[0]);
if (f->type == MQTT_TYPE_STR_PAIR)
- mw_free(f->data.strings[1]);
+ freez(f->data.strings[1]);
if (f->type == MQTT_TYPE_BIN)
- mw_free(f->data.bindata);
- mw_free(f);
+ freez(f->data.bindata);
+ freez(f);
}
ctx->tail = NULL;
ctx->properties_length = 0;
@@ -1498,7 +1498,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t
return rc;
case PROPERTY_CREATE:
BUF_READ_CHECK_AT_LEAST(data, 1);
- struct mqtt_property *prop = mw_calloc(1, sizeof(struct mqtt_property));
+ struct mqtt_property *prop = callocz(1, sizeof(struct mqtt_property));
if (ctx->head == NULL) {
ctx->head = prop;
ctx->tail = prop;
@@ -1558,7 +1558,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t
break;
case PROPERTY_TYPE_STR:
BUF_READ_CHECK_AT_LEAST(data, ctx->tail->bindata_len);
- ctx->tail->data.strings[ctx->str_idx] = mw_malloc(ctx->tail->bindata_len + 1);
+ ctx->tail->data.strings[ctx->str_idx] = mallocz(ctx->tail->bindata_len + 1);
rbuf_pop(data, ctx->tail->data.strings[ctx->str_idx], ctx->tail->bindata_len);
ctx->tail->data.strings[ctx->str_idx][ctx->tail->bindata_len] = 0;
ctx->str_idx++;
@@ -1571,7 +1571,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t
break;
case PROPERTY_TYPE_BIN:
BUF_READ_CHECK_AT_LEAST(data, ctx->tail->bindata_len);
- ctx->tail->data.bindata = mw_malloc(ctx->tail->bindata_len);
+ ctx->tail->data.bindata = mallocz(ctx->tail->bindata_len);
rbuf_pop(data, ctx->tail->data.bindata, ctx->tail->bindata_len);
ctx->bytes_consumed += ctx->tail->bindata_len;
ctx->state = PROPERTY_NEXT;
@@ -1721,7 +1721,7 @@ static int parse_suback_varhdr(struct mqtt_ng_client *client)
return rc;
parser->mqtt_parsed_len += parser->properties_parser.bytes_consumed;
suback->reason_code_count = parser->mqtt_fixed_hdr_remaining_length - parser->mqtt_parsed_len;
- suback->reason_codes = mw_calloc(suback->reason_code_count, sizeof(*suback->reason_codes));
+ suback->reason_codes = callocz(suback->reason_code_count, sizeof(*suback->reason_codes));
suback->reason_codes_pending = suback->reason_code_count;
parser->varhdr_state = MQTT_PARSE_REASONCODES;
/* FALLTHROUGH */
@@ -1760,7 +1760,7 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client)
parser->varhdr_state = MQTT_PARSE_VARHDR_POST_TOPICNAME;
break;
}
- publish->topic = mw_calloc(1, publish->topic_len + 1 /* add 0x00 */);
+ publish->topic = callocz(1, publish->topic_len + 1 /* add 0x00 */);
if (publish->topic == NULL)
return MQTT_NG_CLIENT_OOM;
parser->varhdr_state = MQTT_PARSE_VARHDR_TOPICNAME;
@@ -1796,7 +1796,7 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client)
/* FALLTHROUGH */
case MQTT_PARSE_PAYLOAD:
if (parser->mqtt_fixed_hdr_remaining_length < parser->mqtt_parsed_len) {
- mw_free(publish->topic);
+ freez(publish->topic);
publish->topic = NULL;
ERROR("Error parsing PUBLISH message");
return MQTT_NG_CLIENT_PROTOCOL_ERROR;
@@ -1808,9 +1808,9 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client)
}
BUF_READ_CHECK_AT_LEAST(parser->received_data, publish->data_len);
- publish->data = mw_malloc(publish->data_len);
+ publish->data = mallocz(publish->data_len);
if (publish->data == NULL) {
- mw_free(publish->topic);
+ freez(publish->topic);
publish->topic = NULL;
return MQTT_NG_CLIENT_OOM;
}
@@ -1867,7 +1867,7 @@ static int parse_data(struct mqtt_ng_client *client)
case MQTT_CPT_SUBACK:
rc = parse_suback_varhdr(client);
if (rc != MQTT_NG_CLIENT_NEED_MORE_BYTES && rc != MQTT_NG_CLIENT_OK_CALL_AGAIN) {
- mw_free(parser->mqtt_packet.suback.reason_codes);
+ freez(parser->mqtt_packet.suback.reason_codes);
}
if (rc == MQTT_NG_CLIENT_PARSE_DONE) {
parser->state = MQTT_PARSE_MQTT_PACKET_DONE;
@@ -2096,8 +2096,8 @@ int handle_incoming_traffic(struct mqtt_ng_client *client)
#endif
pub = &client->parser.mqtt_packet.publish;
if (pub->qos > 1) {
- mw_free(pub->topic);
- mw_free(pub->data);
+ freez(pub->topic);
+ freez(pub->data);
return MQTT_NG_CLIENT_NOT_IMPL_YET;
}
if ( pub->qos == 1 && (rc = mqtt_ng_puback(client, pub->packet_id, 0)) ) {
@@ -2127,8 +2127,8 @@ int handle_incoming_traffic(struct mqtt_ng_client *client)
// in case we have property topic alias and we have topic we take over the string
// and add pointer to it into topic alias list
if (prop == NULL)
- mw_free(pub->topic);
- mw_free(pub->data);
+ freez(pub->topic);
+ freez(pub->data);
return MQTT_NG_CLIENT_WANT_WRITE;
case MQTT_CPT_DISCONNECT:
INFO ("Got MQTT DISCONNECT control packet from server. Reason code: %d", (int)client->parser.mqtt_packet.disconnect.reason_code);
@@ -2225,7 +2225,7 @@ int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic)
return idx;
}
- alias = mw_malloc(sizeof(struct topic_alias_data));
+ alias = mallocz(sizeof(struct topic_alias_data));
idx = ++client->tx_topic_aliases.idx_assigned;
alias->idx = idx;
__atomic_store_n(&alias->usage_count, 0, __ATOMIC_SEQ_CST);
diff --git a/src/aclk/mqtt_websockets/mqtt_wss_client.c b/src/aclk/mqtt_websockets/mqtt_wss_client.c
index f5b4025d..2d231ef4 100644
--- a/src/aclk/mqtt_websockets/mqtt_wss_client.c
+++ b/src/aclk/mqtt_websockets/mqtt_wss_client.c
@@ -1,5 +1,4 @@
// SPDX-License-Identifier: GPL-3.0-only
-// Copyright (C) 2020 Timotej Šiškovič
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
@@ -19,9 +18,6 @@
#include <sys/socket.h>
#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <netinet/tcp.h> //TCP_NODELAY
-#include <netdb.h>
#include <openssl/err.h>
#include <openssl/ssl.h>
@@ -107,8 +103,6 @@ struct mqtt_wss_client_struct {
int mqtt_keepalive;
- pthread_mutex_t pub_lock;
-
// signifies that we didn't write all MQTT wanted
// us to write during last cycle (e.g. due to buffer
// size) and thus we should arm POLLOUT
@@ -121,7 +115,7 @@ struct mqtt_wss_client_struct {
void (*msg_callback)(const char *, const void *, size_t, int);
void (*puback_callback)(uint16_t packet_id);
- pthread_mutex_t stat_lock;
+ SPINLOCK stat_lock;
struct mqtt_wss_stats stats;
#ifdef MQTT_WSS_DEBUG
@@ -173,14 +167,13 @@ mqtt_wss_client mqtt_wss_new(const char *log_prefix,
SSL_library_init();
SSL_load_error_strings();
- mqtt_wss_client client = mw_calloc(1, sizeof(struct mqtt_wss_client_struct));
+ mqtt_wss_client client = callocz(1, sizeof(struct mqtt_wss_client_struct));
if (!client) {
mws_error(log, "OOM alocating mqtt_wss_client");
goto fail;
}
- pthread_mutex_init(&client->pub_lock, NULL);
- pthread_mutex_init(&client->stat_lock, NULL);
+ spinlock_init(&client->stat_lock);
client->msg_callback = msg_callback;
client->puback_callback = puback_callback;
@@ -229,7 +222,7 @@ fail_3:
fail_2:
ws_client_destroy(client->ws_client);
fail_1:
- mw_free(client);
+ freez(client);
fail:
mqtt_wss_log_ctx_destroy(log);
return NULL;
@@ -253,12 +246,15 @@ void mqtt_wss_destroy(mqtt_wss_client client)
// as it "borrows" this pointer and might use it
if (client->target_host == client->host)
client->target_host = NULL;
+
if (client->target_host)
- mw_free(client->target_host);
+ freez(client->target_host);
+
if (client->host)
- mw_free(client->host);
- mw_free(client->proxy_passwd);
- mw_free(client->proxy_uname);
+ freez(client->host);
+
+ freez(client->proxy_passwd);
+ freez(client->proxy_uname);
if (client->ssl)
SSL_free(client->ssl);
@@ -269,11 +265,8 @@ void mqtt_wss_destroy(mqtt_wss_client client)
if (client->sockfd > 0)
close(client->sockfd);
- pthread_mutex_destroy(&client->pub_lock);
- pthread_mutex_destroy(&client->stat_lock);
-
mqtt_wss_log_ctx_destroy(client->log);
- mw_free(client);
+ freez(client);
}
static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
@@ -298,7 +291,7 @@ static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
mws_error(client->log, "verify error:num=%d:%s:depth=%d:%s", err,
X509_verify_cert_error_string(err), depth, err_str);
- mw_free(err_str);
+ freez(err_str);
}
if (!preverify_ok && err == X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT &&
@@ -362,14 +355,14 @@ static int http_parse_reply(mqtt_wss_client client, rbuf_t buf)
}
if (http_code != 200) {
- ptr = mw_malloc(idx + 1);
+ ptr = mallocz(idx + 1);
if (!ptr)
return 6;
rbuf_pop(buf, ptr, idx);
ptr[idx] = 0;
mws_error(client->log, "http_proxy returned error code %d \"%s\"", http_code, ptr);
- mw_free(ptr);
+ freez(ptr);
return 7;
}/* else
rbuf_bump_tail(buf, idx);*/
@@ -450,7 +443,7 @@ static int http_proxy_connect(mqtt_wss_client client)
if (client->proxy_uname) {
size_t creds_plain_len = strlen(client->proxy_uname) + strlen(client->proxy_passwd) + 2;
- char *creds_plain = mw_malloc(creds_plain_len);
+ char *creds_plain = mallocz(creds_plain_len);
if (!creds_plain) {
mws_error(client->log, "OOM creds_plain");
rc = 6;
@@ -460,9 +453,9 @@ static int http_proxy_connect(mqtt_wss_client client)
// OpenSSL encoder puts newline every 64 output bytes
// we remove those but during encoding we need that space in the buffer
creds_base64_len += (1+(creds_base64_len/64)) * strlen("\n");
- char *creds_base64 = mw_malloc(creds_base64_len + 1);
+ char *creds_base64 = mallocz(creds_base64_len + 1);
if (!creds_base64) {
- mw_free(creds_plain);
+ freez(creds_plain);
mws_error(client->log, "OOM creds_base64");
rc = 6;
goto cleanup;
@@ -475,12 +468,12 @@ static int http_proxy_connect(mqtt_wss_client client)
int b64_len;
base64_encode_helper((unsigned char*)creds_base64, &b64_len, (unsigned char*)creds_plain, strlen(creds_plain));
- mw_free(creds_plain);
+ freez(creds_plain);
r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity);
snprintf(r_buf_ptr, r_buf_linear_insert_capacity,"Proxy-Authorization: Basic %s" HTTP_ENDLINE, creds_base64);
write(client->sockfd, r_buf_ptr, strlen(r_buf_ptr));
- mw_free(creds_base64);
+ freez(creds_base64);
}
write(client->sockfd, HTTP_ENDLINE, strlen(HTTP_ENDLINE));
@@ -523,15 +516,14 @@ cleanup:
return rc;
}
-int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_connect_params *mqtt_params, int ssl_flags, struct mqtt_wss_proxy *proxy)
+int mqtt_wss_connect(
+ mqtt_wss_client client,
+ char *host,
+ int port,
+ struct mqtt_connect_params *mqtt_params,
+ int ssl_flags,
+ struct mqtt_wss_proxy *proxy)
{
- struct sockaddr_in addr;
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = AF_INET;
-
- struct hostent *he;
- struct in_addr **addr_list;
-
if (!mqtt_params) {
mws_error(client->log, "mqtt_params can't be null!");
return -1;
@@ -545,23 +537,35 @@ int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_c
if (client->target_host == client->host)
client->target_host = NULL;
+
if (client->target_host)
- mw_free(client->target_host);
+ freez(client->target_host);
+
if (client->host)
- mw_free(client->host);
+ freez(client->host);
+
+ if (client->proxy_uname) {
+ freez(client->proxy_uname);
+ client->proxy_uname = NULL;
+ }
+
+ if (client->proxy_passwd) {
+ freez(client->proxy_passwd);
+ client->proxy_passwd = NULL;
+ }
if (proxy && proxy->type != MQTT_WSS_DIRECT) {
- client->host = mw_strdup(proxy->host);
+ client->host = strdupz(proxy->host);
client->port = proxy->port;
- client->target_host = mw_strdup(host);
+ client->target_host = strdupz(host);
client->target_port = port;
client->proxy_type = proxy->type;
if (proxy->username)
- client->proxy_uname = mw_strdup(proxy->username);
+ client->proxy_uname = strdupz(proxy->username);
if (proxy->password)
- client->proxy_passwd = mw_strdup(proxy->password);
+ client->proxy_passwd = strdupz(proxy->password);
} else {
- client->host = mw_strdup(host);
+ client->host = strdupz(host);
client->port = port;
client->target_host = client->host;
client->target_port = port;
@@ -569,30 +573,19 @@ int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_c
client->ssl_flags = ssl_flags;
- //TODO gethostbyname -> getaddinfo
- // hstrerror -> gai_strerror
- if ((he = gethostbyname(client->host)) == NULL) {
- mws_error(client->log, "gethostbyname() error \"%s\"", hstrerror(h_errno));
- return -1;
- }
-
- addr_list = (struct in_addr **)he->h_addr_list;
- if(!addr_list[0]) {
- mws_error(client->log, "No IP addr resolved");
- return -1;
- }
- mws_debug(client->log, "Resolved IP: %s", inet_ntoa(*addr_list[0]));
- addr.sin_addr = *addr_list[0];
- addr.sin_port = htons(client->port);
-
if (client->sockfd > 0)
close(client->sockfd);
- client->sockfd = socket(AF_INET, SOCK_STREAM | DEFAULT_SOCKET_FLAGS, 0);
- if (client->sockfd < 0) {
- mws_error(client->log, "Couldn't create socket()");
- return -1;
+
+ char port_str[16];
+ snprintf(port_str, sizeof(port_str) -1, "%d", client->port);
+ int fd = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, client->host, 0, port_str, NULL);
+ if (fd < 0) {
+ mws_error(client->log, "Could not connect to remote endpoint \"%s\", port %d.\n", client->host, port);
+ return -3;
}
+ client->sockfd = fd;
+
#ifndef SOCK_CLOEXEC
int flags = fcntl(client->sockfd, F_GETFD);
if (flags != -1)
@@ -600,19 +593,10 @@ int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_c
#endif
int flag = 1;
- int result = setsockopt(client->sockfd,
- IPPROTO_TCP,
- TCP_NODELAY,
- &flag,
- sizeof(int));
+ int result = setsockopt(client->sockfd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(int));
if (result < 0)
mws_error(client->log, "Could not dissable NAGLE");
- if (connect(client->sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
- mws_error(client->log, "Could not connect to remote endpoint \"%s\", port %d.\n", client->host, client->port);
- return -3;
- }
-
client->poll_fds[POLLFD_SOCKET].fd = client->sockfd;
if (fcntl(client->sockfd, F_SETFL, fcntl(client->sockfd, F_GETFL, 0) | O_NONBLOCK) == -1) {
@@ -640,6 +624,7 @@ int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_c
// free SSL structs from possible previous connections
if (client->ssl)
SSL_free(client->ssl);
+
if (client->ssl_ctx)
SSL_CTX_free(client->ssl_ctx);
@@ -675,6 +660,7 @@ int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_c
mws_error(client->log, "SSL could not connect");
return -5;
}
+
if (result == -1) {
int ec = SSL_get_error(client->ssl, result);
if (ec != SSL_ERROR_WANT_READ && ec != SSL_ERROR_WANT_WRITE) {
@@ -693,14 +679,16 @@ int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_c
auth.username_free = NULL;
auth.password = (char*)mqtt_params->password;
auth.password_free = NULL;
+
struct mqtt_lwt_properties lwt;
lwt.will_topic = (char*)mqtt_params->will_topic;
lwt.will_topic_free = NULL;
lwt.will_message = (void*)mqtt_params->will_msg;
lwt.will_message_free = NULL; // TODO expose no copy version to API
lwt.will_message_size = mqtt_params->will_msg_len;
- lwt.will_qos = (mqtt_params->will_flags & MQTT_WSS_PUB_QOSMASK);
- lwt.will_retain = mqtt_params->will_flags & MQTT_WSS_PUB_RETAIN;
+ lwt.will_qos = (int) (mqtt_params->will_flags & MQTT_WSS_PUB_QOSMASK);
+ lwt.will_retain = (int) mqtt_params->will_flags & MQTT_WSS_PUB_RETAIN;
+
int ret = mqtt_ng_connect(client->mqtt, &auth, mqtt_params->will_msg ? &lwt : NULL, 1, client->mqtt_keepalive);
if (ret) {
mws_error(client->log, "Error generating MQTT connect");
@@ -955,9 +943,9 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
#ifdef DEBUG_ULTRA_VERBOSE
mws_debug(client->log, "SSL_Read: Read %d.", ret);
#endif
- pthread_mutex_lock(&client->stat_lock);
+ spinlock_lock(&client->stat_lock);
client->stats.bytes_rx += ret;
- pthread_mutex_unlock(&client->stat_lock);
+ spinlock_unlock(&client->stat_lock);
rbuf_bump_head(client->ws_client->buf_read, ret);
} else {
int errnobkp = errno;
@@ -1023,9 +1011,9 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
#ifdef DEBUG_ULTRA_VERBOSE
mws_debug(client->log, "SSL_Write: Written %d of avail %d.", ret, size);
#endif
- pthread_mutex_lock(&client->stat_lock);
+ spinlock_lock(&client->stat_lock);
client->stats.bytes_tx += ret;
- pthread_mutex_unlock(&client->stat_lock);
+ spinlock_unlock(&client->stat_lock);
rbuf_bump_tail(client->ws_client->buf_write, ret);
} else {
int errnobkp = errno;
@@ -1115,10 +1103,10 @@ int mqtt_wss_subscribe(mqtt_wss_client client, char *topic, int max_qos_level)
struct mqtt_wss_stats mqtt_wss_get_stats(mqtt_wss_client client)
{
struct mqtt_wss_stats current;
- pthread_mutex_lock(&client->stat_lock);
+ spinlock_lock(&client->stat_lock);
current = client->stats;
memset(&client->stats, 0, sizeof(client->stats));
- pthread_mutex_unlock(&client->stat_lock);
+ spinlock_unlock(&client->stat_lock);
mqtt_ng_get_stats(client->mqtt, &current.mqtt);
return current;
}
diff --git a/src/aclk/mqtt_websockets/mqtt_wss_log.c b/src/aclk/mqtt_websockets/mqtt_wss_log.c
index 5e606c12..e5da76fc 100644
--- a/src/aclk/mqtt_websockets/mqtt_wss_log.c
+++ b/src/aclk/mqtt_websockets/mqtt_wss_log.c
@@ -25,13 +25,13 @@ struct mqtt_wss_log_ctx {
#endif
mqtt_wss_log_ctx_t mqtt_wss_log_ctx_create(const char *ctx_prefix, mqtt_wss_log_callback_t log_callback)
{
- mqtt_wss_log_ctx_t ctx = mw_calloc(1, sizeof(struct mqtt_wss_log_ctx));
+ mqtt_wss_log_ctx_t ctx = callocz(1, sizeof(struct mqtt_wss_log_ctx));
if(!ctx)
return NULL;
if(log_callback) {
ctx->extern_log_fnc = log_callback;
- ctx->buffer = mw_calloc(1, LOG_BUFFER_SIZE);
+ ctx->buffer = callocz(1, LOG_BUFFER_SIZE);
if(!ctx->buffer)
goto cleanup;
@@ -60,15 +60,15 @@ mqtt_wss_log_ctx_t mqtt_wss_log_ctx_create(const char *ctx_prefix, mqtt_wss_log_
return ctx;
cleanup:
- mw_free(ctx);
+ freez(ctx);
return NULL;
}
void mqtt_wss_log_ctx_destroy(mqtt_wss_log_ctx_t ctx)
{
- mw_free(ctx->ctx_prefix);
- mw_free(ctx->buffer);
- mw_free(ctx);
+ freez(ctx->ctx_prefix);
+ freez(ctx->buffer);
+ freez(ctx);
}
static inline char severity_to_c(int severity)
diff --git a/src/aclk/mqtt_websockets/ws_client.c b/src/aclk/mqtt_websockets/ws_client.c
index 240e889c..a6b9b23f 100644
--- a/src/aclk/mqtt_websockets/ws_client.c
+++ b/src/aclk/mqtt_websockets/ws_client.c
@@ -53,7 +53,7 @@ ws_client *ws_client_new(size_t buf_size, char **host, mqtt_wss_log_ctx_t log)
if(!host)
return NULL;
- client = mw_calloc(1, sizeof(ws_client));
+ client = callocz(1, sizeof(ws_client));
if (!client)
return NULL;
@@ -87,7 +87,7 @@ cleanup_2:
cleanup_1:
rbuf_free(client->buf_read);
cleanup:
- mw_free(client);
+ freez(client);
return NULL;
}
@@ -99,7 +99,7 @@ void ws_client_free_headers(ws_client *client)
while (ptr) {
tmp = ptr;
ptr = ptr->next;
- mw_free(tmp);
+ freez(tmp);
}
client->hs.headers = NULL;
@@ -110,25 +110,28 @@ void ws_client_free_headers(ws_client *client)
void ws_client_destroy(ws_client *client)
{
ws_client_free_headers(client);
- mw_free(client->hs.nonce_reply);
- mw_free(client->hs.http_reply_msg);
+ freez(client->hs.nonce_reply);
+ freez(client->hs.http_reply_msg);
close(client->entropy_fd);
rbuf_free(client->buf_read);
rbuf_free(client->buf_write);
rbuf_free(client->buf_to_mqtt);
- mw_free(client);
+ freez(client);
}
void ws_client_reset(ws_client *client)
{
ws_client_free_headers(client);
- mw_free(client->hs.nonce_reply);
+ freez(client->hs.nonce_reply);
client->hs.nonce_reply = NULL;
- mw_free(client->hs.http_reply_msg);
+
+ freez(client->hs.http_reply_msg);
client->hs.http_reply_msg = NULL;
+
rbuf_flush(client->buf_read);
rbuf_flush(client->buf_write);
rbuf_flush(client->buf_to_mqtt);
+
client->state = WS_RAW;
client->hs.hdr_state = WS_HDR_HTTP;
client->rx.parse_state = WS_FIRST_2BYTES;
@@ -158,31 +161,11 @@ int ws_client_want_write(ws_client *client)
return rbuf_bytes_available(client->buf_write);
}
-#define RAND_SRC "/dev/urandom"
-static int ws_client_get_nonce(ws_client *client, char *dest, unsigned int size)
-{
- // we do not need crypto secure random here
- // it's just used for protocol negotiation
- int rd;
- int f = open(RAND_SRC, O_RDONLY | O_CLOEXEC);
- if (f < 0) {
- ERROR("Error opening \"%s\". Err: \"%s\"", RAND_SRC, strerror(errno));
- return -2;
- }
-
- if ((rd = read(f, dest, size)) > 0) {
- close(f);
- return rd;
- }
- close(f);
- return -1;
-}
-
#define WEBSOCKET_NONCE_SIZE 16
#define TEMP_BUF_SIZE 4096
int ws_client_start_handshake(ws_client *client)
{
- char nonce[WEBSOCKET_NONCE_SIZE];
+ nd_uuid_t nonce;
char nonce_b64[256];
char second[TEMP_BUF_SIZE];
unsigned int md_len;
@@ -190,16 +173,15 @@ int ws_client_start_handshake(ws_client *client)
EVP_MD_CTX *md_ctx;
const EVP_MD *md;
- if(!*client->host) {
+ if(!client->host || !*client->host) {
ERROR("Hostname has not been set. We should not be able to come here!");
return 1;
}
- ws_client_get_nonce(client, nonce, WEBSOCKET_NONCE_SIZE);
+ uuid_generate_random(nonce);
EVP_EncodeBlock((unsigned char *)nonce_b64, (const unsigned char *)nonce, WEBSOCKET_NONCE_SIZE);
- snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr,
- *client->host,
- nonce_b64);
+ snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr, *client->host, nonce_b64);
+
if(rbuf_bytes_free(client->buf_write) < strlen(second)) {
ERROR("Write buffer capacity too low.");
return 1;
@@ -236,10 +218,10 @@ int ws_client_start_handshake(ws_client *client)
EVP_DigestUpdate(md_ctx, second, strlen(second));
EVP_DigestFinal_ex(md_ctx, digest, &md_len);
- EVP_EncodeBlock((unsigned char *)nonce_b64, digest, md_len);
+ EVP_EncodeBlock((unsigned char *)nonce_b64, digest, (int) md_len);
- mw_free(client->hs.nonce_reply);
- client->hs.nonce_reply = mw_strdup(nonce_b64);
+ freez(client->hs.nonce_reply);
+ client->hs.nonce_reply = strdupz(nonce_b64);
OPENSSL_free(digest);
@@ -263,7 +245,7 @@ int ws_client_start_handshake(ws_client *client)
if (rbuf_bytes_available(client->buf_read) < x) \
return WS_CLIENT_NEED_MORE_BYTES;
-#define MAX_HTTP_LINE_LENGTH 1024*4
+#define MAX_HTTP_LINE_LENGTH (1024 * 4)
#define HTTP_SC_LENGTH 4 // "XXX " http status code as C string
#define WS_CLIENT_HTTP_HDR "HTTP/1.1 "
#define WS_CONN_ACCEPT "sec-websocket-accept"
@@ -278,11 +260,11 @@ int ws_client_start_handshake(ws_client *client)
#error "Buffer too small"
#endif
-#define HTTP_HDR_LINE_CHECK_LIMIT(x) if ((x) >= MAX_HTTP_LINE_LENGTH) \
-{ \
- ERROR("HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \
- return WS_CLIENT_PROTOCOL_ERROR; \
-}
+#define HTTP_HDR_LINE_CHECK_LIMIT(x) \
+ if ((x) >= MAX_HTTP_LINE_LENGTH) { \
+ ERROR("HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \
+ return WS_CLIENT_PROTOCOL_ERROR; \
+ }
int ws_client_parse_handshake_resp(ws_client *client)
{
@@ -290,6 +272,7 @@ int ws_client_parse_handshake_resp(ws_client *client)
int idx_crlf, idx_sep;
char *ptr;
size_t bytes;
+
switch (client->hs.hdr_state) {
case WS_HDR_HTTP:
BUF_READ_CHECK_AT_LEAST(strlen(WS_CLIENT_HTTP_HDR))
@@ -297,6 +280,7 @@ int ws_client_parse_handshake_resp(ws_client *client)
rbuf_bump_tail(client->buf_read, strlen(WS_CLIENT_HTTP_HDR));
client->hs.hdr_state = WS_HDR_RC;
break;
+
case WS_HDR_RC:
BUF_READ_CHECK_AT_LEAST(HTTP_SC_LENGTH); // "XXX " http return code
rbuf_pop(client->buf_read, buf, HTTP_SC_LENGTH);
@@ -312,6 +296,7 @@ int ws_client_parse_handshake_resp(ws_client *client)
}
client->hs.hdr_state = WS_HDR_ENDLINE;
break;
+
case WS_HDR_ENDLINE:
ptr = rbuf_find_bytes(client->buf_read, WS_HTTP_NEWLINE, strlen(WS_HTTP_NEWLINE), &idx_crlf);
if (!ptr) {
@@ -321,12 +306,13 @@ int ws_client_parse_handshake_resp(ws_client *client)
}
HTTP_HDR_LINE_CHECK_LIMIT(idx_crlf);
- client->hs.http_reply_msg = mw_malloc(idx_crlf+1);
+ client->hs.http_reply_msg = mallocz(idx_crlf+1);
rbuf_pop(client->buf_read, client->hs.http_reply_msg, idx_crlf);
client->hs.http_reply_msg[idx_crlf] = 0;
rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE));
client->hs.hdr_state = WS_HDR_PARSE_HEADERS;
break;
+
case WS_HDR_PARSE_HEADERS:
ptr = rbuf_find_bytes(client->buf_read, WS_HTTP_NEWLINE, strlen(WS_HTTP_NEWLINE), &idx_crlf);
if (!ptr) {
@@ -357,7 +343,7 @@ int ws_client_parse_handshake_resp(ws_client *client)
return WS_CLIENT_PROTOCOL_ERROR;
}
- struct http_header *hdr = mw_calloc(1, sizeof(struct http_header) + idx_crlf); //idx_crlf includes ": " that will be used as 2 \0 bytes
+ struct http_header *hdr = callocz(1, sizeof(struct http_header) + idx_crlf); //idx_crlf includes ": " that will be used as 2 \0 bytes
hdr->key = ((char*)hdr) + sizeof(struct http_header);
hdr->value = hdr->key + idx_sep + 1;
@@ -384,6 +370,7 @@ int ws_client_parse_handshake_resp(ws_client *client)
}
break;
+
case WS_HDR_PARSE_DONE:
if (!client->hs.nonce_matched) {
ERROR("Missing " WS_CONN_ACCEPT " header");
@@ -398,6 +385,7 @@ int ws_client_parse_handshake_resp(ws_client *client)
client->hs.hdr_state = WS_HDR_ALL_DONE;
INFO("Websocket Connection Accepted By Server");
return WS_CLIENT_PARSING_DONE;
+
case WS_HDR_ALL_DONE:
FATAL("This is error we should never come here!");
return WS_CLIENT_PROTOCOL_ERROR;
@@ -642,7 +630,7 @@ int ws_client_process_rx_ws(ws_client *client)
break;
case WS_PAYLOAD_CONNECTION_CLOSE_MSG:
if (!client->rx.specific_data.op_close.reason)
- client->rx.specific_data.op_close.reason = mw_malloc(client->rx.payload_length + 1);
+ client->rx.specific_data.op_close.reason = mallocz(client->rx.payload_length + 1);
while (client->rx.payload_processed < client->rx.payload_length) {
if (!rbuf_bytes_available(client->buf_read))
@@ -655,7 +643,7 @@ int ws_client_process_rx_ws(ws_client *client)
INFO("WebSocket server closed the connection with EC=%d and reason \"%s\"",
client->rx.specific_data.op_close.ec,
client->rx.specific_data.op_close.reason);
- mw_free(client->rx.specific_data.op_close.reason);
+ freez(client->rx.specific_data.op_close.reason);
client->rx.specific_data.op_close.reason = NULL;
client->rx.parse_state = WS_PACKET_DONE;
break;
@@ -672,7 +660,7 @@ int ws_client_process_rx_ws(ws_client *client)
return WS_CLIENT_INTERNAL_ERROR;
}
BUF_READ_CHECK_AT_LEAST(client->rx.payload_length);
- client->rx.specific_data.ping_msg = mw_malloc(client->rx.payload_length);
+ client->rx.specific_data.ping_msg = mallocz(client->rx.payload_length);
rbuf_pop(client->buf_read, client->rx.specific_data.ping_msg, client->rx.payload_length);
// TODO schedule this instead of sending right away
// then attempt to send as soon as buffer space clears up
diff --git a/src/aclk/schema-wrappers/alarm_stream.cc b/src/aclk/schema-wrappers/alarm_stream.cc
index 29d80e39..c0b41bb0 100644
--- a/src/aclk/schema-wrappers/alarm_stream.cc
+++ b/src/aclk/schema-wrappers/alarm_stream.cc
@@ -22,6 +22,7 @@ struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_
ret.node_id = strdupz(msg.node_id().c_str());
ret.resets = msg.resets();
+ ret.version = msg.version();
return ret;
}
@@ -37,6 +38,7 @@ struct send_alarm_checkpoint parse_send_alarm_checkpoint(const char *data, size_
ret.node_id = strdupz(msg.node_id().c_str());
ret.claim_id = strdupz(msg.claim_id().c_str());
+ ret.version = msg.version();
return ret;
}
@@ -118,6 +120,7 @@ static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *pr
proto->set_transition_id(data->transition_id);
proto->set_chart_name(data->chart_name);
proto->set_summary(data->summary);
+ proto->set_alert_version(data->version);
}
char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data)
diff --git a/src/aclk/schema-wrappers/alarm_stream.h b/src/aclk/schema-wrappers/alarm_stream.h
index 3c81ff44..6e1936b0 100644
--- a/src/aclk/schema-wrappers/alarm_stream.h
+++ b/src/aclk/schema-wrappers/alarm_stream.h
@@ -13,6 +13,7 @@ extern "C" {
struct start_alarm_streaming {
char *node_id;
+ uint64_t version;
bool resets;
};
@@ -36,8 +37,6 @@ struct alarm_log_entry {
char *name;
char *family;
- uint64_t batch_id;
- uint64_t sequence_id;
uint64_t when;
char *config_hash;
@@ -76,13 +75,22 @@ struct alarm_log_entry {
char *chart_name;
uint64_t event_id;
+ uint64_t version;
char *transition_id;
char *summary;
+
+ // local book keeping
+ int64_t health_log_id;
+ int64_t alarm_id;
+ int64_t unique_id;
+ int64_t sequence_id;
};
struct send_alarm_checkpoint {
char *node_id;
char *claim_id;
+ uint64_t version;
+ uint64_t when_end;
};
struct alarm_checkpoint {