summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengineapi.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengineapi.c')
-rwxr-xr-xdatabase/engine/rrdengineapi.c267
1 files changed, 118 insertions, 149 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 27497bbb8..ddc306ed7 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -35,16 +35,16 @@ __attribute__((constructor)) void initialize_multidb_ctx(void) {
multidb_ctx[4] = &multidb_ctx_storage_tier4;
}
-int default_rrdeng_page_fetch_timeout = 3;
-int default_rrdeng_page_fetch_retries = 3;
int db_engine_journal_check = 0;
int default_rrdeng_disk_quota_mb = 256;
int default_multidb_disk_quota_mb = 256;
#if defined(ENV32BIT)
int default_rrdeng_page_cache_mb = 16;
+int default_rrdeng_extent_cache_mb = 0;
#else
int default_rrdeng_page_cache_mb = 32;
+int default_rrdeng_extent_cache_mb = 0;
#endif
// ----------------------------------------------------------------------------
@@ -163,7 +163,7 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE
}
#ifdef NETDATA_INTERNAL_CHECKS
- if(uuid_compare(rd->metric_uuid, *mrg_metric_uuid(main_mrg, metric)) != 0) {
+ if(uuid_memcmp(&rd->metric_uuid, mrg_metric_uuid(main_mrg, metric)) != 0) {
char uuid1[UUID_STR_LEN + 1];
char uuid2[UUID_STR_LEN + 1];
@@ -255,8 +255,11 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri
struct rrdeng_collect_handle *handle;
handle = callocz(1, sizeof(struct rrdeng_collect_handle));
+ handle->common.backend = STORAGE_ENGINE_BACKEND_DBENGINE;
handle->metric = metric;
handle->page = NULL;
+ handle->data = NULL;
+ handle->data_size = 0;
handle->page_position = 0;
handle->page_entries_max = 0;
handle->update_every_ut = (usec_t)update_every * USEC_PER_SEC;
@@ -340,6 +343,8 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h
handle->page_flags = 0;
handle->page_position = 0;
handle->page_entries_max = 0;
+ handle->data = NULL;
+ handle->data_size = 0;
// important!
// we should never zero page end time ut, because this will allow
@@ -348,6 +353,8 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h
// handle->page_start_time_ut;
check_and_fix_mrg_update_every(handle);
+
+ timing_step(TIMING_STEP_DBENGINE_FLUSH_PAGE);
}
static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *handle,
@@ -365,7 +372,7 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha
.end_time_s = point_in_time_s,
.size = data_size,
.data = data,
- .update_every_s = update_every_s,
+ .update_every_s = (uint32_t) update_every_s,
.hot = true
};
@@ -414,62 +421,57 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha
handle->page_flags |= RRDENG_PAGE_CREATED_IN_FUTURE;
check_and_fix_mrg_update_every(handle);
+
+ timing_step(TIMING_STEP_DBENGINE_CREATE_NEW_PAGE);
}
-static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle, size_t *data_size, usec_t point_in_time_ut) {
- struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
- size_t size;
+static size_t aligned_allocation_entries(size_t max_slots, size_t target_slot, time_t now_s) {
+ size_t slots = target_slot;
+ size_t pos = (now_s % max_slots);
- if(handle->options & RRDENG_FIRST_PAGE_ALLOCATED) {
- // any page except the first
- size = tier_page_size[ctx->config.tier];
- }
- else {
- size_t final_slots = 0;
+ if(pos > slots)
+ slots += max_slots - pos;
- // the first page
- handle->options |= RRDENG_FIRST_PAGE_ALLOCATED;
- size_t max_size = tier_page_size[ctx->config.tier];
- size_t max_slots = max_size / CTX_POINT_SIZE_BYTES(ctx);
+ else if(pos < slots)
+ slots -= pos;
- if(handle->alignment->initial_slots) {
- final_slots = handle->alignment->initial_slots;
- }
- else {
- max_slots -= 3;
+ else
+ slots = max_slots;
- size_t smaller_slot = indexing_partition((Word_t)handle->alignment, max_slots);
- final_slots = smaller_slot;
+ return slots;
+}
- time_t now_s = (time_t)(point_in_time_ut / USEC_PER_SEC);
- size_t current_pos = (now_s % max_slots);
+static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle, size_t *data_size, usec_t point_in_time_ut) {
+ struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
- if(current_pos > final_slots)
- final_slots += max_slots - current_pos;
+ size_t max_size = tier_page_size[ctx->config.tier];
+ size_t max_slots = max_size / CTX_POINT_SIZE_BYTES(ctx);
- else if(current_pos < final_slots)
- final_slots -= current_pos;
+ size_t slots = aligned_allocation_entries(
+ max_slots,
+ indexing_partition((Word_t) handle->alignment, max_slots),
+ (time_t) (point_in_time_ut / USEC_PER_SEC)
+ );
- if(final_slots < 3) {
- final_slots += 3;
- smaller_slot += 3;
+ if(slots < max_slots / 3)
+ slots = max_slots / 3;
- if(smaller_slot >= max_slots)
- smaller_slot -= max_slots;
- }
+ if(slots < 3)
+ slots = 3;
- max_slots += 3;
- handle->alignment->initial_slots = smaller_slot + 3;
+ size_t size = slots * CTX_POINT_SIZE_BYTES(ctx);
- internal_fatal(handle->alignment->initial_slots < 3 || handle->alignment->initial_slots >= max_slots, "ooops! wrong distribution of metrics across time");
- internal_fatal(final_slots < 3 || final_slots >= max_slots, "ooops! wrong distribution of metrics across time");
- }
+ // internal_error(true, "PAGE ALLOC %zu bytes (%zu max)", size, max_size);
- size = final_slots * CTX_POINT_SIZE_BYTES(ctx);
- }
+ internal_fatal(slots < 3 || slots > max_slots, "ooops! wrong distribution of metrics across time");
+ internal_fatal(size > tier_page_size[ctx->config.tier] || size < CTX_POINT_SIZE_BYTES(ctx) * 2, "ooops! wrong page size");
*data_size = size;
- return dbengine_page_alloc(size);
+ void *d = dbengine_page_alloc(size);
+
+ timing_step(TIMING_STEP_DBENGINE_PAGE_ALLOC);
+
+ return d;
}
static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_handle,
@@ -484,75 +486,33 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_
struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
- bool perfect_page_alignment = false;
- void *data;
- size_t data_size;
+ if(unlikely(!handle->data))
+ handle->data = rrdeng_alloc_new_metric_data(handle, &handle->data_size, point_in_time_ut);
- if(likely(handle->page)) {
- /* Make alignment decisions */
- if (handle->page_position == handle->alignment->page_position) {
- /* this is the leading dimension that defines chart alignment */
- perfect_page_alignment = true;
- }
-
- /* is the metric far enough out of alignment with the others? */
- if (unlikely(handle->page_position + 1 < handle->alignment->page_position))
- handle->options |= RRDENG_CHO_UNALIGNED;
+ timing_step(TIMING_STEP_DBENGINE_CHECK_DATA);
- if (unlikely((handle->options & RRDENG_CHO_UNALIGNED) &&
- /* did the other metrics change page? */
- handle->alignment->page_position <= 1)) {
- handle->options &= ~RRDENG_CHO_UNALIGNED;
- handle->page_flags |= RRDENG_PAGE_UNALIGNED;
- rrdeng_store_metric_flush_current_page(collection_handle);
-
- data = rrdeng_alloc_new_metric_data(handle, &data_size, point_in_time_ut);
- }
- else {
- data = pgc_page_data(handle->page);
- data_size = pgc_page_data_size(main_cache, handle->page);
- }
+ if(likely(ctx->config.page_type == PAGE_METRICS)) {
+ storage_number *tier0_metric_data = handle->data;
+ tier0_metric_data[handle->page_position] = pack_storage_number(n, flags);
+ }
+ else if(likely(ctx->config.page_type == PAGE_TIER)) {
+ storage_number_tier1_t *tier12_metric_data = handle->data;
+ storage_number_tier1_t number_tier1;
+ number_tier1.sum_value = (float) n;
+ number_tier1.min_value = (float) min_value;
+ number_tier1.max_value = (float) max_value;
+ number_tier1.anomaly_count = anomaly_count;
+ number_tier1.count = count;
+ tier12_metric_data[handle->page_position] = number_tier1;
}
else
- data = rrdeng_alloc_new_metric_data(handle, &data_size, point_in_time_ut);
-
- switch (ctx->config.page_type) {
- case PAGE_METRICS: {
- storage_number *tier0_metric_data = data;
- tier0_metric_data[handle->page_position] = pack_storage_number(n, flags);
- }
- break;
+ fatal("DBENGINE: cannot store metric on unknown page type id %d", ctx->config.page_type);
- case PAGE_TIER: {
- storage_number_tier1_t *tier12_metric_data = data;
- storage_number_tier1_t number_tier1;
- number_tier1.sum_value = (float)n;
- number_tier1.min_value = (float)min_value;
- number_tier1.max_value = (float)max_value;
- number_tier1.anomaly_count = anomaly_count;
- number_tier1.count = count;
- tier12_metric_data[handle->page_position] = number_tier1;
- }
- break;
-
- default: {
- static bool logged = false;
- if(!logged) {
- error("DBENGINE: cannot store metric on unknown page type id %d", ctx->config.page_type);
- logged = true;
- }
- }
- break;
- }
+ timing_step(TIMING_STEP_DBENGINE_PACK);
if(unlikely(!handle->page)){
- rrdeng_store_metric_create_new_page(handle, ctx, point_in_time_ut, data, data_size);
+ rrdeng_store_metric_create_new_page(handle, ctx, point_in_time_ut, handle->data, handle->data_size);
// handle->position is set to 1 already
-
- if (0 == handle->alignment->page_position) {
- /* this is the leading dimension that defines chart alignment */
- perfect_page_alignment = true;
- }
}
else {
// update an existing page
@@ -566,11 +526,12 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_
}
}
- if (perfect_page_alignment)
- handle->alignment->page_position = handle->page_position;
+ timing_step(TIMING_STEP_DBENGINE_PAGE_FIN);
// update the metric information
mrg_metric_set_hot_latest_time_s(main_mrg, handle->metric, (time_t) (point_in_time_ut / USEC_PER_SEC));
+
+ timing_step(TIMING_STEP_DBENGINE_MRG_UPDATE);
}
static void store_metric_next_error_log(struct rrdeng_collect_handle *handle, usec_t point_in_time_ut, const char *msg) {
@@ -612,6 +573,8 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
const uint16_t anomaly_count,
const SN_FLAGS flags)
{
+ timing_step(TIMING_STEP_RRDSET_STORE_METRIC);
+
struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
#ifdef NETDATA_INTERNAL_CHECKS
@@ -619,59 +582,62 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
handle->page_flags |= RRDENG_PAGE_FUTURE_POINT;
#endif
- if(likely(handle->page_end_time_ut + handle->update_every_ut == point_in_time_ut)) {
+ usec_t delta_ut = point_in_time_ut - handle->page_end_time_ut;
+
+ if(likely(delta_ut == handle->update_every_ut)) {
// happy path
;
}
+ else if(unlikely(point_in_time_ut > handle->page_end_time_ut)) {
+ if(handle->page) {
+ if (unlikely(delta_ut < handle->update_every_ut)) {
+ handle->page_flags |= RRDENG_PAGE_STEP_TOO_SMALL;
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ }
+ else if (unlikely(delta_ut % handle->update_every_ut)) {
+ handle->page_flags |= RRDENG_PAGE_STEP_UNALIGNED;
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ }
+ else {
+ size_t points_gap = delta_ut / handle->update_every_ut;
+ size_t page_remaining_points = handle->page_entries_max - handle->page_position;
+
+ if (points_gap >= page_remaining_points) {
+ handle->page_flags |= RRDENG_PAGE_BIG_GAP;
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ }
+ else {
+ // loop to fill the gap
+ handle->page_flags |= RRDENG_PAGE_GAP;
+
+ usec_t stop_ut = point_in_time_ut - handle->update_every_ut;
+ for (usec_t this_ut = handle->page_end_time_ut + handle->update_every_ut;
+ this_ut <= stop_ut;
+ this_ut = handle->page_end_time_ut + handle->update_every_ut) {
+ rrdeng_store_metric_append_point(
+ collection_handle,
+ this_ut,
+ NAN, NAN, NAN,
+ 1, 0,
+ SN_EMPTY_SLOT);
+ }
+ }
+ }
+ }
+ }
else if(unlikely(point_in_time_ut < handle->page_end_time_ut)) {
handle->page_flags |= RRDENG_PAGE_PAST_COLLECTION;
store_metric_next_error_log(handle, point_in_time_ut, "is older than the");
return;
}
- else if(unlikely(point_in_time_ut == handle->page_end_time_ut)) {
+ else /* if(unlikely(point_in_time_ut == handle->page_end_time_ut)) */ {
handle->page_flags |= RRDENG_PAGE_REPEATED_COLLECTION;
store_metric_next_error_log(handle, point_in_time_ut, "is at the same time as the");
return;
}
- else if(handle->page) {
- usec_t delta_ut = point_in_time_ut - handle->page_end_time_ut;
-
- if(unlikely(delta_ut < handle->update_every_ut)) {
- handle->page_flags |= RRDENG_PAGE_STEP_TOO_SMALL;
- rrdeng_store_metric_flush_current_page(collection_handle);
- }
- else if(unlikely(delta_ut % handle->update_every_ut)) {
- handle->page_flags |= RRDENG_PAGE_STEP_UNALIGNED;
- rrdeng_store_metric_flush_current_page(collection_handle);
- }
- else {
- size_t points_gap = delta_ut / handle->update_every_ut;
- size_t page_remaining_points = handle->page_entries_max - handle->page_position;
-
- if(points_gap >= page_remaining_points) {
- handle->page_flags |= RRDENG_PAGE_BIG_GAP;
- rrdeng_store_metric_flush_current_page(collection_handle);
- }
- else {
- // loop to fill the gap
- handle->page_flags |= RRDENG_PAGE_GAP;
-
- usec_t stop_ut = point_in_time_ut - handle->update_every_ut;
- for(usec_t this_ut = handle->page_end_time_ut + handle->update_every_ut;
- this_ut <= stop_ut ;
- this_ut = handle->page_end_time_ut + handle->update_every_ut) {
- rrdeng_store_metric_append_point(
- collection_handle,
- this_ut,
- NAN, NAN, NAN,
- 1, 0,
- SN_EMPTY_SLOT);
- }
- }
- }
- }
+ timing_step(TIMING_STEP_DBENGINE_FIRST_CHECK);
rrdeng_store_metric_append_point(collection_handle,
point_in_time_ut,
@@ -776,10 +742,10 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
handle = rrdeng_query_handle_get();
register_query_handle(handle);
- if(unlikely(priority < STORAGE_PRIORITY_HIGH))
+ if (unlikely(priority < STORAGE_PRIORITY_HIGH))
priority = STORAGE_PRIORITY_HIGH;
- else if(unlikely(priority > STORAGE_PRIORITY_BEST_EFFORT))
- priority = STORAGE_PRIORITY_BEST_EFFORT;
+ else if (unlikely(priority >= STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE))
+ priority = STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE - 1;
handle->ctx = ctx;
handle->metric = metric;
@@ -809,6 +775,7 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
rrddim_handle->start_time_s = handle->start_time_s;
rrddim_handle->end_time_s = handle->end_time_s;
rrddim_handle->priority = priority;
+ rrddim_handle->backend = STORAGE_ENGINE_BACKEND_DBENGINE;
pg_cache_preload(handle);
@@ -824,6 +791,7 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
rrddim_handle->start_time_s = handle->start_time_s;
rrddim_handle->end_time_s = 0;
rrddim_handle->priority = priority;
+ rrddim_handle->backend = STORAGE_ENGINE_BACKEND_DBENGINE;
}
}
@@ -906,6 +874,7 @@ STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim
// We need to get a new page
if (!rrdeng_load_page_next(rrddim_handle, false)) {
+ handle->now_s = rrddim_handle->end_time_s;
storage_point_empty(sp, handle->now_s - handle->dt_s, handle->now_s);
goto prepare_for_next_iteration;
}