summaryrefslogtreecommitdiffstats
path: root/database/rrdlabels.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--database/rrdlabels.c204
1 files changed, 175 insertions, 29 deletions
diff --git a/database/rrdlabels.c b/database/rrdlabels.c
index 243b16c69..75167cb24 100644
--- a/database/rrdlabels.c
+++ b/database/rrdlabels.c
@@ -671,6 +671,7 @@ void rrdlabels_destroy(RRDLABELS *labels)
freez(labels);
}
+// Check in labels to see if we have the key specified in label
static RRDLABEL *rrdlabels_find_label_with_key_unsafe(RRDLABELS *labels, RRDLABEL *label)
{
if (unlikely(!labels))
@@ -682,7 +683,7 @@ static RRDLABEL *rrdlabels_find_label_with_key_unsafe(RRDLABELS *labels, RRDLABE
RRDLABEL *found = NULL;
while ((PValue = JudyLFirstThenNext(labels->JudyL, &Index, &first_then_next))) {
RRDLABEL *lb = (RRDLABEL *)Index;
- if (lb->index.key == label->index.key && lb->index.value != label->index.value) {
+ if (lb->index.key == label->index.key) {
found = (RRDLABEL *)Index;
break;
}
@@ -695,39 +696,42 @@ static RRDLABEL *rrdlabels_find_label_with_key_unsafe(RRDLABELS *labels, RRDLABE
static void labels_add_already_sanitized(RRDLABELS *labels, const char *key, const char *value, RRDLABEL_SRC ls)
{
- RRDLABEL *label = add_label_name_value(key, value);
+ RRDLABEL *new_label = add_label_name_value(key, value);
spinlock_lock(&labels->spinlock);
- RRDLABEL *old_key = rrdlabels_find_label_with_key_unsafe(labels, label);
+ RRDLABEL *old_label_with_key = rrdlabels_find_label_with_key_unsafe(labels, new_label);
+
+ if (old_label_with_key == new_label) {
+ spinlock_unlock(&labels->spinlock);
+ delete_label(new_label);
+ return;
+ }
size_t mem_before_judyl = JudyLMemUsed(labels->JudyL);
- Pvoid_t *PValue = JudyLIns(&labels->JudyL, (Word_t) label, PJE0);
- if(unlikely(!PValue || PValue == PJERR))
+ Pvoid_t *PValue = JudyLIns(&labels->JudyL, (Word_t)new_label, PJE0);
+ if (!PValue || PValue == PJERR)
fatal("RRDLABELS: corrupted labels JudyL array");
- if (!*PValue) {
- RRDLABEL_SRC new_ls;
- if (old_key)
- new_ls = ((ls & ~(RRDLABEL_FLAG_NEW | RRDLABEL_FLAG_OLD)) | RRDLABEL_FLAG_OLD);
- else
- new_ls = ((ls & ~(RRDLABEL_FLAG_NEW | RRDLABEL_FLAG_OLD)) | RRDLABEL_FLAG_NEW);
- *((RRDLABEL_SRC *)PValue) = new_ls;
+ RRDLABEL_SRC new_ls = (ls & ~(RRDLABEL_FLAG_NEW | RRDLABEL_FLAG_OLD));
+ labels->version++;
- labels->version++;
+ if (old_label_with_key) {
+ (void)JudyLDel(&labels->JudyL, (Word_t)old_label_with_key, PJE0);
+ new_ls |= RRDLABEL_FLAG_OLD;
+ } else
+ new_ls |= RRDLABEL_FLAG_NEW;
- if (old_key) {
- (void)JudyLDel(&labels->JudyL, (Word_t) old_key, PJE0);
- delete_label((RRDLABEL *)old_key);
- }
- size_t mem_after_judyl = JudyLMemUsed(labels->JudyL);
- STATS_PLUS_MEMORY(&dictionary_stats_category_rrdlabels, 0, mem_after_judyl - mem_before_judyl, 0);
- }
- else
- delete_label(label);
+ *((RRDLABEL_SRC *)PValue) = new_ls;
+
+ size_t mem_after_judyl = JudyLMemUsed(labels->JudyL);
+ STATS_PLUS_MEMORY(&dictionary_stats_category_rrdlabels, 0, mem_after_judyl - mem_before_judyl, 0);
spinlock_unlock(&labels->spinlock);
+
+ if (old_label_with_key)
+ delete_label((RRDLABEL *)old_label_with_key);
}
void rrdlabels_add(RRDLABELS *labels, const char *name, const char *value, RRDLABEL_SRC ls)
@@ -984,7 +988,7 @@ int rrdlabels_walkthrough_read(RRDLABELS *labels, int (*callback)(const char *na
// migrate an existing label list to a new list
void rrdlabels_migrate_to_these(RRDLABELS *dst, RRDLABELS *src) {
- if (!dst || !src)
+ if (!dst || !src || (dst == src))
return;
spinlock_lock(&dst->spinlock);
@@ -1025,7 +1029,7 @@ void rrdlabels_migrate_to_these(RRDLABELS *dst, RRDLABELS *src) {
void rrdlabels_copy(RRDLABELS *dst, RRDLABELS *src)
{
- if (!dst || !src)
+ if (!dst || !src || (dst == src))
return;
RRDLABEL *label;
@@ -1034,23 +1038,34 @@ void rrdlabels_copy(RRDLABELS *dst, RRDLABELS *src)
spinlock_lock(&dst->spinlock);
spinlock_lock(&src->spinlock);
+ size_t mem_before_judyl = JudyLMemUsed(dst->JudyL);
+ bool update_statistics = false;
lfe_start_nolock(src, label, ls)
{
- size_t mem_before_judyl = JudyLMemUsed(dst->JudyL);
+ RRDLABEL *old_label_with_key = rrdlabels_find_label_with_key_unsafe(dst, label);
+ if (old_label_with_key && old_label_with_key == label)
+ continue;
+
Pvoid_t *PValue = JudyLIns(&dst->JudyL, (Word_t)label, PJE0);
if(unlikely(!PValue || PValue == PJERR))
fatal("RRDLABELS: corrupted labels array");
if (!*PValue) {
dup_label(label);
- size_t mem_after_judyl = JudyLMemUsed(dst->JudyL);
- STATS_PLUS_MEMORY(&dictionary_stats_category_rrdlabels, 0, mem_after_judyl - mem_before_judyl, 0);
*((RRDLABEL_SRC *)PValue) = ls;
+ dst->version++;
+ update_statistics = true;
+ if (old_label_with_key) {
+ (void)JudyLDel(&dst->JudyL, (Word_t)old_label_with_key, PJE0);
+ delete_label((RRDLABEL *)old_label_with_key);
+ }
}
}
lfe_done_nolock();
-
- dst->version = src->version;
+ if (update_statistics) {
+ size_t mem_after_judyl = JudyLMemUsed(dst->JudyL);
+ STATS_PLUS_MEMORY(&dictionary_stats_category_rrdlabels, 0, mem_after_judyl - mem_before_judyl, 0);
+ }
spinlock_unlock(&src->spinlock);
spinlock_unlock(&dst->spinlock);
@@ -1265,6 +1280,9 @@ void rrdlabels_to_buffer_json_members(RRDLABELS *labels, BUFFER *wb)
size_t rrdlabels_entries(RRDLABELS *labels __maybe_unused)
{
+ if (unlikely(!labels))
+ return 0;
+
size_t count;
spinlock_lock(&labels->spinlock);
count = JudyLCount(labels->JudyL, 0, -1, PJE0);
@@ -1274,6 +1292,9 @@ size_t rrdlabels_entries(RRDLABELS *labels __maybe_unused)
size_t rrdlabels_version(RRDLABELS *labels __maybe_unused)
{
+ if (unlikely(!labels))
+ return 0;
+
return (size_t) labels->version;
}
@@ -1405,6 +1426,129 @@ int rrdlabels_unittest_add_pairs() {
return errors;
}
+int rrdlabels_unittest_double_check() {
+ fprintf(stderr, "\n%s() tests\n", __FUNCTION__);
+
+ int errors = 1;
+ int ret = 0;
+ RRDLABELS *labels = rrdlabels_create();
+
+ const char *pair = "key1=value1";
+
+ struct rrdlabels_unittest_add_a_pair tmp = {
+ .pair = pair,
+ .expected_name = "key1",
+ .expected_value = NULL,
+ .errors = 0
+ };
+
+ fprintf(stderr, "rrdlabels_add_pair(labels, %s) ...\n ", pair);
+
+ rrdlabels_add_pair(labels, pair, RRDLABEL_SRC_CONFIG);
+ size_t count = rrdlabels_entries(labels);
+ fprintf(stderr, "Added one key with \"value1\", entries found %zu\n", count);
+ tmp.expected_value = "value1";
+ ret = rrdlabels_walkthrough_read(labels, rrdlabels_unittest_add_a_pair_callback, &tmp);
+
+ fprintf(stderr, "Adding key with same value \"value1\" (collision check)\n");
+ rrdlabels_add_pair(labels, pair, RRDLABEL_SRC_CONFIG);
+ count = rrdlabels_entries(labels);
+ fprintf(stderr, "Added same key again \"value1\", entries found %zu\n", count);
+
+ ret = rrdlabels_walkthrough_read(labels, rrdlabels_unittest_add_a_pair_callback, &tmp);
+
+ // Add same key with different value
+ pair = "key1=value2";
+ rrdlabels_add_pair(labels, pair, RRDLABEL_SRC_CONFIG);
+ count = rrdlabels_entries(labels);
+ fprintf(stderr, "Added same key again with \"value2\", entries found %zu\n", count);
+
+ tmp.expected_value = "value2";
+ ret = rrdlabels_walkthrough_read(labels, rrdlabels_unittest_add_a_pair_callback, &tmp);
+
+ fprintf(stderr, "Adding key with same value \"value2\" (collision check)\n");
+ rrdlabels_add_pair(labels, pair, RRDLABEL_SRC_CONFIG);
+ count = rrdlabels_entries(labels);
+ fprintf(stderr, "Added same key again with \"value2\", entries found %zu\n", count);
+
+ ret = rrdlabels_walkthrough_read(labels, rrdlabels_unittest_add_a_pair_callback, &tmp);
+ errors = tmp.errors;
+ if(ret != 1) {
+ fprintf(stderr, "failed to get \"%s\" label", "key1");
+ errors++;
+ }
+
+ if(!errors)
+ fprintf(stderr, " OK, name='%s' and value='%s'\n", tmp.name, tmp.value?tmp.value:"(null)");
+ else
+ fprintf(stderr, " FAILED\n");
+
+ rrdlabels_destroy(labels);
+
+ return errors;
+}
+
+int rrdlabels_unittest_migrate_check() {
+ fprintf(stderr, "\n%s() tests\n", __FUNCTION__);
+
+ RRDLABELS *labels1 = NULL;
+ RRDLABELS *labels2 = NULL;
+
+ labels1 = rrdlabels_create();
+ labels2 = rrdlabels_create();
+
+ rrdlabels_add(labels1, "key1", "value1", RRDLABEL_SRC_CONFIG);
+ rrdlabels_add(labels1, "key1", "value2", RRDLABEL_SRC_CONFIG);
+
+ rrdlabels_add(labels2, "new_key1", "value2", RRDLABEL_SRC_CONFIG);
+ rrdlabels_add(labels2, "new_key2", "value2", RRDLABEL_SRC_CONFIG);
+ rrdlabels_add(labels2, "key1", "value2", RRDLABEL_SRC_CONFIG);
+
+ fprintf(stderr, "Labels1 entries found %zu (should be 1)\n", rrdlabels_entries(labels1));
+ fprintf(stderr, "Labels2 entries found %zu (should be 3)\n", rrdlabels_entries(labels2));
+
+ rrdlabels_migrate_to_these(labels1, labels2);
+ fprintf(stderr, "labels1 (migrated) entries found %zu (should be 3)\n", rrdlabels_entries(labels1));
+ size_t entries = rrdlabels_entries(labels1);
+
+ rrdlabels_destroy(labels1);
+ rrdlabels_destroy(labels2);
+
+ if (entries != 3)
+ return 1;
+
+ // Copy test
+ labels1 = rrdlabels_create();
+ labels2 = rrdlabels_create();
+
+ rrdlabels_add(labels1, "key1", "value1", RRDLABEL_SRC_CONFIG);
+ rrdlabels_add(labels1, "key2", "value1", RRDLABEL_SRC_CONFIG);
+ rrdlabels_add(labels1, "key3", "value1", RRDLABEL_SRC_CONFIG);
+ rrdlabels_add(labels1, "key4", "value1", RRDLABEL_SRC_CONFIG); // 4 keys
+
+ rrdlabels_add(labels2, "key1", "value10", RRDLABEL_SRC_CONFIG);
+ rrdlabels_add(labels2, "key2", "value1", RRDLABEL_SRC_CONFIG);
+ rrdlabels_add(labels2, "key0", "value1", RRDLABEL_SRC_CONFIG);
+
+ rrdlabels_copy(labels1, labels2); // labels1 should have 5 keys
+
+ entries = rrdlabels_entries(labels1);
+ fprintf(stderr, "labels1 (copied) entries found %zu (should be 5)\n", rrdlabels_entries(labels1));
+ if (entries != 5)
+ return 1;
+
+ rrdlabels_add(labels1, "key100", "value1", RRDLABEL_SRC_CONFIG);
+ rrdlabels_copy(labels2, labels1); // labels2 should have 6 keys
+ entries = rrdlabels_entries(labels1);
+
+ fprintf(stderr, "labels2 (copied) entries found %zu (should be 6)\n", rrdlabels_entries(labels1));
+
+ rrdlabels_destroy(labels1);
+ rrdlabels_destroy(labels2);
+
+ return entries != 6;
+}
+
int rrdlabels_unittest_check_simple_pattern(RRDLABELS *labels, const char *pattern, bool expected) {
fprintf(stderr, "rrdlabels_match_simple_pattern(labels, \"%s\") ... ", pattern);
@@ -1497,6 +1641,8 @@ int rrdlabels_unittest(void) {
errors += rrdlabels_unittest_sanitization();
errors += rrdlabels_unittest_add_pairs();
errors += rrdlabels_unittest_simple_pattern();
+ errors += rrdlabels_unittest_double_check();
+ errors += rrdlabels_unittest_migrate_check();
fprintf(stderr, "%d errors found\n", errors);
return errors;