diff options
Diffstat (limited to 'aclk/aclk_tx_msgs.c')
-rw-r--r-- | aclk/aclk_tx_msgs.c | 83 |
1 files changed, 41 insertions, 42 deletions
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 185f5d796..3530dccff 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -49,7 +49,11 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s return 0; } - mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); + if (use_mqtt_5) + mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); + else + mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); + #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); #endif @@ -125,7 +129,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec if (unlikely(!topic || topic[0] != '/')) { error ("Full topic required!"); - return 500; + return HTTP_RESP_INTERNAL_SERVER_ERROR; } str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); @@ -149,17 +153,22 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); #endif */ - rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000); - if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) { - error("Timeout sending binpacked message"); - freez(full_msg); - return 503; - } - if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) { - error("Message is bigger than allowed maximum"); - freez(full_msg); - return 403; + if (use_mqtt_5) + mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), NULL, len, MQTT_WSS_PUB_QOS1, &packet_id); + else { + rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000); + if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) { + error("Timeout sending binpacked message"); + freez(full_msg); + return HTTP_RESP_BACKEND_FETCH_FAILED; + } + if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) { + error("Message is bigger than allowed maximum"); + freez(full_msg); + return HTTP_RESP_FORBIDDEN; + } } + #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); #endif @@ -363,13 +372,13 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg json_object_put(msg); switch (rc) { - case 403: + case HTTP_RESP_FORBIDDEN: aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len); break; - case 500: + case HTTP_RESP_INTERNAL_SERVER_ERROR: aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len); break; - case 503: + case HTTP_RESP_BACKEND_FETCH_FAILED: aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len); break; } @@ -452,10 +461,22 @@ int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message) uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) { size_t len; uint16_t pid; + + struct capability agent_capabilities[] = { + { .name = "json", .version = 2, .enabled = 0 }, + { .name = "proto", .version = 1, .enabled = 1 }, +#ifdef ENABLE_ML + { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) }, +#endif + { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + update_agent_connection_t conn = { .reachable = (reachable ? 1 : 0), .lwt = 0, - .session_id = aclk_session_newarch + .session_id = aclk_session_newarch, + .capabilities = agent_capabilities }; rrdhost_aclk_state_lock(localhost); @@ -478,7 +499,8 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable } pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection"); - freez(msg); + if (!use_mqtt_5) + freez(msg); if (localhost->aclk_state.prev_claimed_id) { freez(localhost->aclk_state.prev_claimed_id); localhost->aclk_state.prev_claimed_id = NULL; @@ -490,7 +512,8 @@ char *aclk_generate_lwt(size_t *size) { update_agent_connection_t conn = { .reachable = 0, .lwt = 1, - .session_id = aclk_session_newarch + .session_id = aclk_session_newarch, + .capabilities = NULL }; rrdhost_aclk_state_lock(localhost); @@ -509,30 +532,6 @@ char *aclk_generate_lwt(size_t *size) { return msg; } - -void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation) { - size_t len; - char *msg = generate_node_instance_creation(&len, node_creation); - if (!msg) { - error("Error generating nodeinstance::create::v1::CreateNodeInstance"); - return; - } - - aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance"); - freez(msg); -} - -void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection) { - size_t len; - char *msg = generate_node_instance_connection(&len, node_connection); - if (!msg) { - error("Error generating nodeinstance::v1::UpdateNodeInstanceConnection"); - return; - } - - aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection"); - freez(msg); -} #endif /* ENABLE_NEW_CLOUD_PROTOCOL */ #ifndef __GNUC__ |