summaryrefslogtreecommitdiffstats
path: root/exporting/mongodb
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 02:57:58 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 02:57:58 +0000
commitbe1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch)
tree9754ff1ca740f6346cf8483ec915d4054bc5da2d /exporting/mongodb
parentInitial commit. (diff)
downloadnetdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.tar.xz
netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.zip
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'exporting/mongodb')
-rw-r--r--exporting/mongodb/Makefile.am8
l---------exporting/mongodb/README.md1
-rw-r--r--exporting/mongodb/integrations/mongodb.md145
-rw-r--r--exporting/mongodb/metadata.yaml151
-rw-r--r--exporting/mongodb/mongodb.c392
-rw-r--r--exporting/mongodb/mongodb.h35
6 files changed, 732 insertions, 0 deletions
diff --git a/exporting/mongodb/Makefile.am b/exporting/mongodb/Makefile.am
new file mode 100644
index 00000000..161784b8
--- /dev/null
+++ b/exporting/mongodb/Makefile.am
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/exporting/mongodb/README.md b/exporting/mongodb/README.md
new file mode 120000
index 00000000..a2825305
--- /dev/null
+++ b/exporting/mongodb/README.md
@@ -0,0 +1 @@
+integrations/mongodb.md \ No newline at end of file
diff --git a/exporting/mongodb/integrations/mongodb.md b/exporting/mongodb/integrations/mongodb.md
new file mode 100644
index 00000000..30dfe4f8
--- /dev/null
+++ b/exporting/mongodb/integrations/mongodb.md
@@ -0,0 +1,145 @@
+<!--startmeta
+custom_edit_url: "https://github.com/netdata/netdata/edit/master/exporting/mongodb/README.md"
+meta_yaml: "https://github.com/netdata/netdata/edit/master/exporting/mongodb/metadata.yaml"
+sidebar_label: "MongoDB"
+learn_status: "Published"
+learn_rel_path: "Exporting"
+message: "DO NOT EDIT THIS FILE DIRECTLY, IT IS GENERATED BY THE EXPORTER'S metadata.yaml FILE"
+endmeta-->
+
+# MongoDB
+
+
+<img src="https://netdata.cloud/img/mongodb.svg" width="150"/>
+
+
+Use the MongoDB connector for the exporting engine to archive your agent's metrics to a MongoDB database
+for long-term storage, further analysis, or correlation with data from other sources.
+
+
+
+<img src="https://img.shields.io/badge/maintained%20by-Netdata-%2300ab44" />
+
+## Setup
+
+### Prerequisites
+
+####
+
+- To use MongoDB as an external storage for long-term archiving, you should first [install](http://mongoc.org/libmongoc/current/installing.html) libmongoc 1.7.0 or higher.
+- Next, re-install Netdata from the source, which detects that the required library is now available.
+
+
+
+### Configuration
+
+#### File
+
+The configuration file name for this integration is `exporting.conf`.
+
+
+You can edit the configuration file using the `edit-config` script from the
+Netdata [config directory](https://github.com/netdata/netdata/blob/master/docs/configure/nodes.md#the-netdata-config-directory).
+
+```bash
+cd /etc/netdata 2>/dev/null || cd /opt/netdata/etc/netdata
+sudo ./edit-config exporting.conf
+```
+#### Options
+
+The following options can be defined for this exporter.
+
+
+<details><summary>Config options</summary>
+
+| Name | Description | Default | Required |
+|:----|:-----------|:-------|:--------:|
+| enabled | Enables or disables an exporting connector instance (yes/no). | no | yes |
+| destination | Accepts a space separated list of hostnames, IPs (IPv4 and IPv6) and ports to connect to. Netdata will use the first available to send the metrics. | localhost | yes |
+| username | Username for HTTP authentication | my_username | no |
+| password | Password for HTTP authentication | my_password | no |
+| data source | Selects the kind of data that will be sent to the external database. (as collected/average/sum) | | no |
+| hostname | The hostname to be used for sending data to the external database server. | [global].hostname | no |
+| prefix | The prefix to add to all metrics. | Netdata | no |
+| update every | Frequency of sending sending data to the external database, in seconds. | 10 | no |
+| buffer on failures | The number of iterations (`update every` seconds) to buffer data, when the external database server is not available. | 10 | no |
+| timeout ms | The timeout in milliseconds to wait for the external database server to process the data. | 2 * update_every * 1000 | no |
+| send hosts matching | Hosts filter. Determines which hosts will be sent to the external database. The syntax is [simple patterns](https://github.com/netdata/netdata/tree/master/libnetdata/simple_pattern#simple-patterns). | localhost * | no |
+| send charts matching | One or more space separated patterns (use * as wildcard) checked against both chart id and chart name. | * | no |
+| send names instead of ids | Controls the metric names Netdata should send to the external database (yes/no). | | no |
+| send configured labels | Controls if host labels defined in the `[host labels]` section in `netdata.conf` should be sent to the external database (yes/no). | | no |
+| send automatic labels | Controls if automatically created labels, like `_os_name` or `_architecture` should be sent to the external database (yes/no). | | no |
+
+##### destination
+
+The format of each item in this list, is: [PROTOCOL:]IP[:PORT].
+- PROTOCOL can be udp or tcp. tcp is the default and only supported by the current exporting engine.
+- IP can be XX.XX.XX.XX (IPv4), or [XX:XX...XX:XX] (IPv6). For IPv6 you can to enclose the IP in [] to separate it from the port.
+- PORT can be a number of a service name. If omitted, the default port for the exporting connector will be used.
+
+Example IPv4:
+ ```yaml
+ destination = 10.11.14.2:27017 10.11.14.3:4242 10.11.14.4:27017
+ ```
+Example IPv6 and IPv4 together:
+```yaml
+destination = [ffff:...:0001]:2003 10.11.12.1:2003
+```
+When multiple servers are defined, Netdata will try the next one when the previous one fails.
+
+
+##### update every
+
+Netdata will add some randomness to this number, to prevent stressing the external server when many Netdata servers
+send data to the same database. This randomness does not affect the quality of the data, only the time they are sent.
+
+
+##### buffer on failures
+
+If the server fails to receive the data after that many failures, data loss on the connector instance is expected (Netdata will also log it).
+
+
+##### send hosts matching
+
+Includes one or more space separated patterns, using * as wildcard (any number of times within each pattern).
+The patterns are checked against the hostname (the localhost is always checked as localhost), allowing us to
+filter which hosts will be sent to the external database when this Netdata is a central Netdata aggregating multiple hosts.
+
+A pattern starting with `!` gives a negative match. So to match all hosts named `*db*` except hosts containing `*child*`,
+use `!*child* *db*` (so, the order is important: the first pattern matching the hostname will be used - positive or negative).
+
+
+##### send charts matching
+
+A pattern starting with ! gives a negative match. So to match all charts named apps.* except charts ending in *reads,
+use !*reads apps.* (so, the order is important: the first pattern matching the chart id or the chart name will be used,
+positive or negative). There is also a URL parameter filter that can be used while querying allmetrics. The URL parameter
+has a higher priority than the configuration option.
+
+
+##### send names instead of ids
+
+Netdata supports names and IDs for charts and dimensions. Usually IDs are unique identifiers as read by the system and names
+are human friendly labels (also unique). Most charts and metrics have the same ID and name, but in several cases they are
+different : disks with device-mapper, interrupts, QoS classes, statsd synthetic charts, etc.
+
+
+</details>
+
+#### Examples
+
+##### Basic configuration
+
+The default socket timeout depends on the exporting connector update interval.
+The timeout is 500 ms shorter than the interval (but not less than 1000 ms). You can alter the timeout using the sockettimeoutms MongoDB URI option.
+
+
+```yaml
+[mongodb:my_instance]
+ enabled = yes
+ destination = mongodb://<hostname>
+ database = your_database_name
+ collection = your_collection_name
+
+```
+
diff --git a/exporting/mongodb/metadata.yaml b/exporting/mongodb/metadata.yaml
new file mode 100644
index 00000000..30e1e89d
--- /dev/null
+++ b/exporting/mongodb/metadata.yaml
@@ -0,0 +1,151 @@
+# yamllint disable rule:line-length
+---
+id: 'export-mongodb'
+meta:
+ name: 'MongoDB'
+ link: 'https://www.mongodb.com/'
+ categories:
+ - export
+ icon_filename: 'mongodb.svg'
+keywords:
+ - exporter
+ - MongoDB
+overview:
+ exporter_description: |
+ Use the MongoDB connector for the exporting engine to archive your agent's metrics to a MongoDB database
+ for long-term storage, further analysis, or correlation with data from other sources.
+ exporter_limitations: ''
+setup:
+ prerequisites:
+ list:
+ - title: ''
+ description: |
+ - To use MongoDB as an external storage for long-term archiving, you should first [install](http://mongoc.org/libmongoc/current/installing.html) libmongoc 1.7.0 or higher.
+ - Next, re-install Netdata from the source, which detects that the required library is now available.
+ configuration:
+ file:
+ name: 'exporting.conf'
+ options:
+ description: |
+ The following options can be defined for this exporter.
+ folding:
+ title: 'Config options'
+ enabled: true
+ list:
+ - name: 'enabled'
+ default_value: 'no'
+ description: 'Enables or disables an exporting connector instance (yes|no).'
+ required: true
+ - name: 'destination'
+ default_value: 'localhost'
+ description: 'Accepts a space separated list of hostnames, IPs (IPv4 and IPv6) and ports to connect to. Netdata will use the first available to send the metrics.'
+ required: true
+ detailed_description: |
+ The format of each item in this list, is: [PROTOCOL:]IP[:PORT].
+ - PROTOCOL can be udp or tcp. tcp is the default and only supported by the current exporting engine.
+ - IP can be XX.XX.XX.XX (IPv4), or [XX:XX...XX:XX] (IPv6). For IPv6 you can to enclose the IP in [] to separate it from the port.
+ - PORT can be a number of a service name. If omitted, the default port for the exporting connector will be used.
+
+ Example IPv4:
+ ```yaml
+ destination = 10.11.14.2:27017 10.11.14.3:4242 10.11.14.4:27017
+ ```
+ Example IPv6 and IPv4 together:
+ ```yaml
+ destination = [ffff:...:0001]:2003 10.11.12.1:2003
+ ```
+ When multiple servers are defined, Netdata will try the next one when the previous one fails.
+ - name: 'username'
+ default_value: 'my_username'
+ description: 'Username for HTTP authentication'
+ required: false
+ - name: 'password'
+ default_value: 'my_password'
+ description: 'Password for HTTP authentication'
+ required: false
+ - name: 'data source'
+ default_value: ''
+ description: 'Selects the kind of data that will be sent to the external database. (as collected|average|sum)'
+ required: false
+ - name: 'hostname'
+ default_value: '[global].hostname'
+ description: 'The hostname to be used for sending data to the external database server.'
+ required: false
+ - name: 'prefix'
+ default_value: 'Netdata'
+ description: 'The prefix to add to all metrics.'
+ required: false
+ - name: 'update every'
+ default_value: '10'
+ description: |
+ Frequency of sending sending data to the external database, in seconds.
+ required: false
+ detailed_description: |
+ Netdata will add some randomness to this number, to prevent stressing the external server when many Netdata servers
+ send data to the same database. This randomness does not affect the quality of the data, only the time they are sent.
+ - name: 'buffer on failures'
+ default_value: '10'
+ description: |
+ The number of iterations (`update every` seconds) to buffer data, when the external database server is not available.
+ required: false
+ detailed_description: |
+ If the server fails to receive the data after that many failures, data loss on the connector instance is expected (Netdata will also log it).
+ - name: 'timeout ms'
+ default_value: '2 * update_every * 1000'
+ description: 'The timeout in milliseconds to wait for the external database server to process the data.'
+ required: false
+ - name: 'send hosts matching'
+ default_value: 'localhost *'
+ description: |
+ Hosts filter. Determines which hosts will be sent to the external database. The syntax is [simple patterns](https://github.com/netdata/netdata/tree/master/libnetdata/simple_pattern#simple-patterns).
+ required: false
+ detailed_description: |
+ Includes one or more space separated patterns, using * as wildcard (any number of times within each pattern).
+ The patterns are checked against the hostname (the localhost is always checked as localhost), allowing us to
+ filter which hosts will be sent to the external database when this Netdata is a central Netdata aggregating multiple hosts.
+
+ A pattern starting with `!` gives a negative match. So to match all hosts named `*db*` except hosts containing `*child*`,
+ use `!*child* *db*` (so, the order is important: the first pattern matching the hostname will be used - positive or negative).
+ - name: 'send charts matching'
+ default_value: '*'
+ description: |
+ One or more space separated patterns (use * as wildcard) checked against both chart id and chart name.
+ required: false
+ detailed_description: |
+ A pattern starting with ! gives a negative match. So to match all charts named apps.* except charts ending in *reads,
+ use !*reads apps.* (so, the order is important: the first pattern matching the chart id or the chart name will be used,
+ positive or negative). There is also a URL parameter filter that can be used while querying allmetrics. The URL parameter
+ has a higher priority than the configuration option.
+ - name: 'send names instead of ids'
+ default_value: ''
+ description: 'Controls the metric names Netdata should send to the external database (yes|no).'
+ required: false
+ detailed_description: |
+ Netdata supports names and IDs for charts and dimensions. Usually IDs are unique identifiers as read by the system and names
+ are human friendly labels (also unique). Most charts and metrics have the same ID and name, but in several cases they are
+ different : disks with device-mapper, interrupts, QoS classes, statsd synthetic charts, etc.
+ - name: 'send configured labels'
+ default_value: ''
+ description: 'Controls if host labels defined in the `[host labels]` section in `netdata.conf` should be sent to the external database (yes|no).'
+ required: false
+ - name: 'send automatic labels'
+ default_value: ''
+ description: 'Controls if automatically created labels, like `_os_name` or `_architecture` should be sent to the external database (yes|no).'
+ required: false
+ examples:
+ folding:
+ enabled: true
+ title: ''
+ list:
+ - name: 'Basic configuration'
+ folding:
+ enabled: false
+ description: |
+ The default socket timeout depends on the exporting connector update interval.
+ The timeout is 500 ms shorter than the interval (but not less than 1000 ms). You can alter the timeout using the sockettimeoutms MongoDB URI option.
+ config: |
+ [mongodb:my_instance]
+ enabled = yes
+ destination = mongodb://<hostname>
+ database = your_database_name
+ collection = your_collection_name
diff --git a/exporting/mongodb/mongodb.c b/exporting/mongodb/mongodb.c
new file mode 100644
index 00000000..c65f8d4c
--- /dev/null
+++ b/exporting/mongodb/mongodb.c
@@ -0,0 +1,392 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#define EXPORTING_INTERNALS
+#include "mongodb.h"
+
+#define CONFIG_FILE_LINE_MAX ((CONFIG_MAX_NAME + CONFIG_MAX_VALUE + 1024) * 2)
+
+/**
+ * Initialize MongoDB connector specific data, including a ring buffer
+ *
+ * @param instance an instance data structure.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int mongodb_init(struct instance *instance)
+{
+ struct mongodb_specific_config *connector_specific_config = instance->config.connector_specific_config;
+ mongoc_uri_t *uri;
+ bson_error_t bson_error;
+
+ if (unlikely(!connector_specific_config->collection || !*connector_specific_config->collection)) {
+ netdata_log_error("EXPORTING: collection name is a mandatory MongoDB parameter, but it is not configured");
+ return 1;
+ }
+
+ uri = mongoc_uri_new_with_error(instance->config.destination, &bson_error);
+ if (unlikely(!uri)) {
+ netdata_log_error("EXPORTING: failed to parse URI: %s. Error message: %s",
+ instance->config.destination,
+ bson_error.message);
+ return 1;
+ }
+
+ int32_t socket_timeout =
+ mongoc_uri_get_option_as_int32(uri, MONGOC_URI_SOCKETTIMEOUTMS, instance->config.timeoutms);
+ if (!mongoc_uri_set_option_as_int32(uri, MONGOC_URI_SOCKETTIMEOUTMS, socket_timeout)) {
+ netdata_log_error("EXPORTING: failed to set %s to the value %d", MONGOC_URI_SOCKETTIMEOUTMS, socket_timeout);
+ return 1;
+ };
+
+ struct mongodb_specific_data *connector_specific_data =
+ (struct mongodb_specific_data *)instance->connector_specific_data;
+
+ connector_specific_data->client = mongoc_client_new_from_uri(uri);
+ if (unlikely(!connector_specific_data->client)) {
+ netdata_log_error("EXPORTING: failed to create a new client");
+ return 1;
+ }
+
+ if (!mongoc_client_set_appname(connector_specific_data->client, "netdata")) {
+ netdata_log_error("EXPORTING: failed to set client appname");
+ };
+
+ connector_specific_data->collection = mongoc_client_get_collection(
+ connector_specific_data->client, connector_specific_config->database, connector_specific_config->collection);
+
+ mongoc_uri_destroy(uri);
+
+ // create a ring buffer
+ struct bson_buffer *first_buffer = NULL;
+
+ if (instance->config.buffer_on_failures < 2)
+ instance->config.buffer_on_failures = 1;
+ else
+ instance->config.buffer_on_failures -= 1;
+
+ for (int i = 0; i < instance->config.buffer_on_failures; i++) {
+ struct bson_buffer *current_buffer = callocz(1, sizeof(struct bson_buffer));
+
+ if (!connector_specific_data->first_buffer)
+ first_buffer = current_buffer;
+ else
+ current_buffer->next = connector_specific_data->first_buffer;
+
+ connector_specific_data->first_buffer = current_buffer;
+ }
+
+ first_buffer->next = connector_specific_data->first_buffer;
+ connector_specific_data->last_buffer = connector_specific_data->first_buffer;
+
+ return 0;
+}
+
+/**
+ * Initialize a MongoDB connector instance
+ *
+ * @param instance an instance data structure.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int init_mongodb_instance(struct instance *instance)
+{
+ instance->worker = mongodb_connector_worker;
+
+ instance->start_batch_formatting = NULL;
+ instance->start_host_formatting = format_host_labels_json_plaintext;
+ instance->start_chart_formatting = NULL;
+
+ if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
+ instance->metric_formatting = format_dimension_collected_json_plaintext;
+ else
+ instance->metric_formatting = format_dimension_stored_json_plaintext;
+
+ instance->end_chart_formatting = NULL;
+ instance->variables_formatting = NULL;
+ instance->end_host_formatting = flush_host_labels;
+ instance->end_batch_formatting = format_batch_mongodb;
+
+ instance->prepare_header = NULL;
+ instance->check_response = NULL;
+
+ instance->buffer = (void *)buffer_create(0, &netdata_buffers_statistics.buffers_exporters);
+ if (!instance->buffer) {
+ netdata_log_error("EXPORTING: cannot create buffer for MongoDB exporting connector instance %s",
+ instance->config.name);
+ return 1;
+ }
+ if (uv_mutex_init(&instance->mutex))
+ return 1;
+ if (uv_cond_init(&instance->cond_var))
+ return 1;
+
+ struct mongodb_specific_data *connector_specific_data = callocz(1, sizeof(struct mongodb_specific_data));
+ instance->connector_specific_data = (void *)connector_specific_data;
+
+ instance->config.timeoutms =
+ (instance->config.update_every >= 2) ? (instance->engine->config.update_every * MSEC_PER_SEC - 500) : 1000;
+
+ if (!instance->engine->mongoc_initialized) {
+ mongoc_init();
+ instance->engine->mongoc_initialized = 1;
+ }
+
+ if (unlikely(mongodb_init(instance))) {
+ netdata_log_error("EXPORTING: cannot initialize MongoDB exporting connector");
+ return 1;
+ }
+
+ return 0;
+}
+
+/**
+ * Free an array of BSON structures
+ *
+ * @param insert an array of documents.
+ * @param documents_inserted the number of documents inserted.
+ */
+void free_bson(bson_t **insert, size_t documents_inserted)
+{
+ size_t i;
+
+ for (i = 0; i < documents_inserted; i++)
+ bson_destroy(insert[i]);
+
+ freez(insert);
+}
+
+/**
+ * Format a batch for the MongoDB connector
+ *
+ * @param instance an instance data structure.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int format_batch_mongodb(struct instance *instance)
+{
+ struct mongodb_specific_data *connector_specific_data =
+ (struct mongodb_specific_data *)instance->connector_specific_data;
+ struct stats *stats = &instance->stats;
+
+ bson_t **insert = connector_specific_data->last_buffer->insert;
+ if (insert) {
+ // ring buffer is full, reuse the oldest element
+ connector_specific_data->first_buffer = connector_specific_data->first_buffer->next;
+ free_bson(insert, connector_specific_data->last_buffer->documents_inserted);
+ connector_specific_data->total_documents_inserted -= connector_specific_data->last_buffer->documents_inserted;
+ stats->buffered_bytes -= connector_specific_data->last_buffer->buffered_bytes;
+ }
+ insert = callocz((size_t)stats->buffered_metrics, sizeof(bson_t *));
+ connector_specific_data->last_buffer->insert = insert;
+
+ BUFFER *buffer = (BUFFER *)instance->buffer;
+ char *start = (char *)buffer_tostring(buffer);
+ char *end = start;
+
+ size_t documents_inserted = 0;
+
+ while (*end && documents_inserted <= (size_t)stats->buffered_metrics) {
+ while (*end && *end != '\n')
+ end++;
+
+ if (likely(*end)) {
+ *end = '\0';
+ end++;
+ } else {
+ break;
+ }
+
+ bson_error_t bson_error;
+ insert[documents_inserted] = bson_new_from_json((const uint8_t *)start, -1, &bson_error);
+
+ if (unlikely(!insert[documents_inserted])) {
+ netdata_log_error(
+ "EXPORTING: Failed creating a BSON document from a JSON string \"%s\" : %s", start, bson_error.message);
+ free_bson(insert, documents_inserted);
+ return 1;
+ }
+
+ start = end;
+
+ documents_inserted++;
+ }
+
+ stats->buffered_bytes += connector_specific_data->last_buffer->buffered_bytes = buffer_strlen(buffer);
+
+ buffer_flush(buffer);
+
+ // The stats->buffered_metrics is used in the MongoDB batch formatting as a variable for the number
+ // of metrics, added in the current iteration, so we are clearing it here. We will use the
+ // connector_specific_data->total_documents_inserted in the worker to show the statistics.
+ stats->buffered_metrics = 0;
+ connector_specific_data->total_documents_inserted += documents_inserted;
+
+ connector_specific_data->last_buffer->documents_inserted = documents_inserted;
+ connector_specific_data->last_buffer = connector_specific_data->last_buffer->next;
+
+ return 0;
+}
+
+/**
+ * Clean a MongoDB connector instance up
+ *
+ * @param instance an instance data structure.
+ */
+void mongodb_cleanup(struct instance *instance)
+{
+ netdata_log_info("EXPORTING: cleaning up instance %s ...", instance->config.name);
+
+ struct mongodb_specific_data *connector_specific_data =
+ (struct mongodb_specific_data *)instance->connector_specific_data;
+
+ mongoc_collection_destroy(connector_specific_data->collection);
+ mongoc_client_destroy(connector_specific_data->client);
+ if (instance->engine->mongoc_initialized) {
+ mongoc_cleanup();
+ instance->engine->mongoc_initialized = 0;
+ }
+
+ buffer_free(instance->buffer);
+
+ struct bson_buffer *next_buffer = connector_specific_data->first_buffer;
+ for (int i = 0; i < instance->config.buffer_on_failures; i++) {
+ struct bson_buffer *current_buffer = next_buffer;
+ next_buffer = next_buffer->next;
+
+ if (current_buffer->insert)
+ free_bson(current_buffer->insert, current_buffer->documents_inserted);
+ freez(current_buffer);
+ }
+
+ freez(connector_specific_data);
+
+ struct mongodb_specific_config *connector_specific_config =
+ (struct mongodb_specific_config *)instance->config.connector_specific_config;
+ freez(connector_specific_config->database);
+ freez(connector_specific_config->collection);
+ freez(connector_specific_config);
+
+ netdata_log_info("EXPORTING: instance %s exited", instance->config.name);
+ instance->exited = 1;
+
+ return;
+}
+
+/**
+ * MongoDB connector worker
+ *
+ * Runs in a separate thread for every instance.
+ *
+ * @param instance_p an instance data structure.
+ */
+void mongodb_connector_worker(void *instance_p)
+{
+ struct instance *instance = (struct instance *)instance_p;
+#ifdef NETDATA_INTERNAL_CHECKS
+ struct mongodb_specific_config *connector_specific_config = instance->config.connector_specific_config;
+#endif
+ struct mongodb_specific_data *connector_specific_data =
+ (struct mongodb_specific_data *)instance->connector_specific_data;
+
+ while (!instance->engine->exit) {
+ struct stats *stats = &instance->stats;
+
+ uv_mutex_lock(&instance->mutex);
+ if (!connector_specific_data->first_buffer->insert ||
+ !connector_specific_data->first_buffer->documents_inserted) {
+ while (!instance->data_is_ready)
+ uv_cond_wait(&instance->cond_var, &instance->mutex);
+ instance->data_is_ready = 0;
+ }
+
+ if (unlikely(instance->engine->exit)) {
+ uv_mutex_unlock(&instance->mutex);
+ break;
+ }
+
+ // reset the monitoring chart counters
+ stats->received_bytes =
+ stats->sent_bytes =
+ stats->sent_metrics =
+ stats->lost_metrics =
+ stats->receptions =
+ stats->transmission_successes =
+ stats->transmission_failures =
+ stats->data_lost_events =
+ stats->lost_bytes =
+ stats->reconnects = 0;
+
+ bson_t **insert = connector_specific_data->first_buffer->insert;
+ size_t documents_inserted = connector_specific_data->first_buffer->documents_inserted;
+ size_t buffered_bytes = connector_specific_data->first_buffer->buffered_bytes;
+
+ connector_specific_data->first_buffer->insert = NULL;
+ connector_specific_data->first_buffer->documents_inserted = 0;
+ connector_specific_data->first_buffer->buffered_bytes = 0;
+ connector_specific_data->first_buffer = connector_specific_data->first_buffer->next;
+
+ uv_mutex_unlock(&instance->mutex);
+
+ size_t data_size = 0;
+ for (size_t i = 0; i < documents_inserted; i++) {
+ data_size += insert[i]->len;
+ }
+
+ netdata_log_debug(
+ D_EXPORTING,
+ "EXPORTING: mongodb_insert(): destination = %s, database = %s, collection = %s, data size = %zu",
+ instance->config.destination,
+ connector_specific_config->database,
+ connector_specific_config->collection,
+ data_size);
+
+ if (likely(documents_inserted != 0)) {
+ bson_error_t bson_error;
+ if (likely(mongoc_collection_insert_many(
+ connector_specific_data->collection,
+ (const bson_t **)insert,
+ documents_inserted,
+ NULL,
+ NULL,
+ &bson_error))) {
+ stats->sent_metrics = documents_inserted;
+ stats->sent_bytes += data_size;
+ stats->transmission_successes++;
+ stats->receptions++;
+ } else {
+ // oops! we couldn't send (all or some of the) data
+ netdata_log_error("EXPORTING: %s", bson_error.message);
+ netdata_log_error(
+ "EXPORTING: failed to write data to the database '%s'. "
+ "Willing to write %zu bytes, wrote %zu bytes.",
+ instance->config.destination, data_size, 0UL);
+
+ stats->transmission_failures++;
+ stats->data_lost_events++;
+ stats->lost_bytes += buffered_bytes;
+ stats->lost_metrics += documents_inserted;
+ }
+ }
+
+ free_bson(insert, documents_inserted);
+
+ if (unlikely(instance->engine->exit))
+ break;
+
+ uv_mutex_lock(&instance->mutex);
+
+ stats->buffered_metrics = connector_specific_data->total_documents_inserted;
+
+ send_internal_metrics(instance);
+
+ connector_specific_data->total_documents_inserted -= documents_inserted;
+
+ stats->buffered_metrics = 0;
+ stats->buffered_bytes -= buffered_bytes;
+
+ uv_mutex_unlock(&instance->mutex);
+
+#ifdef UNIT_TESTING
+ return;
+#endif
+ }
+
+ mongodb_cleanup(instance);
+}
diff --git a/exporting/mongodb/mongodb.h b/exporting/mongodb/mongodb.h
new file mode 100644
index 00000000..f1867b28
--- /dev/null
+++ b/exporting/mongodb/mongodb.h
@@ -0,0 +1,35 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_EXPORTING_MONGODB_H
+#define NETDATA_EXPORTING_MONGODB_H
+
+#include "exporting/exporting_engine.h"
+#include "exporting/json/json.h"
+#include <mongoc.h>
+
+struct bson_buffer {
+ bson_t **insert;
+ size_t documents_inserted;
+ size_t buffered_bytes;
+
+ struct bson_buffer *next;
+};
+
+struct mongodb_specific_data {
+ mongoc_client_t *client;
+ mongoc_collection_t *collection;
+
+ size_t total_documents_inserted;
+
+ struct bson_buffer *first_buffer;
+ struct bson_buffer *last_buffer;
+};
+
+int mongodb_init(struct instance *instance);
+void mongodb_cleanup(struct instance *instance);
+
+int init_mongodb_instance(struct instance *instance);
+int format_batch_mongodb(struct instance *instance);
+void mongodb_connector_worker(void *instance_p);
+
+#endif //NETDATA_EXPORTING_MONGODB_H