summaryrefslogtreecommitdiffstats
path: root/backends
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2019-07-08 20:14:42 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2019-07-08 20:14:42 +0000
commit4f88e1a9be89a257fd6ed3045703db6e900027ee (patch)
tree518eb3c3aa1dce9ea281d02e0fd3cc01a9e7913f /backends
parentAdding upstream version 1.15.0. (diff)
downloadnetdata-4f88e1a9be89a257fd6ed3045703db6e900027ee.tar.xz
netdata-4f88e1a9be89a257fd6ed3045703db6e900027ee.zip
Adding upstream version 1.16.0.upstream/1.16.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'backends')
-rw-r--r--backends/README.md12
-rw-r--r--backends/aws_kinesis/README.md6
-rw-r--r--backends/backends.c522
-rw-r--r--backends/backends.h18
-rw-r--r--backends/opentsdb/README.md26
-rw-r--r--backends/opentsdb/opentsdb.c115
-rw-r--r--backends/opentsdb/opentsdb.h23
-rw-r--r--backends/prometheus/Makefile.am4
-rw-r--r--backends/prometheus/backend_prometheus.c198
-rw-r--r--backends/prometheus/backend_prometheus.h15
-rw-r--r--backends/prometheus/remote_write/Makefile.am14
-rw-r--r--backends/prometheus/remote_write/README.md30
-rw-r--r--backends/prometheus/remote_write/remote_write.cc117
-rw-r--r--backends/prometheus/remote_write/remote_write.h30
-rw-r--r--backends/prometheus/remote_write/remote_write.proto29
15 files changed, 1063 insertions, 96 deletions
diff --git a/backends/README.md b/backends/README.md
index efaba0caa..bdc409017 100644
--- a/backends/README.md
+++ b/backends/README.md
@@ -22,7 +22,7 @@ X seconds (though, it can send them per second if you need it to).
metrics are sent to the backend server as `prefix.hostname.chart.dimension`. `prefix` is
configured below, `hostname` is the hostname of the machine (can also be configured).
- - **opentsdb** (`telnet interface`, used by **OpenTSDB**, **InfluxDB**, **KairosDB**, etc)
+ - **opentsdb** (`telnet or HTTP interfaces`, used by **OpenTSDB**, **InfluxDB**, **KairosDB**, etc)
metrics are sent to opentsdb as `prefix.chart.dimension` with tag `host=hostname`.
@@ -32,6 +32,12 @@ X seconds (though, it can send them per second if you need it to).
- **prometheus** is described at [prometheus page](prometheus/) since it pulls data from netdata.
+ - **prometheus remote write** (a binary snappy-compressed protocol buffer encoding over HTTP used by
+ a lot of [storage providers](https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage))
+
+ metrics are labeled in the format, which is used by Netdata for the [plaintext prometheus protocol](prometheus/).
+ Notes on using the remote write backend are [here](prometheus/remote_write/).
+
- **AWS Kinesis Data Streams**
metrics are sent to the service in `JSON` format.
@@ -70,7 +76,7 @@ of `netdata.conf` from your netdata):
```
[backend]
enabled = yes | no
- type = graphite | opentsdb | json | kinesis
+ type = graphite | opentsdb:telnet | opentsdb:http | opentsdb:https | prometheus_remote_write | json | kinesis
host tags = list of TAG=VALUE
destination = space separated list of [PROTOCOL:]HOST[:PORT] - the first working will be used, or a region for kinesis
data source = average | sum | as collected
@@ -86,7 +92,7 @@ of `netdata.conf` from your netdata):
- `enabled = yes | no`, enables or disables sending data to a backend
-- `type = graphite | opentsdb | json | kinesis`, selects the backend type
+- `type = graphite | opentsdb:telnet | opentsdb:http | opentsdb:https | json | kinesis`, selects the backend type
- `destination = host1 host2 host3 ...`, accepts **a space separated list** of hostnames,
IPs (IPv4 and IPv6) and ports to connect to.
diff --git a/backends/aws_kinesis/README.md b/backends/aws_kinesis/README.md
index a9cc77d6e..424979097 100644
--- a/backends/aws_kinesis/README.md
+++ b/backends/aws_kinesis/README.md
@@ -4,7 +4,11 @@
To use AWS Kinesis as a backend AWS SDK for C++ should be [installed](https://docs.aws.amazon.com/en_us/sdk-for-cpp/v1/developer-guide/setup.html) first. `libcrypto`, `libssl`, and `libcurl` are also required to compile netdata with Kinesis support enabled. Next, netdata should be re-installed from the source. The installer will detect that the required libraries are now available.
-If AWS SDK for C++ is being installed from sources, it is useful to set `-DBUILD_ONLY="kinesis"`. Otherwise, the building process could take a very long time.
+If the AWS SDK for C++ is being installed from source, it is useful to set `-DBUILD_ONLY="kinesis"`. Otherwise, the building process could take a very long time. Take a note, that the default installation path for the libraries is `/usr/local/lib64`. Many Linux distributions don't include this path as the default one for a library search, so it is advisable to use the following options to `cmake` while building the AWS SDK:
+
+```
+cmake -DCMAKE_INSTALL_LIBDIR=/usr/lib -DCMAKE_INSTALL_INCLUDEDIR=/usr/include -DBUILD_SHARED_LIBS=OFF -DBUILD_ONLY=kinesis <aws-sdk-cpp sources>
+```
## Configuration
diff --git a/backends/backends.c b/backends/backends.c
index 0e7918916..15a0cb41b 100644
--- a/backends/backends.c
+++ b/backends/backends.c
@@ -246,6 +246,194 @@ static void backends_main_cleanup(void *ptr) {
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
+/**
+ * Set Kinesis variables
+ *
+ * Set the variables necessaries to work with this specific backend.
+ *
+ * @param default_port the default port of the backend
+ * @param brc function called to check the result.
+ * @param brf function called to format the msessage to the backend
+ * @param type the backend string selector.
+ */
+void backend_set_kinesis_variables(int *default_port,
+ backend_response_checker_t brc,
+ backend_request_formatter_t brf)
+{
+ (void)default_port;
+#ifndef HAVE_KINESIS
+ (void)brc;
+ (void)brf;
+#endif
+
+#if HAVE_KINESIS
+ *brc = process_json_response;
+ if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
+ *brf = format_dimension_collected_json_plaintext;
+ else
+ *brf = format_dimension_stored_json_plaintext;
+#endif
+}
+
+/**
+ * Set Prometheus variables
+ *
+ * Set the variables necessaries to work with this specific backend.
+ *
+ * @param default_port the default port of the backend
+ * @param brc function called to check the result.
+ * @param brf function called to format the msessage to the backend
+ * @param type the backend string selector.
+ */
+void backend_set_prometheus_variables(int *default_port,
+ backend_response_checker_t brc,
+ backend_request_formatter_t brf)
+{
+ (void)default_port;
+ (void)brf;
+#ifndef ENABLE_PROMETHEUS_REMOTE_WRITE
+ (void)brc;
+#endif
+
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ *brc = process_prometheus_remote_write_response;
+#endif /* ENABLE_PROMETHEUS_REMOTE_WRITE */
+}
+
+/**
+ * Set JSON variables
+ *
+ * Set the variables necessaries to work with this specific backend.
+ *
+ * @param default_port the default port of the backend
+ * @param brc function called to check the result.
+ * @param brf function called to format the msessage to the backend
+ * @param type the backend string selector.
+ */
+void backend_set_json_variables(int *default_port,
+ backend_response_checker_t brc,
+ backend_request_formatter_t brf)
+{
+ *default_port = 5448;
+ *brc = process_json_response;
+
+ if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
+ *brf = format_dimension_collected_json_plaintext;
+ else
+ *brf = format_dimension_stored_json_plaintext;
+}
+
+/**
+ * Set OpenTSDB HTTP variables
+ *
+ * Set the variables necessaries to work with this specific backend.
+ *
+ * @param default_port the default port of the backend
+ * @param brc function called to check the result.
+ * @param brf function called to format the msessage to the backend
+ * @param type the backend string selector.
+ */
+void backend_set_opentsdb_http_variables(int *default_port,
+ backend_response_checker_t brc,
+ backend_request_formatter_t brf)
+{
+ *default_port = 4242;
+ *brc = process_opentsdb_response;
+
+ if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
+ *brf = format_dimension_collected_opentsdb_http;
+ else
+ *brf = format_dimension_stored_opentsdb_http;
+
+}
+
+/**
+ * Set OpenTSDB Telnet variables
+ *
+ * Set the variables necessaries to work with this specific backend.
+ *
+ * @param default_port the default port of the backend
+ * @param brc function called to check the result.
+ * @param brf function called to format the msessage to the backend
+ * @param type the backend string selector.
+ */
+void backend_set_opentsdb_telnet_variables(int *default_port,
+ backend_response_checker_t brc,
+ backend_request_formatter_t brf)
+{
+ *default_port = 4242;
+ *brc = process_opentsdb_response;
+
+ if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
+ *brf = format_dimension_collected_opentsdb_telnet;
+ else
+ *brf = format_dimension_stored_opentsdb_telnet;
+}
+
+/**
+ * Set Graphite variables
+ *
+ * Set the variables necessaries to work with this specific backend.
+ *
+ * @param default_port the default port of the backend
+ * @param brc function called to check the result.
+ * @param brf function called to format the msessage to the backend
+ * @param type the backend string selector.
+ */
+void backend_set_graphite_variables(int *default_port,
+ backend_response_checker_t brc,
+ backend_request_formatter_t brf)
+{
+ *default_port = 2003;
+ *brc = process_graphite_response;
+
+ if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
+ *brf = format_dimension_collected_graphite_plaintext;
+ else
+ *brf = format_dimension_stored_graphite_plaintext;
+}
+
+/**
+ * Select Type
+ *
+ * Select the backedn type based in the user input
+ *
+ * @param type is the string that defines the backend type
+ *
+ * @return It returns the backend id.
+ */
+BACKEND_TYPE backend_select_type(const char *type) {
+ if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) {
+ return BACKEND_TYPE_GRAPHITE;
+ }
+ else if(!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) {
+ return BACKEND_TYPE_OPENTSDB_USING_TELNET;
+ }
+ else if(!strcmp(type, "opentsdb:http") || !strcmp(type, "opentsdb:https")) {
+ return BACKEND_TYPE_OPENTSDB_USING_HTTP;
+ }
+ else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) {
+ return BACKEND_TYPE_JSON;
+ }
+ else if (!strcmp(type, "prometheus_remote_write")) {
+ return BACKEND_TYPE_PROMETEUS;
+ }
+ else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) {
+ return BACKEND_TYPE_KINESIS;
+ }
+
+ return BACKEND_TYPE_UNKNOWN;
+}
+
+/**
+ * Backend main
+ *
+ * The main thread used to control the backedns.
+ *
+ * @param ptr a pointer to netdata_static_structure.
+ *
+ * @return It always return NULL.
+ */
void *backends_main(void *ptr) {
netdata_thread_cleanup_push(backends_main_cleanup, ptr);
@@ -260,6 +448,15 @@ void *backends_main(void *ptr) {
char *kinesis_auth_key_id = NULL, *kinesis_secure_key = NULL, *kinesis_stream_name = NULL;
#endif
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ int do_prometheus_remote_write = 0;
+ BUFFER *http_request_header = buffer_create(1);
+#endif
+
+#ifdef ENABLE_HTTPS
+ struct netdata_ssl opentsdb_ssl = {NULL , NETDATA_SSL_START};
+#endif
+
// ------------------------------------------------------------------------
// collect configuration options
@@ -285,6 +482,10 @@ void *backends_main(void *ptr) {
charts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send charts matching", "*"), NULL, SIMPLE_PATTERN_EXACT);
hosts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send hosts matching", "localhost *"), NULL, SIMPLE_PATTERN_EXACT);
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ const char *remote_write_path = config_get(CONFIG_SECTION_BACKEND, "remote write URL path", "/receive");
+#endif
+
// ------------------------------------------------------------------------
// validate configuration options
// and prepare for sending data to our backend
@@ -303,90 +504,95 @@ void *backends_main(void *ptr) {
// ------------------------------------------------------------------------
// select the backend type
-
- if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) {
-
- default_port = 2003;
- backend_response_checker = process_graphite_response;
-
- if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
- backend_request_formatter = format_dimension_collected_graphite_plaintext;
- else
- backend_request_formatter = format_dimension_stored_graphite_plaintext;
-
- }
- else if(!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) {
-
- default_port = 4242;
- backend_response_checker = process_opentsdb_response;
-
- if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
- backend_request_formatter = format_dimension_collected_opentsdb_telnet;
- else
- backend_request_formatter = format_dimension_stored_opentsdb_telnet;
-
+ BACKEND_TYPE work_type = backend_select_type(type);
+ if (work_type == BACKEND_TYPE_UNKNOWN) {
+ error("BACKEND: Unknown backend type '%s'", type);
+ goto cleanup;
}
- else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) {
- default_port = 5448;
- backend_response_checker = process_json_response;
-
- if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
- backend_request_formatter = format_dimension_collected_json_plaintext;
- else
- backend_request_formatter = format_dimension_stored_json_plaintext;
+ switch (work_type) {
+ case BACKEND_TYPE_OPENTSDB_USING_HTTP: {
+#ifdef ENABLE_HTTPS
+ if (!strcmp(type, "opentsdb:https")) {
+ security_start_ssl(NETDATA_SSL_CONTEXT_OPENTSDB);
+ }
+#endif
+ backend_set_opentsdb_http_variables(&default_port,&backend_response_checker,&backend_request_formatter);
+ break;
+ }
+ case BACKEND_TYPE_PROMETEUS: {
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ do_prometheus_remote_write = 1;
- }
+ init_write_request();
+#else
+ error("BACKEND: Prometheus remote write support isn't compiled");
+#endif // ENABLE_PROMETHEUS_REMOTE_WRITE
+ backend_set_prometheus_variables(&default_port,&backend_response_checker,&backend_request_formatter);
+ break;
+ }
+ case BACKEND_TYPE_KINESIS: {
#if HAVE_KINESIS
- else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) {
+ do_kinesis = 1;
- do_kinesis = 1;
+ if(unlikely(read_kinesis_conf(netdata_configured_user_config_dir, &kinesis_auth_key_id, &kinesis_secure_key, &kinesis_stream_name))) {
+ error("BACKEND: kinesis backend type is set but cannot read its configuration from %s/aws_kinesis.conf", netdata_configured_user_config_dir);
+ goto cleanup;
+ }
- if(unlikely(read_kinesis_conf(netdata_configured_user_config_dir, &kinesis_auth_key_id, &kinesis_secure_key, &kinesis_stream_name))) {
- error("BACKEND: kinesis backend type is set but cannot read its configuration from %s/aws_kinesis.conf", netdata_configured_user_config_dir);
- goto cleanup;
+ kinesis_init(destination, kinesis_auth_key_id, kinesis_secure_key, timeout.tv_sec * 1000 + timeout.tv_usec / 1000);
+#else
+ error("BACKEND: AWS Kinesis support isn't compiled");
+#endif // HAVE_KINESIS
+ backend_set_kinesis_variables(&default_port,&backend_response_checker,&backend_request_formatter);
+ break;
+ }
+ case BACKEND_TYPE_GRAPHITE: {
+ backend_set_graphite_variables(&default_port,&backend_response_checker,&backend_request_formatter);
+ break;
+ }
+ case BACKEND_TYPE_OPENTSDB_USING_TELNET: {
+ backend_set_opentsdb_telnet_variables(&default_port,&backend_response_checker,&backend_request_formatter);
+ break;
+ }
+ case BACKEND_TYPE_JSON: {
+ backend_set_json_variables(&default_port,&backend_response_checker,&backend_request_formatter);
+ break;
+ }
+ case BACKEND_TYPE_UNKNOWN: {
+ break;
}
-
- kinesis_init(destination, kinesis_auth_key_id, kinesis_secure_key, timeout.tv_sec * 1000 + timeout.tv_usec / 1000);
-
- backend_response_checker = process_json_response;
- if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
- backend_request_formatter = format_dimension_collected_json_plaintext;
- else
- backend_request_formatter = format_dimension_stored_json_plaintext;
-
- }
-#endif /* HAVE_KINESIS */
- else {
- error("BACKEND: Unknown backend type '%s'", type);
- goto cleanup;
}
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ if((backend_request_formatter == NULL && !do_prometheus_remote_write) || backend_response_checker == NULL) {
+#else
if(backend_request_formatter == NULL || backend_response_checker == NULL) {
+#endif
error("BACKEND: backend is misconfigured - disabling it.");
goto cleanup;
}
- // ------------------------------------------------------------------------
- // prepare the charts for monitoring the backend operation
+// ------------------------------------------------------------------------
+// prepare the charts for monitoring the backend operation
struct rusage thread;
collected_number
- chart_buffered_metrics = 0,
- chart_lost_metrics = 0,
- chart_sent_metrics = 0,
- chart_buffered_bytes = 0,
- chart_received_bytes = 0,
- chart_sent_bytes = 0,
- chart_receptions = 0,
- chart_transmission_successes = 0,
- chart_transmission_failures = 0,
- chart_data_lost_events = 0,
- chart_lost_bytes = 0,
- chart_backend_reconnects = 0;
- // chart_backend_latency = 0;
+ chart_buffered_metrics = 0,
+ chart_lost_metrics = 0,
+ chart_sent_metrics = 0,
+ chart_buffered_bytes = 0,
+ chart_received_bytes = 0,
+ chart_sent_bytes = 0,
+ chart_receptions = 0,
+ chart_transmission_successes = 0,
+ chart_transmission_failures = 0,
+ chart_data_lost_events = 0,
+ chart_lost_bytes = 0,
+ chart_backend_reconnects = 0;
+ // chart_backend_latency = 0;
RRDSET *chart_metrics = rrdset_create_localhost("netdata", "backend_metrics", NULL, "backend", NULL, "Netdata Buffered Metrics", "metrics", "backends", NULL, 130600, global_backend_update_every, RRDSET_TYPE_LINE);
rrddim_add(chart_metrics, "buffered", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
@@ -407,12 +613,12 @@ void *backends_main(void *ptr) {
rrddim_add(chart_ops, "read", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
/*
- * this is misleading - we can only measure the time we need to send data
- * this time is not related to the time required for the data to travel to
- * the backend database and the time that server needed to process them
- *
- * issue #1432 and https://www.softlab.ntua.gr/facilities/documentation/unix/unix-socket-faq/unix-socket-faq-2.html
- *
+ * this is misleading - we can only measure the time we need to send data
+ * this time is not related to the time required for the data to travel to
+ * the backend database and the time that server needed to process them
+ *
+ * issue #1432 and https://www.softlab.ntua.gr/facilities/documentation/unix/unix-socket-faq/unix-socket-faq-2.html
+ *
RRDSET *chart_latency = rrdset_create_localhost("netdata", "backend_latency", NULL, "backend", NULL, "Netdata Backend Latency", "ms", "backends", NULL, 130620, global_backend_update_every, RRDSET_TYPE_AREA);
rrddim_add(chart_latency, "latency", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
*/
@@ -451,6 +657,9 @@ void *backends_main(void *ptr) {
size_t count_charts_total = 0;
size_t count_dims_total = 0;
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ clear_write_request();
+#endif
rrd_rdlock();
RRDHOST *host;
rrdhost_foreach_read(host) {
@@ -478,26 +687,45 @@ void *backends_main(void *ptr) {
const char *__hostname = (host == localhost)?hostname:host->hostname;
- RRDSET *st;
- rrdset_foreach_read(st, host) {
- if(likely(backends_can_send_rrdset(global_backend_options, st))) {
- rrdset_rdlock(st);
-
- count_charts++;
-
- RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- if (likely(rd->last_collected_time.tv_sec >= after)) {
- chart_buffered_metrics += backend_request_formatter(b, global_backend_prefix, host, __hostname, st, rd, after, before, global_backend_options);
- count_dims++;
- }
- else {
- debug(D_BACKEND, "BACKEND: not sending dimension '%s' of chart '%s' from host '%s', its last data collection (%lu) is not within our timeframe (%lu to %lu)", rd->id, st->id, __hostname, (unsigned long)rd->last_collected_time.tv_sec, (unsigned long)after, (unsigned long)before);
- count_dims_skipped++;
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ if(do_prometheus_remote_write) {
+ rrd_stats_remote_write_allmetrics_prometheus(
+ host
+ , __hostname
+ , global_backend_prefix
+ , global_backend_options
+ , after
+ , before
+ , &count_charts
+ , &count_dims
+ , &count_dims_skipped
+ );
+ chart_buffered_metrics += count_dims;
+ }
+ else
+#endif
+ {
+ RRDSET *st;
+ rrdset_foreach_read(st, host) {
+ if(likely(backends_can_send_rrdset(global_backend_options, st))) {
+ rrdset_rdlock(st);
+
+ count_charts++;
+
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ if (likely(rd->last_collected_time.tv_sec >= after)) {
+ chart_buffered_metrics += backend_request_formatter(b, global_backend_prefix, host, __hostname, st, rd, after, before, global_backend_options);
+ count_dims++;
+ }
+ else {
+ debug(D_BACKEND, "BACKEND: not sending dimension '%s' of chart '%s' from host '%s', its last data collection (%lu) is not within our timeframe (%lu to %lu)", rd->id, st->id, __hostname, (unsigned long)rd->last_collected_time.tv_sec, (unsigned long)after, (unsigned long)before);
+ count_dims_skipped++;
+ }
}
- }
- rrdset_unlock(st);
+ rrdset_unlock(st);
+ }
}
}
@@ -621,7 +849,16 @@ void *backends_main(void *ptr) {
while(sock != -1 && errno != EWOULDBLOCK) {
buffer_need_bytes(response, 4096);
- ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
+ ssize_t r;
+#ifdef ENABLE_HTTPS
+ if(opentsdb_ssl.conn && !opentsdb_ssl.flags) {
+ r = SSL_read(opentsdb_ssl.conn, &response->buffer[response->len], response->size - response->len);
+ } else {
+ r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
+ }
+#else
+ r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
+#endif
if(likely(r > 0)) {
// we received some data
response->len += r;
@@ -654,7 +891,37 @@ void *backends_main(void *ptr) {
size_t reconnects = 0;
sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0);
+#ifdef ENABLE_HTTPS
+ if(sock != -1) {
+ if(netdata_opentsdb_ctx) {
+ if(!opentsdb_ssl.conn) {
+ opentsdb_ssl.conn = SSL_new(netdata_opentsdb_ctx);
+ if(!opentsdb_ssl.conn) {
+ error("Failed to allocate SSL structure %d.", sock);
+ opentsdb_ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ }
+ } else {
+ SSL_clear(opentsdb_ssl.conn);
+ }
+ }
+ if(opentsdb_ssl.conn) {
+ if(SSL_set_fd(opentsdb_ssl.conn, sock) != 1) {
+ error("Failed to set the socket to the SSL on socket fd %d.", host->rrdpush_sender_socket);
+ opentsdb_ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ } else {
+ opentsdb_ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE;
+ SSL_set_connect_state(opentsdb_ssl.conn);
+ int err = SSL_connect(opentsdb_ssl.conn);
+ if (err != 1) {
+ err = SSL_get_error(opentsdb_ssl.conn, err);
+ error("SSL cannot connect with the server: %s ", ERR_error_string((long)SSL_get_error(opentsdb_ssl.conn, err), NULL));
+ opentsdb_ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ } //TODO: check certificate here
+ }
+ }
+ }
+#endif
chart_backend_reconnects += reconnects;
// chart_backend_latency += now_monotonic_usec() - start_ut;
}
@@ -672,7 +939,54 @@ void *backends_main(void *ptr) {
flags += MSG_NOSIGNAL;
#endif
- ssize_t written = send(sock, buffer_tostring(b), len, flags);
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ if(do_prometheus_remote_write) {
+ size_t data_size = get_write_request_size();
+
+ if(unlikely(!data_size)) {
+ error("BACKEND: write request size is out of range");
+ continue;
+ }
+
+ buffer_flush(b);
+ buffer_need_bytes(b, data_size);
+ if(unlikely(pack_write_request(b->buffer, &data_size))) {
+ error("BACKEND: cannot pack write request");
+ continue;
+ }
+ b->len = data_size;
+ chart_buffered_bytes = (collected_number)buffer_strlen(b);
+
+ buffer_flush(http_request_header);
+ buffer_sprintf(http_request_header,
+ "POST %s HTTP/1.1\r\n"
+ "Host: %s\r\n"
+ "Accept: */*\r\n"
+ "Content-Length: %zu\r\n"
+ "Content-Type: application/x-www-form-urlencoded\r\n\r\n",
+ remote_write_path,
+ hostname,
+ data_size
+ );
+
+ len = buffer_strlen(http_request_header);
+ send(sock, buffer_tostring(http_request_header), len, flags);
+
+ len = data_size;
+ }
+#endif
+
+ ssize_t written;
+#ifdef ENABLE_HTTPS
+ if(opentsdb_ssl.conn && !opentsdb_ssl.flags) {
+ written = SSL_write(opentsdb_ssl.conn, buffer_tostring(b), len);
+ } else {
+ written = send(sock, buffer_tostring(b), len, flags);
+ }
+#else
+ written = send(sock, buffer_tostring(b), len, flags);
+#endif
+
// chart_backend_latency += now_monotonic_usec() - start_ut;
if(written != -1 && (size_t)written == len) {
// we sent the data successfully
@@ -711,6 +1025,16 @@ void *backends_main(void *ptr) {
}
}
+
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ if(failures) {
+ (void) buffer_on_failures;
+ failures = 0;
+ chart_lost_bytes = chart_buffered_bytes = get_write_request_size(); // estimated write request size
+ chart_data_lost_events++;
+ chart_lost_metrics = chart_buffered_metrics;
+ }
+#else
if(failures > buffer_on_failures) {
// too bad! we are going to lose data
chart_lost_bytes += buffer_strlen(b);
@@ -720,6 +1044,7 @@ void *backends_main(void *ptr) {
chart_data_lost_events++;
chart_lost_metrics = chart_buffered_metrics;
}
+#endif /* ENABLE_PROMETHEUS_REMOTE_WRITE */
if(unlikely(netdata_exit)) break;
@@ -775,12 +1100,27 @@ cleanup:
}
#endif
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ if(do_prometheus_remote_write) {
+ buffer_free(http_request_header);
+ protocol_buffers_shutdown();
+ }
+#endif
+
if(sock != -1)
close(sock);
buffer_free(b);
buffer_free(response);
+#ifdef ENABLE_HTTPS
+ if(netdata_opentsdb_ctx) {
+ if(opentsdb_ssl.conn) {
+ SSL_free(opentsdb_ssl.conn);
+ }
+ }
+#endif
+
netdata_thread_cleanup_pop(1);
return NULL;
}
diff --git a/backends/backends.h b/backends/backends.h
index 118665496..8d0bda413 100644
--- a/backends/backends.h
+++ b/backends/backends.h
@@ -15,6 +15,20 @@ typedef enum backend_options {
BACKEND_OPTION_SEND_NAMES = (1 << 16)
} BACKEND_OPTIONS;
+typedef enum backend_types {
+ BACKEND_TYPE_UNKNOWN, //Invalid type
+ BACKEND_TYPE_GRAPHITE, //Send plain text to Graphite
+ BACKEND_TYPE_OPENTSDB_USING_TELNET, //Send data to OpenTSDB using telnet API
+ BACKEND_TYPE_OPENTSDB_USING_HTTP, //Send data to OpenTSDB using HTTP API
+ BACKEND_TYPE_JSON, //Stores the data using JSON.
+ BACKEND_TYPE_PROMETEUS, //The user selected to use Prometheus backend
+ BACKEND_TYPE_KINESIS //Send message to AWS Kinesis
+} BACKEND_TYPE;
+
+
+typedef int (**backend_response_checker_t)(BUFFER *);
+typedef int (**backend_request_formatter_t)(BUFFER *, const char *, RRDHOST *, const char *, RRDSET *, RRDDIM *, time_t, time_t, BACKEND_OPTIONS);
+
#define BACKEND_OPTIONS_SOURCE_BITS (BACKEND_SOURCE_DATA_AS_COLLECTED|BACKEND_SOURCE_DATA_AVERAGE|BACKEND_SOURCE_DATA_SUM)
#define BACKEND_OPTIONS_DATA_SOURCE(backend_options) (backend_options & BACKEND_OPTIONS_SOURCE_BITS)
@@ -53,4 +67,8 @@ extern int discard_response(BUFFER *b, const char *backend);
#include "backends/aws_kinesis/aws_kinesis.h"
#endif
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+#include "backends/prometheus/remote_write/remote_write.h"
+#endif
+
#endif /* NETDATA_BACKENDS_H */
diff --git a/backends/opentsdb/README.md b/backends/opentsdb/README.md
new file mode 100644
index 000000000..3d57e2e1a
--- /dev/null
+++ b/backends/opentsdb/README.md
@@ -0,0 +1,26 @@
+# OpenTSDB with HTTP
+
+Since version 1.16 the Netdata has the feature to communicate with OpenTSDB using HTTP API. To enable this channel
+it is necessary to set the following options in your netdata.conf
+
+```
+[backend]
+ type = opentsdb:http
+ destination = localhost:4242
+```
+
+, in this example we are considering that OpenTSDB is running with its default port (4242).
+
+## HTTPS
+
+Netdata also supports sending the metrics using SSL/TLS, but OpenTDSB does not have support to safety connections,
+so it will be necessary to configure a reverse-proxy to enable the HTTPS communication. After to configure your proxy the
+following changes must be done in the netdata.conf:
+
+```
+[backend]
+ type = opentsdb:https
+ destination = localhost:8082
+```
+
+In this example we used the port 8082 for our reverse proxy.
diff --git a/backends/opentsdb/opentsdb.c b/backends/opentsdb/opentsdb.c
index 6e3a31ab6..6ee559dbf 100644
--- a/backends/opentsdb/opentsdb.c
+++ b/backends/opentsdb/opentsdb.c
@@ -80,6 +80,7 @@ int format_dimension_stored_opentsdb_telnet(
return 1;
}
+
return 0;
}
@@ -87,4 +88,118 @@ int process_opentsdb_response(BUFFER *b) {
return discard_response(b, "opentsdb");
}
+static inline void opentsdb_build_message(BUFFER *b, char *message, const char *hostname, int length) {
+ buffer_sprintf(
+ b
+ , "POST /api/put HTTP/1.1\r\n"
+ "Host: %s\r\n"
+ "Content-Type: application/json\r\n"
+ "Content-Length: %d\r\n"
+ "\r\n"
+ "%s"
+ , hostname
+ , length
+ , message
+ );
+}
+
+int format_dimension_collected_opentsdb_http(
+ BUFFER *b // the buffer to write data to
+ , const char *prefix // the prefix to use
+ , RRDHOST *host // the host this chart comes from
+ , const char *hostname // the hostname (to override host->hostname)
+ , RRDSET *st // the chart
+ , RRDDIM *rd // the dimension
+ , time_t after // the start timestamp
+ , time_t before // the end timestamp
+ , BACKEND_OPTIONS backend_options // BACKEND_SOURCE_* bitmap
+) {
+ (void)host;
+ (void)after;
+ (void)before;
+
+ char message[1024];
+ char chart_name[RRD_ID_LENGTH_MAX + 1];
+ char dimension_name[RRD_ID_LENGTH_MAX + 1];
+ backend_name_copy(chart_name, (backend_options & BACKEND_OPTION_SEND_NAMES && st->name)?st->name:st->id, RRD_ID_LENGTH_MAX);
+ backend_name_copy(dimension_name, (backend_options & BACKEND_OPTION_SEND_NAMES && rd->name)?rd->name:rd->id, RRD_ID_LENGTH_MAX);
+
+ int length = snprintfz(message
+ , sizeof(message)
+ , "{"
+ " \"metric\": \"%s.%s.%s\","
+ " \"timestamp\": %llu,"
+ " \"value\": "COLLECTED_NUMBER_FORMAT ","
+ " \"tags\": {"
+ " \"host\": \"%s%s%s\""
+ " }"
+ "}"
+ , prefix
+ , chart_name
+ , dimension_name
+ , (unsigned long long)rd->last_collected_time.tv_sec
+ , rd->last_collected_value
+ , hostname
+ , (host->tags)?" ":""
+ , (host->tags)?host->tags:""
+ );
+
+ if(length > 0) {
+ opentsdb_build_message(b, message, hostname, length);
+ }
+
+ return 1;
+}
+int format_dimension_stored_opentsdb_http(
+ BUFFER *b // the buffer to write data to
+ , const char *prefix // the prefix to use
+ , RRDHOST *host // the host this chart comes from
+ , const char *hostname // the hostname (to override host->hostname)
+ , RRDSET *st // the chart
+ , RRDDIM *rd // the dimension
+ , time_t after // the start timestamp
+ , time_t before // the end timestamp
+ , BACKEND_OPTIONS backend_options // BACKEND_SOURCE_* bitmap
+) {
+ (void)host;
+
+ time_t first_t = after, last_t = before;
+ calculated_number value = backend_calculate_value_from_stored_data(st, rd, after, before, backend_options, &first_t, &last_t);
+
+ if(!isnan(value)) {
+ char chart_name[RRD_ID_LENGTH_MAX + 1];
+ char dimension_name[RRD_ID_LENGTH_MAX + 1];
+ backend_name_copy(chart_name, (backend_options & BACKEND_OPTION_SEND_NAMES && st->name)?st->name:st->id, RRD_ID_LENGTH_MAX);
+ backend_name_copy(dimension_name, (backend_options & BACKEND_OPTION_SEND_NAMES && rd->name)?rd->name:rd->id, RRD_ID_LENGTH_MAX);
+
+ char message[1024];
+ int length = snprintfz(message
+ , sizeof(message)
+ , "{"
+ " \"metric\": \"%s.%s.%s\","
+ " \"timestamp\": %llu,"
+ " \"value\": "CALCULATED_NUMBER_FORMAT ","
+ " \"tags\": {"
+ " \"host\": \"%s%s%s\""
+ " }"
+ "}"
+ , prefix
+ , chart_name
+ , dimension_name
+ , (unsigned long long)last_t
+ , value
+ , hostname
+ , (host->tags)?" ":""
+ , (host->tags)?host->tags:""
+ );
+
+ if(length > 0) {
+ opentsdb_build_message(b, message, hostname, length);
+ }
+
+ return 1;
+ }
+
+ return 0;
+}
diff --git a/backends/opentsdb/opentsdb.h b/backends/opentsdb/opentsdb.h
index fc83b39ca..b9372d914 100644
--- a/backends/opentsdb/opentsdb.h
+++ b/backends/opentsdb/opentsdb.h
@@ -31,5 +31,28 @@ extern int format_dimension_stored_opentsdb_telnet(
extern int process_opentsdb_response(BUFFER *b);
+int format_dimension_collected_opentsdb_http(
+ BUFFER *b // the buffer to write data to
+ , const char *prefix // the prefix to use
+ , RRDHOST *host // the host this chart comes from
+ , const char *hostname // the hostname (to override host->hostname)
+ , RRDSET *st // the chart
+ , RRDDIM *rd // the dimension
+ , time_t after // the start timestamp
+ , time_t before // the end timestamp
+ , BACKEND_OPTIONS backend_options // BACKEND_SOURCE_* bitmap
+);
+
+int format_dimension_stored_opentsdb_http(
+ BUFFER *b // the buffer to write data to
+ , const char *prefix // the prefix to use
+ , RRDHOST *host // the host this chart comes from
+ , const char *hostname // the hostname (to override host->hostname)
+ , RRDSET *st // the chart
+ , RRDDIM *rd // the dimension
+ , time_t after // the start timestamp
+ , time_t before // the end timestamp
+ , BACKEND_OPTIONS backend_options // BACKEND_SOURCE_* bitmap
+);
#endif //NETDATA_BACKEND_OPENTSDB_H
diff --git a/backends/prometheus/Makefile.am b/backends/prometheus/Makefile.am
index 19554bed8..e5f74851a 100644
--- a/backends/prometheus/Makefile.am
+++ b/backends/prometheus/Makefile.am
@@ -3,6 +3,10 @@
AUTOMAKE_OPTIONS = subdir-objects
MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+SUBDIRS = \
+ remote_write \
+ $(NULL)
+
dist_noinst_DATA = \
README.md \
$(NULL)
diff --git a/backends/prometheus/backend_prometheus.c b/backends/prometheus/backend_prometheus.c
index 3641b07c5..67342ea7a 100644
--- a/backends/prometheus/backend_prometheus.c
+++ b/backends/prometheus/backend_prometheus.c
@@ -153,6 +153,8 @@ static inline char *prometheus_units_copy(char *d, const char *s, size_t usable,
#define PROMETHEUS_LABELS_MAX 1024
#define PROMETHEUS_VARIABLE_MAX 256
+#define PROMETHEUS_LABELS_MAX_NUMBER 128
+
struct host_variables_callback_options {
RRDHOST *host;
BUFFER *wb;
@@ -307,7 +309,7 @@ static void rrd_stats_api_v1_charts_allmetrics_prometheus(RRDHOST *host, BUFFER
int as_collected = (BACKEND_OPTIONS_DATA_SOURCE(backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED);
int homogeneous = 1;
if(as_collected) {
- if(rrdset_flag_check(st, RRDSET_FLAG_HOMEGENEOUS_CHECK))
+ if(rrdset_flag_check(st, RRDSET_FLAG_HOMOGENEOUS_CHECK))
rrdset_update_heterogeneous_flag(st);
if(rrdset_flag_check(st, RRDSET_FLAG_HETEROGENEOUS))
@@ -537,6 +539,177 @@ static void rrd_stats_api_v1_charts_allmetrics_prometheus(RRDHOST *host, BUFFER
rrdhost_unlock(host);
}
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+inline static void remote_write_split_words(char *str, char **words, int max_words) {
+ char *s = str;
+ int i = 0;
+
+ while(*s && i < max_words - 1) {
+ while(*s && isspace(*s)) s++; // skip spaces to the begining of a tag name
+
+ if(*s)
+ words[i] = s;
+
+ while(*s && !isspace(*s) && *s != '=') s++; // find the end of the tag name
+
+ if(*s != '=') {
+ words[i] = NULL;
+ break;
+ }
+ *s = '\0';
+ s++;
+ i++;
+
+ while(*s && isspace(*s)) s++; // skip spaces to the begining of a tag value
+
+ if(*s && *s == '"') s++; // strip an opening quote
+ if(*s)
+ words[i] = s;
+
+ while(*s && !isspace(*s) && *s != ',') s++; // find the end of the tag value
+
+ if(*s && *s != ',') {
+ words[i] = NULL;
+ break;
+ }
+ if(s != words[i] && *(s - 1) == '"') *(s - 1) = '\0'; // strip a closing quote
+ if(*s != '\0') {
+ *s = '\0';
+ s++;
+ i++;
+ }
+ }
+}
+
+void rrd_stats_remote_write_allmetrics_prometheus(
+ RRDHOST *host
+ , const char *__hostname
+ , const char *prefix
+ , BACKEND_OPTIONS backend_options
+ , time_t after
+ , time_t before
+ , size_t *count_charts
+ , size_t *count_dims
+ , size_t *count_dims_skipped
+) {
+ char hostname[PROMETHEUS_ELEMENT_MAX + 1];
+ prometheus_label_copy(hostname, __hostname, PROMETHEUS_ELEMENT_MAX);
+
+ add_host_info("netdata_info", hostname, host->program_name, host->program_version, now_realtime_usec() / USEC_PER_MS);
+
+ if(host->tags && *(host->tags)) {
+ char tags[PROMETHEUS_LABELS_MAX + 1];
+ strncpy(tags, host->tags, PROMETHEUS_LABELS_MAX);
+ char *words[PROMETHEUS_LABELS_MAX_NUMBER] = {NULL};
+ int i;
+
+ remote_write_split_words(tags, words, PROMETHEUS_LABELS_MAX_NUMBER);
+
+ add_host_info("netdata_host_tags_info", hostname, NULL, NULL, now_realtime_usec() / USEC_PER_MS);
+
+ for(i = 0; words[i] != NULL && words[i + 1] != NULL && (i + 1) < PROMETHEUS_LABELS_MAX_NUMBER; i += 2) {
+ add_tag(words[i], words[i + 1]);
+ }
+ }
+
+ // for each chart
+ RRDSET *st;
+ rrdset_foreach_read(st, host) {
+ char chart[PROMETHEUS_ELEMENT_MAX + 1];
+ char context[PROMETHEUS_ELEMENT_MAX + 1];
+ char family[PROMETHEUS_ELEMENT_MAX + 1];
+ char units[PROMETHEUS_ELEMENT_MAX + 1] = "";
+
+ prometheus_label_copy(chart, (backend_options & BACKEND_OPTION_SEND_NAMES && st->name)?st->name:st->id, PROMETHEUS_ELEMENT_MAX);
+ prometheus_label_copy(family, st->family, PROMETHEUS_ELEMENT_MAX);
+ prometheus_name_copy(context, st->context, PROMETHEUS_ELEMENT_MAX);
+
+ if(likely(backends_can_send_rrdset(backend_options, st))) {
+ rrdset_rdlock(st);
+
+ (*count_charts)++;
+
+ int as_collected = (BACKEND_OPTIONS_DATA_SOURCE(backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED);
+ int homogeneous = 1;
+ if(as_collected) {
+ if(rrdset_flag_check(st, RRDSET_FLAG_HOMOGENEOUS_CHECK))
+ rrdset_update_heterogeneous_flag(st);
+
+ if(rrdset_flag_check(st, RRDSET_FLAG_HETEROGENEOUS))
+ homogeneous = 0;
+ }
+ else {
+ if(BACKEND_OPTIONS_DATA_SOURCE(backend_options) == BACKEND_SOURCE_DATA_AVERAGE)
+ prometheus_units_copy(units, st->units, PROMETHEUS_ELEMENT_MAX, 0);
+ }
+
+ // for each dimension
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ if(rd->collections_counter && !rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)) {
+ char name[PROMETHEUS_LABELS_MAX + 1];
+ char dimension[PROMETHEUS_ELEMENT_MAX + 1];
+ char *suffix = "";
+
+ if (as_collected) {
+ // we need as-collected / raw data
+
+ if(unlikely(rd->last_collected_time.tv_sec < after)) {
+ debug(D_BACKEND, "BACKEND: not sending dimension '%s' of chart '%s' from host '%s', its last data collection (%lu) is not within our timeframe (%lu to %lu)", rd->id, st->id, __hostname, (unsigned long)rd->last_collected_time.tv_sec, (unsigned long)after, (unsigned long)before);
+ (*count_dims_skipped)++;
+ continue;
+ }
+
+ if(homogeneous) {
+ // all the dimensions of the chart, has the same algorithm, multiplier and divisor
+ // we add all dimensions as labels
+
+ prometheus_label_copy(dimension, (backend_options & BACKEND_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id, PROMETHEUS_ELEMENT_MAX);
+ snprintf(name, PROMETHEUS_LABELS_MAX, "%s_%s%s", prefix, context, suffix);
+
+ add_metric(name, chart, family, dimension, hostname, rd->last_collected_value, timeval_msec(&rd->last_collected_time));
+ (*count_dims)++;
+ }
+ else {
+ // the dimensions of the chart, do not have the same algorithm, multiplier or divisor
+ // we create a metric per dimension
+
+ prometheus_name_copy(dimension, (backend_options & BACKEND_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id, PROMETHEUS_ELEMENT_MAX);
+ snprintf(name, PROMETHEUS_LABELS_MAX, "%s_%s_%s%s", prefix, context, dimension, suffix);
+
+ add_metric(name, chart, family, NULL, hostname, rd->last_collected_value, timeval_msec(&rd->last_collected_time));
+ (*count_dims)++;
+ }
+ }
+ else {
+ // we need average or sum of the data
+
+ time_t first_t = after, last_t = before;
+ calculated_number value = backend_calculate_value_from_stored_data(st, rd, after, before, backend_options, &first_t, &last_t);
+
+ if(!isnan(value) && !isinf(value)) {
+
+ if(BACKEND_OPTIONS_DATA_SOURCE(backend_options) == BACKEND_SOURCE_DATA_AVERAGE)
+ suffix = "_average";
+ else if(BACKEND_OPTIONS_DATA_SOURCE(backend_options) == BACKEND_SOURCE_DATA_SUM)
+ suffix = "_sum";
+
+ prometheus_label_copy(dimension, (backend_options & BACKEND_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id, PROMETHEUS_ELEMENT_MAX);
+ snprintf(name, PROMETHEUS_LABELS_MAX, "%s_%s%s%s", prefix, context, units, suffix);
+
+ add_metric(name, chart, family, dimension, hostname, rd->last_collected_value, timeval_msec(&rd->last_collected_time));
+ (*count_dims)++;
+ }
+ }
+ }
+ }
+
+ rrdset_unlock(st);
+ }
+ }
+}
+#endif /* ENABLE_PROMETHEUS_REMOTE_WRITE */
+
static inline time_t prometheus_preparation(RRDHOST *host, BUFFER *wb, BACKEND_OPTIONS backend_options, const char *server, time_t now, PROMETHEUS_OUTPUT_OPTIONS output_options) {
if(!server || !*server) server = "default";
@@ -599,3 +772,26 @@ void rrd_stats_api_v1_charts_allmetrics_prometheus_all_hosts(RRDHOST *host, BUFF
}
rrd_unlock();
}
+
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+int process_prometheus_remote_write_response(BUFFER *b) {
+ if(unlikely(!b)) return 1;
+
+ const char *s = buffer_tostring(b);
+ int len = buffer_strlen(b);
+
+ // do nothing with HTTP response 200
+
+ while(!isspace(*s) && len) {
+ s++;
+ len--;
+ }
+ s++;
+ len--;
+
+ if(likely(len > 4 && !strncmp(s, "200 ", 4)))
+ return 0;
+ else
+ return discard_response(b, "prometheus remote write");
+}
+#endif
diff --git a/backends/prometheus/backend_prometheus.h b/backends/prometheus/backend_prometheus.h
index 72b65a22b..d58d24004 100644
--- a/backends/prometheus/backend_prometheus.h
+++ b/backends/prometheus/backend_prometheus.h
@@ -19,4 +19,19 @@ typedef enum prometheus_output_flags {
extern void rrd_stats_api_v1_charts_allmetrics_prometheus_single_host(RRDHOST *host, BUFFER *wb, const char *server, const char *prefix, BACKEND_OPTIONS backend_options, PROMETHEUS_OUTPUT_OPTIONS output_options);
extern void rrd_stats_api_v1_charts_allmetrics_prometheus_all_hosts(RRDHOST *host, BUFFER *wb, const char *server, const char *prefix, BACKEND_OPTIONS backend_options, PROMETHEUS_OUTPUT_OPTIONS output_options);
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+extern void rrd_stats_remote_write_allmetrics_prometheus(
+ RRDHOST *host
+ , const char *__hostname
+ , const char *prefix
+ , BACKEND_OPTIONS backend_options
+ , time_t after
+ , time_t before
+ , size_t *count_charts
+ , size_t *count_dims
+ , size_t *count_dims_skipped
+);
+extern int process_prometheus_remote_write_response(BUFFER *b);
+#endif
+
#endif //NETDATA_BACKEND_PROMETHEUS_H
diff --git a/backends/prometheus/remote_write/Makefile.am b/backends/prometheus/remote_write/Makefile.am
new file mode 100644
index 000000000..5f8f9d4c4
--- /dev/null
+++ b/backends/prometheus/remote_write/Makefile.am
@@ -0,0 +1,14 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+CLEANFILES = \
+ remote_write.pb.cc \
+ remote_write.pb.h \
+ $(NULL)
+
+dist_noinst_DATA = \
+ remote_write.proto \
+ README.md \
+ $(NULL)
diff --git a/backends/prometheus/remote_write/README.md b/backends/prometheus/remote_write/README.md
new file mode 100644
index 000000000..73cb1daf5
--- /dev/null
+++ b/backends/prometheus/remote_write/README.md
@@ -0,0 +1,30 @@
+# Prometheus remote write backend
+
+## Prerequisites
+
+To use the prometheus remote write API with [storage providers](https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage) [protobuf](https://developers.google.com/protocol-buffers/) and [snappy](https://github.com/google/snappy) libraries should be installed first. Next, netdata should be re-installed from the source. The installer will detect that the required libraries and utilities are now available.
+
+## Configuration
+
+An additional option in the backend configuration section is available for the remote write backend:
+
+```
+[backend]
+ remote write URL path = /receive
+```
+
+The default value is `/receive`. `remote write URL path` is used to set an endpoint path for the remote write protocol. For example, if your endpoint is `http://example.domain:example_port/storage/read` you should set
+
+```
+[backend]
+ destination = example.domain:example_port
+ remote write URL path = /storage/read
+```
+
+`buffered` and `lost` dimensions in the Netdata Backend Data Size operation monitoring chart estimate uncompressed buffer size on failures.
+
+## Notes
+
+The remote write backend does not support `buffer on failures`
+
+[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fbackends%2Fprometheus%2Fremote_write%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)]()
diff --git a/backends/prometheus/remote_write/remote_write.cc b/backends/prometheus/remote_write/remote_write.cc
new file mode 100644
index 000000000..91d4305ba
--- /dev/null
+++ b/backends/prometheus/remote_write/remote_write.cc
@@ -0,0 +1,117 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include <snappy.h>
+#include "remote_write.pb.h"
+#include "remote_write.h"
+
+using namespace prometheus;
+
+
+google::protobuf::Arena arena;
+WriteRequest *write_request;
+
+void init_write_request() {
+ GOOGLE_PROTOBUF_VERIFY_VERSION;
+ write_request = google::protobuf::Arena::CreateMessage<WriteRequest>(&arena);
+}
+
+void clear_write_request() {
+ write_request->clear_timeseries();
+}
+
+void add_host_info(const char *name, const char *instance, const char *application, const char *version, const int64_t timestamp) {
+ TimeSeries *timeseries;
+ Sample *sample;
+ Label *label;
+
+ timeseries = write_request->add_timeseries();
+
+ label = timeseries->add_labels();
+ label->set_name("__name__");
+ label->set_value(name);
+
+ label = timeseries->add_labels();
+ label->set_name("instance");
+ label->set_value(instance);
+
+ if(application) {
+ label = timeseries->add_labels();
+ label->set_name("application");
+ label->set_value(application);
+ }
+
+ if(version) {
+ label = timeseries->add_labels();
+ label->set_name("version");
+ label->set_value(version);
+ }
+
+ sample = timeseries->add_samples();
+ sample->set_value(1);
+ sample->set_timestamp(timestamp);
+}
+
+// adds tag to the last created timeseries
+void add_tag(char *tag, char *value) {
+ TimeSeries *timeseries;
+ Label *label;
+
+ timeseries = write_request->mutable_timeseries(write_request->timeseries_size() - 1);
+
+ label = timeseries->add_labels();
+ label->set_name(tag);
+ label->set_value(value);
+}
+
+void add_metric(const char *name, const char *chart, const char *family, const char *dimension, const char *instance, const double value, const int64_t timestamp) {
+ TimeSeries *timeseries;
+ Sample *sample;
+ Label *label;
+
+ timeseries = write_request->add_timeseries();
+
+ label = timeseries->add_labels();
+ label->set_name("__name__");
+ label->set_value(name);
+
+ label = timeseries->add_labels();
+ label->set_name("chart");
+ label->set_value(chart);
+
+ label = timeseries->add_labels();
+ label->set_name("family");
+ label->set_value(family);
+
+ if(dimension) {
+ label = timeseries->add_labels();
+ label->set_name("dimension");
+ label->set_value(dimension);
+ }
+
+ label = timeseries->add_labels();
+ label->set_name("instance");
+ label->set_value(instance);
+
+ sample = timeseries->add_samples();
+ sample->set_value(value);
+ sample->set_timestamp(timestamp);
+}
+
+size_t get_write_request_size(){
+ size_t size = (size_t)snappy::MaxCompressedLength(write_request->ByteSize());
+
+ return (size < INT_MAX)?size:0;
+}
+
+int pack_write_request(char *buffer, size_t *size) {
+ std::string uncompressed_write_request;
+ if(write_request->SerializeToString(&uncompressed_write_request) == false) return 1;
+
+ snappy::RawCompress(uncompressed_write_request.data(), uncompressed_write_request.size(), buffer, size);
+
+ return 0;
+}
+
+void protocol_buffers_shutdown() {
+ google::protobuf::ShutdownProtobufLibrary();
+}
diff --git a/backends/prometheus/remote_write/remote_write.h b/backends/prometheus/remote_write/remote_write.h
new file mode 100644
index 000000000..edcc477b8
--- /dev/null
+++ b/backends/prometheus/remote_write/remote_write.h
@@ -0,0 +1,30 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_BACKEND_PROMETHEUS_REMOTE_WRITE_H
+#define NETDATA_BACKEND_PROMETHEUS_REMOTE_WRITE_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void init_write_request();
+
+void clear_write_request();
+
+void add_host_info(const char *name, const char *instance, const char *application, const char *version, const int64_t timestamp);
+
+void add_tag(char *tag, char *value);
+
+void add_metric(const char *name, const char *chart, const char *family, const char *dimension, const char *instance, const double value, const int64_t timestamp);
+
+size_t get_write_request_size();
+
+int pack_write_request(char *buffer, size_t *size);
+
+void protocol_buffers_shutdown();
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //NETDATA_BACKEND_PROMETHEUS_REMOTE_WRITE_H
diff --git a/backends/prometheus/remote_write/remote_write.proto b/backends/prometheus/remote_write/remote_write.proto
new file mode 100644
index 000000000..dfde254e1
--- /dev/null
+++ b/backends/prometheus/remote_write/remote_write.proto
@@ -0,0 +1,29 @@
+syntax = "proto3";
+package prometheus;
+
+option cc_enable_arenas = true;
+
+import "google/protobuf/descriptor.proto";
+
+message WriteRequest {
+ repeated TimeSeries timeseries = 1 [(nullable) = false];
+}
+
+message TimeSeries {
+ repeated Label labels = 1 [(nullable) = false];
+ repeated Sample samples = 2 [(nullable) = false];
+}
+
+message Label {
+ string name = 1;
+ string value = 2;
+}
+
+message Sample {
+ double value = 1;
+ int64 timestamp = 2;
+}
+
+extend google.protobuf.FieldOptions {
+ bool nullable = 65001;
+}