summaryrefslogtreecommitdiffstats
path: root/aclk/aclk.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r--aclk/aclk.c361
1 files changed, 329 insertions, 32 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index c25b7df68..599b9a093 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -24,8 +24,16 @@
#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
+int aclk_rcvd_cloud_msgs = 0;
+int aclk_connection_counter = 0;
int disconnect_req = 0;
+time_t last_conn_time_mqtt = 0;
+time_t last_conn_time_appl = 0;
+time_t last_disconnect_time = 0;
+time_t next_connection_attempt = 0;
+float last_backoff_value = 0;
+
int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload
time_t aclk_block_until = 0;
@@ -43,8 +51,6 @@ struct aclk_shared_state aclk_shared_state = {
.mqtt_shutdown_msg_rcvd = 0
};
-//ENDTODO
-
static RSA *aclk_private_key = NULL;
static int load_private_key()
{
@@ -123,7 +129,7 @@ static int wait_till_agent_claimed(void)
* @param aclk_hostname points to location where string pointer to hostname will be set
* @param aclk_port port to int where port will be saved
*
- * @return If non 0 returned irrecoverable error happened and ACLK should be terminated
+ * @return If non 0 returned irrecoverable error happened (or netdata_exit) and ACLK should be terminated
*/
static int wait_till_agent_claim_ready()
{
@@ -144,20 +150,20 @@ static int wait_till_agent_claim_ready()
// TODO make it without malloc/free
memset(&url, 0, sizeof(url_t));
if (url_parse(cloud_base_url, &url)) {
- error("Agent is claimed but the configuration is invalid, please fix");
+ error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix");
url_t_destroy(&url);
sleep(5);
continue;
}
url_t_destroy(&url);
- if (!load_private_key()) {
- sleep(5);
- break;
- }
+ if (!load_private_key())
+ return 0;
+
+ sleep(5);
}
- return 0;
+ return 1;
}
void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
@@ -266,6 +272,7 @@ static void msg_callback_new_protocol(const char *topic, const void *msg, size_t
}
static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) {
+ aclk_rcvd_cloud_msgs++;
if (aclk_use_new_cloud_arch)
msg_callback_new_protocol(topic, msg, msglen, qos);
else
@@ -275,8 +282,10 @@ static inline void msg_callback(const char *topic, const void *msg, size_t msgle
static void puback_callback(uint16_t packet_id)
{
- if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE)
+ if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) {
+ last_conn_time_appl = now_realtime_sec();
aclk_tbeb_reset();
+ }
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_puback(packet_id);
@@ -402,6 +411,8 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
aclk_stats_upd_online(1);
aclk_connected = 1;
aclk_pubacks_per_conn = 0;
+ aclk_rcvd_cloud_msgs = 0;
+ aclk_connection_counter++;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!aclk_use_new_cloud_arch) {
@@ -480,6 +491,7 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
info("ACLK link is down");
log_access("ACLK DISCONNECTED");
aclk_stats_upd_online(0);
+ last_disconnect_time = now_realtime_sec();
aclk_connected = 0;
info("Attempting to gracefully shutdown the MQTT/WSS connection");
@@ -521,6 +533,9 @@ static unsigned long aclk_reconnect_delay() {
static int aclk_block_till_recon_allowed() {
unsigned long recon_delay = aclk_reconnect_delay();
+ next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC);
+ last_backoff_value = (float)recon_delay / MSEC_PER_SEC;
+
info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC);
// we want to wake up from time to time to check netdata_exit
while (recon_delay)
@@ -613,12 +628,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
.drop_on_publish_fail = 1
};
-#if defined(ENABLE_NEW_CLOUD_PROTOCOL) && defined(ACLK_NEWARCH_DEVMODE)
- aclk_use_new_cloud_arch = 1;
- info("Switching ACLK to new protobuf protocol. Due to #define ACLK_NEWARCH_DEVMODE.");
-#else
aclk_use_new_cloud_arch = 0;
-#endif
#ifndef ACLK_DISABLE_CHALLENGE
if (aclk_env) {
@@ -638,20 +648,19 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
if (netdata_exit)
return 1;
-#ifndef ACLK_NEWARCH_DEVMODE
if (aclk_env->encoding == ACLK_ENC_PROTO) {
#ifndef ENABLE_NEW_CLOUD_PROTOCOL
error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!");
continue;
-#endif
+#else
if (!aclk_env_has_capa("proto")) {
error ("Can't encoding=proto without at least \"proto\" capability.");
continue;
}
info("Switching ACLK to new protobuf protocol. Due to /env response.");
aclk_use_new_cloud_arch = 1;
- }
#endif
+ }
memset(&auth_url, 0, sizeof(url_t));
if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
@@ -728,6 +737,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
json_object_put(lwt);
if (!ret) {
+ last_conn_time_mqtt = now_realtime_sec();
info("ACLK connection successfully established");
log_access("ACLK CONNECTED");
mqtt_connected_actions(client);
@@ -767,8 +777,9 @@ void *aclk_main(void *ptr)
return NULL;
}
+ unsigned int proto_hdl_cnt;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- aclk_init_rx_msg_handlers();
+ proto_hdl_cnt = aclk_init_rx_msg_handlers();
#endif
// This thread is unusual in that it cannot be cancelled by cancel_main_threads()
@@ -808,6 +819,7 @@ void *aclk_main(void *ptr)
stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
stats_thread->thread = mallocz(sizeof(netdata_thread_t));
stats_thread->query_thread_count = query_threads.count;
+ aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt);
netdata_thread_create(
stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
stats_thread);
@@ -843,6 +855,7 @@ void *aclk_main(void *ptr)
if (handle_connection(mqttwss_client)) {
aclk_stats_upd_online(0);
+ last_disconnect_time = now_realtime_sec();
aclk_connected = 0;
log_access("ACLK DISCONNECTED");
}
@@ -1092,6 +1105,7 @@ void aclk_send_node_instances()
uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id);
query->data.node_update.queryable = 1;
query->data.node_update.session_id = aclk_session_newarch;
+ freez(list->hostname);
info("Queuing status update for node=%s, live=%d, hops=%d",(char*)query->data.node_update.node_id,
list->live,
list->hops);
@@ -1121,53 +1135,257 @@ void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, con
aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname);
}
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
+{
+ struct proto_alert_status status;
+ memset(&status, 0, sizeof(status));
+ if (get_proto_alert_status(host, &status)) {
+ buffer_strcat(wb, "\nFailed to get alert streaming status for this host");
+ return;
+ }
+ buffer_sprintf(wb,
+ "\n\t\tUpdates: %d"
+ "\n\t\tBatch ID: %"PRIu64
+ "\n\t\tLast Acked Seq ID: %"PRIu64
+ "\n\t\tPending Min Seq ID: %"PRIu64
+ "\n\t\tPending Max Seq ID: %"PRIu64
+ "\n\t\tLast Submitted Seq ID: %"PRIu64,
+ status.alert_updates,
+ status.alerts_batch_id,
+ status.last_acked_sequence_id,
+ status.pending_min_sequence_id,
+ status.pending_max_sequence_id,
+ status.last_submitted_sequence_id
+ );
+}
+
+static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host)
+{
+ struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host);
+ if (!stats) {
+ buffer_strcat(wb, "\n\t\tFailed to get alert streaming status for this host");
+ return;
+ }
+ buffer_sprintf(wb,
+ "\n\t\tUpdates: %d"
+ "\n\t\tBatch ID: %"PRIu64
+ "\n\t\tMin Seq ID: %"PRIu64
+ "\n\t\tMax Seq ID: %"PRIu64
+ "\n\t\tPending Min Seq ID: %"PRIu64
+ "\n\t\tPending Max Seq ID: %"PRIu64
+ "\n\t\tSent Min Seq ID: %"PRIu64
+ "\n\t\tSent Max Seq ID: %"PRIu64
+ "\n\t\tAcked Min Seq ID: %"PRIu64
+ "\n\t\tAcked Max Seq ID: %"PRIu64,
+ stats->updates,
+ stats->batch_id,
+ stats->min_seqid,
+ stats->max_seqid,
+ stats->min_seqid_pend,
+ stats->max_seqid_pend,
+ stats->min_seqid_sent,
+ stats->max_seqid_sent,
+ stats->min_seqid_ack,
+ stats->max_seqid_ack
+ );
+ freez(stats);
+}
+#endif
+
char *ng_aclk_state(void)
{
BUFFER *wb = buffer_create(1024);
+ struct tm *tmptr, tmbuf;
char *ret;
buffer_strcat(wb,
"ACLK Available: Yes\n"
- "ACLK Implementation: Next Generation\n"
+ "ACLK Version: 2\n"
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- "New Cloud Protocol Support: Yes\n"
+ "Protocols Supported: Legacy, Protobuf\n"
#else
- "New Cloud Protocol Support: No\n"
+ "Protocols Supported: Legacy\n"
#endif
- "Claimed: "
);
+ buffer_sprintf(wb, "Protocol Used: %s\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
char *agent_id = is_agent_claimed();
if (agent_id == NULL)
buffer_strcat(wb, "No\n");
else {
- buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id);
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null");
freez(agent_id);
}
- buffer_sprintf(wb, "Online: %s\nUsed Cloud Protocol: %s", aclk_connected ? "Yes" : "No", aclk_use_new_cloud_arch ? "New" : "Legacy");
+ buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_connected ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No");
+ if (last_conn_time_mqtt && (tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ buffer_sprintf(wb, "Last Connection Time: %s\n", timebuf);
+ }
+ if (last_conn_time_appl && (tmptr = localtime_r(&last_conn_time_appl, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ buffer_sprintf(wb, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, timebuf);
+ }
+ if (last_disconnect_time && (tmptr = localtime_r(&last_disconnect_time, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ buffer_sprintf(wb, "Last Disconnect Time: %s\n", timebuf);
+ }
+ if (!aclk_connected && next_connection_attempt && (tmptr = localtime_r(&next_connection_attempt, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", timebuf, last_backoff_value);
+ }
+
+ if (aclk_connected) {
+ buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn);
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ RRDHOST *host;
+ rrd_rdlock();
+ rrdhost_foreach_read(host) {
+ buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, host->hostname);
+
+ buffer_strcat(wb, "\tClaimed ID: ");
+ rrdhost_aclk_state_lock(host);
+ if (host->aclk_state.claimed_id)
+ buffer_strcat(wb, host->aclk_state.claimed_id);
+ else
+ buffer_strcat(wb, "null");
+ rrdhost_aclk_state_unlock(host);
+
+
+ if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
+ buffer_strcat(wb, "\n\tNode ID: null\n");
+ } else {
+ char node_id[GUID_LEN + 1];
+ uuid_unparse_lower(*host->node_id, node_id);
+ buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id);
+ }
+
+ buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s", host->system_info->hops, host == localhost ? "self" : "child");
+
+ if (host != localhost)
+ buffer_sprintf(wb, "\n\tStreaming Connection Live: %s", host->receiver ? "true" : "false");
+
+ buffer_strcat(wb, "\n\tAlert Streaming Status:");
+ fill_alert_status_for_host(wb, host);
+
+ buffer_strcat(wb, "\n\tChart Streaming Status:");
+ fill_chart_status_for_host(wb, host);
+ }
+ rrd_unlock();
+#endif
+ }
ret = strdupz(buffer_tostring(wb));
buffer_free(wb);
return ret;
}
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
+{
+ struct proto_alert_status status;
+ memset(&status, 0, sizeof(status));
+ if (get_proto_alert_status(host, &status))
+ return;
+
+ json_object *tmp = json_object_new_int(status.alert_updates);
+ json_object_object_add(obj, "updates", tmp);
+
+ tmp = json_object_new_int(status.alerts_batch_id);
+ json_object_object_add(obj, "batch-id", tmp);
+
+ tmp = json_object_new_int(status.last_acked_sequence_id);
+ json_object_object_add(obj, "last-acked-seq-id", tmp);
+
+ tmp = json_object_new_int(status.pending_min_sequence_id);
+ json_object_object_add(obj, "pending-min-seq-id", tmp);
+
+ tmp = json_object_new_int(status.pending_max_sequence_id);
+ json_object_object_add(obj, "pending-max-seq-id", tmp);
+
+ tmp = json_object_new_int(status.last_submitted_sequence_id);
+ json_object_object_add(obj, "last-submitted-seq-id", tmp);
+}
+
+static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host)
+{
+ struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host);
+ if (!stats)
+ return;
+
+ json_object *tmp = json_object_new_int(stats->updates);
+ json_object_object_add(obj, "updates", tmp);
+
+ tmp = json_object_new_int(stats->batch_id);
+ json_object_object_add(obj, "batch-id", tmp);
+
+ tmp = json_object_new_int(stats->min_seqid);
+ json_object_object_add(obj, "min-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->max_seqid);
+ json_object_object_add(obj, "max-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->min_seqid_pend);
+ json_object_object_add(obj, "pending-min-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->max_seqid_pend);
+ json_object_object_add(obj, "pending-max-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->min_seqid_sent);
+ json_object_object_add(obj, "sent-min-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->max_seqid_sent);
+ json_object_object_add(obj, "sent-max-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->min_seqid_ack);
+ json_object_object_add(obj, "acked-min-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->max_seqid_ack);
+ json_object_object_add(obj, "acked-max-seq-id", tmp);
+
+ freez(stats);
+}
+#endif
+
+static json_object *timestamp_to_json(const time_t *t)
+{
+ struct tm *tmptr, tmbuf;
+ if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ return json_object_new_string(timebuf);
+ }
+ return NULL;
+}
+
char *ng_aclk_state_json(void)
{
- json_object *tmp, *msg = json_object_new_object();
+ json_object *tmp, *grp, *msg = json_object_new_object();
tmp = json_object_new_boolean(1);
json_object_object_add(msg, "aclk-available", tmp);
- tmp = json_object_new_string("Next Generation");
- json_object_object_add(msg, "aclk-implementation", tmp);
+ tmp = json_object_new_int(2);
+ json_object_object_add(msg, "aclk-version", tmp);
+ grp = json_object_new_array();
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- tmp = json_object_new_boolean(1);
+ tmp = json_object_new_string("Legacy");
+ json_object_array_add(grp, tmp);
+ tmp = json_object_new_string("Protobuf");
+ json_object_array_add(grp, tmp);
#else
- tmp = json_object_new_boolean(0);
+ tmp = json_object_new_string("Legacy");
+ json_object_array_add(grp, tmp);
#endif
- json_object_object_add(msg, "new-cloud-protocol-supported", tmp);
+ json_object_object_add(msg, "protocols-supported", grp);
char *agent_id = is_agent_claimed();
tmp = json_object_new_boolean(agent_id != NULL);
@@ -1180,12 +1398,91 @@ char *ng_aclk_state_json(void)
tmp = NULL;
json_object_object_add(msg, "claimed-id", tmp);
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ tmp = cloud_base_url ? json_object_new_string(cloud_base_url) : NULL;
+ json_object_object_add(msg, "cloud-url", tmp);
+
tmp = json_object_new_boolean(aclk_connected);
json_object_object_add(msg, "online", tmp);
- tmp = json_object_new_string(aclk_use_new_cloud_arch ? "New" : "Legacy");
+ tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
json_object_object_add(msg, "used-cloud-protocol", tmp);
+ tmp = json_object_new_int(aclk_rcvd_cloud_msgs);
+ json_object_object_add(msg, "received-app-layer-msgs", tmp);
+
+ tmp = json_object_new_int(aclk_pubacks_per_conn);
+ json_object_object_add(msg, "received-mqtt-pubacks", tmp);
+
+ tmp = json_object_new_int(aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0);
+ json_object_object_add(msg, "reconnect-count", tmp);
+
+ json_object_object_add(msg, "last-connect-time-utc", timestamp_to_json(&last_conn_time_mqtt));
+ json_object_object_add(msg, "last-connect-time-puback-utc", timestamp_to_json(&last_conn_time_appl));
+ json_object_object_add(msg, "last-disconnect-time-utc", timestamp_to_json(&last_disconnect_time));
+ json_object_object_add(msg, "next-connection-attempt-utc", !aclk_connected ? timestamp_to_json(&next_connection_attempt) : NULL);
+ tmp = NULL;
+ if (!aclk_connected && last_backoff_value)
+ tmp = json_object_new_double(last_backoff_value);
+ json_object_object_add(msg, "last-backoff-value", tmp);
+
+ tmp = json_object_new_boolean(aclk_disable_runtime);
+ json_object_object_add(msg, "banned-by-cloud", tmp);
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ grp = json_object_new_array();
+
+ RRDHOST *host;
+ rrd_rdlock();
+ rrdhost_foreach_read(host) {
+ json_object *nodeinstance = json_object_new_object();
+
+ tmp = json_object_new_string(host->hostname);
+ json_object_object_add(nodeinstance, "hostname", tmp);
+
+ tmp = json_object_new_string(host->machine_guid);
+ json_object_object_add(nodeinstance, "mguid", tmp);
+
+ rrdhost_aclk_state_lock(host);
+ if (host->aclk_state.claimed_id) {
+ tmp = json_object_new_string(host->aclk_state.claimed_id);
+ json_object_object_add(nodeinstance, "claimed_id", tmp);
+ } else
+ json_object_object_add(nodeinstance, "claimed_id", NULL);
+ rrdhost_aclk_state_unlock(host);
+
+ if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
+ json_object_object_add(nodeinstance, "node-id", NULL);
+ } else {
+ char node_id[GUID_LEN + 1];
+ uuid_unparse_lower(*host->node_id, node_id);
+ tmp = json_object_new_string(node_id);
+ json_object_object_add(nodeinstance, "node-id", tmp);
+ }
+
+ tmp = json_object_new_int(host->system_info->hops);
+ json_object_object_add(nodeinstance, "streaming-hops", tmp);
+
+ tmp = json_object_new_string(host == localhost ? "self" : "child");
+ json_object_object_add(nodeinstance, "relationship", tmp);
+
+ tmp = json_object_new_boolean((host->receiver || host == localhost));
+ json_object_object_add(nodeinstance, "streaming-online", tmp);
+
+ tmp = json_object_new_object();
+ fill_alert_status_for_host_json(tmp, host);
+ json_object_object_add(nodeinstance, "alert-sync-status", tmp);
+
+ tmp = json_object_new_object();
+ fill_chart_status_for_host_json(tmp, host);
+ json_object_object_add(nodeinstance, "chart-sync-status", tmp);
+
+ json_object_array_add(grp, nodeinstance);
+ }
+ rrd_unlock();
+ json_object_object_add(msg, "node-instances", grp);
+#endif
+
char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
json_object_put(msg);
return str;