diff options
Diffstat (limited to '')
-rw-r--r-- | database/engine/pagecache.c | 9 | ||||
-rw-r--r-- | database/engine/pagecache.h | 1 | ||||
-rw-r--r-- | database/engine/rrdengine.c | 36 | ||||
-rw-r--r-- | database/engine/rrdengine.h | 2 | ||||
-rwxr-xr-x | database/engine/rrdengineapi.c | 98 | ||||
-rw-r--r-- | database/engine/rrdengineapi.h | 10 |
6 files changed, 92 insertions, 64 deletions
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index d65cb35a5..4f5da7084 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -524,6 +524,14 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d } rrdeng_page_descr_mutex_unlock(ctx, descr); + while (unlikely(pg_cache_descr->flags & RRD_PAGE_READ_PENDING)) { + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "%s: Found page with READ PENDING, waiting for read to complete", __func__); + if (unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr, "", true); + pg_cache_wait_event_unsafe(descr); + } + if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { /* only after locking can it be safely deleted from LRU */ pg_cache_replaceQ_delete(ctx, descr); @@ -1196,7 +1204,6 @@ struct pg_cache_page_index *create_page_index(uuid_t *id, struct rrdengine_insta page_index->refcount = 0; page_index->writers = 0; page_index->ctx = ctx; - page_index->alignment = NULL; page_index->latest_update_every_s = default_rrd_update_every; return page_index; diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h index 2f4d6b332..635b02123 100644 --- a/database/engine/pagecache.h +++ b/database/engine/pagecache.h @@ -112,7 +112,6 @@ struct pg_cache_page_index { usec_t latest_time_ut; struct rrdengine_instance *ctx; - struct pg_alignment *alignment; uint32_t latest_update_every_s; struct pg_cache_page_index *prev; diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index e4cd37e98..a6840f38c 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -272,9 +272,19 @@ static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type) } } +struct rrdeng_page_descr *get_descriptor(struct pg_cache_page_index *page_index, time_t start_time_s) +{ + uv_rwlock_rdlock(&page_index->lock); + Pvoid_t *PValue = JudyLGet(page_index->JudyL_array, start_time_s, PJE0); + struct rrdeng_page_descr *descr = unlikely(NULL == PValue) ? NULL : *PValue; + uv_rwlock_rdunlock(&page_index->lock); + return descr; +}; + static void do_extent_processing (struct rrdengine_worker_config *wc, struct extent_io_descriptor *xt_io_descr, bool read_failed) { struct rrdengine_instance *ctx = wc->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; struct rrdeng_page_descr *descr; struct page_cache_descr *pg_cache_descr; int ret; @@ -365,19 +375,30 @@ after_crc_check: } } + uv_rwlock_rdlock(&pg_cache->metrics_index.lock); + Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, xt_io_descr->descr_array[0]->id, sizeof(uuid_t)); + struct pg_cache_page_index *page_index = likely( NULL != PValue) ? *PValue : NULL; + uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + + for (i = 0, page_offset = 0; i < count; page_offset += header->descr[i++].page_length) { uint8_t is_prefetched_page; descr = NULL; for (j = 0 ; j < xt_io_descr->descr_count; ++j) { - struct rrdeng_page_descr *descrj; + struct rrdeng_page_descr descrj; - descrj = xt_io_descr->descr_array[j]; + descrj = xt_io_descr->descr_read_array[j]; /* care, we don't hold the descriptor mutex */ - if (!uuid_compare(*(uuid_t *) header->descr[i].uuid, *descrj->id) && - header->descr[i].page_length == descrj->page_length && - header->descr[i].start_time_ut == descrj->start_time_ut && - header->descr[i].end_time_ut == descrj->end_time_ut) { - descr = descrj; + if (!uuid_compare(*(uuid_t *) header->descr[i].uuid, *descrj.id) && + header->descr[i].page_length == descrj.page_length && + header->descr[i].start_time_ut == descrj.start_time_ut && + header->descr[i].end_time_ut == descrj.end_time_ut) { + //descr = descrj; + descr = get_descriptor(page_index, (time_t) (descrj.start_time_ut / USEC_PER_SEC)); + if (unlikely(!descr)) { + error_limit_static_thread_var(erl, 1, 0); + error_limit(&erl, "%s: Required descriptor is not in the page index anymore", __FUNCTION__); + } break; } } @@ -506,6 +527,7 @@ static void do_read_extent(struct rrdengine_worker_config* wc, pg_cache_descr->flags |= RRD_PAGE_READ_PENDING; rrdeng_page_descr_mutex_unlock(ctx, descr[i]); xt_io_descr->descr_array[i] = descr[i]; + xt_io_descr->descr_read_array[i] = *(descr[i]); } xt_io_descr->descr_count = count; xt_io_descr->file = datafile->file; diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h index fedadbe86..521d2521a 100644 --- a/database/engine/rrdengine.h +++ b/database/engine/rrdengine.h @@ -41,6 +41,7 @@ struct rrdeng_collect_handle { unsigned long page_correlation_id; // set to 1 when this dimension is not page aligned with the other dimensions in the chart uint8_t unaligned_page; + struct pg_alignment *alignment; }; struct rrdeng_query_handle { @@ -117,6 +118,7 @@ struct extent_io_descriptor { unsigned descr_count; int release_descr; struct rrdeng_page_descr *descr_array[MAX_PAGES_PER_EXTENT]; + struct rrdeng_page_descr descr_read_array[MAX_PAGES_PER_EXTENT]; Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT]; struct extent_io_descriptor *next; /* multiple requests to be served by the same cached extent */ }; diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index 27503baee..4525b041f 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -39,21 +39,35 @@ uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1; // ---------------------------------------------------------------------------- // metrics groups +static inline void rrdeng_page_alignment_acquire(struct pg_alignment *pa) { + if(unlikely(!pa)) return; + __atomic_add_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST); +} + +static inline bool rrdeng_page_alignment_release(struct pg_alignment *pa) { + if(unlikely(!pa)) return true; + + if(__atomic_sub_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST) == 0) { + freez(pa); + return true; + } + + return false; +} + +// charts call this STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid __maybe_unused) { - return callocz(1, sizeof(struct pg_alignment)); + struct pg_alignment *pa = callocz(1, sizeof(struct pg_alignment)); + rrdeng_page_alignment_acquire(pa); + return (STORAGE_METRICS_GROUP *)pa; } -void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) { - if(!smg) return; +// charts call this +void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance __maybe_unused, STORAGE_METRICS_GROUP *smg) { + if(unlikely(!smg)) return; - struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; struct pg_alignment *pa = (struct pg_alignment *)smg; - struct page_cache *pg_cache = &ctx->pg_cache; - - uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - if(pa->refcount == 0) - freez(pa); - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + rrdeng_page_alignment_release(pa); } // ---------------------------------------------------------------------------- @@ -93,10 +107,10 @@ void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uu memcpy(ret_uuid, hash_value, sizeof(uuid_t)); } -STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id, STORAGE_METRICS_GROUP *smg) { +STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id) { uuid_t legacy_uuid; rrdeng_generate_legacy_uuid(rd_id, st_id, &legacy_uuid); - return rrdeng_metric_get(db_instance, &legacy_uuid, smg); + return rrdeng_metric_get(db_instance, &legacy_uuid); } // ---------------------------------------------------------------------------- @@ -105,11 +119,7 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, c void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle) { struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; - unsigned short refcount = __atomic_sub_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST); - if(refcount == 0 && page_index->alignment) { - __atomic_sub_fetch(&page_index->alignment->refcount, 1, __ATOMIC_SEQ_CST); - page_index->alignment = NULL; - } + __atomic_sub_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST); } STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle) { @@ -118,9 +128,8 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle return db_metric_handle; } -STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg) { +STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid) { struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; - struct pg_alignment *pa = (struct pg_alignment *)smg; struct page_cache *pg_cache = &ctx->pg_cache; struct pg_cache_page_index *page_index = NULL; @@ -130,27 +139,16 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t * page_index = *PValue; uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - if (likely(page_index)) { + if (likely(page_index)) __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST); - if(pa) { - if(page_index->alignment && page_index->alignment != pa && page_index->writers > 0) - fatal("DBENGINE: page_index has a different alignment (page_index refcount is %u, writers is %u).", - page_index->refcount, page_index->writers); - - page_index->alignment = pa; - __atomic_add_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST); - } - } - return (STORAGE_METRIC_HANDLE *)page_index; } -STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg) { +STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid) { internal_fatal(!db_instance, "DBENGINE: db_instance is NULL"); struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; - struct pg_alignment *pa = (struct pg_alignment *)smg; struct pg_cache_page_index *page_index; struct page_cache *pg_cache = &ctx->pg_cache; @@ -160,28 +158,25 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_ *PValue = page_index = create_page_index(uuid, ctx); page_index->prev = pg_cache->metrics_index.last_page_index; pg_cache->metrics_index.last_page_index = page_index; - page_index->alignment = pa; page_index->refcount = 1; - if(pa) - pa->refcount++; uv_rwlock_wrunlock(&pg_cache->metrics_index.lock); return (STORAGE_METRIC_HANDLE *)page_index; } -STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) { +STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance) { STORAGE_METRIC_HANDLE *db_metric_handle; - db_metric_handle = rrdeng_metric_get(db_instance, &rd->metric_uuid, smg); + db_metric_handle = rrdeng_metric_get(db_instance, &rd->metric_uuid); if(!db_metric_handle) { - db_metric_handle = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset), smg); + db_metric_handle = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset)); if(db_metric_handle) { struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; uuid_copy(rd->metric_uuid, page_index->id); } } if(!db_metric_handle) - db_metric_handle = rrdeng_metric_create(db_instance, &rd->metric_uuid, smg); + db_metric_handle = rrdeng_metric_create(db_instance, &rd->metric_uuid); #ifdef NETDATA_INTERNAL_CHECKS struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; @@ -210,19 +205,19 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE * Gets a handle for storing metrics to the database. * The handle must be released with rrdeng_store_metric_final(). */ -STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every) { +STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg) { struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle; struct rrdeng_collect_handle *handle; - if(!page_index->alignment) - fatal("DBENGINE: metric group is required for collect operations"); - handle = callocz(1, sizeof(struct rrdeng_collect_handle)); handle->page_index = page_index; handle->descr = NULL; handle->unaligned_page = 0; page_index->latest_update_every_s = update_every; + handle->alignment = (struct pg_alignment *)smg; + rrdeng_page_alignment_acquire(handle->alignment); + uv_rwlock_wrlock(&page_index->lock); ++page_index->writers; uv_rwlock_wrunlock(&page_index->lock); @@ -331,18 +326,18 @@ static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection } #endif - if (descr->page_length == page_index->alignment->page_length) { + if (descr->page_length == handle->alignment->page_length) { /* this is the leading dimension that defines chart alignment */ perfect_page_alignment = 1; } /* is the metric far enough out of alignment with the others? */ - if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < page_index->alignment->page_length)) { + if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < handle->alignment->page_length)) { handle->unaligned_page = 1; print_page_cache_descr(descr, "Metric page is not aligned with chart", true); } if (unlikely(handle->unaligned_page && /* did the other metrics change page? */ - page_index->alignment->page_length <= PAGE_POINT_SIZE_BYTES(descr))) { + handle->alignment->page_length <= PAGE_POINT_SIZE_BYTES(descr))) { print_page_cache_descr(descr, "must_flush_unaligned_page = 1", true); must_flush_unaligned_page = 1; handle->unaligned_page = 0; @@ -365,7 +360,7 @@ static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1); - if (0 == page_index->alignment->page_length) { + if (0 == handle->alignment->page_length) { /* this is the leading dimension that defines chart alignment */ perfect_page_alignment = 1; } @@ -403,7 +398,7 @@ static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection pg_cache_atomic_set_pg_info(descr, point_in_time_ut, descr->page_length + PAGE_POINT_SIZE_BYTES(descr)); if (perfect_page_alignment) - page_index->alignment->page_length = descr->page_length; + handle->alignment->page_length = descr->page_length; if (unlikely(INVALID_TIME == descr->start_time_ut)) { unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers; descr->start_time_ut = point_in_time_ut; @@ -530,10 +525,13 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { rrdeng_store_metric_flush_current_page(collection_handle); uv_rwlock_wrlock(&page_index->lock); - if (!--page_index->writers && !page_index->page_count) { + + if (!--page_index->writers && !page_index->page_count) can_delete_metric = 1; - } + uv_rwlock_wrunlock(&page_index->lock); + + rrdeng_page_alignment_release(handle->alignment); freez(handle); return can_delete_metric; diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h index 85375044f..3acee4ec6 100644 --- a/database/engine/rrdengineapi.h +++ b/database/engine/rrdengineapi.h @@ -42,14 +42,14 @@ void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uu uuid_t *ret_uuid); -STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg); -STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg); -STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg); -STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id, STORAGE_METRICS_GROUP *smg); +STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance); +STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid); +STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid); +STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id); void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle); STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle); -STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every); +STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg); void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle); void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every); void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, NETDATA_DOUBLE n, |