summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_rx_msgs.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk_rx_msgs.c')
-rw-r--r--aclk/aclk_rx_msgs.c193
1 files changed, 189 insertions, 4 deletions
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 3d3ab5e2..e7ce932e 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -4,9 +4,12 @@
#include "aclk_stats.h"
#include "aclk_query_queue.h"
+#include "aclk.h"
#define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
-#define ACLK_CLOUD_REQ_V2_PREFIX "GET /api/v1/"
+#define ACLK_CLOUD_REQ_V2_PREFIX "GET /"
+
+#define ACLK_V_COMPRESSION 2
struct aclk_request {
char *type_id;
@@ -18,7 +21,7 @@ struct aclk_request {
int max_version;
};
-int cloud_to_agent_parse(JSON_ENTRY *e)
+static int cloud_to_agent_parse(JSON_ENTRY *e)
{
struct aclk_request *data = e->callback_data;
@@ -88,6 +91,7 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
{
const char *start, *end;
+ // TODO better check of URL
if(strncmp(payload, ACLK_CLOUD_REQ_V2_PREFIX, strlen(ACLK_CLOUD_REQ_V2_PREFIX))) {
errno = 0;
error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX);
@@ -108,7 +112,7 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
}
#define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\
- if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {\
+ if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {\
debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\
ACLK_SHARED_STATE_UNLOCK;\
return 1;\
@@ -117,7 +121,9 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
{
- HTTP_CHECK_AGENT_INITIALIZED();
+ if (!aclk_use_new_cloud_arch) {
+ HTTP_CHECK_AGENT_INITIALIZED();
+ }
aclk_query_t query;
@@ -253,3 +259,182 @@ err_cleanup_nojson:
return 1;
}
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len)
+{
+ // TODO do the look up table with hashes to optimize when there are more
+ // than few
+ if (!strcmp(message_type, "cmd")) {
+ // msg is binary payload in all other cases
+ // however in this message from old legacy cloud
+ // we have to convert it to C string
+ char *str = mallocz(msg_len+1);
+ memcpy(str, msg, msg_len);
+ str[msg_len] = 0;
+ aclk_handle_cloud_message(str);
+ freez(str);
+ return;
+ }
+ if (!strcmp(message_type, "CreateNodeInstanceResult")) {
+ node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len);
+ if (!res.machine_guid || !res.node_id) {
+ error_report("Error parsing CreateNodeInstanceResult");
+ freez(res.machine_guid);
+ freez(res.node_id);
+ return;
+ }
+
+ debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id);
+
+ uuid_t host_id, node_id;
+ if (uuid_parse(res.machine_guid, host_id)) {
+ error("Error parsing machine_guid provided by CreateNodeInstanceResult");
+ freez(res.machine_guid);
+ freez(res.node_id);
+ return;
+ }
+ if (uuid_parse(res.node_id, node_id)) {
+ error("Error parsing node_id provided by CreateNodeInstanceResult");
+ freez(res.machine_guid);
+ freez(res.node_id);
+ return;
+ }
+ update_node_id(&host_id, &node_id);
+
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+ query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
+ rrdhost_aclk_state_lock(localhost);
+ query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ rrdhost_aclk_state_unlock(localhost);
+
+ RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0);
+ query->data.node_update.live = 0;
+
+ if (host) {
+ // not all host must have RRDHOST struct created for them
+ // if they never connected during runtime of agent
+ if (host == localhost) {
+ query->data.node_update.live = 1;
+ query->data.node_update.hops = 0;
+ } else {
+ netdata_mutex_lock(&host->receiver_lock);
+ query->data.node_update.live = (host->receiver != NULL);
+ netdata_mutex_unlock(&host->receiver_lock);
+ query->data.node_update.hops = host->system_info->hops;
+ }
+ }
+
+ query->data.node_update.node_id = res.node_id; // aclk_query_free will free it
+ query->data.node_update.queryable = 1;
+ query->data.node_update.session_id = aclk_session_newarch;
+ aclk_queue_query(query);
+ freez(res.machine_guid);
+ return;
+ }
+ if (!strcmp(message_type, "SendNodeInstances")) {
+ debug(D_ACLK, "Got SendNodeInstances");
+ aclk_send_node_instances();
+ return;
+ }
+
+ if (!strcmp(message_type, "StreamChartsAndDimensions")) {
+ stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len);
+ if (!res.claim_id || !res.node_id) {
+ error("Error parsing StreamChartsAndDimensions msg");
+ freez(res.claim_id);
+ freez(res.node_id);
+ return;
+ }
+ chart_batch_id = res.batch_id;
+ aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id);
+ freez(res.claim_id);
+ freez(res.node_id);
+ return;
+ }
+ if (!strcmp(message_type, "ChartsAndDimensionsAck")) {
+ chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len);
+ if (!res.claim_id || !res.node_id) {
+ error("Error parsing StreamChartsAndDimensions msg");
+ freez(res.claim_id);
+ freez(res.node_id);
+ return;
+ }
+ aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id);
+ freez(res.claim_id);
+ freez(res.node_id);
+ return;
+ }
+ if (!strcmp(message_type, "UpdateChartConfigs")) {
+ struct update_chart_config res = parse_update_chart_config(msg, msg_len);
+ if (!res.claim_id || !res.node_id || !res.hashes)
+ error("Error parsing UpdateChartConfigs msg");
+ else
+ aclk_get_chart_config(res.hashes);
+ destroy_update_chart_config(&res);
+ return;
+ }
+ if (!strcmp(message_type, "StartAlarmStreaming")) {
+ struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len);
+ if (!res.node_id || !res.batch_id) {
+ error("Error parsing StartAlarmStreaming");
+ freez(res.node_id);
+ return;
+ }
+ aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id);
+ freez(res.node_id);
+ return;
+ }
+ if (!strcmp(message_type, "SendAlarmLogHealth")) {
+ char *node_id = parse_send_alarm_log_health(msg, msg_len);
+ if (!node_id) {
+ error("Error parsing SendAlarmLogHealth");
+ return;
+ }
+ aclk_send_alarm_health_log(node_id);
+ freez(node_id);
+ return;
+ }
+ if (!strcmp(message_type, "SendAlarmConfiguration")) {
+ char *config_hash = parse_send_alarm_configuration(msg, msg_len);
+ if (!config_hash || !*config_hash) {
+ error("Error parsing SendAlarmConfiguration");
+ freez(config_hash);
+ return;
+ }
+ aclk_send_alarm_configuration(config_hash);
+ freez(config_hash);
+ return;
+ }
+ if (!strcmp(message_type, "SendAlarmSnapshot")) {
+ struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len);
+ if (!sas->node_id || !sas->claim_id) {
+ error("Error parsing SendAlarmSnapshot");
+ destroy_send_alarm_snapshot(sas);
+ return;
+ }
+ aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id);
+ destroy_send_alarm_snapshot(sas);
+ return;
+ }
+ if (!strcmp(message_type, "DisconnectReq")) {
+ struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len);
+ if (!cmd)
+ return;
+ if (cmd->permaban) {
+ error ("Cloud Banned This Agent!");
+ aclk_disable_runtime = 1;
+ }
+ info ("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description);
+ if (cmd->reconnect_after_s > 0) {
+ aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s;
+ info ("Cloud asks not to reconnect for %u seconds. We shall honor that request", (unsigned int)cmd->reconnect_after_s);
+ }
+ disconnect_req = 1;
+ freez(cmd->error_description);
+ freez(cmd);
+ return;
+ }
+ error ("Unknown new cloud arch message type received \"%s\"", message_type);
+}
+#endif