summaryrefslogtreecommitdiffstats
path: root/database/engine/cache.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-02-06 16:11:30 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-02-06 16:11:30 +0000
commitaa2fe8ccbfcb117efa207d10229eeeac5d0f97c7 (patch)
tree941cbdd387b41c1a81587c20a6df9f0e5e0ff7ab /database/engine/cache.c
parentAdding upstream version 1.37.1. (diff)
downloadnetdata-aa2fe8ccbfcb117efa207d10229eeeac5d0f97c7.tar.xz
netdata-aa2fe8ccbfcb117efa207d10229eeeac5d0f97c7.zip
Adding upstream version 1.38.0.upstream/1.38.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/engine/cache.c')
-rw-r--r--database/engine/cache.c2737
1 files changed, 2737 insertions, 0 deletions
diff --git a/database/engine/cache.c b/database/engine/cache.c
new file mode 100644
index 000000000..4091684b2
--- /dev/null
+++ b/database/engine/cache.c
@@ -0,0 +1,2737 @@
+#include "cache.h"
+
+/* STATES AND TRANSITIONS
+ *
+ * entry | entry
+ * v v
+ * HOT -> DIRTY --> CLEAN --> EVICT
+ * v | v
+ * flush | evict
+ * v | v
+ * save | free
+ * callback | callback
+ *
+ */
+
+typedef int32_t REFCOUNT;
+#define REFCOUNT_DELETING (-100)
+
+// to use ARAL uncomment the following line:
+#define PGC_WITH_ARAL 1
+
+typedef enum __attribute__ ((__packed__)) {
+ // mutually exclusive flags
+ PGC_PAGE_CLEAN = (1 << 0), // none of the following
+ PGC_PAGE_DIRTY = (1 << 1), // contains unsaved data
+ PGC_PAGE_HOT = (1 << 2), // currently being collected
+
+ // flags related to various actions on each page
+ PGC_PAGE_IS_BEING_DELETED = (1 << 3),
+ PGC_PAGE_IS_BEING_MIGRATED_TO_V2 = (1 << 4),
+ PGC_PAGE_HAS_NO_DATA_IGNORE_ACCESSES = (1 << 5),
+ PGC_PAGE_HAS_BEEN_ACCESSED = (1 << 6),
+} PGC_PAGE_FLAGS;
+
+#define page_flag_check(page, flag) (__atomic_load_n(&((page)->flags), __ATOMIC_ACQUIRE) & (flag))
+#define page_flag_set(page, flag) __atomic_or_fetch(&((page)->flags), flag, __ATOMIC_RELEASE)
+#define page_flag_clear(page, flag) __atomic_and_fetch(&((page)->flags), ~(flag), __ATOMIC_RELEASE)
+
+#define page_get_status_flags(page) page_flag_check(page, PGC_PAGE_HOT | PGC_PAGE_DIRTY | PGC_PAGE_CLEAN)
+#define is_page_hot(page) (page_get_status_flags(page) == PGC_PAGE_HOT)
+#define is_page_dirty(page) (page_get_status_flags(page) == PGC_PAGE_DIRTY)
+#define is_page_clean(page) (page_get_status_flags(page) == PGC_PAGE_CLEAN)
+
+struct pgc_page {
+ // indexing data
+ Word_t section;
+ Word_t metric_id;
+ time_t start_time_s;
+ time_t end_time_s;
+ uint32_t update_every_s;
+ uint32_t assumed_size;
+
+ REFCOUNT refcount;
+ uint16_t accesses; // counts the number of accesses on this page
+ PGC_PAGE_FLAGS flags;
+ SPINLOCK transition_spinlock; // when the page changes between HOT, DIRTY, CLEAN, we have to get this lock
+
+ struct {
+ struct pgc_page *next;
+ struct pgc_page *prev;
+ } link;
+
+ void *data;
+ uint8_t custom_data[];
+
+ // IMPORTANT!
+ // THIS STRUCTURE NEEDS TO BE INITIALIZED BY HAND!
+};
+
+struct pgc_linked_list {
+ SPINLOCK spinlock;
+ union {
+ PGC_PAGE *base;
+ Pvoid_t sections_judy;
+ };
+ PGC_PAGE_FLAGS flags;
+ size_t version;
+ size_t last_version_checked;
+ bool linked_list_in_sections_judy; // when true, we use 'sections_judy', otherwise we use 'base'
+ struct pgc_queue_statistics *stats;
+};
+
+struct pgc {
+ struct {
+ char name[PGC_NAME_MAX + 1];
+
+ size_t partitions;
+ size_t clean_size;
+ size_t max_dirty_pages_per_call;
+ size_t max_pages_per_inline_eviction;
+ size_t max_skip_pages_per_inline_eviction;
+ size_t max_flushes_inline;
+ size_t max_workers_evict_inline;
+ size_t additional_bytes_per_page;
+ free_clean_page_callback pgc_free_clean_cb;
+ save_dirty_page_callback pgc_save_dirty_cb;
+ save_dirty_init_callback pgc_save_init_cb;
+ PGC_OPTIONS options;
+
+ size_t severe_pressure_per1000;
+ size_t aggressive_evict_per1000;
+ size_t healthy_size_per1000;
+ size_t evict_low_threshold_per1000;
+
+ dynamic_target_cache_size_callback dynamic_target_size_cb;
+ } config;
+
+#ifdef PGC_WITH_ARAL
+ ARAL **aral;
+#endif
+
+ PGC_CACHE_LINE_PADDING(0);
+
+ struct pgc_index {
+ netdata_rwlock_t rwlock;
+ Pvoid_t sections_judy;
+ } *index;
+
+ PGC_CACHE_LINE_PADDING(1);
+
+ struct {
+ SPINLOCK spinlock;
+ size_t per1000;
+ } usage;
+
+ PGC_CACHE_LINE_PADDING(2);
+
+ struct pgc_linked_list clean; // LRU is applied here to free memory from the cache
+
+ PGC_CACHE_LINE_PADDING(3);
+
+ struct pgc_linked_list dirty; // in the dirty list, pages are ordered the way they were marked dirty
+
+ PGC_CACHE_LINE_PADDING(4);
+
+ struct pgc_linked_list hot; // in the hot list, pages are order the way they were marked hot
+
+ PGC_CACHE_LINE_PADDING(5);
+
+ struct pgc_statistics stats; // statistics
+
+#ifdef NETDATA_PGC_POINTER_CHECK
+ PGC_CACHE_LINE_PADDING(6);
+ netdata_mutex_t global_pointer_registry_mutex;
+ Pvoid_t global_pointer_registry;
+#endif
+};
+
+
+
+// ----------------------------------------------------------------------------
+// validate each pointer is indexed once - internal checks only
+
+static inline void pointer_index_init(PGC *cache __maybe_unused) {
+#ifdef NETDATA_PGC_POINTER_CHECK
+ netdata_mutex_init(&cache->global_pointer_registry_mutex);
+#else
+ ;
+#endif
+}
+
+static inline void pointer_destroy_index(PGC *cache __maybe_unused) {
+#ifdef NETDATA_PGC_POINTER_CHECK
+ netdata_mutex_lock(&cache->global_pointer_registry_mutex);
+ JudyHSFreeArray(&cache->global_pointer_registry, PJE0);
+ netdata_mutex_unlock(&cache->global_pointer_registry_mutex);
+#else
+ ;
+#endif
+}
+static inline void pointer_add(PGC *cache __maybe_unused, PGC_PAGE *page __maybe_unused) {
+#ifdef NETDATA_PGC_POINTER_CHECK
+ netdata_mutex_lock(&cache->global_pointer_registry_mutex);
+ Pvoid_t *PValue = JudyHSIns(&cache->global_pointer_registry, &page, sizeof(void *), PJE0);
+ if(*PValue != NULL)
+ fatal("pointer already exists in registry");
+ *PValue = page;
+ netdata_mutex_unlock(&cache->global_pointer_registry_mutex);
+#else
+ ;
+#endif
+}
+
+static inline void pointer_check(PGC *cache __maybe_unused, PGC_PAGE *page __maybe_unused) {
+#ifdef NETDATA_PGC_POINTER_CHECK
+ netdata_mutex_lock(&cache->global_pointer_registry_mutex);
+ Pvoid_t *PValue = JudyHSGet(cache->global_pointer_registry, &page, sizeof(void *));
+ if(PValue == NULL)
+ fatal("pointer is not found in registry");
+ netdata_mutex_unlock(&cache->global_pointer_registry_mutex);
+#else
+ ;
+#endif
+}
+
+static inline void pointer_del(PGC *cache __maybe_unused, PGC_PAGE *page __maybe_unused) {
+#ifdef NETDATA_PGC_POINTER_CHECK
+ netdata_mutex_lock(&cache->global_pointer_registry_mutex);
+ int ret = JudyHSDel(&cache->global_pointer_registry, &page, sizeof(void *), PJE0);
+ if(!ret)
+ fatal("pointer to be deleted does not exist in registry");
+ netdata_mutex_unlock(&cache->global_pointer_registry_mutex);
+#else
+ ;
+#endif
+}
+
+// ----------------------------------------------------------------------------
+// locking
+
+static inline size_t pgc_indexing_partition(PGC *cache, Word_t metric_id) {
+ static __thread Word_t last_metric_id = 0;
+ static __thread size_t last_partition = 0;
+
+ if(metric_id == last_metric_id || cache->config.partitions == 1)
+ return last_partition;
+
+ last_metric_id = metric_id;
+ last_partition = indexing_partition(metric_id, cache->config.partitions);
+
+ return last_partition;
+}
+
+static inline void pgc_index_read_lock(PGC *cache, size_t partition) {
+ netdata_rwlock_rdlock(&cache->index[partition].rwlock);
+}
+static inline void pgc_index_read_unlock(PGC *cache, size_t partition) {
+ netdata_rwlock_unlock(&cache->index[partition].rwlock);
+}
+//static inline bool pgc_index_write_trylock(PGC *cache, size_t partition) {
+// return !netdata_rwlock_trywrlock(&cache->index[partition].rwlock);
+//}
+static inline void pgc_index_write_lock(PGC *cache, size_t partition) {
+ netdata_rwlock_wrlock(&cache->index[partition].rwlock);
+}
+static inline void pgc_index_write_unlock(PGC *cache, size_t partition) {
+ netdata_rwlock_unlock(&cache->index[partition].rwlock);
+}
+
+static inline bool pgc_ll_trylock(PGC *cache __maybe_unused, struct pgc_linked_list *ll) {
+ return netdata_spinlock_trylock(&ll->spinlock);
+}
+
+static inline void pgc_ll_lock(PGC *cache __maybe_unused, struct pgc_linked_list *ll) {
+ netdata_spinlock_lock(&ll->spinlock);
+}
+
+static inline void pgc_ll_unlock(PGC *cache __maybe_unused, struct pgc_linked_list *ll) {
+ netdata_spinlock_unlock(&ll->spinlock);
+}
+
+static inline bool page_transition_trylock(PGC *cache __maybe_unused, PGC_PAGE *page) {
+ return netdata_spinlock_trylock(&page->transition_spinlock);
+}
+
+static inline void page_transition_lock(PGC *cache __maybe_unused, PGC_PAGE *page) {
+ netdata_spinlock_lock(&page->transition_spinlock);
+}
+
+static inline void page_transition_unlock(PGC *cache __maybe_unused, PGC_PAGE *page) {
+ netdata_spinlock_unlock(&page->transition_spinlock);
+}
+
+// ----------------------------------------------------------------------------
+// evictions control
+
+static inline size_t cache_usage_per1000(PGC *cache, size_t *size_to_evict) {
+
+ if(size_to_evict)
+ netdata_spinlock_lock(&cache->usage.spinlock);
+
+ else if(!netdata_spinlock_trylock(&cache->usage.spinlock))
+ return __atomic_load_n(&cache->usage.per1000, __ATOMIC_RELAXED);
+
+ size_t current_cache_size;
+ size_t wanted_cache_size;
+ size_t per1000;
+
+ size_t dirty = __atomic_load_n(&cache->dirty.stats->size, __ATOMIC_RELAXED);
+ size_t hot = __atomic_load_n(&cache->hot.stats->size, __ATOMIC_RELAXED);
+
+ if(cache->config.options & PGC_OPTIONS_AUTOSCALE) {
+ size_t dirty_max = __atomic_load_n(&cache->dirty.stats->max_size, __ATOMIC_RELAXED);
+ size_t hot_max = __atomic_load_n(&cache->hot.stats->max_size, __ATOMIC_RELAXED);
+
+ // our promise to users
+ size_t max_size1 = MAX(hot_max, hot) * 2;
+
+ // protection against slow flushing
+ size_t max_size2 = hot_max + ((dirty_max < hot_max / 2) ? hot_max / 2 : dirty_max * 2);
+
+ // the final wanted cache size
+ wanted_cache_size = MIN(max_size1, max_size2);
+
+ if(cache->config.dynamic_target_size_cb) {
+ size_t wanted_cache_size_cb = cache->config.dynamic_target_size_cb();
+ if(wanted_cache_size_cb > wanted_cache_size)
+ wanted_cache_size = wanted_cache_size_cb;
+ }
+
+ if (wanted_cache_size < hot + dirty + cache->config.clean_size)
+ wanted_cache_size = hot + dirty + cache->config.clean_size;
+ }
+ else
+ wanted_cache_size = hot + dirty + cache->config.clean_size;
+
+ // protection again huge queries
+ // if huge queries are running, or huge amounts need to be saved
+ // allow the cache to grow more (hot pages in main cache are also referenced)
+ size_t referenced_size = __atomic_load_n(&cache->stats.referenced_size, __ATOMIC_RELAXED);
+ if(unlikely(wanted_cache_size < referenced_size * 2 / 3))
+ wanted_cache_size = referenced_size * 2 / 3;
+
+ current_cache_size = __atomic_load_n(&cache->stats.size, __ATOMIC_RELAXED); // + pgc_aral_overhead();
+
+ per1000 = (size_t)((unsigned long long)current_cache_size * 1000ULL / (unsigned long long)wanted_cache_size);
+
+ __atomic_store_n(&cache->usage.per1000, per1000, __ATOMIC_RELAXED);
+ __atomic_store_n(&cache->stats.wanted_cache_size, wanted_cache_size, __ATOMIC_RELAXED);
+ __atomic_store_n(&cache->stats.current_cache_size, current_cache_size, __ATOMIC_RELAXED);
+
+ netdata_spinlock_unlock(&cache->usage.spinlock);
+
+ if(size_to_evict) {
+ size_t target = (size_t)((unsigned long long)wanted_cache_size * (unsigned long long)cache->config.evict_low_threshold_per1000 / 1000ULL);
+ if(current_cache_size > target)
+ *size_to_evict = current_cache_size - target;
+ else
+ *size_to_evict = 0;
+ }
+
+ if(per1000 >= cache->config.severe_pressure_per1000)
+ __atomic_add_fetch(&cache->stats.events_cache_under_severe_pressure, 1, __ATOMIC_RELAXED);
+
+ else if(per1000 >= cache->config.aggressive_evict_per1000)
+ __atomic_add_fetch(&cache->stats.events_cache_needs_space_aggressively, 1, __ATOMIC_RELAXED);
+
+ return per1000;
+}
+
+static inline bool cache_pressure(PGC *cache, size_t limit) {
+ return (cache_usage_per1000(cache, NULL) >= limit);
+}
+
+#define cache_under_severe_pressure(cache) cache_pressure(cache, (cache)->config.severe_pressure_per1000)
+#define cache_needs_space_aggressively(cache) cache_pressure(cache, (cache)->config.aggressive_evict_per1000)
+#define cache_above_healthy_limit(cache) cache_pressure(cache, (cache)->config.healthy_size_per1000)
+
+typedef bool (*evict_filter)(PGC_PAGE *page, void *data);
+static bool evict_pages_with_filter(PGC *cache, size_t max_skip, size_t max_evict, bool wait, bool all_of_them, evict_filter filter, void *data);
+#define evict_pages(cache, max_skip, max_evict, wait, all_of_them) evict_pages_with_filter(cache, max_skip, max_evict, wait, all_of_them, NULL, NULL)
+
+static inline void evict_on_clean_page_added(PGC *cache __maybe_unused) {
+ if((cache->config.options & PGC_OPTIONS_EVICT_PAGES_INLINE) || cache_needs_space_aggressively(cache)) {
+ evict_pages(cache,
+ cache->config.max_skip_pages_per_inline_eviction,
+ cache->config.max_pages_per_inline_eviction,
+ false, false);
+ }
+}
+
+static inline void evict_on_page_release_when_permitted(PGC *cache __maybe_unused) {
+ if ((cache->config.options & PGC_OPTIONS_EVICT_PAGES_INLINE) || cache_under_severe_pressure(cache)) {
+ evict_pages(cache,
+ cache->config.max_skip_pages_per_inline_eviction,
+ cache->config.max_pages_per_inline_eviction,
+ false, false);
+ }
+}
+
+// ----------------------------------------------------------------------------
+// flushing control
+
+static bool flush_pages(PGC *cache, size_t max_flushes, Word_t section, bool wait, bool all_of_them);
+
+static inline bool flushing_critical(PGC *cache) {
+ if(unlikely(__atomic_load_n(&cache->dirty.stats->size, __ATOMIC_RELAXED) > __atomic_load_n(&cache->hot.stats->max_size, __ATOMIC_RELAXED))) {
+ __atomic_add_fetch(&cache->stats.events_flush_critical, 1, __ATOMIC_RELAXED);
+ return true;
+ }
+
+ return false;
+}
+
+// ----------------------------------------------------------------------------
+// helpers
+
+static inline size_t page_assumed_size(PGC *cache, size_t size) {
+ return size + (sizeof(PGC_PAGE) + cache->config.additional_bytes_per_page + sizeof(Word_t) * 3);
+}
+
+static inline size_t page_size_from_assumed_size(PGC *cache, size_t assumed_size) {
+ return assumed_size - (sizeof(PGC_PAGE) + cache->config.additional_bytes_per_page + sizeof(Word_t) * 3);
+}
+
+// ----------------------------------------------------------------------------
+// Linked list management
+
+static inline void atomic_set_max(size_t *max, size_t desired) {
+ size_t expected;
+
+ expected = __atomic_load_n(max, __ATOMIC_RELAXED);
+
+ do {
+
+ if(expected >= desired)
+ return;
+
+ } while(!__atomic_compare_exchange_n(max, &expected, desired,
+ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+}
+
+struct section_pages {
+ SPINLOCK migration_to_v2_spinlock;
+ size_t entries;
+ size_t size;
+ PGC_PAGE *base;
+};
+
+static ARAL *pgc_section_pages_aral = NULL;
+static void pgc_section_pages_static_aral_init(void) {
+ static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
+
+ if(unlikely(!pgc_section_pages_aral)) {
+ netdata_spinlock_lock(&spinlock);
+
+ // we have to check again
+ if(!pgc_section_pages_aral)
+ pgc_section_pages_aral = aral_create(
+ "pgc_section",
+ sizeof(struct section_pages),
+ 0,
+ 65536, NULL,
+ NULL, NULL, false, false);
+
+ netdata_spinlock_unlock(&spinlock);
+ }
+}
+
+static inline void pgc_stats_ll_judy_change(PGC *cache, struct pgc_linked_list *ll, size_t mem_before_judyl, size_t mem_after_judyl) {
+ if(mem_after_judyl > mem_before_judyl) {
+ __atomic_add_fetch(&ll->stats->size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&cache->stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED);
+ }
+ else if(mem_after_judyl < mem_before_judyl) {
+ __atomic_sub_fetch(&ll->stats->size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&cache->stats.size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED);
+ }
+}
+
+static inline void pgc_stats_index_judy_change(PGC *cache, size_t mem_before_judyl, size_t mem_after_judyl) {
+ if(mem_after_judyl > mem_before_judyl) {
+ __atomic_add_fetch(&cache->stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED);
+ }
+ else if(mem_after_judyl < mem_before_judyl) {
+ __atomic_sub_fetch(&cache->stats.size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED);
+ }
+}
+
+static void pgc_ll_add(PGC *cache __maybe_unused, struct pgc_linked_list *ll, PGC_PAGE *page, bool having_lock) {
+ if(!having_lock)
+ pgc_ll_lock(cache, ll);
+
+ internal_fatal(page_get_status_flags(page) != 0,
+ "DBENGINE CACHE: invalid page flags, the page has %d, but it is should be %d",
+ page_get_status_flags(page),
+ 0);
+
+ if(ll->linked_list_in_sections_judy) {
+ size_t mem_before_judyl, mem_after_judyl;
+
+ mem_before_judyl = JudyLMemUsed(ll->sections_judy);
+ Pvoid_t *section_pages_pptr = JudyLIns(&ll->sections_judy, page->section, PJE0);
+ mem_after_judyl = JudyLMemUsed(ll->sections_judy);
+
+ struct section_pages *sp = *section_pages_pptr;
+ if(!sp) {
+ // sp = callocz(1, sizeof(struct section_pages));
+ sp = aral_mallocz(pgc_section_pages_aral);
+ memset(sp, 0, sizeof(struct section_pages));
+
+ *section_pages_pptr = sp;
+
+ mem_after_judyl += sizeof(struct section_pages);
+ }
+ pgc_stats_ll_judy_change(cache, ll, mem_before_judyl, mem_after_judyl);
+
+ sp->entries++;
+ sp->size += page->assumed_size;
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(sp->base, page, link.prev, link.next);
+
+ if((sp->entries % cache->config.max_dirty_pages_per_call) == 0)
+ ll->version++;
+ }
+ else {
+ // CLEAN pages end up here.
+ // - New pages created as CLEAN, always have 1 access.
+ // - DIRTY pages made CLEAN, depending on their accesses may be appended (accesses > 0) or prepended (accesses = 0).
+
+ if(page->accesses || page_flag_check(page, PGC_PAGE_HAS_BEEN_ACCESSED | PGC_PAGE_HAS_NO_DATA_IGNORE_ACCESSES) == PGC_PAGE_HAS_BEEN_ACCESSED) {
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(ll->base, page, link.prev, link.next);
+ page_flag_clear(page, PGC_PAGE_HAS_BEEN_ACCESSED);
+ }
+ else
+ DOUBLE_LINKED_LIST_PREPEND_ITEM_UNSAFE(ll->base, page, link.prev, link.next);
+
+ ll->version++;
+ }
+
+ page_flag_set(page, ll->flags);
+
+ if(!having_lock)
+ pgc_ll_unlock(cache, ll);
+
+ size_t entries = __atomic_add_fetch(&ll->stats->entries, 1, __ATOMIC_RELAXED);
+ size_t size = __atomic_add_fetch(&ll->stats->size, page->assumed_size, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ll->stats->added_entries, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ll->stats->added_size, page->assumed_size, __ATOMIC_RELAXED);
+
+ atomic_set_max(&ll->stats->max_entries, entries);
+ atomic_set_max(&ll->stats->max_size, size);
+}
+
+static void pgc_ll_del(PGC *cache __maybe_unused, struct pgc_linked_list *ll, PGC_PAGE *page, bool having_lock) {
+ __atomic_sub_fetch(&ll->stats->entries, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&ll->stats->size, page->assumed_size, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ll->stats->removed_entries, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ll->stats->removed_size, page->assumed_size, __ATOMIC_RELAXED);
+
+ if(!having_lock)
+ pgc_ll_lock(cache, ll);
+
+ internal_fatal(page_get_status_flags(page) != ll->flags,
+ "DBENGINE CACHE: invalid page flags, the page has %d, but it is should be %d",
+ page_get_status_flags(page),
+ ll->flags);
+
+ page_flag_clear(page, ll->flags);
+
+ if(ll->linked_list_in_sections_judy) {
+ Pvoid_t *section_pages_pptr = JudyLGet(ll->sections_judy, page->section, PJE0);
+ internal_fatal(!section_pages_pptr, "DBENGINE CACHE: page should be in Judy LL, but it is not");
+
+ struct section_pages *sp = *section_pages_pptr;
+ sp->entries--;
+ sp->size -= page->assumed_size;
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(sp->base, page, link.prev, link.next);
+
+ if(!sp->base) {
+ size_t mem_before_judyl, mem_after_judyl;
+
+ mem_before_judyl = JudyLMemUsed(ll->sections_judy);
+ int rc = JudyLDel(&ll->sections_judy, page->section, PJE0);
+ mem_after_judyl = JudyLMemUsed(ll->sections_judy);
+
+ if(!rc)
+ fatal("DBENGINE CACHE: cannot delete section from Judy LL");
+
+ // freez(sp);
+ aral_freez(pgc_section_pages_aral, sp);
+ mem_after_judyl -= sizeof(struct section_pages);
+ pgc_stats_ll_judy_change(cache, ll, mem_before_judyl, mem_after_judyl);
+ }
+ }
+ else {
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(ll->base, page, link.prev, link.next);
+ ll->version++;
+ }
+
+ if(!having_lock)
+ pgc_ll_unlock(cache, ll);
+}
+
+static inline void page_has_been_accessed(PGC *cache, PGC_PAGE *page) {
+ PGC_PAGE_FLAGS flags = page_flag_check(page, PGC_PAGE_CLEAN | PGC_PAGE_HAS_NO_DATA_IGNORE_ACCESSES);
+
+ if (!(flags & PGC_PAGE_HAS_NO_DATA_IGNORE_ACCESSES)) {
+ __atomic_add_fetch(&page->accesses, 1, __ATOMIC_RELAXED);
+
+ if (flags & PGC_PAGE_CLEAN) {
+ if(pgc_ll_trylock(cache, &cache->clean)) {
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(cache->clean.base, page, link.prev, link.next);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(cache->clean.base, page, link.prev, link.next);
+ pgc_ll_unlock(cache, &cache->clean);
+ page_flag_clear(page, PGC_PAGE_HAS_BEEN_ACCESSED);
+ }
+ else
+ page_flag_set(page, PGC_PAGE_HAS_BEEN_ACCESSED);
+ }
+ }
+}
+
+
+// ----------------------------------------------------------------------------
+// state transitions
+
+static inline void page_set_clean(PGC *cache, PGC_PAGE *page, bool having_transition_lock, bool having_clean_lock) {
+ if(!having_transition_lock)
+ page_transition_lock(cache, page);
+
+ PGC_PAGE_FLAGS flags = page_get_status_flags(page);
+
+ if(flags & PGC_PAGE_CLEAN) {
+ if(!having_transition_lock)
+ page_transition_unlock(cache, page);
+ return;
+ }
+
+ if(flags & PGC_PAGE_HOT)
+ pgc_ll_del(cache, &cache->hot, page, false);
+
+ if(flags & PGC_PAGE_DIRTY)
+ pgc_ll_del(cache, &cache->dirty, page, false);
+
+ // first add to linked list, the set the flag (required for move_page_last())
+ pgc_ll_add(cache, &cache->clean, page, having_clean_lock);
+
+ if(!having_transition_lock)
+ page_transition_unlock(cache, page);
+}
+
+static inline void page_set_dirty(PGC *cache, PGC_PAGE *page, bool having_hot_lock) {
+ if(!having_hot_lock)
+ // to avoid deadlocks, we have to get the hot lock before the page transition
+ // since this is what all_hot_to_dirty() does
+ pgc_ll_lock(cache, &cache->hot);
+
+ page_transition_lock(cache, page);
+
+ PGC_PAGE_FLAGS flags = page_get_status_flags(page);
+
+ if(flags & PGC_PAGE_DIRTY) {
+ page_transition_unlock(cache, page);
+
+ if(!having_hot_lock)
+ // we don't need the hot lock anymore
+ pgc_ll_unlock(cache, &cache->hot);
+
+ return;
+ }
+
+ __atomic_add_fetch(&cache->stats.hot2dirty_entries, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&cache->stats.hot2dirty_size, page->assumed_size, __ATOMIC_RELAXED);
+
+ if(likely(flags & PGC_PAGE_HOT))
+ pgc_ll_del(cache, &cache->hot, page, true);
+
+ if(!having_hot_lock)
+ // we don't need the hot lock anymore
+ pgc_ll_unlock(cache, &cache->hot);
+
+ if(unlikely(flags & PGC_PAGE_CLEAN))
+ pgc_ll_del(cache, &cache->clean, page, false);
+
+ // first add to linked list, the set the flag (required for move_page_last())
+ pgc_ll_add(cache, &cache->dirty, page, false);
+
+ __atomic_sub_fetch(&cache->stats.hot2dirty_entries, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&cache->stats.hot2dirty_size, page->assumed_size, __ATOMIC_RELAXED);
+
+ page_transition_unlock(cache, page);
+}
+
+static inline void page_set_hot(PGC *cache, PGC_PAGE *page) {
+ page_transition_lock(cache, page);
+
+ PGC_PAGE_FLAGS flags = page_get_status_flags(page);
+
+ if(flags & PGC_PAGE_HOT) {
+ page_transition_unlock(cache, page);
+ return;
+ }
+
+ if(flags & PGC_PAGE_DIRTY)
+ pgc_ll_del(cache, &cache->dirty, page, false);
+
+ if(flags & PGC_PAGE_CLEAN)
+ pgc_ll_del(cache, &cache->clean, page, false);
+
+ // first add to linked list, the set the flag (required for move_page_last())
+ pgc_ll_add(cache, &cache->hot, page, false);
+
+ page_transition_unlock(cache, page);
+}
+
+
+// ----------------------------------------------------------------------------
+// Referencing
+
+static inline size_t PGC_REFERENCED_PAGES(PGC *cache) {
+ return __atomic_load_n(&cache->stats.referenced_entries, __ATOMIC_RELAXED);
+}
+
+static inline void PGC_REFERENCED_PAGES_PLUS1(PGC *cache, PGC_PAGE *page) {
+ __atomic_add_fetch(&cache->stats.referenced_entries, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&cache->stats.referenced_size, page->assumed_size, __ATOMIC_RELAXED);
+}
+
+static inline void PGC_REFERENCED_PAGES_MINUS1(PGC *cache, size_t assumed_size) {
+ __atomic_sub_fetch(&cache->stats.referenced_entries, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&cache->stats.referenced_size, assumed_size, __ATOMIC_RELAXED);
+}
+
+// If the page is not already acquired,
+// YOU HAVE TO HAVE THE QUEUE (hot, dirty, clean) THE PAGE IS IN, L O C K E D !
+// If you don't have it locked, NOTHING PREVENTS THIS PAGE FOR VANISHING WHILE THIS IS CALLED!
+static inline bool page_acquire(PGC *cache, PGC_PAGE *page) {
+ __atomic_add_fetch(&cache->stats.acquires, 1, __ATOMIC_RELAXED);
+
+ REFCOUNT expected, desired;
+
+ expected = __atomic_load_n(&page->refcount, __ATOMIC_RELAXED);
+ size_t spins = 0;
+
+ do {
+ spins++;
+
+ if(unlikely(expected < 0))
+ return false;
+
+ desired = expected + 1;
+
+ } while(!__atomic_compare_exchange_n(&page->refcount, &expected, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED));
+
+ if(unlikely(spins > 1))
+ __atomic_add_fetch(&cache->stats.acquire_spins, spins - 1, __ATOMIC_RELAXED);
+
+ if(desired == 1)
+ PGC_REFERENCED_PAGES_PLUS1(cache, page);
+
+ return true;
+}
+
+static inline void page_release(PGC *cache, PGC_PAGE *page, bool evict_if_necessary) {
+ __atomic_add_fetch(&cache->stats.releases, 1, __ATOMIC_RELAXED);
+
+ size_t assumed_size = page->assumed_size; // take the size before we release it
+ REFCOUNT expected, desired;
+
+ expected = __atomic_load_n(&page->refcount, __ATOMIC_RELAXED);
+
+ size_t spins = 0;
+ do {
+ spins++;
+
+ internal_fatal(expected <= 0,
+ "DBENGINE CACHE: trying to release a page with reference counter %d", expected);
+
+ desired = expected - 1;
+
+ } while(!__atomic_compare_exchange_n(&page->refcount, &expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED));
+
+ if(unlikely(spins > 1))
+ __atomic_add_fetch(&cache->stats.release_spins, spins - 1, __ATOMIC_RELAXED);
+
+ if(desired == 0) {
+ PGC_REFERENCED_PAGES_MINUS1(cache, assumed_size);
+
+ if(evict_if_necessary)
+ evict_on_page_release_when_permitted(cache);
+ }
+}
+
+static inline bool non_acquired_page_get_for_deletion___while_having_clean_locked(PGC *cache __maybe_unused, PGC_PAGE *page) {
+ __atomic_add_fetch(&cache->stats.acquires_for_deletion, 1, __ATOMIC_RELAXED);
+
+ internal_fatal(!is_page_clean(page),
+ "DBENGINE CACHE: only clean pages can be deleted");
+
+ REFCOUNT expected, desired;
+
+ expected = __atomic_load_n(&page->refcount, __ATOMIC_RELAXED);
+ size_t spins = 0;
+ bool delete_it;
+
+ do {
+ spins++;
+
+ if (expected == 0) {
+ desired = REFCOUNT_DELETING;
+ delete_it = true;
+ }
+ else {
+ delete_it = false;
+ break;
+ }
+
+ } while(!__atomic_compare_exchange_n(&page->refcount, &expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED));
+
+ if(delete_it) {
+ // we can delete this page
+ internal_fatal(page_flag_check(page, PGC_PAGE_IS_BEING_DELETED),
+ "DBENGINE CACHE: page is already being deleted");
+
+ page_flag_set(page, PGC_PAGE_IS_BEING_DELETED);
+ }
+
+ if(unlikely(spins > 1))
+ __atomic_add_fetch(&cache->stats.delete_spins, spins - 1, __ATOMIC_RELAXED);
+
+ return delete_it;
+}
+
+static inline bool acquired_page_get_for_deletion_or_release_it(PGC *cache __maybe_unused, PGC_PAGE *page) {
+ __atomic_add_fetch(&cache->stats.acquires_for_deletion, 1, __ATOMIC_RELAXED);
+
+ size_t assumed_size = page->assumed_size; // take the size before we release it
+
+ REFCOUNT expected, desired;
+
+ expected = __atomic_load_n(&page->refcount, __ATOMIC_RELAXED);
+ size_t spins = 0;
+ bool delete_it;
+
+ do {
+ spins++;
+
+ internal_fatal(expected < 1,
+ "DBENGINE CACHE: page to be deleted should be acquired by the caller.");
+
+ if (expected == 1) {
+ // we are the only one having this page referenced
+ desired = REFCOUNT_DELETING;
+ delete_it = true;
+ }
+ else {
+ // this page cannot be deleted
+ desired = expected - 1;
+ delete_it = false;
+ }
+
+ } while(!__atomic_compare_exchange_n(&page->refcount, &expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED));
+
+ if(delete_it) {
+ PGC_REFERENCED_PAGES_MINUS1(cache, assumed_size);
+
+ // we can delete this page
+ internal_fatal(page_flag_check(page, PGC_PAGE_IS_BEING_DELETED),
+ "DBENGINE CACHE: page is already being deleted");
+
+ page_flag_set(page, PGC_PAGE_IS_BEING_DELETED);
+ }
+
+ if(unlikely(spins > 1))
+ __atomic_add_fetch(&cache->stats.delete_spins, spins - 1, __ATOMIC_RELAXED);
+
+ return delete_it;
+}
+
+
+// ----------------------------------------------------------------------------
+// Indexing
+
+static inline void free_this_page(PGC *cache, PGC_PAGE *page, size_t partition __maybe_unused) {
+ // call the callback to free the user supplied memory
+ cache->config.pgc_free_clean_cb(cache, (PGC_ENTRY){
+ .section = page->section,
+ .metric_id = page->metric_id,
+ .start_time_s = page->start_time_s,
+ .end_time_s = __atomic_load_n(&page->end_time_s, __ATOMIC_RELAXED),
+ .update_every_s = page->update_every_s,
+ .size = page_size_from_assumed_size(cache, page->assumed_size),
+ .hot = (is_page_hot(page)) ? true : false,
+ .data = page->data,
+ .custom_data = (cache->config.additional_bytes_per_page) ? page->custom_data : NULL,
+ });
+
+ // update statistics
+ __atomic_add_fetch(&cache->stats.removed_entries, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&cache->stats.removed_size, page->assumed_size, __ATOMIC_RELAXED);
+
+ __atomic_sub_fetch(&cache->stats.entries, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&cache->stats.size, page->assumed_size, __ATOMIC_RELAXED);
+
+ // free our memory
+#ifdef PGC_WITH_ARAL
+ aral_freez(cache->aral[partition], page);
+#else
+ freez(page);
+#endif
+}
+
+static void remove_this_page_from_index_unsafe(PGC *cache, PGC_PAGE *page, size_t partition) {
+ // remove it from the Judy arrays
+
+ pointer_check(cache, page);
+
+ internal_fatal(page_flag_check(page, PGC_PAGE_HOT | PGC_PAGE_DIRTY | PGC_PAGE_CLEAN),
+ "DBENGINE CACHE: page to be removed from the cache is still in the linked-list");
+
+ internal_fatal(!page_flag_check(page, PGC_PAGE_IS_BEING_DELETED),
+ "DBENGINE CACHE: page to be removed from the index, is not marked for deletion");
+
+ internal_fatal(partition != pgc_indexing_partition(cache, page->metric_id),
+ "DBENGINE CACHE: attempted to remove this page from the wrong partition of the cache");
+
+ Pvoid_t *metrics_judy_pptr = JudyLGet(cache->index[partition].sections_judy, page->section, PJE0);
+ if(unlikely(!metrics_judy_pptr))
+ fatal("DBENGINE CACHE: section '%lu' should exist, but it does not.", page->section);
+
+ Pvoid_t *pages_judy_pptr = JudyLGet(*metrics_judy_pptr, page->metric_id, PJE0);
+ if(unlikely(!pages_judy_pptr))
+ fatal("DBENGINE CACHE: metric '%lu' in section '%lu' should exist, but it does not.",
+ page->metric_id, page->section);
+
+ Pvoid_t *page_ptr = JudyLGet(*pages_judy_pptr, page->start_time_s, PJE0);
+ if(unlikely(!page_ptr))
+ fatal("DBENGINE CACHE: page with start time '%ld' of metric '%lu' in section '%lu' should exist, but it does not.",
+ page->start_time_s, page->metric_id, page->section);
+
+ PGC_PAGE *found_page = *page_ptr;
+ if(unlikely(found_page != page))
+ fatal("DBENGINE CACHE: page with start time '%ld' of metric '%lu' in section '%lu' should exist, but the index returned a different address.",
+ page->start_time_s, page->metric_id, page->section);
+
+ size_t mem_before_judyl = 0, mem_after_judyl = 0;
+
+ mem_before_judyl += JudyLMemUsed(*pages_judy_pptr);
+ if(unlikely(!JudyLDel(pages_judy_pptr, page->start_time_s, PJE0)))
+ fatal("DBENGINE CACHE: page with start time '%ld' of metric '%lu' in section '%lu' exists, but cannot be deleted.",
+ page->start_time_s, page->metric_id, page->section);
+ mem_after_judyl += JudyLMemUsed(*pages_judy_pptr);
+
+ mem_before_judyl += JudyLMemUsed(*metrics_judy_pptr);
+ if(!*pages_judy_pptr && !JudyLDel(metrics_judy_pptr, page->metric_id, PJE0))
+ fatal("DBENGINE CACHE: metric '%lu' in section '%lu' exists and is empty, but cannot be deleted.",
+ page->metric_id, page->section);
+ mem_after_judyl += JudyLMemUsed(*metrics_judy_pptr);
+
+ mem_before_judyl += JudyLMemUsed(cache->index[partition].sections_judy);
+ if(!*metrics_judy_pptr && !JudyLDel(&cache->index[partition].sections_judy, page->section, PJE0))
+ fatal("DBENGINE CACHE: section '%lu' exists and is empty, but cannot be deleted.", page->section);
+ mem_after_judyl += JudyLMemUsed(cache->index[partition].sections_judy);
+
+ pgc_stats_index_judy_change(cache, mem_before_judyl, mem_after_judyl);
+
+ pointer_del(cache, page);
+}
+
+static inline void remove_and_free_page_not_in_any_queue_and_acquired_for_deletion(PGC *cache, PGC_PAGE *page) {
+ size_t partition = pgc_indexing_partition(cache, page->metric_id);
+ pgc_index_write_lock(cache, partition);
+ remove_this_page_from_index_unsafe(cache, page, partition);
+ pgc_index_write_unlock(cache, partition);
+ free_this_page(cache, page, partition);
+}
+
+static inline bool make_acquired_page_clean_and_evict_or_page_release(PGC *cache, PGC_PAGE *page) {
+ pointer_check(cache, page);
+
+ page_transition_lock(cache, page);
+ pgc_ll_lock(cache, &cache->clean);
+
+ // make it clean - it does not have any accesses, so it will be prepended
+ page_set_clean(cache, page, true, true);
+
+ if(!acquired_page_get_for_deletion_or_release_it(cache, page)) {
+ pgc_ll_unlock(cache, &cache->clean);
+ page_transition_unlock(cache, page);
+ return false;
+ }
+
+ // remove it from the linked list
+ pgc_ll_del(cache, &cache->clean, page, true);
+ pgc_ll_unlock(cache, &cache->clean);
+ page_transition_unlock(cache, page);
+
+ remove_and_free_page_not_in_any_queue_and_acquired_for_deletion(cache, page);
+
+ return true;
+}
+
+// returns true, when there is more work to do
+static bool evict_pages_with_filter(PGC *cache, size_t max_skip, size_t max_evict, bool wait, bool all_of_them, evict_filter filter, void *data) {
+ size_t per1000 = cache_usage_per1000(cache, NULL);
+
+ if(!all_of_them && per1000 < cache->config.healthy_size_per1000)
+ // don't bother - not enough to do anything
+ return false;
+
+ size_t workers_running = __atomic_add_fetch(&cache->stats.workers_evict, 1, __ATOMIC_RELAXED);
+ if(!wait && !all_of_them && workers_running > cache->config.max_workers_evict_inline && per1000 < cache->config.severe_pressure_per1000) {
+ __atomic_sub_fetch(&cache->stats.workers_evict, 1, __ATOMIC_RELAXED);
+ return false;
+ }
+
+ internal_fatal(cache->clean.linked_list_in_sections_judy,
+ "wrong clean pages configuration - clean pages need to have a linked list, not a judy array");
+
+ if(unlikely(!max_skip))
+ max_skip = SIZE_MAX;
+ else if(unlikely(max_skip < 2))
+ max_skip = 2;
+
+ if(unlikely(!max_evict))
+ max_evict = SIZE_MAX;
+ else if(unlikely(max_evict < 2))
+ max_evict = 2;
+
+ size_t total_pages_evicted = 0;
+ size_t total_pages_skipped = 0;
+ bool stopped_before_finishing = false;
+ size_t spins = 0;
+
+ do {
+ if(++spins > 1)
+ __atomic_add_fetch(&cache->stats.evict_spins, 1, __ATOMIC_RELAXED);
+
+ bool batch;
+ size_t max_size_to_evict = 0;
+ if (unlikely(all_of_them)) {
+ max_size_to_evict = SIZE_MAX;
+ batch = true;
+ }
+ else if(unlikely(wait)) {
+ per1000 = cache_usage_per1000(cache, &max_size_to_evict);
+ batch = (wait && per1000 > cache->config.severe_pressure_per1000) ? true : false;
+ }
+ else {
+ batch = false;
+ max_size_to_evict = (cache_above_healthy_limit(cache)) ? 1 : 0;
+ }
+
+ if (!max_size_to_evict)
+ break;
+
+ // check if we have to stop
+ if(total_pages_evicted >= max_evict && !all_of_them) {
+ stopped_before_finishing = true;
+ break;
+ }
+
+ if(!all_of_them && !wait) {
+ if(!pgc_ll_trylock(cache, &cache->clean)) {
+ stopped_before_finishing = true;
+ goto premature_exit;
+ }
+
+ // at this point we have the clean lock
+ }
+ else
+ pgc_ll_lock(cache, &cache->clean);
+
+ // find a page to evict
+ PGC_PAGE *pages_to_evict = NULL;
+ size_t pages_to_evict_size = 0;
+ for(PGC_PAGE *page = cache->clean.base, *next = NULL, *first_page_we_relocated = NULL; page ; page = next) {
+ next = page->link.next;
+
+ if(unlikely(page == first_page_we_relocated))
+ // we did a complete loop on all pages
+ break;
+
+ if(unlikely(page_flag_check(page, PGC_PAGE_HAS_BEEN_ACCESSED | PGC_PAGE_HAS_NO_DATA_IGNORE_ACCESSES) == PGC_PAGE_HAS_BEEN_ACCESSED)) {
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(cache->clean.base, page, link.prev, link.next);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(cache->clean.base, page, link.prev, link.next);
+ page_flag_clear(page, PGC_PAGE_HAS_BEEN_ACCESSED);
+ continue;
+ }
+
+ if(unlikely(filter && !filter(page, data)))
+ continue;
+
+ if(non_acquired_page_get_for_deletion___while_having_clean_locked(cache, page)) {
+ // we can delete this page
+
+ // remove it from the clean list
+ pgc_ll_del(cache, &cache->clean, page, true);
+
+ __atomic_add_fetch(&cache->stats.evicting_entries, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&cache->stats.evicting_size, page->assumed_size, __ATOMIC_RELAXED);
+
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(pages_to_evict, page, link.prev, link.next);
+
+ pages_to_evict_size += page->assumed_size;
+
+ if(unlikely(all_of_them || (batch && pages_to_evict_size < max_size_to_evict)))
+ // get more pages
+ ;
+ else
+ // one page at a time
+ break;
+ }
+ else {
+ // we can't delete this page
+
+ if(!first_page_we_relocated)
+ first_page_we_relocated = page;
+
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(cache->clean.base, page, link.prev, link.next);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(cache->clean.base, page, link.prev, link.next);
+
+ // check if we have to stop
+ if(unlikely(++total_pages_skipped >= max_skip && !all_of_them)) {
+ stopped_before_finishing = true;
+ break;
+ }
+ }
+ }
+ pgc_ll_unlock(cache, &cache->clean);
+
+ if(likely(pages_to_evict)) {
+ // remove them from the index
+
+ if(unlikely(pages_to_evict->link.next)) {
+ // we have many pages, let's minimize the index locks we are going to get
+
+ PGC_PAGE *pages_per_partition[cache->config.partitions];
+ memset(pages_per_partition, 0, sizeof(PGC_PAGE *) * cache->config.partitions);
+
+ // sort them by partition
+ for (PGC_PAGE *page = pages_to_evict, *next = NULL; page; page = next) {
+ next = page->link.next;
+
+ size_t partition = pgc_indexing_partition(cache, page->metric_id);
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(pages_to_evict, page, link.prev, link.next);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(pages_per_partition[partition], page, link.prev, link.next);
+ }
+
+ // remove them from the index
+ for (size_t partition = 0; partition < cache->config.partitions; partition++) {
+ if (!pages_per_partition[partition]) continue;
+
+ pgc_index_write_lock(cache, partition);
+
+ for (PGC_PAGE *page = pages_per_partition[partition]; page; page = page->link.next)
+ remove_this_page_from_index_unsafe(cache, page, partition);
+
+ pgc_index_write_unlock(cache, partition);
+ }
+
+ // free them
+ for (size_t partition = 0; partition < cache->config.partitions; partition++) {
+ if (!pages_per_partition[partition]) continue;
+
+ for (PGC_PAGE *page = pages_per_partition[partition], *next = NULL; page; page = next) {
+ next = page->link.next;
+
+ size_t page_size = page->assumed_size;
+ free_this_page(cache, page, partition);
+
+ __atomic_sub_fetch(&cache->stats.evicting_entries, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&cache->stats.evicting_size, page_size, __ATOMIC_RELAXED);
+
+ total_pages_evicted++;
+ }
+ }
+ }
+ else {
+ // just one page to be evicted
+ PGC_PAGE *page = pages_to_evict;
+
+ size_t page_size = page->assumed_size;
+
+ size_t partition = pgc_indexing_partition(cache, page->metric_id);
+ pgc_index_write_lock(cache, partition);
+ remove_this_page_from_index_unsafe(cache, page, partition);
+ pgc_index_write_unlock(cache, partition);
+ free_this_page(cache, page, partition);
+
+ __atomic_sub_fetch(&cache->stats.evicting_entries, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&cache->stats.evicting_size, page_size, __ATOMIC_RELAXED);
+
+ total_pages_evicted++;
+ }
+ }
+ else
+ break;
+
+ } while(all_of_them || (total_pages_evicted < max_evict && total_pages_skipped < max_skip));
+
+ if(all_of_them && !filter) {
+ pgc_ll_lock(cache, &cache->clean);
+ if(cache->clean.stats->entries) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE CACHE: cannot free all clean pages, %zu are still in the clean queue",
+ cache->clean.stats->entries);
+ }
+ pgc_ll_unlock(cache, &cache->clean);
+ }
+
+premature_exit:
+ if(unlikely(total_pages_skipped))
+ __atomic_add_fetch(&cache->stats.evict_skipped, total_pages_skipped, __ATOMIC_RELAXED);
+
+ __atomic_sub_fetch(&cache->stats.workers_evict, 1, __ATOMIC_RELAXED);
+
+ return stopped_before_finishing;
+}
+
+static PGC_PAGE *page_add(PGC *cache, PGC_ENTRY *entry, bool *added) {
+ __atomic_add_fetch(&cache->stats.workers_add, 1, __ATOMIC_RELAXED);
+
+ size_t partition = pgc_indexing_partition(cache, entry->metric_id);
+
+#ifdef PGC_WITH_ARAL
+ PGC_PAGE *allocation = aral_mallocz(cache->aral[partition]);
+#endif
+ PGC_PAGE *page;
+ size_t spins = 0;
+
+ do {
+ if(++spins > 1)
+ __atomic_add_fetch(&cache->stats.insert_spins, 1, __ATOMIC_RELAXED);
+
+ pgc_index_write_lock(cache, partition);
+
+ size_t mem_before_judyl = 0, mem_after_judyl = 0;
+
+ mem_before_judyl += JudyLMemUsed(cache->index[partition].sections_judy);
+ Pvoid_t *metrics_judy_pptr = JudyLIns(&cache->index[partition].sections_judy, entry->section, PJE0);
+ if(unlikely(!metrics_judy_pptr || metrics_judy_pptr == PJERR))
+ fatal("DBENGINE CACHE: corrupted sections judy array");
+ mem_after_judyl += JudyLMemUsed(cache->index[partition].sections_judy);
+
+ mem_before_judyl += JudyLMemUsed(*metrics_judy_pptr);
+ Pvoid_t *pages_judy_pptr = JudyLIns(metrics_judy_pptr, entry->metric_id, PJE0);
+ if(unlikely(!pages_judy_pptr || pages_judy_pptr == PJERR))
+ fatal("DBENGINE CACHE: corrupted pages judy array");
+ mem_after_judyl += JudyLMemUsed(*metrics_judy_pptr);
+
+ mem_before_judyl += JudyLMemUsed(*pages_judy_pptr);
+ Pvoid_t *page_ptr = JudyLIns(pages_judy_pptr, entry->start_time_s, PJE0);
+ if(unlikely(!page_ptr || page_ptr == PJERR))
+ fatal("DBENGINE CACHE: corrupted page in judy array");
+ mem_after_judyl += JudyLMemUsed(*pages_judy_pptr);
+
+ pgc_stats_index_judy_change(cache, mem_before_judyl, mem_after_judyl);
+
+ page = *page_ptr;
+
+ if (likely(!page)) {
+#ifdef PGC_WITH_ARAL
+ page = allocation;
+ allocation = NULL;
+#else
+ page = mallocz(sizeof(PGC_PAGE) + cache->config.additional_bytes_per_page);
+#endif
+ page->refcount = 1;
+ page->accesses = (entry->hot) ? 0 : 1;
+ page->flags = 0;
+ page->section = entry->section;
+ page->metric_id = entry->metric_id;
+ page->start_time_s = entry->start_time_s;
+ page->end_time_s = entry->end_time_s,
+ page->update_every_s = entry->update_every_s,
+ page->data = entry->data;
+ page->assumed_size = page_assumed_size(cache, entry->size);
+ netdata_spinlock_init(&page->transition_spinlock);
+ page->link.prev = NULL;
+ page->link.next = NULL;
+
+ if(cache->config.additional_bytes_per_page) {
+ if(entry->custom_data)
+ memcpy(page->custom_data, entry->custom_data, cache->config.additional_bytes_per_page);
+ else
+ memset(page->custom_data, 0, cache->config.additional_bytes_per_page);
+ }
+
+ // put it in the index
+ *page_ptr = page;
+ pointer_add(cache, page);
+ pgc_index_write_unlock(cache, partition);
+
+ if (entry->hot)
+ page_set_hot(cache, page);
+ else
+ page_set_clean(cache, page, false, false);
+
+ PGC_REFERENCED_PAGES_PLUS1(cache, page);
+
+ // update statistics
+ __atomic_add_fetch(&cache->stats.added_entries, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&cache->stats.added_size, page->assumed_size, __ATOMIC_RELAXED);
+
+ __atomic_add_fetch(&cache->stats.entries, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&cache->stats.size, page->assumed_size, __ATOMIC_RELAXED);
+
+ if(added)
+ *added = true;
+ }
+ else {
+ if (!page_acquire(cache, page))
+ page = NULL;
+
+ else if(added)
+ *added = false;
+
+ pgc_index_write_unlock(cache, partition);
+
+ if(unlikely(!page)) {
+ // now that we don't have the lock,
+ // give it some time for the old page to go away
+ struct timespec ns = { .tv_sec = 0, .tv_nsec = 1 };
+ nanosleep(&ns, NULL);
+ }
+ }
+
+ } while(!page);
+
+#ifdef PGC_WITH_ARAL
+ if(allocation)
+ aral_freez(cache->aral[partition], allocation);
+#endif
+
+ __atomic_sub_fetch(&cache->stats.workers_add, 1, __ATOMIC_RELAXED);
+
+ if(!entry->hot)
+ evict_on_clean_page_added(cache);
+
+ if((cache->config.options & PGC_OPTIONS_FLUSH_PAGES_INLINE) || flushing_critical(cache)) {
+ flush_pages(cache, cache->config.max_flushes_inline, PGC_SECTION_ALL,
+ false, false);
+ }
+
+ return page;
+}
+
+static PGC_PAGE *page_find_and_acquire(PGC *cache, Word_t section, Word_t metric_id, time_t start_time_s, PGC_SEARCH method) {
+ __atomic_add_fetch(&cache->stats.workers_search, 1, __ATOMIC_RELAXED);
+
+ size_t *stats_hit_ptr, *stats_miss_ptr;
+
+ if(method == PGC_SEARCH_CLOSEST) {
+ __atomic_add_fetch(&cache->stats.searches_closest, 1, __ATOMIC_RELAXED);
+ stats_hit_ptr = &cache->stats.searches_closest_hits;
+ stats_miss_ptr = &cache->stats.searches_closest_misses;
+ }
+ else {
+ __atomic_add_fetch(&cache->stats.searches_exact, 1, __ATOMIC_RELAXED);
+ stats_hit_ptr = &cache->stats.searches_exact_hits;
+ stats_miss_ptr = &cache->stats.searches_exact_misses;
+ }
+
+ PGC_PAGE *page = NULL;
+ size_t partition = pgc_indexing_partition(cache, metric_id);
+
+ pgc_index_read_lock(cache, partition);
+
+ Pvoid_t *metrics_judy_pptr = JudyLGet(cache->index[partition].sections_judy, section, PJE0);
+ if(unlikely(metrics_judy_pptr == PJERR))
+ fatal("DBENGINE CACHE: corrupted sections judy array");
+
+ if(unlikely(!metrics_judy_pptr)) {
+ // section does not exist
+ goto cleanup;
+ }
+
+ Pvoid_t *pages_judy_pptr = JudyLGet(*metrics_judy_pptr, metric_id, PJE0);
+ if(unlikely(pages_judy_pptr == PJERR))
+ fatal("DBENGINE CACHE: corrupted pages judy array");
+
+ if(unlikely(!pages_judy_pptr)) {
+ // metric does not exist
+ goto cleanup;
+ }
+
+ switch(method) {
+ default:
+ case PGC_SEARCH_CLOSEST: {
+ Pvoid_t *page_ptr = JudyLGet(*pages_judy_pptr, start_time_s, PJE0);
+ if (unlikely(page_ptr == PJERR))
+ fatal("DBENGINE CACHE: corrupted page in pages judy array");
+
+ if (page_ptr)
+ page = *page_ptr;
+
+ else {
+ Word_t time = start_time_s;
+
+ // find the previous page
+ page_ptr = JudyLLast(*pages_judy_pptr, &time, PJE0);
+ if(unlikely(page_ptr == PJERR))
+ fatal("DBENGINE CACHE: corrupted page in pages judy array #2");
+
+ if(page_ptr) {
+ // found a page starting before our timestamp
+ // check if our timestamp is included
+ page = *page_ptr;
+ if(start_time_s > page->end_time_s)
+ // it is not good for us
+ page = NULL;
+ }
+
+ if(!page) {
+ // find the next page then...
+ time = start_time_s;
+ page_ptr = JudyLNext(*pages_judy_pptr, &time, PJE0);
+ if(page_ptr)
+ page = *page_ptr;
+ }
+ }
+ }
+ break;
+
+ case PGC_SEARCH_EXACT: {
+ Pvoid_t *page_ptr = JudyLGet(*pages_judy_pptr, start_time_s, PJE0);
+ if (unlikely(page_ptr == PJERR))
+ fatal("DBENGINE CACHE: corrupted page in pages judy array");
+
+ if (page_ptr)
+ page = *page_ptr;
+ }
+ break;
+
+ case PGC_SEARCH_FIRST: {
+ Word_t time = start_time_s;
+ Pvoid_t *page_ptr = JudyLFirst(*pages_judy_pptr, &time, PJE0);
+ if (unlikely(page_ptr == PJERR))
+ fatal("DBENGINE CACHE: corrupted page in pages judy array");
+
+ if (page_ptr)
+ page = *page_ptr;
+ }
+ break;
+
+ case PGC_SEARCH_NEXT: {
+ Word_t time = start_time_s;
+ Pvoid_t *page_ptr = JudyLNext(*pages_judy_pptr, &time, PJE0);
+ if (unlikely(page_ptr == PJERR))
+ fatal("DBENGINE CACHE: corrupted page in pages judy array");
+
+ if (page_ptr)
+ page = *page_ptr;
+ }
+ break;
+
+ case PGC_SEARCH_LAST: {
+ Word_t time = start_time_s;
+ Pvoid_t *page_ptr = JudyLLast(*pages_judy_pptr, &time, PJE0);
+ if (unlikely(page_ptr == PJERR))
+ fatal("DBENGINE CACHE: corrupted page in pages judy array");
+
+ if (page_ptr)
+ page = *page_ptr;
+ }
+ break;
+
+ case PGC_SEARCH_PREV: {
+ Word_t time = start_time_s;
+ Pvoid_t *page_ptr = JudyLPrev(*pages_judy_pptr, &time, PJE0);
+ if (unlikely(page_ptr == PJERR))
+ fatal("DBENGINE CACHE: corrupted page in pages judy array");
+
+ if (page_ptr)
+ page = *page_ptr;
+ }
+ break;
+ }
+
+ if(page) {
+ pointer_check(cache, page);
+
+ if(!page_acquire(cache, page)) {
+ // this page is not good to use
+ page = NULL;
+ }
+ }
+
+cleanup:
+ pgc_index_read_unlock(cache, partition);
+
+ if(page) {
+ __atomic_add_fetch(stats_hit_ptr, 1, __ATOMIC_RELAXED);
+ page_has_been_accessed(cache, page);
+ }
+ else
+ __atomic_add_fetch(stats_miss_ptr, 1, __ATOMIC_RELAXED);
+
+ __atomic_sub_fetch(&cache->stats.workers_search, 1, __ATOMIC_RELAXED);
+
+ return page;
+}
+
+static void all_hot_pages_to_dirty(PGC *cache, Word_t section) {
+ pgc_ll_lock(cache, &cache->hot);
+
+ bool first = true;
+ Word_t last_section = (section == PGC_SECTION_ALL) ? 0 : section;
+ Pvoid_t *section_pages_pptr;
+ while ((section_pages_pptr = JudyLFirstThenNext(cache->hot.sections_judy, &last_section, &first))) {
+ if(section != PGC_SECTION_ALL && last_section != section)
+ break;
+
+ struct section_pages *sp = *section_pages_pptr;
+
+ PGC_PAGE *page = sp->base;
+ while(page) {
+ PGC_PAGE *next = page->link.next;
+
+ if(page_acquire(cache, page)) {
+ page_set_dirty(cache, page, true);
+ page_release(cache, page, false);
+ // page ptr may be invalid now
+ }
+
+ page = next;
+ }
+ }
+ pgc_ll_unlock(cache, &cache->hot);
+}
+
+// returns true when there is more work to do
+static bool flush_pages(PGC *cache, size_t max_flushes, Word_t section, bool wait, bool all_of_them) {
+ internal_fatal(!cache->dirty.linked_list_in_sections_judy,
+ "wrong dirty pages configuration - dirty pages need to have a judy array, not a linked list");
+
+ if(!all_of_them && !wait) {
+ // we have been called from a data collection thread
+ // let's not waste its time...
+
+ if(!pgc_ll_trylock(cache, &cache->dirty)) {
+ // we would block, so give up...
+ return true;
+ }
+
+ // we got the lock at this point
+ }
+ else
+ pgc_ll_lock(cache, &cache->dirty);
+
+ size_t optimal_flush_size = cache->config.max_dirty_pages_per_call;
+ size_t dirty_version_at_entry = cache->dirty.version;
+ if(!all_of_them && (cache->dirty.stats->entries < optimal_flush_size || cache->dirty.last_version_checked == dirty_version_at_entry)) {
+ pgc_ll_unlock(cache, &cache->dirty);
+ return false;
+ }
+
+ __atomic_add_fetch(&cache->stats.workers_flush, 1, __ATOMIC_RELAXED);
+
+ bool have_dirty_lock = true;
+
+ if(all_of_them || !max_flushes)
+ max_flushes = SIZE_MAX;
+
+ Word_t last_section = (section == PGC_SECTION_ALL) ? 0 : section;
+ size_t flushes_so_far = 0;
+ Pvoid_t *section_pages_pptr;
+ bool stopped_before_finishing = false;
+ size_t spins = 0;
+ bool first = true;
+
+ while (have_dirty_lock && (section_pages_pptr = JudyLFirstThenNext(cache->dirty.sections_judy, &last_section, &first))) {
+ if(section != PGC_SECTION_ALL && last_section != section)
+ break;
+
+ struct section_pages *sp = *section_pages_pptr;
+ if(!all_of_them && sp->entries < optimal_flush_size)
+ continue;
+
+ if(!all_of_them && flushes_so_far > max_flushes) {
+ stopped_before_finishing = true;
+ break;
+ }
+
+ if(++spins > 1)
+ __atomic_add_fetch(&cache->stats.flush_spins, 1, __ATOMIC_RELAXED);
+
+ PGC_ENTRY array[optimal_flush_size];
+ PGC_PAGE *pages[optimal_flush_size];
+ size_t pages_added = 0, pages_added_size = 0;
+ size_t pages_removed_dirty = 0, pages_removed_dirty_size = 0;
+ size_t pages_cancelled = 0, pages_cancelled_size = 0;
+ size_t pages_made_clean = 0, pages_made_clean_size = 0;
+
+ PGC_PAGE *page = sp->base;
+ while (page && pages_added < optimal_flush_size) {
+ PGC_PAGE *next = page->link.next;
+
+ internal_fatal(page_get_status_flags(page) != PGC_PAGE_DIRTY,
+ "DBENGINE CACHE: page should be in the dirty list before saved");
+
+ if (page_acquire(cache, page)) {
+ internal_fatal(page_get_status_flags(page) != PGC_PAGE_DIRTY,
+ "DBENGINE CACHE: page should be in the dirty list before saved");
+
+ internal_fatal(page->section != last_section,
+ "DBENGINE CACHE: dirty page is not in the right section (tier)");
+
+ if(!page_transition_trylock(cache, page)) {
+ page_release(cache, page, false);
+ // page ptr may be invalid now
+ }
+ else {
+ pages[pages_added] = page;
+ array[pages_added] = (PGC_ENTRY) {
+ .section = page->section,
+ .metric_id = page->metric_id,
+ .start_time_s = page->start_time_s,
+ .end_time_s = __atomic_load_n(&page->end_time_s, __ATOMIC_RELAXED),
+ .update_every_s = page->update_every_s,
+ .size = page_size_from_assumed_size(cache, page->assumed_size),
+ .data = page->data,
+ .custom_data = (cache->config.additional_bytes_per_page) ? page->custom_data : NULL,
+ .hot = false,
+ };
+
+ pages_added_size += page->assumed_size;
+ pages_added++;
+ }
+ }
+
+ page = next;
+ }
+
+ // do we have enough to save?
+ if(all_of_them || pages_added == optimal_flush_size) {
+ // we should do it
+
+ for (size_t i = 0; i < pages_added; i++) {
+ PGC_PAGE *tpg = pages[i];
+
+ internal_fatal(page_get_status_flags(tpg) != PGC_PAGE_DIRTY,
+ "DBENGINE CACHE: page should be in the dirty list before saved");
+
+ __atomic_add_fetch(&cache->stats.flushing_entries, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&cache->stats.flushing_size, tpg->assumed_size, __ATOMIC_RELAXED);
+
+ // remove it from the dirty list
+ pgc_ll_del(cache, &cache->dirty, tpg, true);
+
+ pages_removed_dirty_size += tpg->assumed_size;
+ pages_removed_dirty++;
+ }
+
+ // next time, repeat the same section (tier)
+ first = true;
+ }
+ else {
+ // we can't do it
+
+ for (size_t i = 0; i < pages_added; i++) {
+ PGC_PAGE *tpg = pages[i];
+
+ internal_fatal(page_get_status_flags(tpg) != PGC_PAGE_DIRTY,
+ "DBENGINE CACHE: page should be in the dirty list before saved");
+
+ pages_cancelled_size += tpg->assumed_size;
+ pages_cancelled++;
+
+ page_transition_unlock(cache, tpg);
+ page_release(cache, tpg, false);
+ // page ptr may be invalid now
+ }
+
+ __atomic_add_fetch(&cache->stats.flushes_cancelled, pages_cancelled, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&cache->stats.flushes_cancelled_size, pages_cancelled_size, __ATOMIC_RELAXED);
+
+ internal_fatal(pages_added != pages_cancelled || pages_added_size != pages_cancelled_size,
+ "DBENGINE CACHE: flushing cancel pages mismatch");
+
+ // next time, continue to the next section (tier)
+ first = false;
+ continue;
+ }
+
+ if(cache->config.pgc_save_init_cb)
+ cache->config.pgc_save_init_cb(cache, last_section);
+
+ pgc_ll_unlock(cache, &cache->dirty);
+ have_dirty_lock = false;
+
+ // call the callback to save them
+ // it may take some time, so let's release the lock
+ cache->config.pgc_save_dirty_cb(cache, array, pages, pages_added);
+ flushes_so_far++;
+
+ __atomic_add_fetch(&cache->stats.flushes_completed, pages_added, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&cache->stats.flushes_completed_size, pages_added_size, __ATOMIC_RELAXED);
+
+ size_t pages_to_evict = 0; (void)pages_to_evict;
+ for (size_t i = 0; i < pages_added; i++) {
+ PGC_PAGE *tpg = pages[i];
+
+ internal_fatal(page_get_status_flags(tpg) != 0,
+ "DBENGINE CACHE: page should not be in any list while it is being saved");
+
+ __atomic_sub_fetch(&cache->stats.flushing_entries, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&cache->stats.flushing_size, tpg->assumed_size, __ATOMIC_RELAXED);
+
+ pages_made_clean_size += tpg->assumed_size;
+ pages_made_clean++;
+
+ if(!tpg->accesses)
+ pages_to_evict++;
+
+ page_set_clean(cache, tpg, true, false);
+ page_transition_unlock(cache, tpg);
+ page_release(cache, tpg, false);
+ // tpg ptr may be invalid now
+ }
+
+ internal_fatal(pages_added != pages_made_clean || pages_added != pages_removed_dirty ||
+ pages_added_size != pages_made_clean_size || pages_added_size != pages_removed_dirty_size
+ , "DBENGINE CACHE: flushing pages mismatch");
+
+ if(!all_of_them && !wait) {
+ if(pgc_ll_trylock(cache, &cache->dirty))
+ have_dirty_lock = true;
+
+ else {
+ stopped_before_finishing = true;
+ have_dirty_lock = false;
+ }
+ }
+ else {
+ pgc_ll_lock(cache, &cache->dirty);
+ have_dirty_lock = true;
+ }
+ }
+
+ if(have_dirty_lock) {
+ if(!stopped_before_finishing && dirty_version_at_entry > cache->dirty.last_version_checked)
+ cache->dirty.last_version_checked = dirty_version_at_entry;
+
+ pgc_ll_unlock(cache, &cache->dirty);
+ }
+
+ __atomic_sub_fetch(&cache->stats.workers_flush, 1, __ATOMIC_RELAXED);
+
+ return stopped_before_finishing;
+}
+
+void free_all_unreferenced_clean_pages(PGC *cache) {
+ evict_pages(cache, 0, 0, true, true);
+}
+
+// ----------------------------------------------------------------------------
+// public API
+
+PGC *pgc_create(const char *name,
+ size_t clean_size_bytes, free_clean_page_callback pgc_free_cb,
+ size_t max_dirty_pages_per_flush,
+ save_dirty_init_callback pgc_save_init_cb,
+ save_dirty_page_callback pgc_save_dirty_cb,
+ size_t max_pages_per_inline_eviction, size_t max_inline_evictors,
+ size_t max_skip_pages_per_inline_eviction,
+ size_t max_flushes_inline,
+ PGC_OPTIONS options, size_t partitions, size_t additional_bytes_per_page) {
+
+ if(max_pages_per_inline_eviction < 2)
+ max_pages_per_inline_eviction = 2;
+
+ if(max_dirty_pages_per_flush < 1)
+ max_dirty_pages_per_flush = 1;
+
+ if(max_flushes_inline * max_dirty_pages_per_flush < 2)
+ max_flushes_inline = 2;
+
+ PGC *cache = callocz(1, sizeof(PGC));
+ strncpyz(cache->config.name, name, PGC_NAME_MAX);
+ cache->config.options = options;
+ cache->config.clean_size = (clean_size_bytes < 1 * 1024 * 1024) ? 1 * 1024 * 1024 : clean_size_bytes;
+ cache->config.pgc_free_clean_cb = pgc_free_cb;
+ cache->config.max_dirty_pages_per_call = max_dirty_pages_per_flush;
+ cache->config.pgc_save_init_cb = pgc_save_init_cb;
+ cache->config.pgc_save_dirty_cb = pgc_save_dirty_cb;
+ cache->config.max_pages_per_inline_eviction = (max_pages_per_inline_eviction < 2) ? 2 : max_pages_per_inline_eviction;
+ cache->config.max_skip_pages_per_inline_eviction = (max_skip_pages_per_inline_eviction < 2) ? 2 : max_skip_pages_per_inline_eviction;
+ cache->config.max_flushes_inline = (max_flushes_inline < 1) ? 1 : max_flushes_inline;
+ cache->config.partitions = partitions < 1 ? (size_t)get_netdata_cpus() : partitions;
+ cache->config.additional_bytes_per_page = additional_bytes_per_page;
+
+ cache->config.max_workers_evict_inline = max_inline_evictors;
+ cache->config.severe_pressure_per1000 = 1010;
+ cache->config.aggressive_evict_per1000 = 990;
+ cache->config.healthy_size_per1000 = 980;
+ cache->config.evict_low_threshold_per1000 = 970;
+
+ cache->index = callocz(cache->config.partitions, sizeof(struct pgc_index));
+
+ for(size_t part = 0; part < cache->config.partitions ; part++)
+ netdata_rwlock_init(&cache->index[part].rwlock);
+
+ netdata_spinlock_init(&cache->hot.spinlock);
+ netdata_spinlock_init(&cache->dirty.spinlock);
+ netdata_spinlock_init(&cache->clean.spinlock);
+
+ cache->hot.flags = PGC_PAGE_HOT;
+ cache->hot.linked_list_in_sections_judy = true;
+ cache->hot.stats = &cache->stats.queues.hot;
+
+ cache->dirty.flags = PGC_PAGE_DIRTY;
+ cache->dirty.linked_list_in_sections_judy = true;
+ cache->dirty.stats = &cache->stats.queues.dirty;
+
+ cache->clean.flags = PGC_PAGE_CLEAN;
+ cache->clean.linked_list_in_sections_judy = false;
+ cache->clean.stats = &cache->stats.queues.clean;
+
+ pgc_section_pages_static_aral_init();
+
+#ifdef PGC_WITH_ARAL
+ cache->aral = callocz(cache->config.partitions, sizeof(ARAL *));
+ for(size_t part = 0; part < cache->config.partitions ; part++) {
+ char buf[100 +1];
+ snprintfz(buf, 100, "%s[%zu]", name, part);
+ cache->aral[part] = aral_create(
+ buf,
+ sizeof(PGC_PAGE) + cache->config.additional_bytes_per_page,
+ 0,
+ 16384,
+ aral_statistics(pgc_section_pages_aral),
+ NULL, NULL, false, false);
+ }
+#endif
+
+ pointer_index_init(cache);
+
+ return cache;
+}
+
+struct aral_statistics *pgc_aral_statistics(void) {
+ return aral_statistics(pgc_section_pages_aral);
+}
+
+size_t pgc_aral_structures(void) {
+ return aral_structures(pgc_section_pages_aral);
+}
+
+size_t pgc_aral_overhead(void) {
+ return aral_overhead(pgc_section_pages_aral);
+}
+
+void pgc_flush_all_hot_and_dirty_pages(PGC *cache, Word_t section) {
+ all_hot_pages_to_dirty(cache, section);
+
+ // save all dirty pages to make them clean
+ flush_pages(cache, 0, section, true, true);
+}
+
+void pgc_destroy(PGC *cache) {
+ // convert all hot pages to dirty
+ all_hot_pages_to_dirty(cache, PGC_SECTION_ALL);
+
+ // save all dirty pages to make them clean
+ flush_pages(cache, 0, PGC_SECTION_ALL, true, true);
+
+ // free all unreferenced clean pages
+ free_all_unreferenced_clean_pages(cache);
+
+ if(PGC_REFERENCED_PAGES(cache))
+ error("DBENGINE CACHE: there are %zu referenced cache pages - leaving the cache allocated", PGC_REFERENCED_PAGES(cache));
+ else {
+ pointer_destroy_index(cache);
+
+ for(size_t part = 0; part < cache->config.partitions ; part++)
+ netdata_rwlock_destroy(&cache->index[part].rwlock);
+
+#ifdef PGC_WITH_ARAL
+ for(size_t part = 0; part < cache->config.partitions ; part++)
+ aral_destroy(cache->aral[part]);
+
+ freez(cache->aral);
+#endif
+
+ freez(cache);
+ }
+}
+
+PGC_PAGE *pgc_page_add_and_acquire(PGC *cache, PGC_ENTRY entry, bool *added) {
+ return page_add(cache, &entry, added);
+}
+
+PGC_PAGE *pgc_page_dup(PGC *cache, PGC_PAGE *page) {
+ if(!page_acquire(cache, page))
+ fatal("DBENGINE CACHE: tried to dup a page that is not acquired!");
+
+ return page;
+}
+
+void pgc_page_release(PGC *cache, PGC_PAGE *page) {
+ page_release(cache, page, is_page_clean(page));
+}
+
+void pgc_page_hot_to_dirty_and_release(PGC *cache, PGC_PAGE *page) {
+ __atomic_add_fetch(&cache->stats.workers_hot2dirty, 1, __ATOMIC_RELAXED);
+
+//#ifdef NETDATA_INTERNAL_CHECKS
+// page_transition_lock(cache, page);
+// internal_fatal(!is_page_hot(page), "DBENGINE CACHE: called %s() but page is not hot", __FUNCTION__ );
+// page_transition_unlock(cache, page);
+//#endif
+
+ // make page dirty
+ page_set_dirty(cache, page, false);
+
+ // release the page
+ page_release(cache, page, true);
+ // page ptr may be invalid now
+
+ __atomic_sub_fetch(&cache->stats.workers_hot2dirty, 1, __ATOMIC_RELAXED);
+
+ // flush, if we have to
+ if((cache->config.options & PGC_OPTIONS_FLUSH_PAGES_INLINE) || flushing_critical(cache)) {
+ flush_pages(cache, cache->config.max_flushes_inline, PGC_SECTION_ALL,
+ false, false);
+ }
+}
+
+bool pgc_page_to_clean_evict_or_release(PGC *cache, PGC_PAGE *page) {
+ bool ret;
+
+ __atomic_add_fetch(&cache->stats.workers_hot2dirty, 1, __ATOMIC_RELAXED);
+
+ // prevent accesses from increasing the accesses counter
+ page_flag_set(page, PGC_PAGE_HAS_NO_DATA_IGNORE_ACCESSES);
+
+ // zero the accesses counter
+ __atomic_store_n(&page->accesses, 0, __ATOMIC_RELEASE);
+
+ // if there are no other references to it, evict it immediately
+ if(make_acquired_page_clean_and_evict_or_page_release(cache, page)) {
+ __atomic_add_fetch(&cache->stats.hot_empty_pages_evicted_immediately, 1, __ATOMIC_RELAXED);
+ ret = true;
+ }
+ else {
+ __atomic_add_fetch(&cache->stats.hot_empty_pages_evicted_later, 1, __ATOMIC_RELAXED);
+ ret = false;
+ }
+
+ __atomic_sub_fetch(&cache->stats.workers_hot2dirty, 1, __ATOMIC_RELAXED);
+
+ return ret;
+}
+
+Word_t pgc_page_section(PGC_PAGE *page) {
+ return page->section;
+}
+
+Word_t pgc_page_metric(PGC_PAGE *page) {
+ return page->metric_id;
+}
+
+time_t pgc_page_start_time_s(PGC_PAGE *page) {
+ return page->start_time_s;
+}
+
+time_t pgc_page_end_time_s(PGC_PAGE *page) {
+ return page->end_time_s;
+}
+
+time_t pgc_page_update_every_s(PGC_PAGE *page) {
+ return page->update_every_s;
+}
+
+time_t pgc_page_fix_update_every(PGC_PAGE *page, time_t update_every_s) {
+ if(page->update_every_s == 0)
+ page->update_every_s = update_every_s;
+
+ return page->update_every_s;
+}
+
+time_t pgc_page_fix_end_time_s(PGC_PAGE *page, time_t end_time_s) {
+ page->end_time_s = end_time_s;
+ return page->end_time_s;
+}
+
+void *pgc_page_data(PGC_PAGE *page) {
+ return page->data;
+}
+
+void *pgc_page_custom_data(PGC *cache, PGC_PAGE *page) {
+ if(cache->config.additional_bytes_per_page)
+ return page->custom_data;
+
+ return NULL;
+}
+
+size_t pgc_page_data_size(PGC *cache, PGC_PAGE *page) {
+ return page_size_from_assumed_size(cache, page->assumed_size);
+}
+
+bool pgc_is_page_hot(PGC_PAGE *page) {
+ return is_page_hot(page);
+}
+
+bool pgc_is_page_dirty(PGC_PAGE *page) {
+ return is_page_dirty(page);
+}
+
+bool pgc_is_page_clean(PGC_PAGE *page) {
+ return is_page_clean(page);
+}
+
+void pgc_reset_hot_max(PGC *cache) {
+ size_t entries = __atomic_load_n(&cache->hot.stats->entries, __ATOMIC_RELAXED);
+ size_t size = __atomic_load_n(&cache->hot.stats->size, __ATOMIC_RELAXED);
+
+ __atomic_store_n(&cache->hot.stats->max_entries, entries, __ATOMIC_RELAXED);
+ __atomic_store_n(&cache->hot.stats->max_size, size, __ATOMIC_RELAXED);
+
+ size_t size_to_evict = 0;
+ cache_usage_per1000(cache, &size_to_evict);
+ evict_pages(cache, 0, 0, true, false);
+}
+
+void pgc_set_dynamic_target_cache_size_callback(PGC *cache, dynamic_target_cache_size_callback callback) {
+ cache->config.dynamic_target_size_cb = callback;
+
+ size_t size_to_evict = 0;
+ cache_usage_per1000(cache, &size_to_evict);
+ evict_pages(cache, 0, 0, true, false);
+}
+
+size_t pgc_get_current_cache_size(PGC *cache) {
+ cache_usage_per1000(cache, NULL);
+ return __atomic_load_n(&cache->stats.current_cache_size, __ATOMIC_RELAXED);
+}
+
+size_t pgc_get_wanted_cache_size(PGC *cache) {
+ cache_usage_per1000(cache, NULL);
+ return __atomic_load_n(&cache->stats.wanted_cache_size, __ATOMIC_RELAXED);
+}
+
+bool pgc_evict_pages(PGC *cache, size_t max_skip, size_t max_evict) {
+ bool under_pressure = cache_needs_space_aggressively(cache);
+ return evict_pages(cache,
+ under_pressure ? 0 : max_skip,
+ under_pressure ? 0 : max_evict,
+ true, false);
+}
+
+bool pgc_flush_pages(PGC *cache, size_t max_flushes) {
+ bool under_pressure = flushing_critical(cache);
+ return flush_pages(cache, under_pressure ? 0 : max_flushes, PGC_SECTION_ALL, true, false);
+}
+
+void pgc_page_hot_set_end_time_s(PGC *cache __maybe_unused, PGC_PAGE *page, time_t end_time_s) {
+ internal_fatal(!is_page_hot(page),
+ "DBENGINE CACHE: end_time_s update on non-hot page");
+
+ internal_fatal(end_time_s < __atomic_load_n(&page->end_time_s, __ATOMIC_RELAXED),
+ "DBENGINE CACHE: end_time_s is not bigger than existing");
+
+ __atomic_store_n(&page->end_time_s, end_time_s, __ATOMIC_RELAXED);
+
+#ifdef PGC_COUNT_POINTS_COLLECTED
+ __atomic_add_fetch(&cache->stats.points_collected, 1, __ATOMIC_RELAXED);
+#endif
+}
+
+PGC_PAGE *pgc_page_get_and_acquire(PGC *cache, Word_t section, Word_t metric_id, time_t start_time_s, PGC_SEARCH method) {
+ return page_find_and_acquire(cache, section, metric_id, start_time_s, method);
+}
+
+struct pgc_statistics pgc_get_statistics(PGC *cache) {
+ // FIXME - get the statistics atomically
+ return cache->stats;
+}
+
+size_t pgc_hot_and_dirty_entries(PGC *cache) {
+ size_t entries = 0;
+
+ entries += __atomic_load_n(&cache->hot.stats->entries, __ATOMIC_RELAXED);
+ entries += __atomic_load_n(&cache->dirty.stats->entries, __ATOMIC_RELAXED);
+ entries += __atomic_load_n(&cache->stats.flushing_entries, __ATOMIC_RELAXED);
+ entries += __atomic_load_n(&cache->stats.hot2dirty_entries, __ATOMIC_RELAXED);
+
+ return entries;
+}
+
+void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_fileno, uint8_t type, migrate_to_v2_callback cb, void *data) {
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_indexing_started, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&cache->stats.workers_jv2_flush, 1, __ATOMIC_RELAXED);
+
+ pgc_ll_lock(cache, &cache->hot);
+
+ Pvoid_t JudyL_metrics = NULL;
+ Pvoid_t JudyL_extents_pos = NULL;
+
+ size_t count_of_unique_extents = 0;
+ size_t count_of_unique_metrics = 0;
+ size_t count_of_unique_pages = 0;
+
+ size_t master_extent_index_id = 0;
+
+ Pvoid_t *section_pages_pptr = JudyLGet(cache->hot.sections_judy, section, PJE0);
+ if(!section_pages_pptr) {
+ pgc_ll_unlock(cache, &cache->hot);
+ return;
+ }
+
+ struct section_pages *sp = *section_pages_pptr;
+ if(!netdata_spinlock_trylock(&sp->migration_to_v2_spinlock)) {
+ internal_fatal(true, "DBENGINE: migration to journal v2 is already running for this section");
+ pgc_ll_unlock(cache, &cache->hot);
+ return;
+ }
+
+ ARAL *ar_mi = aral_by_size_acquire(sizeof(struct jv2_metrics_info));
+ ARAL *ar_pi = aral_by_size_acquire(sizeof(struct jv2_page_info));
+ ARAL *ar_ei = aral_by_size_acquire(sizeof(struct jv2_extents_info));
+
+ for(PGC_PAGE *page = sp->base; page ; page = page->link.next) {
+ struct extent_io_data *xio = (struct extent_io_data *)page->custom_data;
+ if(xio->fileno != datafile_fileno) continue;
+
+ if(page_flag_check(page, PGC_PAGE_IS_BEING_MIGRATED_TO_V2)) {
+ internal_fatal(true, "Migration to journal v2: page has already been migrated to v2");
+ continue;
+ }
+
+ if(!page_transition_trylock(cache, page)) {
+ internal_fatal(true, "Migration to journal v2: cannot get page transition lock");
+ continue;
+ }
+
+ if(!page_acquire(cache, page)) {
+ internal_fatal(true, "Migration to journal v2: cannot acquire page for migration to v2");
+ continue;
+ }
+
+ page_flag_set(page, PGC_PAGE_IS_BEING_MIGRATED_TO_V2);
+
+ pgc_ll_unlock(cache, &cache->hot);
+
+ // update the extents JudyL
+
+ size_t current_extent_index_id;
+ Pvoid_t *PValue = JudyLIns(&JudyL_extents_pos, xio->pos, PJE0);
+ if(!PValue || *PValue == PJERR)
+ fatal("Corrupted JudyL extents pos");
+
+ struct jv2_extents_info *ei;
+ if(!*PValue) {
+ ei = aral_mallocz(ar_ei); // callocz(1, sizeof(struct jv2_extents_info));
+ ei->pos = xio->pos;
+ ei->bytes = xio->bytes;
+ ei->number_of_pages = 1;
+ ei->index = master_extent_index_id++;
+ *PValue = ei;
+
+ count_of_unique_extents++;
+ }
+ else {
+ ei = *PValue;
+ ei->number_of_pages++;
+ }
+
+ current_extent_index_id = ei->index;
+
+ // update the metrics JudyL
+
+ PValue = JudyLIns(&JudyL_metrics, page->metric_id, PJE0);
+ if(!PValue || *PValue == PJERR)
+ fatal("Corrupted JudyL metrics");
+
+ struct jv2_metrics_info *mi;
+ if(!*PValue) {
+ mi = aral_mallocz(ar_mi); // callocz(1, sizeof(struct jv2_metrics_info));
+ mi->uuid = mrg_metric_uuid(main_mrg, (METRIC *)page->metric_id);
+ mi->first_time_s = page->start_time_s;
+ mi->last_time_s = page->end_time_s;
+ mi->number_of_pages = 1;
+ mi->page_list_header = 0;
+ mi->JudyL_pages_by_start_time = NULL;
+ *PValue = mi;
+
+ count_of_unique_metrics++;
+ }
+ else {
+ mi = *PValue;
+ mi->number_of_pages++;
+ if(page->start_time_s < mi->first_time_s)
+ mi->first_time_s = page->start_time_s;
+ if(page->end_time_s > mi->last_time_s)
+ mi->last_time_s = page->end_time_s;
+ }
+
+ PValue = JudyLIns(&mi->JudyL_pages_by_start_time, page->start_time_s, PJE0);
+ if(!PValue || *PValue == PJERR)
+ fatal("Corrupted JudyL metric pages");
+
+ if(!*PValue) {
+ struct jv2_page_info *pi = aral_mallocz(ar_pi); // callocz(1, (sizeof(struct jv2_page_info)));
+ pi->start_time_s = page->start_time_s;
+ pi->end_time_s = page->end_time_s;
+ pi->update_every_s = page->update_every_s;
+ pi->page_length = page_size_from_assumed_size(cache, page->assumed_size);
+ pi->page = page;
+ pi->extent_index = current_extent_index_id;
+ pi->custom_data = (cache->config.additional_bytes_per_page) ? page->custom_data : NULL;
+ *PValue = pi;
+
+ count_of_unique_pages++;
+ }
+ else {
+ // impossible situation
+ internal_fatal(true, "Page is already in JudyL metric pages");
+ page_flag_clear(page, PGC_PAGE_IS_BEING_MIGRATED_TO_V2);
+ page_transition_unlock(cache, page);
+ page_release(cache, page, false);
+ }
+
+ pgc_ll_lock(cache, &cache->hot);
+ }
+
+ netdata_spinlock_unlock(&sp->migration_to_v2_spinlock);
+ pgc_ll_unlock(cache, &cache->hot);
+
+ // callback
+ cb(section, datafile_fileno, type, JudyL_metrics, JudyL_extents_pos, count_of_unique_extents, count_of_unique_metrics, count_of_unique_pages, data);
+
+ {
+ Pvoid_t *PValue1;
+ bool metric_id_first = true;
+ Word_t metric_id = 0;
+ while ((PValue1 = JudyLFirstThenNext(JudyL_metrics, &metric_id, &metric_id_first))) {
+ struct jv2_metrics_info *mi = *PValue1;
+
+ Pvoid_t *PValue2;
+ bool start_time_first = true;
+ Word_t start_time = 0;
+ while ((PValue2 = JudyLFirstThenNext(mi->JudyL_pages_by_start_time, &start_time, &start_time_first))) {
+ struct jv2_page_info *pi = *PValue2;
+ page_transition_unlock(cache, pi->page);
+ pgc_page_hot_to_dirty_and_release(cache, pi->page);
+ // make_acquired_page_clean_and_evict_or_page_release(cache, pi->page);
+ aral_freez(ar_pi, pi);
+ }
+
+ JudyLFreeArray(&mi->JudyL_pages_by_start_time, PJE0);
+ aral_freez(ar_mi, mi);
+ }
+ JudyLFreeArray(&JudyL_metrics, PJE0);
+ }
+
+ {
+ Pvoid_t *PValue;
+ bool extent_pos_first = true;
+ Word_t extent_pos = 0;
+ while ((PValue = JudyLFirstThenNext(JudyL_extents_pos, &extent_pos, &extent_pos_first))) {
+ struct jv2_extents_info *ei = *PValue;
+ aral_freez(ar_ei, ei);
+ }
+ JudyLFreeArray(&JudyL_extents_pos, PJE0);
+ }
+
+ aral_by_size_release(ar_ei);
+ aral_by_size_release(ar_pi);
+ aral_by_size_release(ar_mi);
+
+ __atomic_sub_fetch(&cache->stats.workers_jv2_flush, 1, __ATOMIC_RELAXED);
+}
+
+static bool match_page_data(PGC_PAGE *page, void *data) {
+ return (page->data == data);
+}
+
+void pgc_open_evict_clean_pages_of_datafile(PGC *cache, struct rrdengine_datafile *datafile) {
+ evict_pages_with_filter(cache, 0, 0, true, true, match_page_data, datafile);
+}
+
+size_t pgc_count_clean_pages_having_data_ptr(PGC *cache, Word_t section, void *ptr) {
+ size_t found = 0;
+
+ pgc_ll_lock(cache, &cache->clean);
+ for(PGC_PAGE *page = cache->clean.base; page ;page = page->link.next)
+ found += (page->data == ptr && page->section == section) ? 1 : 0;
+ pgc_ll_unlock(cache, &cache->clean);
+
+ return found;
+}
+
+size_t pgc_count_hot_pages_having_data_ptr(PGC *cache, Word_t section, void *ptr) {
+ size_t found = 0;
+
+ pgc_ll_lock(cache, &cache->hot);
+ Pvoid_t *section_pages_pptr = JudyLGet(cache->hot.sections_judy, section, PJE0);
+ if(section_pages_pptr) {
+ struct section_pages *sp = *section_pages_pptr;
+ for(PGC_PAGE *page = sp->base; page ;page = page->link.next)
+ found += (page->data == ptr) ? 1 : 0;
+ }
+ pgc_ll_unlock(cache, &cache->hot);
+
+ return found;
+}
+
+// ----------------------------------------------------------------------------
+// unittest
+
+static void unittest_free_clean_page_callback(PGC *cache __maybe_unused, PGC_ENTRY entry __maybe_unused) {
+ ;
+}
+
+static void unittest_save_dirty_page_callback(PGC *cache __maybe_unused, PGC_ENTRY *entries_array __maybe_unused, PGC_PAGE **pages_array __maybe_unused, size_t entries __maybe_unused) {
+ ;
+}
+
+#ifdef PGC_STRESS_TEST
+
+struct {
+ bool stop;
+ PGC *cache;
+ PGC_PAGE **metrics;
+ size_t clean_metrics;
+ size_t hot_metrics;
+ time_t first_time_t;
+ time_t last_time_t;
+ size_t cache_size;
+ size_t query_threads;
+ size_t collect_threads;
+ size_t partitions;
+ size_t points_per_page;
+ time_t time_per_collection_ut;
+ time_t time_per_query_ut;
+ time_t time_per_flush_ut;
+ PGC_OPTIONS options;
+ char rand_statebufs[1024];
+ struct random_data *random_data;
+} pgc_uts = {
+ .stop = false,
+ .metrics = NULL,
+ .clean_metrics = 100000,
+ .hot_metrics = 1000000,
+ .first_time_t = 100000000,
+ .last_time_t = 0,
+ .cache_size = 0, // get the default (8MB)
+ .collect_threads = 16,
+ .query_threads = 16,
+ .partitions = 0, // get the default (system cpus)
+ .options = PGC_OPTIONS_AUTOSCALE,/* PGC_OPTIONS_FLUSH_PAGES_INLINE | PGC_OPTIONS_EVICT_PAGES_INLINE,*/
+ .points_per_page = 10,
+ .time_per_collection_ut = 1000000,
+ .time_per_query_ut = 250,
+ .time_per_flush_ut = 100,
+ .rand_statebufs = {},
+ .random_data = NULL,
+};
+
+void *unittest_stress_test_collector(void *ptr) {
+ size_t id = *((size_t *)ptr);
+
+ size_t metric_start = pgc_uts.clean_metrics;
+ size_t metric_end = pgc_uts.clean_metrics + pgc_uts.hot_metrics;
+ size_t number_of_metrics = metric_end - metric_start;
+ size_t per_collector_metrics = number_of_metrics / pgc_uts.collect_threads;
+ metric_start = metric_start + per_collector_metrics * id + 1;
+ metric_end = metric_start + per_collector_metrics - 1;
+
+ time_t start_time_t = pgc_uts.first_time_t + 1;
+
+ heartbeat_t hb;
+ heartbeat_init(&hb);
+
+ while(!__atomic_load_n(&pgc_uts.stop, __ATOMIC_RELAXED)) {
+ // info("COLLECTOR %zu: collecting metrics %zu to %zu, from %ld to %lu", id, metric_start, metric_end, start_time_t, start_time_t + pgc_uts.points_per_page);
+
+ netdata_thread_disable_cancelability();
+
+ for (size_t i = metric_start; i < metric_end; i++) {
+ bool added;
+
+ pgc_uts.metrics[i] = pgc_page_add_and_acquire(pgc_uts.cache, (PGC_ENTRY) {
+ .section = 1,
+ .metric_id = i,
+ .start_time_t = start_time_t,
+ .end_time_t = start_time_t,
+ .update_every = 1,
+ .size = 4096,
+ .data = NULL,
+ .hot = true,
+ }, &added);
+
+ if(!pgc_is_page_hot(pgc_uts.metrics[i]) || !added) {
+ pgc_page_release(pgc_uts.cache, pgc_uts.metrics[i]);
+ pgc_uts.metrics[i] = NULL;
+ }
+ }
+
+ time_t end_time_t = start_time_t + (time_t)pgc_uts.points_per_page;
+ while(++start_time_t <= end_time_t && !__atomic_load_n(&pgc_uts.stop, __ATOMIC_RELAXED)) {
+ heartbeat_next(&hb, pgc_uts.time_per_collection_ut);
+
+ for (size_t i = metric_start; i < metric_end; i++) {
+ if(pgc_uts.metrics[i])
+ pgc_page_hot_set_end_time_t(pgc_uts.cache, pgc_uts.metrics[i], start_time_t);
+ }
+
+ __atomic_store_n(&pgc_uts.last_time_t, start_time_t, __ATOMIC_RELAXED);
+ }
+
+ for (size_t i = metric_start; i < metric_end; i++) {
+ if (pgc_uts.metrics[i]) {
+ if(i % 10 == 0)
+ pgc_page_to_clean_evict_or_release(pgc_uts.cache, pgc_uts.metrics[i]);
+ else
+ pgc_page_hot_to_dirty_and_release(pgc_uts.cache, pgc_uts.metrics[i]);
+ }
+ }
+
+ netdata_thread_enable_cancelability();
+ }
+
+ return ptr;
+}
+
+void *unittest_stress_test_queries(void *ptr) {
+ size_t id = *((size_t *)ptr);
+ struct random_data *random_data = &pgc_uts.random_data[id];
+
+ size_t start = 0;
+ size_t end = pgc_uts.clean_metrics + pgc_uts.hot_metrics;
+
+ while(!__atomic_load_n(&pgc_uts.stop, __ATOMIC_RELAXED)) {
+ netdata_thread_disable_cancelability();
+
+ int32_t random_number;
+ random_r(random_data, &random_number);
+
+ size_t metric_id = random_number % (end - start);
+ time_t start_time_t = pgc_uts.first_time_t;
+ time_t end_time_t = __atomic_load_n(&pgc_uts.last_time_t, __ATOMIC_RELAXED);
+ if(end_time_t <= start_time_t)
+ end_time_t = start_time_t + 1;
+ size_t pages = (end_time_t - start_time_t) / pgc_uts.points_per_page + 1;
+
+ PGC_PAGE *array[pages];
+ for(size_t i = 0; i < pages ;i++)
+ array[i] = NULL;
+
+ // find the pages the cache has
+ for(size_t i = 0; i < pages ;i++) {
+ time_t page_start_time = start_time_t + (time_t)(i * pgc_uts.points_per_page);
+ array[i] = pgc_page_get_and_acquire(pgc_uts.cache, 1, metric_id,
+ page_start_time, (i < pages - 1)?PGC_SEARCH_EXACT:PGC_SEARCH_CLOSEST);
+ }
+
+ // load the rest of the pages
+ for(size_t i = 0; i < pages ;i++) {
+ if(array[i]) continue;
+
+ time_t page_start_time = start_time_t + (time_t)(i * pgc_uts.points_per_page);
+ array[i] = pgc_page_add_and_acquire(pgc_uts.cache, (PGC_ENTRY) {
+ .section = 1,
+ .metric_id = metric_id,
+ .start_time_t = page_start_time,
+ .end_time_t = page_start_time + (time_t)pgc_uts.points_per_page,
+ .update_every = 1,
+ .size = 4096,
+ .data = NULL,
+ .hot = false,
+ }, NULL);
+ }
+
+ // do the query
+ // ...
+ struct timespec work_duration = {.tv_sec = 0, .tv_nsec = pgc_uts.time_per_query_ut * NSEC_PER_USEC };
+ nanosleep(&work_duration, NULL);
+
+ // release the pages
+ for(size_t i = 0; i < pages ;i++) {
+ if(!array[i]) continue;
+ pgc_page_release(pgc_uts.cache, array[i]);
+ array[i] = NULL;
+ }
+
+ netdata_thread_enable_cancelability();
+ }
+
+ return ptr;
+}
+
+void *unittest_stress_test_service(void *ptr) {
+ heartbeat_t hb;
+ heartbeat_init(&hb);
+ while(!__atomic_load_n(&pgc_uts.stop, __ATOMIC_RELAXED)) {
+ heartbeat_next(&hb, 1 * USEC_PER_SEC);
+
+ pgc_flush_pages(pgc_uts.cache, 1000);
+ pgc_evict_pages(pgc_uts.cache, 0, 0);
+ }
+ return ptr;
+}
+
+static void unittest_stress_test_save_dirty_page_callback(PGC *cache __maybe_unused, PGC_ENTRY *entries_array __maybe_unused, PGC_PAGE **pages_array __maybe_unused, size_t entries __maybe_unused) {
+ // info("SAVE %zu pages", entries);
+ if(!pgc_uts.stop) {
+ usec_t t = pgc_uts.time_per_flush_ut;
+
+ if(t > 0) {
+ struct timespec work_duration = {
+ .tv_sec = t / USEC_PER_SEC,
+ .tv_nsec = (long) ((t % USEC_PER_SEC) * NSEC_PER_USEC)
+ };
+
+ nanosleep(&work_duration, NULL);
+ }
+ }
+}
+
+void unittest_stress_test(void) {
+ pgc_uts.cache = pgc_create(pgc_uts.cache_size * 1024 * 1024,
+ unittest_free_clean_page_callback,
+ 64, unittest_stress_test_save_dirty_page_callback,
+ 1000, 10000, 1,
+ pgc_uts.options, pgc_uts.partitions, 0);
+
+ pgc_uts.metrics = callocz(pgc_uts.clean_metrics + pgc_uts.hot_metrics, sizeof(PGC_PAGE *));
+
+ pthread_t service_thread;
+ netdata_thread_create(&service_thread, "SERVICE",
+ NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
+ unittest_stress_test_service, NULL);
+
+ pthread_t collect_threads[pgc_uts.collect_threads];
+ size_t collect_thread_ids[pgc_uts.collect_threads];
+ for(size_t i = 0; i < pgc_uts.collect_threads ;i++) {
+ collect_thread_ids[i] = i;
+ char buffer[100 + 1];
+ snprintfz(buffer, 100, "COLLECT_%zu", i);
+ netdata_thread_create(&collect_threads[i], buffer,
+ NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
+ unittest_stress_test_collector, &collect_thread_ids[i]);
+ }
+
+ pthread_t queries_threads[pgc_uts.query_threads];
+ size_t query_thread_ids[pgc_uts.query_threads];
+ pgc_uts.random_data = callocz(pgc_uts.query_threads, sizeof(struct random_data));
+ for(size_t i = 0; i < pgc_uts.query_threads ;i++) {
+ query_thread_ids[i] = i;
+ char buffer[100 + 1];
+ snprintfz(buffer, 100, "QUERY_%zu", i);
+ initstate_r(1, pgc_uts.rand_statebufs, 1024, &pgc_uts.random_data[i]);
+ netdata_thread_create(&queries_threads[i], buffer,
+ NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
+ unittest_stress_test_queries, &query_thread_ids[i]);
+ }
+
+ heartbeat_t hb;
+ heartbeat_init(&hb);
+
+ struct {
+ size_t entries;
+ size_t added;
+ size_t deleted;
+ size_t referenced;
+
+ size_t hot_entries;
+ size_t hot_added;
+ size_t hot_deleted;
+
+ size_t dirty_entries;
+ size_t dirty_added;
+ size_t dirty_deleted;
+
+ size_t clean_entries;
+ size_t clean_added;
+ size_t clean_deleted;
+
+ size_t searches_exact;
+ size_t searches_exact_hits;
+ size_t searches_closest;
+ size_t searches_closest_hits;
+
+ size_t collections;
+
+ size_t events_cache_under_severe_pressure;
+ size_t events_cache_needs_space_90;
+ size_t events_flush_critical;
+ } stats = {}, old_stats = {};
+
+ for(int i = 0; i < 86400 ;i++) {
+ heartbeat_next(&hb, 1 * USEC_PER_SEC);
+
+ old_stats = stats;
+ stats.entries = __atomic_load_n(&pgc_uts.cache->stats.entries, __ATOMIC_RELAXED);
+ stats.added = __atomic_load_n(&pgc_uts.cache->stats.added_entries, __ATOMIC_RELAXED);
+ stats.deleted = __atomic_load_n(&pgc_uts.cache->stats.removed_entries, __ATOMIC_RELAXED);
+ stats.referenced = __atomic_load_n(&pgc_uts.cache->stats.referenced_entries, __ATOMIC_RELAXED);
+
+ stats.hot_entries = __atomic_load_n(&pgc_uts.cache->hot.stats->entries, __ATOMIC_RELAXED);
+ stats.hot_added = __atomic_load_n(&pgc_uts.cache->hot.stats->added_entries, __ATOMIC_RELAXED);
+ stats.hot_deleted = __atomic_load_n(&pgc_uts.cache->hot.stats->removed_entries, __ATOMIC_RELAXED);
+
+ stats.dirty_entries = __atomic_load_n(&pgc_uts.cache->dirty.stats->entries, __ATOMIC_RELAXED);
+ stats.dirty_added = __atomic_load_n(&pgc_uts.cache->dirty.stats->added_entries, __ATOMIC_RELAXED);
+ stats.dirty_deleted = __atomic_load_n(&pgc_uts.cache->dirty.stats->removed_entries, __ATOMIC_RELAXED);
+
+ stats.clean_entries = __atomic_load_n(&pgc_uts.cache->clean.stats->entries, __ATOMIC_RELAXED);
+ stats.clean_added = __atomic_load_n(&pgc_uts.cache->clean.stats->added_entries, __ATOMIC_RELAXED);
+ stats.clean_deleted = __atomic_load_n(&pgc_uts.cache->clean.stats->removed_entries, __ATOMIC_RELAXED);
+
+ stats.searches_exact = __atomic_load_n(&pgc_uts.cache->stats.searches_exact, __ATOMIC_RELAXED);
+ stats.searches_exact_hits = __atomic_load_n(&pgc_uts.cache->stats.searches_exact_hits, __ATOMIC_RELAXED);
+
+ stats.searches_closest = __atomic_load_n(&pgc_uts.cache->stats.searches_closest, __ATOMIC_RELAXED);
+ stats.searches_closest_hits = __atomic_load_n(&pgc_uts.cache->stats.searches_closest_hits, __ATOMIC_RELAXED);
+
+ stats.events_cache_under_severe_pressure = __atomic_load_n(&pgc_uts.cache->stats.events_cache_under_severe_pressure, __ATOMIC_RELAXED);
+ stats.events_cache_needs_space_90 = __atomic_load_n(&pgc_uts.cache->stats.events_cache_needs_space_aggressively, __ATOMIC_RELAXED);
+ stats.events_flush_critical = __atomic_load_n(&pgc_uts.cache->stats.events_flush_critical, __ATOMIC_RELAXED);
+
+ size_t searches_exact = stats.searches_exact - old_stats.searches_exact;
+ size_t searches_closest = stats.searches_closest - old_stats.searches_closest;
+
+ size_t hit_exact = stats.searches_exact_hits - old_stats.searches_exact_hits;
+ size_t hit_closest = stats.searches_closest_hits - old_stats.searches_closest_hits;
+
+ double hit_exact_pc = (searches_exact > 0) ? (double)hit_exact * 100.0 / (double)searches_exact : 0.0;
+ double hit_closest_pc = (searches_closest > 0) ? (double)hit_closest * 100.0 / (double)searches_closest : 0.0;
+
+#ifdef PGC_COUNT_POINTS_COLLECTED
+ stats.collections = __atomic_load_n(&pgc_uts.cache->stats.points_collected, __ATOMIC_RELAXED);
+#endif
+
+ char *cache_status = "N";
+ if(stats.events_cache_under_severe_pressure > old_stats.events_cache_under_severe_pressure)
+ cache_status = "F";
+ else if(stats.events_cache_needs_space_90 > old_stats.events_cache_needs_space_90)
+ cache_status = "f";
+
+ char *flushing_status = "N";
+ if(stats.events_flush_critical > old_stats.events_flush_critical)
+ flushing_status = "F";
+
+ info("PGS %5zuk +%4zuk/-%4zuk "
+ "| RF %5zuk "
+ "| HOT %5zuk +%4zuk -%4zuk "
+ "| DRT %s %5zuk +%4zuk -%4zuk "
+ "| CLN %s %5zuk +%4zuk -%4zuk "
+ "| SRCH %4zuk %4zuk, HIT %4.1f%% %4.1f%% "
+#ifdef PGC_COUNT_POINTS_COLLECTED
+ "| CLCT %8.4f Mps"
+#endif
+ , stats.entries / 1000
+ , (stats.added - old_stats.added) / 1000, (stats.deleted - old_stats.deleted) / 1000
+ , stats.referenced / 1000
+ , stats.hot_entries / 1000, (stats.hot_added - old_stats.hot_added) / 1000, (stats.hot_deleted - old_stats.hot_deleted) / 1000
+ , flushing_status
+ , stats.dirty_entries / 1000
+ , (stats.dirty_added - old_stats.dirty_added) / 1000, (stats.dirty_deleted - old_stats.dirty_deleted) / 1000
+ , cache_status
+ , stats.clean_entries / 1000
+ , (stats.clean_added - old_stats.clean_added) / 1000, (stats.clean_deleted - old_stats.clean_deleted) / 1000
+ , searches_exact / 1000, searches_closest / 1000
+ , hit_exact_pc, hit_closest_pc
+#ifdef PGC_COUNT_POINTS_COLLECTED
+ , (double)(stats.collections - old_stats.collections) / 1000.0 / 1000.0
+#endif
+ );
+ }
+ info("Waiting for threads to stop...");
+ __atomic_store_n(&pgc_uts.stop, true, __ATOMIC_RELAXED);
+
+ netdata_thread_join(service_thread, NULL);
+
+ for(size_t i = 0; i < pgc_uts.collect_threads ;i++)
+ netdata_thread_join(collect_threads[i],NULL);
+
+ for(size_t i = 0; i < pgc_uts.query_threads ;i++)
+ netdata_thread_join(queries_threads[i],NULL);
+
+ pgc_destroy(pgc_uts.cache);
+
+ freez(pgc_uts.metrics);
+ freez(pgc_uts.random_data);
+}
+#endif
+
+int pgc_unittest(void) {
+ PGC *cache = pgc_create("test",
+ 32 * 1024 * 1024, unittest_free_clean_page_callback,
+ 64, NULL, unittest_save_dirty_page_callback,
+ 10, 10, 1000, 10,
+ PGC_OPTIONS_DEFAULT, 1, 11);
+
+ // FIXME - unit tests
+ // - add clean page
+ // - add clean page again (should not add it)
+ // - release page (should decrement counters)
+ // - add hot page
+ // - add hot page again (should not add it)
+ // - turn hot page to dirty, with and without a reference counter to it
+ // - dirty pages are saved once there are enough of them
+ // - find page exact
+ // - find page (should return last)
+ // - find page (should return next)
+ // - page cache full (should evict)
+ // - on destroy, turn hot pages to dirty and save them
+
+ PGC_PAGE *page1 = pgc_page_add_and_acquire(cache, (PGC_ENTRY){
+ .section = 1,
+ .metric_id = 10,
+ .start_time_s = 100,
+ .end_time_s = 1000,
+ .size = 4096,
+ .data = NULL,
+ .hot = false,
+ .custom_data = (uint8_t *)"0123456789",
+ }, NULL);
+
+ if(strcmp(pgc_page_custom_data(cache, page1), "0123456789") != 0)
+ fatal("custom data do not work");
+
+ memcpy(pgc_page_custom_data(cache, page1), "ABCDEFGHIJ", 11);
+ if(strcmp(pgc_page_custom_data(cache, page1), "ABCDEFGHIJ") != 0)
+ fatal("custom data do not work");
+
+ pgc_page_release(cache, page1);
+
+ PGC_PAGE *page2 = pgc_page_add_and_acquire(cache, (PGC_ENTRY){
+ .section = 2,
+ .metric_id = 10,
+ .start_time_s = 1001,
+ .end_time_s = 2000,
+ .size = 4096,
+ .data = NULL,
+ .hot = true,
+ }, NULL);
+
+ pgc_page_hot_set_end_time_s(cache, page2, 2001);
+ pgc_page_hot_to_dirty_and_release(cache, page2);
+
+ PGC_PAGE *page3 = pgc_page_add_and_acquire(cache, (PGC_ENTRY){
+ .section = 3,
+ .metric_id = 10,
+ .start_time_s = 1001,
+ .end_time_s = 2000,
+ .size = 4096,
+ .data = NULL,
+ .hot = true,
+ }, NULL);
+
+ pgc_page_hot_set_end_time_s(cache, page3, 2001);
+ pgc_page_hot_to_dirty_and_release(cache, page3);
+
+ pgc_destroy(cache);
+
+#ifdef PGC_STRESS_TEST
+ unittest_stress_test();
+#endif
+
+ return 0;
+}