diff options
Diffstat (limited to 'backends/backends.c')
-rw-r--r-- | backends/backends.c | 272 |
1 files changed, 198 insertions, 74 deletions
diff --git a/backends/backends.c b/backends/backends.c index da818c50b..0e7918916 100644 --- a/backends/backends.c +++ b/backends/backends.c @@ -62,9 +62,11 @@ calculated_number backend_calculate_value_from_stored_data( (void)host; // find the edges of the rrd database for this chart - time_t first_t = rrdset_first_entry_t(st); - time_t last_t = rrdset_last_entry_t(st); + time_t first_t = rd->state->query_ops.oldest_time(rd); + time_t last_t = rd->state->query_ops.latest_time(rd); time_t update_every = st->update_every; + struct rrddim_query_handle handle; + storage_number n; // step back a little, to make sure we have complete data collection // for all metrics @@ -105,6 +107,7 @@ calculated_number backend_calculate_value_from_stored_data( size_t counter = 0; calculated_number sum = 0; +/* long start_at_slot = rrdset_time2slot(st, before), stop_at_slot = rrdset_time2slot(st, after), slot, stop_now = 0; @@ -126,7 +129,21 @@ calculated_number backend_calculate_value_from_stored_data( counter++; } +*/ + for(rd->state->query_ops.init(rd, &handle, after, before) ; !rd->state->query_ops.is_finished(&handle) ; ) { + n = rd->state->query_ops.next_metric(&handle); + if(unlikely(!does_storage_number_exist(n))) { + // not collected + continue; + } + + calculated_number value = unpack_storage_number(n); + sum += value; + + counter++; + } + rd->state->query_ops.finalize(&handle); if(unlikely(!counter)) { debug(D_BACKEND, "BACKEND: %s.%s.%s: no values stored in database for range %lu to %lu", host->hostname, st->id, rd->id, @@ -238,6 +255,11 @@ void *backends_main(void *ptr) { int (*backend_request_formatter)(BUFFER *, const char *, RRDHOST *, const char *, RRDSET *, RRDDIM *, time_t, time_t, BACKEND_OPTIONS) = NULL; int (*backend_response_checker)(BUFFER *) = NULL; +#if HAVE_KINESIS + int do_kinesis = 0; + char *kinesis_auth_key_id = NULL, *kinesis_secure_key = NULL, *kinesis_stream_name = NULL; +#endif + // ------------------------------------------------------------------------ // collect configuration options @@ -263,7 +285,6 @@ void *backends_main(void *ptr) { charts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send charts matching", "*"), NULL, SIMPLE_PATTERN_EXACT); hosts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send hosts matching", "localhost *"), NULL, SIMPLE_PATTERN_EXACT); - // ------------------------------------------------------------------------ // validate configuration options // and prepare for sending data to our backend @@ -316,6 +337,26 @@ void *backends_main(void *ptr) { backend_request_formatter = format_dimension_stored_json_plaintext; } +#if HAVE_KINESIS + else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) { + + do_kinesis = 1; + + if(unlikely(read_kinesis_conf(netdata_configured_user_config_dir, &kinesis_auth_key_id, &kinesis_secure_key, &kinesis_stream_name))) { + error("BACKEND: kinesis backend type is set but cannot read its configuration from %s/aws_kinesis.conf", netdata_configured_user_config_dir); + goto cleanup; + } + + kinesis_init(destination, kinesis_auth_key_id, kinesis_secure_key, timeout.tv_sec * 1000 + timeout.tv_usec / 1000); + + backend_response_checker = process_json_response; + if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED) + backend_request_formatter = format_dimension_collected_json_plaintext; + else + backend_request_formatter = format_dimension_stored_json_plaintext; + + } +#endif /* HAVE_KINESIS */ else { error("BACKEND: Unknown backend type '%s'", type); goto cleanup; @@ -481,6 +522,7 @@ void *backends_main(void *ptr) { chart_sent_bytes = chart_sent_metrics = chart_lost_metrics = + chart_receptions = chart_transmission_successes = chart_transmission_failures = chart_data_lost_events = @@ -497,104 +539,177 @@ void *backends_main(void *ptr) { // to add incrementally data to buffer after = before; - // ------------------------------------------------------------------------ - // if we are connected, receive a response, without blocking +#if HAVE_KINESIS + if(do_kinesis) { + unsigned long long partition_key_seq = 0; - if(likely(sock != -1)) { - errno = 0; + size_t buffer_len = buffer_strlen(b); + size_t sent = 0; - // loop through to collect all data - while(sock != -1 && errno != EWOULDBLOCK) { - buffer_need_bytes(response, 4096); + while(sent < buffer_len) { + char partition_key[KINESIS_PARTITION_KEY_MAX + 1]; + snprintf(partition_key, KINESIS_PARTITION_KEY_MAX, "netdata_%llu", partition_key_seq++); + size_t partition_key_len = strnlen(partition_key, KINESIS_PARTITION_KEY_MAX); - ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT); - if(likely(r > 0)) { - // we received some data - response->len += r; - chart_received_bytes += r; - chart_receptions++; + const char *first_char = buffer_tostring(b) + sent; + + size_t record_len = 0; + + // split buffer into chunks of maximum allowed size + if(buffer_len - sent < KINESIS_RECORD_MAX - partition_key_len) { + record_len = buffer_len - sent; } - else if(r == 0) { - error("BACKEND: '%s' closed the socket", destination); - close(sock); - sock = -1; + else { + record_len = KINESIS_RECORD_MAX - partition_key_len; + while(*(first_char + record_len) != '\n' && record_len) record_len--; + } + + char error_message[ERROR_LINE_MAX + 1] = ""; + + debug(D_BACKEND, "BACKEND: kinesis_put_record(): dest = %s, id = %s, key = %s, stream = %s, partition_key = %s, \ + buffer = %zu, record = %zu", destination, kinesis_auth_key_id, kinesis_secure_key, kinesis_stream_name, + partition_key, buffer_len, record_len); + + kinesis_put_record(kinesis_stream_name, partition_key, first_char, record_len); + + sent += record_len; + chart_transmission_successes++; + + size_t sent_bytes = 0, lost_bytes = 0; + + if(unlikely(kinesis_get_result(error_message, &sent_bytes, &lost_bytes))) { + // oops! we couldn't send (all or some of the) data + error("BACKEND: %s", error_message); + error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zu bytes.", + destination, sent_bytes, sent_bytes - lost_bytes); + + chart_transmission_failures++; + chart_data_lost_events++; + chart_lost_bytes += lost_bytes; + + // estimate the number of lost metrics + chart_lost_metrics += (collected_number)(chart_buffered_metrics + * (buffer_len && (lost_bytes > buffer_len) ? (double)lost_bytes / buffer_len : 1)); + + break; } else { - // failed to receive data - if(errno != EAGAIN && errno != EWOULDBLOCK) { - error("BACKEND: cannot receive data from backend '%s'.", destination); - } + chart_receptions++; } + + if(unlikely(netdata_exit)) break; } - // if we received data, process them - if(buffer_strlen(response)) - backend_response_checker(response); + chart_sent_bytes += sent; + if(likely(sent == buffer_len)) + chart_sent_metrics = chart_buffered_metrics; + + buffer_flush(b); } + else { +#else + { +#endif /* HAVE_KINESIS */ + + // ------------------------------------------------------------------------ + // if we are connected, receive a response, without blocking + + if(likely(sock != -1)) { + errno = 0; + + // loop through to collect all data + while(sock != -1 && errno != EWOULDBLOCK) { + buffer_need_bytes(response, 4096); + + ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT); + if(likely(r > 0)) { + // we received some data + response->len += r; + chart_received_bytes += r; + chart_receptions++; + } + else if(r == 0) { + error("BACKEND: '%s' closed the socket", destination); + close(sock); + sock = -1; + } + else { + // failed to receive data + if(errno != EAGAIN && errno != EWOULDBLOCK) { + error("BACKEND: cannot receive data from backend '%s'.", destination); + } + } + } - // ------------------------------------------------------------------------ - // if we are not connected, connect to a backend server + // if we received data, process them + if(buffer_strlen(response)) + backend_response_checker(response); + } - if(unlikely(sock == -1)) { - // usec_t start_ut = now_monotonic_usec(); - size_t reconnects = 0; + // ------------------------------------------------------------------------ + // if we are not connected, connect to a backend server - sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0); + if(unlikely(sock == -1)) { + // usec_t start_ut = now_monotonic_usec(); + size_t reconnects = 0; - chart_backend_reconnects += reconnects; - // chart_backend_latency += now_monotonic_usec() - start_ut; - } + sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0); - if(unlikely(netdata_exit)) break; + chart_backend_reconnects += reconnects; + // chart_backend_latency += now_monotonic_usec() - start_ut; + } - // ------------------------------------------------------------------------ - // if we are connected, send our buffer to the backend server - - if(likely(sock != -1)) { - size_t len = buffer_strlen(b); - // usec_t start_ut = now_monotonic_usec(); - int flags = 0; -#ifdef MSG_NOSIGNAL - flags += MSG_NOSIGNAL; -#endif + if(unlikely(netdata_exit)) break; - ssize_t written = send(sock, buffer_tostring(b), len, flags); - // chart_backend_latency += now_monotonic_usec() - start_ut; - if(written != -1 && (size_t)written == len) { - // we sent the data successfully - chart_transmission_successes++; - chart_sent_bytes += written; - chart_sent_metrics = chart_buffered_metrics; + // ------------------------------------------------------------------------ + // if we are connected, send our buffer to the backend server + + if(likely(sock != -1)) { + size_t len = buffer_strlen(b); + // usec_t start_ut = now_monotonic_usec(); + int flags = 0; + #ifdef MSG_NOSIGNAL + flags += MSG_NOSIGNAL; + #endif + + ssize_t written = send(sock, buffer_tostring(b), len, flags); + // chart_backend_latency += now_monotonic_usec() - start_ut; + if(written != -1 && (size_t)written == len) { + // we sent the data successfully + chart_transmission_successes++; + chart_sent_bytes += written; + chart_sent_metrics = chart_buffered_metrics; - // reset the failures count - failures = 0; + // reset the failures count + failures = 0; - // empty the buffer - buffer_flush(b); + // empty the buffer + buffer_flush(b); + } + else { + // oops! we couldn't send (all or some of the) data + error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.", destination, len, written); + chart_transmission_failures++; + + if(written != -1) + chart_sent_bytes += written; + + // increment the counter we check for data loss + failures++; + + // close the socket - we will re-open it next time + close(sock); + sock = -1; + } } else { - // oops! we couldn't send (all or some of the) data - error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.", destination, len, written); + error("BACKEND: failed to update database backend '%s'", destination); chart_transmission_failures++; - if(written != -1) - chart_sent_bytes += written; - // increment the counter we check for data loss failures++; - - // close the socket - we will re-open it next time - close(sock); - sock = -1; } } - else { - error("BACKEND: failed to update database backend '%s'", destination); - chart_transmission_failures++; - - // increment the counter we check for data loss - failures++; - } if(failures > buffer_on_failures) { // too bad! we are going to lose data @@ -651,6 +766,15 @@ void *backends_main(void *ptr) { } cleanup: +#if HAVE_KINESIS + if(do_kinesis) { + kinesis_shutdown(); + freez(kinesis_auth_key_id); + freez(kinesis_secure_key); + freez(kinesis_stream_name); + } +#endif + if(sock != -1) close(sock); |