summaryrefslogtreecommitdiffstats
path: root/exporting/process_data.c
diff options
context:
space:
mode:
Diffstat (limited to 'exporting/process_data.c')
-rw-r--r--exporting/process_data.c54
1 files changed, 36 insertions, 18 deletions
diff --git a/exporting/process_data.c b/exporting/process_data.c
index c77b7ad4..d5138b78 100644
--- a/exporting/process_data.c
+++ b/exporting/process_data.c
@@ -64,7 +64,7 @@ int mark_scheduled_instances(struct engine *engine)
* @param last_timestamp the timestamp that should be reported to the exporting connector instance.
* @return Returns the value, calculated over the given period.
*/
-calculated_number exporting_calculate_value_from_stored_data(
+NETDATA_DOUBLE exporting_calculate_value_from_stored_data(
struct instance *instance,
RRDDIM *rd,
time_t *last_timestamp)
@@ -77,11 +77,10 @@ calculated_number exporting_calculate_value_from_stored_data(
time_t before = instance->before;
// find the edges of the rrd database for this chart
- time_t first_t = rd->state->query_ops.oldest_time(rd);
- time_t last_t = rd->state->query_ops.latest_time(rd);
+ time_t first_t = rd->tiers[0]->query_ops.oldest_time(rd->tiers[0]->db_metric_handle);
+ time_t last_t = rd->tiers[0]->query_ops.latest_time(rd->tiers[0]->db_metric_handle);
time_t update_every = st->update_every;
struct rrddim_query_handle handle;
- storage_number n;
// step back a little, to make sure we have complete data collection
// for all metrics
@@ -124,23 +123,21 @@ calculated_number exporting_calculate_value_from_stored_data(
*last_timestamp = before;
size_t counter = 0;
- calculated_number sum = 0;
+ NETDATA_DOUBLE sum = 0;
- for (rd->state->query_ops.init(rd, &handle, after, before); !rd->state->query_ops.is_finished(&handle);) {
- time_t curr_t;
- n = rd->state->query_ops.next_metric(&handle, &curr_t);
+ for (rd->tiers[0]->query_ops.init(rd->tiers[0]->db_metric_handle, &handle, after, before, TIER_QUERY_FETCH_SUM); !rd->tiers[0]->query_ops.is_finished(&handle);) {
+ STORAGE_POINT sp = rd->tiers[0]->query_ops.next_metric(&handle);
- if (unlikely(!does_storage_number_exist(n))) {
+ if (unlikely(storage_point_is_empty(sp))) {
// not collected
continue;
}
- calculated_number value = unpack_storage_number(n);
- sum += value;
-
- counter++;
+ sum += sp.sum;
+ counter += sp.count;
}
- rd->state->query_ops.finalize(&handle);
+ rd->tiers[0]->query_ops.finalize(&handle);
+
if (unlikely(!counter)) {
debug(
D_EXPORTING,
@@ -156,7 +153,7 @@ calculated_number exporting_calculate_value_from_stored_data(
if (unlikely(EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_SUM))
return sum;
- return sum / (calculated_number)counter;
+ return sum / (NETDATA_DOUBLE)counter;
}
/**
@@ -262,6 +259,27 @@ void end_chart_formatting(struct engine *engine, RRDSET *st)
}
/**
+ * Format variables for every connector instance's buffer
+ *
+ * @param engine an engine data structure.
+ * @param host a data collecting host.
+ */
+void variables_formatting(struct engine *engine, RRDHOST *host)
+{
+ for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+ if (instance->scheduled && !instance->skip_host && should_send_variables(instance)) {
+ if (instance->variables_formatting && instance->variables_formatting(instance, host) != 0){
+ error("EXPORTING: cannot format variables for %s", instance->config.name);
+ disable_instance(instance);
+ continue;
+ }
+ // sum all variables as one metrics
+ instance->stats.buffered_metrics++;
+ }
+ }
+}
+
+/**
* End host formatting for every connector instance's buffer
*
* @param engine an engine data structure.
@@ -337,7 +355,7 @@ void prepare_buffers(struct engine *engine)
end_chart_formatting(engine, st);
rrdset_unlock(st);
}
-
+ variables_formatting(engine, host);
end_host_formatting(engine, host);
rrdhost_unlock(host);
}
@@ -358,8 +376,8 @@ int flush_host_labels(struct instance *instance, RRDHOST *host)
{
(void)host;
- if (instance->labels)
- buffer_flush(instance->labels);
+ if (instance->labels_buffer)
+ buffer_flush(instance->labels_buffer);
return 0;
}