diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2021-02-07 11:45:55 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2021-02-07 11:45:55 +0000 |
commit | a8220ab2d293bb7f4b014b79d16b2fb05090fa93 (patch) | |
tree | 77f0a30f016c0925cf7ee9292e644bba183c2774 /database/engine/rrdengineapi.c | |
parent | Adding upstream version 1.19.0. (diff) | |
download | netdata-a8220ab2d293bb7f4b014b79d16b2fb05090fa93.tar.xz netdata-a8220ab2d293bb7f4b014b79d16b2fb05090fa93.zip |
Adding upstream version 1.29.0.upstream/1.29.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/engine/rrdengineapi.c')
-rwxr-xr-x[-rw-r--r--] | database/engine/rrdengineapi.c | 353 |
1 files changed, 257 insertions, 96 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index baf4a9973..7b2ff5b72 100644..100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -2,65 +2,148 @@ #include "rrdengine.h" /* Default global database instance */ -static struct rrdengine_instance default_global_ctx; +struct rrdengine_instance multidb_ctx; int default_rrdeng_page_cache_mb = 32; -int default_rrdeng_disk_quota_mb = RRDENG_MIN_DISK_SPACE_MB; +int default_rrdeng_disk_quota_mb = 256; +int default_multidb_disk_quota_mb = 256; +/* Default behaviour is to unblock data collection if the page cache is full of dirty pages by dropping metrics */ +uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1; -/* - * Gets a handle for storing metrics to the database. - * The handle must be released with rrdeng_store_metric_final(). - */ -void rrdeng_store_metric_init(RRDDIM *rd) +static inline struct rrdengine_instance *get_rrdeng_ctx_from_host(RRDHOST *host) +{ + return host->rrdeng_ctx; +} + +/* This UUID is not unique across hosts */ +void rrdeng_generate_legacy_uuid(const char *dim_id, char *chart_id, uuid_t *ret_uuid) { - struct rrdeng_collect_handle *handle; - struct page_cache *pg_cache; - struct rrdengine_instance *ctx; - uuid_t temp_id; - Pvoid_t *PValue; - struct pg_cache_page_index *page_index; EVP_MD_CTX *evpctx; unsigned char hash_value[EVP_MAX_MD_SIZE]; unsigned int hash_len; - //&default_global_ctx; TODO: test this use case or remove it? + evpctx = EVP_MD_CTX_create(); + EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL); + EVP_DigestUpdate(evpctx, dim_id, strlen(dim_id)); + EVP_DigestUpdate(evpctx, chart_id, strlen(chart_id)); + EVP_DigestFinal_ex(evpctx, hash_value, &hash_len); + EVP_MD_CTX_destroy(evpctx); + fatal_assert(hash_len > sizeof(uuid_t)); + memcpy(ret_uuid, hash_value, sizeof(uuid_t)); +} - ctx = rd->rrdset->rrdhost->rrdeng_ctx; - pg_cache = &ctx->pg_cache; - handle = &rd->state->handle.rrdeng; - handle->ctx = ctx; +/* Transform legacy UUID to be unique across hosts deterministacally */ +void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid, uuid_t *ret_uuid) +{ + EVP_MD_CTX *evpctx; + unsigned char hash_value[EVP_MAX_MD_SIZE]; + unsigned int hash_len; evpctx = EVP_MD_CTX_create(); EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL); - EVP_DigestUpdate(evpctx, rd->id, strlen(rd->id)); - EVP_DigestUpdate(evpctx, rd->rrdset->id, strlen(rd->rrdset->id)); + EVP_DigestUpdate(evpctx, machine_guid, GUID_LEN); + EVP_DigestUpdate(evpctx, *legacy_uuid, sizeof(uuid_t)); EVP_DigestFinal_ex(evpctx, hash_value, &hash_len); EVP_MD_CTX_destroy(evpctx); - assert(hash_len > sizeof(temp_id)); - memcpy(&temp_id, hash_value, sizeof(temp_id)); + fatal_assert(hash_len > sizeof(uuid_t)); + memcpy(ret_uuid, hash_value, sizeof(uuid_t)); +} - handle->descr = NULL; - handle->prev_descr = NULL; - handle->unaligned_page = 0; +void rrdeng_metric_init(RRDDIM *rd, uuid_t *dim_uuid) +{ + struct page_cache *pg_cache; + struct rrdengine_instance *ctx; + uuid_t legacy_uuid; + uuid_t multihost_legacy_uuid; + Pvoid_t *PValue; + struct pg_cache_page_index *page_index = NULL; + int is_multihost_child = 0; + RRDHOST *host = rd->rrdset->rrdhost; + + ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); + if (unlikely(!ctx)) { + error("Failed to fetch multidb context"); + return; + } + pg_cache = &ctx->pg_cache; + + rrdeng_generate_legacy_uuid(rd->id, rd->rrdset->id, &legacy_uuid); + rd->state->metric_uuid = dim_uuid; + if (host != localhost && host->rrdeng_ctx == &multidb_ctx) + is_multihost_child = 1; uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t)); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &legacy_uuid, sizeof(uuid_t)); if (likely(NULL != PValue)) { page_index = *PValue; } uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - if (NULL == PValue) { - /* First time we see the UUID */ - uv_rwlock_wrlock(&pg_cache->metrics_index.lock); - PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t), PJE0); - assert(NULL == *PValue); /* TODO: figure out concurrency model */ - *PValue = page_index = create_page_index(&temp_id); - page_index->prev = pg_cache->metrics_index.last_page_index; - pg_cache->metrics_index.last_page_index = page_index; - uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); + if (is_multihost_child || NULL == PValue) { + /* First time we see the legacy UUID or metric belongs to child host in multi-host DB. + * Drop legacy support, normal path */ + + if (unlikely(!rd->state->metric_uuid)) + rd->state->metric_uuid = create_dimension_uuid(rd->rrdset, rd); + + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, rd->state->metric_uuid, sizeof(uuid_t)); + if (likely(NULL != PValue)) { + page_index = *PValue; + } + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (NULL == PValue) { + uv_rwlock_wrlock(&pg_cache->metrics_index.lock); + PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, rd->state->metric_uuid, sizeof(uuid_t), PJE0); + fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */ + *PValue = page_index = create_page_index(rd->state->metric_uuid); + page_index->prev = pg_cache->metrics_index.last_page_index; + pg_cache->metrics_index.last_page_index = page_index; + uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); + } + } else { + /* There are legacy UUIDs in the database, implement backward compatibility */ + + rrdeng_convert_legacy_uuid_to_multihost(rd->rrdset->rrdhost->machine_guid, &legacy_uuid, + &multihost_legacy_uuid); + + if (unlikely(!rd->state->metric_uuid)) + rd->state->metric_uuid = mallocz(sizeof(uuid_t)); + + int need_to_store = (dim_uuid == NULL || uuid_compare(*rd->state->metric_uuid, multihost_legacy_uuid)); + + uuid_copy(*rd->state->metric_uuid, multihost_legacy_uuid); + + if (unlikely(need_to_store)) + (void)sql_store_dimension(rd->state->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor, + rd->algorithm); + } rd->state->rrdeng_uuid = &page_index->id; - handle->page_index = page_index; + rd->state->page_index = page_index; +} + +/* + * Gets a handle for storing metrics to the database. + * The handle must be released with rrdeng_store_metric_final(). + */ +void rrdeng_store_metric_init(RRDDIM *rd) +{ + struct rrdeng_collect_handle *handle; + struct rrdengine_instance *ctx; + struct pg_cache_page_index *page_index; + + ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); + handle = &rd->state->handle.rrdeng; + handle->ctx = ctx; + + handle->descr = NULL; + handle->prev_descr = NULL; + handle->unaligned_page = 0; + + page_index = rd->state->page_index; + uv_rwlock_wrlock(&page_index->lock); + ++page_index->writers; + uv_rwlock_wrunlock(&page_index->lock); } /* The page must be populated and referenced */ @@ -88,12 +171,14 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd) handle = &rd->state->handle.rrdeng; ctx = handle->ctx; + if (unlikely(!ctx)) + return; descr = handle->descr; if (unlikely(NULL == descr)) { return; } if (likely(descr->page_length)) { - int ret, page_is_empty; + int page_is_empty; rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); @@ -107,17 +192,21 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd) if (unlikely(debug_flags & D_RRDENGINE)) print_page_cache_descr(descr); pg_cache_put(ctx, descr); - pg_cache_punch_hole(ctx, descr, 1); + pg_cache_punch_hole(ctx, descr, 1, 0, NULL); handle->prev_descr = NULL; } else { + /* + * Disable pinning for now as it leads to deadlocks. When a collector stops collecting the extra pinned page + * eventually gets rotated but it cannot be destroyed due to the extra reference. + */ /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */ - rrdeng_page_descr_mutex_lock(ctx, descr); +/* rrdeng_page_descr_mutex_lock(ctx, descr); ret = pg_cache_try_get_unsafe(descr, 0); rrdeng_page_descr_mutex_unlock(ctx, descr); - assert (1 == ret); + fatal_assert(1 == ret);*/ rrdeng_commit_page(ctx, descr, handle->page_correlation_id); - handle->prev_descr = descr; + /* handle->prev_descr = descr;*/ } } else { freez(descr->pg_cache_descr->page); @@ -168,14 +257,12 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n must_flush_unaligned_page)) { rrdeng_store_metric_flush_current_page(rd); - page = rrdeng_create_page(ctx, &handle->page_index->id, &descr); - assert(page); + page = rrdeng_create_page(ctx, &rd->state->page_index->id, &descr); + fatal_assert(page); handle->descr = descr; - uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); - handle->page_correlation_id = pg_cache->committed_page_index.latest_corr_id++; - uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1); if (0 == rd->rrdset->rrddim_page_alignment) { /* this is the leading dimension that defines chart alignment */ @@ -189,30 +276,53 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n if (perfect_page_alignment) rd->rrdset->rrddim_page_alignment = descr->page_length; if (unlikely(INVALID_TIME == descr->start_time)) { + unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers; descr->start_time = point_in_time; - rrd_stat_atomic_add(&ctx->stats.metric_API_producers, 1); - pg_cache_insert(ctx, handle->page_index, descr); + new_metric_API_producers = rrd_atomic_add_fetch(&ctx->stats.metric_API_producers, 1); + while (unlikely(new_metric_API_producers > (old_metric_API_max_producers = ctx->metric_API_max_producers))) { + /* Increase ctx->metric_API_max_producers */ + ret_metric_API_max_producers = ulong_compare_and_swap(&ctx->metric_API_max_producers, + old_metric_API_max_producers, + new_metric_API_producers); + if (old_metric_API_max_producers == ret_metric_API_max_producers) { + /* success */ + break; + } + } + + pg_cache_insert(ctx, rd->state->page_index, descr); } else { - pg_cache_add_new_metric_time(handle->page_index, descr); + pg_cache_add_new_metric_time(rd->state->page_index, descr); } } /* * Releases the database reference from the handle for storing metrics. + * Returns 1 if it's safe to delete the dimension. */ -void rrdeng_store_metric_finalize(RRDDIM *rd) +int rrdeng_store_metric_finalize(RRDDIM *rd) { struct rrdeng_collect_handle *handle; struct rrdengine_instance *ctx; + struct pg_cache_page_index *page_index; + uint8_t can_delete_metric = 0; handle = &rd->state->handle.rrdeng; ctx = handle->ctx; + page_index = rd->state->page_index; rrdeng_store_metric_flush_current_page(rd); if (handle->prev_descr) { /* unpin old second page */ pg_cache_put(ctx, handle->prev_descr); } + uv_rwlock_wrlock(&page_index->lock); + if (!--page_index->writers && !page_index->page_count) { + can_delete_metric = 1; + } + uv_rwlock_wrunlock(&page_index->lock); + + return can_delete_metric; } /* Returns 1 if the data collection interval is well defined, 0 otherwise */ @@ -253,7 +363,7 @@ static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info) * @return number of regions with different data collection intervals. */ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time, - struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp) + struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp, struct context_param *context_param_list) { struct pg_cache_page_index *page_index; struct rrdengine_instance *ctx; @@ -266,15 +376,16 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e struct rrdeng_region_info *region_info_array; uint8_t is_first_region_initialized; - ctx = st->rrdhost->rrdeng_ctx; + ctx = get_rrdeng_ctx_from_host(st->rrdhost); regions = 1; *max_intervalp = max_interval = 0; region_info_array = NULL; *region_info_arrayp = NULL; page_info_array = NULL; + RRDDIM *temp_rd = context_param_list ? context_param_list->rd : NULL; rrdset_rdlock(st); - for(rd_iter = st->dimensions, rd = NULL, min_time = (usec_t)-1 ; rd_iter ; rd_iter = rd_iter->next) { + for(rd_iter = temp_rd?temp_rd:st->dimensions, rd = NULL, min_time = (usec_t)-1 ; rd_iter ; rd_iter = rd_iter->next) { /* * Choose oldest dimension as reference. This is not equivalent to the union of all dimensions * but it is a best effort approximation with a bias towards older metrics in a chart. It @@ -316,7 +427,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e continue; } page_entries = curr->page_length / sizeof(storage_number); - assert(0 != page_entries); + fatal_assert(0 != page_entries); if (likely(1 != page_entries)) { dt = (curr->end_time - curr->start_time) / (page_entries - 1); *pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(dt); @@ -352,7 +463,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e if (1 == page_points) first_valid_time_in_page = current_position_time; if (unlikely(!is_first_region_initialized)) { - assert(1 == regions); + fatal_assert(1 == regions); /* this is the first region */ region_info_array[0].start_time = current_position_time; is_first_region_initialized = 1; @@ -365,7 +476,7 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e } if (unlikely(0 == *pginfo_to_dt(curr))) { /* unknown data collection interval */ - assert(1 == page_points); + fatal_assert(1 == page_points); if (likely(NULL != prev)) { /* get interval from previous page */ *pginfo_to_dt(curr) = *pginfo_to_dt(prev); @@ -428,7 +539,7 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand struct rrdengine_instance *ctx; unsigned pages_nr; - ctx = rd->rrdset->rrdhost->rrdeng_ctx; + ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); rrdimm_handle->start_time = start_time; rrdimm_handle->end_time = end_time; handle = &rrdimm_handle->rrdeng; @@ -452,7 +563,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle struct rrdeng_page_descr *descr; storage_number *page, ret; unsigned position, entries; - usec_t next_page_time, current_position_time, page_end_time; + usec_t next_page_time = 0, current_position_time, page_end_time = 0; uint32_t page_length; handle = &rrdimm_handle->rrdeng; @@ -473,17 +584,18 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle /* We need to get a new page */ if (descr) { /* Drop old page's reference */ - handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1; - if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) { - goto no_more_metrics; - } - next_page_time = handle->next_page_time * USEC_PER_SEC; #ifdef NETDATA_INTERNAL_CHECKS rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); #endif pg_cache_put(ctx, descr); handle->descr = NULL; + handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1; + if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) { + goto no_more_metrics; + } + next_page_time = handle->next_page_time * USEC_PER_SEC; } + descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, next_page_time, rrdimm_handle->end_time * USEC_PER_SEC); if (NULL == descr) { @@ -520,7 +632,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle } handle->position = position; handle->now = current_position_time / USEC_PER_SEC; -/* assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time); +/* fatal_assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time); The above assertion is an approximation and needs to take update_every into account */ if (unlikely(handle->now >= rrdimm_handle->end_time)) { /* next calls will not load any more metrics */ @@ -564,21 +676,17 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) time_t rrdeng_metric_latest_time(RRDDIM *rd) { - struct rrdeng_collect_handle *handle; struct pg_cache_page_index *page_index; - handle = &rd->state->handle.rrdeng; - page_index = handle->page_index; + page_index = rd->state->page_index; return page_index->latest_time / USEC_PER_SEC; } time_t rrdeng_metric_oldest_time(RRDDIM *rd) { - struct rrdeng_collect_handle *handle; struct pg_cache_page_index *page_index; - handle = &rd->state->handle.rrdeng; - page_index = handle->page_index; + page_index = rd->state->page_index; return page_index->oldest_time / USEC_PER_SEC; } @@ -620,7 +728,7 @@ void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-committed.", __func__); return; } - assert(descr->page_length); + fatal_assert(descr->page_length); uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); PValue = JudyLIns(&pg_cache->committed_page_index.JudyL_array, page_correlation_id, PJE0); @@ -628,16 +736,27 @@ void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr nr_committed_pages = ++pg_cache->committed_page_index.nr_committed_pages; uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); - if (nr_committed_pages >= (pg_cache_hard_limit(ctx) - (unsigned long)ctx->stats.metric_API_producers) / 2) { - /* 50% of pages have not been committed yet */ - if (0 == (unsigned long)ctx->stats.flushing_errors) { - /* only print the first time */ - error("Failed to flush quickly enough in dbengine instance \"%s\"" - ". Metric data will not be stored in the database" - ", please reduce disk load or use a faster disk.", ctx->dbfiles_path); + if (nr_committed_pages >= pg_cache_hard_limit(ctx) / 2) { + /* over 50% of pages have not been committed yet */ + + if (ctx->drop_metrics_under_page_cache_pressure && + nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) { + /* 100% of pages are dirty */ + struct rrdeng_cmd cmd; + + cmd.opcode = RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + } else { + if (0 == (unsigned long) ctx->stats.pg_cache_over_half_dirty_events) { + /* only print the first time */ + errno = 0; + error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". " + "Metric data at risk of not being stored in the database, " + "please reduce disk load or use a faster disk.", ctx->dbfiles_path); + } + rrd_stat_atomic_add(&ctx->stats.pg_cache_over_half_dirty_events, 1); + rrd_stat_atomic_add(&global_pg_cache_over_half_dirty_events, 1); } - rrd_stat_atomic_add(&ctx->stats.flushing_errors, 1); - rrd_stat_atomic_add(&global_flushing_errors, 1); } pg_cache_put(ctx, descr); @@ -687,8 +806,11 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i * You must not change the indices of the statistics or user code will break. * You must not exceed RRDENG_NR_STATS or it will crash. */ -void rrdeng_get_35_statistics(struct rrdengine_instance *ctx, unsigned long long *array) +void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array) { + if (ctx == NULL) + return; + struct page_cache *pg_cache = &ctx->pg_cache; array[0] = (uint64_t)ctx->stats.metric_API_producers; @@ -724,9 +846,11 @@ void rrdeng_get_35_statistics(struct rrdengine_instance *ctx, unsigned long long array[30] = (uint64_t)global_io_errors; array[31] = (uint64_t)global_fs_errors; array[32] = (uint64_t)rrdeng_reserved_file_descriptors; - array[33] = (uint64_t)ctx->stats.flushing_errors; - array[34] = (uint64_t)global_flushing_errors; - assert(RRDENG_NR_STATS == 35); + array[33] = (uint64_t)ctx->stats.pg_cache_over_half_dirty_events; + array[34] = (uint64_t)global_pg_cache_over_half_dirty_events; + array[35] = (uint64_t)ctx->stats.flushing_pressure_page_deletions; + array[36] = (uint64_t)global_flushing_pressure_page_deletions; + fatal_assert(RRDENG_NR_STATS == 37); } /* Releases reference to page */ @@ -739,21 +863,21 @@ void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle) /* * Returns 0 on success, negative on error */ -int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, unsigned disk_space_mb) +int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, + unsigned disk_space_mb) { struct rrdengine_instance *ctx; int error; uint32_t max_open_files; - sanity_check(); - max_open_files = rlimit_nofile.rlim_cur / 4; /* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE); if (rrdeng_reserved_file_descriptors > max_open_files) { - error("Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.", - (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files); + error( + "Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.", + (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files); rrd_stat_atomic_add(&global_fs_errors, 1); rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); @@ -761,8 +885,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p } if (NULL == ctxp) { - /* for testing */ - ctx = &default_global_ctx; + ctx = &multidb_ctx; memset(ctx, 0, sizeof(*ctx)); } else { *ctxp = ctx = callocz(1, sizeof(*ctx)); @@ -778,6 +901,16 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p ctx->max_disk_space = disk_space_mb * 1048576LLU; strncpyz(ctx->dbfiles_path, dbfiles_path, sizeof(ctx->dbfiles_path) - 1); ctx->dbfiles_path[sizeof(ctx->dbfiles_path) - 1] = '\0'; + if (NULL == host) + strncpyz(ctx->machine_guid, registry_get_this_machine_guid(), GUID_LEN); + else + strncpyz(ctx->machine_guid, host->machine_guid, GUID_LEN); + + ctx->drop_metrics_under_page_cache_pressure = rrdeng_drop_metrics_under_page_cache_pressure; + ctx->metric_API_max_producers = 0; + ctx->quiesce = NO_QUIESCE; + ctx->metalog_ctx = NULL; /* only set this after the metadata log has finished initializing */ + ctx->host = host; memset(&ctx->worker_config, 0, sizeof(ctx->worker_config)); ctx->worker_config.ctx = ctx; @@ -789,20 +922,27 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p } init_completion(&ctx->rrdengine_completion); - assert(0 == uv_thread_create(&ctx->worker_config.thread, rrdeng_worker, &ctx->worker_config)); + fatal_assert(0 == uv_thread_create(&ctx->worker_config.thread, rrdeng_worker, &ctx->worker_config)); /* wait for worker thread to initialize */ wait_for_completion(&ctx->rrdengine_completion); destroy_completion(&ctx->rrdengine_completion); + uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE"); if (ctx->worker_config.error) { goto error_after_rrdeng_worker; } + error = metalog_init(ctx); + if (error) { + error("Failed to initialize metadata log file event loop."); + goto error_after_rrdeng_worker; + } + return 0; error_after_rrdeng_worker: finalize_rrd_files(ctx); error_after_init_rrd_files: free_page_cache(ctx); - if (ctx != &default_global_ctx) { + if (ctx != &multidb_ctx) { freez(ctx); *ctxp = NULL; } @@ -825,14 +965,35 @@ int rrdeng_exit(struct rrdengine_instance *ctx) cmd.opcode = RRDENG_SHUTDOWN; rrdeng_enq_cmd(&ctx->worker_config, &cmd); - assert(0 == uv_thread_join(&ctx->worker_config.thread)); + fatal_assert(0 == uv_thread_join(&ctx->worker_config.thread)); finalize_rrd_files(ctx); + //metalog_exit(ctx->metalog_ctx); free_page_cache(ctx); - if (ctx != &default_global_ctx) { + if (ctx != &multidb_ctx) { freez(ctx); } rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE); return 0; -}
\ No newline at end of file +} + +void rrdeng_prepare_exit(struct rrdengine_instance *ctx) +{ + struct rrdeng_cmd cmd; + + if (NULL == ctx) { + return; + } + + init_completion(&ctx->rrdengine_completion); + cmd.opcode = RRDENG_QUIESCE; + rrdeng_enq_cmd(&ctx->worker_config, &cmd); + + /* wait for dbengine to quiesce */ + wait_for_completion(&ctx->rrdengine_completion); + destroy_completion(&ctx->rrdengine_completion); + + //metalog_prepare_exit(ctx->metalog_ctx); +} + |