summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_tx_msgs.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk_tx_msgs.c')
-rw-r--r--aclk/aclk_tx_msgs.c83
1 files changed, 41 insertions, 42 deletions
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 185f5d796..3530dccff 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -49,7 +49,11 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s
return 0;
}
- mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+ if (use_mqtt_5)
+ mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+ else
+ mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
@@ -125,7 +129,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
if (unlikely(!topic || topic[0] != '/')) {
error ("Full topic required!");
- return 500;
+ return HTTP_RESP_INTERNAL_SERVER_ERROR;
}
str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
@@ -149,17 +153,22 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
#endif */
- rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
- if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
- error("Timeout sending binpacked message");
- freez(full_msg);
- return 503;
- }
- if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
- error("Message is bigger than allowed maximum");
- freez(full_msg);
- return 403;
+ if (use_mqtt_5)
+ mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), NULL, len, MQTT_WSS_PUB_QOS1, &packet_id);
+ else {
+ rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
+ if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
+ error("Timeout sending binpacked message");
+ freez(full_msg);
+ return HTTP_RESP_BACKEND_FETCH_FAILED;
+ }
+ if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
+ error("Message is bigger than allowed maximum");
+ freez(full_msg);
+ return HTTP_RESP_FORBIDDEN;
+ }
}
+
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
@@ -363,13 +372,13 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg
json_object_put(msg);
switch (rc) {
- case 403:
+ case HTTP_RESP_FORBIDDEN:
aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len);
break;
- case 500:
+ case HTTP_RESP_INTERNAL_SERVER_ERROR:
aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len);
break;
- case 503:
+ case HTTP_RESP_BACKEND_FETCH_FAILED:
aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len);
break;
}
@@ -452,10 +461,22 @@ int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message)
uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
size_t len;
uint16_t pid;
+
+ struct capability agent_capabilities[] = {
+ { .name = "json", .version = 2, .enabled = 0 },
+ { .name = "proto", .version = 1, .enabled = 1 },
+#ifdef ENABLE_ML
+ { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) },
+#endif
+ { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+
update_agent_connection_t conn = {
.reachable = (reachable ? 1 : 0),
.lwt = 0,
- .session_id = aclk_session_newarch
+ .session_id = aclk_session_newarch,
+ .capabilities = agent_capabilities
};
rrdhost_aclk_state_lock(localhost);
@@ -478,7 +499,8 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
}
pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
- freez(msg);
+ if (!use_mqtt_5)
+ freez(msg);
if (localhost->aclk_state.prev_claimed_id) {
freez(localhost->aclk_state.prev_claimed_id);
localhost->aclk_state.prev_claimed_id = NULL;
@@ -490,7 +512,8 @@ char *aclk_generate_lwt(size_t *size) {
update_agent_connection_t conn = {
.reachable = 0,
.lwt = 1,
- .session_id = aclk_session_newarch
+ .session_id = aclk_session_newarch,
+ .capabilities = NULL
};
rrdhost_aclk_state_lock(localhost);
@@ -509,30 +532,6 @@ char *aclk_generate_lwt(size_t *size) {
return msg;
}
-
-void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation) {
- size_t len;
- char *msg = generate_node_instance_creation(&len, node_creation);
- if (!msg) {
- error("Error generating nodeinstance::create::v1::CreateNodeInstance");
- return;
- }
-
- aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance");
- freez(msg);
-}
-
-void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection) {
- size_t len;
- char *msg = generate_node_instance_connection(&len, node_connection);
- if (!msg) {
- error("Error generating nodeinstance::v1::UpdateNodeInstanceConnection");
- return;
- }
-
- aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
- freez(msg);
-}
#endif /* ENABLE_NEW_CLOUD_PROTOCOL */
#ifndef __GNUC__