summaryrefslogtreecommitdiffstats
path: root/src/exporting/exporting_engine.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 11:19:16 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:53:24 +0000
commitb5f8ee61a7f7e9bd291dd26b0585d03eb686c941 (patch)
treed4d31289c39fc00da064a825df13a0b98ce95b10 /src/exporting/exporting_engine.c
parentAdding upstream version 1.44.3. (diff)
downloadnetdata-upstream.tar.xz
netdata-upstream.zip
Adding upstream version 1.46.3.upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/exporting/exporting_engine.c')
-rw-r--r--src/exporting/exporting_engine.c220
1 files changed, 220 insertions, 0 deletions
diff --git a/src/exporting/exporting_engine.c b/src/exporting/exporting_engine.c
new file mode 100644
index 000000000..eb5f8a0a8
--- /dev/null
+++ b/src/exporting/exporting_engine.c
@@ -0,0 +1,220 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "exporting_engine.h"
+
+static struct engine *engine = NULL;
+
+void analytics_exporting_connectors_ssl(BUFFER *b)
+{
+#ifdef ENABLE_HTTPS
+ if (netdata_ssl_exporting_ctx) {
+ for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+ struct simple_connector_data *connector_specific_data = instance->connector_specific_data;
+ if (SSL_connection(&connector_specific_data->ssl)) {
+ buffer_strcat(b, "exporting");
+ break;
+ }
+ }
+ }
+#endif
+ buffer_strcat(b, "|");
+}
+
+void analytics_exporting_connectors(BUFFER *b)
+{
+ if (!engine)
+ return;
+
+ uint8_t count = 0;
+
+ for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+ if (count)
+ buffer_strcat(b, "|");
+
+ switch (instance->config.type) {
+ case EXPORTING_CONNECTOR_TYPE_GRAPHITE:
+ buffer_strcat(b, "Graphite");
+ break;
+ case EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP:
+ buffer_strcat(b, "GraphiteHTTP");
+ break;
+ case EXPORTING_CONNECTOR_TYPE_JSON:
+ buffer_strcat(b, "JSON");
+ break;
+ case EXPORTING_CONNECTOR_TYPE_JSON_HTTP:
+ buffer_strcat(b, "JSONHTTP");
+ break;
+ case EXPORTING_CONNECTOR_TYPE_OPENTSDB:
+ buffer_strcat(b, "OpenTSDB");
+ break;
+ case EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP:
+ buffer_strcat(b, "OpenTSDBHTTP");
+ break;
+ case EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE:
+#ifdef ENABLE_PROMETHEUS_REMOTE_WRITE
+ buffer_strcat(b, "PrometheusRemoteWrite");
+#endif
+ break;
+ case EXPORTING_CONNECTOR_TYPE_KINESIS:
+#if HAVE_KINESIS
+ buffer_strcat(b, "Kinesis");
+#endif
+ break;
+ case EXPORTING_CONNECTOR_TYPE_PUBSUB:
+#if ENABLE_EXPORTING_PUBSUB
+ buffer_strcat(b, "Pubsub");
+#endif
+ break;
+ case EXPORTING_CONNECTOR_TYPE_MONGODB:
+#ifdef HAVE_MONGOC
+ buffer_strcat(b, "MongoDB");
+#endif
+ break;
+ default:
+ buffer_strcat(b, "Unknown");
+ }
+
+ count++;
+ }
+}
+
+/**
+ * Exporting Clean Engine
+ *
+ * Clean all variables allocated inside engine structure
+ *
+ * @param en a pointer to the structure that will be cleaned.
+ */
+static void exporting_clean_engine()
+{
+ if (!engine)
+ return;
+
+#if HAVE_KINESIS
+ if (engine->aws_sdk_initialized)
+ aws_sdk_shutdown();
+#endif
+
+#ifdef ENABLE_PROMETHEUS_REMOTE_WRITE
+ if (engine->protocol_buffers_initialized)
+ protocol_buffers_shutdown();
+#endif
+
+ //Cleanup web api
+ prometheus_clean_server_root();
+
+ for (struct instance *instance = engine->instance_root; instance;) {
+ struct instance *current_instance = instance;
+ instance = instance->next;
+
+ clean_instance(current_instance);
+ }
+
+ freez((void *)engine->config.hostname);
+ freez(engine);
+}
+
+/**
+ * Clean up the main exporting thread and all connector workers on Netdata exit
+ *
+ * @param ptr thread data.
+ */
+static void exporting_main_cleanup(void *pptr)
+{
+ struct netdata_static_thread *static_thread = CLEANUP_FUNCTION_GET_PTR(pptr);
+ if(!static_thread) return;
+
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+
+ netdata_log_info("cleaning up...");
+
+ if (!engine) {
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return;
+ }
+
+ engine->exit = 1;
+
+ int found = 0;
+ usec_t max = 2 * USEC_PER_SEC, step = 50000;
+
+ for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+ if (!instance->exited) {
+ found++;
+ netdata_log_info("stopping worker for instance %s", instance->config.name);
+ uv_mutex_unlock(&instance->mutex);
+ instance->data_is_ready = 1;
+ uv_cond_signal(&instance->cond_var);
+ } else
+ netdata_log_info("found stopped worker for instance %s", instance->config.name);
+ }
+
+ while (found && max > 0) {
+ max -= step;
+ netdata_log_info("Waiting %d exporting connectors to finish...", found);
+ sleep_usec(step);
+ found = 0;
+
+ for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+ if (!instance->exited)
+ found++;
+ }
+ }
+
+ exporting_clean_engine();
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+}
+
+/**
+ * Exporting engine main
+ *
+ * The main thread used to control the exporting engine.
+ *
+ * @param ptr a pointer to netdata_static_structure.
+ *
+ * @return It always returns NULL.
+ */
+void *exporting_main(void *ptr)
+{
+ CLEANUP_FUNCTION_REGISTER(exporting_main_cleanup) cleanup_ptr = ptr;
+
+ engine = read_exporting_config();
+ if (!engine) {
+ netdata_log_info("EXPORTING: no exporting connectors configured");
+ goto cleanup;
+ }
+
+ if (init_connectors(engine) != 0) {
+ netdata_log_error("EXPORTING: cannot initialize exporting connectors");
+
+ analytics_statistic_t statistic = { "EXPORTING_START", "FAIL", "-" };
+ analytics_statistic_send(&statistic);
+ goto cleanup;
+ }
+
+ RRDSET *st_main_rusage = NULL;
+ RRDDIM *rd_main_user = NULL;
+ RRDDIM *rd_main_system = NULL;
+ create_main_rusage_chart(&st_main_rusage, &rd_main_user, &rd_main_system);
+
+ usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC;
+ heartbeat_t hb;
+ heartbeat_init(&hb);
+
+ while (service_running(SERVICE_EXPORTERS)) {
+ heartbeat_next(&hb, step_ut);
+ engine->now = now_realtime_sec();
+
+ if (mark_scheduled_instances(engine))
+ prepare_buffers(engine);
+
+ send_main_rusage(st_main_rusage, rd_main_user, rd_main_system);
+
+#ifdef UNIT_TESTING
+ return NULL;
+#endif
+ }
+
+cleanup:
+ return NULL;
+}