From b5f8ee61a7f7e9bd291dd26b0585d03eb686c941 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 5 May 2024 13:19:16 +0200
Subject: Adding upstream version 1.46.3.

Signed-off-by: Daniel Baumann
---
 src/database/rrdcollector.c | 137 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 137 insertions(+)
 create mode 100644 src/database/rrdcollector.c

diff --git a/src/database/rrdcollector.c b/src/database/rrdcollector.c
new file mode 100644
index 000000000..59c5c459d
--- /dev/null
+++ b/src/database/rrdcollector.c
@@ -0,0 +1,137 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "rrdcollector.h"
+#include "rrdcollector-internals.h"
+
+// Each function points to this collector structure,
+// so that when the collector exits, all of them are
+// invalidated (running == false).
+// The structure is freed by its last user: either the
+// last function still referencing it, or the collector
+// itself, when it calls rrd_collector_finished().
+
+struct rrd_collector {
+    int32_t refcount;
+    int32_t refcount_dispatcher;
+    pid_t tid;
+    bool running;
+};
+
+// Each thread that adds RRDSET functions has to call
+// rrd_collector_started() and rrd_collector_finished()
+// to create and destroy its collector structure.
+
+__thread struct rrd_collector *thread_rrd_collector = NULL;
+
+inline bool rrd_collector_running(struct rrd_collector *rdc) {
+    return __atomic_load_n(&rdc->running, __ATOMIC_RELAXED);
+}
+
+inline pid_t rrd_collector_tid(struct rrd_collector *rdc) {
+    return rdc->tid;
+}
+
+bool rrd_collector_dispatcher_acquire(struct rrd_collector *rdc) {
+    int32_t expected = __atomic_load_n(&rdc->refcount_dispatcher, __ATOMIC_RELAXED);
+    int32_t wanted;
+    do {
+        if(expected < 0)
+            return false;
+
+        wanted = expected + 1;
+    } while(!__atomic_compare_exchange_n(&rdc->refcount_dispatcher, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+
+    return true;
+}
+
+void rrd_collector_dispatcher_release(struct rrd_collector *rdc) {
+    __atomic_sub_fetch(&rdc->refcount_dispatcher, 1, __ATOMIC_RELAXED);
+}
+
+static void rrd_collector_free(struct rrd_collector *rdc) {
+    if(rrd_collector_running(rdc))
+        return;
+
+    int32_t expected = 0;
+    if(!__atomic_compare_exchange_n(&rdc->refcount, &expected, -1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
+        // the collector is still referenced by charts;
+        // leave it hanging there, the last chart will actually free it.
+        return;
+    }
+
+    // we can free it now
+    freez(rdc);
+}
+
+// called once per collector thread
+void rrd_collector_started(void) {
+    if(!thread_rrd_collector)
+        thread_rrd_collector = callocz(1, sizeof(struct rrd_collector));
+
+    thread_rrd_collector->tid = gettid_cached();
+    __atomic_store_n(&thread_rrd_collector->running, true, __ATOMIC_RELAXED);
+}
+
+// called once per collector thread
+void rrd_collector_finished(void) {
+    if(!thread_rrd_collector)
+        return;
+
+    __atomic_store_n(&thread_rrd_collector->running, false, __ATOMIC_RELAXED);
+
+    // wait for any pending cancellation requests to be dispatched;
+    // cancellation requests access a structure allocated by the collector,
+    // so that structure is still in use while they are being dispatched.
+    // the thread has to delay its exit to avoid freeing it prematurely.
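+    //
+    // storing -1 in refcount_dispatcher acts as an "exiting" sentinel:
+    // the CAS loop below only succeeds once no dispatcher holds a
+    // reference, and after -1 is stored, rrd_collector_dispatcher_acquire()
+    // refuses to hand out new references.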
+
+    int32_t expected = 0;
+    while(!__atomic_compare_exchange_n(&thread_rrd_collector->refcount_dispatcher, &expected, -1, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
+        expected = 0;
+        sleep_usec(1 * USEC_PER_MS);
+    }
+
+    rrd_collector_free(thread_rrd_collector);
+    thread_rrd_collector = NULL;
+}
+
+bool rrd_collector_acquire(struct rrd_collector *rdc) {
+
+    int32_t expected = __atomic_load_n(&rdc->refcount, __ATOMIC_RELAXED), wanted = 0;
+    do {
+        if(expected < 0 || !rrd_collector_running(rdc))
+            return false;
+
+        wanted = expected + 1;
+    } while(!__atomic_compare_exchange_n(&rdc->refcount, &expected, wanted, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED));
+
+    return true;
+}
+
+struct rrd_collector *rrd_collector_acquire_current_thread(void) {
+    rrd_collector_started();
+
+    if(!rrd_collector_acquire(thread_rrd_collector))
+        internal_fatal(true, "FUNCTIONS: Trying to acquire the current thread's collector, which is currently exiting.");
+
+    return thread_rrd_collector;
+}
+
+void rrd_collector_release(struct rrd_collector *rdc) {
+    if(unlikely(!rdc)) return;
+
+    int32_t expected = __atomic_load_n(&rdc->refcount, __ATOMIC_RELAXED), wanted = 0;
+    do {
+        if(expected < 0)
+            return;
+
+        if(expected == 0) {
+            internal_fatal(true, "FUNCTIONS: Trying to release a collector that is not acquired.");
+            return;
+        }
+
+        wanted = expected - 1;
+    } while(!__atomic_compare_exchange_n(&rdc->refcount, &expected, wanted, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED));
+
+    if(wanted == 0)
+        rrd_collector_free(rdc);
+}
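
For context, a minimal sketch of how the two sides of this API are expected
to interact; the thread bodies and the service_running() helper below are
illustrative assumptions, not part of this file:

    // collector thread: registers itself, serves requests, then tears down
    void my_collector_thread(void) {
        rrd_collector_started();            // allocates/enables thread_rrd_collector

        while(service_running()) {
            // ... collect metrics, register RRDSET functions ...
        }

        // spins until every dispatcher has released refcount_dispatcher,
        // then frees the structure (unless charts still reference it)
        rrd_collector_finished();
    }

    // dispatcher thread: pins the collector before touching its data
    bool dispatch_request(struct rrd_collector *rdc) {
        if(!rrd_collector_dispatcher_acquire(rdc))
            return false;                   // collector is exiting; do not touch it

        // ... safely dispatch the cancellation/function request ...

        rrd_collector_dispatcher_release(rdc);
        return true;
    }

The key design point is the pairing: rrd_collector_finished() cannot return
while any dispatcher holds a reference, so a dispatcher that successfully
acquires one is guaranteed the structure stays alive until it releases it.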