diff options
Diffstat (limited to 'aclk/aclk_rx_msgs.c')
-rw-r--r-- | aclk/aclk_rx_msgs.c | 193 |
1 files changed, 189 insertions, 4 deletions
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 3d3ab5e2..e7ce932e 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -4,9 +4,12 @@ #include "aclk_stats.h" #include "aclk_query_queue.h" +#include "aclk.h" #define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" -#define ACLK_CLOUD_REQ_V2_PREFIX "GET /api/v1/" +#define ACLK_CLOUD_REQ_V2_PREFIX "GET /" + +#define ACLK_V_COMPRESSION 2 struct aclk_request { char *type_id; @@ -18,7 +21,7 @@ struct aclk_request { int max_version; }; -int cloud_to_agent_parse(JSON_ENTRY *e) +static int cloud_to_agent_parse(JSON_ENTRY *e) { struct aclk_request *data = e->callback_data; @@ -88,6 +91,7 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur { const char *start, *end; + // TODO better check of URL if(strncmp(payload, ACLK_CLOUD_REQ_V2_PREFIX, strlen(ACLK_CLOUD_REQ_V2_PREFIX))) { errno = 0; error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX); @@ -108,7 +112,7 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur } #define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\ - if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {\ + if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {\ debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\ ACLK_SHARED_STATE_UNLOCK;\ return 1;\ @@ -117,7 +121,9 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload) { - HTTP_CHECK_AGENT_INITIALIZED(); + if (!aclk_use_new_cloud_arch) { + HTTP_CHECK_AGENT_INITIALIZED(); + } aclk_query_t query; @@ -253,3 +259,182 @@ err_cleanup_nojson: return 1; } + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len) +{ + // TODO do the look up table with hashes to optimize when there are more + // than few + if (!strcmp(message_type, "cmd")) { + // msg is binary payload in all other cases + // however in this message from old legacy cloud + // we have to convert it to C string + char *str = mallocz(msg_len+1); + memcpy(str, msg, msg_len); + str[msg_len] = 0; + aclk_handle_cloud_message(str); + freez(str); + return; + } + if (!strcmp(message_type, "CreateNodeInstanceResult")) { + node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len); + if (!res.machine_guid || !res.node_id) { + error_report("Error parsing CreateNodeInstanceResult"); + freez(res.machine_guid); + freez(res.node_id); + return; + } + + debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id); + + uuid_t host_id, node_id; + if (uuid_parse(res.machine_guid, host_id)) { + error("Error parsing machine_guid provided by CreateNodeInstanceResult"); + freez(res.machine_guid); + freez(res.node_id); + return; + } + if (uuid_parse(res.node_id, node_id)) { + error("Error parsing node_id provided by CreateNodeInstanceResult"); + freez(res.machine_guid); + freez(res.node_id); + return; + } + update_node_id(&host_id, &node_id); + + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded + rrdhost_aclk_state_lock(localhost); + query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + + RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0); + query->data.node_update.live = 0; + + if (host) { + // not all host must have RRDHOST struct created for them + // if they never connected during runtime of agent + if (host == localhost) { + query->data.node_update.live = 1; + query->data.node_update.hops = 0; + } else { + netdata_mutex_lock(&host->receiver_lock); + query->data.node_update.live = (host->receiver != NULL); + netdata_mutex_unlock(&host->receiver_lock); + query->data.node_update.hops = host->system_info->hops; + } + } + + query->data.node_update.node_id = res.node_id; // aclk_query_free will free it + query->data.node_update.queryable = 1; + query->data.node_update.session_id = aclk_session_newarch; + aclk_queue_query(query); + freez(res.machine_guid); + return; + } + if (!strcmp(message_type, "SendNodeInstances")) { + debug(D_ACLK, "Got SendNodeInstances"); + aclk_send_node_instances(); + return; + } + + if (!strcmp(message_type, "StreamChartsAndDimensions")) { + stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len); + if (!res.claim_id || !res.node_id) { + error("Error parsing StreamChartsAndDimensions msg"); + freez(res.claim_id); + freez(res.node_id); + return; + } + chart_batch_id = res.batch_id; + aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id); + freez(res.claim_id); + freez(res.node_id); + return; + } + if (!strcmp(message_type, "ChartsAndDimensionsAck")) { + chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len); + if (!res.claim_id || !res.node_id) { + error("Error parsing StreamChartsAndDimensions msg"); + freez(res.claim_id); + freez(res.node_id); + return; + } + aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id); + freez(res.claim_id); + freez(res.node_id); + return; + } + if (!strcmp(message_type, "UpdateChartConfigs")) { + struct update_chart_config res = parse_update_chart_config(msg, msg_len); + if (!res.claim_id || !res.node_id || !res.hashes) + error("Error parsing UpdateChartConfigs msg"); + else + aclk_get_chart_config(res.hashes); + destroy_update_chart_config(&res); + return; + } + if (!strcmp(message_type, "StartAlarmStreaming")) { + struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len); + if (!res.node_id || !res.batch_id) { + error("Error parsing StartAlarmStreaming"); + freez(res.node_id); + return; + } + aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id); + freez(res.node_id); + return; + } + if (!strcmp(message_type, "SendAlarmLogHealth")) { + char *node_id = parse_send_alarm_log_health(msg, msg_len); + if (!node_id) { + error("Error parsing SendAlarmLogHealth"); + return; + } + aclk_send_alarm_health_log(node_id); + freez(node_id); + return; + } + if (!strcmp(message_type, "SendAlarmConfiguration")) { + char *config_hash = parse_send_alarm_configuration(msg, msg_len); + if (!config_hash || !*config_hash) { + error("Error parsing SendAlarmConfiguration"); + freez(config_hash); + return; + } + aclk_send_alarm_configuration(config_hash); + freez(config_hash); + return; + } + if (!strcmp(message_type, "SendAlarmSnapshot")) { + struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len); + if (!sas->node_id || !sas->claim_id) { + error("Error parsing SendAlarmSnapshot"); + destroy_send_alarm_snapshot(sas); + return; + } + aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id); + destroy_send_alarm_snapshot(sas); + return; + } + if (!strcmp(message_type, "DisconnectReq")) { + struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len); + if (!cmd) + return; + if (cmd->permaban) { + error ("Cloud Banned This Agent!"); + aclk_disable_runtime = 1; + } + info ("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description); + if (cmd->reconnect_after_s > 0) { + aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s; + info ("Cloud asks not to reconnect for %u seconds. We shall honor that request", (unsigned int)cmd->reconnect_after_s); + } + disconnect_req = 1; + freez(cmd->error_description); + freez(cmd); + return; + } + error ("Unknown new cloud arch message type received \"%s\"", message_type); +} +#endif |