summaryrefslogtreecommitdiffstats
path: root/lib/workqueue.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 09:53:30 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 09:53:30 +0000
commit2c7cac91ed6e7db0f6937923d2b57f97dbdbc337 (patch)
treec05dc0f8e6aa3accc84e3e5cffc933ed94941383 /lib/workqueue.c
parentInitial commit. (diff)
downloadfrr-2c7cac91ed6e7db0f6937923d2b57f97dbdbc337.tar.xz
frr-2c7cac91ed6e7db0f6937923d2b57f97dbdbc337.zip
Adding upstream version 8.4.4.upstream/8.4.4upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/workqueue.c')
-rw-r--r--lib/workqueue.c384
1 files changed, 384 insertions, 0 deletions
diff --git a/lib/workqueue.c b/lib/workqueue.c
new file mode 100644
index 0000000..c703de9
--- /dev/null
+++ b/lib/workqueue.c
@@ -0,0 +1,384 @@
+/*
+ * Quagga Work Queue Support.
+ *
+ * Copyright (C) 2005 Sun Microsystems, Inc.
+ *
+ * This file is part of GNU Zebra.
+ *
+ * Quagga is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2, or (at your option) any
+ * later version.
+ *
+ * Quagga is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; see the file COPYING; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <zebra.h>
+#include "thread.h"
+#include "memory.h"
+#include "workqueue.h"
+#include "linklist.h"
+#include "command.h"
+#include "log.h"
+
+DEFINE_MTYPE(LIB, WORK_QUEUE, "Work queue");
+DEFINE_MTYPE_STATIC(LIB, WORK_QUEUE_ITEM, "Work queue item");
+DEFINE_MTYPE_STATIC(LIB, WORK_QUEUE_NAME, "Work queue name string");
+
+/* master list of work_queues */
+static struct list _work_queues;
+/* pointer primarily to avoid an otherwise harmless warning on
+ * ALL_LIST_ELEMENTS_RO
+ */
+static struct list *work_queues = &_work_queues;
+
+#define WORK_QUEUE_MIN_GRANULARITY 1
+
+static struct work_queue_item *work_queue_item_new(struct work_queue *wq)
+{
+ struct work_queue_item *item;
+ assert(wq);
+
+ item = XCALLOC(MTYPE_WORK_QUEUE_ITEM, sizeof(struct work_queue_item));
+
+ return item;
+}
+
+static void work_queue_item_free(struct work_queue_item *item)
+{
+ XFREE(MTYPE_WORK_QUEUE_ITEM, item);
+ return;
+}
+
+static void work_queue_item_remove(struct work_queue *wq,
+ struct work_queue_item *item)
+{
+ assert(item && item->data);
+
+ /* call private data deletion callback if needed */
+ if (wq->spec.del_item_data)
+ wq->spec.del_item_data(wq, item->data);
+
+ work_queue_item_dequeue(wq, item);
+
+ work_queue_item_free(item);
+
+ return;
+}
+
+/* create new work queue */
+struct work_queue *work_queue_new(struct thread_master *m,
+ const char *queue_name)
+{
+ struct work_queue *new;
+
+ new = XCALLOC(MTYPE_WORK_QUEUE, sizeof(struct work_queue));
+
+ new->name = XSTRDUP(MTYPE_WORK_QUEUE_NAME, queue_name);
+ new->master = m;
+ SET_FLAG(new->flags, WQ_UNPLUGGED);
+
+ STAILQ_INIT(&new->items);
+
+ listnode_add(work_queues, new);
+
+ new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
+
+ /* Default values, can be overridden by caller */
+ new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
+ new->spec.yield = THREAD_YIELD_TIME_SLOT;
+ new->spec.retry = WORK_QUEUE_DEFAULT_RETRY;
+
+ return new;
+}
+
+void work_queue_free_and_null(struct work_queue **wqp)
+{
+ struct work_queue *wq = *wqp;
+
+ THREAD_OFF(wq->thread);
+
+ while (!work_queue_empty(wq)) {
+ struct work_queue_item *item = work_queue_last_item(wq);
+
+ work_queue_item_remove(wq, item);
+ }
+
+ listnode_delete(work_queues, wq);
+
+ XFREE(MTYPE_WORK_QUEUE_NAME, wq->name);
+ XFREE(MTYPE_WORK_QUEUE, wq);
+
+ *wqp = NULL;
+}
+
+bool work_queue_is_scheduled(struct work_queue *wq)
+{
+ return thread_is_scheduled(wq->thread);
+}
+
+static int work_queue_schedule(struct work_queue *wq, unsigned int delay)
+{
+ /* if appropriate, schedule work queue thread */
+ if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) &&
+ !thread_is_scheduled(wq->thread) && !work_queue_empty(wq)) {
+ /* Schedule timer if there's a delay, otherwise just schedule
+ * as an 'event'
+ */
+ if (delay > 0) {
+ thread_add_timer_msec(wq->master, work_queue_run, wq,
+ delay, &wq->thread);
+ thread_ignore_late_timer(wq->thread);
+ } else
+ thread_add_event(wq->master, work_queue_run, wq, 0,
+ &wq->thread);
+
+ /* set thread yield time, if needed */
+ if (thread_is_scheduled(wq->thread) &&
+ wq->spec.yield != THREAD_YIELD_TIME_SLOT)
+ thread_set_yield_time(wq->thread, wq->spec.yield);
+ return 1;
+ } else
+ return 0;
+}
+
+void work_queue_add(struct work_queue *wq, void *data)
+{
+ struct work_queue_item *item;
+
+ assert(wq);
+
+ item = work_queue_item_new(wq);
+
+ item->data = data;
+ work_queue_item_enqueue(wq, item);
+
+ work_queue_schedule(wq, wq->spec.hold);
+
+ return;
+}
+
+static void work_queue_item_requeue(struct work_queue *wq,
+ struct work_queue_item *item)
+{
+ work_queue_item_dequeue(wq, item);
+
+ /* attach to end of list */
+ work_queue_item_enqueue(wq, item);
+}
+
+DEFUN (show_work_queues,
+ show_work_queues_cmd,
+ "show work-queues",
+ SHOW_STR
+ "Work Queue information\n")
+{
+ struct listnode *node;
+ struct work_queue *wq;
+
+ vty_out(vty, "%c %8s %5s %8s %8s %21s\n", ' ', "List", "(ms) ",
+ "Q. Runs", "Yields", "Cycle Counts ");
+ vty_out(vty, "%c %8s %5s %8s %8s %7s %6s %8s %6s %s\n", 'P', "Items",
+ "Hold", "Total", "Total", "Best", "Gran.", "Total", "Avg.",
+ "Name");
+
+ for (ALL_LIST_ELEMENTS_RO(work_queues, node, wq)) {
+ vty_out(vty, "%c %8d %5d %8ld %8ld %7d %6d %8ld %6u %s\n",
+ (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
+ work_queue_item_count(wq), wq->spec.hold, wq->runs,
+ wq->yields, wq->cycles.best, wq->cycles.granularity,
+ wq->cycles.total,
+ (wq->runs) ? (unsigned int)(wq->cycles.total / wq->runs)
+ : 0,
+ wq->name);
+ }
+
+ return CMD_SUCCESS;
+}
+
+void workqueue_cmd_init(void)
+{
+ install_element(VIEW_NODE, &show_work_queues_cmd);
+}
+
+/* 'plug' a queue: Stop it from being scheduled,
+ * ie: prevent the queue from draining.
+ */
+void work_queue_plug(struct work_queue *wq)
+{
+ THREAD_OFF(wq->thread);
+
+ UNSET_FLAG(wq->flags, WQ_UNPLUGGED);
+}
+
+/* unplug queue, schedule it again, if appropriate
+ * Ie: Allow the queue to be drained again
+ */
+void work_queue_unplug(struct work_queue *wq)
+{
+ SET_FLAG(wq->flags, WQ_UNPLUGGED);
+
+ /* if thread isnt already waiting, add one */
+ work_queue_schedule(wq, wq->spec.hold);
+}
+
+/* timer thread to process a work queue
+ * will reschedule itself if required,
+ * otherwise work_queue_item_add
+ */
+void work_queue_run(struct thread *thread)
+{
+ struct work_queue *wq;
+ struct work_queue_item *item, *titem;
+ wq_item_status ret = WQ_SUCCESS;
+ unsigned int cycles = 0;
+ char yielded = 0;
+
+ wq = THREAD_ARG(thread);
+
+ assert(wq);
+
+ /* calculate cycle granularity:
+ * list iteration == 1 run
+ * listnode processing == 1 cycle
+ * granularity == # cycles between checks whether we should yield.
+ *
+ * granularity should be > 0, and can increase slowly after each run to
+ * provide some hysteris, but not past cycles.best or 2*cycles.
+ *
+ * Best: starts low, can only increase
+ *
+ * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased
+ * if we run to end of time slot, can increase otherwise
+ * by a small factor.
+ *
+ * We could use just the average and save some work, however we want to
+ * be
+ * able to adjust quickly to CPU pressure. Average wont shift much if
+ * daemon has been running a long time.
+ */
+ if (wq->cycles.granularity == 0)
+ wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
+
+ STAILQ_FOREACH_SAFE (item, &wq->items, wq, titem) {
+ assert(item->data);
+
+ /* dont run items which are past their allowed retries */
+ if (item->ran > wq->spec.max_retries) {
+ /* run error handler, if any */
+ if (wq->spec.errorfunc)
+ wq->spec.errorfunc(wq, item);
+ work_queue_item_remove(wq, item);
+ continue;
+ }
+
+ /* run and take care of items that want to be retried
+ * immediately */
+ do {
+ ret = wq->spec.workfunc(wq, item->data);
+ item->ran++;
+ } while ((ret == WQ_RETRY_NOW)
+ && (item->ran < wq->spec.max_retries));
+
+ switch (ret) {
+ case WQ_QUEUE_BLOCKED: {
+ /* decrement item->ran again, cause this isn't an item
+ * specific error, and fall through to WQ_RETRY_LATER
+ */
+ item->ran--;
+ }
+ case WQ_RETRY_LATER: {
+ goto stats;
+ }
+ case WQ_REQUEUE: {
+ item->ran--;
+ work_queue_item_requeue(wq, item);
+ /* If a single node is being used with a meta-queue
+ * (e.g., zebra),
+ * update the next node as we don't want to exit the
+ * thread and
+ * reschedule it after every node. By definition,
+ * WQ_REQUEUE is
+ * meant to continue the processing; the yield logic
+ * will kick in
+ * to terminate the thread when time has exceeded.
+ */
+ if (titem == NULL)
+ titem = item;
+ break;
+ }
+ case WQ_RETRY_NOW:
+ /* a RETRY_NOW that gets here has exceeded max_tries, same as
+ * ERROR */
+ case WQ_ERROR: {
+ if (wq->spec.errorfunc)
+ wq->spec.errorfunc(wq, item);
+ }
+ /* fallthru */
+ case WQ_SUCCESS:
+ default: {
+ work_queue_item_remove(wq, item);
+ break;
+ }
+ }
+
+ /* completed cycle */
+ cycles++;
+
+ /* test if we should yield */
+ if (!(cycles % wq->cycles.granularity)
+ && thread_should_yield(thread)) {
+ yielded = 1;
+ goto stats;
+ }
+ }
+
+stats:
+
+#define WQ_HYSTERESIS_FACTOR 4
+
+ /* we yielded, check whether granularity should be reduced */
+ if (yielded && (cycles < wq->cycles.granularity)) {
+ wq->cycles.granularity =
+ ((cycles > 0) ? cycles : WORK_QUEUE_MIN_GRANULARITY);
+ }
+ /* otherwise, should granularity increase? */
+ else if (cycles >= (wq->cycles.granularity)) {
+ if (cycles > wq->cycles.best)
+ wq->cycles.best = cycles;
+
+ /* along with yielded check, provides hysteresis for granularity
+ */
+ if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
+ * WQ_HYSTERESIS_FACTOR))
+ wq->cycles.granularity *=
+ WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
+ else if (cycles
+ > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
+ wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
+ }
+#undef WQ_HYSTERIS_FACTOR
+
+ wq->runs++;
+ wq->cycles.total += cycles;
+ if (yielded)
+ wq->yields++;
+
+ /* Is the queue done yet? If it is, call the completion callback. */
+ if (!work_queue_empty(wq)) {
+ if (ret == WQ_RETRY_LATER ||
+ ret == WQ_QUEUE_BLOCKED)
+ work_queue_schedule(wq, wq->spec.retry);
+ else
+ work_queue_schedule(wq, 0);
+
+ } else if (wq->spec.completion_func)
+ wq->spec.completion_func(wq);
+}