diff options
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/README.md | 34 | ||||
-rw-r--r-- | aclk/aclk.c | 31 | ||||
-rw-r--r-- | aclk/aclk.h | 4 | ||||
-rw-r--r-- | aclk/aclk_alarm_api.c | 10 | ||||
-rw-r--r-- | aclk/aclk_alarm_api.h | 2 | ||||
-rw-r--r-- | aclk/aclk_capas.c | 42 | ||||
-rw-r--r-- | aclk/aclk_query.c | 251 | ||||
-rw-r--r-- | aclk/aclk_query.h | 2 | ||||
-rw-r--r-- | aclk/aclk_query_queue.c | 2 | ||||
-rw-r--r-- | aclk/aclk_query_queue.h | 2 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 55 | ||||
-rw-r--r-- | aclk/aclk_util.c | 8 | ||||
-rw-r--r-- | aclk/aclk_util.h | 2 | ||||
-rw-r--r-- | aclk/schema-wrappers/agent_cmds.cc | 38 | ||||
-rw-r--r-- | aclk/schema-wrappers/agent_cmds.h | 27 | ||||
-rw-r--r-- | aclk/schema-wrappers/alarm_stream.cc | 80 | ||||
-rw-r--r-- | aclk/schema-wrappers/alarm_stream.h | 47 | ||||
-rw-r--r-- | aclk/schema-wrappers/proto_2_json.cc | 11 | ||||
-rw-r--r-- | aclk/schema-wrappers/schema_wrappers.h | 1 |
19 files changed, 349 insertions, 300 deletions
diff --git a/aclk/README.md b/aclk/README.md index 5b338dc2e..4f4693025 100644 --- a/aclk/README.md +++ b/aclk/README.md @@ -1,15 +1,4 @@ -<!-- -title: "Agent-Cloud link (ACLK)" -description: "The Agent-Cloud link (ACLK) is the mechanism responsible for connecting a Netdata agent to Netdata Cloud." -date: "2020-05-11" -custom_edit_url: "https://github.com/netdata/netdata/edit/master/aclk/README.md" -sidebar_label: "Agent-Cloud link (ACLK)" -learn_status: "Published" -learn_topic_type: "Tasks" -learn_rel_path: "Setup" ---> - -# Agent-cloud link (ACLK) +# Agent-Cloud link (ACLK) The Agent-Cloud link (ACLK) is the mechanism responsible for securely connecting a Netdata Agent to your web browser through Netdata Cloud. The ACLK establishes an outgoing secure WebSocket (WSS) connection to Netdata Cloud on port @@ -20,29 +9,22 @@ The Cloud App lives at app.netdata.cloud which currently resolves to the followi - 54.198.178.11 - 44.207.131.212 - 44.196.50.41 - -:::caution -This list of IPs can change without notice, we strongly advise you to whitelist following domains `api.netdata.cloud`, `mqtt.netdata.cloud`, if -this is not an option in your case always verify the current domain resolution (e.g via the `host` command). +> ### Caution +> +>This list of IPs can change without notice, we strongly advise you to whitelist following domains `api.netdata.cloud`, `mqtt.netdata.cloud`, if this is not an option in your case always verify the current domain resolution (e.g via the `host` command). -::: - -For a guide to connecting a node using the ACLK, plus additional troubleshooting and reference information, read our [get -started with Cloud](https://github.com/netdata/netdata/blob/master/docs/cloud/get-started.mdx) guide or the full [connect to Cloud +For a guide to connecting a node using the ACLK, plus additional troubleshooting and reference information, read our [connect to Cloud documentation](https://github.com/netdata/netdata/blob/master/claim/README.md). ## Data privacy + [Data privacy](https://netdata.cloud/privacy/) is very important to us. We firmly believe that your data belongs to you. This is why **we don't store any metric data in Netdata Cloud**. -All the data that you see in the web browser when using Netdata Cloud, is actually streamed directly from the Netdata Agent to the Netdata Cloud dashboard. -The data passes through our systems, but it isn't stored. - -However, to be able to offer the stunning visualizations and advanced functionality of Netdata Cloud, it does store a limited number of _metadata_. - -Read more about [Data privacy in the Netdata Cloud](https://github.com/netdata/netdata/blob/master/docs/cloud/data-privacy.mdx) in the documentation. +All the data that you see in the web browser when using Netdata Cloud, is actually streamed directly from the Netdata Agent to the Netdata Cloud dashboard. The data passes through our systems, but it isn't stored. +However, to be able to offer the stunning visualizations and advanced functionality of Netdata Cloud, it does store a limited number of _metadata_. Read more about our [security and privacy design](https://github.com/netdata/netdata/blob/master/docs/netdata-security.md). ## Enable and configure the ACLK diff --git a/aclk/aclk.c b/aclk/aclk.c index e80897221..399bc9876 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -49,8 +49,6 @@ float last_backoff_value = 0; time_t aclk_block_until = 0; -int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload - #ifdef ENABLE_ACLK mqtt_wss_client mqttwss_client; @@ -249,14 +247,10 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) } } -//TODO prevent big buffer on stack -#define RX_MSGLEN_MAX 4096 static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { UNUSED(qos); aclk_rcvd_cloud_msgs++; - if (msglen > RX_MSGLEN_MAX) - error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos); @@ -870,6 +864,11 @@ void aclk_send_node_instances() uuid_unparse_lower(list->host_id, host_id); RRDHOST *host = rrdhost_find_by_guid(host_id); + if (unlikely(!host)) { + freez((void*)node_state_update.node_id); + freez(query); + continue; + } node_state_update.capabilities = aclk_get_node_instance_capas(host); rrdhost_aclk_state_lock(localhost); @@ -927,14 +926,10 @@ static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host) } buffer_sprintf(wb, "\n\t\tUpdates: %d" - "\n\t\tBatch ID: %"PRIu64 - "\n\t\tLast Acked Seq ID: %"PRIu64 "\n\t\tPending Min Seq ID: %"PRIu64 "\n\t\tPending Max Seq ID: %"PRIu64 "\n\t\tLast Submitted Seq ID: %"PRIu64, status.alert_updates, - status.alerts_batch_id, - status.last_acked_sequence_id, status.pending_min_sequence_id, status.pending_max_sequence_id, status.last_submitted_sequence_id @@ -1042,12 +1037,6 @@ static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) json_object *tmp = json_object_new_int(status.alert_updates); json_object_object_add(obj, "updates", tmp); - tmp = json_object_new_int(status.alerts_batch_id); - json_object_object_add(obj, "batch-id", tmp); - - tmp = json_object_new_int(status.last_acked_sequence_id); - json_object_object_add(obj, "last-acked-seq-id", tmp); - tmp = json_object_new_int(status.pending_min_sequence_id); json_object_object_add(obj, "pending-min-seq-id", tmp); @@ -1216,9 +1205,9 @@ void add_aclk_host_labels(void) { #endif } -void aclk_queue_node_info(RRDHOST *host) { - struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) host->dbsync_worker; - if (likely(wc)) { - wc->node_info_send = 1; - } +void aclk_queue_node_info(RRDHOST *host, bool immediate) +{ + struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config; + if (likely(wc)) + wc->node_info_send_time = (host == localhost || immediate) ? 1 : now_realtime_sec(); } diff --git a/aclk/aclk.h b/aclk/aclk.h index 56b24add9..bd8375fb5 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -26,8 +26,6 @@ extern time_t aclk_block_until; extern int disconnect_req; -extern int aclk_alert_reloaded; - #ifdef ENABLE_ACLK void *aclk_main(void *ptr); @@ -54,6 +52,6 @@ void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, con char *aclk_state(void); char *aclk_state_json(void); void add_aclk_host_labels(void); -void aclk_queue_node_info(RRDHOST *host); +void aclk_queue_node_info(RRDHOST *host, bool immediate); #endif /* ACLK_H */ diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c index 7df51a7b5..664671f70 100644 --- a/aclk/aclk_alarm_api.c +++ b/aclk/aclk_alarm_api.c @@ -8,12 +8,12 @@ #include "aclk.h" -void aclk_send_alarm_log_health(struct alarm_log_health *log_health) +void aclk_send_provide_alarm_checkpoint(struct alarm_checkpoint *checkpoint) { - aclk_query_t query = aclk_query_new(ALARM_LOG_HEALTH); - query->data.bin_payload.payload = generate_alarm_log_health(&query->data.bin_payload.size, log_health); - query->data.bin_payload.topic = ACLK_TOPICID_ALARM_HEALTH; - query->data.bin_payload.msg_name = "AlarmLogHealth"; + aclk_query_t query = aclk_query_new(ALARM_PROVIDE_CHECKPOINT); + query->data.bin_payload.payload = generate_alarm_checkpoint(&query->data.bin_payload.size, checkpoint); + query->data.bin_payload.topic = ACLK_TOPICID_ALARM_CHECKPOINT; + query->data.bin_payload.msg_name = "AlarmCheckpoint"; QUEUE_IF_PAYLOAD_PRESENT(query); } diff --git a/aclk/aclk_alarm_api.h b/aclk/aclk_alarm_api.h index e3fa92b5b..4d9d9447a 100644 --- a/aclk/aclk_alarm_api.h +++ b/aclk/aclk_alarm_api.h @@ -6,7 +6,7 @@ #include "../daemon/common.h" #include "schema-wrappers/schema_wrappers.h" -void aclk_send_alarm_log_health(struct alarm_log_health *log_health); +void aclk_send_provide_alarm_checkpoint(struct alarm_checkpoint *checkpoint); void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry); void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg); void aclk_send_alarm_snapshot(alarm_snapshot_proto_ptr_t snapshot); diff --git a/aclk/aclk_capas.c b/aclk/aclk_capas.c index 290e7d8f4..55f6fd3b4 100644 --- a/aclk/aclk_capas.c +++ b/aclk/aclk_capas.c @@ -7,13 +7,16 @@ const struct capability *aclk_get_agent_capas() { static struct capability agent_capabilities[] = { - { .name = "json", .version = 2, .enabled = 0 }, - { .name = "proto", .version = 1, .enabled = 1 }, - { .name = "ml", .version = 0, .enabled = 0 }, - { .name = "mc", .version = 0, .enabled = 0 }, - { .name = "ctx", .version = 1, .enabled = 1 }, - { .name = "funcs", .version = 1, .enabled = 1 }, - { .name = NULL, .version = 0, .enabled = 0 } + { .name = "json", .version = 2, .enabled = 0 }, + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = 0, .enabled = 0 }, + { .name = "mc", .version = 0, .enabled = 0 }, + { .name = "ctx", .version = 1, .enabled = 1 }, + { .name = "funcs", .version = 1, .enabled = 1 }, + { .name = "http_api_v2", .version = 1, .enabled = 1 }, + { .name = "health", .version = 1, .enabled = 0 }, + { .name = "req_cancel", .version = 1, .enabled = 1 }, + { .name = NULL, .version = 0, .enabled = 0 } }; agent_capabilities[2].version = ml_capable() ? 1 : 0; agent_capabilities[2].enabled = ml_enabled(localhost); @@ -21,27 +24,34 @@ const struct capability *aclk_get_agent_capas() agent_capabilities[3].version = enable_metric_correlations ? metric_correlations_version : 0; agent_capabilities[3].enabled = enable_metric_correlations; + agent_capabilities[7].enabled = localhost->health.health_enabled; + return agent_capabilities; } struct capability *aclk_get_node_instance_capas(RRDHOST *host) { struct capability ni_caps[] = { - { .name = "proto", .version = 1, .enabled = 1 }, - { .name = "ml", .version = ml_capable(), .enabled = ml_enabled(host) }, + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = ml_capable(), .enabled = ml_enabled(host) }, { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = 1 }, - { .name = "funcs", .version = 0, .enabled = 0 }, - { .name = NULL, .version = 0, .enabled = 0 } + { .name = "ctx", .version = 1, .enabled = 1 }, + { .name = "funcs", .version = 0, .enabled = 0 }, + { .name = "http_api_v2", .version = 2, .enabled = 1 }, + { .name = "health", .version = 1, .enabled = host->health.health_enabled }, + { .name = "req_cancel", .version = 1, .enabled = 1 }, + { .name = NULL, .version = 0, .enabled = 0 } }; - if (host == localhost || (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS))) { - ni_caps[4].version = 1; - ni_caps[4].enabled = 1; - } struct capability *ret = mallocz(sizeof(ni_caps)); memcpy(ret, ni_caps, sizeof(ni_caps)); + + if (host == localhost || (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS))) { + ret[4].version = 1; + ret[4].enabled = 1; + } + return ret; } diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 9eced0811..46d1e1e5e 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -3,67 +3,97 @@ #include "aclk_query.h" #include "aclk_stats.h" #include "aclk_tx_msgs.h" +#include "../../web/server/web_client_cache.h" #define WEB_HDR_ACCEPT_ENC "Accept-Encoding:" +#define ACLK_MAX_WEB_RESPONSE_SIZE (30 * 1024 * 1024) pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; #define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait) #define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) -static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char *url) -{ - usec_t t; +struct pending_req_list { + const char *msg_id; + uint32_t hash; - t = now_monotonic_high_precision_usec(); - w->response.code = web_client_api_request_v1(host, w, url); - t = now_monotonic_high_precision_usec() - t; + int canceled; - if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.cloud_q_process_total += t; - aclk_metrics_per_sample.cloud_q_process_count++; - if (aclk_metrics_per_sample.cloud_q_process_max < t) - aclk_metrics_per_sample.cloud_q_process_max = t; - ACLK_STATS_UNLOCK; - } + struct pending_req_list *next; +}; + +static struct pending_req_list *pending_req_list_head = NULL; +static pthread_mutex_t pending_req_list_lock = PTHREAD_MUTEX_INITIALIZER; - return t; +static struct pending_req_list *pending_req_list_add(const char *msg_id) +{ + struct pending_req_list *new = callocz(1, sizeof(struct pending_req_list)); + new->msg_id = msg_id; + new->hash = simple_hash(msg_id); + + pthread_mutex_lock(&pending_req_list_lock); + new->next = pending_req_list_head; + pending_req_list_head = new; + pthread_mutex_unlock(&pending_req_list_lock); + return new; } -static RRDHOST *node_id_2_rrdhost(const char *node_id) +void pending_req_list_rm(const char *msg_id) { - int res; - uuid_t node_id_bin, host_id_bin; + uint32_t hash = simple_hash(msg_id); + struct pending_req_list *prev = NULL; - RRDHOST *host = find_host_by_node_id((char *)node_id); - if (host) - return host; + pthread_mutex_lock(&pending_req_list_lock); + struct pending_req_list *curr = pending_req_list_head; - char host_id[UUID_STR_LEN]; - if (uuid_parse(node_id, node_id_bin)) { - error("Couldn't parse UUID %s", node_id); - return NULL; + while (curr) { + if (curr->hash == hash && strcmp(curr->msg_id, msg_id) == 0) { + if (prev) + prev->next = curr->next; + else + pending_req_list_head = curr->next; + + freez(curr); + break; + } + + prev = curr; + curr = curr->next; } - if ((res = get_host_id(&node_id_bin, &host_id_bin))) { - error("node not found rc=%d", res); - return NULL; + pthread_mutex_unlock(&pending_req_list_lock); +} + +int mark_pending_req_cancelled(const char *msg_id) +{ + uint32_t hash = simple_hash(msg_id); + + pthread_mutex_lock(&pending_req_list_lock); + struct pending_req_list *curr = pending_req_list_head; + + while (curr) { + if (curr->hash == hash && strcmp(curr->msg_id, msg_id) == 0) { + curr->canceled = 1; + pthread_mutex_unlock(&pending_req_list_lock); + return 0; + } + + curr = curr->next; } - uuid_unparse_lower(host_id_bin, host_id); - return rrdhost_find_by_guid(host_id); + pthread_mutex_unlock(&pending_req_list_lock); + return 1; } -#define NODE_ID_QUERY "/node/" -// TODO this function should be quarantied and written nicely -// lots of skeletons from initial ACLK Legacy impl. -// quick and dirty from the start -static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) +static bool aclk_web_client_interrupt_cb(struct web_client *w __maybe_unused, void *data) { + struct pending_req_list *req = (struct pending_req_list *)data; + return req->canceled; +} + +static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) { int retval = 0; - usec_t t; BUFFER *local_buffer = NULL; - BUFFER *log_buffer = buffer_create(NETDATA_WEB_REQUEST_URL_SIZE, &netdata_buffers_statistics.buffers_aclk); - RRDHOST *query_host = localhost; + size_t size = 0; + size_t sent = 0; #ifdef NETDATA_WITH_ZLIB int z_ret; @@ -71,73 +101,55 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) char *start, *end; #endif - struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); - w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk); - w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE, &netdata_buffers_statistics.buffers_aclk); - w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE, &netdata_buffers_statistics.buffers_aclk); - strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() - w->cookie1[0] = 0; // Simulate web_client_create_on_fd() - w->cookie2[0] = 0; // Simulate web_client_create_on_fd() + struct web_client *w = web_client_get_from_cache(); w->acl = WEB_CLIENT_ACL_ACLK; + w->mode = WEB_CLIENT_MODE_GET; + w->timings.tv_in = query->created_tv; - buffer_strcat(log_buffer, query->data.http_api_v2.query); - size_t size = 0; - size_t sent = 0; - w->tv_in = query->created_tv; - now_realtime_timeval(&w->tv_ready); - - if (query->timeout) { - int in_queue = (int) (dt_usec(&w->tv_in, &w->tv_ready) / 1000); - if (in_queue > query->timeout) { - log_access("QUERY CANCELED: QUEUE TIME EXCEEDED %d ms (LIMIT %d ms)", in_queue, query->timeout); - retval = 1; - w->response.code = HTTP_RESP_BACKEND_FETCH_FAILED; - aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, NULL, 0); - goto cleanup; - } + w->interrupt.callback = aclk_web_client_interrupt_cb; + w->interrupt.callback_data = pending_req_list_add(query->msg_id); + + usec_t t; + web_client_timeout_checkpoint_set(w, query->timeout); + if(web_client_timeout_checkpoint_and_check(w, &t)) { + log_access("QUERY CANCELED: QUEUE TIME EXCEEDED %llu ms (LIMIT %d ms)", t / USEC_PER_MS, query->timeout); + retval = 1; + w->response.code = HTTP_RESP_BACKEND_FETCH_FAILED; + aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, NULL, 0); + goto cleanup; } - if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) { - char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY); - char nodeid[UUID_STR_LEN]; - if (strlen(node_uuid) < (UUID_STR_LEN - 1)) { - error_report(CLOUD_EMSG_MALFORMED_NODE_ID); - retval = 1; - w->response.code = 404; - aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_MALFORMED_NODE_ID, CLOUD_EMSG_MALFORMED_NODE_ID, NULL, 0); - goto cleanup; - } - strncpyz(nodeid, node_uuid, UUID_STR_LEN - 1); - - query_host = node_id_2_rrdhost(nodeid); - if (!query_host) { - error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid); - retval = 1; - w->response.code = 404; - aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0); - goto cleanup; - } + web_client_decode_path_and_query_string(w, query->data.http_api_v2.query); + char *path = (char *)buffer_tostring(w->url_path_decoded); + + if (aclk_stats_enabled) { + char *url_path_endpoint = strrchr(path, '/'); + ACLK_STATS_LOCK; + int stat_idx = aclk_cloud_req_http_type_to_idx(url_path_endpoint ? url_path_endpoint + 1 : "other"); + aclk_metrics_per_sample.cloud_req_http_by_type[stat_idx]++; + ACLK_STATS_UNLOCK; } - char *mysep = strchr(query->data.http_api_v2.query, '?'); - if (mysep) { - url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1); - *mysep = '\0'; - } else - url_decode_r(w->decoded_query_string, query->data.http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1); + w->response.code = web_client_api_request_with_node_selection(localhost, w, path); + web_client_timeout_checkpoint_response_ready(w, &t); - mysep = strrchr(query->data.http_api_v2.query, '/'); + if(buffer_strlen(w->response.data) > ACLK_MAX_WEB_RESPONSE_SIZE) { + buffer_flush(w->response.data); + buffer_strcat(w->response.data, "response is too big"); + w->response.data->content_type = CT_TEXT_PLAIN; + w->response.code = HTTP_RESP_CONTENT_TOO_LONG; + } if (aclk_stats_enabled) { ACLK_STATS_LOCK; - int stat_idx = aclk_cloud_req_http_type_to_idx(mysep ? mysep + 1 : "other"); - aclk_metrics_per_sample.cloud_req_http_by_type[stat_idx]++; + aclk_metrics_per_sample.cloud_q_process_total += t; + aclk_metrics_per_sample.cloud_q_process_count++; + if (aclk_metrics_per_sample.cloud_q_process_max < t) + aclk_metrics_per_sample.cloud_q_process_max = t; ACLK_STATS_UNLOCK; } - // execute the query - t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop"); - size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len; + size = w->response.data->len; sent = size; #ifdef NETDATA_WITH_ZLIB @@ -152,8 +164,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) w->response.zstream.zfree = Z_NULL; w->response.zstream.opaque = Z_NULL; if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) { - w->response.zinitialized = 1; - w->response.zoutput = 1; + w->response.zinitialized = true; + w->response.zoutput = true; } else error("Failed to initialize zlib. Proceeding without compression."); } @@ -189,10 +201,10 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) } #endif - w->response.data->date = w->tv_ready.tv_sec; + w->response.data->date = w->timings.tv_ready.tv_sec; web_client_build_http_header(w); local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk); - local_buffer->contenttype = CT_APPLICATION_JSON; + local_buffer->content_type = CT_APPLICATION_JSON; buffer_strcat(local_buffer, w->response.header_output->buffer); @@ -217,7 +229,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) struct timeval tv; cleanup: - now_realtime_timeval(&tv); + now_monotonic_high_precision_timeval(&tv); log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", w->id , gettid() @@ -226,24 +238,21 @@ cleanup: , sent , size , size > sent ? -(((size - sent) / (double)size) * 100.0) : ((size > 0) ? (((sent - size ) / (double)size) * 100.0) : 0.0) - , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0 - , dt_usec(&tv, &w->tv_ready) / 1000.0 - , dt_usec(&tv, &w->tv_in) / 1000.0 + , dt_usec(&w->timings.tv_ready, &w->timings.tv_in) / 1000.0 + , dt_usec(&tv, &w->timings.tv_ready) / 1000.0 + , dt_usec(&tv, &w->timings.tv_in) / 1000.0 , w->response.code - , strip_control_characters((char *)buffer_tostring(log_buffer)) + , strip_control_characters((char *)buffer_tostring(w->url_as_received)) ); + web_client_release_to_cache(w); + + pending_req_list_rm(query->msg_id); + #ifdef NETDATA_WITH_ZLIB - if(w->response.zinitialized) - deflateEnd(&w->response.zstream); buffer_free(z_buffer); #endif - buffer_free(w->response.data); - buffer_free(w->response.header); - buffer_free(w->response.header_output); - freez(w); buffer_free(local_buffer); - buffer_free(log_buffer); return retval; } @@ -257,19 +266,19 @@ static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query) const char *aclk_query_get_name(aclk_query_type_t qt, int unknown_ok) { switch (qt) { - case HTTP_API_V2: return "http_api_request_v2"; - case REGISTER_NODE: return "register_node"; - case NODE_STATE_UPDATE: return "node_state_update"; - case CHART_DIMS_UPDATE: return "chart_and_dim_update"; - case CHART_CONFIG_UPDATED: return "chart_config_updated"; - case CHART_RESET: return "reset_chart_messages"; - case RETENTION_UPDATED: return "update_retention_info"; - case UPDATE_NODE_INFO: return "update_node_info"; - case ALARM_LOG_HEALTH: return "alarm_log_health"; - case ALARM_PROVIDE_CFG: return "provide_alarm_config"; - case ALARM_SNAPSHOT: return "alarm_snapshot"; - case UPDATE_NODE_COLLECTORS: return "update_node_collectors"; - case PROTO_BIN_MESSAGE: return "generic_binary_proto_message"; + case HTTP_API_V2: return "http_api_request_v2"; + case REGISTER_NODE: return "register_node"; + case NODE_STATE_UPDATE: return "node_state_update"; + case CHART_DIMS_UPDATE: return "chart_and_dim_update"; + case CHART_CONFIG_UPDATED: return "chart_config_updated"; + case CHART_RESET: return "reset_chart_messages"; + case RETENTION_UPDATED: return "update_retention_info"; + case UPDATE_NODE_INFO: return "update_node_info"; + case ALARM_PROVIDE_CHECKPOINT: return "alarm_checkpoint"; + case ALARM_PROVIDE_CFG: return "provide_alarm_config"; + case ALARM_SNAPSHOT: return "alarm_snapshot"; + case UPDATE_NODE_COLLECTORS: return "update_node_collectors"; + case PROTO_BIN_MESSAGE: return "generic_binary_proto_message"; default: if (!unknown_ok) error_report("Unknown query type used %d", (int) qt); diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h index c006b0138..dbe6f9e5e 100644 --- a/aclk/aclk_query.h +++ b/aclk/aclk_query.h @@ -33,4 +33,6 @@ void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads); const char *aclk_query_get_name(aclk_query_type_t qt, int unknown_ok); +int mark_pending_req_cancelled(const char *msg_id); + #endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index e7cad5ded..78a906d96 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -20,7 +20,7 @@ static struct aclk_query_queue { static inline int _aclk_queue_query(aclk_query_t query) { - now_realtime_timeval(&query->created_tv); + now_monotonic_high_precision_timeval(&query->created_tv); query->created = now_realtime_usec(); ACLK_QUEUE_LOCK; diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h index ab94b6384..944fc0797 100644 --- a/aclk/aclk_query_queue.h +++ b/aclk/aclk_query_queue.h @@ -19,7 +19,7 @@ typedef enum { CHART_RESET, RETENTION_UPDATED, UPDATE_NODE_INFO, - ALARM_LOG_HEALTH, + ALARM_PROVIDE_CHECKPOINT, ALARM_PROVIDE_CFG, ALARM_SNAPSHOT, UPDATE_NODE_COLLECTORS, diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 104fbcb3e..60bff9ba1 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -6,6 +6,7 @@ #include "aclk_query_queue.h" #include "aclk.h" #include "aclk_capas.h" +#include "aclk_query.h" #include "schema-wrappers/proto_2_json.h" @@ -272,13 +273,12 @@ int create_node_instance_result(const char *msg, size_t msg_len) .live = 0, .queryable = 1, .session_id = aclk_session_newarch, - .node_id = res.node_id + .node_id = res.node_id, + .capabilities = NULL }; RRDHOST *host = rrdhost_find_by_guid(res.machine_guid); - if (host) { - // not all host must have RRDHOST struct created for them - // if they never connected during runtime of agent + if (likely(host)) { if (host == localhost) { node_state_update.live = 1; node_state_update.hops = 0; @@ -286,10 +286,9 @@ int create_node_instance_result(const char *msg, size_t msg_len) node_state_update.live = (!rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN)); node_state_update.hops = host->system_info->hops; } + node_state_update.capabilities = aclk_get_node_instance_capas(host); } - node_state_update.capabilities = aclk_get_node_instance_capas(host); - rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); @@ -341,25 +340,27 @@ int update_chart_configs(const char *msg, size_t msg_len) int start_alarm_streaming(const char *msg, size_t msg_len) { struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len); - if (!res.node_id || !res.batch_id) { + if (!res.node_id) { error("Error parsing StartAlarmStreaming"); - freez(res.node_id); return 1; } - aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id); + aclk_start_alert_streaming(res.node_id, res.resets); freez(res.node_id); return 0; } -int send_alarm_log_health(const char *msg, size_t msg_len) +int send_alarm_checkpoint(const char *msg, size_t msg_len) { - char *node_id = parse_send_alarm_log_health(msg, msg_len); - if (!node_id) { - error("Error parsing SendAlarmLogHealth"); + struct send_alarm_checkpoint sac = parse_send_alarm_checkpoint(msg, msg_len); + if (!sac.node_id || !sac.claim_id) { + error("Error parsing SendAlarmCheckpoint"); + freez(sac.node_id); + freez(sac.claim_id); return 1; } - aclk_send_alarm_health_log(node_id); - freez(node_id); + aclk_send_alarm_checkpoint(sac.node_id, sac.claim_id); + freez(sac.node_id); + freez(sac.claim_id); return 0; } @@ -379,12 +380,12 @@ int send_alarm_configuration(const char *msg, size_t msg_len) int send_alarm_snapshot(const char *msg, size_t msg_len) { struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len); - if (!sas->node_id || !sas->claim_id) { + if (!sas->node_id || !sas->claim_id || !sas->snapshot_uuid) { error("Error parsing SendAlarmSnapshot"); destroy_send_alarm_snapshot(sas); return 1; } - aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id); + aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_uuid); destroy_send_alarm_snapshot(sas); return 0; } @@ -446,6 +447,23 @@ int stop_streaming_contexts(const char *msg, size_t msg_len) return 0; } +int cancel_pending_req(const char *msg, size_t msg_len) +{ + struct aclk_cancel_pending_req cmd; + if(parse_cancel_pending_req(msg, msg_len, &cmd)) { + error_report("Error parsing CancelPendingReq"); + return 1; + } + + log_access("ACLK CancelPendingRequest REQ: %s, cloud trace-id: %s", cmd.request_id, cmd.trace_id); + + if (mark_pending_req_cancelled(cmd.request_id)) + error_report("CancelPending Request for %s failed. No such pending request.", cmd.request_id); + + free_cancel_pending_req(&cmd); + return 0; +} + typedef struct { const char *name; simple_hash_t name_hash; @@ -460,12 +478,13 @@ new_cloud_rx_msg_t rx_msgs[] = { { .name = "ChartsAndDimensionsAck", .name_hash = 0, .fnc = charts_and_dimensions_ack }, { .name = "UpdateChartConfigs", .name_hash = 0, .fnc = update_chart_configs }, { .name = "StartAlarmStreaming", .name_hash = 0, .fnc = start_alarm_streaming }, - { .name = "SendAlarmLogHealth", .name_hash = 0, .fnc = send_alarm_log_health }, + { .name = "SendAlarmCheckpoint", .name_hash = 0, .fnc = send_alarm_checkpoint }, { .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration }, { .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot }, { .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req }, { .name = "ContextsCheckpoint", .name_hash = 0, .fnc = contexts_checkpoint }, { .name = "StopStreamingContexts", .name_hash = 0, .fnc = stop_streaming_contexts }, + { .name = "CancelPendingRequest", .name_hash = 0, .fnc = cancel_pending_req }, { .name = NULL, .name_hash = 0, .fnc = NULL }, }; diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c index ebf428ff9..7d03f97fd 100644 --- a/aclk/aclk_util.c +++ b/aclk/aclk_util.c @@ -120,10 +120,10 @@ struct topic_name { { .id = ACLK_TOPICID_CHART_RESET, .name = "reset-charts" }, { .id = ACLK_TOPICID_RETENTION_UPDATED, .name = "chart-retention-updated" }, { .id = ACLK_TOPICID_NODE_INFO, .name = "node-instance-info" }, - { .id = ACLK_TOPICID_ALARM_LOG, .name = "alarm-log" }, - { .id = ACLK_TOPICID_ALARM_HEALTH, .name = "alarm-health" }, + { .id = ACLK_TOPICID_ALARM_LOG, .name = "alarm-log-v2" }, + { .id = ACLK_TOPICID_ALARM_CHECKPOINT, .name = "alarm-checkpoint" }, { .id = ACLK_TOPICID_ALARM_CONFIG, .name = "alarm-config" }, - { .id = ACLK_TOPICID_ALARM_SNAPSHOT, .name = "alarm-snapshot" }, + { .id = ACLK_TOPICID_ALARM_SNAPSHOT, .name = "alarm-snapshot-v2" }, { .id = ACLK_TOPICID_NODE_COLLECTORS, .name = "node-instance-collectors" }, { .id = ACLK_TOPICID_CTXS_SNAPSHOT, .name = "contexts-snapshot" }, { .id = ACLK_TOPICID_CTXS_UPDATED, .name = "contexts-updated" }, @@ -146,7 +146,7 @@ enum aclk_topics compulsory_topics[] = { ACLK_TOPICID_RETENTION_UPDATED, ACLK_TOPICID_NODE_INFO, ACLK_TOPICID_ALARM_LOG, - ACLK_TOPICID_ALARM_HEALTH, + ACLK_TOPICID_ALARM_CHECKPOINT, ACLK_TOPICID_ALARM_CONFIG, ACLK_TOPICID_ALARM_SNAPSHOT, ACLK_TOPICID_NODE_COLLECTORS, diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h index 76dc8cad9..6b7e4e9c2 100644 --- a/aclk/aclk_util.h +++ b/aclk/aclk_util.h @@ -85,7 +85,7 @@ enum aclk_topics { ACLK_TOPICID_RETENTION_UPDATED = 12, ACLK_TOPICID_NODE_INFO = 13, ACLK_TOPICID_ALARM_LOG = 14, - ACLK_TOPICID_ALARM_HEALTH = 15, + ACLK_TOPICID_ALARM_CHECKPOINT = 15, ACLK_TOPICID_ALARM_CONFIG = 16, ACLK_TOPICID_ALARM_SNAPSHOT = 17, ACLK_TOPICID_NODE_COLLECTORS = 18, diff --git a/aclk/schema-wrappers/agent_cmds.cc b/aclk/schema-wrappers/agent_cmds.cc new file mode 100644 index 000000000..6950f4025 --- /dev/null +++ b/aclk/schema-wrappers/agent_cmds.cc @@ -0,0 +1,38 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/agent/v1/cmds.pb.h" + +#include "agent_cmds.h" + +#include "schema_wrapper_utils.h" + +using namespace agent::v1; + +int parse_cancel_pending_req(const char *msg, size_t msg_len, struct aclk_cancel_pending_req *req) +{ + CancelPendingRequest msg_parsed; + + if (!msg_parsed.ParseFromArray(msg, msg_len)) { + error_report("Failed to parse CancelPendingRequest message"); + return 1; + } + + if (msg_parsed.request_id().c_str() == NULL) { + error_report("CancelPendingRequest message missing request_id"); + return 1; + } + req->request_id = strdupz(msg_parsed.request_id().c_str()); + + if (msg_parsed.trace_id().c_str()) + req->trace_id = strdupz(msg_parsed.trace_id().c_str()); + + set_timeval_from_google_timestamp(msg_parsed.timestamp(), &req->timestamp); + + return 0; +} + +void free_cancel_pending_req(struct aclk_cancel_pending_req *req) +{ + freez(req->request_id); + freez(req->trace_id); +} diff --git a/aclk/schema-wrappers/agent_cmds.h b/aclk/schema-wrappers/agent_cmds.h new file mode 100644 index 000000000..7e01f86c5 --- /dev/null +++ b/aclk/schema-wrappers/agent_cmds.h @@ -0,0 +1,27 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H +#define ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H + +#include "libnetdata/libnetdata.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct aclk_cancel_pending_req { + char *request_id; + + struct timeval timestamp; + + char *trace_id; +}; + +int parse_cancel_pending_req(const char *msg, size_t msg_len, struct aclk_cancel_pending_req *req); +void free_cancel_pending_req(struct aclk_cancel_pending_req *req); + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H */ diff --git a/aclk/schema-wrappers/alarm_stream.cc b/aclk/schema-wrappers/alarm_stream.cc index f64393300..af0b891ca 100644 --- a/aclk/schema-wrappers/alarm_stream.cc +++ b/aclk/schema-wrappers/alarm_stream.cc @@ -21,57 +21,24 @@ struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_ return ret; ret.node_id = strdupz(msg.node_id().c_str()); - ret.batch_id = msg.batch_id(); - ret.start_seq_id = msg.start_sequnce_id(); + ret.resets = msg.resets(); return ret; } -char *parse_send_alarm_log_health(const char *data, size_t len) +struct send_alarm_checkpoint parse_send_alarm_checkpoint(const char *data, size_t len) { - SendAlarmLogHealth msg; - if (!msg.ParseFromArray(data, len)) - return NULL; - return strdupz(msg.node_id().c_str()); -} - -char *generate_alarm_log_health(size_t *len, struct alarm_log_health *data) -{ - AlarmLogHealth msg; - LogEntries *entries; - - msg.set_claim_id(data->claim_id); - msg.set_node_id(data->node_id); - msg.set_enabled(data->enabled); - - switch (data->status) { - case alarm_log_status_aclk::ALARM_LOG_STATUS_IDLE: - msg.set_status(alarms::v1::ALARM_LOG_STATUS_IDLE); - break; - case alarm_log_status_aclk::ALARM_LOG_STATUS_RUNNING: - msg.set_status(alarms::v1::ALARM_LOG_STATUS_RUNNING); - break; - case alarm_log_status_aclk::ALARM_LOG_STATUS_UNSPECIFIED: - msg.set_status(alarms::v1::ALARM_LOG_STATUS_UNSPECIFIED); - break; - default: - error("Unknown status of AlarmLogHealth LogEntry"); - return NULL; - } - - entries = msg.mutable_log_entries(); - entries->set_first_sequence_id(data->log_entries.first_seq_id); - entries->set_last_sequence_id(data->log_entries.last_seq_id); + struct send_alarm_checkpoint ret; + memset(&ret, 0, sizeof(ret)); - set_google_timestamp_from_timeval(data->log_entries.first_when, entries->mutable_first_when()); - set_google_timestamp_from_timeval(data->log_entries.last_when, entries->mutable_last_when()); + SendAlarmCheckpoint msg; + if (!msg.ParseFromArray(data, len)) + return ret; - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)mallocz(*len); - if (!msg.SerializeToArray(bin, *len)) - return NULL; + ret.node_id = strdupz(msg.node_id().c_str()); + ret.claim_id = strdupz(msg.claim_id().c_str()); - return bin; + return ret; } static alarms::v1::AlarmStatus aclk_alarm_status_to_proto(enum aclk_alarm_status status) @@ -131,8 +98,6 @@ static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *pr if (data->family) proto->set_family(data->family); - proto->set_batch_id(data->batch_id); - proto->set_sequence_id(data->sequence_id); proto->set_when(data->when); proto->set_config_hash(data->config_hash); @@ -187,6 +152,24 @@ char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data) return bin; } +char *generate_alarm_checkpoint(size_t *len, struct alarm_checkpoint *data) +{ + AlarmCheckpoint msg; + + msg.set_claim_id(data->claim_id); + msg.set_node_id(data->node_id); + msg.set_checksum(data->checksum); + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)mallocz(*len); + if (!msg.SerializeToArray(bin, *len)) { + freez(bin); + return NULL; + } + + return bin; +} + struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len) { SendAlarmSnapshot msg; @@ -198,8 +181,8 @@ struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t l ret->claim_id = strdupz(msg.claim_id().c_str()); if (msg.node_id().c_str()) ret->node_id = strdupz(msg.node_id().c_str()); - ret->snapshot_id = msg.snapshot_id(); - ret->sequence_id = msg.sequence_id(); + if (msg.snapshot_uuid().c_str()) + ret->snapshot_uuid = strdupz(msg.snapshot_uuid().c_str()); return ret; } @@ -208,6 +191,7 @@ void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr) { freez(ptr->claim_id); freez(ptr->node_id); + freez(ptr->snapshot_uuid); freez(ptr); } @@ -218,7 +202,7 @@ alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot * msg->set_node_id(data->node_id); msg->set_claim_id(data->claim_id); - msg->set_snapshot_id(data->snapshot_id); + msg->set_snapshot_uuid(data->snapshot_uuid); msg->set_chunks(data->chunks); msg->set_chunk(data->chunk); diff --git a/aclk/schema-wrappers/alarm_stream.h b/aclk/schema-wrappers/alarm_stream.h index 63911da3f..83e7c1bce 100644 --- a/aclk/schema-wrappers/alarm_stream.h +++ b/aclk/schema-wrappers/alarm_stream.h @@ -11,38 +11,12 @@ extern "C" { #endif -enum alarm_log_status_aclk { - ALARM_LOG_STATUS_UNSPECIFIED = 0, - ALARM_LOG_STATUS_RUNNING = 1, - ALARM_LOG_STATUS_IDLE = 2 -}; - -struct alarm_log_entries { - int64_t first_seq_id; - struct timeval first_when; - - int64_t last_seq_id; - struct timeval last_when; -}; - -struct alarm_log_health { - char *claim_id; - char *node_id; - int enabled; - enum alarm_log_status_aclk status; - struct alarm_log_entries log_entries; -}; - struct start_alarm_streaming { char *node_id; - uint64_t batch_id; - uint64_t start_seq_id; + bool resets; }; struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_t len); -char *parse_send_alarm_log_health(const char *data, size_t len); - -char *generate_alarm_log_health(size_t *len, struct alarm_log_health *data); enum aclk_alarm_status { ALARM_STATUS_NULL = 0, @@ -101,17 +75,27 @@ struct alarm_log_entry { char *chart_context; }; +struct send_alarm_checkpoint { + char *node_id; + char *claim_id; +}; + +struct alarm_checkpoint { + char *node_id; + char *claim_id; + char *checksum; +}; + struct send_alarm_snapshot { char *node_id; char *claim_id; - uint64_t snapshot_id; - uint64_t sequence_id; + char *snapshot_uuid; }; struct alarm_snapshot { char *node_id; char *claim_id; - uint64_t snapshot_id; + char *snapshot_uuid; uint32_t chunks; uint32_t chunk; }; @@ -125,6 +109,9 @@ char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data); struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len); void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr); +struct send_alarm_checkpoint parse_send_alarm_checkpoint(const char *data, size_t len); +char *generate_alarm_checkpoint(size_t *len, struct alarm_checkpoint *data); + alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot *data); void add_alarm_log_entry2snapshot(alarm_snapshot_proto_ptr_t snapshot, struct alarm_log_entry *data); char *generate_alarm_snapshot_bin(size_t *len, alarm_snapshot_proto_ptr_t snapshot); diff --git a/aclk/schema-wrappers/proto_2_json.cc b/aclk/schema-wrappers/proto_2_json.cc index 8853b2e08..854396510 100644 --- a/aclk/schema-wrappers/proto_2_json.cc +++ b/aclk/schema-wrappers/proto_2_json.cc @@ -11,6 +11,7 @@ #include "proto/nodeinstance/info/v1/info.pb.h" #include "proto/context/v1/stream.pb.h" #include "proto/context/v1/context.pb.h" +#include "proto/agent/v1/cmds.pb.h" #include "libnetdata/libnetdata.h" @@ -29,8 +30,8 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) return new nodeinstance::create::v1::CreateNodeInstance; if (!strcmp(msgname, "UpdateNodeInfo")) return new nodeinstance::info::v1::UpdateNodeInfo; - if (!strcmp(msgname, "AlarmLogHealth")) - return new alarms::v1::AlarmLogHealth; + if (!strcmp(msgname, "AlarmCheckpoint")) + return new alarms::v1::AlarmCheckpoint; if (!strcmp(msgname, "ProvideAlarmConfiguration")) return new alarms::v1::ProvideAlarmConfiguration; if (!strcmp(msgname, "AlarmSnapshot")) @@ -51,8 +52,8 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) return new agent::v1::SendNodeInstances; if (!strcmp(msgname, "StartAlarmStreaming")) return new alarms::v1::StartAlarmStreaming; - if (!strcmp(msgname, "SendAlarmLogHealth")) - return new alarms::v1::SendAlarmLogHealth; + if (!strcmp(msgname, "SendAlarmCheckpoint")) + return new alarms::v1::SendAlarmCheckpoint; if (!strcmp(msgname, "SendAlarmConfiguration")) return new alarms::v1::SendAlarmConfiguration; if (!strcmp(msgname, "SendAlarmSnapshot")) @@ -63,6 +64,8 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) return new context::v1::ContextsCheckpoint; if (!strcmp(msgname, "StopStreamingContexts")) return new context::v1::StopStreamingContexts; + if (!strcmp(msgname, "CancelPendingRequest")) + return new agent::v1::CancelPendingRequest; return NULL; } diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h index a96f7ea7a..b651b8845 100644 --- a/aclk/schema-wrappers/schema_wrappers.h +++ b/aclk/schema-wrappers/schema_wrappers.h @@ -14,5 +14,6 @@ #include "capability.h" #include "context_stream.h" #include "context.h" +#include "agent_cmds.h" #endif /* SCHEMA_WRAPPERS_H */ |