diff options
Diffstat (limited to 'exporting/process_data.c')
-rw-r--r-- | exporting/process_data.c | 426 |
1 files changed, 426 insertions, 0 deletions
diff --git a/exporting/process_data.c b/exporting/process_data.c new file mode 100644 index 00000000..5e11b394 --- /dev/null +++ b/exporting/process_data.c @@ -0,0 +1,426 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "exporting_engine.h" + +/** + * Normalize chart and dimension names + * + * Substitute '_' for any special character except '.'. + * + * @param dst where to copy name to. + * @param src where to copy name from. + * @param max_len the maximum size of copied name. + * @return Returns the size of the copied name. + */ +size_t exporting_name_copy(char *dst, const char *src, size_t max_len) +{ + size_t n; + + for (n = 0; *src && n < max_len; dst++, src++, n++) { + char c = *src; + + if (c != '.' && !isalnum(c)) + *dst = '_'; + else + *dst = c; + } + *dst = '\0'; + + return n; +} + +/** + * Mark scheduled instances + * + * Any instance can have its own update interval. On every exporting engine update only those instances are picked, + * which are scheduled for the update. + * + * @param engine an engine data structure. + * @return Returns 1 if there are instances to process + */ +int mark_scheduled_instances(struct engine *engine) +{ + int instances_were_scheduled = 0; + + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (!instance->disabled && (engine->now % instance->config.update_every >= + instance->config.update_every - localhost->rrd_update_every)) { + instance->scheduled = 1; + instances_were_scheduled = 1; + instance->before = engine->now; + } + } + + return instances_were_scheduled; +} + +/** + * Calculate the SUM or AVERAGE of a dimension, for any timeframe + * + * May return NAN if the database does not have any value in the give timeframe. + * + * @param instance an instance data structure. + * @param rd a dimension(metric) in the Netdata database. + * @param last_timestamp the timestamp that should be reported to the exporting connector instance. + * @return Returns the value, calculated over the given period. + */ +calculated_number exporting_calculate_value_from_stored_data( + struct instance *instance, + RRDDIM *rd, + time_t *last_timestamp) +{ + RRDSET *st = rd->rrdset; + RRDHOST *host = st->rrdhost; + time_t after = instance->after; + time_t before = instance->before; + + // find the edges of the rrd database for this chart + time_t first_t = rd->state->query_ops.oldest_time(rd); + time_t last_t = rd->state->query_ops.latest_time(rd); + time_t update_every = st->update_every; + struct rrddim_query_handle handle; + storage_number n; + + // step back a little, to make sure we have complete data collection + // for all metrics + after -= update_every * 2; + before -= update_every * 2; + + // align the time-frame + after = after - (after % update_every); + before = before - (before % update_every); + + // for before, loose another iteration + // the latest point will be reported the next time + before -= update_every; + + if (unlikely(after > before)) + // this can happen when update_every > before - after + after = before; + + if (unlikely(after < first_t)) + after = first_t; + + if (unlikely(before > last_t)) + before = last_t; + + if (unlikely(before < first_t || after > last_t)) { + // the chart has not been updated in the wanted timeframe + debug( + D_BACKEND, + "EXPORTING: %s.%s.%s: aligned timeframe %lu to %lu is outside the chart's database range %lu to %lu", + host->hostname, + st->id, + rd->id, + (unsigned long)after, + (unsigned long)before, + (unsigned long)first_t, + (unsigned long)last_t); + return NAN; + } + + *last_timestamp = before; + + size_t counter = 0; + calculated_number sum = 0; + + for (rd->state->query_ops.init(rd, &handle, after, before); !rd->state->query_ops.is_finished(&handle);) { + time_t curr_t; + n = rd->state->query_ops.next_metric(&handle, &curr_t); + + if (unlikely(!does_storage_number_exist(n))) { + // not collected + continue; + } + + calculated_number value = unpack_storage_number(n); + sum += value; + + counter++; + } + rd->state->query_ops.finalize(&handle); + if (unlikely(!counter)) { + debug( + D_BACKEND, + "EXPORTING: %s.%s.%s: no values stored in database for range %lu to %lu", + host->hostname, + st->id, + rd->id, + (unsigned long)after, + (unsigned long)before); + return NAN; + } + + if (unlikely(EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_SUM)) + return sum; + + return sum / (calculated_number)counter; +} + +/** + * Start batch formatting for every connector instance's buffer + * + * @param engine an engine data structure. + */ +void start_batch_formatting(struct engine *engine) +{ + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (instance->scheduled) { + uv_mutex_lock(&instance->mutex); + if (instance->start_batch_formatting && instance->start_batch_formatting(instance) != 0) { + error("EXPORTING: cannot start batch formatting for %s", instance->config.name); + disable_instance(instance); + } + } + } +} + +/** + * Start host formatting for every connector instance's buffer + * + * @param engine an engine data structure. + * @param host a data collecting host. + */ +void start_host_formatting(struct engine *engine, RRDHOST *host) +{ + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (instance->scheduled) { + if (rrdhost_is_exportable(instance, host)) { + if (instance->start_host_formatting && instance->start_host_formatting(instance, host) != 0) { + error("EXPORTING: cannot start host formatting for %s", instance->config.name); + disable_instance(instance); + } + } else { + instance->skip_host = 1; + } + } + } +} + +/** + * Start chart formatting for every connector instance's buffer + * + * @param engine an engine data structure. + * @param st a chart. + */ +void start_chart_formatting(struct engine *engine, RRDSET *st) +{ + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (instance->scheduled && !instance->skip_host) { + if (rrdset_is_exportable(instance, st)) { + if (instance->start_chart_formatting && instance->start_chart_formatting(instance, st) != 0) { + error("EXPORTING: cannot start chart formatting for %s", instance->config.name); + disable_instance(instance); + } + } else { + instance->skip_chart = 1; + } + } + } +} + +/** + * Format metric for every connector instance's buffer + * + * @param engine an engine data structure. + * @param rd a dimension(metric) in the Netdata database. + */ +void metric_formatting(struct engine *engine, RRDDIM *rd) +{ + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (instance->scheduled && !instance->skip_host && !instance->skip_chart) { + if (instance->metric_formatting && instance->metric_formatting(instance, rd) != 0) { + error("EXPORTING: cannot format metric for %s", instance->config.name); + disable_instance(instance); + continue; + } + instance->stats.buffered_metrics++; + } + } +} + +/** + * End chart formatting for every connector instance's buffer + * + * @param engine an engine data structure. + * @param a chart. + */ +void end_chart_formatting(struct engine *engine, RRDSET *st) +{ + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (instance->scheduled && !instance->skip_host && !instance->skip_chart) { + if (instance->end_chart_formatting && instance->end_chart_formatting(instance, st) != 0) { + error("EXPORTING: cannot end chart formatting for %s", instance->config.name); + disable_instance(instance); + continue; + } + } + instance->skip_chart = 0; + } +} + +/** + * End host formatting for every connector instance's buffer + * + * @param engine an engine data structure. + * @param host a data collecting host. + */ +void end_host_formatting(struct engine *engine, RRDHOST *host) +{ + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (instance->scheduled && !instance->skip_host) { + if (instance->end_host_formatting && instance->end_host_formatting(instance, host) != 0) { + error("EXPORTING: cannot end host formatting for %s", instance->config.name); + disable_instance(instance); + continue; + } + } + instance->skip_host = 0; + } +} + +/** + * End batch formatting for every connector instance's buffer + * + * @param engine an engine data structure. + */ +void end_batch_formatting(struct engine *engine) +{ + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + if (instance->scheduled) { + if (instance->end_batch_formatting && instance->end_batch_formatting(instance) != 0) { + error("EXPORTING: cannot end batch formatting for %s", instance->config.name); + disable_instance(instance); + continue; + } + uv_mutex_unlock(&instance->mutex); + instance->data_is_ready = 1; + uv_cond_signal(&instance->cond_var); + + instance->scheduled = 0; + instance->after = instance->before; + } + } +} + +/** + * Prepare buffers + * + * Walk through the Netdata database and fill buffers for every scheduled exporting connector instance according to + * configured rules. + * + * @param engine an engine data structure. + */ +void prepare_buffers(struct engine *engine) +{ + netdata_thread_disable_cancelability(); + start_batch_formatting(engine); + + rrd_rdlock(); + RRDHOST *host; + rrdhost_foreach_read(host) + { + rrdhost_rdlock(host); + start_host_formatting(engine, host); + RRDSET *st; + rrdset_foreach_read(st, host) + { + rrdset_rdlock(st); + start_chart_formatting(engine, st); + + RRDDIM *rd; + rrddim_foreach_read(rd, st) + metric_formatting(engine, rd); + + end_chart_formatting(engine, st); + rrdset_unlock(st); + } + + end_host_formatting(engine, host); + rrdhost_unlock(host); + } + rrd_unlock(); + netdata_thread_enable_cancelability(); + + end_batch_formatting(engine); +} + +/** + * Flush a buffer with host labels + * + * @param instance an instance data structure. + * @param host a data collecting host. + * @return Always returns 0. + */ +int flush_host_labels(struct instance *instance, RRDHOST *host) +{ + (void)host; + + if (instance->labels) + buffer_flush(instance->labels); + + return 0; +} + +/** + * End a batch for a simple connector + * + * @param instance an instance data structure. + * @return Returns 0 on success, 1 on failure. + */ +int simple_connector_end_batch(struct instance *instance) +{ + struct simple_connector_data *simple_connector_data = + (struct simple_connector_data *)instance->connector_specific_data; + struct stats *stats = &instance->stats; + + BUFFER *instance_buffer = (BUFFER *)instance->buffer; + struct simple_connector_buffer *last_buffer = simple_connector_data->last_buffer; + + if (!last_buffer->buffer) { + last_buffer->buffer = buffer_create(0); + } + + if (last_buffer->used) { + // ring buffer is full, reuse the oldest element + simple_connector_data->first_buffer = simple_connector_data->first_buffer->next; + + stats->data_lost_events++; + stats->lost_metrics += last_buffer->buffered_metrics; + stats->lost_bytes += last_buffer->buffered_bytes; + } + + // swap buffers + BUFFER *tmp_buffer = last_buffer->buffer; + last_buffer->buffer = instance_buffer; + instance->buffer = instance_buffer = tmp_buffer; + + buffer_flush(instance_buffer); + + if (last_buffer->header) + buffer_flush(last_buffer->header); + else + last_buffer->header = buffer_create(0); + + if (instance->prepare_header) + instance->prepare_header(instance); + + // The stats->buffered_metrics is used in the simple connector batch formatting as a variable for the number + // of metrics, added in the current iteration, so we are clearing it here. We will use the + // simple_connector_data->total_buffered_metrics in the worker to show the statistics. + size_t buffered_metrics = (size_t)stats->buffered_metrics; + stats->buffered_metrics = 0; + + size_t buffered_bytes = buffer_strlen(last_buffer->buffer); + + last_buffer->buffered_metrics = buffered_metrics; + last_buffer->buffered_bytes = buffered_bytes; + last_buffer->used++; + + simple_connector_data->total_buffered_metrics += buffered_metrics; + stats->buffered_bytes += buffered_bytes; + + simple_connector_data->last_buffer = simple_connector_data->last_buffer->next; + + return 0; +} |