diff options
Diffstat (limited to 'backends/backends.c')
-rw-r--r-- | backends/backends.c | 78 |
1 files changed, 42 insertions, 36 deletions
diff --git a/backends/backends.c b/backends/backends.c index 120c6e703..6bf583e17 100644 --- a/backends/backends.c +++ b/backends/backends.c @@ -28,6 +28,7 @@ const char *global_backend_prefix = "netdata"; int global_backend_update_every = 10; BACKEND_OPTIONS global_backend_options = BACKEND_SOURCE_DATA_AVERAGE | BACKEND_OPTION_SEND_NAMES; +const char *global_backend_source = NULL; // ---------------------------------------------------------------------------- // helper functions for backends @@ -269,9 +270,9 @@ void backend_set_kinesis_variables(int *default_port, #if HAVE_KINESIS *brc = process_json_response; if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED) - *brf = format_dimension_collected_json_plaintext; + *brf = backends_format_dimension_collected_json_plaintext; else - *brf = format_dimension_stored_json_plaintext; + *brf = backends_format_dimension_stored_json_plaintext; #endif } @@ -295,7 +296,7 @@ void backend_set_prometheus_variables(int *default_port, #endif #if ENABLE_PROMETHEUS_REMOTE_WRITE - *brc = process_prometheus_remote_write_response; + *brc = backends_process_prometheus_remote_write_response; #endif /* ENABLE_PROMETHEUS_REMOTE_WRITE */ } @@ -321,9 +322,9 @@ void backend_set_mongodb_variables(int *default_port, #if HAVE_MONGOC *brc = process_json_response; if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED) - *brf = format_dimension_collected_json_plaintext; + *brf = backends_format_dimension_collected_json_plaintext; else - *brf = format_dimension_stored_json_plaintext; + *brf = backends_format_dimension_stored_json_plaintext; #endif } @@ -344,9 +345,9 @@ void backend_set_json_variables(int *default_port, *brc = process_json_response; if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED) - *brf = format_dimension_collected_json_plaintext; + *brf = backends_format_dimension_collected_json_plaintext; else - *brf = format_dimension_stored_json_plaintext; + *brf = backends_format_dimension_stored_json_plaintext; } /** @@ -366,9 +367,9 @@ void backend_set_opentsdb_http_variables(int *default_port, *brc = process_opentsdb_response; if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED) - *brf = format_dimension_collected_opentsdb_http; + *brf = backends_format_dimension_collected_opentsdb_http; else - *brf = format_dimension_stored_opentsdb_http; + *brf = backends_format_dimension_stored_opentsdb_http; } @@ -389,9 +390,9 @@ void backend_set_opentsdb_telnet_variables(int *default_port, *brc = process_opentsdb_response; if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED) - *brf = format_dimension_collected_opentsdb_telnet; + *brf = backends_format_dimension_collected_opentsdb_telnet; else - *brf = format_dimension_stored_opentsdb_telnet; + *brf = backends_format_dimension_stored_opentsdb_telnet; } /** @@ -411,9 +412,9 @@ void backend_set_graphite_variables(int *default_port, *brc = process_graphite_response; if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED) - *brf = format_dimension_collected_graphite_plaintext; + *brf = backends_format_dimension_collected_graphite_plaintext; else - *brf = format_dimension_stored_graphite_plaintext; + *brf = backends_format_dimension_stored_graphite_plaintext; } /** @@ -439,7 +440,7 @@ BACKEND_TYPE backend_select_type(const char *type) { return BACKEND_TYPE_JSON; } else if (!strcmp(type, "prometheus_remote_write")) { - return BACKEND_TYPE_PROMETEUS; + return BACKEND_TYPE_PROMETHEUS_REMOTE_WRITE; } else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) { return BACKEND_TYPE_KINESIS; @@ -528,6 +529,7 @@ void *backends_main(void *ptr) { // and prepare for sending data to our backend global_backend_options = backend_parse_data_source(source, global_backend_options); + global_backend_source = source; if(timeoutms < 1) { error("BACKEND: invalid timeout %ld ms given. Assuming %d ms.", timeoutms, global_backend_update_every * 2 * 1000); @@ -551,18 +553,18 @@ void *backends_main(void *ptr) { case BACKEND_TYPE_OPENTSDB_USING_HTTP: { #ifdef ENABLE_HTTPS if (!strcmp(type, "opentsdb:https")) { - security_start_ssl(NETDATA_SSL_CONTEXT_OPENTSDB); + security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING); } #endif backend_set_opentsdb_http_variables(&default_port,&backend_response_checker,&backend_request_formatter); break; } - case BACKEND_TYPE_PROMETEUS: { + case BACKEND_TYPE_PROMETHEUS_REMOTE_WRITE: { #if ENABLE_PROMETHEUS_REMOTE_WRITE do_prometheus_remote_write = 1; http_request_header = buffer_create(1); - init_write_request(); + backends_init_write_request(); #else error("BACKEND: Prometheus remote write support isn't compiled"); #endif // ENABLE_PROMETHEUS_REMOTE_WRITE @@ -578,7 +580,7 @@ void *backends_main(void *ptr) { goto cleanup; } - kinesis_init(destination, kinesis_auth_key_id, kinesis_secure_key, timeout.tv_sec * 1000 + timeout.tv_usec / 1000); + backends_kinesis_init(destination, kinesis_auth_key_id, kinesis_secure_key, timeout.tv_sec * 1000 + timeout.tv_usec / 1000); #else error("BACKEND: AWS Kinesis support isn't compiled"); #endif // HAVE_KINESIS @@ -596,7 +598,7 @@ void *backends_main(void *ptr) { goto cleanup; } - if(likely(!mongodb_init(mongodb_uri, mongodb_database, mongodb_collection, mongodb_default_socket_timeout))) { + if(likely(!backends_mongodb_init(mongodb_uri, mongodb_database, mongodb_collection, mongodb_default_socket_timeout))) { backend_set_mongodb_variables(&default_port, &backend_response_checker, &backend_request_formatter); do_mongodb = 1; } @@ -624,6 +626,9 @@ void *backends_main(void *ptr) { case BACKEND_TYPE_UNKNOWN: { break; } + default: { + break; + } } #if ENABLE_PROMETHEUS_REMOTE_WRITE @@ -694,6 +699,7 @@ void *backends_main(void *ptr) { // prepare the backend main loop info("BACKEND: configured ('%s' on '%s' sending '%s' data, every %d seconds, as host '%s', with prefix '%s')", type, destination, source, global_backend_update_every, hostname, global_backend_prefix); + send_statistics("BACKEND_START", "OK", type); usec_t step_ut = global_backend_update_every * USEC_PER_SEC; time_t after = now_realtime_sec(); @@ -721,7 +727,7 @@ void *backends_main(void *ptr) { #if ENABLE_PROMETHEUS_REMOTE_WRITE if(do_prometheus_remote_write) - clear_write_request(); + backends_clear_write_request(); #endif rrd_rdlock(); RRDHOST *host; @@ -752,7 +758,7 @@ void *backends_main(void *ptr) { #if ENABLE_PROMETHEUS_REMOTE_WRITE if(do_prometheus_remote_write) { - rrd_stats_remote_write_allmetrics_prometheus( + backends_rrd_stats_remote_write_allmetrics_prometheus( host , __hostname , global_backend_prefix @@ -857,18 +863,18 @@ void *backends_main(void *ptr) { char error_message[ERROR_LINE_MAX + 1] = ""; - debug(D_BACKEND, "BACKEND: kinesis_put_record(): dest = %s, id = %s, key = %s, stream = %s, partition_key = %s, \ + debug(D_BACKEND, "BACKEND: backends_kinesis_put_record(): dest = %s, id = %s, key = %s, stream = %s, partition_key = %s, \ buffer = %zu, record = %zu", destination, kinesis_auth_key_id, kinesis_secure_key, kinesis_stream_name, partition_key, buffer_len, record_len); - kinesis_put_record(kinesis_stream_name, partition_key, first_char, record_len); + backends_kinesis_put_record(kinesis_stream_name, partition_key, first_char, record_len); sent += record_len; chart_transmission_successes++; size_t sent_bytes = 0, lost_bytes = 0; - if(unlikely(kinesis_get_result(error_message, &sent_bytes, &lost_bytes))) { + if(unlikely(backends_kinesis_get_result(error_message, &sent_bytes, &lost_bytes))) { // oops! we couldn't send (all or some of the) data error("BACKEND: %s", error_message); error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zu bytes.", @@ -907,10 +913,10 @@ void *backends_main(void *ptr) { while(sent < buffer_len) { const char *first_char = buffer_tostring(b); - debug(D_BACKEND, "BACKEND: mongodb_insert(): uri = %s, database = %s, collection = %s, \ + debug(D_BACKEND, "BACKEND: backends_mongodb_insert(): uri = %s, database = %s, collection = %s, \ buffer = %zu", mongodb_uri, mongodb_database, mongodb_collection, buffer_len); - if(likely(!mongodb_insert((char *)first_char, (size_t)chart_buffered_metrics))) { + if(likely(!backends_mongodb_insert((char *)first_char, (size_t)chart_buffered_metrics))) { sent += buffer_len; chart_transmission_successes++; chart_receptions++; @@ -997,9 +1003,9 @@ void *backends_main(void *ptr) { sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0); #ifdef ENABLE_HTTPS if(sock != -1) { - if(netdata_opentsdb_ctx) { + if(netdata_exporting_ctx) { if(!opentsdb_ssl.conn) { - opentsdb_ssl.conn = SSL_new(netdata_opentsdb_ctx); + opentsdb_ssl.conn = SSL_new(netdata_exporting_ctx); if(!opentsdb_ssl.conn) { error("Failed to allocate SSL structure %d.", sock); opentsdb_ssl.flags = NETDATA_SSL_NO_HANDSHAKE; @@ -1045,7 +1051,7 @@ void *backends_main(void *ptr) { #if ENABLE_PROMETHEUS_REMOTE_WRITE if(do_prometheus_remote_write) { - size_t data_size = get_write_request_size(); + size_t data_size = backends_get_write_request_size(); if(unlikely(!data_size)) { error("BACKEND: write request size is out of range"); @@ -1054,7 +1060,7 @@ void *backends_main(void *ptr) { buffer_flush(b); buffer_need_bytes(b, data_size); - if(unlikely(pack_write_request(b->buffer, &data_size))) { + if(unlikely(backends_pack_write_request(b->buffer, &data_size))) { error("BACKEND: cannot pack write request"); continue; } @@ -1069,7 +1075,7 @@ void *backends_main(void *ptr) { "Content-Length: %zu\r\n" "Content-Type: application/x-www-form-urlencoded\r\n\r\n", remote_write_path, - hostname, + destination, data_size ); @@ -1134,7 +1140,7 @@ void *backends_main(void *ptr) { if(do_prometheus_remote_write && failures) { (void) buffer_on_failures; failures = 0; - chart_lost_bytes = chart_buffered_bytes = get_write_request_size(); // estimated write request size + chart_lost_bytes = chart_buffered_bytes = backends_get_write_request_size(); // estimated write request size chart_data_lost_events++; chart_lost_metrics = chart_buffered_metrics; } else @@ -1196,7 +1202,7 @@ void *backends_main(void *ptr) { cleanup: #if HAVE_KINESIS if(do_kinesis) { - kinesis_shutdown(); + backends_kinesis_shutdown(); freez(kinesis_auth_key_id); freez(kinesis_secure_key); freez(kinesis_stream_name); @@ -1206,12 +1212,12 @@ cleanup: #if ENABLE_PROMETHEUS_REMOTE_WRITE buffer_free(http_request_header); if(do_prometheus_remote_write) - protocol_buffers_shutdown(); + backends_protocol_buffers_shutdown(); #endif #if HAVE_MONGOC if(do_mongodb) { - mongodb_cleanup(); + backends_mongodb_cleanup(); freez(mongodb_uri); freez(mongodb_database); freez(mongodb_collection); @@ -1225,7 +1231,7 @@ cleanup: buffer_free(response); #ifdef ENABLE_HTTPS - if(netdata_opentsdb_ctx) { + if(netdata_exporting_ctx) { if(opentsdb_ssl.conn) { SSL_free(opentsdb_ssl.conn); } |