// SPDX-License-Identifier: GPL-3.0-or-later #include "rrdcollector.h" #include "rrdcollector-internals.h" // Each function points to this collector structure // so that when the collector exits, all of them will // be invalidated (running == false) // The last function using this collector // frees the structure too (or when the collector calls // rrdset_collector_finished()). struct rrd_collector { int32_t refcount; int32_t refcount_dispatcher; pid_t tid; bool running; }; // Each thread that adds RRDSET functions has to call // rrdset_collector_started() and rrdset_collector_finished() // to create the collector structure. __thread struct rrd_collector *thread_rrd_collector = NULL; inline bool rrd_collector_running(struct rrd_collector *rdc) { return __atomic_load_n(&rdc->running, __ATOMIC_RELAXED); } inline pid_t rrd_collector_tid(struct rrd_collector *rdc) { return rdc->tid; } bool rrd_collector_dispatcher_acquire(struct rrd_collector *rdc) { int32_t expected = __atomic_load_n(&rdc->refcount_dispatcher, __ATOMIC_RELAXED); int32_t wanted; do { if(expected < 0) return false; wanted = expected + 1; } while(!__atomic_compare_exchange_n(&rdc->refcount_dispatcher, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); return true; } void rrd_collector_dispatcher_release(struct rrd_collector *rdc) { __atomic_sub_fetch(&rdc->refcount_dispatcher, 1, __ATOMIC_RELAXED); } static void rrd_collector_free(struct rrd_collector *rdc) { if(rdc->running) return; int32_t expected = 0; if(!__atomic_compare_exchange_n(&rdc->refcount, &expected, -1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) { // the collector is still referenced by charts. // leave it hanging there, the last chart will actually free it. return; } // we can free it now freez(rdc); } // called once per collector void rrd_collector_started(void) { if(!thread_rrd_collector) thread_rrd_collector = callocz(1, sizeof(struct rrd_collector)); thread_rrd_collector->tid = gettid_cached(); __atomic_store_n(&thread_rrd_collector->running, true, __ATOMIC_RELAXED); } // called once per collector void rrd_collector_finished(void) { if(!thread_rrd_collector) return; __atomic_store_n(&thread_rrd_collector->running, false, __ATOMIC_RELAXED); // wait for any cancellation requests to be dispatched; // the problem is that cancellation requests require a structure allocated by the collector, // so, while cancellation requests are being dispatched, this structure is accessed. // delaying the exit of the thread is required to avoid cleaning up this structure. int32_t expected = 0; while(!__atomic_compare_exchange_n(&thread_rrd_collector->refcount_dispatcher, &expected, -1, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) { expected = 0; sleep_usec(1 * USEC_PER_MS); } rrd_collector_free(thread_rrd_collector); thread_rrd_collector = NULL; } bool rrd_collector_acquire(struct rrd_collector *rdc) { int32_t expected = __atomic_load_n(&rdc->refcount, __ATOMIC_RELAXED), wanted = 0; do { if(expected < 0 || !rrd_collector_running(rdc)) return false; wanted = expected + 1; } while(!__atomic_compare_exchange_n(&rdc->refcount, &expected, wanted, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)); return true; } struct rrd_collector *rrd_collector_acquire_current_thread(void) { rrd_collector_started(); if(!rrd_collector_acquire(thread_rrd_collector)) internal_fatal(true, "FUNCTIONS: Trying to acquire a the current thread collector, that is currently exiting."); return thread_rrd_collector; } void rrd_collector_release(struct rrd_collector *rdc) { if(unlikely(!rdc)) return; int32_t expected = __atomic_load_n(&rdc->refcount, __ATOMIC_RELAXED), wanted = 0; do { if(expected < 0) return; if(expected == 0) { internal_fatal(true, "FUNCTIONS: Trying to release a collector that is not acquired."); return; } wanted = expected - 1; } while(!__atomic_compare_exchange_n(&rdc->refcount, &expected, wanted, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)); if(wanted == 0) rrd_collector_free(rdc); }