summaryrefslogtreecommitdiffstats
path: root/backends/backends.c
diff options
context:
space:
mode:
Diffstat (limited to 'backends/backends.c')
-rw-r--r--backends/backends.c272
1 files changed, 198 insertions, 74 deletions
diff --git a/backends/backends.c b/backends/backends.c
index da818c50b..0e7918916 100644
--- a/backends/backends.c
+++ b/backends/backends.c
@@ -62,9 +62,11 @@ calculated_number backend_calculate_value_from_stored_data(
(void)host;
// find the edges of the rrd database for this chart
- time_t first_t = rrdset_first_entry_t(st);
- time_t last_t = rrdset_last_entry_t(st);
+ time_t first_t = rd->state->query_ops.oldest_time(rd);
+ time_t last_t = rd->state->query_ops.latest_time(rd);
time_t update_every = st->update_every;
+ struct rrddim_query_handle handle;
+ storage_number n;
// step back a little, to make sure we have complete data collection
// for all metrics
@@ -105,6 +107,7 @@ calculated_number backend_calculate_value_from_stored_data(
size_t counter = 0;
calculated_number sum = 0;
+/*
long start_at_slot = rrdset_time2slot(st, before),
stop_at_slot = rrdset_time2slot(st, after),
slot, stop_now = 0;
@@ -126,7 +129,21 @@ calculated_number backend_calculate_value_from_stored_data(
counter++;
}
+*/
+ for(rd->state->query_ops.init(rd, &handle, after, before) ; !rd->state->query_ops.is_finished(&handle) ; ) {
+ n = rd->state->query_ops.next_metric(&handle);
+ if(unlikely(!does_storage_number_exist(n))) {
+ // not collected
+ continue;
+ }
+
+ calculated_number value = unpack_storage_number(n);
+ sum += value;
+
+ counter++;
+ }
+ rd->state->query_ops.finalize(&handle);
if(unlikely(!counter)) {
debug(D_BACKEND, "BACKEND: %s.%s.%s: no values stored in database for range %lu to %lu",
host->hostname, st->id, rd->id,
@@ -238,6 +255,11 @@ void *backends_main(void *ptr) {
int (*backend_request_formatter)(BUFFER *, const char *, RRDHOST *, const char *, RRDSET *, RRDDIM *, time_t, time_t, BACKEND_OPTIONS) = NULL;
int (*backend_response_checker)(BUFFER *) = NULL;
+#if HAVE_KINESIS
+ int do_kinesis = 0;
+ char *kinesis_auth_key_id = NULL, *kinesis_secure_key = NULL, *kinesis_stream_name = NULL;
+#endif
+
// ------------------------------------------------------------------------
// collect configuration options
@@ -263,7 +285,6 @@ void *backends_main(void *ptr) {
charts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send charts matching", "*"), NULL, SIMPLE_PATTERN_EXACT);
hosts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send hosts matching", "localhost *"), NULL, SIMPLE_PATTERN_EXACT);
-
// ------------------------------------------------------------------------
// validate configuration options
// and prepare for sending data to our backend
@@ -316,6 +337,26 @@ void *backends_main(void *ptr) {
backend_request_formatter = format_dimension_stored_json_plaintext;
}
+#if HAVE_KINESIS
+ else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) {
+
+ do_kinesis = 1;
+
+ if(unlikely(read_kinesis_conf(netdata_configured_user_config_dir, &kinesis_auth_key_id, &kinesis_secure_key, &kinesis_stream_name))) {
+ error("BACKEND: kinesis backend type is set but cannot read its configuration from %s/aws_kinesis.conf", netdata_configured_user_config_dir);
+ goto cleanup;
+ }
+
+ kinesis_init(destination, kinesis_auth_key_id, kinesis_secure_key, timeout.tv_sec * 1000 + timeout.tv_usec / 1000);
+
+ backend_response_checker = process_json_response;
+ if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
+ backend_request_formatter = format_dimension_collected_json_plaintext;
+ else
+ backend_request_formatter = format_dimension_stored_json_plaintext;
+
+ }
+#endif /* HAVE_KINESIS */
else {
error("BACKEND: Unknown backend type '%s'", type);
goto cleanup;
@@ -481,6 +522,7 @@ void *backends_main(void *ptr) {
chart_sent_bytes =
chart_sent_metrics =
chart_lost_metrics =
+ chart_receptions =
chart_transmission_successes =
chart_transmission_failures =
chart_data_lost_events =
@@ -497,104 +539,177 @@ void *backends_main(void *ptr) {
// to add incrementally data to buffer
after = before;
- // ------------------------------------------------------------------------
- // if we are connected, receive a response, without blocking
+#if HAVE_KINESIS
+ if(do_kinesis) {
+ unsigned long long partition_key_seq = 0;
- if(likely(sock != -1)) {
- errno = 0;
+ size_t buffer_len = buffer_strlen(b);
+ size_t sent = 0;
- // loop through to collect all data
- while(sock != -1 && errno != EWOULDBLOCK) {
- buffer_need_bytes(response, 4096);
+ while(sent < buffer_len) {
+ char partition_key[KINESIS_PARTITION_KEY_MAX + 1];
+ snprintf(partition_key, KINESIS_PARTITION_KEY_MAX, "netdata_%llu", partition_key_seq++);
+ size_t partition_key_len = strnlen(partition_key, KINESIS_PARTITION_KEY_MAX);
- ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
- if(likely(r > 0)) {
- // we received some data
- response->len += r;
- chart_received_bytes += r;
- chart_receptions++;
+ const char *first_char = buffer_tostring(b) + sent;
+
+ size_t record_len = 0;
+
+ // split buffer into chunks of maximum allowed size
+ if(buffer_len - sent < KINESIS_RECORD_MAX - partition_key_len) {
+ record_len = buffer_len - sent;
}
- else if(r == 0) {
- error("BACKEND: '%s' closed the socket", destination);
- close(sock);
- sock = -1;
+ else {
+ record_len = KINESIS_RECORD_MAX - partition_key_len;
+ while(*(first_char + record_len) != '\n' && record_len) record_len--;
+ }
+
+ char error_message[ERROR_LINE_MAX + 1] = "";
+
+ debug(D_BACKEND, "BACKEND: kinesis_put_record(): dest = %s, id = %s, key = %s, stream = %s, partition_key = %s, \
+ buffer = %zu, record = %zu", destination, kinesis_auth_key_id, kinesis_secure_key, kinesis_stream_name,
+ partition_key, buffer_len, record_len);
+
+ kinesis_put_record(kinesis_stream_name, partition_key, first_char, record_len);
+
+ sent += record_len;
+ chart_transmission_successes++;
+
+ size_t sent_bytes = 0, lost_bytes = 0;
+
+ if(unlikely(kinesis_get_result(error_message, &sent_bytes, &lost_bytes))) {
+ // oops! we couldn't send (all or some of the) data
+ error("BACKEND: %s", error_message);
+ error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zu bytes.",
+ destination, sent_bytes, sent_bytes - lost_bytes);
+
+ chart_transmission_failures++;
+ chart_data_lost_events++;
+ chart_lost_bytes += lost_bytes;
+
+ // estimate the number of lost metrics
+ chart_lost_metrics += (collected_number)(chart_buffered_metrics
+ * (buffer_len && (lost_bytes > buffer_len) ? (double)lost_bytes / buffer_len : 1));
+
+ break;
}
else {
- // failed to receive data
- if(errno != EAGAIN && errno != EWOULDBLOCK) {
- error("BACKEND: cannot receive data from backend '%s'.", destination);
- }
+ chart_receptions++;
}
+
+ if(unlikely(netdata_exit)) break;
}
- // if we received data, process them
- if(buffer_strlen(response))
- backend_response_checker(response);
+ chart_sent_bytes += sent;
+ if(likely(sent == buffer_len))
+ chart_sent_metrics = chart_buffered_metrics;
+
+ buffer_flush(b);
}
+ else {
+#else
+ {
+#endif /* HAVE_KINESIS */
+
+ // ------------------------------------------------------------------------
+ // if we are connected, receive a response, without blocking
+
+ if(likely(sock != -1)) {
+ errno = 0;
+
+ // loop through to collect all data
+ while(sock != -1 && errno != EWOULDBLOCK) {
+ buffer_need_bytes(response, 4096);
+
+ ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
+ if(likely(r > 0)) {
+ // we received some data
+ response->len += r;
+ chart_received_bytes += r;
+ chart_receptions++;
+ }
+ else if(r == 0) {
+ error("BACKEND: '%s' closed the socket", destination);
+ close(sock);
+ sock = -1;
+ }
+ else {
+ // failed to receive data
+ if(errno != EAGAIN && errno != EWOULDBLOCK) {
+ error("BACKEND: cannot receive data from backend '%s'.", destination);
+ }
+ }
+ }
- // ------------------------------------------------------------------------
- // if we are not connected, connect to a backend server
+ // if we received data, process them
+ if(buffer_strlen(response))
+ backend_response_checker(response);
+ }
- if(unlikely(sock == -1)) {
- // usec_t start_ut = now_monotonic_usec();
- size_t reconnects = 0;
+ // ------------------------------------------------------------------------
+ // if we are not connected, connect to a backend server
- sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0);
+ if(unlikely(sock == -1)) {
+ // usec_t start_ut = now_monotonic_usec();
+ size_t reconnects = 0;
- chart_backend_reconnects += reconnects;
- // chart_backend_latency += now_monotonic_usec() - start_ut;
- }
+ sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0);
- if(unlikely(netdata_exit)) break;
+ chart_backend_reconnects += reconnects;
+ // chart_backend_latency += now_monotonic_usec() - start_ut;
+ }
- // ------------------------------------------------------------------------
- // if we are connected, send our buffer to the backend server
-
- if(likely(sock != -1)) {
- size_t len = buffer_strlen(b);
- // usec_t start_ut = now_monotonic_usec();
- int flags = 0;
-#ifdef MSG_NOSIGNAL
- flags += MSG_NOSIGNAL;
-#endif
+ if(unlikely(netdata_exit)) break;
- ssize_t written = send(sock, buffer_tostring(b), len, flags);
- // chart_backend_latency += now_monotonic_usec() - start_ut;
- if(written != -1 && (size_t)written == len) {
- // we sent the data successfully
- chart_transmission_successes++;
- chart_sent_bytes += written;
- chart_sent_metrics = chart_buffered_metrics;
+ // ------------------------------------------------------------------------
+ // if we are connected, send our buffer to the backend server
+
+ if(likely(sock != -1)) {
+ size_t len = buffer_strlen(b);
+ // usec_t start_ut = now_monotonic_usec();
+ int flags = 0;
+ #ifdef MSG_NOSIGNAL
+ flags += MSG_NOSIGNAL;
+ #endif
+
+ ssize_t written = send(sock, buffer_tostring(b), len, flags);
+ // chart_backend_latency += now_monotonic_usec() - start_ut;
+ if(written != -1 && (size_t)written == len) {
+ // we sent the data successfully
+ chart_transmission_successes++;
+ chart_sent_bytes += written;
+ chart_sent_metrics = chart_buffered_metrics;
- // reset the failures count
- failures = 0;
+ // reset the failures count
+ failures = 0;
- // empty the buffer
- buffer_flush(b);
+ // empty the buffer
+ buffer_flush(b);
+ }
+ else {
+ // oops! we couldn't send (all or some of the) data
+ error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.", destination, len, written);
+ chart_transmission_failures++;
+
+ if(written != -1)
+ chart_sent_bytes += written;
+
+ // increment the counter we check for data loss
+ failures++;
+
+ // close the socket - we will re-open it next time
+ close(sock);
+ sock = -1;
+ }
}
else {
- // oops! we couldn't send (all or some of the) data
- error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.", destination, len, written);
+ error("BACKEND: failed to update database backend '%s'", destination);
chart_transmission_failures++;
- if(written != -1)
- chart_sent_bytes += written;
-
// increment the counter we check for data loss
failures++;
-
- // close the socket - we will re-open it next time
- close(sock);
- sock = -1;
}
}
- else {
- error("BACKEND: failed to update database backend '%s'", destination);
- chart_transmission_failures++;
-
- // increment the counter we check for data loss
- failures++;
- }
if(failures > buffer_on_failures) {
// too bad! we are going to lose data
@@ -651,6 +766,15 @@ void *backends_main(void *ptr) {
}
cleanup:
+#if HAVE_KINESIS
+ if(do_kinesis) {
+ kinesis_shutdown();
+ freez(kinesis_auth_key_id);
+ freez(kinesis_secure_key);
+ freez(kinesis_stream_name);
+ }
+#endif
+
if(sock != -1)
close(sock);