diff options
Diffstat (limited to 'database/engine/rrdengineapi.c')
-rw-r--r-- | database/engine/rrdengineapi.c | 277 |
1 files changed, 192 insertions, 85 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index a4e71155..a87ce6d6 100644 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -41,6 +41,7 @@ void rrdeng_store_metric_init(RRDDIM *rd) handle->descr = NULL; handle->prev_descr = NULL; + handle->unaligned_page = 0; uv_rwlock_rdlock(&pg_cache->metrics_index.lock); PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t)); @@ -54,59 +55,140 @@ void rrdeng_store_metric_init(RRDDIM *rd) PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t), PJE0); assert(NULL == *PValue); /* TODO: figure out concurrency model */ *PValue = page_index = create_page_index(&temp_id); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); } rd->state->rrdeng_uuid = &page_index->id; handle->page_index = page_index; } +/* The page must be populated and referenced */ +static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr) +{ + unsigned i; + uint8_t has_only_empty_metrics = 1; + storage_number *page; + + page = descr->pg_cache_descr->page; + for (i = 0 ; i < descr->page_length / sizeof(storage_number); ++i) { + if (SN_EMPTY_SLOT != page[i]) { + has_only_empty_metrics = 0; + break; + } + } + return has_only_empty_metrics; +} + +void rrdeng_store_metric_flush_current_page(RRDDIM *rd) +{ + struct rrdeng_collect_handle *handle; + struct rrdengine_instance *ctx; + struct rrdeng_page_descr *descr; + + handle = &rd->state->handle.rrdeng; + ctx = handle->ctx; + descr = handle->descr; + if (unlikely(NULL == descr)) { + return; + } + if (likely(descr->page_length)) { + int ret, page_is_empty; + +#ifdef NETDATA_INTERNAL_CHECKS + rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); +#endif + if (handle->prev_descr) { + /* unpin old second page */ + pg_cache_put(ctx, handle->prev_descr); + } + page_is_empty = page_has_only_empty_metrics(descr); + if (page_is_empty) { + debug(D_RRDENGINE, "Page has empty metrics only, deleting:"); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + pg_cache_put(ctx, descr); + pg_cache_punch_hole(ctx, descr, 1); + handle->prev_descr = NULL; + } else { + /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */ + rrdeng_page_descr_mutex_lock(ctx, descr); + ret = pg_cache_try_get_unsafe(descr, 0); + rrdeng_page_descr_mutex_unlock(ctx, descr); + assert (1 == ret); + + rrdeng_commit_page(ctx, descr, handle->page_correlation_id); + handle->prev_descr = descr; + } + } else { + freez(descr->pg_cache_descr->page); + rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr); + freez(descr); + } + handle->descr = NULL; +} + void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number) { struct rrdeng_collect_handle *handle; struct rrdengine_instance *ctx; struct page_cache *pg_cache; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; storage_number *page; + uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0; handle = &rd->state->handle.rrdeng; ctx = handle->ctx; pg_cache = &ctx->pg_cache; descr = handle->descr; - if (unlikely(NULL == descr || descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE)) { - if (descr) { - descr->handle = NULL; - if (descr->page_length) { -#ifdef NETDATA_INTERNAL_CHECKS - rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); -#endif - /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */ - ++descr->refcnt; - rrdeng_commit_page(ctx, descr, handle->page_correlation_id); - if (handle->prev_descr) { - /* unpin old second page */ - pg_cache_put(handle->prev_descr); - } - handle->prev_descr = descr; - } else { - free(descr->page); - free(descr); - handle->descr = NULL; - } + + if (descr) { + /* Make alignment decisions */ + + if (descr->page_length == rd->rrdset->rrddim_page_alignment) { + /* this is the leading dimension that defines chart alignment */ + perfect_page_alignment = 1; + } + /* is the metric far enough out of alignment with the others? */ + if (unlikely(descr->page_length + sizeof(number) < rd->rrdset->rrddim_page_alignment)) { + handle->unaligned_page = 1; + debug(D_RRDENGINE, "Metric page is not aligned with chart:"); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); } - page = rrdeng_create_page(&handle->page_index->id, &descr); + if (unlikely(handle->unaligned_page && + /* did the other metrics change page? */ + rd->rrdset->rrddim_page_alignment <= sizeof(number))) { + debug(D_RRDENGINE, "Flushing unaligned metric page."); + must_flush_unaligned_page = 1; + handle->unaligned_page = 0; + } + } + if (unlikely(NULL == descr || + descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE || + must_flush_unaligned_page)) { + rrdeng_store_metric_flush_current_page(rd); + + page = rrdeng_create_page(ctx, &handle->page_index->id, &descr); assert(page); - handle->prev_descr = handle->descr; + handle->descr = descr; - descr->handle = handle; + uv_rwlock_wrlock(&pg_cache->commited_page_index.lock); handle->page_correlation_id = pg_cache->commited_page_index.latest_corr_id++; uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock); - } - page = descr->page; + if (0 == rd->rrdset->rrddim_page_alignment) { + /* this is the leading dimension that defines chart alignment */ + perfect_page_alignment = 1; + } + } + page = descr->pg_cache_descr->page; page[descr->page_length / sizeof(number)] = number; descr->end_time = point_in_time; descr->page_length += sizeof(number); + if (perfect_page_alignment) + rd->rrdset->rrddim_page_alignment = descr->page_length; if (unlikely(INVALID_TIME == descr->start_time)) { descr->start_time = point_in_time; @@ -126,26 +208,13 @@ void rrdeng_store_metric_finalize(RRDDIM *rd) { struct rrdeng_collect_handle *handle; struct rrdengine_instance *ctx; - struct rrdeng_page_cache_descr *descr; handle = &rd->state->handle.rrdeng; ctx = handle->ctx; - descr = handle->descr; - if (descr) { - descr->handle = NULL; - if (descr->page_length) { -#ifdef NETDATA_INTERNAL_CHECKS - rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); -#endif - rrdeng_commit_page(ctx, descr, handle->page_correlation_id); - if (handle->prev_descr) { - /* unpin old second page */ - pg_cache_put(handle->prev_descr); - } - } else { - free(descr->page); - free(descr); - } + rrdeng_store_metric_flush_current_page(rd); + if (handle->prev_descr) { + /* unpin old second page */ + pg_cache_put(ctx, handle->prev_descr); } } @@ -174,7 +243,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle { struct rrdeng_query_handle *handle; struct rrdengine_instance *ctx; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; storage_number *page, ret; unsigned position; usec_t point_in_time; @@ -198,7 +267,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle #ifdef NETDATA_INTERNAL_CHECKS rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); #endif - pg_cache_put(descr); + pg_cache_put(ctx, descr); handle->descr = NULL; } descr = pg_cache_lookup(ctx, handle->page_index, &handle->page_index->id, point_in_time); @@ -216,7 +285,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle ret = SN_EMPTY_SLOT; goto out; } - page = descr->page; + page = descr->pg_cache_descr->page; if (unlikely(descr->start_time == descr->end_time)) { ret = page[0]; goto out; @@ -248,7 +317,7 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) { struct rrdeng_query_handle *handle; struct rrdengine_instance *ctx; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; handle = &rrdimm_handle->rrdeng; ctx = handle->ctx; @@ -257,7 +326,7 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) #ifdef NETDATA_INTERNAL_CHECKS rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); #endif - pg_cache_put(descr); + pg_cache_put(ctx, descr); } } @@ -283,30 +352,32 @@ time_t rrdeng_metric_oldest_time(RRDDIM *rd) } /* Also gets a reference for the page */ -void *rrdeng_create_page(uuid_t *id, struct rrdeng_page_cache_descr **ret_descr) +void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr) { - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; void *page; - int ret; - /* TODO: check maximum number of pages in page cache limit */ - page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */ descr = pg_cache_create_descr(); - descr->page = page; descr->id = id; /* TODO: add page type: metric, log, something? */ - descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */; - descr->refcnt = 1; - - debug(D_RRDENGINE, "-----------------\nCreated new page:\n-----------------"); - if(unlikely(debug_flags & D_RRDENGINE)) + page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */ + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->page = page; + pg_cache_descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */; + pg_cache_descr->refcnt = 1; + + debug(D_RRDENGINE, "Created new page:"); + if (unlikely(debug_flags & D_RRDENGINE)) print_page_cache_descr(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); *ret_descr = descr; return page; } /* The page must not be empty */ -void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr, +void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, Word_t page_correlation_id) { struct page_cache *pg_cache = &ctx->pg_cache; @@ -324,15 +395,16 @@ void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache ++pg_cache->commited_page_index.nr_commited_pages; uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock); - pg_cache_put(descr); + pg_cache_put(ctx, descr); } /* Gets a reference for the page */ void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle) { - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; - debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------"); + debug(D_RRDENGINE, "Reading existing page:"); descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME); if (NULL == descr) { *handle = NULL; @@ -340,16 +412,18 @@ void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void ** return NULL; } *handle = descr; + pg_cache_descr = descr->pg_cache_descr; - return descr->page; + return pg_cache_descr->page; } /* Gets a reference for the page */ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle) { - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; - debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------"); + debug(D_RRDENGINE, "Reading existing page:"); descr = pg_cache_lookup(ctx, NULL, id, point_in_time); if (NULL == descr) { *handle = NULL; @@ -357,11 +431,18 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i return NULL; } *handle = descr; + pg_cache_descr = descr->pg_cache_descr; - return descr->page; + return pg_cache_descr->page; } -void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long *array) +/* + * Gathers Database Engine statistics. + * Careful when modifying this function. + * You must not change the indices of the statistics or user code will break. + * You must not exceed RRDENG_NR_STATS or it will crash. + */ +void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long *array) { struct page_cache *pg_cache = &ctx->pg_cache; @@ -392,24 +473,46 @@ void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long array[24] = (uint64_t)ctx->stats.datafile_deletions; array[25] = (uint64_t)ctx->stats.journalfile_creations; array[26] = (uint64_t)ctx->stats.journalfile_deletions; + array[27] = (uint64_t)ctx->stats.page_cache_descriptors; + array[28] = (uint64_t)ctx->stats.io_errors; + array[29] = (uint64_t)ctx->stats.fs_errors; + array[30] = (uint64_t)global_io_errors; + array[31] = (uint64_t)global_fs_errors; + array[32] = (uint64_t)rrdeng_reserved_file_descriptors; + assert(RRDENG_NR_STATS == 33); } /* Releases reference to page */ void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle) { (void)ctx; - pg_cache_put((struct rrdeng_page_cache_descr *)handle); + pg_cache_put(ctx, (struct rrdeng_page_descr *)handle); } /* - * Returns 0 on success, 1 on error + * Returns 0 on success, negative on error */ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, unsigned disk_space_mb) { struct rrdengine_instance *ctx; int error; + uint32_t max_open_files; sanity_check(); + + max_open_files = rlimit_nofile.rlim_cur / 4; + + /* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */ + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE); + if (rrdeng_reserved_file_descriptors > max_open_files) { + error("Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.", + (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files); + + rrd_stat_atomic_add(&global_fs_errors, 1); + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); + return UV_EMFILE; + } + if (NULL == ctxp) { /* for testing */ ctx = &default_global_ctx; @@ -417,10 +520,6 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p } else { *ctxp = ctx = callocz(1, sizeof(*ctx)); } - if (ctx->rrdengine_state != RRDENGINE_STATUS_UNINITIALIZED) { - return 1; - } - ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZING; ctx->global_compress_alg = RRD_LZ4; if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB) page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB; @@ -439,11 +538,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p init_commit_log(ctx); error = init_rrd_files(ctx); if (error) { - ctx->rrdengine_state = RRDENGINE_STATUS_UNINITIALIZED; - if (ctx != &default_global_ctx) { - freez(ctx); - } - return 1; + goto error_after_init_rrd_files; } init_completion(&ctx->rrdengine_completion); @@ -451,9 +546,21 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p /* wait for worker thread to initialize */ wait_for_completion(&ctx->rrdengine_completion); destroy_completion(&ctx->rrdengine_completion); - - ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZED; + if (ctx->worker_config.error) { + goto error_after_rrdeng_worker; + } return 0; + +error_after_rrdeng_worker: + finalize_rrd_files(ctx); +error_after_init_rrd_files: + free_page_cache(ctx); + if (ctx != &default_global_ctx) { + freez(ctx); + *ctxp = NULL; + } + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); + return UV_EIO; } /* @@ -464,10 +571,6 @@ int rrdeng_exit(struct rrdengine_instance *ctx) struct rrdeng_cmd cmd; if (NULL == ctx) { - /* TODO: move to per host basis */ - ctx = &default_global_ctx; - } - if (ctx->rrdengine_state != RRDENGINE_STATUS_INITIALIZED) { return 1; } @@ -477,8 +580,12 @@ int rrdeng_exit(struct rrdengine_instance *ctx) assert(0 == uv_thread_join(&ctx->worker_config.thread)); + finalize_rrd_files(ctx); + free_page_cache(ctx); + if (ctx != &default_global_ctx) { freez(ctx); } + rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); return 0; }
\ No newline at end of file |