diff options
Diffstat (limited to '')
-rw-r--r-- | exporting/process_data.c | 445 |
1 files changed, 445 insertions, 0 deletions
diff --git a/exporting/process_data.c b/exporting/process_data.c new file mode 100644 index 00000000..c7792fa5 --- /dev/null +++ b/exporting/process_data.c @@ -0,0 +1,445 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "exporting_engine.h" + +/** + * Normalize chart and dimension names + * + * Substitute '_' for any special character except '.'. + * + * @param dst where to copy name to. + * @param src where to copy name from. + * @param max_len the maximum size of copied name. + * @return Returns the size of the copied name. + */ +size_t exporting_name_copy(char *dst, const char *src, size_t max_len) +{ + size_t n; + + for (n = 0; *src && n < max_len; dst++, src++, n++) { + char c = *src; + + if (c != '.' && !isalnum(c)) + *dst = '_'; + else + *dst = c; + } + *dst = '\0'; + + return n; +} + +/** + * Mark scheduled instances + * + * Any instance can have its own update interval. On every exporting engine update only those instances are picked, + * which are scheduled for the update. + * + * @param engine an engine data structure. + * @return Returns 1 if there are instances to process + */ +int mark_scheduled_instances(struct engine *engine) +{ + int instances_were_scheduled = 0; + + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (!instance->disabled && (engine->now % instance->config.update_every >= + instance->config.update_every - localhost->rrd_update_every)) { + instance->scheduled = 1; + instances_were_scheduled = 1; + instance->before = engine->now; + } + } + + return instances_were_scheduled; +} + +/** + * Calculate the SUM or AVERAGE of a dimension, for any timeframe + * + * May return NAN if the database does not have any value in the give timeframe. + * + * @param instance an instance data structure. + * @param rd a dimension(metric) in the Netdata database. + * @param last_timestamp the timestamp that should be reported to the exporting connector instance. + * @return Returns the value, calculated over the given period. + */ +NETDATA_DOUBLE exporting_calculate_value_from_stored_data( + struct instance *instance, + RRDDIM *rd, + time_t *last_timestamp) +{ + RRDSET *st = rd->rrdset; +#ifdef NETDATA_INTERNAL_CHECKS + RRDHOST *host = st->rrdhost; +#endif + time_t after = instance->after; + time_t before = instance->before; + + // find the edges of the rrd database for this chart + time_t first_t = storage_engine_oldest_time_s(rd->tiers[0].backend, rd->tiers[0].db_metric_handle); + time_t last_t = storage_engine_latest_time_s(rd->tiers[0].backend, rd->tiers[0].db_metric_handle); + time_t update_every = st->update_every; + struct storage_engine_query_handle handle; + + // step back a little, to make sure we have complete data collection + // for all metrics + after -= update_every * 2; + before -= update_every * 2; + + // align the time-frame + after = after - (after % update_every); + before = before - (before % update_every); + + // for before, loose another iteration + // the latest point will be reported the next time + before -= update_every; + + if (unlikely(after > before)) + // this can happen when update_every > before - after + after = before; + + if (unlikely(after < first_t)) + after = first_t; + + if (unlikely(before > last_t)) + before = last_t; + + if (unlikely(before < first_t || after > last_t)) { + // the chart has not been updated in the wanted timeframe + netdata_log_debug( + D_EXPORTING, + "EXPORTING: %s.%s.%s: aligned timeframe %lu to %lu is outside the chart's database range %lu to %lu", + rrdhost_hostname(host), + rrdset_id(st), + rrddim_id(rd), + (unsigned long)after, + (unsigned long)before, + (unsigned long)first_t, + (unsigned long)last_t); + return NAN; + } + + *last_timestamp = before; + + size_t points_read = 0; + size_t counter = 0; + NETDATA_DOUBLE sum = 0; + + for (storage_engine_query_init(rd->tiers[0].backend, rd->tiers[0].db_metric_handle, &handle, after, before, STORAGE_PRIORITY_SYNCHRONOUS); !storage_engine_query_is_finished(&handle);) { + STORAGE_POINT sp = storage_engine_query_next_metric(&handle); + points_read++; + + if (unlikely(storage_point_is_gap(sp))) { + // not collected + continue; + } + + sum += sp.sum; + counter += sp.count; + } + storage_engine_query_finalize(&handle); + global_statistics_exporters_query_completed(points_read); + + if (unlikely(!counter)) { + netdata_log_debug( + D_EXPORTING, + "EXPORTING: %s.%s.%s: no values stored in database for range %lu to %lu", + rrdhost_hostname(host), + rrdset_id(st), + rrddim_id(rd), + (unsigned long)after, + (unsigned long)before); + return NAN; + } + + if (unlikely(EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_SUM)) + return sum; + + return sum / (NETDATA_DOUBLE)counter; +} + +/** + * Start batch formatting for every connector instance's buffer + * + * @param engine an engine data structure. + */ +void start_batch_formatting(struct engine *engine) +{ + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (instance->scheduled) { + uv_mutex_lock(&instance->mutex); + if (instance->start_batch_formatting && instance->start_batch_formatting(instance) != 0) { + netdata_log_error("EXPORTING: cannot start batch formatting for %s", instance->config.name); + disable_instance(instance); + } + } + } +} + +/** + * Start host formatting for every connector instance's buffer + * + * @param engine an engine data structure. + * @param host a data collecting host. + */ +void start_host_formatting(struct engine *engine, RRDHOST *host) +{ + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (instance->scheduled) { + if (rrdhost_is_exportable(instance, host)) { + if (instance->start_host_formatting && instance->start_host_formatting(instance, host) != 0) { + netdata_log_error("EXPORTING: cannot start host formatting for %s", instance->config.name); + disable_instance(instance); + } + } else { + instance->skip_host = 1; + } + } + } +} + +/** + * Start chart formatting for every connector instance's buffer + * + * @param engine an engine data structure. + * @param st a chart. + */ +void start_chart_formatting(struct engine *engine, RRDSET *st) +{ + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (instance->scheduled && !instance->skip_host) { + if (rrdset_is_exportable(instance, st)) { + if (instance->start_chart_formatting && instance->start_chart_formatting(instance, st) != 0) { + netdata_log_error("EXPORTING: cannot start chart formatting for %s", instance->config.name); + disable_instance(instance); + } + } else { + instance->skip_chart = 1; + } + } + } +} + +/** + * Format metric for every connector instance's buffer + * + * @param engine an engine data structure. + * @param rd a dimension(metric) in the Netdata database. + */ +void metric_formatting(struct engine *engine, RRDDIM *rd) +{ + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (instance->scheduled && !instance->skip_host && !instance->skip_chart) { + if (instance->metric_formatting && instance->metric_formatting(instance, rd) != 0) { + netdata_log_error("EXPORTING: cannot format metric for %s", instance->config.name); + disable_instance(instance); + continue; + } + instance->stats.buffered_metrics++; + } + } +} + +/** + * End chart formatting for every connector instance's buffer + * + * @param engine an engine data structure. + * @param a chart. + */ +void end_chart_formatting(struct engine *engine, RRDSET *st) +{ + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (instance->scheduled && !instance->skip_host && !instance->skip_chart) { + if (instance->end_chart_formatting && instance->end_chart_formatting(instance, st) != 0) { + netdata_log_error("EXPORTING: cannot end chart formatting for %s", instance->config.name); + disable_instance(instance); + continue; + } + } + instance->skip_chart = 0; + } +} + +/** + * Format variables for every connector instance's buffer + * + * @param engine an engine data structure. + * @param host a data collecting host. + */ +void variables_formatting(struct engine *engine, RRDHOST *host) +{ + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (instance->scheduled && !instance->skip_host && should_send_variables(instance)) { + if (instance->variables_formatting && instance->variables_formatting(instance, host) != 0){ + netdata_log_error("EXPORTING: cannot format variables for %s", instance->config.name); + disable_instance(instance); + continue; + } + // sum all variables as one metrics + instance->stats.buffered_metrics++; + } + } +} + +/** + * End host formatting for every connector instance's buffer + * + * @param engine an engine data structure. + * @param host a data collecting host. + */ +void end_host_formatting(struct engine *engine, RRDHOST *host) +{ + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (instance->scheduled && !instance->skip_host) { + if (instance->end_host_formatting && instance->end_host_formatting(instance, host) != 0) { + netdata_log_error("EXPORTING: cannot end host formatting for %s", instance->config.name); + disable_instance(instance); + continue; + } + } + instance->skip_host = 0; + } +} + +/** + * End batch formatting for every connector instance's buffer + * + * @param engine an engine data structure. + */ +void end_batch_formatting(struct engine *engine) +{ + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (instance->scheduled) { + if (instance->end_batch_formatting && instance->end_batch_formatting(instance) != 0) { + netdata_log_error("EXPORTING: cannot end batch formatting for %s", instance->config.name); + disable_instance(instance); + continue; + } + uv_mutex_unlock(&instance->mutex); + instance->data_is_ready = 1; + uv_cond_signal(&instance->cond_var); + + instance->scheduled = 0; + instance->after = instance->before; + } + } +} + +/** + * Prepare buffers + * + * Walk through the Netdata database and fill buffers for every scheduled exporting connector instance according to + * configured rules. + * + * @param engine an engine data structure. + */ +void prepare_buffers(struct engine *engine) +{ + netdata_thread_disable_cancelability(); + start_batch_formatting(engine); + + rrd_rdlock(); + RRDHOST *host; + rrdhost_foreach_read(host) { + start_host_formatting(engine, host); + RRDSET *st; + rrdset_foreach_read(st, host) { + start_chart_formatting(engine, st); + + RRDDIM *rd; + rrddim_foreach_read(rd, st) + metric_formatting(engine, rd); + rrddim_foreach_done(rd); + + end_chart_formatting(engine, st); + } + rrdset_foreach_done(st); + variables_formatting(engine, host); + end_host_formatting(engine, host); + } + rrd_unlock(); + netdata_thread_enable_cancelability(); + + end_batch_formatting(engine); +} + +/** + * Flush a buffer with host labels + * + * @param instance an instance data structure. + * @param host a data collecting host. + * @return Always returns 0. + */ +int flush_host_labels(struct instance *instance, RRDHOST *host) +{ + (void)host; + + if (instance->labels_buffer) + buffer_flush(instance->labels_buffer); + + return 0; +} + +/** + * End a batch for a simple connector + * + * @param instance an instance data structure. + * @return Returns 0 on success, 1 on failure. + */ +int simple_connector_end_batch(struct instance *instance) +{ + struct simple_connector_data *simple_connector_data = + (struct simple_connector_data *)instance->connector_specific_data; + struct stats *stats = &instance->stats; + + BUFFER *instance_buffer = (BUFFER *)instance->buffer; + struct simple_connector_buffer *last_buffer = simple_connector_data->last_buffer; + + if (!last_buffer->buffer) { + last_buffer->buffer = buffer_create(0, &netdata_buffers_statistics.buffers_exporters); + } + + if (last_buffer->used) { + // ring buffer is full, reuse the oldest element + simple_connector_data->first_buffer = simple_connector_data->first_buffer->next; + + stats->data_lost_events++; + stats->lost_metrics += last_buffer->buffered_metrics; + stats->lost_bytes += last_buffer->buffered_bytes; + } + + // swap buffers + BUFFER *tmp_buffer = last_buffer->buffer; + last_buffer->buffer = instance_buffer; + instance->buffer = instance_buffer = tmp_buffer; + + buffer_flush(instance_buffer); + + if (last_buffer->header) + buffer_flush(last_buffer->header); + else + last_buffer->header = buffer_create(0, &netdata_buffers_statistics.buffers_exporters); + + if (instance->prepare_header) + instance->prepare_header(instance); + + // The stats->buffered_metrics is used in the simple connector batch formatting as a variable for the number + // of metrics, added in the current iteration, so we are clearing it here. We will use the + // simple_connector_data->total_buffered_metrics in the worker to show the statistics. + size_t buffered_metrics = (size_t)stats->buffered_metrics; + stats->buffered_metrics = 0; + + size_t buffered_bytes = buffer_strlen(last_buffer->buffer); + + last_buffer->buffered_metrics = buffered_metrics; + last_buffer->buffered_bytes = buffered_bytes; + last_buffer->used++; + + simple_connector_data->total_buffered_metrics += buffered_metrics; + stats->buffered_bytes += buffered_bytes; + + simple_connector_data->last_buffer = simple_connector_data->last_buffer->next; + + return 0; +} |