summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_rx_msgs.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-05-08 16:27:04 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-05-08 16:27:04 +0000
commita836a244a3d2bdd4da1ee2641e3e957850668cea (patch)
treecb87c75b3677fab7144f868435243f864048a1e6 /aclk/aclk_rx_msgs.c
parentAdding upstream version 1.38.1. (diff)
downloadnetdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.tar.xz
netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.zip
Adding upstream version 1.39.0.upstream/1.39.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk/aclk_rx_msgs.c')
-rw-r--r--aclk/aclk_rx_msgs.c55
1 files changed, 37 insertions, 18 deletions
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 104fbcb3..60bff9ba 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -6,6 +6,7 @@
#include "aclk_query_queue.h"
#include "aclk.h"
#include "aclk_capas.h"
+#include "aclk_query.h"
#include "schema-wrappers/proto_2_json.h"
@@ -272,13 +273,12 @@ int create_node_instance_result(const char *msg, size_t msg_len)
.live = 0,
.queryable = 1,
.session_id = aclk_session_newarch,
- .node_id = res.node_id
+ .node_id = res.node_id,
+ .capabilities = NULL
};
RRDHOST *host = rrdhost_find_by_guid(res.machine_guid);
- if (host) {
- // not all host must have RRDHOST struct created for them
- // if they never connected during runtime of agent
+ if (likely(host)) {
if (host == localhost) {
node_state_update.live = 1;
node_state_update.hops = 0;
@@ -286,10 +286,9 @@ int create_node_instance_result(const char *msg, size_t msg_len)
node_state_update.live = (!rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN));
node_state_update.hops = host->system_info->hops;
}
+ node_state_update.capabilities = aclk_get_node_instance_capas(host);
}
- node_state_update.capabilities = aclk_get_node_instance_capas(host);
-
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
@@ -341,25 +340,27 @@ int update_chart_configs(const char *msg, size_t msg_len)
int start_alarm_streaming(const char *msg, size_t msg_len)
{
struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len);
- if (!res.node_id || !res.batch_id) {
+ if (!res.node_id) {
error("Error parsing StartAlarmStreaming");
- freez(res.node_id);
return 1;
}
- aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id);
+ aclk_start_alert_streaming(res.node_id, res.resets);
freez(res.node_id);
return 0;
}
-int send_alarm_log_health(const char *msg, size_t msg_len)
+int send_alarm_checkpoint(const char *msg, size_t msg_len)
{
- char *node_id = parse_send_alarm_log_health(msg, msg_len);
- if (!node_id) {
- error("Error parsing SendAlarmLogHealth");
+ struct send_alarm_checkpoint sac = parse_send_alarm_checkpoint(msg, msg_len);
+ if (!sac.node_id || !sac.claim_id) {
+ error("Error parsing SendAlarmCheckpoint");
+ freez(sac.node_id);
+ freez(sac.claim_id);
return 1;
}
- aclk_send_alarm_health_log(node_id);
- freez(node_id);
+ aclk_send_alarm_checkpoint(sac.node_id, sac.claim_id);
+ freez(sac.node_id);
+ freez(sac.claim_id);
return 0;
}
@@ -379,12 +380,12 @@ int send_alarm_configuration(const char *msg, size_t msg_len)
int send_alarm_snapshot(const char *msg, size_t msg_len)
{
struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len);
- if (!sas->node_id || !sas->claim_id) {
+ if (!sas->node_id || !sas->claim_id || !sas->snapshot_uuid) {
error("Error parsing SendAlarmSnapshot");
destroy_send_alarm_snapshot(sas);
return 1;
}
- aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id);
+ aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_uuid);
destroy_send_alarm_snapshot(sas);
return 0;
}
@@ -446,6 +447,23 @@ int stop_streaming_contexts(const char *msg, size_t msg_len)
return 0;
}
+int cancel_pending_req(const char *msg, size_t msg_len)
+{
+ struct aclk_cancel_pending_req cmd;
+ if(parse_cancel_pending_req(msg, msg_len, &cmd)) {
+ error_report("Error parsing CancelPendingReq");
+ return 1;
+ }
+
+ log_access("ACLK CancelPendingRequest REQ: %s, cloud trace-id: %s", cmd.request_id, cmd.trace_id);
+
+ if (mark_pending_req_cancelled(cmd.request_id))
+ error_report("CancelPending Request for %s failed. No such pending request.", cmd.request_id);
+
+ free_cancel_pending_req(&cmd);
+ return 0;
+}
+
typedef struct {
const char *name;
simple_hash_t name_hash;
@@ -460,12 +478,13 @@ new_cloud_rx_msg_t rx_msgs[] = {
{ .name = "ChartsAndDimensionsAck", .name_hash = 0, .fnc = charts_and_dimensions_ack },
{ .name = "UpdateChartConfigs", .name_hash = 0, .fnc = update_chart_configs },
{ .name = "StartAlarmStreaming", .name_hash = 0, .fnc = start_alarm_streaming },
- { .name = "SendAlarmLogHealth", .name_hash = 0, .fnc = send_alarm_log_health },
+ { .name = "SendAlarmCheckpoint", .name_hash = 0, .fnc = send_alarm_checkpoint },
{ .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration },
{ .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot },
{ .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req },
{ .name = "ContextsCheckpoint", .name_hash = 0, .fnc = contexts_checkpoint },
{ .name = "StopStreamingContexts", .name_hash = 0, .fnc = stop_streaming_contexts },
+ { .name = "CancelPendingRequest", .name_hash = 0, .fnc = cancel_pending_req },
{ .name = NULL, .name_hash = 0, .fnc = NULL },
};