summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_rx_msgs.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk_rx_msgs.c')
-rw-r--r--aclk/aclk_rx_msgs.c84
1 files changed, 65 insertions, 19 deletions
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 27f1bf2d..e6ed332c 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -6,6 +6,8 @@
#include "aclk_query_queue.h"
#include "aclk.h"
+#include "schema-wrappers/proto_2_json.h"
+
#define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
#define ACLK_CLOUD_REQ_V2_PREFIX "GET /"
@@ -55,19 +57,19 @@ static int cloud_to_agent_parse(JSON_ENTRY *e)
break;
case JSON_NUMBER:
if (!strcmp(e->name, "version")) {
- data->version = e->data.number;
+ data->version = (int)e->data.number;
break;
}
if (!strcmp(e->name, "timeout")) {
- data->timeout = e->data.number;
+ data->timeout = (int)e->data.number;
break;
}
if (!strcmp(e->name, "min-version")) {
- data->min_version = e->data.number;
+ data->min_version = (int)e->data.number;
break;
}
if (!strcmp(e->name, "max-version")) {
- data->max_version = e->data.number;
+ data->max_version = (int)e->data.number;
break;
}
@@ -116,20 +118,8 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
return 0;
}
-#define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\
- if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {\
- debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\
- ACLK_SHARED_STATE_UNLOCK;\
- return 1;\
- }\
- ACLK_SHARED_STATE_UNLOCK;
-
static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
{
- if (!aclk_use_new_cloud_arch) {
- HTTP_CHECK_AGENT_INITIALIZED();
- }
-
aclk_query_t query;
errno = 0;
@@ -229,7 +219,6 @@ err_cleanup:
return 1;
}
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
typedef uint32_t simple_hash_t;
typedef int(*rx_msg_handler)(const char *msg, size_t msg_len);
@@ -300,6 +289,15 @@ int create_node_instance_result(const char *msg, size_t msg_len)
}
}
+ struct capability caps[] = {
+ { .name = "proto", .version = 1, .enabled = 1 },
+ { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 },
+ { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
+ { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+ node_state_update.capabilities = caps;
+
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
@@ -324,6 +322,7 @@ int send_node_instances(const char *msg, size_t msg_len)
int stream_charts_and_dimensions(const char *msg, size_t msg_len)
{
+ aclk_ctx_based = 0;
stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len);
if (!res.claim_id || !res.node_id) {
error("Error parsing StreamChartsAndDimensions msg");
@@ -437,6 +436,41 @@ int handle_disconnect_req(const char *msg, size_t msg_len)
return 0;
}
+int contexts_checkpoint(const char *msg, size_t msg_len)
+{
+ aclk_ctx_based = 1;
+
+ struct ctxs_checkpoint *cmd = parse_ctxs_checkpoint(msg, msg_len);
+ if (!cmd)
+ return 1;
+
+ rrdcontext_hub_checkpoint_command(cmd);
+
+ freez(cmd->claim_id);
+ freez(cmd->node_id);
+ freez(cmd);
+ return 0;
+}
+
+int stop_streaming_contexts(const char *msg, size_t msg_len)
+{
+ if (!aclk_ctx_based) {
+ error_report("Received StopStreamingContexts message but context based communication was not enabled (Cloud violated the protocol). Ignoring message");
+ return 1;
+ }
+
+ struct stop_streaming_ctxs *cmd = parse_stop_streaming_ctxs(msg, msg_len);
+ if (!cmd)
+ return 1;
+
+ rrdcontext_hub_stop_streaming_command(cmd);
+
+ freez(cmd->claim_id);
+ freez(cmd->node_id);
+ freez(cmd);
+ return 0;
+}
+
typedef struct {
const char *name;
simple_hash_t name_hash;
@@ -455,6 +489,8 @@ new_cloud_rx_msg_t rx_msgs[] = {
{ .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration },
{ .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot },
{ .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req },
+ { .name = "ContextsCheckpoint", .name_hash = 0, .fnc = contexts_checkpoint },
+ { .name = "StopStreamingContexts", .name_hash = 0, .fnc = stop_streaming_contexts },
{ .name = NULL, .name_hash = 0, .fnc = NULL },
};
@@ -491,7 +527,7 @@ unsigned int aclk_init_rx_msg_handlers(void)
return i;
}
-void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len)
+void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic)
{
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
@@ -509,6 +545,17 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t
}
return;
}
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ if (!strncmp(message_type, "cmd", strlen("cmd"))) {
+ log_aclk_message_bin(msg, msg_len, 0, topic, msg_descriptor->name);
+ } else {
+ char *json = protomsg_to_json(msg, msg_len, msg_descriptor->name);
+ log_aclk_message_bin(json, strlen(json), 0, topic, msg_descriptor->name);
+ freez(json);
+ }
+#endif
+
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_proto_rx_msgs_sample[msg_descriptor-rx_msgs]++;
@@ -524,4 +571,3 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t
return;
}
}
-#endif