summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengineapi.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengineapi.c')
-rw-r--r--database/engine/rrdengineapi.c277
1 files changed, 192 insertions, 85 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index a4e71155..a87ce6d6 100644
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -41,6 +41,7 @@ void rrdeng_store_metric_init(RRDDIM *rd)
handle->descr = NULL;
handle->prev_descr = NULL;
+ handle->unaligned_page = 0;
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t));
@@ -54,59 +55,140 @@ void rrdeng_store_metric_init(RRDDIM *rd)
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t), PJE0);
assert(NULL == *PValue); /* TODO: figure out concurrency model */
*PValue = page_index = create_page_index(&temp_id);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
}
rd->state->rrdeng_uuid = &page_index->id;
handle->page_index = page_index;
}
+/* The page must be populated and referenced */
+static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr)
+{
+ unsigned i;
+ uint8_t has_only_empty_metrics = 1;
+ storage_number *page;
+
+ page = descr->pg_cache_descr->page;
+ for (i = 0 ; i < descr->page_length / sizeof(storage_number); ++i) {
+ if (SN_EMPTY_SLOT != page[i]) {
+ has_only_empty_metrics = 0;
+ break;
+ }
+ }
+ return has_only_empty_metrics;
+}
+
+void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
+{
+ struct rrdeng_collect_handle *handle;
+ struct rrdengine_instance *ctx;
+ struct rrdeng_page_descr *descr;
+
+ handle = &rd->state->handle.rrdeng;
+ ctx = handle->ctx;
+ descr = handle->descr;
+ if (unlikely(NULL == descr)) {
+ return;
+ }
+ if (likely(descr->page_length)) {
+ int ret, page_is_empty;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
+#endif
+ if (handle->prev_descr) {
+ /* unpin old second page */
+ pg_cache_put(ctx, handle->prev_descr);
+ }
+ page_is_empty = page_has_only_empty_metrics(descr);
+ if (page_is_empty) {
+ debug(D_RRDENGINE, "Page has empty metrics only, deleting:");
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_put(ctx, descr);
+ pg_cache_punch_hole(ctx, descr, 1);
+ handle->prev_descr = NULL;
+ } else {
+ /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ ret = pg_cache_try_get_unsafe(descr, 0);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ assert (1 == ret);
+
+ rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
+ handle->prev_descr = descr;
+ }
+ } else {
+ freez(descr->pg_cache_descr->page);
+ rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
+ freez(descr);
+ }
+ handle->descr = NULL;
+}
+
void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number)
{
struct rrdeng_collect_handle *handle;
struct rrdengine_instance *ctx;
struct page_cache *pg_cache;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
storage_number *page;
+ uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0;
handle = &rd->state->handle.rrdeng;
ctx = handle->ctx;
pg_cache = &ctx->pg_cache;
descr = handle->descr;
- if (unlikely(NULL == descr || descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE)) {
- if (descr) {
- descr->handle = NULL;
- if (descr->page_length) {
-#ifdef NETDATA_INTERNAL_CHECKS
- rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
-#endif
- /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
- ++descr->refcnt;
- rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
- if (handle->prev_descr) {
- /* unpin old second page */
- pg_cache_put(handle->prev_descr);
- }
- handle->prev_descr = descr;
- } else {
- free(descr->page);
- free(descr);
- handle->descr = NULL;
- }
+
+ if (descr) {
+ /* Make alignment decisions */
+
+ if (descr->page_length == rd->rrdset->rrddim_page_alignment) {
+ /* this is the leading dimension that defines chart alignment */
+ perfect_page_alignment = 1;
+ }
+ /* is the metric far enough out of alignment with the others? */
+ if (unlikely(descr->page_length + sizeof(number) < rd->rrdset->rrddim_page_alignment)) {
+ handle->unaligned_page = 1;
+ debug(D_RRDENGINE, "Metric page is not aligned with chart:");
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
}
- page = rrdeng_create_page(&handle->page_index->id, &descr);
+ if (unlikely(handle->unaligned_page &&
+ /* did the other metrics change page? */
+ rd->rrdset->rrddim_page_alignment <= sizeof(number))) {
+ debug(D_RRDENGINE, "Flushing unaligned metric page.");
+ must_flush_unaligned_page = 1;
+ handle->unaligned_page = 0;
+ }
+ }
+ if (unlikely(NULL == descr ||
+ descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE ||
+ must_flush_unaligned_page)) {
+ rrdeng_store_metric_flush_current_page(rd);
+
+ page = rrdeng_create_page(ctx, &handle->page_index->id, &descr);
assert(page);
- handle->prev_descr = handle->descr;
+
handle->descr = descr;
- descr->handle = handle;
+
uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
handle->page_correlation_id = pg_cache->commited_page_index.latest_corr_id++;
uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
- }
- page = descr->page;
+ if (0 == rd->rrdset->rrddim_page_alignment) {
+ /* this is the leading dimension that defines chart alignment */
+ perfect_page_alignment = 1;
+ }
+ }
+ page = descr->pg_cache_descr->page;
page[descr->page_length / sizeof(number)] = number;
descr->end_time = point_in_time;
descr->page_length += sizeof(number);
+ if (perfect_page_alignment)
+ rd->rrdset->rrddim_page_alignment = descr->page_length;
if (unlikely(INVALID_TIME == descr->start_time)) {
descr->start_time = point_in_time;
@@ -126,26 +208,13 @@ void rrdeng_store_metric_finalize(RRDDIM *rd)
{
struct rrdeng_collect_handle *handle;
struct rrdengine_instance *ctx;
- struct rrdeng_page_cache_descr *descr;
handle = &rd->state->handle.rrdeng;
ctx = handle->ctx;
- descr = handle->descr;
- if (descr) {
- descr->handle = NULL;
- if (descr->page_length) {
-#ifdef NETDATA_INTERNAL_CHECKS
- rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
-#endif
- rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
- if (handle->prev_descr) {
- /* unpin old second page */
- pg_cache_put(handle->prev_descr);
- }
- } else {
- free(descr->page);
- free(descr);
- }
+ rrdeng_store_metric_flush_current_page(rd);
+ if (handle->prev_descr) {
+ /* unpin old second page */
+ pg_cache_put(ctx, handle->prev_descr);
}
}
@@ -174,7 +243,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
{
struct rrdeng_query_handle *handle;
struct rrdengine_instance *ctx;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
storage_number *page, ret;
unsigned position;
usec_t point_in_time;
@@ -198,7 +267,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
handle->descr = NULL;
}
descr = pg_cache_lookup(ctx, handle->page_index, &handle->page_index->id, point_in_time);
@@ -216,7 +285,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
ret = SN_EMPTY_SLOT;
goto out;
}
- page = descr->page;
+ page = descr->pg_cache_descr->page;
if (unlikely(descr->start_time == descr->end_time)) {
ret = page[0];
goto out;
@@ -248,7 +317,7 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
{
struct rrdeng_query_handle *handle;
struct rrdengine_instance *ctx;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
handle = &rrdimm_handle->rrdeng;
ctx = handle->ctx;
@@ -257,7 +326,7 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
}
}
@@ -283,30 +352,32 @@ time_t rrdeng_metric_oldest_time(RRDDIM *rd)
}
/* Also gets a reference for the page */
-void *rrdeng_create_page(uuid_t *id, struct rrdeng_page_cache_descr **ret_descr)
+void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr)
{
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
void *page;
- int ret;
-
/* TODO: check maximum number of pages in page cache limit */
- page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */
descr = pg_cache_create_descr();
- descr->page = page;
descr->id = id; /* TODO: add page type: metric, log, something? */
- descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */;
- descr->refcnt = 1;
-
- debug(D_RRDENGINE, "-----------------\nCreated new page:\n-----------------");
- if(unlikely(debug_flags & D_RRDENGINE))
+ page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ pg_cache_descr->page = page;
+ pg_cache_descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */;
+ pg_cache_descr->refcnt = 1;
+
+ debug(D_RRDENGINE, "Created new page:");
+ if (unlikely(debug_flags & D_RRDENGINE))
print_page_cache_descr(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
*ret_descr = descr;
return page;
}
/* The page must not be empty */
-void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr,
+void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
Word_t page_correlation_id)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -324,15 +395,16 @@ void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache
++pg_cache->commited_page_index.nr_commited_pages;
uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
}
/* Gets a reference for the page */
void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle)
{
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
- debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------");
+ debug(D_RRDENGINE, "Reading existing page:");
descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME);
if (NULL == descr) {
*handle = NULL;
@@ -340,16 +412,18 @@ void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **
return NULL;
}
*handle = descr;
+ pg_cache_descr = descr->pg_cache_descr;
- return descr->page;
+ return pg_cache_descr->page;
}
/* Gets a reference for the page */
void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle)
{
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
- debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------");
+ debug(D_RRDENGINE, "Reading existing page:");
descr = pg_cache_lookup(ctx, NULL, id, point_in_time);
if (NULL == descr) {
*handle = NULL;
@@ -357,11 +431,18 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i
return NULL;
}
*handle = descr;
+ pg_cache_descr = descr->pg_cache_descr;
- return descr->page;
+ return pg_cache_descr->page;
}
-void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
+/*
+ * Gathers Database Engine statistics.
+ * Careful when modifying this function.
+ * You must not change the indices of the statistics or user code will break.
+ * You must not exceed RRDENG_NR_STATS or it will crash.
+ */
+void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -392,24 +473,46 @@ void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long
array[24] = (uint64_t)ctx->stats.datafile_deletions;
array[25] = (uint64_t)ctx->stats.journalfile_creations;
array[26] = (uint64_t)ctx->stats.journalfile_deletions;
+ array[27] = (uint64_t)ctx->stats.page_cache_descriptors;
+ array[28] = (uint64_t)ctx->stats.io_errors;
+ array[29] = (uint64_t)ctx->stats.fs_errors;
+ array[30] = (uint64_t)global_io_errors;
+ array[31] = (uint64_t)global_fs_errors;
+ array[32] = (uint64_t)rrdeng_reserved_file_descriptors;
+ assert(RRDENG_NR_STATS == 33);
}
/* Releases reference to page */
void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle)
{
(void)ctx;
- pg_cache_put((struct rrdeng_page_cache_descr *)handle);
+ pg_cache_put(ctx, (struct rrdeng_page_descr *)handle);
}
/*
- * Returns 0 on success, 1 on error
+ * Returns 0 on success, negative on error
*/
int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, unsigned disk_space_mb)
{
struct rrdengine_instance *ctx;
int error;
+ uint32_t max_open_files;
sanity_check();
+
+ max_open_files = rlimit_nofile.rlim_cur / 4;
+
+ /* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE);
+ if (rrdeng_reserved_file_descriptors > max_open_files) {
+ error("Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.",
+ (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files);
+
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
+ return UV_EMFILE;
+ }
+
if (NULL == ctxp) {
/* for testing */
ctx = &default_global_ctx;
@@ -417,10 +520,6 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
} else {
*ctxp = ctx = callocz(1, sizeof(*ctx));
}
- if (ctx->rrdengine_state != RRDENGINE_STATUS_UNINITIALIZED) {
- return 1;
- }
- ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZING;
ctx->global_compress_alg = RRD_LZ4;
if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB)
page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
@@ -439,11 +538,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
init_commit_log(ctx);
error = init_rrd_files(ctx);
if (error) {
- ctx->rrdengine_state = RRDENGINE_STATUS_UNINITIALIZED;
- if (ctx != &default_global_ctx) {
- freez(ctx);
- }
- return 1;
+ goto error_after_init_rrd_files;
}
init_completion(&ctx->rrdengine_completion);
@@ -451,9 +546,21 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
/* wait for worker thread to initialize */
wait_for_completion(&ctx->rrdengine_completion);
destroy_completion(&ctx->rrdengine_completion);
-
- ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZED;
+ if (ctx->worker_config.error) {
+ goto error_after_rrdeng_worker;
+ }
return 0;
+
+error_after_rrdeng_worker:
+ finalize_rrd_files(ctx);
+error_after_init_rrd_files:
+ free_page_cache(ctx);
+ if (ctx != &default_global_ctx) {
+ freez(ctx);
+ *ctxp = NULL;
+ }
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
+ return UV_EIO;
}
/*
@@ -464,10 +571,6 @@ int rrdeng_exit(struct rrdengine_instance *ctx)
struct rrdeng_cmd cmd;
if (NULL == ctx) {
- /* TODO: move to per host basis */
- ctx = &default_global_ctx;
- }
- if (ctx->rrdengine_state != RRDENGINE_STATUS_INITIALIZED) {
return 1;
}
@@ -477,8 +580,12 @@ int rrdeng_exit(struct rrdengine_instance *ctx)
assert(0 == uv_thread_join(&ctx->worker_config.thread));
+ finalize_rrd_files(ctx);
+ free_page_cache(ctx);
+
if (ctx != &default_global_ctx) {
freez(ctx);
}
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
return 0;
} \ No newline at end of file