summaryrefslogtreecommitdiffstats
path: root/lib/thread.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--lib/thread.c2217
1 files changed, 2217 insertions, 0 deletions
diff --git a/lib/thread.c b/lib/thread.c
new file mode 100644
index 0000000..9eac9b4
--- /dev/null
+++ b/lib/thread.c
@@ -0,0 +1,2217 @@
+/* Thread management routine
+ * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
+ *
+ * This file is part of GNU Zebra.
+ *
+ * GNU Zebra is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2, or (at your option) any
+ * later version.
+ *
+ * GNU Zebra is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; see the file COPYING; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/* #define DEBUG */
+
+#include <zebra.h>
+#include <sys/resource.h>
+
+#include "thread.h"
+#include "memory.h"
+#include "frrcu.h"
+#include "log.h"
+#include "hash.h"
+#include "command.h"
+#include "sigevent.h"
+#include "network.h"
+#include "jhash.h"
+#include "frratomic.h"
+#include "frr_pthread.h"
+#include "lib_errors.h"
+#include "libfrr_trace.h"
+#include "libfrr.h"
+
+DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread");
+DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master");
+DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info");
+DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats");
+
+DECLARE_LIST(thread_list, struct thread, threaditem);
+
+struct cancel_req {
+ int flags;
+ struct thread *thread;
+ void *eventobj;
+ struct thread **threadref;
+};
+
+/* Flags for task cancellation */
+#define THREAD_CANCEL_FLAG_READY 0x01
+
+static int thread_timer_cmp(const struct thread *a, const struct thread *b)
+{
+ if (a->u.sands.tv_sec < b->u.sands.tv_sec)
+ return -1;
+ if (a->u.sands.tv_sec > b->u.sands.tv_sec)
+ return 1;
+ if (a->u.sands.tv_usec < b->u.sands.tv_usec)
+ return -1;
+ if (a->u.sands.tv_usec > b->u.sands.tv_usec)
+ return 1;
+ return 0;
+}
+
+DECLARE_HEAP(thread_timer_list, struct thread, timeritem, thread_timer_cmp);
+
+#if defined(__APPLE__)
+#include <mach/mach.h>
+#include <mach/mach_time.h>
+#endif
+
+#define AWAKEN(m) \
+ do { \
+ const unsigned char wakebyte = 0x01; \
+ write(m->io_pipe[1], &wakebyte, 1); \
+ } while (0);
+
+/* control variable for initializer */
+static pthread_once_t init_once = PTHREAD_ONCE_INIT;
+pthread_key_t thread_current;
+
+static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
+static struct list *masters;
+
+static void thread_free(struct thread_master *master, struct thread *thread);
+
+#ifndef EXCLUDE_CPU_TIME
+#define EXCLUDE_CPU_TIME 0
+#endif
+#ifndef CONSUMED_TIME_CHECK
+#define CONSUMED_TIME_CHECK 0
+#endif
+
+bool cputime_enabled = !EXCLUDE_CPU_TIME;
+unsigned long cputime_threshold = CONSUMED_TIME_CHECK;
+unsigned long walltime_threshold = CONSUMED_TIME_CHECK;
+
+/* CLI start ---------------------------------------------------------------- */
+#ifndef VTYSH_EXTRACT_PL
+#include "lib/thread_clippy.c"
+#endif
+
+static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
+{
+ int size = sizeof(a->func);
+
+ return jhash(&a->func, size, 0);
+}
+
+static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
+ const struct cpu_thread_history *b)
+{
+ return a->func == b->func;
+}
+
+static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
+{
+ struct cpu_thread_history *new;
+ new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
+ new->func = a->func;
+ new->funcname = a->funcname;
+ return new;
+}
+
+static void cpu_record_hash_free(void *a)
+{
+ struct cpu_thread_history *hist = a;
+
+ XFREE(MTYPE_THREAD_STATS, hist);
+}
+
+static void vty_out_cpu_thread_history(struct vty *vty,
+ struct cpu_thread_history *a)
+{
+ vty_out(vty,
+ "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu %9zu %9zu %10zu",
+ a->total_active, a->cpu.total / 1000, a->cpu.total % 1000,
+ a->total_calls, (a->cpu.total / a->total_calls), a->cpu.max,
+ (a->real.total / a->total_calls), a->real.max,
+ a->total_cpu_warn, a->total_wall_warn, a->total_starv_warn);
+ vty_out(vty, " %c%c%c%c%c %s\n",
+ a->types & (1 << THREAD_READ) ? 'R' : ' ',
+ a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
+ a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
+ a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
+ a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
+}
+
+static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
+{
+ struct cpu_thread_history *totals = args[0];
+ struct cpu_thread_history copy;
+ struct vty *vty = args[1];
+ uint8_t *filter = args[2];
+
+ struct cpu_thread_history *a = bucket->data;
+
+ copy.total_active =
+ atomic_load_explicit(&a->total_active, memory_order_seq_cst);
+ copy.total_calls =
+ atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
+ copy.total_cpu_warn =
+ atomic_load_explicit(&a->total_cpu_warn, memory_order_seq_cst);
+ copy.total_wall_warn =
+ atomic_load_explicit(&a->total_wall_warn, memory_order_seq_cst);
+ copy.total_starv_warn = atomic_load_explicit(&a->total_starv_warn,
+ memory_order_seq_cst);
+ copy.cpu.total =
+ atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
+ copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
+ copy.real.total =
+ atomic_load_explicit(&a->real.total, memory_order_seq_cst);
+ copy.real.max =
+ atomic_load_explicit(&a->real.max, memory_order_seq_cst);
+ copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
+ copy.funcname = a->funcname;
+
+ if (!(copy.types & *filter))
+ return;
+
+ vty_out_cpu_thread_history(vty, &copy);
+ totals->total_active += copy.total_active;
+ totals->total_calls += copy.total_calls;
+ totals->total_cpu_warn += copy.total_cpu_warn;
+ totals->total_wall_warn += copy.total_wall_warn;
+ totals->total_starv_warn += copy.total_starv_warn;
+ totals->real.total += copy.real.total;
+ if (totals->real.max < copy.real.max)
+ totals->real.max = copy.real.max;
+ totals->cpu.total += copy.cpu.total;
+ if (totals->cpu.max < copy.cpu.max)
+ totals->cpu.max = copy.cpu.max;
+}
+
+static void cpu_record_print(struct vty *vty, uint8_t filter)
+{
+ struct cpu_thread_history tmp;
+ void *args[3] = {&tmp, vty, &filter};
+ struct thread_master *m;
+ struct listnode *ln;
+
+ if (!cputime_enabled)
+ vty_out(vty,
+ "\n"
+ "Collecting CPU time statistics is currently disabled. Following statistics\n"
+ "will be zero or may display data from when collection was enabled. Use the\n"
+ " \"service cputime-stats\" command to start collecting data.\n"
+ "\nCounters and wallclock times are always maintained and should be accurate.\n");
+
+ memset(&tmp, 0, sizeof(tmp));
+ tmp.funcname = "TOTAL";
+ tmp.types = filter;
+
+ frr_with_mutex (&masters_mtx) {
+ for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
+ const char *name = m->name ? m->name : "main";
+
+ char underline[strlen(name) + 1];
+ memset(underline, '-', sizeof(underline));
+ underline[sizeof(underline) - 1] = '\0';
+
+ vty_out(vty, "\n");
+ vty_out(vty, "Showing statistics for pthread %s\n",
+ name);
+ vty_out(vty, "-------------------------------%s\n",
+ underline);
+ vty_out(vty, "%30s %18s %18s\n", "",
+ "CPU (user+system):", "Real (wall-clock):");
+ vty_out(vty,
+ "Active Runtime(ms) Invoked Avg uSec Max uSecs");
+ vty_out(vty, " Avg uSec Max uSecs");
+ vty_out(vty,
+ " CPU_Warn Wall_Warn Starv_Warn Type Thread\n");
+
+ if (m->cpu_record->count)
+ hash_iterate(
+ m->cpu_record,
+ (void (*)(struct hash_bucket *,
+ void *))cpu_record_hash_print,
+ args);
+ else
+ vty_out(vty, "No data to display yet.\n");
+
+ vty_out(vty, "\n");
+ }
+ }
+
+ vty_out(vty, "\n");
+ vty_out(vty, "Total thread statistics\n");
+ vty_out(vty, "-------------------------\n");
+ vty_out(vty, "%30s %18s %18s\n", "",
+ "CPU (user+system):", "Real (wall-clock):");
+ vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
+ vty_out(vty, " Avg uSec Max uSecs CPU_Warn Wall_Warn");
+ vty_out(vty, " Type Thread\n");
+
+ if (tmp.total_calls > 0)
+ vty_out_cpu_thread_history(vty, &tmp);
+}
+
+static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
+{
+ uint8_t *filter = args[0];
+ struct hash *cpu_record = args[1];
+
+ struct cpu_thread_history *a = bucket->data;
+
+ if (!(a->types & *filter))
+ return;
+
+ hash_release(cpu_record, bucket->data);
+}
+
+static void cpu_record_clear(uint8_t filter)
+{
+ uint8_t *tmp = &filter;
+ struct thread_master *m;
+ struct listnode *ln;
+
+ frr_with_mutex (&masters_mtx) {
+ for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
+ frr_with_mutex (&m->mtx) {
+ void *args[2] = {tmp, m->cpu_record};
+ hash_iterate(
+ m->cpu_record,
+ (void (*)(struct hash_bucket *,
+ void *))cpu_record_hash_clear,
+ args);
+ }
+ }
+ }
+}
+
+static uint8_t parse_filter(const char *filterstr)
+{
+ int i = 0;
+ int filter = 0;
+
+ while (filterstr[i] != '\0') {
+ switch (filterstr[i]) {
+ case 'r':
+ case 'R':
+ filter |= (1 << THREAD_READ);
+ break;
+ case 'w':
+ case 'W':
+ filter |= (1 << THREAD_WRITE);
+ break;
+ case 't':
+ case 'T':
+ filter |= (1 << THREAD_TIMER);
+ break;
+ case 'e':
+ case 'E':
+ filter |= (1 << THREAD_EVENT);
+ break;
+ case 'x':
+ case 'X':
+ filter |= (1 << THREAD_EXECUTE);
+ break;
+ default:
+ break;
+ }
+ ++i;
+ }
+ return filter;
+}
+
+DEFUN_NOSH (show_thread_cpu,
+ show_thread_cpu_cmd,
+ "show thread cpu [FILTER]",
+ SHOW_STR
+ "Thread information\n"
+ "Thread CPU usage\n"
+ "Display filter (rwtex)\n")
+{
+ uint8_t filter = (uint8_t)-1U;
+ int idx = 0;
+
+ if (argv_find(argv, argc, "FILTER", &idx)) {
+ filter = parse_filter(argv[idx]->arg);
+ if (!filter) {
+ vty_out(vty,
+ "Invalid filter \"%s\" specified; must contain at leastone of 'RWTEXB'\n",
+ argv[idx]->arg);
+ return CMD_WARNING;
+ }
+ }
+
+ cpu_record_print(vty, filter);
+ return CMD_SUCCESS;
+}
+
+DEFPY (service_cputime_stats,
+ service_cputime_stats_cmd,
+ "[no] service cputime-stats",
+ NO_STR
+ "Set up miscellaneous service\n"
+ "Collect CPU usage statistics\n")
+{
+ cputime_enabled = !no;
+ return CMD_SUCCESS;
+}
+
+DEFPY (service_cputime_warning,
+ service_cputime_warning_cmd,
+ "[no] service cputime-warning (1-4294967295)",
+ NO_STR
+ "Set up miscellaneous service\n"
+ "Warn for tasks exceeding CPU usage threshold\n"
+ "Warning threshold in milliseconds\n")
+{
+ if (no)
+ cputime_threshold = 0;
+ else
+ cputime_threshold = cputime_warning * 1000;
+ return CMD_SUCCESS;
+}
+
+ALIAS (service_cputime_warning,
+ no_service_cputime_warning_cmd,
+ "no service cputime-warning",
+ NO_STR
+ "Set up miscellaneous service\n"
+ "Warn for tasks exceeding CPU usage threshold\n")
+
+DEFPY (service_walltime_warning,
+ service_walltime_warning_cmd,
+ "[no] service walltime-warning (1-4294967295)",
+ NO_STR
+ "Set up miscellaneous service\n"
+ "Warn for tasks exceeding total wallclock threshold\n"
+ "Warning threshold in milliseconds\n")
+{
+ if (no)
+ walltime_threshold = 0;
+ else
+ walltime_threshold = walltime_warning * 1000;
+ return CMD_SUCCESS;
+}
+
+ALIAS (service_walltime_warning,
+ no_service_walltime_warning_cmd,
+ "no service walltime-warning",
+ NO_STR
+ "Set up miscellaneous service\n"
+ "Warn for tasks exceeding total wallclock threshold\n")
+
+static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
+{
+ const char *name = m->name ? m->name : "main";
+ char underline[strlen(name) + 1];
+ struct thread *thread;
+ uint32_t i;
+
+ memset(underline, '-', sizeof(underline));
+ underline[sizeof(underline) - 1] = '\0';
+
+ vty_out(vty, "\nShowing poll FD's for %s\n", name);
+ vty_out(vty, "----------------------%s\n", underline);
+ vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
+ m->fd_limit);
+ for (i = 0; i < m->handler.pfdcount; i++) {
+ vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
+ m->handler.pfds[i].fd, m->handler.pfds[i].events,
+ m->handler.pfds[i].revents);
+
+ if (m->handler.pfds[i].events & POLLIN) {
+ thread = m->read[m->handler.pfds[i].fd];
+
+ if (!thread)
+ vty_out(vty, "ERROR ");
+ else
+ vty_out(vty, "%s ", thread->xref->funcname);
+ } else
+ vty_out(vty, " ");
+
+ if (m->handler.pfds[i].events & POLLOUT) {
+ thread = m->write[m->handler.pfds[i].fd];
+
+ if (!thread)
+ vty_out(vty, "ERROR\n");
+ else
+ vty_out(vty, "%s\n", thread->xref->funcname);
+ } else
+ vty_out(vty, "\n");
+ }
+}
+
+DEFUN_NOSH (show_thread_poll,
+ show_thread_poll_cmd,
+ "show thread poll",
+ SHOW_STR
+ "Thread information\n"
+ "Show poll FD's and information\n")
+{
+ struct listnode *node;
+ struct thread_master *m;
+
+ frr_with_mutex (&masters_mtx) {
+ for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
+ show_thread_poll_helper(vty, m);
+ }
+ }
+
+ return CMD_SUCCESS;
+}
+
+
+DEFUN (clear_thread_cpu,
+ clear_thread_cpu_cmd,
+ "clear thread cpu [FILTER]",
+ "Clear stored data in all pthreads\n"
+ "Thread information\n"
+ "Thread CPU usage\n"
+ "Display filter (rwtexb)\n")
+{
+ uint8_t filter = (uint8_t)-1U;
+ int idx = 0;
+
+ if (argv_find(argv, argc, "FILTER", &idx)) {
+ filter = parse_filter(argv[idx]->arg);
+ if (!filter) {
+ vty_out(vty,
+ "Invalid filter \"%s\" specified; must contain at leastone of 'RWTEXB'\n",
+ argv[idx]->arg);
+ return CMD_WARNING;
+ }
+ }
+
+ cpu_record_clear(filter);
+ return CMD_SUCCESS;
+}
+
+static void show_thread_timers_helper(struct vty *vty, struct thread_master *m)
+{
+ const char *name = m->name ? m->name : "main";
+ char underline[strlen(name) + 1];
+ struct thread *thread;
+
+ memset(underline, '-', sizeof(underline));
+ underline[sizeof(underline) - 1] = '\0';
+
+ vty_out(vty, "\nShowing timers for %s\n", name);
+ vty_out(vty, "-------------------%s\n", underline);
+
+ frr_each (thread_timer_list, &m->timer, thread) {
+ vty_out(vty, " %-50s%pTH\n", thread->hist->funcname, thread);
+ }
+}
+
+DEFPY_NOSH (show_thread_timers,
+ show_thread_timers_cmd,
+ "show thread timers",
+ SHOW_STR
+ "Thread information\n"
+ "Show all timers and how long they have in the system\n")
+{
+ struct listnode *node;
+ struct thread_master *m;
+
+ frr_with_mutex (&masters_mtx) {
+ for (ALL_LIST_ELEMENTS_RO(masters, node, m))
+ show_thread_timers_helper(vty, m);
+ }
+
+ return CMD_SUCCESS;
+}
+
+void thread_cmd_init(void)
+{
+ install_element(VIEW_NODE, &show_thread_cpu_cmd);
+ install_element(VIEW_NODE, &show_thread_poll_cmd);
+ install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
+
+ install_element(CONFIG_NODE, &service_cputime_stats_cmd);
+ install_element(CONFIG_NODE, &service_cputime_warning_cmd);
+ install_element(CONFIG_NODE, &no_service_cputime_warning_cmd);
+ install_element(CONFIG_NODE, &service_walltime_warning_cmd);
+ install_element(CONFIG_NODE, &no_service_walltime_warning_cmd);
+
+ install_element(VIEW_NODE, &show_thread_timers_cmd);
+}
+/* CLI end ------------------------------------------------------------------ */
+
+
+static void cancelreq_del(void *cr)
+{
+ XFREE(MTYPE_TMP, cr);
+}
+
+/* initializer, only ever called once */
+static void initializer(void)
+{
+ pthread_key_create(&thread_current, NULL);
+}
+
+struct thread_master *thread_master_create(const char *name)
+{
+ struct thread_master *rv;
+ struct rlimit limit;
+
+ pthread_once(&init_once, &initializer);
+
+ rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
+
+ /* Initialize master mutex */
+ pthread_mutex_init(&rv->mtx, NULL);
+ pthread_cond_init(&rv->cancel_cond, NULL);
+
+ /* Set name */
+ name = name ? name : "default";
+ rv->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
+
+ /* Initialize I/O task data structures */
+
+ /* Use configured limit if present, ulimit otherwise. */
+ rv->fd_limit = frr_get_fd_limit();
+ if (rv->fd_limit == 0) {
+ getrlimit(RLIMIT_NOFILE, &limit);
+ rv->fd_limit = (int)limit.rlim_cur;
+ }
+
+ rv->read = XCALLOC(MTYPE_THREAD_POLL,
+ sizeof(struct thread *) * rv->fd_limit);
+
+ rv->write = XCALLOC(MTYPE_THREAD_POLL,
+ sizeof(struct thread *) * rv->fd_limit);
+
+ char tmhashname[strlen(name) + 32];
+ snprintf(tmhashname, sizeof(tmhashname), "%s - threadmaster event hash",
+ name);
+ rv->cpu_record = hash_create_size(
+ 8, (unsigned int (*)(const void *))cpu_record_hash_key,
+ (bool (*)(const void *, const void *))cpu_record_hash_cmp,
+ tmhashname);
+
+ thread_list_init(&rv->event);
+ thread_list_init(&rv->ready);
+ thread_list_init(&rv->unuse);
+ thread_timer_list_init(&rv->timer);
+
+ /* Initialize thread_fetch() settings */
+ rv->spin = true;
+ rv->handle_signals = true;
+
+ /* Set pthread owner, should be updated by actual owner */
+ rv->owner = pthread_self();
+ rv->cancel_req = list_new();
+ rv->cancel_req->del = cancelreq_del;
+ rv->canceled = true;
+
+ /* Initialize pipe poker */
+ pipe(rv->io_pipe);
+ set_nonblocking(rv->io_pipe[0]);
+ set_nonblocking(rv->io_pipe[1]);
+
+ /* Initialize data structures for poll() */
+ rv->handler.pfdsize = rv->fd_limit;
+ rv->handler.pfdcount = 0;
+ rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
+ sizeof(struct pollfd) * rv->handler.pfdsize);
+ rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
+ sizeof(struct pollfd) * rv->handler.pfdsize);
+
+ /* add to list of threadmasters */
+ frr_with_mutex (&masters_mtx) {
+ if (!masters)
+ masters = list_new();
+
+ listnode_add(masters, rv);
+ }
+
+ return rv;
+}
+
+void thread_master_set_name(struct thread_master *master, const char *name)
+{
+ frr_with_mutex (&master->mtx) {
+ XFREE(MTYPE_THREAD_MASTER, master->name);
+ master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
+ }
+}
+
+#define THREAD_UNUSED_DEPTH 10
+
+/* Move thread to unuse list. */
+static void thread_add_unuse(struct thread_master *m, struct thread *thread)
+{
+ pthread_mutex_t mtxc = thread->mtx;
+
+ assert(m != NULL && thread != NULL);
+
+ thread->hist->total_active--;
+ memset(thread, 0, sizeof(struct thread));
+ thread->type = THREAD_UNUSED;
+
+ /* Restore the thread mutex context. */
+ thread->mtx = mtxc;
+
+ if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
+ thread_list_add_tail(&m->unuse, thread);
+ return;
+ }
+
+ thread_free(m, thread);
+}
+
+/* Free all unused thread. */
+static void thread_list_free(struct thread_master *m,
+ struct thread_list_head *list)
+{
+ struct thread *t;
+
+ while ((t = thread_list_pop(list)))
+ thread_free(m, t);
+}
+
+static void thread_array_free(struct thread_master *m,
+ struct thread **thread_array)
+{
+ struct thread *t;
+ int index;
+
+ for (index = 0; index < m->fd_limit; ++index) {
+ t = thread_array[index];
+ if (t) {
+ thread_array[index] = NULL;
+ thread_free(m, t);
+ }
+ }
+ XFREE(MTYPE_THREAD_POLL, thread_array);
+}
+
+/*
+ * thread_master_free_unused
+ *
+ * As threads are finished with they are put on the
+ * unuse list for later reuse.
+ * If we are shutting down, Free up unused threads
+ * So we can see if we forget to shut anything off
+ */
+void thread_master_free_unused(struct thread_master *m)
+{
+ frr_with_mutex (&m->mtx) {
+ struct thread *t;
+ while ((t = thread_list_pop(&m->unuse)))
+ thread_free(m, t);
+ }
+}
+
+/* Stop thread scheduler. */
+void thread_master_free(struct thread_master *m)
+{
+ struct thread *t;
+
+ frr_with_mutex (&masters_mtx) {
+ listnode_delete(masters, m);
+ if (masters->count == 0) {
+ list_delete(&masters);
+ }
+ }
+
+ thread_array_free(m, m->read);
+ thread_array_free(m, m->write);
+ while ((t = thread_timer_list_pop(&m->timer)))
+ thread_free(m, t);
+ thread_list_free(m, &m->event);
+ thread_list_free(m, &m->ready);
+ thread_list_free(m, &m->unuse);
+ pthread_mutex_destroy(&m->mtx);
+ pthread_cond_destroy(&m->cancel_cond);
+ close(m->io_pipe[0]);
+ close(m->io_pipe[1]);
+ list_delete(&m->cancel_req);
+ m->cancel_req = NULL;
+
+ hash_clean(m->cpu_record, cpu_record_hash_free);
+ hash_free(m->cpu_record);
+ m->cpu_record = NULL;
+
+ XFREE(MTYPE_THREAD_MASTER, m->name);
+ XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
+ XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
+ XFREE(MTYPE_THREAD_MASTER, m);
+}
+
+/* Return remain time in milliseconds. */
+unsigned long thread_timer_remain_msec(struct thread *thread)
+{
+ int64_t remain;
+
+ if (!thread_is_scheduled(thread))
+ return 0;
+
+ frr_with_mutex (&thread->mtx) {
+ remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
+ }
+
+ return remain < 0 ? 0 : remain;
+}
+
+/* Return remain time in seconds. */
+unsigned long thread_timer_remain_second(struct thread *thread)
+{
+ return thread_timer_remain_msec(thread) / 1000LL;
+}
+
+struct timeval thread_timer_remain(struct thread *thread)
+{
+ struct timeval remain;
+ frr_with_mutex (&thread->mtx) {
+ monotime_until(&thread->u.sands, &remain);
+ }
+ return remain;
+}
+
+static int time_hhmmss(char *buf, int buf_size, long sec)
+{
+ long hh;
+ long mm;
+ int wr;
+
+ assert(buf_size >= 8);
+
+ hh = sec / 3600;
+ sec %= 3600;
+ mm = sec / 60;
+ sec %= 60;
+
+ wr = snprintf(buf, buf_size, "%02ld:%02ld:%02ld", hh, mm, sec);
+
+ return wr != 8;
+}
+
+char *thread_timer_to_hhmmss(char *buf, int buf_size,
+ struct thread *t_timer)
+{
+ if (t_timer) {
+ time_hhmmss(buf, buf_size,
+ thread_timer_remain_second(t_timer));
+ } else {
+ snprintf(buf, buf_size, "--:--:--");
+ }
+ return buf;
+}
+
+/* Get new thread. */
+static struct thread *thread_get(struct thread_master *m, uint8_t type,
+ void (*func)(struct thread *), void *arg,
+ const struct xref_threadsched *xref)
+{
+ struct thread *thread = thread_list_pop(&m->unuse);
+ struct cpu_thread_history tmp;
+
+ if (!thread) {
+ thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
+ /* mutex only needs to be initialized at struct creation. */
+ pthread_mutex_init(&thread->mtx, NULL);
+ m->alloc++;
+ }
+
+ thread->type = type;
+ thread->add_type = type;
+ thread->master = m;
+ thread->arg = arg;
+ thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
+ thread->ref = NULL;
+ thread->ignore_timer_late = false;
+
+ /*
+ * So if the passed in funcname is not what we have
+ * stored that means the thread->hist needs to be
+ * updated. We keep the last one around in unused
+ * under the assumption that we are probably
+ * going to immediately allocate the same
+ * type of thread.
+ * This hopefully saves us some serious
+ * hash_get lookups.
+ */
+ if ((thread->xref && thread->xref->funcname != xref->funcname)
+ || thread->func != func) {
+ tmp.func = func;
+ tmp.funcname = xref->funcname;
+ thread->hist =
+ hash_get(m->cpu_record, &tmp,
+ (void *(*)(void *))cpu_record_hash_alloc);
+ }
+ thread->hist->total_active++;
+ thread->func = func;
+ thread->xref = xref;
+
+ return thread;
+}
+
+static void thread_free(struct thread_master *master, struct thread *thread)
+{
+ /* Update statistics. */
+ assert(master->alloc > 0);
+ master->alloc--;
+
+ /* Free allocated resources. */
+ pthread_mutex_destroy(&thread->mtx);
+ XFREE(MTYPE_THREAD, thread);
+}
+
+static int fd_poll(struct thread_master *m, const struct timeval *timer_wait,
+ bool *eintr_p)
+{
+ sigset_t origsigs;
+ unsigned char trash[64];
+ nfds_t count = m->handler.copycount;
+
+ /*
+ * If timer_wait is null here, that means poll() should block
+ * indefinitely, unless the thread_master has overridden it by setting
+ * ->selectpoll_timeout.
+ *
+ * If the value is positive, it specifies the maximum number of
+ * milliseconds to wait. If the timeout is -1, it specifies that
+ * we should never wait and always return immediately even if no
+ * event is detected. If the value is zero, the behavior is default.
+ */
+ int timeout = -1;
+
+ /* number of file descriptors with events */
+ int num;
+
+ if (timer_wait != NULL
+ && m->selectpoll_timeout == 0) // use the default value
+ timeout = (timer_wait->tv_sec * 1000)
+ + (timer_wait->tv_usec / 1000);
+ else if (m->selectpoll_timeout > 0) // use the user's timeout
+ timeout = m->selectpoll_timeout;
+ else if (m->selectpoll_timeout
+ < 0) // effect a poll (return immediately)
+ timeout = 0;
+
+ zlog_tls_buffer_flush();
+ rcu_read_unlock();
+ rcu_assert_read_unlocked();
+
+ /* add poll pipe poker */
+ assert(count + 1 < m->handler.pfdsize);
+ m->handler.copy[count].fd = m->io_pipe[0];
+ m->handler.copy[count].events = POLLIN;
+ m->handler.copy[count].revents = 0x00;
+
+ /* We need to deal with a signal-handling race here: we
+ * don't want to miss a crucial signal, such as SIGTERM or SIGINT,
+ * that may arrive just before we enter poll(). We will block the
+ * key signals, then check whether any have arrived - if so, we return
+ * before calling poll(). If not, we'll re-enable the signals
+ * in the ppoll() call.
+ */
+
+ sigemptyset(&origsigs);
+ if (m->handle_signals) {
+ /* Main pthread that handles the app signals */
+ if (frr_sigevent_check(&origsigs)) {
+ /* Signal to process - restore signal mask and return */
+ pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
+ num = -1;
+ *eintr_p = true;
+ goto done;
+ }
+ } else {
+ /* Don't make any changes for the non-main pthreads */
+ pthread_sigmask(SIG_SETMASK, NULL, &origsigs);
+ }
+
+#if defined(HAVE_PPOLL)
+ struct timespec ts, *tsp;
+
+ if (timeout >= 0) {
+ ts.tv_sec = timeout / 1000;
+ ts.tv_nsec = (timeout % 1000) * 1000000;
+ tsp = &ts;
+ } else
+ tsp = NULL;
+
+ num = ppoll(m->handler.copy, count + 1, tsp, &origsigs);
+ pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
+#else
+ /* Not ideal - there is a race after we restore the signal mask */
+ pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
+ num = poll(m->handler.copy, count + 1, timeout);
+#endif
+
+done:
+
+ if (num < 0 && errno == EINTR)
+ *eintr_p = true;
+
+ if (num > 0 && m->handler.copy[count].revents != 0 && num--)
+ while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
+ ;
+
+ rcu_read_lock();
+
+ return num;
+}
+
+/* Add new read thread. */
+void _thread_add_read_write(const struct xref_threadsched *xref,
+ struct thread_master *m,
+ void (*func)(struct thread *), void *arg, int fd,
+ struct thread **t_ptr)
+{
+ int dir = xref->thread_type;
+ struct thread *thread = NULL;
+ struct thread **thread_array;
+
+ if (dir == THREAD_READ)
+ frrtrace(9, frr_libfrr, schedule_read, m,
+ xref->funcname, xref->xref.file, xref->xref.line,
+ t_ptr, fd, 0, arg, 0);
+ else
+ frrtrace(9, frr_libfrr, schedule_write, m,
+ xref->funcname, xref->xref.file, xref->xref.line,
+ t_ptr, fd, 0, arg, 0);
+
+ assert(fd >= 0);
+ if (fd >= m->fd_limit)
+ assert(!"Number of FD's open is greater than FRR currently configured to handle, aborting");
+
+ frr_with_mutex (&m->mtx) {
+ if (t_ptr && *t_ptr)
+ // thread is already scheduled; don't reschedule
+ break;
+
+ /* default to a new pollfd */
+ nfds_t queuepos = m->handler.pfdcount;
+
+ if (dir == THREAD_READ)
+ thread_array = m->read;
+ else
+ thread_array = m->write;
+
+ /* if we already have a pollfd for our file descriptor, find and
+ * use it */
+ for (nfds_t i = 0; i < m->handler.pfdcount; i++)
+ if (m->handler.pfds[i].fd == fd) {
+ queuepos = i;
+
+#ifdef DEV_BUILD
+ /*
+ * What happens if we have a thread already
+ * created for this event?
+ */
+ if (thread_array[fd])
+ assert(!"Thread already scheduled for file descriptor");
+#endif
+ break;
+ }
+
+ /* make sure we have room for this fd + pipe poker fd */
+ assert(queuepos + 1 < m->handler.pfdsize);
+
+ thread = thread_get(m, dir, func, arg, xref);
+
+ m->handler.pfds[queuepos].fd = fd;
+ m->handler.pfds[queuepos].events |=
+ (dir == THREAD_READ ? POLLIN : POLLOUT);
+
+ if (queuepos == m->handler.pfdcount)
+ m->handler.pfdcount++;
+
+ if (thread) {
+ frr_with_mutex (&thread->mtx) {
+ thread->u.fd = fd;
+ thread_array[thread->u.fd] = thread;
+ }
+
+ if (t_ptr) {
+ *t_ptr = thread;
+ thread->ref = t_ptr;
+ }
+ }
+
+ AWAKEN(m);
+ }
+}
+
+static void _thread_add_timer_timeval(const struct xref_threadsched *xref,
+ struct thread_master *m,
+ void (*func)(struct thread *), void *arg,
+ struct timeval *time_relative,
+ struct thread **t_ptr)
+{
+ struct thread *thread;
+ struct timeval t;
+
+ assert(m != NULL);
+
+ assert(time_relative);
+
+ frrtrace(9, frr_libfrr, schedule_timer, m,
+ xref->funcname, xref->xref.file, xref->xref.line,
+ t_ptr, 0, 0, arg, (long)time_relative->tv_sec);
+
+ /* Compute expiration/deadline time. */
+ monotime(&t);
+ timeradd(&t, time_relative, &t);
+
+ frr_with_mutex (&m->mtx) {
+ if (t_ptr && *t_ptr)
+ /* thread is already scheduled; don't reschedule */
+ return;
+
+ thread = thread_get(m, THREAD_TIMER, func, arg, xref);
+
+ frr_with_mutex (&thread->mtx) {
+ thread->u.sands = t;
+ thread_timer_list_add(&m->timer, thread);
+ if (t_ptr) {
+ *t_ptr = thread;
+ thread->ref = t_ptr;
+ }
+ }
+
+ /* The timer list is sorted - if this new timer
+ * might change the time we'll wait for, give the pthread
+ * a chance to re-compute.
+ */
+ if (thread_timer_list_first(&m->timer) == thread)
+ AWAKEN(m);
+ }
+#define ONEYEAR2SEC (60 * 60 * 24 * 365)
+ if (time_relative->tv_sec > ONEYEAR2SEC)
+ flog_err(
+ EC_LIB_TIMER_TOO_LONG,
+ "Timer: %pTHD is created with an expiration that is greater than 1 year",
+ thread);
+}
+
+
+/* Add timer event thread. */
+void _thread_add_timer(const struct xref_threadsched *xref,
+ struct thread_master *m, void (*func)(struct thread *),
+ void *arg, long timer, struct thread **t_ptr)
+{
+ struct timeval trel;
+
+ assert(m != NULL);
+
+ trel.tv_sec = timer;
+ trel.tv_usec = 0;
+
+ _thread_add_timer_timeval(xref, m, func, arg, &trel, t_ptr);
+}
+
+/* Add timer event thread with "millisecond" resolution */
+void _thread_add_timer_msec(const struct xref_threadsched *xref,
+ struct thread_master *m,
+ void (*func)(struct thread *), void *arg,
+ long timer, struct thread **t_ptr)
+{
+ struct timeval trel;
+
+ assert(m != NULL);
+
+ trel.tv_sec = timer / 1000;
+ trel.tv_usec = 1000 * (timer % 1000);
+
+ _thread_add_timer_timeval(xref, m, func, arg, &trel, t_ptr);
+}
+
+/* Add timer event thread with "timeval" resolution */
+void _thread_add_timer_tv(const struct xref_threadsched *xref,
+ struct thread_master *m,
+ void (*func)(struct thread *), void *arg,
+ struct timeval *tv, struct thread **t_ptr)
+{
+ _thread_add_timer_timeval(xref, m, func, arg, tv, t_ptr);
+}
+
+/* Add simple event thread. */
+void _thread_add_event(const struct xref_threadsched *xref,
+ struct thread_master *m, void (*func)(struct thread *),
+ void *arg, int val, struct thread **t_ptr)
+{
+ struct thread *thread = NULL;
+
+ frrtrace(9, frr_libfrr, schedule_event, m,
+ xref->funcname, xref->xref.file, xref->xref.line,
+ t_ptr, 0, val, arg, 0);
+
+ assert(m != NULL);
+
+ frr_with_mutex (&m->mtx) {
+ if (t_ptr && *t_ptr)
+ /* thread is already scheduled; don't reschedule */
+ break;
+
+ thread = thread_get(m, THREAD_EVENT, func, arg, xref);
+ frr_with_mutex (&thread->mtx) {
+ thread->u.val = val;
+ thread_list_add_tail(&m->event, thread);
+ }
+
+ if (t_ptr) {
+ *t_ptr = thread;
+ thread->ref = t_ptr;
+ }
+
+ AWAKEN(m);
+ }
+}
+
+/* Thread cancellation ------------------------------------------------------ */
+
+/**
+ * NOT's out the .events field of pollfd corresponding to the given file
+ * descriptor. The event to be NOT'd is passed in the 'state' parameter.
+ *
+ * This needs to happen for both copies of pollfd's. See 'thread_fetch'
+ * implementation for details.
+ *
+ * @param master
+ * @param fd
+ * @param state the event to cancel. One or more (OR'd together) of the
+ * following:
+ * - POLLIN
+ * - POLLOUT
+ */
+static void thread_cancel_rw(struct thread_master *master, int fd, short state,
+ int idx_hint)
+{
+ bool found = false;
+
+ /* find the index of corresponding pollfd */
+ nfds_t i;
+
+ /* Cancel POLLHUP too just in case some bozo set it */
+ state |= POLLHUP;
+
+ /* Some callers know the index of the pfd already */
+ if (idx_hint >= 0) {
+ i = idx_hint;
+ found = true;
+ } else {
+ /* Have to look for the fd in the pfd array */
+ for (i = 0; i < master->handler.pfdcount; i++)
+ if (master->handler.pfds[i].fd == fd) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ zlog_debug(
+ "[!] Received cancellation request for nonexistent rw job");
+ zlog_debug("[!] threadmaster: %s | fd: %d",
+ master->name ? master->name : "", fd);
+ return;
+ }
+
+ /* NOT out event. */
+ master->handler.pfds[i].events &= ~(state);
+
+ /* If all events are canceled, delete / resize the pollfd array. */
+ if (master->handler.pfds[i].events == 0) {
+ memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
+ (master->handler.pfdcount - i - 1)
+ * sizeof(struct pollfd));
+ master->handler.pfdcount--;
+ master->handler.pfds[master->handler.pfdcount].fd = 0;
+ master->handler.pfds[master->handler.pfdcount].events = 0;
+ }
+
+ /* If we have the same pollfd in the copy, perform the same operations,
+ * otherwise return. */
+ if (i >= master->handler.copycount)
+ return;
+
+ master->handler.copy[i].events &= ~(state);
+
+ if (master->handler.copy[i].events == 0) {
+ memmove(master->handler.copy + i, master->handler.copy + i + 1,
+ (master->handler.copycount - i - 1)
+ * sizeof(struct pollfd));
+ master->handler.copycount--;
+ master->handler.copy[master->handler.copycount].fd = 0;
+ master->handler.copy[master->handler.copycount].events = 0;
+ }
+}
+
+/*
+ * Process task cancellation given a task argument: iterate through the
+ * various lists of tasks, looking for any that match the argument.
+ */
+static void cancel_arg_helper(struct thread_master *master,
+ const struct cancel_req *cr)
+{
+ struct thread *t;
+ nfds_t i;
+ int fd;
+ struct pollfd *pfd;
+
+ /* We're only processing arg-based cancellations here. */
+ if (cr->eventobj == NULL)
+ return;
+
+ /* First process the ready lists. */
+ frr_each_safe(thread_list, &master->event, t) {
+ if (t->arg != cr->eventobj)
+ continue;
+ thread_list_del(&master->event, t);
+ if (t->ref)
+ *t->ref = NULL;
+ thread_add_unuse(master, t);
+ }
+
+ frr_each_safe(thread_list, &master->ready, t) {
+ if (t->arg != cr->eventobj)
+ continue;
+ thread_list_del(&master->ready, t);
+ if (t->ref)
+ *t->ref = NULL;
+ thread_add_unuse(master, t);
+ }
+
+ /* If requested, stop here and ignore io and timers */
+ if (CHECK_FLAG(cr->flags, THREAD_CANCEL_FLAG_READY))
+ return;
+
+ /* Check the io tasks */
+ for (i = 0; i < master->handler.pfdcount;) {
+ pfd = master->handler.pfds + i;
+
+ if (pfd->events & POLLIN)
+ t = master->read[pfd->fd];
+ else
+ t = master->write[pfd->fd];
+
+ if (t && t->arg == cr->eventobj) {
+ fd = pfd->fd;
+
+ /* Found a match to cancel: clean up fd arrays */
+ thread_cancel_rw(master, pfd->fd, pfd->events, i);
+
+ /* Clean up thread arrays */
+ master->read[fd] = NULL;
+ master->write[fd] = NULL;
+
+ /* Clear caller's ref */
+ if (t->ref)
+ *t->ref = NULL;
+
+ thread_add_unuse(master, t);
+
+ /* Don't increment 'i' since the cancellation will have
+ * removed the entry from the pfd array
+ */
+ } else
+ i++;
+ }
+
+ /* Check the timer tasks */
+ t = thread_timer_list_first(&master->timer);
+ while (t) {
+ struct thread *t_next;
+
+ t_next = thread_timer_list_next(&master->timer, t);
+
+ if (t->arg == cr->eventobj) {
+ thread_timer_list_del(&master->timer, t);
+ if (t->ref)
+ *t->ref = NULL;
+ thread_add_unuse(master, t);
+ }
+
+ t = t_next;
+ }
+}
+
+/**
+ * Process cancellation requests.
+ *
+ * This may only be run from the pthread which owns the thread_master.
+ *
+ * @param master the thread master to process
+ * @REQUIRE master->mtx
+ */
+static void do_thread_cancel(struct thread_master *master)
+{
+ struct thread_list_head *list = NULL;
+ struct thread **thread_array = NULL;
+ struct thread *thread;
+ struct cancel_req *cr;
+ struct listnode *ln;
+
+ for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
+ /*
+ * If this is an event object cancellation, search
+ * through task lists deleting any tasks which have the
+ * specified argument - use this handy helper function.
+ */
+ if (cr->eventobj) {
+ cancel_arg_helper(master, cr);
+ continue;
+ }
+
+ /*
+ * The pointer varies depending on whether the cancellation
+ * request was made asynchronously or not. If it was, we
+ * need to check whether the thread even exists anymore
+ * before cancelling it.
+ */
+ thread = (cr->thread) ? cr->thread : *cr->threadref;
+
+ if (!thread)
+ continue;
+
+ list = NULL;
+ thread_array = NULL;
+
+ /* Determine the appropriate queue to cancel the thread from */
+ switch (thread->type) {
+ case THREAD_READ:
+ thread_cancel_rw(master, thread->u.fd, POLLIN, -1);
+ thread_array = master->read;
+ break;
+ case THREAD_WRITE:
+ thread_cancel_rw(master, thread->u.fd, POLLOUT, -1);
+ thread_array = master->write;
+ break;
+ case THREAD_TIMER:
+ thread_timer_list_del(&master->timer, thread);
+ break;
+ case THREAD_EVENT:
+ list = &master->event;
+ break;
+ case THREAD_READY:
+ list = &master->ready;
+ break;
+ default:
+ continue;
+ break;
+ }
+
+ if (list) {
+ thread_list_del(list, thread);
+ } else if (thread_array) {
+ thread_array[thread->u.fd] = NULL;
+ }
+
+ if (thread->ref)
+ *thread->ref = NULL;
+
+ thread_add_unuse(thread->master, thread);
+ }
+
+ /* Delete and free all cancellation requests */
+ if (master->cancel_req)
+ list_delete_all_node(master->cancel_req);
+
+ /* Wake up any threads which may be blocked in thread_cancel_async() */
+ master->canceled = true;
+ pthread_cond_broadcast(&master->cancel_cond);
+}
+
+/*
+ * Helper function used for multiple flavors of arg-based cancellation.
+ */
+static void cancel_event_helper(struct thread_master *m, void *arg, int flags)
+{
+ struct cancel_req *cr;
+
+ assert(m->owner == pthread_self());
+
+ /* Only worth anything if caller supplies an arg. */
+ if (arg == NULL)
+ return;
+
+ cr = XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
+
+ cr->flags = flags;
+
+ frr_with_mutex (&m->mtx) {
+ cr->eventobj = arg;
+ listnode_add(m->cancel_req, cr);
+ do_thread_cancel(m);
+ }
+}
+
+/**
+ * Cancel any events which have the specified argument.
+ *
+ * MT-Unsafe
+ *
+ * @param m the thread_master to cancel from
+ * @param arg the argument passed when creating the event
+ */
+void thread_cancel_event(struct thread_master *master, void *arg)
+{
+ cancel_event_helper(master, arg, 0);
+}
+
+/*
+ * Cancel ready tasks with an arg matching 'arg'
+ *
+ * MT-Unsafe
+ *
+ * @param m the thread_master to cancel from
+ * @param arg the argument passed when creating the event
+ */
+void thread_cancel_event_ready(struct thread_master *m, void *arg)
+{
+
+ /* Only cancel ready/event tasks */
+ cancel_event_helper(m, arg, THREAD_CANCEL_FLAG_READY);
+}
+
+/**
+ * Cancel a specific task.
+ *
+ * MT-Unsafe
+ *
+ * @param thread task to cancel
+ */
+void thread_cancel(struct thread **thread)
+{
+ struct thread_master *master;
+
+ if (thread == NULL || *thread == NULL)
+ return;
+
+ master = (*thread)->master;
+
+ frrtrace(9, frr_libfrr, thread_cancel, master,
+ (*thread)->xref->funcname, (*thread)->xref->xref.file,
+ (*thread)->xref->xref.line, NULL, (*thread)->u.fd,
+ (*thread)->u.val, (*thread)->arg, (*thread)->u.sands.tv_sec);
+
+ assert(master->owner == pthread_self());
+
+ frr_with_mutex (&master->mtx) {
+ struct cancel_req *cr =
+ XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
+ cr->thread = *thread;
+ listnode_add(master->cancel_req, cr);
+ do_thread_cancel(master);
+ }
+
+ *thread = NULL;
+}
+
+/**
+ * Asynchronous cancellation.
+ *
+ * Called with either a struct thread ** or void * to an event argument,
+ * this function posts the correct cancellation request and blocks until it is
+ * serviced.
+ *
+ * If the thread is currently running, execution blocks until it completes.
+ *
+ * The last two parameters are mutually exclusive, i.e. if you pass one the
+ * other must be NULL.
+ *
+ * When the cancellation procedure executes on the target thread_master, the
+ * thread * provided is checked for nullity. If it is null, the thread is
+ * assumed to no longer exist and the cancellation request is a no-op. Thus
+ * users of this API must pass a back-reference when scheduling the original
+ * task.
+ *
+ * MT-Safe
+ *
+ * @param master the thread master with the relevant event / task
+ * @param thread pointer to thread to cancel
+ * @param eventobj the event
+ */
+void thread_cancel_async(struct thread_master *master, struct thread **thread,
+ void *eventobj)
+{
+ assert(!(thread && eventobj) && (thread || eventobj));
+
+ if (thread && *thread)
+ frrtrace(9, frr_libfrr, thread_cancel_async, master,
+ (*thread)->xref->funcname, (*thread)->xref->xref.file,
+ (*thread)->xref->xref.line, NULL, (*thread)->u.fd,
+ (*thread)->u.val, (*thread)->arg,
+ (*thread)->u.sands.tv_sec);
+ else
+ frrtrace(9, frr_libfrr, thread_cancel_async, master, NULL, NULL,
+ 0, NULL, 0, 0, eventobj, 0);
+
+ assert(master->owner != pthread_self());
+
+ frr_with_mutex (&master->mtx) {
+ master->canceled = false;
+
+ if (thread) {
+ struct cancel_req *cr =
+ XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
+ cr->threadref = thread;
+ listnode_add(master->cancel_req, cr);
+ } else if (eventobj) {
+ struct cancel_req *cr =
+ XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
+ cr->eventobj = eventobj;
+ listnode_add(master->cancel_req, cr);
+ }
+ AWAKEN(master);
+
+ while (!master->canceled)
+ pthread_cond_wait(&master->cancel_cond, &master->mtx);
+ }
+
+ if (thread)
+ *thread = NULL;
+}
+/* ------------------------------------------------------------------------- */
+
+static struct timeval *thread_timer_wait(struct thread_timer_list_head *timers,
+ struct timeval *timer_val)
+{
+ if (!thread_timer_list_count(timers))
+ return NULL;
+
+ struct thread *next_timer = thread_timer_list_first(timers);
+ monotime_until(&next_timer->u.sands, timer_val);
+ return timer_val;
+}
+
+static struct thread *thread_run(struct thread_master *m, struct thread *thread,
+ struct thread *fetch)
+{
+ *fetch = *thread;
+ thread_add_unuse(m, thread);
+ return fetch;
+}
+
+static int thread_process_io_helper(struct thread_master *m,
+ struct thread *thread, short state,
+ short actual_state, int pos)
+{
+ struct thread **thread_array;
+
+ /*
+ * poll() clears the .events field, but the pollfd array we
+ * pass to poll() is a copy of the one used to schedule threads.
+ * We need to synchronize state between the two here by applying
+ * the same changes poll() made on the copy of the "real" pollfd
+ * array.
+ *
+ * This cleans up a possible infinite loop where we refuse
+ * to respond to a poll event but poll is insistent that
+ * we should.
+ */
+ m->handler.pfds[pos].events &= ~(state);
+
+ if (!thread) {
+ if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
+ flog_err(EC_LIB_NO_THREAD,
+ "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!",
+ m->handler.pfds[pos].fd, actual_state);
+ return 0;
+ }
+
+ if (thread->type == THREAD_READ)
+ thread_array = m->read;
+ else
+ thread_array = m->write;
+
+ thread_array[thread->u.fd] = NULL;
+ thread_list_add_tail(&m->ready, thread);
+ thread->type = THREAD_READY;
+
+ return 1;
+}
+
+/**
+ * Process I/O events.
+ *
+ * Walks through file descriptor array looking for those pollfds whose .revents
+ * field has something interesting. Deletes any invalid file descriptors.
+ *
+ * @param m the thread master
+ * @param num the number of active file descriptors (return value of poll())
+ */
+static void thread_process_io(struct thread_master *m, unsigned int num)
+{
+ unsigned int ready = 0;
+ struct pollfd *pfds = m->handler.copy;
+
+ for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
+ /* no event for current fd? immediately continue */
+ if (pfds[i].revents == 0)
+ continue;
+
+ ready++;
+
+ /*
+ * Unless someone has called thread_cancel from another
+ * pthread, the only thing that could have changed in
+ * m->handler.pfds while we were asleep is the .events
+ * field in a given pollfd. Barring thread_cancel() that
+ * value should be a superset of the values we have in our
+ * copy, so there's no need to update it. Similarily,
+ * barring deletion, the fd should still be a valid index
+ * into the master's pfds.
+ *
+ * We are including POLLERR here to do a READ event
+ * this is because the read should fail and the
+ * read function should handle it appropriately
+ */
+ if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
+ thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
+ pfds[i].revents, i);
+ }
+ if (pfds[i].revents & POLLOUT)
+ thread_process_io_helper(m, m->write[pfds[i].fd],
+ POLLOUT, pfds[i].revents, i);
+
+ /* if one of our file descriptors is garbage, remove the same
+ * from
+ * both pfds + update sizes and index */
+ if (pfds[i].revents & POLLNVAL) {
+ memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
+ (m->handler.pfdcount - i - 1)
+ * sizeof(struct pollfd));
+ m->handler.pfdcount--;
+ m->handler.pfds[m->handler.pfdcount].fd = 0;
+ m->handler.pfds[m->handler.pfdcount].events = 0;
+
+ memmove(pfds + i, pfds + i + 1,
+ (m->handler.copycount - i - 1)
+ * sizeof(struct pollfd));
+ m->handler.copycount--;
+ m->handler.copy[m->handler.copycount].fd = 0;
+ m->handler.copy[m->handler.copycount].events = 0;
+
+ i--;
+ }
+ }
+}
+
+/* Add all timers that have popped to the ready list. */
+static unsigned int thread_process_timers(struct thread_master *m,
+ struct timeval *timenow)
+{
+ struct timeval prev = *timenow;
+ bool displayed = false;
+ struct thread *thread;
+ unsigned int ready = 0;
+
+ while ((thread = thread_timer_list_first(&m->timer))) {
+ if (timercmp(timenow, &thread->u.sands, <))
+ break;
+ prev = thread->u.sands;
+ prev.tv_sec += 4;
+ /*
+ * If the timer would have popped 4 seconds in the
+ * past then we are in a situation where we are
+ * really getting behind on handling of events.
+ * Let's log it and do the right thing with it.
+ */
+ if (timercmp(timenow, &prev, >)) {
+ atomic_fetch_add_explicit(
+ &thread->hist->total_starv_warn, 1,
+ memory_order_seq_cst);
+ if (!displayed && !thread->ignore_timer_late) {
+ flog_warn(
+ EC_LIB_STARVE_THREAD,
+ "Thread Starvation: %pTHD was scheduled to pop greater than 4s ago",
+ thread);
+ displayed = true;
+ }
+ }
+
+ thread_timer_list_pop(&m->timer);
+ thread->type = THREAD_READY;
+ thread_list_add_tail(&m->ready, thread);
+ ready++;
+ }
+
+ return ready;
+}
+
+/* process a list en masse, e.g. for event thread lists */
+static unsigned int thread_process(struct thread_list_head *list)
+{
+ struct thread *thread;
+ unsigned int ready = 0;
+
+ while ((thread = thread_list_pop(list))) {
+ thread->type = THREAD_READY;
+ thread_list_add_tail(&thread->master->ready, thread);
+ ready++;
+ }
+ return ready;
+}
+
+
+/* Fetch next ready thread. */
+struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
+{
+ struct thread *thread = NULL;
+ struct timeval now;
+ struct timeval zerotime = {0, 0};
+ struct timeval tv;
+ struct timeval *tw = NULL;
+ bool eintr_p = false;
+ int num = 0;
+
+ do {
+ /* Handle signals if any */
+ if (m->handle_signals)
+ frr_sigevent_process();
+
+ pthread_mutex_lock(&m->mtx);
+
+ /* Process any pending cancellation requests */
+ do_thread_cancel(m);
+
+ /*
+ * Attempt to flush ready queue before going into poll().
+ * This is performance-critical. Think twice before modifying.
+ */
+ if ((thread = thread_list_pop(&m->ready))) {
+ fetch = thread_run(m, thread, fetch);
+ if (fetch->ref)
+ *fetch->ref = NULL;
+ pthread_mutex_unlock(&m->mtx);
+ if (!m->ready_run_loop)
+ GETRUSAGE(&m->last_getrusage);
+ m->ready_run_loop = true;
+ break;
+ }
+
+ m->ready_run_loop = false;
+ /* otherwise, tick through scheduling sequence */
+
+ /*
+ * Post events to ready queue. This must come before the
+ * following block since events should occur immediately
+ */
+ thread_process(&m->event);
+
+ /*
+ * If there are no tasks on the ready queue, we will poll()
+ * until a timer expires or we receive I/O, whichever comes
+ * first. The strategy for doing this is:
+ *
+ * - If there are events pending, set the poll() timeout to zero
+ * - If there are no events pending, but there are timers
+ * pending, set the timeout to the smallest remaining time on
+ * any timer.
+ * - If there are neither timers nor events pending, but there
+ * are file descriptors pending, block indefinitely in poll()
+ * - If nothing is pending, it's time for the application to die
+ *
+ * In every case except the last, we need to hit poll() at least
+ * once per loop to avoid starvation by events
+ */
+ if (!thread_list_count(&m->ready))
+ tw = thread_timer_wait(&m->timer, &tv);
+
+ if (thread_list_count(&m->ready) ||
+ (tw && !timercmp(tw, &zerotime, >)))
+ tw = &zerotime;
+
+ if (!tw && m->handler.pfdcount == 0) { /* die */
+ pthread_mutex_unlock(&m->mtx);
+ fetch = NULL;
+ break;
+ }
+
+ /*
+ * Copy pollfd array + # active pollfds in it. Not necessary to
+ * copy the array size as this is fixed.
+ */
+ m->handler.copycount = m->handler.pfdcount;
+ memcpy(m->handler.copy, m->handler.pfds,
+ m->handler.copycount * sizeof(struct pollfd));
+
+ pthread_mutex_unlock(&m->mtx);
+ {
+ eintr_p = false;
+ num = fd_poll(m, tw, &eintr_p);
+ }
+ pthread_mutex_lock(&m->mtx);
+
+ /* Handle any errors received in poll() */
+ if (num < 0) {
+ if (eintr_p) {
+ pthread_mutex_unlock(&m->mtx);
+ /* loop around to signal handler */
+ continue;
+ }
+
+ /* else die */
+ flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
+ safe_strerror(errno));
+ pthread_mutex_unlock(&m->mtx);
+ fetch = NULL;
+ break;
+ }
+
+ /* Post timers to ready queue. */
+ monotime(&now);
+ thread_process_timers(m, &now);
+
+ /* Post I/O to ready queue. */
+ if (num > 0)
+ thread_process_io(m, num);
+
+ pthread_mutex_unlock(&m->mtx);
+
+ } while (!thread && m->spin);
+
+ return fetch;
+}
+
+static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
+{
+ return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
+ + (a.tv_usec - b.tv_usec));
+}
+
+unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
+ unsigned long *cputime)
+{
+#ifdef HAVE_CLOCK_THREAD_CPUTIME_ID
+
+#ifdef __FreeBSD__
+ /*
+ * FreeBSD appears to have an issue when calling clock_gettime
+ * with CLOCK_THREAD_CPUTIME_ID really close to each other
+ * occassionally the now time will be before the start time.
+ * This is not good and FRR is ending up with CPU HOG's
+ * when the subtraction wraps to very large numbers
+ *
+ * What we are going to do here is cheat a little bit
+ * and notice that this is a problem and just correct
+ * it so that it is impossible to happen
+ */
+ if (start->cpu.tv_sec == now->cpu.tv_sec &&
+ start->cpu.tv_nsec > now->cpu.tv_nsec)
+ now->cpu.tv_nsec = start->cpu.tv_nsec + 1;
+ else if (start->cpu.tv_sec > now->cpu.tv_sec) {
+ now->cpu.tv_sec = start->cpu.tv_sec;
+ now->cpu.tv_nsec = start->cpu.tv_nsec + 1;
+ }
+#endif
+ *cputime = (now->cpu.tv_sec - start->cpu.tv_sec) * TIMER_SECOND_MICRO
+ + (now->cpu.tv_nsec - start->cpu.tv_nsec) / 1000;
+#else
+ /* This is 'user + sys' time. */
+ *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
+ + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
+#endif
+ return timeval_elapsed(now->real, start->real);
+}
+
+/* We should aim to yield after yield milliseconds, which defaults
+ to THREAD_YIELD_TIME_SLOT .
+ Note: we are using real (wall clock) time for this calculation.
+ It could be argued that CPU time may make more sense in certain
+ contexts. The things to consider are whether the thread may have
+ blocked (in which case wall time increases, but CPU time does not),
+ or whether the system is heavily loaded with other processes competing
+ for CPU time. On balance, wall clock time seems to make sense.
+ Plus it has the added benefit that gettimeofday should be faster
+ than calling getrusage. */
+int thread_should_yield(struct thread *thread)
+{
+ int result;
+ frr_with_mutex (&thread->mtx) {
+ result = monotime_since(&thread->real, NULL)
+ > (int64_t)thread->yield;
+ }
+ return result;
+}
+
+void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
+{
+ frr_with_mutex (&thread->mtx) {
+ thread->yield = yield_time;
+ }
+}
+
+void thread_getrusage(RUSAGE_T *r)
+{
+ monotime(&r->real);
+ if (!cputime_enabled) {
+ memset(&r->cpu, 0, sizeof(r->cpu));
+ return;
+ }
+
+#ifdef HAVE_CLOCK_THREAD_CPUTIME_ID
+ /* not currently implemented in Linux's vDSO, but maybe at some point
+ * in the future?
+ */
+ clock_gettime(CLOCK_THREAD_CPUTIME_ID, &r->cpu);
+#else /* !HAVE_CLOCK_THREAD_CPUTIME_ID */
+#if defined RUSAGE_THREAD
+#define FRR_RUSAGE RUSAGE_THREAD
+#else
+#define FRR_RUSAGE RUSAGE_SELF
+#endif
+ getrusage(FRR_RUSAGE, &(r->cpu));
+#endif
+}
+
+/*
+ * Call a thread.
+ *
+ * This function will atomically update the thread's usage history. At present
+ * this is the only spot where usage history is written. Nevertheless the code
+ * has been written such that the introduction of writers in the future should
+ * not need to update it provided the writers atomically perform only the
+ * operations done here, i.e. updating the total and maximum times. In
+ * particular, the maximum real and cpu times must be monotonically increasing
+ * or this code is not correct.
+ */
+void thread_call(struct thread *thread)
+{
+ RUSAGE_T before, after;
+
+ /* if the thread being called is the CLI, it may change cputime_enabled
+ * ("service cputime-stats" command), which can result in nonsensical
+ * and very confusing warnings
+ */
+ bool cputime_enabled_here = cputime_enabled;
+
+ if (thread->master->ready_run_loop)
+ before = thread->master->last_getrusage;
+ else
+ GETRUSAGE(&before);
+
+ thread->real = before.real;
+
+ frrtrace(9, frr_libfrr, thread_call, thread->master,
+ thread->xref->funcname, thread->xref->xref.file,
+ thread->xref->xref.line, NULL, thread->u.fd,
+ thread->u.val, thread->arg, thread->u.sands.tv_sec);
+
+ pthread_setspecific(thread_current, thread);
+ (*thread->func)(thread);
+ pthread_setspecific(thread_current, NULL);
+
+ GETRUSAGE(&after);
+ thread->master->last_getrusage = after;
+
+ unsigned long walltime, cputime;
+ unsigned long exp;
+
+ walltime = thread_consumed_time(&after, &before, &cputime);
+
+ /* update walltime */
+ atomic_fetch_add_explicit(&thread->hist->real.total, walltime,
+ memory_order_seq_cst);
+ exp = atomic_load_explicit(&thread->hist->real.max,
+ memory_order_seq_cst);
+ while (exp < walltime
+ && !atomic_compare_exchange_weak_explicit(
+ &thread->hist->real.max, &exp, walltime,
+ memory_order_seq_cst, memory_order_seq_cst))
+ ;
+
+ if (cputime_enabled_here && cputime_enabled) {
+ /* update cputime */
+ atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
+ memory_order_seq_cst);
+ exp = atomic_load_explicit(&thread->hist->cpu.max,
+ memory_order_seq_cst);
+ while (exp < cputime
+ && !atomic_compare_exchange_weak_explicit(
+ &thread->hist->cpu.max, &exp, cputime,
+ memory_order_seq_cst, memory_order_seq_cst))
+ ;
+ }
+
+ atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
+ memory_order_seq_cst);
+ atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
+ memory_order_seq_cst);
+
+ if (cputime_enabled_here && cputime_enabled && cputime_threshold
+ && cputime > cputime_threshold) {
+ /*
+ * We have a CPU Hog on our hands. The time FRR has spent
+ * doing actual work (not sleeping) is greater than 5 seconds.
+ * Whinge about it now, so we're aware this is yet another task
+ * to fix.
+ */
+ atomic_fetch_add_explicit(&thread->hist->total_cpu_warn,
+ 1, memory_order_seq_cst);
+ flog_warn(
+ EC_LIB_SLOW_THREAD_CPU,
+ "CPU HOG: task %s (%lx) ran for %lums (cpu time %lums)",
+ thread->xref->funcname, (unsigned long)thread->func,
+ walltime / 1000, cputime / 1000);
+
+ } else if (walltime_threshold && walltime > walltime_threshold) {
+ /*
+ * The runtime for a task is greater than 5 seconds, but the
+ * cpu time is under 5 seconds. Let's whine about this because
+ * this could imply some sort of scheduling issue.
+ */
+ atomic_fetch_add_explicit(&thread->hist->total_wall_warn,
+ 1, memory_order_seq_cst);
+ flog_warn(
+ EC_LIB_SLOW_THREAD_WALL,
+ "STARVATION: task %s (%lx) ran for %lums (cpu time %lums)",
+ thread->xref->funcname, (unsigned long)thread->func,
+ walltime / 1000, cputime / 1000);
+ }
+}
+
+/* Execute thread */
+void _thread_execute(const struct xref_threadsched *xref,
+ struct thread_master *m, void (*func)(struct thread *),
+ void *arg, int val)
+{
+ struct thread *thread;
+
+ /* Get or allocate new thread to execute. */
+ frr_with_mutex (&m->mtx) {
+ thread = thread_get(m, THREAD_EVENT, func, arg, xref);
+
+ /* Set its event value. */
+ frr_with_mutex (&thread->mtx) {
+ thread->add_type = THREAD_EXECUTE;
+ thread->u.val = val;
+ thread->ref = &thread;
+ }
+ }
+
+ /* Execute thread doing all accounting. */
+ thread_call(thread);
+
+ /* Give back or free thread. */
+ thread_add_unuse(m, thread);
+}
+
+/* Debug signal mask - if 'sigs' is NULL, use current effective mask. */
+void debug_signals(const sigset_t *sigs)
+{
+ int i, found;
+ sigset_t tmpsigs;
+ char buf[300];
+
+ /*
+ * We're only looking at the non-realtime signals here, so we need
+ * some limit value. Platform differences mean at some point we just
+ * need to pick a reasonable value.
+ */
+#if defined SIGRTMIN
+# define LAST_SIGNAL SIGRTMIN
+#else
+# define LAST_SIGNAL 32
+#endif
+
+
+ if (sigs == NULL) {
+ sigemptyset(&tmpsigs);
+ pthread_sigmask(SIG_BLOCK, NULL, &tmpsigs);
+ sigs = &tmpsigs;
+ }
+
+ found = 0;
+ buf[0] = '\0';
+
+ for (i = 0; i < LAST_SIGNAL; i++) {
+ char tmp[20];
+
+ if (sigismember(sigs, i) > 0) {
+ if (found > 0)
+ strlcat(buf, ",", sizeof(buf));
+ snprintf(tmp, sizeof(tmp), "%d", i);
+ strlcat(buf, tmp, sizeof(buf));
+ found++;
+ }
+ }
+
+ if (found == 0)
+ snprintf(buf, sizeof(buf), "<none>");
+
+ zlog_debug("%s: %s", __func__, buf);
+}
+
+static ssize_t printfrr_thread_dbg(struct fbuf *buf, struct printfrr_eargs *ea,
+ const struct thread *thread)
+{
+ static const char * const types[] = {
+ [THREAD_READ] = "read",
+ [THREAD_WRITE] = "write",
+ [THREAD_TIMER] = "timer",
+ [THREAD_EVENT] = "event",
+ [THREAD_READY] = "ready",
+ [THREAD_UNUSED] = "unused",
+ [THREAD_EXECUTE] = "exec",
+ };
+ ssize_t rv = 0;
+ char info[16] = "";
+
+ if (!thread)
+ return bputs(buf, "{(thread *)NULL}");
+
+ rv += bprintfrr(buf, "{(thread *)%p arg=%p", thread, thread->arg);
+
+ if (thread->type < array_size(types) && types[thread->type])
+ rv += bprintfrr(buf, " %-6s", types[thread->type]);
+ else
+ rv += bprintfrr(buf, " INVALID(%u)", thread->type);
+
+ switch (thread->type) {
+ case THREAD_READ:
+ case THREAD_WRITE:
+ snprintfrr(info, sizeof(info), "fd=%d", thread->u.fd);
+ break;
+
+ case THREAD_TIMER:
+ snprintfrr(info, sizeof(info), "r=%pTVMud", &thread->u.sands);
+ break;
+ }
+
+ rv += bprintfrr(buf, " %-12s %s() %s from %s:%d}", info,
+ thread->xref->funcname, thread->xref->dest,
+ thread->xref->xref.file, thread->xref->xref.line);
+ return rv;
+}
+
+printfrr_ext_autoreg_p("TH", printfrr_thread);
+static ssize_t printfrr_thread(struct fbuf *buf, struct printfrr_eargs *ea,
+ const void *ptr)
+{
+ const struct thread *thread = ptr;
+ struct timespec remain = {};
+
+ if (ea->fmt[0] == 'D') {
+ ea->fmt++;
+ return printfrr_thread_dbg(buf, ea, thread);
+ }
+
+ if (!thread) {
+ /* need to jump over time formatting flag characters in the
+ * input format string, i.e. adjust ea->fmt!
+ */
+ printfrr_time(buf, ea, &remain,
+ TIMEFMT_TIMER_DEADLINE | TIMEFMT_SKIP);
+ return bputch(buf, '-');
+ }
+
+ TIMEVAL_TO_TIMESPEC(&thread->u.sands, &remain);
+ return printfrr_time(buf, ea, &remain, TIMEFMT_TIMER_DEADLINE);
+}