diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-05-08 16:27:04 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-05-08 16:27:04 +0000 |
commit | a836a244a3d2bdd4da1ee2641e3e957850668cea (patch) | |
tree | cb87c75b3677fab7144f868435243f864048a1e6 /aclk/aclk_rx_msgs.c | |
parent | Adding upstream version 1.38.1. (diff) | |
download | netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.tar.xz netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.zip |
Adding upstream version 1.39.0.upstream/1.39.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk/aclk_rx_msgs.c')
-rw-r--r-- | aclk/aclk_rx_msgs.c | 55 |
1 files changed, 37 insertions, 18 deletions
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 104fbcb3e..60bff9ba1 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -6,6 +6,7 @@ #include "aclk_query_queue.h" #include "aclk.h" #include "aclk_capas.h" +#include "aclk_query.h" #include "schema-wrappers/proto_2_json.h" @@ -272,13 +273,12 @@ int create_node_instance_result(const char *msg, size_t msg_len) .live = 0, .queryable = 1, .session_id = aclk_session_newarch, - .node_id = res.node_id + .node_id = res.node_id, + .capabilities = NULL }; RRDHOST *host = rrdhost_find_by_guid(res.machine_guid); - if (host) { - // not all host must have RRDHOST struct created for them - // if they never connected during runtime of agent + if (likely(host)) { if (host == localhost) { node_state_update.live = 1; node_state_update.hops = 0; @@ -286,10 +286,9 @@ int create_node_instance_result(const char *msg, size_t msg_len) node_state_update.live = (!rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN)); node_state_update.hops = host->system_info->hops; } + node_state_update.capabilities = aclk_get_node_instance_capas(host); } - node_state_update.capabilities = aclk_get_node_instance_capas(host); - rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); @@ -341,25 +340,27 @@ int update_chart_configs(const char *msg, size_t msg_len) int start_alarm_streaming(const char *msg, size_t msg_len) { struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len); - if (!res.node_id || !res.batch_id) { + if (!res.node_id) { error("Error parsing StartAlarmStreaming"); - freez(res.node_id); return 1; } - aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id); + aclk_start_alert_streaming(res.node_id, res.resets); freez(res.node_id); return 0; } -int send_alarm_log_health(const char *msg, size_t msg_len) +int send_alarm_checkpoint(const char *msg, size_t msg_len) { - char *node_id = parse_send_alarm_log_health(msg, msg_len); - if (!node_id) { - error("Error parsing SendAlarmLogHealth"); + struct send_alarm_checkpoint sac = parse_send_alarm_checkpoint(msg, msg_len); + if (!sac.node_id || !sac.claim_id) { + error("Error parsing SendAlarmCheckpoint"); + freez(sac.node_id); + freez(sac.claim_id); return 1; } - aclk_send_alarm_health_log(node_id); - freez(node_id); + aclk_send_alarm_checkpoint(sac.node_id, sac.claim_id); + freez(sac.node_id); + freez(sac.claim_id); return 0; } @@ -379,12 +380,12 @@ int send_alarm_configuration(const char *msg, size_t msg_len) int send_alarm_snapshot(const char *msg, size_t msg_len) { struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len); - if (!sas->node_id || !sas->claim_id) { + if (!sas->node_id || !sas->claim_id || !sas->snapshot_uuid) { error("Error parsing SendAlarmSnapshot"); destroy_send_alarm_snapshot(sas); return 1; } - aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id); + aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_uuid); destroy_send_alarm_snapshot(sas); return 0; } @@ -446,6 +447,23 @@ int stop_streaming_contexts(const char *msg, size_t msg_len) return 0; } +int cancel_pending_req(const char *msg, size_t msg_len) +{ + struct aclk_cancel_pending_req cmd; + if(parse_cancel_pending_req(msg, msg_len, &cmd)) { + error_report("Error parsing CancelPendingReq"); + return 1; + } + + log_access("ACLK CancelPendingRequest REQ: %s, cloud trace-id: %s", cmd.request_id, cmd.trace_id); + + if (mark_pending_req_cancelled(cmd.request_id)) + error_report("CancelPending Request for %s failed. No such pending request.", cmd.request_id); + + free_cancel_pending_req(&cmd); + return 0; +} + typedef struct { const char *name; simple_hash_t name_hash; @@ -460,12 +478,13 @@ new_cloud_rx_msg_t rx_msgs[] = { { .name = "ChartsAndDimensionsAck", .name_hash = 0, .fnc = charts_and_dimensions_ack }, { .name = "UpdateChartConfigs", .name_hash = 0, .fnc = update_chart_configs }, { .name = "StartAlarmStreaming", .name_hash = 0, .fnc = start_alarm_streaming }, - { .name = "SendAlarmLogHealth", .name_hash = 0, .fnc = send_alarm_log_health }, + { .name = "SendAlarmCheckpoint", .name_hash = 0, .fnc = send_alarm_checkpoint }, { .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration }, { .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot }, { .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req }, { .name = "ContextsCheckpoint", .name_hash = 0, .fnc = contexts_checkpoint }, { .name = "StopStreamingContexts", .name_hash = 0, .fnc = stop_streaming_contexts }, + { .name = "CancelPendingRequest", .name_hash = 0, .fnc = cancel_pending_req }, { .name = NULL, .name_hash = 0, .fnc = NULL }, }; |