diff options
Diffstat (limited to '')
-rw-r--r-- | exporting/mongodb/Makefile.am | 8 | ||||
l--------- | exporting/mongodb/README.md | 1 | ||||
-rw-r--r-- | exporting/mongodb/integrations/mongodb.md | 145 | ||||
-rw-r--r-- | exporting/mongodb/metadata.yaml | 151 | ||||
-rw-r--r-- | exporting/mongodb/mongodb.c | 392 | ||||
-rw-r--r-- | exporting/mongodb/mongodb.h | 35 |
6 files changed, 732 insertions, 0 deletions
diff --git a/exporting/mongodb/Makefile.am b/exporting/mongodb/Makefile.am new file mode 100644 index 00000000..161784b8 --- /dev/null +++ b/exporting/mongodb/Makefile.am @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) diff --git a/exporting/mongodb/README.md b/exporting/mongodb/README.md new file mode 120000 index 00000000..a2825305 --- /dev/null +++ b/exporting/mongodb/README.md @@ -0,0 +1 @@ +integrations/mongodb.md
\ No newline at end of file diff --git a/exporting/mongodb/integrations/mongodb.md b/exporting/mongodb/integrations/mongodb.md new file mode 100644 index 00000000..30dfe4f8 --- /dev/null +++ b/exporting/mongodb/integrations/mongodb.md @@ -0,0 +1,145 @@ +<!--startmeta +custom_edit_url: "https://github.com/netdata/netdata/edit/master/exporting/mongodb/README.md" +meta_yaml: "https://github.com/netdata/netdata/edit/master/exporting/mongodb/metadata.yaml" +sidebar_label: "MongoDB" +learn_status: "Published" +learn_rel_path: "Exporting" +message: "DO NOT EDIT THIS FILE DIRECTLY, IT IS GENERATED BY THE EXPORTER'S metadata.yaml FILE" +endmeta--> + +# MongoDB + + +<img src="https://netdata.cloud/img/mongodb.svg" width="150"/> + + +Use the MongoDB connector for the exporting engine to archive your agent's metrics to a MongoDB database +for long-term storage, further analysis, or correlation with data from other sources. + + + +<img src="https://img.shields.io/badge/maintained%20by-Netdata-%2300ab44" /> + +## Setup + +### Prerequisites + +#### + +- To use MongoDB as an external storage for long-term archiving, you should first [install](http://mongoc.org/libmongoc/current/installing.html) libmongoc 1.7.0 or higher. +- Next, re-install Netdata from the source, which detects that the required library is now available. + + + +### Configuration + +#### File + +The configuration file name for this integration is `exporting.conf`. + + +You can edit the configuration file using the `edit-config` script from the +Netdata [config directory](https://github.com/netdata/netdata/blob/master/docs/configure/nodes.md#the-netdata-config-directory). + +```bash +cd /etc/netdata 2>/dev/null || cd /opt/netdata/etc/netdata +sudo ./edit-config exporting.conf +``` +#### Options + +The following options can be defined for this exporter. + + +<details><summary>Config options</summary> + +| Name | Description | Default | Required | +|:----|:-----------|:-------|:--------:| +| enabled | Enables or disables an exporting connector instance (yes/no). | no | yes | +| destination | Accepts a space separated list of hostnames, IPs (IPv4 and IPv6) and ports to connect to. Netdata will use the first available to send the metrics. | localhost | yes | +| username | Username for HTTP authentication | my_username | no | +| password | Password for HTTP authentication | my_password | no | +| data source | Selects the kind of data that will be sent to the external database. (as collected/average/sum) | | no | +| hostname | The hostname to be used for sending data to the external database server. | [global].hostname | no | +| prefix | The prefix to add to all metrics. | Netdata | no | +| update every | Frequency of sending sending data to the external database, in seconds. | 10 | no | +| buffer on failures | The number of iterations (`update every` seconds) to buffer data, when the external database server is not available. | 10 | no | +| timeout ms | The timeout in milliseconds to wait for the external database server to process the data. | 2 * update_every * 1000 | no | +| send hosts matching | Hosts filter. Determines which hosts will be sent to the external database. The syntax is [simple patterns](https://github.com/netdata/netdata/tree/master/libnetdata/simple_pattern#simple-patterns). | localhost * | no | +| send charts matching | One or more space separated patterns (use * as wildcard) checked against both chart id and chart name. | * | no | +| send names instead of ids | Controls the metric names Netdata should send to the external database (yes/no). | | no | +| send configured labels | Controls if host labels defined in the `[host labels]` section in `netdata.conf` should be sent to the external database (yes/no). | | no | +| send automatic labels | Controls if automatically created labels, like `_os_name` or `_architecture` should be sent to the external database (yes/no). | | no | + +##### destination + +The format of each item in this list, is: [PROTOCOL:]IP[:PORT]. +- PROTOCOL can be udp or tcp. tcp is the default and only supported by the current exporting engine. +- IP can be XX.XX.XX.XX (IPv4), or [XX:XX...XX:XX] (IPv6). For IPv6 you can to enclose the IP in [] to separate it from the port. +- PORT can be a number of a service name. If omitted, the default port for the exporting connector will be used. + +Example IPv4: + ```yaml + destination = 10.11.14.2:27017 10.11.14.3:4242 10.11.14.4:27017 + ``` +Example IPv6 and IPv4 together: +```yaml +destination = [ffff:...:0001]:2003 10.11.12.1:2003 +``` +When multiple servers are defined, Netdata will try the next one when the previous one fails. + + +##### update every + +Netdata will add some randomness to this number, to prevent stressing the external server when many Netdata servers +send data to the same database. This randomness does not affect the quality of the data, only the time they are sent. + + +##### buffer on failures + +If the server fails to receive the data after that many failures, data loss on the connector instance is expected (Netdata will also log it). + + +##### send hosts matching + +Includes one or more space separated patterns, using * as wildcard (any number of times within each pattern). +The patterns are checked against the hostname (the localhost is always checked as localhost), allowing us to +filter which hosts will be sent to the external database when this Netdata is a central Netdata aggregating multiple hosts. + +A pattern starting with `!` gives a negative match. So to match all hosts named `*db*` except hosts containing `*child*`, +use `!*child* *db*` (so, the order is important: the first pattern matching the hostname will be used - positive or negative). + + +##### send charts matching + +A pattern starting with ! gives a negative match. So to match all charts named apps.* except charts ending in *reads, +use !*reads apps.* (so, the order is important: the first pattern matching the chart id or the chart name will be used, +positive or negative). There is also a URL parameter filter that can be used while querying allmetrics. The URL parameter +has a higher priority than the configuration option. + + +##### send names instead of ids + +Netdata supports names and IDs for charts and dimensions. Usually IDs are unique identifiers as read by the system and names +are human friendly labels (also unique). Most charts and metrics have the same ID and name, but in several cases they are +different : disks with device-mapper, interrupts, QoS classes, statsd synthetic charts, etc. + + +</details> + +#### Examples + +##### Basic configuration + +The default socket timeout depends on the exporting connector update interval. +The timeout is 500 ms shorter than the interval (but not less than 1000 ms). You can alter the timeout using the sockettimeoutms MongoDB URI option. + + +```yaml +[mongodb:my_instance] + enabled = yes + destination = mongodb://<hostname> + database = your_database_name + collection = your_collection_name + +``` + diff --git a/exporting/mongodb/metadata.yaml b/exporting/mongodb/metadata.yaml new file mode 100644 index 00000000..30e1e89d --- /dev/null +++ b/exporting/mongodb/metadata.yaml @@ -0,0 +1,151 @@ +# yamllint disable rule:line-length +--- +id: 'export-mongodb' +meta: + name: 'MongoDB' + link: 'https://www.mongodb.com/' + categories: + - export + icon_filename: 'mongodb.svg' +keywords: + - exporter + - MongoDB +overview: + exporter_description: | + Use the MongoDB connector for the exporting engine to archive your agent's metrics to a MongoDB database + for long-term storage, further analysis, or correlation with data from other sources. + exporter_limitations: '' +setup: + prerequisites: + list: + - title: '' + description: | + - To use MongoDB as an external storage for long-term archiving, you should first [install](http://mongoc.org/libmongoc/current/installing.html) libmongoc 1.7.0 or higher. + - Next, re-install Netdata from the source, which detects that the required library is now available. + configuration: + file: + name: 'exporting.conf' + options: + description: | + The following options can be defined for this exporter. + folding: + title: 'Config options' + enabled: true + list: + - name: 'enabled' + default_value: 'no' + description: 'Enables or disables an exporting connector instance (yes|no).' + required: true + - name: 'destination' + default_value: 'localhost' + description: 'Accepts a space separated list of hostnames, IPs (IPv4 and IPv6) and ports to connect to. Netdata will use the first available to send the metrics.' + required: true + detailed_description: | + The format of each item in this list, is: [PROTOCOL:]IP[:PORT]. + - PROTOCOL can be udp or tcp. tcp is the default and only supported by the current exporting engine. + - IP can be XX.XX.XX.XX (IPv4), or [XX:XX...XX:XX] (IPv6). For IPv6 you can to enclose the IP in [] to separate it from the port. + - PORT can be a number of a service name. If omitted, the default port for the exporting connector will be used. + + Example IPv4: + ```yaml + destination = 10.11.14.2:27017 10.11.14.3:4242 10.11.14.4:27017 + ``` + Example IPv6 and IPv4 together: + ```yaml + destination = [ffff:...:0001]:2003 10.11.12.1:2003 + ``` + When multiple servers are defined, Netdata will try the next one when the previous one fails. + - name: 'username' + default_value: 'my_username' + description: 'Username for HTTP authentication' + required: false + - name: 'password' + default_value: 'my_password' + description: 'Password for HTTP authentication' + required: false + - name: 'data source' + default_value: '' + description: 'Selects the kind of data that will be sent to the external database. (as collected|average|sum)' + required: false + - name: 'hostname' + default_value: '[global].hostname' + description: 'The hostname to be used for sending data to the external database server.' + required: false + - name: 'prefix' + default_value: 'Netdata' + description: 'The prefix to add to all metrics.' + required: false + - name: 'update every' + default_value: '10' + description: | + Frequency of sending sending data to the external database, in seconds. + required: false + detailed_description: | + Netdata will add some randomness to this number, to prevent stressing the external server when many Netdata servers + send data to the same database. This randomness does not affect the quality of the data, only the time they are sent. + - name: 'buffer on failures' + default_value: '10' + description: | + The number of iterations (`update every` seconds) to buffer data, when the external database server is not available. + required: false + detailed_description: | + If the server fails to receive the data after that many failures, data loss on the connector instance is expected (Netdata will also log it). + - name: 'timeout ms' + default_value: '2 * update_every * 1000' + description: 'The timeout in milliseconds to wait for the external database server to process the data.' + required: false + - name: 'send hosts matching' + default_value: 'localhost *' + description: | + Hosts filter. Determines which hosts will be sent to the external database. The syntax is [simple patterns](https://github.com/netdata/netdata/tree/master/libnetdata/simple_pattern#simple-patterns). + required: false + detailed_description: | + Includes one or more space separated patterns, using * as wildcard (any number of times within each pattern). + The patterns are checked against the hostname (the localhost is always checked as localhost), allowing us to + filter which hosts will be sent to the external database when this Netdata is a central Netdata aggregating multiple hosts. + + A pattern starting with `!` gives a negative match. So to match all hosts named `*db*` except hosts containing `*child*`, + use `!*child* *db*` (so, the order is important: the first pattern matching the hostname will be used - positive or negative). + - name: 'send charts matching' + default_value: '*' + description: | + One or more space separated patterns (use * as wildcard) checked against both chart id and chart name. + required: false + detailed_description: | + A pattern starting with ! gives a negative match. So to match all charts named apps.* except charts ending in *reads, + use !*reads apps.* (so, the order is important: the first pattern matching the chart id or the chart name will be used, + positive or negative). There is also a URL parameter filter that can be used while querying allmetrics. The URL parameter + has a higher priority than the configuration option. + - name: 'send names instead of ids' + default_value: '' + description: 'Controls the metric names Netdata should send to the external database (yes|no).' + required: false + detailed_description: | + Netdata supports names and IDs for charts and dimensions. Usually IDs are unique identifiers as read by the system and names + are human friendly labels (also unique). Most charts and metrics have the same ID and name, but in several cases they are + different : disks with device-mapper, interrupts, QoS classes, statsd synthetic charts, etc. + - name: 'send configured labels' + default_value: '' + description: 'Controls if host labels defined in the `[host labels]` section in `netdata.conf` should be sent to the external database (yes|no).' + required: false + - name: 'send automatic labels' + default_value: '' + description: 'Controls if automatically created labels, like `_os_name` or `_architecture` should be sent to the external database (yes|no).' + required: false + examples: + folding: + enabled: true + title: '' + list: + - name: 'Basic configuration' + folding: + enabled: false + description: | + The default socket timeout depends on the exporting connector update interval. + The timeout is 500 ms shorter than the interval (but not less than 1000 ms). You can alter the timeout using the sockettimeoutms MongoDB URI option. + config: | + [mongodb:my_instance] + enabled = yes + destination = mongodb://<hostname> + database = your_database_name + collection = your_collection_name diff --git a/exporting/mongodb/mongodb.c b/exporting/mongodb/mongodb.c new file mode 100644 index 00000000..c65f8d4c --- /dev/null +++ b/exporting/mongodb/mongodb.c @@ -0,0 +1,392 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#define EXPORTING_INTERNALS +#include "mongodb.h" + +#define CONFIG_FILE_LINE_MAX ((CONFIG_MAX_NAME + CONFIG_MAX_VALUE + 1024) * 2) + +/** + * Initialize MongoDB connector specific data, including a ring buffer + * + * @param instance an instance data structure. + * @return Returns 0 on success, 1 on failure. + */ +int mongodb_init(struct instance *instance) +{ + struct mongodb_specific_config *connector_specific_config = instance->config.connector_specific_config; + mongoc_uri_t *uri; + bson_error_t bson_error; + + if (unlikely(!connector_specific_config->collection || !*connector_specific_config->collection)) { + netdata_log_error("EXPORTING: collection name is a mandatory MongoDB parameter, but it is not configured"); + return 1; + } + + uri = mongoc_uri_new_with_error(instance->config.destination, &bson_error); + if (unlikely(!uri)) { + netdata_log_error("EXPORTING: failed to parse URI: %s. Error message: %s", + instance->config.destination, + bson_error.message); + return 1; + } + + int32_t socket_timeout = + mongoc_uri_get_option_as_int32(uri, MONGOC_URI_SOCKETTIMEOUTMS, instance->config.timeoutms); + if (!mongoc_uri_set_option_as_int32(uri, MONGOC_URI_SOCKETTIMEOUTMS, socket_timeout)) { + netdata_log_error("EXPORTING: failed to set %s to the value %d", MONGOC_URI_SOCKETTIMEOUTMS, socket_timeout); + return 1; + }; + + struct mongodb_specific_data *connector_specific_data = + (struct mongodb_specific_data *)instance->connector_specific_data; + + connector_specific_data->client = mongoc_client_new_from_uri(uri); + if (unlikely(!connector_specific_data->client)) { + netdata_log_error("EXPORTING: failed to create a new client"); + return 1; + } + + if (!mongoc_client_set_appname(connector_specific_data->client, "netdata")) { + netdata_log_error("EXPORTING: failed to set client appname"); + }; + + connector_specific_data->collection = mongoc_client_get_collection( + connector_specific_data->client, connector_specific_config->database, connector_specific_config->collection); + + mongoc_uri_destroy(uri); + + // create a ring buffer + struct bson_buffer *first_buffer = NULL; + + if (instance->config.buffer_on_failures < 2) + instance->config.buffer_on_failures = 1; + else + instance->config.buffer_on_failures -= 1; + + for (int i = 0; i < instance->config.buffer_on_failures; i++) { + struct bson_buffer *current_buffer = callocz(1, sizeof(struct bson_buffer)); + + if (!connector_specific_data->first_buffer) + first_buffer = current_buffer; + else + current_buffer->next = connector_specific_data->first_buffer; + + connector_specific_data->first_buffer = current_buffer; + } + + first_buffer->next = connector_specific_data->first_buffer; + connector_specific_data->last_buffer = connector_specific_data->first_buffer; + + return 0; +} + +/** + * Initialize a MongoDB connector instance + * + * @param instance an instance data structure. + * @return Returns 0 on success, 1 on failure. + */ +int init_mongodb_instance(struct instance *instance) +{ + instance->worker = mongodb_connector_worker; + + instance->start_batch_formatting = NULL; + instance->start_host_formatting = format_host_labels_json_plaintext; + instance->start_chart_formatting = NULL; + + if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED) + instance->metric_formatting = format_dimension_collected_json_plaintext; + else + instance->metric_formatting = format_dimension_stored_json_plaintext; + + instance->end_chart_formatting = NULL; + instance->variables_formatting = NULL; + instance->end_host_formatting = flush_host_labels; + instance->end_batch_formatting = format_batch_mongodb; + + instance->prepare_header = NULL; + instance->check_response = NULL; + + instance->buffer = (void *)buffer_create(0, &netdata_buffers_statistics.buffers_exporters); + if (!instance->buffer) { + netdata_log_error("EXPORTING: cannot create buffer for MongoDB exporting connector instance %s", + instance->config.name); + return 1; + } + if (uv_mutex_init(&instance->mutex)) + return 1; + if (uv_cond_init(&instance->cond_var)) + return 1; + + struct mongodb_specific_data *connector_specific_data = callocz(1, sizeof(struct mongodb_specific_data)); + instance->connector_specific_data = (void *)connector_specific_data; + + instance->config.timeoutms = + (instance->config.update_every >= 2) ? (instance->engine->config.update_every * MSEC_PER_SEC - 500) : 1000; + + if (!instance->engine->mongoc_initialized) { + mongoc_init(); + instance->engine->mongoc_initialized = 1; + } + + if (unlikely(mongodb_init(instance))) { + netdata_log_error("EXPORTING: cannot initialize MongoDB exporting connector"); + return 1; + } + + return 0; +} + +/** + * Free an array of BSON structures + * + * @param insert an array of documents. + * @param documents_inserted the number of documents inserted. + */ +void free_bson(bson_t **insert, size_t documents_inserted) +{ + size_t i; + + for (i = 0; i < documents_inserted; i++) + bson_destroy(insert[i]); + + freez(insert); +} + +/** + * Format a batch for the MongoDB connector + * + * @param instance an instance data structure. + * @return Returns 0 on success, 1 on failure. + */ +int format_batch_mongodb(struct instance *instance) +{ + struct mongodb_specific_data *connector_specific_data = + (struct mongodb_specific_data *)instance->connector_specific_data; + struct stats *stats = &instance->stats; + + bson_t **insert = connector_specific_data->last_buffer->insert; + if (insert) { + // ring buffer is full, reuse the oldest element + connector_specific_data->first_buffer = connector_specific_data->first_buffer->next; + free_bson(insert, connector_specific_data->last_buffer->documents_inserted); + connector_specific_data->total_documents_inserted -= connector_specific_data->last_buffer->documents_inserted; + stats->buffered_bytes -= connector_specific_data->last_buffer->buffered_bytes; + } + insert = callocz((size_t)stats->buffered_metrics, sizeof(bson_t *)); + connector_specific_data->last_buffer->insert = insert; + + BUFFER *buffer = (BUFFER *)instance->buffer; + char *start = (char *)buffer_tostring(buffer); + char *end = start; + + size_t documents_inserted = 0; + + while (*end && documents_inserted <= (size_t)stats->buffered_metrics) { + while (*end && *end != '\n') + end++; + + if (likely(*end)) { + *end = '\0'; + end++; + } else { + break; + } + + bson_error_t bson_error; + insert[documents_inserted] = bson_new_from_json((const uint8_t *)start, -1, &bson_error); + + if (unlikely(!insert[documents_inserted])) { + netdata_log_error( + "EXPORTING: Failed creating a BSON document from a JSON string \"%s\" : %s", start, bson_error.message); + free_bson(insert, documents_inserted); + return 1; + } + + start = end; + + documents_inserted++; + } + + stats->buffered_bytes += connector_specific_data->last_buffer->buffered_bytes = buffer_strlen(buffer); + + buffer_flush(buffer); + + // The stats->buffered_metrics is used in the MongoDB batch formatting as a variable for the number + // of metrics, added in the current iteration, so we are clearing it here. We will use the + // connector_specific_data->total_documents_inserted in the worker to show the statistics. + stats->buffered_metrics = 0; + connector_specific_data->total_documents_inserted += documents_inserted; + + connector_specific_data->last_buffer->documents_inserted = documents_inserted; + connector_specific_data->last_buffer = connector_specific_data->last_buffer->next; + + return 0; +} + +/** + * Clean a MongoDB connector instance up + * + * @param instance an instance data structure. + */ +void mongodb_cleanup(struct instance *instance) +{ + netdata_log_info("EXPORTING: cleaning up instance %s ...", instance->config.name); + + struct mongodb_specific_data *connector_specific_data = + (struct mongodb_specific_data *)instance->connector_specific_data; + + mongoc_collection_destroy(connector_specific_data->collection); + mongoc_client_destroy(connector_specific_data->client); + if (instance->engine->mongoc_initialized) { + mongoc_cleanup(); + instance->engine->mongoc_initialized = 0; + } + + buffer_free(instance->buffer); + + struct bson_buffer *next_buffer = connector_specific_data->first_buffer; + for (int i = 0; i < instance->config.buffer_on_failures; i++) { + struct bson_buffer *current_buffer = next_buffer; + next_buffer = next_buffer->next; + + if (current_buffer->insert) + free_bson(current_buffer->insert, current_buffer->documents_inserted); + freez(current_buffer); + } + + freez(connector_specific_data); + + struct mongodb_specific_config *connector_specific_config = + (struct mongodb_specific_config *)instance->config.connector_specific_config; + freez(connector_specific_config->database); + freez(connector_specific_config->collection); + freez(connector_specific_config); + + netdata_log_info("EXPORTING: instance %s exited", instance->config.name); + instance->exited = 1; + + return; +} + +/** + * MongoDB connector worker + * + * Runs in a separate thread for every instance. + * + * @param instance_p an instance data structure. + */ +void mongodb_connector_worker(void *instance_p) +{ + struct instance *instance = (struct instance *)instance_p; +#ifdef NETDATA_INTERNAL_CHECKS + struct mongodb_specific_config *connector_specific_config = instance->config.connector_specific_config; +#endif + struct mongodb_specific_data *connector_specific_data = + (struct mongodb_specific_data *)instance->connector_specific_data; + + while (!instance->engine->exit) { + struct stats *stats = &instance->stats; + + uv_mutex_lock(&instance->mutex); + if (!connector_specific_data->first_buffer->insert || + !connector_specific_data->first_buffer->documents_inserted) { + while (!instance->data_is_ready) + uv_cond_wait(&instance->cond_var, &instance->mutex); + instance->data_is_ready = 0; + } + + if (unlikely(instance->engine->exit)) { + uv_mutex_unlock(&instance->mutex); + break; + } + + // reset the monitoring chart counters + stats->received_bytes = + stats->sent_bytes = + stats->sent_metrics = + stats->lost_metrics = + stats->receptions = + stats->transmission_successes = + stats->transmission_failures = + stats->data_lost_events = + stats->lost_bytes = + stats->reconnects = 0; + + bson_t **insert = connector_specific_data->first_buffer->insert; + size_t documents_inserted = connector_specific_data->first_buffer->documents_inserted; + size_t buffered_bytes = connector_specific_data->first_buffer->buffered_bytes; + + connector_specific_data->first_buffer->insert = NULL; + connector_specific_data->first_buffer->documents_inserted = 0; + connector_specific_data->first_buffer->buffered_bytes = 0; + connector_specific_data->first_buffer = connector_specific_data->first_buffer->next; + + uv_mutex_unlock(&instance->mutex); + + size_t data_size = 0; + for (size_t i = 0; i < documents_inserted; i++) { + data_size += insert[i]->len; + } + + netdata_log_debug( + D_EXPORTING, + "EXPORTING: mongodb_insert(): destination = %s, database = %s, collection = %s, data size = %zu", + instance->config.destination, + connector_specific_config->database, + connector_specific_config->collection, + data_size); + + if (likely(documents_inserted != 0)) { + bson_error_t bson_error; + if (likely(mongoc_collection_insert_many( + connector_specific_data->collection, + (const bson_t **)insert, + documents_inserted, + NULL, + NULL, + &bson_error))) { + stats->sent_metrics = documents_inserted; + stats->sent_bytes += data_size; + stats->transmission_successes++; + stats->receptions++; + } else { + // oops! we couldn't send (all or some of the) data + netdata_log_error("EXPORTING: %s", bson_error.message); + netdata_log_error( + "EXPORTING: failed to write data to the database '%s'. " + "Willing to write %zu bytes, wrote %zu bytes.", + instance->config.destination, data_size, 0UL); + + stats->transmission_failures++; + stats->data_lost_events++; + stats->lost_bytes += buffered_bytes; + stats->lost_metrics += documents_inserted; + } + } + + free_bson(insert, documents_inserted); + + if (unlikely(instance->engine->exit)) + break; + + uv_mutex_lock(&instance->mutex); + + stats->buffered_metrics = connector_specific_data->total_documents_inserted; + + send_internal_metrics(instance); + + connector_specific_data->total_documents_inserted -= documents_inserted; + + stats->buffered_metrics = 0; + stats->buffered_bytes -= buffered_bytes; + + uv_mutex_unlock(&instance->mutex); + +#ifdef UNIT_TESTING + return; +#endif + } + + mongodb_cleanup(instance); +} diff --git a/exporting/mongodb/mongodb.h b/exporting/mongodb/mongodb.h new file mode 100644 index 00000000..f1867b28 --- /dev/null +++ b/exporting/mongodb/mongodb.h @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_EXPORTING_MONGODB_H +#define NETDATA_EXPORTING_MONGODB_H + +#include "exporting/exporting_engine.h" +#include "exporting/json/json.h" +#include <mongoc.h> + +struct bson_buffer { + bson_t **insert; + size_t documents_inserted; + size_t buffered_bytes; + + struct bson_buffer *next; +}; + +struct mongodb_specific_data { + mongoc_client_t *client; + mongoc_collection_t *collection; + + size_t total_documents_inserted; + + struct bson_buffer *first_buffer; + struct bson_buffer *last_buffer; +}; + +int mongodb_init(struct instance *instance); +void mongodb_cleanup(struct instance *instance); + +int init_mongodb_instance(struct instance *instance); +int format_batch_mongodb(struct instance *instance); +void mongodb_connector_worker(void *instance_p); + +#endif //NETDATA_EXPORTING_MONGODB_H |