From 836b47cb7e99a977c5a23b059ca1d0b5065d310e Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 24 Jul 2024 11:54:23 +0200 Subject: Merging upstream version 1.46.3. Signed-off-by: Daniel Baumann --- src/aclk/aclk_query_queue.c | 124 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 src/aclk/aclk_query_queue.c (limited to 'src/aclk/aclk_query_queue.c') diff --git a/src/aclk/aclk_query_queue.c b/src/aclk/aclk_query_queue.c new file mode 100644 index 000000000..3edadc002 --- /dev/null +++ b/src/aclk/aclk_query_queue.c @@ -0,0 +1,124 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_query_queue.h" +#include "aclk_query.h" +#include "aclk_stats.h" + +static netdata_mutex_t aclk_query_queue_mutex = NETDATA_MUTEX_INITIALIZER; +#define ACLK_QUEUE_LOCK netdata_mutex_lock(&aclk_query_queue_mutex) +#define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&aclk_query_queue_mutex) + +static struct aclk_query_queue { + aclk_query_t head; + int block_push; +} aclk_query_queue = { + .head = NULL, + .block_push = 0 +}; + +static inline int _aclk_queue_query(aclk_query_t query) +{ + now_monotonic_high_precision_timeval(&query->created_tv); + query->created = now_realtime_usec(); + + ACLK_QUEUE_LOCK; + if (aclk_query_queue.block_push) { + ACLK_QUEUE_UNLOCK; + if(service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES)) + netdata_log_error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown."); + aclk_query_free(query); + return 1; + } + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(aclk_query_queue.head, query, prev, next); + ACLK_QUEUE_UNLOCK; + return 0; + +} + +int aclk_queue_query(aclk_query_t query) +{ + int ret = _aclk_queue_query(query); + if (!ret) { + QUERY_THREAD_WAKEUP; + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.queries_queued++; + ACLK_STATS_UNLOCK; + } + } + return ret; +} + +aclk_query_t aclk_queue_pop(void) +{ + aclk_query_t ret; + + ACLK_QUEUE_LOCK; + if (aclk_query_queue.block_push) { + ACLK_QUEUE_UNLOCK; + if(service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES)) + netdata_log_error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown."); + return NULL; + } + + ret = aclk_query_queue.head; + if (!ret) { + ACLK_QUEUE_UNLOCK; + return ret; + } + + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(aclk_query_queue.head, ret, prev, next); + ACLK_QUEUE_UNLOCK; + + ret->next = NULL; + return ret; +} + +void aclk_queue_flush(void) +{ + aclk_query_t query = aclk_queue_pop(); + while (query) { + aclk_query_free(query); + query = aclk_queue_pop(); + } +} + +aclk_query_t aclk_query_new(aclk_query_type_t type) +{ + aclk_query_t query = callocz(1, sizeof(struct aclk_query)); + query->type = type; + return query; +} + +void aclk_query_free(aclk_query_t query) +{ + switch (query->type) { + case HTTP_API_V2: + freez(query->data.http_api_v2.payload); + if (query->data.http_api_v2.query != query->dedup_id) + freez(query->data.http_api_v2.query); + break; + + default: + break; + } + + freez(query->dedup_id); + freez(query->callback_topic); + freez(query->msg_id); + freez(query); +} + +void aclk_queue_lock(void) +{ + ACLK_QUEUE_LOCK; + aclk_query_queue.block_push = 1; + ACLK_QUEUE_UNLOCK; +} + +void aclk_queue_unlock(void) +{ + ACLK_QUEUE_LOCK; + aclk_query_queue.block_push = 0; + ACLK_QUEUE_UNLOCK; +} -- cgit v1.2.3