diff options
Diffstat (limited to 'aclk/aclk_tx_msgs.c')
-rw-r--r-- | aclk/aclk_tx_msgs.c | 68 |
1 files changed, 54 insertions, 14 deletions
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 74fc19c72..185f5d796 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -116,28 +116,30 @@ static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, co #define TOPIC_MAX_LEN 512 #define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" -static void aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len) +static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len) { uint16_t packet_id; const char *str; - char *full_msg; - int len; + char *full_msg = NULL; + int len, rc; if (unlikely(!topic || topic[0] != '/')) { error ("Full topic required!"); - return; + return 500; } str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); len = strlen(str); - full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len); + if (payload_len) { + full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len); - memcpy(full_msg, str, len); - memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR)); - len += strlen(V2_BIN_PAYLOAD_SEPARATOR); - memcpy(&full_msg[len], payload, payload_len); - len += payload_len; + memcpy(full_msg, str, len); + memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR)); + len += strlen(V2_BIN_PAYLOAD_SEPARATOR); + memcpy(&full_msg[len], payload, payload_len); + len += payload_len; + } /* TODO #ifdef ACLK_LOG_CONVERSATION_DIR @@ -147,15 +149,22 @@ static void aclk_send_message_with_bin_payload(mqtt_wss_client client, json_obje json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); #endif */ - int rc = mqtt_wss_publish_pid_block(client, topic, full_msg, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000); - if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) + rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000); + if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) { error("Timeout sending binpacked message"); - if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) + freez(full_msg); + return 503; + } + if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) { error("Message is bigger than allowed maximum"); + freez(full_msg); + return 403; + } #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); #endif freez(full_msg); + return 0; } /* @@ -316,6 +325,25 @@ void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted) buffer_free(local_buffer); } +void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len) +{ + json_object *tmp, *msg; + msg = create_hdr("http", msg_id, 0, 0, 2); + tmp = json_object_new_int(http_code); + json_object_object_add(msg, "http-code", tmp); + + tmp = json_object_new_int(ec); + json_object_object_add(msg, "error-code", tmp); + + tmp = json_object_new_string(emsg); + json_object_object_add(msg, "error-description", tmp); + + if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) { + error("Failed to send cancelation message for http reply"); + } + json_object_put(msg); +} + void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len) { json_object *tmp, *msg; @@ -331,8 +359,20 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg tmp = json_object_new_int(http_code); json_object_object_add(msg, "http-code", tmp); - aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len); + int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len); json_object_put(msg); + + switch (rc) { + case 403: + aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len); + break; + case 500: + aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len); + break; + case 503: + aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len); + break; + } } void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart) |