summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_tx_msgs.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk_tx_msgs.c')
-rw-r--r--aclk/aclk_tx_msgs.c68
1 files changed, 54 insertions, 14 deletions
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 74fc19c72..185f5d796 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -116,28 +116,30 @@ static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, co
#define TOPIC_MAX_LEN 512
#define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
-static void aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len)
+static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len)
{
uint16_t packet_id;
const char *str;
- char *full_msg;
- int len;
+ char *full_msg = NULL;
+ int len, rc;
if (unlikely(!topic || topic[0] != '/')) {
error ("Full topic required!");
- return;
+ return 500;
}
str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
len = strlen(str);
- full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len);
+ if (payload_len) {
+ full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len);
- memcpy(full_msg, str, len);
- memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR));
- len += strlen(V2_BIN_PAYLOAD_SEPARATOR);
- memcpy(&full_msg[len], payload, payload_len);
- len += payload_len;
+ memcpy(full_msg, str, len);
+ memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR));
+ len += strlen(V2_BIN_PAYLOAD_SEPARATOR);
+ memcpy(&full_msg[len], payload, payload_len);
+ len += payload_len;
+ }
/* TODO
#ifdef ACLK_LOG_CONVERSATION_DIR
@@ -147,15 +149,22 @@ static void aclk_send_message_with_bin_payload(mqtt_wss_client client, json_obje
json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
#endif */
- int rc = mqtt_wss_publish_pid_block(client, topic, full_msg, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
- if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT)
+ rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
+ if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
error("Timeout sending binpacked message");
- if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL)
+ freez(full_msg);
+ return 503;
+ }
+ if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
error("Message is bigger than allowed maximum");
+ freez(full_msg);
+ return 403;
+ }
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
freez(full_msg);
+ return 0;
}
/*
@@ -316,6 +325,25 @@ void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted)
buffer_free(local_buffer);
}
+void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len)
+{
+ json_object *tmp, *msg;
+ msg = create_hdr("http", msg_id, 0, 0, 2);
+ tmp = json_object_new_int(http_code);
+ json_object_object_add(msg, "http-code", tmp);
+
+ tmp = json_object_new_int(ec);
+ json_object_object_add(msg, "error-code", tmp);
+
+ tmp = json_object_new_string(emsg);
+ json_object_object_add(msg, "error-description", tmp);
+
+ if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) {
+ error("Failed to send cancelation message for http reply");
+ }
+ json_object_put(msg);
+}
+
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
{
json_object *tmp, *msg;
@@ -331,8 +359,20 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg
tmp = json_object_new_int(http_code);
json_object_object_add(msg, "http-code", tmp);
- aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
+ int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
json_object_put(msg);
+
+ switch (rc) {
+ case 403:
+ aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len);
+ break;
+ case 500:
+ aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len);
+ break;
+ case 503:
+ aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len);
+ break;
+ }
}
void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart)