diff options
Diffstat (limited to 'aclk/legacy/mqtt.c')
-rw-r--r-- | aclk/legacy/mqtt.c | 370 |
1 files changed, 0 insertions, 370 deletions
diff --git a/aclk/legacy/mqtt.c b/aclk/legacy/mqtt.c deleted file mode 100644 index 0e4bb2ec..00000000 --- a/aclk/legacy/mqtt.c +++ /dev/null @@ -1,370 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include <libnetdata/json/json.h> -#include "daemon/common.h" -#include "mqtt.h" -#include "aclk_lws_wss_client.h" -#include "aclk_stats.h" -#include "aclk_rx_msgs.h" - -#include "agent_cloud_link.h" - -#define ACLK_QOS 1 - -extern usec_t aclk_session_us; -extern time_t aclk_session_sec; - -inline const char *_link_strerror(int rc) -{ - return mosquitto_strerror(rc); -} - -#ifdef NETDATA_INTERNAL_CHECKS -static struct timeval sendTimes[1024]; -#endif - -static struct mosquitto *mosq = NULL; - - -void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) -{ - UNUSED(mosq); - UNUSED(obj); - - legacy_aclk_handle_cloud_message(msg->payload); -} - -void publish_callback(struct mosquitto *mosq, void *obj, int rc) -{ - UNUSED(mosq); - UNUSED(obj); - UNUSED(rc); -#ifdef NETDATA_INTERNAL_CHECKS - struct timeval now, *orig; - now_realtime_timeval(&now); - orig = &sendTimes[ rc & 0x3ff ]; - int64_t diff = (now.tv_sec - orig->tv_sec) * USEC_PER_SEC + (now.tv_usec - orig->tv_usec); - diff /= 1000; - - info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff); - - legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.latency, diff); -#endif - return; -} - -void connect_callback(struct mosquitto *mosq, void *obj, int rc) -{ - UNUSED(mosq); - UNUSED(obj); - UNUSED(rc); - - info("Connection to cloud established"); - aclk_connect(); - - return; -} - -void disconnect_callback(struct mosquitto *mosq, void *obj, int rc) -{ - UNUSED(mosq); - UNUSED(obj); - UNUSED(rc); - - if (netdata_exit) - info("Connection to cloud terminated due to agent shutdown"); - else { - errno = 0; - error("Connection to cloud failed"); - } - aclk_disconnect(); - - aclk_lws_wss_mqtt_layer_disconnect_notif(); - - return; -} - -void _show_mqtt_info() -{ - int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version; - libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision); - - info( - "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor, - libmosq_revision); -} - -size_t _mqtt_external_write_hook(void *buf, size_t count) -{ - return aclk_lws_wss_client_write(buf, count); -} - -size_t _mqtt_external_read_hook(void *buf, size_t count) -{ - return aclk_lws_wss_client_read(buf, count); -} - -int _mqtt_lib_init() -{ - int rc; - //int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version; - /* Commenting out now as it is unused - do not delete, this is needed for the on-prem version. - char *ca_crt; - char *server_crt; - char *server_key; - - // show library info so can have it in the logfile - //libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision); - ca_crt = config_get(CONFIG_SECTION_CLOUD, "link cert", "*"); - server_crt = config_get(CONFIG_SECTION_CLOUD, "link server cert", "*"); - server_key = config_get(CONFIG_SECTION_CLOUD, "link server key", "*"); - - if (ca_crt[0] == '*') { - freez(ca_crt); - ca_crt = NULL; - } - - if (server_crt[0] == '*') { - freez(server_crt); - server_crt = NULL; - } - - if (server_key[0] == '*') { - freez(server_key); - server_key = NULL; - } - */ - - // info( - // "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor, - // libmosq_revision); - - rc = mosquitto_lib_init(); - if (unlikely(rc != MOSQ_ERR_SUCCESS)) { - error("Failed to initialize MQTT (libmosquitto library)"); - return 1; - } - return 0; -} - -static int _mqtt_create_connection(char *username, char *password) -{ - if (mosq != NULL) - mosquitto_destroy(mosq); - mosq = mosquitto_new(username, true, NULL); - if (unlikely(!mosq)) { - mosquitto_lib_cleanup(); - error("MQTT new structure -- %s", mosquitto_strerror(errno)); - return MOSQ_ERR_UNKNOWN; - } - - // Record the session start time to allow a nominal LWT timestamp - usec_t now = now_realtime_usec(); - aclk_session_sec = now / USEC_PER_SEC; - aclk_session_us = now % USEC_PER_SEC; - - _link_set_lwt("outbound/meta", 2); - - mosquitto_connect_callback_set(mosq, connect_callback); - mosquitto_disconnect_callback_set(mosq, disconnect_callback); - mosquitto_publish_callback_set(mosq, publish_callback); - - info("Using challenge-response: %s / %s", username, password); - mosquitto_username_pw_set(mosq, username, password); - - int rc = mosquitto_threaded_set(mosq, 1); - if (unlikely(rc != MOSQ_ERR_SUCCESS)) - error("Failed to tune the thread model for libmosquitto (%s)", mosquitto_strerror(rc)); - -#if defined(LIBMOSQUITTO_VERSION_NUMBER) >= 1006000 - rc = mosquitto_int_option(mosq, MQTT_PROTOCOL_V311, 0); - if (unlikely(rc != MOSQ_ERR_SUCCESS)) - error("MQTT protocol specification rc = %d (%s)", rc, mosquitto_strerror(rc)); - - rc = mosquitto_int_option(mosq, MOSQ_OPT_SEND_MAXIMUM, 1); - info("MQTT in flight messages set to 1 -- %s", mosquitto_strerror(rc)); -#endif - - return rc; -} - -static int _link_mqtt_connect(char *aclk_hostname, int aclk_port) -{ - int rc; - - rc = mosquitto_connect_async(mosq, aclk_hostname, aclk_port, ACLK_PING_INTERVAL); - - if (unlikely(rc != MOSQ_ERR_SUCCESS)) - error( - "Failed to establish link to [%s:%d] MQTT status = %d (%s)", aclk_hostname, aclk_port, rc, - mosquitto_strerror(rc)); - else - info("Establishing MQTT link to [%s:%d]", aclk_hostname, aclk_port); - - return rc; -} - -static inline void _link_mosquitto_write() -{ - int rc; - - if (unlikely(!mosq)) { - return; - } - - rc = mosquitto_loop_misc(mosq); - if (unlikely(rc != MOSQ_ERR_SUCCESS)) - debug(D_ACLK, "ACLK: failure during mosquitto_loop_misc %s", mosquitto_strerror(rc)); - - if (likely(mosquitto_want_write(mosq))) { - rc = mosquitto_loop_write(mosq, 1); - if (rc != MOSQ_ERR_SUCCESS) - debug(D_ACLK, "ACLK: failure during mosquitto_loop_write %s", mosquitto_strerror(rc)); - } -} - -void aclk_lws_connection_established(char *hostname, int port) -{ - _link_mqtt_connect(hostname, port); // Parameters only used for logging, lower layer connected. - _link_mosquitto_write(); -} - -void aclk_lws_connection_data_received() -{ - int rc = mosquitto_loop_read(mosq, 1); - if (rc != MOSQ_ERR_SUCCESS) - debug(D_ACLK, "ACLK: failure during mosquitto_loop_read %s", mosquitto_strerror(rc)); -} - -void aclk_lws_connection_closed() -{ - aclk_disconnect(); - -} - - -int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username, char *password) -{ - if(aclk_lws_wss_connect(aclk_hostname, aclk_port)) - return MOSQ_ERR_UNKNOWN; - aclk_lws_wss_service_loop(); - - int rc = _mqtt_create_connection(username, password); - if (rc!= MOSQ_ERR_SUCCESS) - return rc; - - mosquitto_external_callbacks_set(mosq, _mqtt_external_write_hook, _mqtt_external_read_hook); - return rc; -} - -inline int _link_event_loop() -{ - - // TODO: Check if we need to flush undelivered messages from libmosquitto on new connection attempts (QoS=1). - _link_mosquitto_write(); - aclk_lws_wss_service_loop(); - - // this is because if use LWS we don't want - // mqtt to reconnect by itself - return MOSQ_ERR_SUCCESS; -} - -void _link_shutdown() -{ - int rc; - - if (likely(!mosq)) - return; - - rc = mosquitto_disconnect(mosq); - switch (rc) { - case MOSQ_ERR_SUCCESS: - info("MQTT disconnected from broker"); - break; - default: - info("MQTT invalid structure"); - break; - }; -} - - -int _link_set_lwt(char *sub_topic, int qos) -{ - int rc; - char topic[ACLK_MAX_TOPIC + 1]; - char *final_topic; - - final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); - if (unlikely(!final_topic)) { - errno = 0; - error("Unable to build outgoing topic; truncated?"); - return 1; - } - - usec_t lwt_time = aclk_session_sec * USEC_PER_SEC + aclk_session_us + 1; - BUFFER *b = buffer_create(512); - aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC, ACLK_VERSION_NEG_VERSION); - buffer_strcat(b, ", \"payload\": \"unexpected\" }"); - rc = mosquitto_will_set(mosq, topic, buffer_strlen(b), buffer_tostring(b), qos, 0); - buffer_free(b); - - return rc; -} - -int _link_subscribe(char *topic, int qos) -{ - int rc; - - if (unlikely(!mosq)) - return 1; - - mosquitto_message_callback_set(mosq, mqtt_message_callback); - - rc = mosquitto_subscribe(mosq, NULL, topic, qos); - if (unlikely(rc)) { - errno = 0; - error("Failed to register subscription %d (%s)", rc, mosquitto_strerror(rc)); - return 1; - } - - _link_mosquitto_write(); - return 0; -} - -/* - * Send a message to the cloud to specific topic - * - */ - -int _link_send_message(char *topic, const void *message, size_t len, int *mid) -{ - int rc; - size_t write_q, write_q_bytes, read_q; - - rc = mosquitto_pub_topic_check(topic); - - if (unlikely(rc != MOSQ_ERR_SUCCESS)) - return rc; - - lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); - rc = mosquitto_publish(mosq, mid, topic, len, message, ACLK_QOS, 0); - -#ifdef NETDATA_INTERNAL_CHECKS - char msg_head[64]; - memset(msg_head, 0, sizeof(msg_head)); - strncpy(msg_head, (char*)message, 60); - for (size_t i = 0; i < sizeof(msg_head); i++) - if(msg_head[i] == '\n') msg_head[i] = ' '; - info("Sending MQTT len=%d mid=%d wq=%zu (%zu-bytes) readq=%zu: %s", (int)len, - *mid, write_q, write_q_bytes, read_q, msg_head); - now_realtime_timeval(&sendTimes[ *mid & 0x3ff ]); -#endif - - // TODO: Add better handling -- error will flood the logfile here - if (unlikely(rc != MOSQ_ERR_SUCCESS)) { - errno = 0; - error("MQTT message failed : %s", mosquitto_strerror(rc)); - } - _link_mosquitto_write(); - return rc; -} |