summaryrefslogtreecommitdiffstats
path: root/exporting/mongodb/mongodb.c
diff options
context:
space:
mode:
Diffstat (limited to 'exporting/mongodb/mongodb.c')
-rw-r--r--exporting/mongodb/mongodb.c387
1 files changed, 387 insertions, 0 deletions
diff --git a/exporting/mongodb/mongodb.c b/exporting/mongodb/mongodb.c
new file mode 100644
index 000000000..44922a242
--- /dev/null
+++ b/exporting/mongodb/mongodb.c
@@ -0,0 +1,387 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#define EXPORTING_INTERNALS
+#include "mongodb.h"
+
+#define CONFIG_FILE_LINE_MAX ((CONFIG_MAX_NAME + CONFIG_MAX_VALUE + 1024) * 2)
+
+/**
+ * Initialize MongoDB connector specific data, including a ring buffer
+ *
+ * @param instance an instance data structure.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int mongodb_init(struct instance *instance)
+{
+ struct mongodb_specific_config *connector_specific_config = instance->config.connector_specific_config;
+ mongoc_uri_t *uri;
+ bson_error_t bson_error;
+
+ if (unlikely(!connector_specific_config->collection || !*connector_specific_config->collection)) {
+ error("EXPORTING: collection name is a mandatory MongoDB parameter, but it is not configured");
+ return 1;
+ }
+
+ uri = mongoc_uri_new_with_error(instance->config.destination, &bson_error);
+ if (unlikely(!uri)) {
+ error(
+ "EXPORTING: failed to parse URI: %s. Error message: %s", instance->config.destination, bson_error.message);
+ return 1;
+ }
+
+ int32_t socket_timeout =
+ mongoc_uri_get_option_as_int32(uri, MONGOC_URI_SOCKETTIMEOUTMS, instance->config.timeoutms);
+ if (!mongoc_uri_set_option_as_int32(uri, MONGOC_URI_SOCKETTIMEOUTMS, socket_timeout)) {
+ error("EXPORTING: failed to set %s to the value %d", MONGOC_URI_SOCKETTIMEOUTMS, socket_timeout);
+ return 1;
+ };
+
+ struct mongodb_specific_data *connector_specific_data =
+ (struct mongodb_specific_data *)instance->connector_specific_data;
+
+ connector_specific_data->client = mongoc_client_new_from_uri(uri);
+ if (unlikely(!connector_specific_data->client)) {
+ error("EXPORTING: failed to create a new client");
+ return 1;
+ }
+
+ if (!mongoc_client_set_appname(connector_specific_data->client, "netdata")) {
+ error("EXPORTING: failed to set client appname");
+ };
+
+ connector_specific_data->collection = mongoc_client_get_collection(
+ connector_specific_data->client, connector_specific_config->database, connector_specific_config->collection);
+
+ mongoc_uri_destroy(uri);
+
+ // create a ring buffer
+ struct bson_buffer *first_buffer = NULL;
+
+ if (instance->config.buffer_on_failures < 2)
+ instance->config.buffer_on_failures = 1;
+ else
+ instance->config.buffer_on_failures -= 1;
+
+ for (int i = 0; i < instance->config.buffer_on_failures; i++) {
+ struct bson_buffer *current_buffer = callocz(1, sizeof(struct bson_buffer));
+
+ if (!connector_specific_data->first_buffer)
+ first_buffer = current_buffer;
+ else
+ current_buffer->next = connector_specific_data->first_buffer;
+
+ connector_specific_data->first_buffer = current_buffer;
+ }
+
+ first_buffer->next = connector_specific_data->first_buffer;
+ connector_specific_data->last_buffer = connector_specific_data->first_buffer;
+
+ return 0;
+}
+
+/**
+ * Initialize a MongoDB connector instance
+ *
+ * @param instance an instance data structure.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int init_mongodb_instance(struct instance *instance)
+{
+ instance->worker = mongodb_connector_worker;
+
+ instance->start_batch_formatting = NULL;
+ instance->start_host_formatting = format_host_labels_json_plaintext;
+ instance->start_chart_formatting = NULL;
+
+ if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
+ instance->metric_formatting = format_dimension_collected_json_plaintext;
+ else
+ instance->metric_formatting = format_dimension_stored_json_plaintext;
+
+ instance->end_chart_formatting = NULL;
+ instance->end_host_formatting = flush_host_labels;
+ instance->end_batch_formatting = format_batch_mongodb;
+
+ instance->prepare_header = NULL;
+ instance->check_response = NULL;
+
+ instance->buffer = (void *)buffer_create(0);
+ if (!instance->buffer) {
+ error("EXPORTING: cannot create buffer for MongoDB exporting connector instance %s", instance->config.name);
+ return 1;
+ }
+ if (uv_mutex_init(&instance->mutex))
+ return 1;
+ if (uv_cond_init(&instance->cond_var))
+ return 1;
+
+ struct mongodb_specific_data *connector_specific_data = callocz(1, sizeof(struct mongodb_specific_data));
+ instance->connector_specific_data = (void *)connector_specific_data;
+
+ instance->config.timeoutms =
+ (instance->config.update_every >= 2) ? (instance->engine->config.update_every * MSEC_PER_SEC - 500) : 1000;
+
+ if (!instance->engine->mongoc_initialized) {
+ mongoc_init();
+ instance->engine->mongoc_initialized = 1;
+ }
+
+ if (unlikely(mongodb_init(instance))) {
+ error("EXPORTING: cannot initialize MongoDB exporting connector");
+ return 1;
+ }
+
+ return 0;
+}
+
+/**
+ * Free an array of BSON structures
+ *
+ * @param insert an array of documents.
+ * @param documents_inserted the number of documents inserted.
+ */
+void free_bson(bson_t **insert, size_t documents_inserted)
+{
+ size_t i;
+
+ for (i = 0; i < documents_inserted; i++)
+ bson_destroy(insert[i]);
+
+ freez(insert);
+}
+
+/**
+ * Format a batch for the MongoDB connector
+ *
+ * @param instance an instance data structure.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int format_batch_mongodb(struct instance *instance)
+{
+ struct mongodb_specific_data *connector_specific_data =
+ (struct mongodb_specific_data *)instance->connector_specific_data;
+ struct stats *stats = &instance->stats;
+
+ bson_t **insert = connector_specific_data->last_buffer->insert;
+ if (insert) {
+ // ring buffer is full, reuse the oldest element
+ connector_specific_data->first_buffer = connector_specific_data->first_buffer->next;
+ free_bson(insert, connector_specific_data->last_buffer->documents_inserted);
+ connector_specific_data->total_documents_inserted -= connector_specific_data->last_buffer->documents_inserted;
+ stats->buffered_bytes -= connector_specific_data->last_buffer->buffered_bytes;
+ }
+ insert = callocz((size_t)stats->buffered_metrics, sizeof(bson_t *));
+ connector_specific_data->last_buffer->insert = insert;
+
+ BUFFER *buffer = (BUFFER *)instance->buffer;
+ char *start = (char *)buffer_tostring(buffer);
+ char *end = start;
+
+ size_t documents_inserted = 0;
+
+ while (*end && documents_inserted <= (size_t)stats->buffered_metrics) {
+ while (*end && *end != '\n')
+ end++;
+
+ if (likely(*end)) {
+ *end = '\0';
+ end++;
+ } else {
+ break;
+ }
+
+ bson_error_t bson_error;
+ insert[documents_inserted] = bson_new_from_json((const uint8_t *)start, -1, &bson_error);
+
+ if (unlikely(!insert[documents_inserted])) {
+ error(
+ "EXPORTING: Failed creating a BSON document from a JSON string \"%s\" : %s", start, bson_error.message);
+ free_bson(insert, documents_inserted);
+ return 1;
+ }
+
+ start = end;
+
+ documents_inserted++;
+ }
+
+ stats->buffered_bytes += connector_specific_data->last_buffer->buffered_bytes = buffer_strlen(buffer);
+
+ buffer_flush(buffer);
+
+ // The stats->buffered_metrics is used in the MongoDB batch formatting as a variable for the number
+ // of metrics, added in the current iteration, so we are clearing it here. We will use the
+ // connector_specific_data->total_documents_inserted in the worker to show the statistics.
+ stats->buffered_metrics = 0;
+ connector_specific_data->total_documents_inserted += documents_inserted;
+
+ connector_specific_data->last_buffer->documents_inserted = documents_inserted;
+ connector_specific_data->last_buffer = connector_specific_data->last_buffer->next;
+
+ return 0;
+}
+
+/**
+ * Clean a MongoDB connector instance up
+ *
+ * @param instance an instance data structure.
+ */
+void mongodb_cleanup(struct instance *instance)
+{
+ info("EXPORTING: cleaning up instance %s ...", instance->config.name);
+
+ struct mongodb_specific_data *connector_specific_data =
+ (struct mongodb_specific_data *)instance->connector_specific_data;
+
+ mongoc_collection_destroy(connector_specific_data->collection);
+ mongoc_client_destroy(connector_specific_data->client);
+ if (instance->engine->mongoc_initialized) {
+ mongoc_cleanup();
+ instance->engine->mongoc_initialized = 0;
+ }
+
+ buffer_free(instance->buffer);
+
+ struct bson_buffer *next_buffer = connector_specific_data->first_buffer;
+ for (int i = 0; i < instance->config.buffer_on_failures; i++) {
+ struct bson_buffer *current_buffer = next_buffer;
+ next_buffer = next_buffer->next;
+
+ if (current_buffer->insert)
+ free_bson(current_buffer->insert, current_buffer->documents_inserted);
+ freez(current_buffer);
+ }
+
+ freez(connector_specific_data);
+
+ struct mongodb_specific_config *connector_specific_config =
+ (struct mongodb_specific_config *)instance->config.connector_specific_config;
+ freez(connector_specific_config->database);
+ freez(connector_specific_config->collection);
+ freez(connector_specific_config);
+
+ info("EXPORTING: instance %s exited", instance->config.name);
+ instance->exited = 1;
+
+ return;
+}
+
+/**
+ * MongoDB connector worker
+ *
+ * Runs in a separate thread for every instance.
+ *
+ * @param instance_p an instance data structure.
+ */
+void mongodb_connector_worker(void *instance_p)
+{
+ struct instance *instance = (struct instance *)instance_p;
+ struct mongodb_specific_config *connector_specific_config = instance->config.connector_specific_config;
+ struct mongodb_specific_data *connector_specific_data =
+ (struct mongodb_specific_data *)instance->connector_specific_data;
+
+ while (!instance->engine->exit) {
+ struct stats *stats = &instance->stats;
+
+ uv_mutex_lock(&instance->mutex);
+ if (!connector_specific_data->first_buffer->insert ||
+ !connector_specific_data->first_buffer->documents_inserted) {
+ while (!instance->data_is_ready)
+ uv_cond_wait(&instance->cond_var, &instance->mutex);
+ instance->data_is_ready = 0;
+ }
+
+ if (unlikely(instance->engine->exit)) {
+ uv_mutex_unlock(&instance->mutex);
+ break;
+ }
+
+ // reset the monitoring chart counters
+ stats->received_bytes =
+ stats->sent_bytes =
+ stats->sent_metrics =
+ stats->lost_metrics =
+ stats->receptions =
+ stats->transmission_successes =
+ stats->transmission_failures =
+ stats->data_lost_events =
+ stats->lost_bytes =
+ stats->reconnects = 0;
+
+ bson_t **insert = connector_specific_data->first_buffer->insert;
+ size_t documents_inserted = connector_specific_data->first_buffer->documents_inserted;
+ size_t buffered_bytes = connector_specific_data->first_buffer->buffered_bytes;
+
+ connector_specific_data->first_buffer->insert = NULL;
+ connector_specific_data->first_buffer->documents_inserted = 0;
+ connector_specific_data->first_buffer->buffered_bytes = 0;
+ connector_specific_data->first_buffer = connector_specific_data->first_buffer->next;
+
+ uv_mutex_unlock(&instance->mutex);
+
+ size_t data_size = 0;
+ for (size_t i = 0; i < documents_inserted; i++) {
+ data_size += insert[i]->len;
+ }
+
+ debug(
+ D_BACKEND,
+ "EXPORTING: mongodb_insert(): destination = %s, database = %s, collection = %s, data size = %zu",
+ instance->config.destination,
+ connector_specific_config->database,
+ connector_specific_config->collection,
+ data_size);
+
+ if (likely(documents_inserted != 0)) {
+ bson_error_t bson_error;
+ if (likely(mongoc_collection_insert_many(
+ connector_specific_data->collection,
+ (const bson_t **)insert,
+ documents_inserted,
+ NULL,
+ NULL,
+ &bson_error))) {
+ stats->sent_metrics = documents_inserted;
+ stats->sent_bytes += data_size;
+ stats->transmission_successes++;
+ stats->receptions++;
+ } else {
+ // oops! we couldn't send (all or some of the) data
+ error("EXPORTING: %s", bson_error.message);
+ error(
+ "EXPORTING: failed to write data to the database '%s'. "
+ "Willing to write %zu bytes, wrote %zu bytes.",
+ instance->config.destination, data_size, 0UL);
+
+ stats->transmission_failures++;
+ stats->data_lost_events++;
+ stats->lost_bytes += buffered_bytes;
+ stats->lost_metrics += documents_inserted;
+ }
+ }
+
+ free_bson(insert, documents_inserted);
+
+ if (unlikely(instance->engine->exit))
+ break;
+
+ uv_mutex_lock(&instance->mutex);
+
+ stats->buffered_metrics = connector_specific_data->total_documents_inserted;
+
+ send_internal_metrics(instance);
+
+ connector_specific_data->total_documents_inserted -= documents_inserted;
+
+ stats->buffered_metrics = 0;
+ stats->buffered_bytes -= buffered_bytes;
+
+ uv_mutex_unlock(&instance->mutex);
+
+#ifdef UNIT_TESTING
+ return;
+#endif
+ }
+
+ mongodb_cleanup(instance);
+}