diff options
Diffstat (limited to 'backends/backends.c')
-rw-r--r-- | backends/backends.c | 522 |
1 files changed, 431 insertions, 91 deletions
diff --git a/backends/backends.c b/backends/backends.c index 0e7918916..15a0cb41b 100644 --- a/backends/backends.c +++ b/backends/backends.c @@ -246,6 +246,194 @@ static void backends_main_cleanup(void *ptr) { static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; } +/** + * Set Kinesis variables + * + * Set the variables necessaries to work with this specific backend. + * + * @param default_port the default port of the backend + * @param brc function called to check the result. + * @param brf function called to format the msessage to the backend + * @param type the backend string selector. + */ +void backend_set_kinesis_variables(int *default_port, + backend_response_checker_t brc, + backend_request_formatter_t brf) +{ + (void)default_port; +#ifndef HAVE_KINESIS + (void)brc; + (void)brf; +#endif + +#if HAVE_KINESIS + *brc = process_json_response; + if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED) + *brf = format_dimension_collected_json_plaintext; + else + *brf = format_dimension_stored_json_plaintext; +#endif +} + +/** + * Set Prometheus variables + * + * Set the variables necessaries to work with this specific backend. + * + * @param default_port the default port of the backend + * @param brc function called to check the result. + * @param brf function called to format the msessage to the backend + * @param type the backend string selector. + */ +void backend_set_prometheus_variables(int *default_port, + backend_response_checker_t brc, + backend_request_formatter_t brf) +{ + (void)default_port; + (void)brf; +#ifndef ENABLE_PROMETHEUS_REMOTE_WRITE + (void)brc; +#endif + +#if ENABLE_PROMETHEUS_REMOTE_WRITE + *brc = process_prometheus_remote_write_response; +#endif /* ENABLE_PROMETHEUS_REMOTE_WRITE */ +} + +/** + * Set JSON variables + * + * Set the variables necessaries to work with this specific backend. + * + * @param default_port the default port of the backend + * @param brc function called to check the result. + * @param brf function called to format the msessage to the backend + * @param type the backend string selector. + */ +void backend_set_json_variables(int *default_port, + backend_response_checker_t brc, + backend_request_formatter_t brf) +{ + *default_port = 5448; + *brc = process_json_response; + + if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED) + *brf = format_dimension_collected_json_plaintext; + else + *brf = format_dimension_stored_json_plaintext; +} + +/** + * Set OpenTSDB HTTP variables + * + * Set the variables necessaries to work with this specific backend. + * + * @param default_port the default port of the backend + * @param brc function called to check the result. + * @param brf function called to format the msessage to the backend + * @param type the backend string selector. + */ +void backend_set_opentsdb_http_variables(int *default_port, + backend_response_checker_t brc, + backend_request_formatter_t brf) +{ + *default_port = 4242; + *brc = process_opentsdb_response; + + if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED) + *brf = format_dimension_collected_opentsdb_http; + else + *brf = format_dimension_stored_opentsdb_http; + +} + +/** + * Set OpenTSDB Telnet variables + * + * Set the variables necessaries to work with this specific backend. + * + * @param default_port the default port of the backend + * @param brc function called to check the result. + * @param brf function called to format the msessage to the backend + * @param type the backend string selector. + */ +void backend_set_opentsdb_telnet_variables(int *default_port, + backend_response_checker_t brc, + backend_request_formatter_t brf) +{ + *default_port = 4242; + *brc = process_opentsdb_response; + + if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED) + *brf = format_dimension_collected_opentsdb_telnet; + else + *brf = format_dimension_stored_opentsdb_telnet; +} + +/** + * Set Graphite variables + * + * Set the variables necessaries to work with this specific backend. + * + * @param default_port the default port of the backend + * @param brc function called to check the result. + * @param brf function called to format the msessage to the backend + * @param type the backend string selector. + */ +void backend_set_graphite_variables(int *default_port, + backend_response_checker_t brc, + backend_request_formatter_t brf) +{ + *default_port = 2003; + *brc = process_graphite_response; + + if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED) + *brf = format_dimension_collected_graphite_plaintext; + else + *brf = format_dimension_stored_graphite_plaintext; +} + +/** + * Select Type + * + * Select the backedn type based in the user input + * + * @param type is the string that defines the backend type + * + * @return It returns the backend id. + */ +BACKEND_TYPE backend_select_type(const char *type) { + if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) { + return BACKEND_TYPE_GRAPHITE; + } + else if(!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) { + return BACKEND_TYPE_OPENTSDB_USING_TELNET; + } + else if(!strcmp(type, "opentsdb:http") || !strcmp(type, "opentsdb:https")) { + return BACKEND_TYPE_OPENTSDB_USING_HTTP; + } + else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) { + return BACKEND_TYPE_JSON; + } + else if (!strcmp(type, "prometheus_remote_write")) { + return BACKEND_TYPE_PROMETEUS; + } + else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) { + return BACKEND_TYPE_KINESIS; + } + + return BACKEND_TYPE_UNKNOWN; +} + +/** + * Backend main + * + * The main thread used to control the backedns. + * + * @param ptr a pointer to netdata_static_structure. + * + * @return It always return NULL. + */ void *backends_main(void *ptr) { netdata_thread_cleanup_push(backends_main_cleanup, ptr); @@ -260,6 +448,15 @@ void *backends_main(void *ptr) { char *kinesis_auth_key_id = NULL, *kinesis_secure_key = NULL, *kinesis_stream_name = NULL; #endif +#if ENABLE_PROMETHEUS_REMOTE_WRITE + int do_prometheus_remote_write = 0; + BUFFER *http_request_header = buffer_create(1); +#endif + +#ifdef ENABLE_HTTPS + struct netdata_ssl opentsdb_ssl = {NULL , NETDATA_SSL_START}; +#endif + // ------------------------------------------------------------------------ // collect configuration options @@ -285,6 +482,10 @@ void *backends_main(void *ptr) { charts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send charts matching", "*"), NULL, SIMPLE_PATTERN_EXACT); hosts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send hosts matching", "localhost *"), NULL, SIMPLE_PATTERN_EXACT); +#if ENABLE_PROMETHEUS_REMOTE_WRITE + const char *remote_write_path = config_get(CONFIG_SECTION_BACKEND, "remote write URL path", "/receive"); +#endif + // ------------------------------------------------------------------------ // validate configuration options // and prepare for sending data to our backend @@ -303,90 +504,95 @@ void *backends_main(void *ptr) { // ------------------------------------------------------------------------ // select the backend type - - if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) { - - default_port = 2003; - backend_response_checker = process_graphite_response; - - if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED) - backend_request_formatter = format_dimension_collected_graphite_plaintext; - else - backend_request_formatter = format_dimension_stored_graphite_plaintext; - - } - else if(!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) { - - default_port = 4242; - backend_response_checker = process_opentsdb_response; - - if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED) - backend_request_formatter = format_dimension_collected_opentsdb_telnet; - else - backend_request_formatter = format_dimension_stored_opentsdb_telnet; - + BACKEND_TYPE work_type = backend_select_type(type); + if (work_type == BACKEND_TYPE_UNKNOWN) { + error("BACKEND: Unknown backend type '%s'", type); + goto cleanup; } - else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) { - default_port = 5448; - backend_response_checker = process_json_response; - - if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED) - backend_request_formatter = format_dimension_collected_json_plaintext; - else - backend_request_formatter = format_dimension_stored_json_plaintext; + switch (work_type) { + case BACKEND_TYPE_OPENTSDB_USING_HTTP: { +#ifdef ENABLE_HTTPS + if (!strcmp(type, "opentsdb:https")) { + security_start_ssl(NETDATA_SSL_CONTEXT_OPENTSDB); + } +#endif + backend_set_opentsdb_http_variables(&default_port,&backend_response_checker,&backend_request_formatter); + break; + } + case BACKEND_TYPE_PROMETEUS: { +#if ENABLE_PROMETHEUS_REMOTE_WRITE + do_prometheus_remote_write = 1; - } + init_write_request(); +#else + error("BACKEND: Prometheus remote write support isn't compiled"); +#endif // ENABLE_PROMETHEUS_REMOTE_WRITE + backend_set_prometheus_variables(&default_port,&backend_response_checker,&backend_request_formatter); + break; + } + case BACKEND_TYPE_KINESIS: { #if HAVE_KINESIS - else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) { + do_kinesis = 1; - do_kinesis = 1; + if(unlikely(read_kinesis_conf(netdata_configured_user_config_dir, &kinesis_auth_key_id, &kinesis_secure_key, &kinesis_stream_name))) { + error("BACKEND: kinesis backend type is set but cannot read its configuration from %s/aws_kinesis.conf", netdata_configured_user_config_dir); + goto cleanup; + } - if(unlikely(read_kinesis_conf(netdata_configured_user_config_dir, &kinesis_auth_key_id, &kinesis_secure_key, &kinesis_stream_name))) { - error("BACKEND: kinesis backend type is set but cannot read its configuration from %s/aws_kinesis.conf", netdata_configured_user_config_dir); - goto cleanup; + kinesis_init(destination, kinesis_auth_key_id, kinesis_secure_key, timeout.tv_sec * 1000 + timeout.tv_usec / 1000); +#else + error("BACKEND: AWS Kinesis support isn't compiled"); +#endif // HAVE_KINESIS + backend_set_kinesis_variables(&default_port,&backend_response_checker,&backend_request_formatter); + break; + } + case BACKEND_TYPE_GRAPHITE: { + backend_set_graphite_variables(&default_port,&backend_response_checker,&backend_request_formatter); + break; + } + case BACKEND_TYPE_OPENTSDB_USING_TELNET: { + backend_set_opentsdb_telnet_variables(&default_port,&backend_response_checker,&backend_request_formatter); + break; + } + case BACKEND_TYPE_JSON: { + backend_set_json_variables(&default_port,&backend_response_checker,&backend_request_formatter); + break; + } + case BACKEND_TYPE_UNKNOWN: { + break; } - - kinesis_init(destination, kinesis_auth_key_id, kinesis_secure_key, timeout.tv_sec * 1000 + timeout.tv_usec / 1000); - - backend_response_checker = process_json_response; - if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED) - backend_request_formatter = format_dimension_collected_json_plaintext; - else - backend_request_formatter = format_dimension_stored_json_plaintext; - - } -#endif /* HAVE_KINESIS */ - else { - error("BACKEND: Unknown backend type '%s'", type); - goto cleanup; } +#if ENABLE_PROMETHEUS_REMOTE_WRITE + if((backend_request_formatter == NULL && !do_prometheus_remote_write) || backend_response_checker == NULL) { +#else if(backend_request_formatter == NULL || backend_response_checker == NULL) { +#endif error("BACKEND: backend is misconfigured - disabling it."); goto cleanup; } - // ------------------------------------------------------------------------ - // prepare the charts for monitoring the backend operation +// ------------------------------------------------------------------------ +// prepare the charts for monitoring the backend operation struct rusage thread; collected_number - chart_buffered_metrics = 0, - chart_lost_metrics = 0, - chart_sent_metrics = 0, - chart_buffered_bytes = 0, - chart_received_bytes = 0, - chart_sent_bytes = 0, - chart_receptions = 0, - chart_transmission_successes = 0, - chart_transmission_failures = 0, - chart_data_lost_events = 0, - chart_lost_bytes = 0, - chart_backend_reconnects = 0; - // chart_backend_latency = 0; + chart_buffered_metrics = 0, + chart_lost_metrics = 0, + chart_sent_metrics = 0, + chart_buffered_bytes = 0, + chart_received_bytes = 0, + chart_sent_bytes = 0, + chart_receptions = 0, + chart_transmission_successes = 0, + chart_transmission_failures = 0, + chart_data_lost_events = 0, + chart_lost_bytes = 0, + chart_backend_reconnects = 0; + // chart_backend_latency = 0; RRDSET *chart_metrics = rrdset_create_localhost("netdata", "backend_metrics", NULL, "backend", NULL, "Netdata Buffered Metrics", "metrics", "backends", NULL, 130600, global_backend_update_every, RRDSET_TYPE_LINE); rrddim_add(chart_metrics, "buffered", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); @@ -407,12 +613,12 @@ void *backends_main(void *ptr) { rrddim_add(chart_ops, "read", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); /* - * this is misleading - we can only measure the time we need to send data - * this time is not related to the time required for the data to travel to - * the backend database and the time that server needed to process them - * - * issue #1432 and https://www.softlab.ntua.gr/facilities/documentation/unix/unix-socket-faq/unix-socket-faq-2.html - * + * this is misleading - we can only measure the time we need to send data + * this time is not related to the time required for the data to travel to + * the backend database and the time that server needed to process them + * + * issue #1432 and https://www.softlab.ntua.gr/facilities/documentation/unix/unix-socket-faq/unix-socket-faq-2.html + * RRDSET *chart_latency = rrdset_create_localhost("netdata", "backend_latency", NULL, "backend", NULL, "Netdata Backend Latency", "ms", "backends", NULL, 130620, global_backend_update_every, RRDSET_TYPE_AREA); rrddim_add(chart_latency, "latency", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); */ @@ -451,6 +657,9 @@ void *backends_main(void *ptr) { size_t count_charts_total = 0; size_t count_dims_total = 0; +#if ENABLE_PROMETHEUS_REMOTE_WRITE + clear_write_request(); +#endif rrd_rdlock(); RRDHOST *host; rrdhost_foreach_read(host) { @@ -478,26 +687,45 @@ void *backends_main(void *ptr) { const char *__hostname = (host == localhost)?hostname:host->hostname; - RRDSET *st; - rrdset_foreach_read(st, host) { - if(likely(backends_can_send_rrdset(global_backend_options, st))) { - rrdset_rdlock(st); - - count_charts++; - - RRDDIM *rd; - rrddim_foreach_read(rd, st) { - if (likely(rd->last_collected_time.tv_sec >= after)) { - chart_buffered_metrics += backend_request_formatter(b, global_backend_prefix, host, __hostname, st, rd, after, before, global_backend_options); - count_dims++; - } - else { - debug(D_BACKEND, "BACKEND: not sending dimension '%s' of chart '%s' from host '%s', its last data collection (%lu) is not within our timeframe (%lu to %lu)", rd->id, st->id, __hostname, (unsigned long)rd->last_collected_time.tv_sec, (unsigned long)after, (unsigned long)before); - count_dims_skipped++; +#if ENABLE_PROMETHEUS_REMOTE_WRITE + if(do_prometheus_remote_write) { + rrd_stats_remote_write_allmetrics_prometheus( + host + , __hostname + , global_backend_prefix + , global_backend_options + , after + , before + , &count_charts + , &count_dims + , &count_dims_skipped + ); + chart_buffered_metrics += count_dims; + } + else +#endif + { + RRDSET *st; + rrdset_foreach_read(st, host) { + if(likely(backends_can_send_rrdset(global_backend_options, st))) { + rrdset_rdlock(st); + + count_charts++; + + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + if (likely(rd->last_collected_time.tv_sec >= after)) { + chart_buffered_metrics += backend_request_formatter(b, global_backend_prefix, host, __hostname, st, rd, after, before, global_backend_options); + count_dims++; + } + else { + debug(D_BACKEND, "BACKEND: not sending dimension '%s' of chart '%s' from host '%s', its last data collection (%lu) is not within our timeframe (%lu to %lu)", rd->id, st->id, __hostname, (unsigned long)rd->last_collected_time.tv_sec, (unsigned long)after, (unsigned long)before); + count_dims_skipped++; + } } - } - rrdset_unlock(st); + rrdset_unlock(st); + } } } @@ -621,7 +849,16 @@ void *backends_main(void *ptr) { while(sock != -1 && errno != EWOULDBLOCK) { buffer_need_bytes(response, 4096); - ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT); + ssize_t r; +#ifdef ENABLE_HTTPS + if(opentsdb_ssl.conn && !opentsdb_ssl.flags) { + r = SSL_read(opentsdb_ssl.conn, &response->buffer[response->len], response->size - response->len); + } else { + r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT); + } +#else + r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT); +#endif if(likely(r > 0)) { // we received some data response->len += r; @@ -654,7 +891,37 @@ void *backends_main(void *ptr) { size_t reconnects = 0; sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0); +#ifdef ENABLE_HTTPS + if(sock != -1) { + if(netdata_opentsdb_ctx) { + if(!opentsdb_ssl.conn) { + opentsdb_ssl.conn = SSL_new(netdata_opentsdb_ctx); + if(!opentsdb_ssl.conn) { + error("Failed to allocate SSL structure %d.", sock); + opentsdb_ssl.flags = NETDATA_SSL_NO_HANDSHAKE; + } + } else { + SSL_clear(opentsdb_ssl.conn); + } + } + if(opentsdb_ssl.conn) { + if(SSL_set_fd(opentsdb_ssl.conn, sock) != 1) { + error("Failed to set the socket to the SSL on socket fd %d.", host->rrdpush_sender_socket); + opentsdb_ssl.flags = NETDATA_SSL_NO_HANDSHAKE; + } else { + opentsdb_ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE; + SSL_set_connect_state(opentsdb_ssl.conn); + int err = SSL_connect(opentsdb_ssl.conn); + if (err != 1) { + err = SSL_get_error(opentsdb_ssl.conn, err); + error("SSL cannot connect with the server: %s ", ERR_error_string((long)SSL_get_error(opentsdb_ssl.conn, err), NULL)); + opentsdb_ssl.flags = NETDATA_SSL_NO_HANDSHAKE; + } //TODO: check certificate here + } + } + } +#endif chart_backend_reconnects += reconnects; // chart_backend_latency += now_monotonic_usec() - start_ut; } @@ -672,7 +939,54 @@ void *backends_main(void *ptr) { flags += MSG_NOSIGNAL; #endif - ssize_t written = send(sock, buffer_tostring(b), len, flags); +#if ENABLE_PROMETHEUS_REMOTE_WRITE + if(do_prometheus_remote_write) { + size_t data_size = get_write_request_size(); + + if(unlikely(!data_size)) { + error("BACKEND: write request size is out of range"); + continue; + } + + buffer_flush(b); + buffer_need_bytes(b, data_size); + if(unlikely(pack_write_request(b->buffer, &data_size))) { + error("BACKEND: cannot pack write request"); + continue; + } + b->len = data_size; + chart_buffered_bytes = (collected_number)buffer_strlen(b); + + buffer_flush(http_request_header); + buffer_sprintf(http_request_header, + "POST %s HTTP/1.1\r\n" + "Host: %s\r\n" + "Accept: */*\r\n" + "Content-Length: %zu\r\n" + "Content-Type: application/x-www-form-urlencoded\r\n\r\n", + remote_write_path, + hostname, + data_size + ); + + len = buffer_strlen(http_request_header); + send(sock, buffer_tostring(http_request_header), len, flags); + + len = data_size; + } +#endif + + ssize_t written; +#ifdef ENABLE_HTTPS + if(opentsdb_ssl.conn && !opentsdb_ssl.flags) { + written = SSL_write(opentsdb_ssl.conn, buffer_tostring(b), len); + } else { + written = send(sock, buffer_tostring(b), len, flags); + } +#else + written = send(sock, buffer_tostring(b), len, flags); +#endif + // chart_backend_latency += now_monotonic_usec() - start_ut; if(written != -1 && (size_t)written == len) { // we sent the data successfully @@ -711,6 +1025,16 @@ void *backends_main(void *ptr) { } } + +#if ENABLE_PROMETHEUS_REMOTE_WRITE + if(failures) { + (void) buffer_on_failures; + failures = 0; + chart_lost_bytes = chart_buffered_bytes = get_write_request_size(); // estimated write request size + chart_data_lost_events++; + chart_lost_metrics = chart_buffered_metrics; + } +#else if(failures > buffer_on_failures) { // too bad! we are going to lose data chart_lost_bytes += buffer_strlen(b); @@ -720,6 +1044,7 @@ void *backends_main(void *ptr) { chart_data_lost_events++; chart_lost_metrics = chart_buffered_metrics; } +#endif /* ENABLE_PROMETHEUS_REMOTE_WRITE */ if(unlikely(netdata_exit)) break; @@ -775,12 +1100,27 @@ cleanup: } #endif +#if ENABLE_PROMETHEUS_REMOTE_WRITE + if(do_prometheus_remote_write) { + buffer_free(http_request_header); + protocol_buffers_shutdown(); + } +#endif + if(sock != -1) close(sock); buffer_free(b); buffer_free(response); +#ifdef ENABLE_HTTPS + if(netdata_opentsdb_ctx) { + if(opentsdb_ssl.conn) { + SSL_free(opentsdb_ssl.conn); + } + } +#endif + netdata_thread_cleanup_pop(1); return NULL; } |