diff options
Diffstat (limited to 'libnetdata/dictionary/dictionary.c')
-rw-r--r-- | libnetdata/dictionary/dictionary.c | 214 |
1 files changed, 153 insertions, 61 deletions
diff --git a/libnetdata/dictionary/dictionary.c b/libnetdata/dictionary/dictionary.c index 05da5534..a74a5958 100644 --- a/libnetdata/dictionary/dictionary.c +++ b/libnetdata/dictionary/dictionary.c @@ -250,6 +250,7 @@ static inline void pointer_del(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM // ---------------------------------------------------------------------------- // memory statistics +#ifdef DICT_WITH_STATS static inline void DICTIONARY_STATS_PLUS_MEMORY(DICTIONARY *dict, size_t key_size, size_t item_size, size_t value_size) { if(key_size) __atomic_fetch_add(&dict->stats->memory.index, (long)JUDYHS_INDEX_SIZE_ESTIMATE(key_size), __ATOMIC_RELAXED); @@ -260,6 +261,7 @@ static inline void DICTIONARY_STATS_PLUS_MEMORY(DICTIONARY *dict, size_t key_siz if(value_size) __atomic_fetch_add(&dict->stats->memory.values, (long)value_size, __ATOMIC_RELAXED); } + static inline void DICTIONARY_STATS_MINUS_MEMORY(DICTIONARY *dict, size_t key_size, size_t item_size, size_t value_size) { if(key_size) __atomic_fetch_sub(&dict->stats->memory.index, (long)JUDYHS_INDEX_SIZE_ESTIMATE(key_size), __ATOMIC_RELAXED); @@ -270,6 +272,10 @@ static inline void DICTIONARY_STATS_MINUS_MEMORY(DICTIONARY *dict, size_t key_si if(value_size) __atomic_fetch_sub(&dict->stats->memory.values, (long)value_size, __ATOMIC_RELAXED); } +#else +#define DICTIONARY_STATS_PLUS_MEMORY(dict, key_size, item_size, value_size) do {;} while(0) +#define DICTIONARY_STATS_MINUS_MEMORY(dict, key_size, item_size, value_size) do {;} while(0) +#endif // ---------------------------------------------------------------------------- // callbacks registration @@ -376,14 +382,21 @@ void dictionary_version_increment(DICTIONARY *dict) { // ---------------------------------------------------------------------------- // internal statistics API +#ifdef DICT_WITH_STATS static inline void DICTIONARY_STATS_SEARCHES_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->ops.searches, 1, __ATOMIC_RELAXED); } +#else +#define DICTIONARY_STATS_SEARCHES_PLUS1(dict) do {;} while(0) +#endif + static inline void DICTIONARY_ENTRIES_PLUS1(DICTIONARY *dict) { +#ifdef DICT_WITH_STATS // statistics __atomic_fetch_add(&dict->stats->items.entries, 1, __ATOMIC_RELAXED); __atomic_fetch_add(&dict->stats->items.referenced, 1, __ATOMIC_RELAXED); __atomic_fetch_add(&dict->stats->ops.inserts, 1, __ATOMIC_RELAXED); +#endif if(unlikely(is_dictionary_single_threaded(dict))) { dict->version++; @@ -397,10 +410,13 @@ static inline void DICTIONARY_ENTRIES_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->referenced_items, 1, __ATOMIC_RELAXED); } } + static inline void DICTIONARY_ENTRIES_MINUS1(DICTIONARY *dict) { +#ifdef DICT_WITH_STATS // statistics __atomic_fetch_add(&dict->stats->ops.deletes, 1, __ATOMIC_RELAXED); __atomic_fetch_sub(&dict->stats->items.entries, 1, __ATOMIC_RELAXED); +#endif size_t entries; (void)entries; if(unlikely(is_dictionary_single_threaded(dict))) { @@ -418,14 +434,19 @@ static inline void DICTIONARY_ENTRIES_MINUS1(DICTIONARY *dict) { dict->creation_line, dict->creation_file); } + static inline void DICTIONARY_VALUE_RESETS_PLUS1(DICTIONARY *dict) { +#ifdef DICT_WITH_STATS __atomic_fetch_add(&dict->stats->ops.resets, 1, __ATOMIC_RELAXED); +#endif if(unlikely(is_dictionary_single_threaded(dict))) dict->version++; else __atomic_fetch_add(&dict->version, 1, __ATOMIC_RELAXED); } + +#ifdef DICT_WITH_STATS static inline void DICTIONARY_STATS_TRAVERSALS_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->ops.traversals, 1, __ATOMIC_RELAXED); } @@ -476,9 +497,29 @@ static inline void DICTIONARY_STATS_DICT_DESTROY_QUEUED_MINUS1(DICTIONARY *dict) static inline void DICTIONARY_STATS_DICT_FLUSHES_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->ops.flushes, 1, __ATOMIC_RELAXED); } +#else +#define DICTIONARY_STATS_TRAVERSALS_PLUS1(dict) do {;} while(0) +#define DICTIONARY_STATS_WALKTHROUGHS_PLUS1(dict) do {;} while(0) +#define DICTIONARY_STATS_CHECK_SPINS_PLUS(dict, count) do {;} while(0) +#define DICTIONARY_STATS_INSERT_SPINS_PLUS(dict, count) do {;} while(0) +#define DICTIONARY_STATS_DELETE_SPINS_PLUS(dict, count) do {;} while(0) +#define DICTIONARY_STATS_SEARCH_IGNORES_PLUS1(dict) do {;} while(0) +#define DICTIONARY_STATS_CALLBACK_INSERTS_PLUS1(dict) do {;} while(0) +#define DICTIONARY_STATS_CALLBACK_CONFLICTS_PLUS1(dict) do {;} while(0) +#define DICTIONARY_STATS_CALLBACK_REACTS_PLUS1(dict) do {;} while(0) +#define DICTIONARY_STATS_CALLBACK_DELETES_PLUS1(dict) do {;} while(0) +#define DICTIONARY_STATS_GARBAGE_COLLECTIONS_PLUS1(dict) do {;} while(0) +#define DICTIONARY_STATS_DICT_CREATIONS_PLUS1(dict) do {;} while(0) +#define DICTIONARY_STATS_DICT_DESTRUCTIONS_PLUS1(dict) do {;} while(0) +#define DICTIONARY_STATS_DICT_DESTROY_QUEUED_PLUS1(dict) do {;} while(0) +#define DICTIONARY_STATS_DICT_DESTROY_QUEUED_MINUS1(dict) do {;} while(0) +#define DICTIONARY_STATS_DICT_FLUSHES_PLUS1(dict) do {;} while(0) +#endif static inline void DICTIONARY_REFERENCED_ITEMS_PLUS1(DICTIONARY *dict) { +#ifdef DICT_WITH_STATS __atomic_fetch_add(&dict->stats->items.referenced, 1, __ATOMIC_RELAXED); +#endif if(unlikely(is_dictionary_single_threaded(dict))) ++dict->referenced_items; @@ -487,7 +528,9 @@ static inline void DICTIONARY_REFERENCED_ITEMS_PLUS1(DICTIONARY *dict) { } static inline void DICTIONARY_REFERENCED_ITEMS_MINUS1(DICTIONARY *dict) { +#ifdef DICT_WITH_STATS __atomic_fetch_sub(&dict->stats->items.referenced, 1, __ATOMIC_RELAXED); +#endif long int referenced_items; (void)referenced_items; if(unlikely(is_dictionary_single_threaded(dict))) @@ -504,7 +547,9 @@ static inline void DICTIONARY_REFERENCED_ITEMS_MINUS1(DICTIONARY *dict) { } static inline void DICTIONARY_PENDING_DELETES_PLUS1(DICTIONARY *dict) { +#ifdef DICT_WITH_STATS __atomic_fetch_add(&dict->stats->items.pending_deletion, 1, __ATOMIC_RELAXED); +#endif if(unlikely(is_dictionary_single_threaded(dict))) ++dict->pending_deletion_items; @@ -513,7 +558,9 @@ static inline void DICTIONARY_PENDING_DELETES_PLUS1(DICTIONARY *dict) { } static inline long int DICTIONARY_PENDING_DELETES_MINUS1(DICTIONARY *dict) { +#ifdef DICT_WITH_STATS __atomic_fetch_sub(&dict->stats->items.pending_deletion, 1, __ATOMIC_RELEASE); +#endif if(unlikely(is_dictionary_single_threaded(dict))) return --dict->pending_deletion_items; @@ -977,7 +1024,7 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it DICTIONARY_REFERENCED_ITEMS_PLUS1(dict); } - if(unlikely(spins > 1 && dict->stats)) + if(unlikely(spins > 1)) DICTIONARY_STATS_CHECK_SPINS_PLUS(dict, spins - 1); return ret; @@ -1022,7 +1069,7 @@ static inline int item_is_not_referenced_and_can_be_removed_advanced(DICTIONARY item->deleter_pid = gettid(); #endif - if(unlikely(spins > 1 && dict->stats)) + if(unlikely(spins > 1)) DICTIONARY_STATS_DELETE_SPINS_PLUS(dict, spins - 1); return ret; @@ -1535,22 +1582,6 @@ static inline void dict_item_release_and_check_if_it_is_deleted_and_can_be_remov } static bool dict_item_del(DICTIONARY *dict, const char *name, ssize_t name_len) { - if(unlikely(!name || !*name)) { - internal_error( - true, - "DICTIONARY: attempted to %s() without a name on a dictionary created from %s() %zu@%s.", - __FUNCTION__, - dict->creation_function, - dict->creation_line, - dict->creation_file); - return false; - } - - if(unlikely(is_dictionary_destroyed(dict))) { - internal_error(true, "DICTIONARY: attempted to dictionary_del() on a destroyed dictionary"); - return false; - } - if(name_len == -1) name_len = (ssize_t)strlen(name) + 1; // we need the terminating null too @@ -1695,7 +1726,7 @@ static DICTIONARY_ITEM *dict_item_add_or_reset_value_and_acquire(DICTIONARY *dic } while(!item); - if(unlikely(spins > 0 && dict->stats)) + if(unlikely(spins > 0)) DICTIONARY_STATS_INSERT_SPINS_PLUS(dict, spins); if(is_master_dictionary(dict) && added_or_updated) @@ -2064,11 +2095,15 @@ void dictionary_flush(DICTIONARY *dict) { if(unlikely(!dict)) return; - void *value; - dfe_start_write(dict, value) { - dictionary_del_advanced(dict, item_get_name(value_dfe.item), (ssize_t)item_get_name_len(value_dfe.item) + 1); + ll_recursive_lock(dict, DICTIONARY_LOCK_WRITE); + + DICTIONARY_ITEM *item, *next = NULL; + for(item = dict->items.list; item ;item = next) { + next = item->next; + dict_item_del(dict, item_get_name(item), (ssize_t) item_get_name_len(item) + 1); } - dfe_done(value); + + ll_recursive_unlock(dict, DICTIONARY_LOCK_WRITE); DICTIONARY_STATS_DICT_FLUSHES_PLUS1(dict); } @@ -2251,6 +2286,12 @@ bool dictionary_del_advanced(DICTIONARY *dict, const char *name, ssize_t name_le return false; api_internal_check(dict, NULL, false, true); + + if(unlikely(is_dictionary_destroyed(dict))) { + internal_error(true, "DICTIONARY: attempted to delete item on a destroyed dictionary"); + return false; + } + return dict_item_del(dict, name, name_len); } @@ -2260,6 +2301,8 @@ bool dictionary_del_advanced(DICTIONARY *dict, const char *name, ssize_t name_le void *dictionary_foreach_start_rw(DICTFE *dfe, DICTIONARY *dict, char rw) { if(unlikely(!dfe || !dict)) return NULL; + DICTIONARY_STATS_TRAVERSALS_PLUS1(dict); + if(unlikely(is_dictionary_destroyed(dict))) { internal_error(true, "DICTIONARY: attempted to dictionary_foreach_start_rw() on a destroyed dictionary"); dfe->counter = 0; @@ -2275,8 +2318,6 @@ void *dictionary_foreach_start_rw(DICTFE *dfe, DICTIONARY *dict, char rw) { dfe->locked = true; ll_recursive_lock(dict, dfe->rw); - DICTIONARY_STATS_TRAVERSALS_PLUS1(dict); - // get the first item from the list DICTIONARY_ITEM *item = dict->items.list; @@ -2836,7 +2877,7 @@ static usec_t dictionary_unittest_run_and_measure_time(DICTIONARY *dict, char *m } } - fprintf(stderr, " %zu errors, %d (found %ld) items in dictionary, %d (found %ld) referenced, %d (found %ld) deleted, %llu usec \n", + fprintf(stderr, " %zu errors, %d (found %ld) items in dictionary, %d (found %ld) referenced, %d (found %ld) deleted, %"PRIu64" usec \n", errs, dict?dict->entries:0, found_ok, dict?dict->referenced_items:0, found_referenced, dict?dict->pending_deletion_items:0, found_deleted, dt); *errors += errs; return dt; @@ -3129,6 +3170,9 @@ struct thread_unittest { int join; DICTIONARY *dict; int dups; + + netdata_thread_t thread; + struct dictionary_stats stats; }; static void *unittest_dict_thread(void *arg) { @@ -3140,46 +3184,59 @@ static void *unittest_dict_thread(void *arg) { DICT_ITEM_CONST DICTIONARY_ITEM *item = dictionary_set_and_acquire_item_advanced(tu->dict, "dict thread checking 1234567890", -1, NULL, 0, NULL); - + tu->stats.ops.inserts++; dictionary_get(tu->dict, dictionary_acquired_item_name(item)); + tu->stats.ops.searches++; void *t1; dfe_start_write(tu->dict, t1) { // this should delete the referenced item dictionary_del(tu->dict, t1_dfe.name); + tu->stats.ops.deletes++; void *t2; dfe_start_write(tu->dict, t2) { // this should add another dictionary_set(tu->dict, t2_dfe.name, NULL, 0); + tu->stats.ops.inserts++; dictionary_get(tu->dict, dictionary_acquired_item_name(item)); + tu->stats.ops.searches++; // and this should delete it again dictionary_del(tu->dict, t2_dfe.name); + tu->stats.ops.deletes++; } dfe_done(t2); + tu->stats.ops.traversals++; // this should fail to add it dictionary_set(tu->dict, t1_dfe.name, NULL, 0); + tu->stats.ops.inserts++; + dictionary_del(tu->dict, t1_dfe.name); + tu->stats.ops.deletes++; } dfe_done(t1); + tu->stats.ops.traversals++; for(int i = 0; i < tu->dups ; i++) { dictionary_acquired_item_dup(tu->dict, item); dictionary_get(tu->dict, dictionary_acquired_item_name(item)); + tu->stats.ops.searches++; } for(int i = 0; i < tu->dups ; i++) { dictionary_acquired_item_release(tu->dict, item); dictionary_del(tu->dict, dictionary_acquired_item_name(item)); + tu->stats.ops.deletes++; } dictionary_acquired_item_release(tu->dict, item); dictionary_del(tu->dict, "dict thread checking 1234567890"); + tu->stats.ops.deletes++; // test concurrent deletions and flushes { @@ -3189,16 +3246,19 @@ static void *unittest_dict_thread(void *arg) { for (int i = 0; i < 1000; i++) { snprintfz(buf, 256, "del/flush test %d", i); dictionary_set(tu->dict, buf, NULL, 0); + tu->stats.ops.inserts++; } for (int i = 0; i < 1000; i++) { snprintfz(buf, 256, "del/flush test %d", i); dictionary_del(tu->dict, buf); + tu->stats.ops.deletes++; } } else { for (int i = 0; i < 10; i++) { dictionary_flush(tu->dict); + tu->stats.ops.flushes++; } } } @@ -3208,47 +3268,75 @@ static void *unittest_dict_thread(void *arg) { } static int dictionary_unittest_threads() { - - struct thread_unittest tu = { - .join = 0, - .dict = NULL, - .dups = 1, - }; - - // threads testing of dictionary - tu.dict = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); time_t seconds_to_run = 5; int threads_to_create = 2; + + struct thread_unittest tu[threads_to_create]; + memset(tu, 0, sizeof(struct thread_unittest) * threads_to_create); + fprintf( - stderr, - "\nChecking dictionary concurrency with %d threads for %lld seconds...\n", - threads_to_create, - (long long)seconds_to_run); + stderr, + "\nChecking dictionary concurrency with %d threads for %lld seconds...\n", + threads_to_create, + (long long)seconds_to_run); + + // threads testing of dictionary + struct dictionary_stats stats = {}; + tu[0].join = 0; + tu[0].dups = 1; + tu[0].dict = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE, &stats, 0); - netdata_thread_t threads[threads_to_create]; - tu.join = 0; for (int i = 0; i < threads_to_create; i++) { + if(i) + tu[i] = tu[0]; + char buf[100 + 1]; snprintf(buf, 100, "dict%d", i); netdata_thread_create( - &threads[i], + &tu[i].thread, buf, NETDATA_THREAD_OPTION_DONT_LOG | NETDATA_THREAD_OPTION_JOINABLE, unittest_dict_thread, - &tu); + &tu[i]); } + sleep_usec(seconds_to_run * USEC_PER_SEC); - __atomic_store_n(&tu.join, 1, __ATOMIC_RELAXED); for (int i = 0; i < threads_to_create; i++) { + __atomic_store_n(&tu[i].join, 1, __ATOMIC_RELAXED); + void *retval; - netdata_thread_join(threads[i], &retval); + netdata_thread_join(tu[i].thread, &retval); + + if(i) { + tu[0].stats.ops.inserts += tu[i].stats.ops.inserts; + tu[0].stats.ops.deletes += tu[i].stats.ops.deletes; + tu[0].stats.ops.searches += tu[i].stats.ops.searches; + tu[0].stats.ops.flushes += tu[i].stats.ops.flushes; + tu[0].stats.ops.traversals += tu[i].stats.ops.traversals; + } } fprintf(stderr, - "inserts %zu" + "CALLS : inserts %zu" ", deletes %zu" ", searches %zu" + ", traversals %zu" + ", flushes %zu" + "\n", + tu[0].stats.ops.inserts, + tu[0].stats.ops.deletes, + tu[0].stats.ops.searches, + tu[0].stats.ops.traversals, + tu[0].stats.ops.flushes + ); + +#ifdef DICT_WITH_STATS + fprintf(stderr, + "ACTUAL: inserts %zu" + ", deletes %zu" + ", searches %zu" + ", traversals %zu" ", resets %zu" ", flushes %zu" ", entries %d" @@ -3259,22 +3347,23 @@ static int dictionary_unittest_threads() { ", delete spins %zu" ", search ignores %zu" "\n", - tu.dict->stats->ops.inserts, - tu.dict->stats->ops.deletes, - tu.dict->stats->ops.searches, - tu.dict->stats->ops.resets, - tu.dict->stats->ops.flushes, - tu.dict->entries, - tu.dict->referenced_items, - tu.dict->pending_deletion_items, - tu.dict->stats->spin_locks.use_spins, - tu.dict->stats->spin_locks.insert_spins, - tu.dict->stats->spin_locks.delete_spins, - tu.dict->stats->spin_locks.search_spins + stats.ops.inserts, + stats.ops.deletes, + stats.ops.searches, + stats.ops.traversals, + stats.ops.resets, + stats.ops.flushes, + tu[0].dict->entries, + tu[0].dict->referenced_items, + tu[0].dict->pending_deletion_items, + stats.spin_locks.use_spins, + stats.spin_locks.insert_spins, + stats.spin_locks.delete_spins, + stats.spin_locks.search_spins ); - dictionary_destroy(tu.dict); - tu.dict = NULL; +#endif + dictionary_destroy(tu[0].dict); return 0; } @@ -3407,6 +3496,7 @@ static int dictionary_unittest_view_threads() { netdata_thread_join(view_thread, &retval); netdata_thread_join(master_thread, &retval); +#ifdef DICT_WITH_STATS fprintf(stderr, "MASTER: inserts %zu" ", deletes %zu" @@ -3457,6 +3547,8 @@ static int dictionary_unittest_view_threads() { stats_view.spin_locks.delete_spins, stats_view.spin_locks.search_spins ); +#endif + dictionary_destroy(tv.master); dictionary_destroy(tv.view); |