summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengineapi.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengineapi.c')
-rwxr-xr-xdatabase/engine/rrdengineapi.c98
1 files changed, 48 insertions, 50 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 27503baee..4525b041f 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -39,21 +39,35 @@ uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1;
// ----------------------------------------------------------------------------
// metrics groups
+static inline void rrdeng_page_alignment_acquire(struct pg_alignment *pa) {
+ if(unlikely(!pa)) return;
+ __atomic_add_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST);
+}
+
+static inline bool rrdeng_page_alignment_release(struct pg_alignment *pa) {
+ if(unlikely(!pa)) return true;
+
+ if(__atomic_sub_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST) == 0) {
+ freez(pa);
+ return true;
+ }
+
+ return false;
+}
+
+// charts call this
STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid __maybe_unused) {
- return callocz(1, sizeof(struct pg_alignment));
+ struct pg_alignment *pa = callocz(1, sizeof(struct pg_alignment));
+ rrdeng_page_alignment_acquire(pa);
+ return (STORAGE_METRICS_GROUP *)pa;
}
-void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) {
- if(!smg) return;
+// charts call this
+void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance __maybe_unused, STORAGE_METRICS_GROUP *smg) {
+ if(unlikely(!smg)) return;
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
struct pg_alignment *pa = (struct pg_alignment *)smg;
- struct page_cache *pg_cache = &ctx->pg_cache;
-
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- if(pa->refcount == 0)
- freez(pa);
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ rrdeng_page_alignment_release(pa);
}
// ----------------------------------------------------------------------------
@@ -93,10 +107,10 @@ void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uu
memcpy(ret_uuid, hash_value, sizeof(uuid_t));
}
-STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id, STORAGE_METRICS_GROUP *smg) {
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id) {
uuid_t legacy_uuid;
rrdeng_generate_legacy_uuid(rd_id, st_id, &legacy_uuid);
- return rrdeng_metric_get(db_instance, &legacy_uuid, smg);
+ return rrdeng_metric_get(db_instance, &legacy_uuid);
}
// ----------------------------------------------------------------------------
@@ -105,11 +119,7 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, c
void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle) {
struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
- unsigned short refcount = __atomic_sub_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
- if(refcount == 0 && page_index->alignment) {
- __atomic_sub_fetch(&page_index->alignment->refcount, 1, __ATOMIC_SEQ_CST);
- page_index->alignment = NULL;
- }
+ __atomic_sub_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
}
STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle) {
@@ -118,9 +128,8 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle
return db_metric_handle;
}
-STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg) {
+STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid) {
struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
- struct pg_alignment *pa = (struct pg_alignment *)smg;
struct page_cache *pg_cache = &ctx->pg_cache;
struct pg_cache_page_index *page_index = NULL;
@@ -130,27 +139,16 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *
page_index = *PValue;
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- if (likely(page_index)) {
+ if (likely(page_index))
__atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
- if(pa) {
- if(page_index->alignment && page_index->alignment != pa && page_index->writers > 0)
- fatal("DBENGINE: page_index has a different alignment (page_index refcount is %u, writers is %u).",
- page_index->refcount, page_index->writers);
-
- page_index->alignment = pa;
- __atomic_add_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST);
- }
- }
-
return (STORAGE_METRIC_HANDLE *)page_index;
}
-STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg) {
+STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid) {
internal_fatal(!db_instance, "DBENGINE: db_instance is NULL");
struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
- struct pg_alignment *pa = (struct pg_alignment *)smg;
struct pg_cache_page_index *page_index;
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -160,28 +158,25 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_
*PValue = page_index = create_page_index(uuid, ctx);
page_index->prev = pg_cache->metrics_index.last_page_index;
pg_cache->metrics_index.last_page_index = page_index;
- page_index->alignment = pa;
page_index->refcount = 1;
- if(pa)
- pa->refcount++;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
return (STORAGE_METRIC_HANDLE *)page_index;
}
-STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) {
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance) {
STORAGE_METRIC_HANDLE *db_metric_handle;
- db_metric_handle = rrdeng_metric_get(db_instance, &rd->metric_uuid, smg);
+ db_metric_handle = rrdeng_metric_get(db_instance, &rd->metric_uuid);
if(!db_metric_handle) {
- db_metric_handle = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset), smg);
+ db_metric_handle = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset));
if(db_metric_handle) {
struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
uuid_copy(rd->metric_uuid, page_index->id);
}
}
if(!db_metric_handle)
- db_metric_handle = rrdeng_metric_create(db_instance, &rd->metric_uuid, smg);
+ db_metric_handle = rrdeng_metric_create(db_instance, &rd->metric_uuid);
#ifdef NETDATA_INTERNAL_CHECKS
struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
@@ -210,19 +205,19 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE
* Gets a handle for storing metrics to the database.
* The handle must be released with rrdeng_store_metric_final().
*/
-STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every) {
+STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg) {
struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
struct rrdeng_collect_handle *handle;
- if(!page_index->alignment)
- fatal("DBENGINE: metric group is required for collect operations");
-
handle = callocz(1, sizeof(struct rrdeng_collect_handle));
handle->page_index = page_index;
handle->descr = NULL;
handle->unaligned_page = 0;
page_index->latest_update_every_s = update_every;
+ handle->alignment = (struct pg_alignment *)smg;
+ rrdeng_page_alignment_acquire(handle->alignment);
+
uv_rwlock_wrlock(&page_index->lock);
++page_index->writers;
uv_rwlock_wrunlock(&page_index->lock);
@@ -331,18 +326,18 @@ static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection
}
#endif
- if (descr->page_length == page_index->alignment->page_length) {
+ if (descr->page_length == handle->alignment->page_length) {
/* this is the leading dimension that defines chart alignment */
perfect_page_alignment = 1;
}
/* is the metric far enough out of alignment with the others? */
- if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < page_index->alignment->page_length)) {
+ if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < handle->alignment->page_length)) {
handle->unaligned_page = 1;
print_page_cache_descr(descr, "Metric page is not aligned with chart", true);
}
if (unlikely(handle->unaligned_page &&
/* did the other metrics change page? */
- page_index->alignment->page_length <= PAGE_POINT_SIZE_BYTES(descr))) {
+ handle->alignment->page_length <= PAGE_POINT_SIZE_BYTES(descr))) {
print_page_cache_descr(descr, "must_flush_unaligned_page = 1", true);
must_flush_unaligned_page = 1;
handle->unaligned_page = 0;
@@ -365,7 +360,7 @@ static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection
handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1);
- if (0 == page_index->alignment->page_length) {
+ if (0 == handle->alignment->page_length) {
/* this is the leading dimension that defines chart alignment */
perfect_page_alignment = 1;
}
@@ -403,7 +398,7 @@ static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection
pg_cache_atomic_set_pg_info(descr, point_in_time_ut, descr->page_length + PAGE_POINT_SIZE_BYTES(descr));
if (perfect_page_alignment)
- page_index->alignment->page_length = descr->page_length;
+ handle->alignment->page_length = descr->page_length;
if (unlikely(INVALID_TIME == descr->start_time_ut)) {
unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers;
descr->start_time_ut = point_in_time_ut;
@@ -530,10 +525,13 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
rrdeng_store_metric_flush_current_page(collection_handle);
uv_rwlock_wrlock(&page_index->lock);
- if (!--page_index->writers && !page_index->page_count) {
+
+ if (!--page_index->writers && !page_index->page_count)
can_delete_metric = 1;
- }
+
uv_rwlock_wrunlock(&page_index->lock);
+
+ rrdeng_page_alignment_release(handle->alignment);
freez(handle);
return can_delete_metric;