summaryrefslogtreecommitdiffstats
path: root/aclk/legacy/mqtt.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/legacy/mqtt.c')
-rw-r--r--aclk/legacy/mqtt.c366
1 files changed, 366 insertions, 0 deletions
diff --git a/aclk/legacy/mqtt.c b/aclk/legacy/mqtt.c
new file mode 100644
index 000000000..6f38a20dc
--- /dev/null
+++ b/aclk/legacy/mqtt.c
@@ -0,0 +1,366 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include <libnetdata/json/json.h>
+#include "../../daemon/common.h"
+#include "mqtt.h"
+#include "aclk_lws_wss_client.h"
+#include "aclk_stats.h"
+#include "aclk_rx_msgs.h"
+
+extern usec_t aclk_session_us;
+extern time_t aclk_session_sec;
+
+inline const char *_link_strerror(int rc)
+{
+ return mosquitto_strerror(rc);
+}
+
+#ifdef NETDATA_INTERNAL_CHECKS
+static struct timeval sendTimes[1024];
+#endif
+
+static struct mosquitto *mosq = NULL;
+
+
+void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
+{
+ UNUSED(mosq);
+ UNUSED(obj);
+
+ aclk_handle_cloud_message(msg->payload);
+}
+
+void publish_callback(struct mosquitto *mosq, void *obj, int rc)
+{
+ UNUSED(mosq);
+ UNUSED(obj);
+ UNUSED(rc);
+#ifdef NETDATA_INTERNAL_CHECKS
+ struct timeval now, *orig;
+ now_realtime_timeval(&now);
+ orig = &sendTimes[ rc & 0x3ff ];
+ int64_t diff = (now.tv_sec - orig->tv_sec) * USEC_PER_SEC + (now.tv_usec - orig->tv_usec);
+ diff /= 1000;
+
+ info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff);
+
+ aclk_metric_mat_update(&aclk_metrics_per_sample.latency, diff);
+#endif
+ return;
+}
+
+void connect_callback(struct mosquitto *mosq, void *obj, int rc)
+{
+ UNUSED(mosq);
+ UNUSED(obj);
+ UNUSED(rc);
+
+ info("Connection to cloud estabilished");
+ aclk_connect();
+
+ return;
+}
+
+void disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
+{
+ UNUSED(mosq);
+ UNUSED(obj);
+ UNUSED(rc);
+
+ if (netdata_exit)
+ info("Connection to cloud terminated due to agent shutdown");
+ else {
+ errno = 0;
+ error("Connection to cloud failed");
+ }
+ aclk_disconnect();
+
+ aclk_lws_wss_mqtt_layer_disconect_notif();
+
+ return;
+}
+
+void _show_mqtt_info()
+{
+ int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
+ libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision);
+
+ info(
+ "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor,
+ libmosq_revision);
+}
+
+size_t _mqtt_external_write_hook(void *buf, size_t count)
+{
+ return aclk_lws_wss_client_write(buf, count);
+}
+
+size_t _mqtt_external_read_hook(void *buf, size_t count)
+{
+ return aclk_lws_wss_client_read(buf, count);
+}
+
+int _mqtt_lib_init()
+{
+ int rc;
+ //int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
+ /* Commenting out now as it is unused - do not delete, this is needed for the on-prem version.
+ char *ca_crt;
+ char *server_crt;
+ char *server_key;
+
+ // show library info so can have it in the logfile
+ //libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision);
+ ca_crt = config_get(CONFIG_SECTION_CLOUD, "link cert", "*");
+ server_crt = config_get(CONFIG_SECTION_CLOUD, "link server cert", "*");
+ server_key = config_get(CONFIG_SECTION_CLOUD, "link server key", "*");
+
+ if (ca_crt[0] == '*') {
+ freez(ca_crt);
+ ca_crt = NULL;
+ }
+
+ if (server_crt[0] == '*') {
+ freez(server_crt);
+ server_crt = NULL;
+ }
+
+ if (server_key[0] == '*') {
+ freez(server_key);
+ server_key = NULL;
+ }
+ */
+
+ // info(
+ // "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor,
+ // libmosq_revision);
+
+ rc = mosquitto_lib_init();
+ if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
+ error("Failed to initialize MQTT (libmosquitto library)");
+ return 1;
+ }
+ return 0;
+}
+
+static int _mqtt_create_connection(char *username, char *password)
+{
+ if (mosq != NULL)
+ mosquitto_destroy(mosq);
+ mosq = mosquitto_new(username, true, NULL);
+ if (unlikely(!mosq)) {
+ mosquitto_lib_cleanup();
+ error("MQTT new structure -- %s", mosquitto_strerror(errno));
+ return MOSQ_ERR_UNKNOWN;
+ }
+
+ // Record the session start time to allow a nominal LWT timestamp
+ usec_t now = now_realtime_usec();
+ aclk_session_sec = now / USEC_PER_SEC;
+ aclk_session_us = now % USEC_PER_SEC;
+
+ _link_set_lwt("outbound/meta", 2);
+
+ mosquitto_connect_callback_set(mosq, connect_callback);
+ mosquitto_disconnect_callback_set(mosq, disconnect_callback);
+ mosquitto_publish_callback_set(mosq, publish_callback);
+
+ info("Using challenge-response: %s / %s", username, password);
+ mosquitto_username_pw_set(mosq, username, password);
+
+ int rc = mosquitto_threaded_set(mosq, 1);
+ if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ error("Failed to tune the thread model for libmoquitto (%s)", mosquitto_strerror(rc));
+
+#if defined(LIBMOSQUITTO_VERSION_NUMBER) >= 1006000
+ rc = mosquitto_int_option(mosq, MQTT_PROTOCOL_V311, 0);
+ if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ error("MQTT protocol specification rc = %d (%s)", rc, mosquitto_strerror(rc));
+
+ rc = mosquitto_int_option(mosq, MOSQ_OPT_SEND_MAXIMUM, 1);
+ info("MQTT in flight messages set to 1 -- %s", mosquitto_strerror(rc));
+#endif
+
+ return rc;
+}
+
+static int _link_mqtt_connect(char *aclk_hostname, int aclk_port)
+{
+ int rc;
+
+ rc = mosquitto_connect_async(mosq, aclk_hostname, aclk_port, ACLK_PING_INTERVAL);
+
+ if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ error(
+ "Failed to establish link to [%s:%d] MQTT status = %d (%s)", aclk_hostname, aclk_port, rc,
+ mosquitto_strerror(rc));
+ else
+ info("Establishing MQTT link to [%s:%d]", aclk_hostname, aclk_port);
+
+ return rc;
+}
+
+static inline void _link_mosquitto_write()
+{
+ int rc;
+
+ if (unlikely(!mosq)) {
+ return;
+ }
+
+ rc = mosquitto_loop_misc(mosq);
+ if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ debug(D_ACLK, "ACLK: failure during mosquitto_loop_misc %s", mosquitto_strerror(rc));
+
+ if (likely(mosquitto_want_write(mosq))) {
+ rc = mosquitto_loop_write(mosq, 1);
+ if (rc != MOSQ_ERR_SUCCESS)
+ debug(D_ACLK, "ACLK: failure during mosquitto_loop_write %s", mosquitto_strerror(rc));
+ }
+}
+
+void aclk_lws_connection_established(char *hostname, int port)
+{
+ _link_mqtt_connect(hostname, port); // Parameters only used for logging, lower layer connected.
+ _link_mosquitto_write();
+}
+
+void aclk_lws_connection_data_received()
+{
+ int rc = mosquitto_loop_read(mosq, 1);
+ if (rc != MOSQ_ERR_SUCCESS)
+ debug(D_ACLK, "ACLK: failure during mosquitto_loop_read %s", mosquitto_strerror(rc));
+}
+
+void aclk_lws_connection_closed()
+{
+ aclk_disconnect();
+
+}
+
+
+int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username, char *password)
+{
+ if(aclk_lws_wss_connect(aclk_hostname, aclk_port))
+ return MOSQ_ERR_UNKNOWN;
+ aclk_lws_wss_service_loop();
+
+ int rc = _mqtt_create_connection(username, password);
+ if (rc!= MOSQ_ERR_SUCCESS)
+ return rc;
+
+ mosquitto_external_callbacks_set(mosq, _mqtt_external_write_hook, _mqtt_external_read_hook);
+ return rc;
+}
+
+inline int _link_event_loop()
+{
+
+ // TODO: Check if we need to flush undelivered messages from libmosquitto on new connection attempts (QoS=1).
+ _link_mosquitto_write();
+ aclk_lws_wss_service_loop();
+
+ // this is because if use LWS we don't want
+ // mqtt to reconnect by itself
+ return MOSQ_ERR_SUCCESS;
+}
+
+void _link_shutdown()
+{
+ int rc;
+
+ if (likely(!mosq))
+ return;
+
+ rc = mosquitto_disconnect(mosq);
+ switch (rc) {
+ case MOSQ_ERR_SUCCESS:
+ info("MQTT disconnected from broker");
+ break;
+ default:
+ info("MQTT invalid structure");
+ break;
+ };
+}
+
+
+int _link_set_lwt(char *sub_topic, int qos)
+{
+ int rc;
+ char topic[ACLK_MAX_TOPIC + 1];
+ char *final_topic;
+
+ final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
+ if (unlikely(!final_topic)) {
+ errno = 0;
+ error("Unable to build outgoing topic; truncated?");
+ return 1;
+ }
+
+ usec_t lwt_time = aclk_session_sec * USEC_PER_SEC + aclk_session_us + 1;
+ BUFFER *b = buffer_create(512);
+ aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC, ACLK_VERSION_NEG_VERSION);
+ buffer_strcat(b, ", \"payload\": \"unexpected\" }");
+ rc = mosquitto_will_set(mosq, topic, buffer_strlen(b), buffer_tostring(b), qos, 0);
+ buffer_free(b);
+
+ return rc;
+}
+
+int _link_subscribe(char *topic, int qos)
+{
+ int rc;
+
+ if (unlikely(!mosq))
+ return 1;
+
+ mosquitto_message_callback_set(mosq, mqtt_message_callback);
+
+ rc = mosquitto_subscribe(mosq, NULL, topic, qos);
+ if (unlikely(rc)) {
+ errno = 0;
+ error("Failed to register subscription %d (%s)", rc, mosquitto_strerror(rc));
+ return 1;
+ }
+
+ _link_mosquitto_write();
+ return 0;
+}
+
+/*
+ * Send a message to the cloud to specific topic
+ *
+ */
+
+int _link_send_message(char *topic, const void *message, size_t len, int *mid)
+{
+ int rc;
+ size_t write_q, write_q_bytes, read_q;
+
+ rc = mosquitto_pub_topic_check(topic);
+
+ if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ return rc;
+
+ lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
+ rc = mosquitto_publish(mosq, mid, topic, len, message, ACLK_QOS, 0);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ char msg_head[64];
+ memset(msg_head, 0, sizeof(msg_head));
+ strncpy(msg_head, (char*)message, 60);
+ for (size_t i = 0; i < sizeof(msg_head); i++)
+ if(msg_head[i] == '\n') msg_head[i] = ' ';
+ info("Sending MQTT len=%d mid=%d wq=%zu (%zu-bytes) readq=%zu: %s", (int)len,
+ *mid, write_q, write_q_bytes, read_q, msg_head);
+ now_realtime_timeval(&sendTimes[ *mid & 0x3ff ]);
+#endif
+
+ // TODO: Add better handling -- error will flood the logfile here
+ if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
+ errno = 0;
+ error("MQTT message failed : %s", mosquitto_strerror(rc));
+ }
+ _link_mosquitto_write();
+ return rc;
+}