diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-08-26 08:15:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-08-26 08:15:20 +0000 |
commit | 87d772a7d708fec12f48cd8adc0dedff6e1025da (patch) | |
tree | 1fee344c64cc3f43074a01981e21126c8482a522 /src/aclk | |
parent | Adding upstream version 1.46.3. (diff) | |
download | netdata-87d772a7d708fec12f48cd8adc0dedff6e1025da.tar.xz netdata-87d772a7d708fec12f48cd8adc0dedff6e1025da.zip |
Adding upstream version 1.47.0.upstream/1.47.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/aclk')
-rw-r--r-- | src/aclk/aclk.c | 63 | ||||
-rw-r--r-- | src/aclk/aclk_capas.c | 4 | ||||
-rw-r--r-- | src/aclk/aclk_rx_msgs.c | 12 | ||||
-rw-r--r-- | src/aclk/helpers/mqtt_wss_pal.h | 6 | ||||
-rw-r--r-- | src/aclk/https_client.c | 2 | ||||
-rw-r--r-- | src/aclk/mqtt_websockets/mqtt_ng.c | 56 | ||||
-rw-r--r-- | src/aclk/mqtt_websockets/mqtt_wss_client.c | 148 | ||||
-rw-r--r-- | src/aclk/mqtt_websockets/mqtt_wss_log.c | 12 | ||||
-rw-r--r-- | src/aclk/mqtt_websockets/ws_client.c | 84 | ||||
-rw-r--r-- | src/aclk/schema-wrappers/alarm_stream.cc | 3 | ||||
-rw-r--r-- | src/aclk/schema-wrappers/alarm_stream.h | 12 |
11 files changed, 185 insertions, 217 deletions
diff --git a/src/aclk/aclk.c b/src/aclk/aclk.c index 99174549..627edfc9 100644 --- a/src/aclk/aclk.c +++ b/src/aclk/aclk.c @@ -52,9 +52,9 @@ time_t aclk_block_until = 0; #ifdef ENABLE_ACLK mqtt_wss_client mqttwss_client; -netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; -#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex) -#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) +//netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; +//#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex) +//#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) struct aclk_shared_state aclk_shared_state = { .mqtt_shutdown_msg_id = -1, @@ -1058,30 +1058,24 @@ void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, con static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host) { - struct proto_alert_status status; - memset(&status, 0, sizeof(status)); - if (get_proto_alert_status(host, &status)) { - buffer_strcat(wb, "\nFailed to get alert streaming status for this host"); + struct aclk_sync_cfg_t *wc = host->aclk_config; + if (!wc) return; - } + buffer_sprintf(wb, "\n\t\tUpdates: %d" - "\n\t\tPending Min Seq ID: %"PRIu64 - "\n\t\tPending Max Seq ID: %"PRIu64 - "\n\t\tLast Submitted Seq ID: %"PRIu64, - status.alert_updates, - status.pending_min_sequence_id, - status.pending_max_sequence_id, - status.last_submitted_sequence_id + "\n\t\tCheckpoints: %d" + "\n\t\tAlert count: %d" + "\n\t\tAlert snapshot count: %d", + wc->stream_alerts, + wc->checkpoint_count, + wc->alert_count, + wc->snapshot_count ); } -#endif /* ENABLE_ACLK */ char *aclk_state(void) { -#ifndef ENABLE_ACLK - return strdupz("ACLK Available: No"); -#else BUFFER *wb = buffer_create(1024, &netdata_buffers_statistics.buffers_aclk); struct tm *tmptr, tmbuf; char *ret; @@ -1163,28 +1157,25 @@ char *aclk_state(void) ret = strdupz(buffer_tostring(wb)); buffer_free(wb); return ret; -#endif /* ENABLE_ACLK */ } -#ifdef ENABLE_ACLK static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) { - struct proto_alert_status status; - memset(&status, 0, sizeof(status)); - if (get_proto_alert_status(host, &status)) + struct aclk_sync_cfg_t *wc = host->aclk_config; + if (!wc) return; - json_object *tmp = json_object_new_int(status.alert_updates); + json_object *tmp = json_object_new_int(wc->stream_alerts); json_object_object_add(obj, "updates", tmp); - tmp = json_object_new_int(status.pending_min_sequence_id); - json_object_object_add(obj, "pending-min-seq-id", tmp); + tmp = json_object_new_int(wc->checkpoint_count); + json_object_object_add(obj, "checkpoint-count", tmp); - tmp = json_object_new_int(status.pending_max_sequence_id); - json_object_object_add(obj, "pending-max-seq-id", tmp); + tmp = json_object_new_int(wc->alert_count); + json_object_object_add(obj, "alert-count", tmp); - tmp = json_object_new_int(status.last_submitted_sequence_id); - json_object_object_add(obj, "last-submitted-seq-id", tmp); + tmp = json_object_new_int(wc->snapshot_count); + json_object_object_add(obj, "alert-snapshot-count", tmp); } static json_object *timestamp_to_json(const time_t *t) @@ -1197,13 +1188,9 @@ static json_object *timestamp_to_json(const time_t *t) } return NULL; } -#endif /* ENABLE_ACLK */ char *aclk_state_json(void) { -#ifndef ENABLE_ACLK - return strdupz("{\"aclk-available\":false}"); -#else json_object *tmp, *grp, *msg = json_object_new_object(); tmp = json_object_new_boolean(1); @@ -1313,8 +1300,8 @@ char *aclk_state_json(void) char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN)); json_object_put(msg); return str; -#endif /* ENABLE_ACLK */ } +#endif /* ENABLE_ACLK */ void add_aclk_host_labels(void) { RRDLABELS *labels = localhost->rrdlabels; @@ -1347,7 +1334,7 @@ void add_aclk_host_labels(void) { void aclk_queue_node_info(RRDHOST *host, bool immediate) { - struct aclk_sync_cfg_t *wc = host->aclk_config; - if (likely(wc)) + struct aclk_sync_cfg_t *wc = host->aclk_config; + if (wc) wc->node_info_send_time = (host == localhost || immediate) ? 1 : now_realtime_sec(); } diff --git a/src/aclk/aclk_capas.c b/src/aclk/aclk_capas.c index 00102ad4..0f7870fd 100644 --- a/src/aclk/aclk_capas.c +++ b/src/aclk/aclk_capas.c @@ -16,7 +16,7 @@ const struct capability *aclk_get_agent_capas() { .name = "ctx", .version = 1, .enabled = 1 }, { .name = "funcs", .version = 1, .enabled = 1 }, { .name = "http_api_v2", .version = HTTP_API_V2_VERSION, .enabled = 1 }, - { .name = "health", .version = 1, .enabled = 0 }, // index 7, below + { .name = "health", .version = 2, .enabled = 0 }, // index 7, below { .name = "req_cancel", .version = 1, .enabled = 1 }, { .name = "dyncfg", .version = 2, .enabled = 1 }, { .name = NULL, .version = 0, .enabled = 0 } @@ -46,7 +46,7 @@ struct capability *aclk_get_node_instance_capas(RRDHOST *host) { .name = "ctx", .version = 1, .enabled = 1 }, { .name = "funcs", .version = functions ? 1 : 0, .enabled = functions ? 1 : 0 }, { .name = "http_api_v2", .version = HTTP_API_V2_VERSION, .enabled = 1 }, - { .name = "health", .version = 1, .enabled = host->health.health_enabled }, + { .name = "health", .version = 2, .enabled = host->health.health_enabled }, { .name = "req_cancel", .version = 1, .enabled = 1 }, { .name = "dyncfg", .version = 2, .enabled = dyncfg }, { .name = NULL, .version = 0, .enabled = 0 } diff --git a/src/aclk/aclk_rx_msgs.c b/src/aclk/aclk_rx_msgs.c index 60e42192..8db8e3f1 100644 --- a/src/aclk/aclk_rx_msgs.c +++ b/src/aclk/aclk_rx_msgs.c @@ -106,13 +106,13 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur else if(strncmp(payload, "DELETE /", 8) == 0) start = payload + 7; else { - errno = 0; + errno_clear(); netdata_log_error("Only accepting requests that start with GET, POST, PUT, DELETE from CLOUD."); return 1; } if(!(end = strstr(payload, HTTP_1_1 HTTP_ENDL))) { - errno = 0; + errno_clear(); netdata_log_error("Doesn't look like HTTP GET request."); return 1; } @@ -127,7 +127,7 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent { aclk_query_t query; - errno = 0; + errno_clear(); if (cloud_to_agent->version < ACLK_V_COMPRESSION) { netdata_log_error( "This handler cannot reply to request with version older than %d, received %d.", @@ -347,7 +347,7 @@ int start_alarm_streaming(const char *msg, size_t msg_len) netdata_log_error("Error parsing StartAlarmStreaming"); return 1; } - aclk_start_alert_streaming(res.node_id, res.resets); + aclk_start_alert_streaming(res.node_id, res.version); freez(res.node_id); return 0; } @@ -361,7 +361,7 @@ int send_alarm_checkpoint(const char *msg, size_t msg_len) freez(sac.claim_id); return 1; } - aclk_send_alarm_checkpoint(sac.node_id, sac.claim_id); + aclk_alert_version_check(sac.node_id, sac.claim_id, sac.version); freez(sac.node_id); freez(sac.claim_id); return 0; @@ -375,7 +375,7 @@ int send_alarm_configuration(const char *msg, size_t msg_len) freez(config_hash); return 1; } - aclk_send_alarm_configuration(config_hash); + aclk_send_alert_configuration(config_hash); freez(config_hash); return 0; } diff --git a/src/aclk/helpers/mqtt_wss_pal.h b/src/aclk/helpers/mqtt_wss_pal.h index 5c89f8bb..fe1aacf4 100644 --- a/src/aclk/helpers/mqtt_wss_pal.h +++ b/src/aclk/helpers/mqtt_wss_pal.h @@ -10,10 +10,4 @@ #undef OPENSSL_VERSION_110 #undef OPENSSL_VERSION_111 -#define mw_malloc(...) mallocz(__VA_ARGS__) -#define mw_calloc(...) callocz(__VA_ARGS__) -#define mw_free(...) freez(__VA_ARGS__) -#define mw_strdup(...) strdupz(__VA_ARGS__) -#define mw_realloc(...) reallocz(__VA_ARGS__) - #endif /* MQTT_WSS_PAL_H */ diff --git a/src/aclk/https_client.c b/src/aclk/https_client.c index 2bc768f2..8c44f13e 100644 --- a/src/aclk/https_client.c +++ b/src/aclk/https_client.c @@ -696,7 +696,7 @@ int https_request(https_req_t *request, https_req_response_t *response) { goto exit_CTX; } - if (!SSL_set_tlsext_host_name(ctx->ssl, connect_host)) { + if (!SSL_set_tlsext_host_name(ctx->ssl, request->host)) { netdata_log_error("Error setting TLS SNI host"); goto exit_CTX; } diff --git a/src/aclk/mqtt_websockets/mqtt_ng.c b/src/aclk/mqtt_websockets/mqtt_ng.c index f570fde7..8ad6bd5c 100644 --- a/src/aclk/mqtt_websockets/mqtt_ng.c +++ b/src/aclk/mqtt_websockets/mqtt_ng.c @@ -423,7 +423,7 @@ static void buffer_frag_free_data(struct buffer_fragment *frag) if ( frag->flags & BUFFER_FRAG_DATA_EXTERNAL && frag->data != NULL) { switch (ptr2memory_mode(frag->free_fnc)) { case MEMCPY: - mw_free(frag->data); + freez(frag->data); break; case EXTERNAL_FREE_AFTER_USE: frag->free_fnc(frag->data); @@ -563,7 +563,7 @@ static int transaction_buffer_grow(struct transaction_buffer *buf, mqtt_wss_log_ if (buf->hdr_buffer.size > max) buf->hdr_buffer.size = max; - void *ret = mw_realloc(buf->hdr_buffer.data, buf->hdr_buffer.size); + void *ret = reallocz(buf->hdr_buffer.data, buf->hdr_buffer.size); if (ret == NULL) { mws_warn(log_ctx, "Buffer growth failed (realloc)"); return 1; @@ -581,7 +581,7 @@ inline static int transaction_buffer_init(struct transaction_buffer *to_init, si pthread_mutex_init(&to_init->mutex, NULL); to_init->hdr_buffer.size = size; - to_init->hdr_buffer.data = mw_malloc(size); + to_init->hdr_buffer.data = mallocz(size); if (to_init->hdr_buffer.data == NULL) return 1; @@ -594,7 +594,7 @@ static void transaction_buffer_destroy(struct transaction_buffer *to_init) { buffer_purge(&to_init->hdr_buffer); pthread_mutex_destroy(&to_init->mutex); - mw_free(to_init->hdr_buffer.data); + freez(to_init->hdr_buffer.data); } // Creates transaction @@ -628,7 +628,7 @@ void transaction_buffer_transaction_rollback(struct transaction_buffer *buf, str #define RX_ALIASES_INITIALIZE() c_rhash_new(UINT16_MAX >> 8) struct mqtt_ng_client *mqtt_ng_init(struct mqtt_ng_init *settings) { - struct mqtt_ng_client *client = mw_calloc(1, sizeof(struct mqtt_ng_client)); + struct mqtt_ng_client *client = callocz(1, sizeof(struct mqtt_ng_client)); if (client == NULL) return NULL; @@ -672,7 +672,7 @@ err_free_rx_alias: err_free_trx_buf: transaction_buffer_destroy(&client->main_buffer); err_free_client: - mw_free(client); + freez(client); return NULL; } @@ -688,7 +688,7 @@ static void mqtt_ng_destroy_rx_alias_hash(c_rhash hash) void *to_free; while(!c_rhash_iter_uint64_keys(hash, &i, &stored_key)) { c_rhash_get_ptr_by_uint64(hash, stored_key, &to_free); - mw_free(to_free); + freez(to_free); } c_rhash_destroy(hash); } @@ -700,7 +700,7 @@ static void mqtt_ng_destroy_tx_alias_hash(c_rhash hash) void *to_free; while(!c_rhash_iter_str_keys(hash, &i, &stored_key)) { c_rhash_get_ptr_by_str(hash, stored_key, &to_free); - mw_free(to_free); + freez(to_free); } c_rhash_destroy(hash); } @@ -714,7 +714,7 @@ void mqtt_ng_destroy(struct mqtt_ng_client *client) pthread_rwlock_destroy(&client->tx_topic_aliases.rwlock); mqtt_ng_destroy_rx_alias_hash(client->rx_aliases); - mw_free(client); + freez(client); } int frag_set_external_data(mqtt_wss_log_ctx_t log, struct buffer_fragment *frag, void *data, size_t data_len, free_fnc_t data_free_fnc) @@ -730,7 +730,7 @@ int frag_set_external_data(mqtt_wss_log_ctx_t log, struct buffer_fragment *frag, switch (ptr2memory_mode(data_free_fnc)) { case MEMCPY: - frag->data = mw_malloc(data_len); + frag->data = mallocz(data_len); if (frag->data == NULL) { mws_error(log, UNIT_LOG_PREFIX "OOM while malloc @_optimized_add"); return 1; @@ -1408,12 +1408,12 @@ static void mqtt_properties_parser_ctx_reset(struct mqtt_properties_parser_ctx * struct mqtt_property *f = ctx->head; ctx->head = ctx->head->next; if (f->type == MQTT_TYPE_STR || f->type == MQTT_TYPE_STR_PAIR) - mw_free(f->data.strings[0]); + freez(f->data.strings[0]); if (f->type == MQTT_TYPE_STR_PAIR) - mw_free(f->data.strings[1]); + freez(f->data.strings[1]); if (f->type == MQTT_TYPE_BIN) - mw_free(f->data.bindata); - mw_free(f); + freez(f->data.bindata); + freez(f); } ctx->tail = NULL; ctx->properties_length = 0; @@ -1498,7 +1498,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t return rc; case PROPERTY_CREATE: BUF_READ_CHECK_AT_LEAST(data, 1); - struct mqtt_property *prop = mw_calloc(1, sizeof(struct mqtt_property)); + struct mqtt_property *prop = callocz(1, sizeof(struct mqtt_property)); if (ctx->head == NULL) { ctx->head = prop; ctx->tail = prop; @@ -1558,7 +1558,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t break; case PROPERTY_TYPE_STR: BUF_READ_CHECK_AT_LEAST(data, ctx->tail->bindata_len); - ctx->tail->data.strings[ctx->str_idx] = mw_malloc(ctx->tail->bindata_len + 1); + ctx->tail->data.strings[ctx->str_idx] = mallocz(ctx->tail->bindata_len + 1); rbuf_pop(data, ctx->tail->data.strings[ctx->str_idx], ctx->tail->bindata_len); ctx->tail->data.strings[ctx->str_idx][ctx->tail->bindata_len] = 0; ctx->str_idx++; @@ -1571,7 +1571,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t break; case PROPERTY_TYPE_BIN: BUF_READ_CHECK_AT_LEAST(data, ctx->tail->bindata_len); - ctx->tail->data.bindata = mw_malloc(ctx->tail->bindata_len); + ctx->tail->data.bindata = mallocz(ctx->tail->bindata_len); rbuf_pop(data, ctx->tail->data.bindata, ctx->tail->bindata_len); ctx->bytes_consumed += ctx->tail->bindata_len; ctx->state = PROPERTY_NEXT; @@ -1721,7 +1721,7 @@ static int parse_suback_varhdr(struct mqtt_ng_client *client) return rc; parser->mqtt_parsed_len += parser->properties_parser.bytes_consumed; suback->reason_code_count = parser->mqtt_fixed_hdr_remaining_length - parser->mqtt_parsed_len; - suback->reason_codes = mw_calloc(suback->reason_code_count, sizeof(*suback->reason_codes)); + suback->reason_codes = callocz(suback->reason_code_count, sizeof(*suback->reason_codes)); suback->reason_codes_pending = suback->reason_code_count; parser->varhdr_state = MQTT_PARSE_REASONCODES; /* FALLTHROUGH */ @@ -1760,7 +1760,7 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client) parser->varhdr_state = MQTT_PARSE_VARHDR_POST_TOPICNAME; break; } - publish->topic = mw_calloc(1, publish->topic_len + 1 /* add 0x00 */); + publish->topic = callocz(1, publish->topic_len + 1 /* add 0x00 */); if (publish->topic == NULL) return MQTT_NG_CLIENT_OOM; parser->varhdr_state = MQTT_PARSE_VARHDR_TOPICNAME; @@ -1796,7 +1796,7 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client) /* FALLTHROUGH */ case MQTT_PARSE_PAYLOAD: if (parser->mqtt_fixed_hdr_remaining_length < parser->mqtt_parsed_len) { - mw_free(publish->topic); + freez(publish->topic); publish->topic = NULL; ERROR("Error parsing PUBLISH message"); return MQTT_NG_CLIENT_PROTOCOL_ERROR; @@ -1808,9 +1808,9 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client) } BUF_READ_CHECK_AT_LEAST(parser->received_data, publish->data_len); - publish->data = mw_malloc(publish->data_len); + publish->data = mallocz(publish->data_len); if (publish->data == NULL) { - mw_free(publish->topic); + freez(publish->topic); publish->topic = NULL; return MQTT_NG_CLIENT_OOM; } @@ -1867,7 +1867,7 @@ static int parse_data(struct mqtt_ng_client *client) case MQTT_CPT_SUBACK: rc = parse_suback_varhdr(client); if (rc != MQTT_NG_CLIENT_NEED_MORE_BYTES && rc != MQTT_NG_CLIENT_OK_CALL_AGAIN) { - mw_free(parser->mqtt_packet.suback.reason_codes); + freez(parser->mqtt_packet.suback.reason_codes); } if (rc == MQTT_NG_CLIENT_PARSE_DONE) { parser->state = MQTT_PARSE_MQTT_PACKET_DONE; @@ -2096,8 +2096,8 @@ int handle_incoming_traffic(struct mqtt_ng_client *client) #endif pub = &client->parser.mqtt_packet.publish; if (pub->qos > 1) { - mw_free(pub->topic); - mw_free(pub->data); + freez(pub->topic); + freez(pub->data); return MQTT_NG_CLIENT_NOT_IMPL_YET; } if ( pub->qos == 1 && (rc = mqtt_ng_puback(client, pub->packet_id, 0)) ) { @@ -2127,8 +2127,8 @@ int handle_incoming_traffic(struct mqtt_ng_client *client) // in case we have property topic alias and we have topic we take over the string // and add pointer to it into topic alias list if (prop == NULL) - mw_free(pub->topic); - mw_free(pub->data); + freez(pub->topic); + freez(pub->data); return MQTT_NG_CLIENT_WANT_WRITE; case MQTT_CPT_DISCONNECT: INFO ("Got MQTT DISCONNECT control packet from server. Reason code: %d", (int)client->parser.mqtt_packet.disconnect.reason_code); @@ -2225,7 +2225,7 @@ int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic) return idx; } - alias = mw_malloc(sizeof(struct topic_alias_data)); + alias = mallocz(sizeof(struct topic_alias_data)); idx = ++client->tx_topic_aliases.idx_assigned; alias->idx = idx; __atomic_store_n(&alias->usage_count, 0, __ATOMIC_SEQ_CST); diff --git a/src/aclk/mqtt_websockets/mqtt_wss_client.c b/src/aclk/mqtt_websockets/mqtt_wss_client.c index f5b4025d..2d231ef4 100644 --- a/src/aclk/mqtt_websockets/mqtt_wss_client.c +++ b/src/aclk/mqtt_websockets/mqtt_wss_client.c @@ -1,5 +1,4 @@ // SPDX-License-Identifier: GPL-3.0-only -// Copyright (C) 2020 Timotej Šiškovič #ifndef _GNU_SOURCE #define _GNU_SOURCE @@ -19,9 +18,6 @@ #include <sys/socket.h> #include <netinet/in.h> -#include <arpa/inet.h> -#include <netinet/tcp.h> //TCP_NODELAY -#include <netdb.h> #include <openssl/err.h> #include <openssl/ssl.h> @@ -107,8 +103,6 @@ struct mqtt_wss_client_struct { int mqtt_keepalive; - pthread_mutex_t pub_lock; - // signifies that we didn't write all MQTT wanted // us to write during last cycle (e.g. due to buffer // size) and thus we should arm POLLOUT @@ -121,7 +115,7 @@ struct mqtt_wss_client_struct { void (*msg_callback)(const char *, const void *, size_t, int); void (*puback_callback)(uint16_t packet_id); - pthread_mutex_t stat_lock; + SPINLOCK stat_lock; struct mqtt_wss_stats stats; #ifdef MQTT_WSS_DEBUG @@ -173,14 +167,13 @@ mqtt_wss_client mqtt_wss_new(const char *log_prefix, SSL_library_init(); SSL_load_error_strings(); - mqtt_wss_client client = mw_calloc(1, sizeof(struct mqtt_wss_client_struct)); + mqtt_wss_client client = callocz(1, sizeof(struct mqtt_wss_client_struct)); if (!client) { mws_error(log, "OOM alocating mqtt_wss_client"); goto fail; } - pthread_mutex_init(&client->pub_lock, NULL); - pthread_mutex_init(&client->stat_lock, NULL); + spinlock_init(&client->stat_lock); client->msg_callback = msg_callback; client->puback_callback = puback_callback; @@ -229,7 +222,7 @@ fail_3: fail_2: ws_client_destroy(client->ws_client); fail_1: - mw_free(client); + freez(client); fail: mqtt_wss_log_ctx_destroy(log); return NULL; @@ -253,12 +246,15 @@ void mqtt_wss_destroy(mqtt_wss_client client) // as it "borrows" this pointer and might use it if (client->target_host == client->host) client->target_host = NULL; + if (client->target_host) - mw_free(client->target_host); + freez(client->target_host); + if (client->host) - mw_free(client->host); - mw_free(client->proxy_passwd); - mw_free(client->proxy_uname); + freez(client->host); + + freez(client->proxy_passwd); + freez(client->proxy_uname); if (client->ssl) SSL_free(client->ssl); @@ -269,11 +265,8 @@ void mqtt_wss_destroy(mqtt_wss_client client) if (client->sockfd > 0) close(client->sockfd); - pthread_mutex_destroy(&client->pub_lock); - pthread_mutex_destroy(&client->stat_lock); - mqtt_wss_log_ctx_destroy(client->log); - mw_free(client); + freez(client); } static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx) @@ -298,7 +291,7 @@ static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx) mws_error(client->log, "verify error:num=%d:%s:depth=%d:%s", err, X509_verify_cert_error_string(err), depth, err_str); - mw_free(err_str); + freez(err_str); } if (!preverify_ok && err == X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT && @@ -362,14 +355,14 @@ static int http_parse_reply(mqtt_wss_client client, rbuf_t buf) } if (http_code != 200) { - ptr = mw_malloc(idx + 1); + ptr = mallocz(idx + 1); if (!ptr) return 6; rbuf_pop(buf, ptr, idx); ptr[idx] = 0; mws_error(client->log, "http_proxy returned error code %d \"%s\"", http_code, ptr); - mw_free(ptr); + freez(ptr); return 7; }/* else rbuf_bump_tail(buf, idx);*/ @@ -450,7 +443,7 @@ static int http_proxy_connect(mqtt_wss_client client) if (client->proxy_uname) { size_t creds_plain_len = strlen(client->proxy_uname) + strlen(client->proxy_passwd) + 2; - char *creds_plain = mw_malloc(creds_plain_len); + char *creds_plain = mallocz(creds_plain_len); if (!creds_plain) { mws_error(client->log, "OOM creds_plain"); rc = 6; @@ -460,9 +453,9 @@ static int http_proxy_connect(mqtt_wss_client client) // OpenSSL encoder puts newline every 64 output bytes // we remove those but during encoding we need that space in the buffer creds_base64_len += (1+(creds_base64_len/64)) * strlen("\n"); - char *creds_base64 = mw_malloc(creds_base64_len + 1); + char *creds_base64 = mallocz(creds_base64_len + 1); if (!creds_base64) { - mw_free(creds_plain); + freez(creds_plain); mws_error(client->log, "OOM creds_base64"); rc = 6; goto cleanup; @@ -475,12 +468,12 @@ static int http_proxy_connect(mqtt_wss_client client) int b64_len; base64_encode_helper((unsigned char*)creds_base64, &b64_len, (unsigned char*)creds_plain, strlen(creds_plain)); - mw_free(creds_plain); + freez(creds_plain); r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity); snprintf(r_buf_ptr, r_buf_linear_insert_capacity,"Proxy-Authorization: Basic %s" HTTP_ENDLINE, creds_base64); write(client->sockfd, r_buf_ptr, strlen(r_buf_ptr)); - mw_free(creds_base64); + freez(creds_base64); } write(client->sockfd, HTTP_ENDLINE, strlen(HTTP_ENDLINE)); @@ -523,15 +516,14 @@ cleanup: return rc; } -int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_connect_params *mqtt_params, int ssl_flags, struct mqtt_wss_proxy *proxy) +int mqtt_wss_connect( + mqtt_wss_client client, + char *host, + int port, + struct mqtt_connect_params *mqtt_params, + int ssl_flags, + struct mqtt_wss_proxy *proxy) { - struct sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - - struct hostent *he; - struct in_addr **addr_list; - if (!mqtt_params) { mws_error(client->log, "mqtt_params can't be null!"); return -1; @@ -545,23 +537,35 @@ int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_c if (client->target_host == client->host) client->target_host = NULL; + if (client->target_host) - mw_free(client->target_host); + freez(client->target_host); + if (client->host) - mw_free(client->host); + freez(client->host); + + if (client->proxy_uname) { + freez(client->proxy_uname); + client->proxy_uname = NULL; + } + + if (client->proxy_passwd) { + freez(client->proxy_passwd); + client->proxy_passwd = NULL; + } if (proxy && proxy->type != MQTT_WSS_DIRECT) { - client->host = mw_strdup(proxy->host); + client->host = strdupz(proxy->host); client->port = proxy->port; - client->target_host = mw_strdup(host); + client->target_host = strdupz(host); client->target_port = port; client->proxy_type = proxy->type; if (proxy->username) - client->proxy_uname = mw_strdup(proxy->username); + client->proxy_uname = strdupz(proxy->username); if (proxy->password) - client->proxy_passwd = mw_strdup(proxy->password); + client->proxy_passwd = strdupz(proxy->password); } else { - client->host = mw_strdup(host); + client->host = strdupz(host); client->port = port; client->target_host = client->host; client->target_port = port; @@ -569,30 +573,19 @@ int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_c client->ssl_flags = ssl_flags; - //TODO gethostbyname -> getaddinfo - // hstrerror -> gai_strerror - if ((he = gethostbyname(client->host)) == NULL) { - mws_error(client->log, "gethostbyname() error \"%s\"", hstrerror(h_errno)); - return -1; - } - - addr_list = (struct in_addr **)he->h_addr_list; - if(!addr_list[0]) { - mws_error(client->log, "No IP addr resolved"); - return -1; - } - mws_debug(client->log, "Resolved IP: %s", inet_ntoa(*addr_list[0])); - addr.sin_addr = *addr_list[0]; - addr.sin_port = htons(client->port); - if (client->sockfd > 0) close(client->sockfd); - client->sockfd = socket(AF_INET, SOCK_STREAM | DEFAULT_SOCKET_FLAGS, 0); - if (client->sockfd < 0) { - mws_error(client->log, "Couldn't create socket()"); - return -1; + + char port_str[16]; + snprintf(port_str, sizeof(port_str) -1, "%d", client->port); + int fd = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, client->host, 0, port_str, NULL); + if (fd < 0) { + mws_error(client->log, "Could not connect to remote endpoint \"%s\", port %d.\n", client->host, port); + return -3; } + client->sockfd = fd; + #ifndef SOCK_CLOEXEC int flags = fcntl(client->sockfd, F_GETFD); if (flags != -1) @@ -600,19 +593,10 @@ int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_c #endif int flag = 1; - int result = setsockopt(client->sockfd, - IPPROTO_TCP, - TCP_NODELAY, - &flag, - sizeof(int)); + int result = setsockopt(client->sockfd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(int)); if (result < 0) mws_error(client->log, "Could not dissable NAGLE"); - if (connect(client->sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { - mws_error(client->log, "Could not connect to remote endpoint \"%s\", port %d.\n", client->host, client->port); - return -3; - } - client->poll_fds[POLLFD_SOCKET].fd = client->sockfd; if (fcntl(client->sockfd, F_SETFL, fcntl(client->sockfd, F_GETFL, 0) | O_NONBLOCK) == -1) { @@ -640,6 +624,7 @@ int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_c // free SSL structs from possible previous connections if (client->ssl) SSL_free(client->ssl); + if (client->ssl_ctx) SSL_CTX_free(client->ssl_ctx); @@ -675,6 +660,7 @@ int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_c mws_error(client->log, "SSL could not connect"); return -5; } + if (result == -1) { int ec = SSL_get_error(client->ssl, result); if (ec != SSL_ERROR_WANT_READ && ec != SSL_ERROR_WANT_WRITE) { @@ -693,14 +679,16 @@ int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_c auth.username_free = NULL; auth.password = (char*)mqtt_params->password; auth.password_free = NULL; + struct mqtt_lwt_properties lwt; lwt.will_topic = (char*)mqtt_params->will_topic; lwt.will_topic_free = NULL; lwt.will_message = (void*)mqtt_params->will_msg; lwt.will_message_free = NULL; // TODO expose no copy version to API lwt.will_message_size = mqtt_params->will_msg_len; - lwt.will_qos = (mqtt_params->will_flags & MQTT_WSS_PUB_QOSMASK); - lwt.will_retain = mqtt_params->will_flags & MQTT_WSS_PUB_RETAIN; + lwt.will_qos = (int) (mqtt_params->will_flags & MQTT_WSS_PUB_QOSMASK); + lwt.will_retain = (int) mqtt_params->will_flags & MQTT_WSS_PUB_RETAIN; + int ret = mqtt_ng_connect(client->mqtt, &auth, mqtt_params->will_msg ? &lwt : NULL, 1, client->mqtt_keepalive); if (ret) { mws_error(client->log, "Error generating MQTT connect"); @@ -955,9 +943,9 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) #ifdef DEBUG_ULTRA_VERBOSE mws_debug(client->log, "SSL_Read: Read %d.", ret); #endif - pthread_mutex_lock(&client->stat_lock); + spinlock_lock(&client->stat_lock); client->stats.bytes_rx += ret; - pthread_mutex_unlock(&client->stat_lock); + spinlock_unlock(&client->stat_lock); rbuf_bump_head(client->ws_client->buf_read, ret); } else { int errnobkp = errno; @@ -1023,9 +1011,9 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) #ifdef DEBUG_ULTRA_VERBOSE mws_debug(client->log, "SSL_Write: Written %d of avail %d.", ret, size); #endif - pthread_mutex_lock(&client->stat_lock); + spinlock_lock(&client->stat_lock); client->stats.bytes_tx += ret; - pthread_mutex_unlock(&client->stat_lock); + spinlock_unlock(&client->stat_lock); rbuf_bump_tail(client->ws_client->buf_write, ret); } else { int errnobkp = errno; @@ -1115,10 +1103,10 @@ int mqtt_wss_subscribe(mqtt_wss_client client, char *topic, int max_qos_level) struct mqtt_wss_stats mqtt_wss_get_stats(mqtt_wss_client client) { struct mqtt_wss_stats current; - pthread_mutex_lock(&client->stat_lock); + spinlock_lock(&client->stat_lock); current = client->stats; memset(&client->stats, 0, sizeof(client->stats)); - pthread_mutex_unlock(&client->stat_lock); + spinlock_unlock(&client->stat_lock); mqtt_ng_get_stats(client->mqtt, ¤t.mqtt); return current; } diff --git a/src/aclk/mqtt_websockets/mqtt_wss_log.c b/src/aclk/mqtt_websockets/mqtt_wss_log.c index 5e606c12..e5da76fc 100644 --- a/src/aclk/mqtt_websockets/mqtt_wss_log.c +++ b/src/aclk/mqtt_websockets/mqtt_wss_log.c @@ -25,13 +25,13 @@ struct mqtt_wss_log_ctx { #endif mqtt_wss_log_ctx_t mqtt_wss_log_ctx_create(const char *ctx_prefix, mqtt_wss_log_callback_t log_callback) { - mqtt_wss_log_ctx_t ctx = mw_calloc(1, sizeof(struct mqtt_wss_log_ctx)); + mqtt_wss_log_ctx_t ctx = callocz(1, sizeof(struct mqtt_wss_log_ctx)); if(!ctx) return NULL; if(log_callback) { ctx->extern_log_fnc = log_callback; - ctx->buffer = mw_calloc(1, LOG_BUFFER_SIZE); + ctx->buffer = callocz(1, LOG_BUFFER_SIZE); if(!ctx->buffer) goto cleanup; @@ -60,15 +60,15 @@ mqtt_wss_log_ctx_t mqtt_wss_log_ctx_create(const char *ctx_prefix, mqtt_wss_log_ return ctx; cleanup: - mw_free(ctx); + freez(ctx); return NULL; } void mqtt_wss_log_ctx_destroy(mqtt_wss_log_ctx_t ctx) { - mw_free(ctx->ctx_prefix); - mw_free(ctx->buffer); - mw_free(ctx); + freez(ctx->ctx_prefix); + freez(ctx->buffer); + freez(ctx); } static inline char severity_to_c(int severity) diff --git a/src/aclk/mqtt_websockets/ws_client.c b/src/aclk/mqtt_websockets/ws_client.c index 240e889c..a6b9b23f 100644 --- a/src/aclk/mqtt_websockets/ws_client.c +++ b/src/aclk/mqtt_websockets/ws_client.c @@ -53,7 +53,7 @@ ws_client *ws_client_new(size_t buf_size, char **host, mqtt_wss_log_ctx_t log) if(!host) return NULL; - client = mw_calloc(1, sizeof(ws_client)); + client = callocz(1, sizeof(ws_client)); if (!client) return NULL; @@ -87,7 +87,7 @@ cleanup_2: cleanup_1: rbuf_free(client->buf_read); cleanup: - mw_free(client); + freez(client); return NULL; } @@ -99,7 +99,7 @@ void ws_client_free_headers(ws_client *client) while (ptr) { tmp = ptr; ptr = ptr->next; - mw_free(tmp); + freez(tmp); } client->hs.headers = NULL; @@ -110,25 +110,28 @@ void ws_client_free_headers(ws_client *client) void ws_client_destroy(ws_client *client) { ws_client_free_headers(client); - mw_free(client->hs.nonce_reply); - mw_free(client->hs.http_reply_msg); + freez(client->hs.nonce_reply); + freez(client->hs.http_reply_msg); close(client->entropy_fd); rbuf_free(client->buf_read); rbuf_free(client->buf_write); rbuf_free(client->buf_to_mqtt); - mw_free(client); + freez(client); } void ws_client_reset(ws_client *client) { ws_client_free_headers(client); - mw_free(client->hs.nonce_reply); + freez(client->hs.nonce_reply); client->hs.nonce_reply = NULL; - mw_free(client->hs.http_reply_msg); + + freez(client->hs.http_reply_msg); client->hs.http_reply_msg = NULL; + rbuf_flush(client->buf_read); rbuf_flush(client->buf_write); rbuf_flush(client->buf_to_mqtt); + client->state = WS_RAW; client->hs.hdr_state = WS_HDR_HTTP; client->rx.parse_state = WS_FIRST_2BYTES; @@ -158,31 +161,11 @@ int ws_client_want_write(ws_client *client) return rbuf_bytes_available(client->buf_write); } -#define RAND_SRC "/dev/urandom" -static int ws_client_get_nonce(ws_client *client, char *dest, unsigned int size) -{ - // we do not need crypto secure random here - // it's just used for protocol negotiation - int rd; - int f = open(RAND_SRC, O_RDONLY | O_CLOEXEC); - if (f < 0) { - ERROR("Error opening \"%s\". Err: \"%s\"", RAND_SRC, strerror(errno)); - return -2; - } - - if ((rd = read(f, dest, size)) > 0) { - close(f); - return rd; - } - close(f); - return -1; -} - #define WEBSOCKET_NONCE_SIZE 16 #define TEMP_BUF_SIZE 4096 int ws_client_start_handshake(ws_client *client) { - char nonce[WEBSOCKET_NONCE_SIZE]; + nd_uuid_t nonce; char nonce_b64[256]; char second[TEMP_BUF_SIZE]; unsigned int md_len; @@ -190,16 +173,15 @@ int ws_client_start_handshake(ws_client *client) EVP_MD_CTX *md_ctx; const EVP_MD *md; - if(!*client->host) { + if(!client->host || !*client->host) { ERROR("Hostname has not been set. We should not be able to come here!"); return 1; } - ws_client_get_nonce(client, nonce, WEBSOCKET_NONCE_SIZE); + uuid_generate_random(nonce); EVP_EncodeBlock((unsigned char *)nonce_b64, (const unsigned char *)nonce, WEBSOCKET_NONCE_SIZE); - snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr, - *client->host, - nonce_b64); + snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr, *client->host, nonce_b64); + if(rbuf_bytes_free(client->buf_write) < strlen(second)) { ERROR("Write buffer capacity too low."); return 1; @@ -236,10 +218,10 @@ int ws_client_start_handshake(ws_client *client) EVP_DigestUpdate(md_ctx, second, strlen(second)); EVP_DigestFinal_ex(md_ctx, digest, &md_len); - EVP_EncodeBlock((unsigned char *)nonce_b64, digest, md_len); + EVP_EncodeBlock((unsigned char *)nonce_b64, digest, (int) md_len); - mw_free(client->hs.nonce_reply); - client->hs.nonce_reply = mw_strdup(nonce_b64); + freez(client->hs.nonce_reply); + client->hs.nonce_reply = strdupz(nonce_b64); OPENSSL_free(digest); @@ -263,7 +245,7 @@ int ws_client_start_handshake(ws_client *client) if (rbuf_bytes_available(client->buf_read) < x) \ return WS_CLIENT_NEED_MORE_BYTES; -#define MAX_HTTP_LINE_LENGTH 1024*4 +#define MAX_HTTP_LINE_LENGTH (1024 * 4) #define HTTP_SC_LENGTH 4 // "XXX " http status code as C string #define WS_CLIENT_HTTP_HDR "HTTP/1.1 " #define WS_CONN_ACCEPT "sec-websocket-accept" @@ -278,11 +260,11 @@ int ws_client_start_handshake(ws_client *client) #error "Buffer too small" #endif -#define HTTP_HDR_LINE_CHECK_LIMIT(x) if ((x) >= MAX_HTTP_LINE_LENGTH) \ -{ \ - ERROR("HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \ - return WS_CLIENT_PROTOCOL_ERROR; \ -} +#define HTTP_HDR_LINE_CHECK_LIMIT(x) \ + if ((x) >= MAX_HTTP_LINE_LENGTH) { \ + ERROR("HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \ + return WS_CLIENT_PROTOCOL_ERROR; \ + } int ws_client_parse_handshake_resp(ws_client *client) { @@ -290,6 +272,7 @@ int ws_client_parse_handshake_resp(ws_client *client) int idx_crlf, idx_sep; char *ptr; size_t bytes; + switch (client->hs.hdr_state) { case WS_HDR_HTTP: BUF_READ_CHECK_AT_LEAST(strlen(WS_CLIENT_HTTP_HDR)) @@ -297,6 +280,7 @@ int ws_client_parse_handshake_resp(ws_client *client) rbuf_bump_tail(client->buf_read, strlen(WS_CLIENT_HTTP_HDR)); client->hs.hdr_state = WS_HDR_RC; break; + case WS_HDR_RC: BUF_READ_CHECK_AT_LEAST(HTTP_SC_LENGTH); // "XXX " http return code rbuf_pop(client->buf_read, buf, HTTP_SC_LENGTH); @@ -312,6 +296,7 @@ int ws_client_parse_handshake_resp(ws_client *client) } client->hs.hdr_state = WS_HDR_ENDLINE; break; + case WS_HDR_ENDLINE: ptr = rbuf_find_bytes(client->buf_read, WS_HTTP_NEWLINE, strlen(WS_HTTP_NEWLINE), &idx_crlf); if (!ptr) { @@ -321,12 +306,13 @@ int ws_client_parse_handshake_resp(ws_client *client) } HTTP_HDR_LINE_CHECK_LIMIT(idx_crlf); - client->hs.http_reply_msg = mw_malloc(idx_crlf+1); + client->hs.http_reply_msg = mallocz(idx_crlf+1); rbuf_pop(client->buf_read, client->hs.http_reply_msg, idx_crlf); client->hs.http_reply_msg[idx_crlf] = 0; rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE)); client->hs.hdr_state = WS_HDR_PARSE_HEADERS; break; + case WS_HDR_PARSE_HEADERS: ptr = rbuf_find_bytes(client->buf_read, WS_HTTP_NEWLINE, strlen(WS_HTTP_NEWLINE), &idx_crlf); if (!ptr) { @@ -357,7 +343,7 @@ int ws_client_parse_handshake_resp(ws_client *client) return WS_CLIENT_PROTOCOL_ERROR; } - struct http_header *hdr = mw_calloc(1, sizeof(struct http_header) + idx_crlf); //idx_crlf includes ": " that will be used as 2 \0 bytes + struct http_header *hdr = callocz(1, sizeof(struct http_header) + idx_crlf); //idx_crlf includes ": " that will be used as 2 \0 bytes hdr->key = ((char*)hdr) + sizeof(struct http_header); hdr->value = hdr->key + idx_sep + 1; @@ -384,6 +370,7 @@ int ws_client_parse_handshake_resp(ws_client *client) } break; + case WS_HDR_PARSE_DONE: if (!client->hs.nonce_matched) { ERROR("Missing " WS_CONN_ACCEPT " header"); @@ -398,6 +385,7 @@ int ws_client_parse_handshake_resp(ws_client *client) client->hs.hdr_state = WS_HDR_ALL_DONE; INFO("Websocket Connection Accepted By Server"); return WS_CLIENT_PARSING_DONE; + case WS_HDR_ALL_DONE: FATAL("This is error we should never come here!"); return WS_CLIENT_PROTOCOL_ERROR; @@ -642,7 +630,7 @@ int ws_client_process_rx_ws(ws_client *client) break; case WS_PAYLOAD_CONNECTION_CLOSE_MSG: if (!client->rx.specific_data.op_close.reason) - client->rx.specific_data.op_close.reason = mw_malloc(client->rx.payload_length + 1); + client->rx.specific_data.op_close.reason = mallocz(client->rx.payload_length + 1); while (client->rx.payload_processed < client->rx.payload_length) { if (!rbuf_bytes_available(client->buf_read)) @@ -655,7 +643,7 @@ int ws_client_process_rx_ws(ws_client *client) INFO("WebSocket server closed the connection with EC=%d and reason \"%s\"", client->rx.specific_data.op_close.ec, client->rx.specific_data.op_close.reason); - mw_free(client->rx.specific_data.op_close.reason); + freez(client->rx.specific_data.op_close.reason); client->rx.specific_data.op_close.reason = NULL; client->rx.parse_state = WS_PACKET_DONE; break; @@ -672,7 +660,7 @@ int ws_client_process_rx_ws(ws_client *client) return WS_CLIENT_INTERNAL_ERROR; } BUF_READ_CHECK_AT_LEAST(client->rx.payload_length); - client->rx.specific_data.ping_msg = mw_malloc(client->rx.payload_length); + client->rx.specific_data.ping_msg = mallocz(client->rx.payload_length); rbuf_pop(client->buf_read, client->rx.specific_data.ping_msg, client->rx.payload_length); // TODO schedule this instead of sending right away // then attempt to send as soon as buffer space clears up diff --git a/src/aclk/schema-wrappers/alarm_stream.cc b/src/aclk/schema-wrappers/alarm_stream.cc index 29d80e39..c0b41bb0 100644 --- a/src/aclk/schema-wrappers/alarm_stream.cc +++ b/src/aclk/schema-wrappers/alarm_stream.cc @@ -22,6 +22,7 @@ struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_ ret.node_id = strdupz(msg.node_id().c_str()); ret.resets = msg.resets(); + ret.version = msg.version(); return ret; } @@ -37,6 +38,7 @@ struct send_alarm_checkpoint parse_send_alarm_checkpoint(const char *data, size_ ret.node_id = strdupz(msg.node_id().c_str()); ret.claim_id = strdupz(msg.claim_id().c_str()); + ret.version = msg.version(); return ret; } @@ -118,6 +120,7 @@ static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *pr proto->set_transition_id(data->transition_id); proto->set_chart_name(data->chart_name); proto->set_summary(data->summary); + proto->set_alert_version(data->version); } char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data) diff --git a/src/aclk/schema-wrappers/alarm_stream.h b/src/aclk/schema-wrappers/alarm_stream.h index 3c81ff44..6e1936b0 100644 --- a/src/aclk/schema-wrappers/alarm_stream.h +++ b/src/aclk/schema-wrappers/alarm_stream.h @@ -13,6 +13,7 @@ extern "C" { struct start_alarm_streaming { char *node_id; + uint64_t version; bool resets; }; @@ -36,8 +37,6 @@ struct alarm_log_entry { char *name; char *family; - uint64_t batch_id; - uint64_t sequence_id; uint64_t when; char *config_hash; @@ -76,13 +75,22 @@ struct alarm_log_entry { char *chart_name; uint64_t event_id; + uint64_t version; char *transition_id; char *summary; + + // local book keeping + int64_t health_log_id; + int64_t alarm_id; + int64_t unique_id; + int64_t sequence_id; }; struct send_alarm_checkpoint { char *node_id; char *claim_id; + uint64_t version; + uint64_t when_end; }; struct alarm_checkpoint { |