summaryrefslogtreecommitdiffstats
path: root/lib/common/mainloop.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/common/mainloop.c')
-rw-r--r--lib/common/mainloop.c1480
1 files changed, 1480 insertions, 0 deletions
diff --git a/lib/common/mainloop.c b/lib/common/mainloop.c
new file mode 100644
index 0000000..3124e43
--- /dev/null
+++ b/lib/common/mainloop.c
@@ -0,0 +1,1480 @@
+/*
+ * Copyright 2004-2023 the Pacemaker project contributors
+ *
+ * The version control history for this file may have further details.
+ *
+ * This source code is licensed under the GNU Lesser General Public License
+ * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
+ */
+
+#include <crm_internal.h>
+
+#ifndef _GNU_SOURCE
+# define _GNU_SOURCE
+#endif
+
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <errno.h>
+
+#include <sys/wait.h>
+
+#include <crm/crm.h>
+#include <crm/common/xml.h>
+#include <crm/common/mainloop.h>
+#include <crm/common/ipc_internal.h>
+
+#include <qb/qbarray.h>
+
+struct mainloop_child_s {
+ pid_t pid;
+ char *desc;
+ unsigned timerid;
+ gboolean timeout;
+ void *privatedata;
+
+ enum mainloop_child_flags flags;
+
+ /* Called when a process dies */
+ void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode);
+};
+
+struct trigger_s {
+ GSource source;
+ gboolean running;
+ gboolean trigger;
+ void *user_data;
+ guint id;
+
+};
+
+struct mainloop_timer_s {
+ guint id;
+ guint period_ms;
+ bool repeat;
+ char *name;
+ GSourceFunc cb;
+ void *userdata;
+};
+
+static gboolean
+crm_trigger_prepare(GSource * source, gint * timeout)
+{
+ crm_trigger_t *trig = (crm_trigger_t *) source;
+
+ /* cluster-glue's FD and IPC related sources make use of
+ * g_source_add_poll() but do not set a timeout in their prepare
+ * functions
+ *
+ * This means mainloop's poll() will block until an event for one
+ * of these sources occurs - any /other/ type of source, such as
+ * this one or g_idle_*, that doesn't use g_source_add_poll() is
+ * S-O-L and won't be processed until there is something fd-based
+ * happens.
+ *
+ * Luckily the timeout we can set here affects all sources and
+ * puts an upper limit on how long poll() can take.
+ *
+ * So unconditionally set a small-ish timeout, not too small that
+ * we're in constant motion, which will act as an upper bound on
+ * how long the signal handling might be delayed for.
+ */
+ *timeout = 500; /* Timeout in ms */
+
+ return trig->trigger;
+}
+
+static gboolean
+crm_trigger_check(GSource * source)
+{
+ crm_trigger_t *trig = (crm_trigger_t *) source;
+
+ return trig->trigger;
+}
+
+/*!
+ * \internal
+ * \brief GSource dispatch function for crm_trigger_t
+ *
+ * \param[in] source crm_trigger_t being dispatched
+ * \param[in] callback Callback passed at source creation
+ * \param[in,out] userdata User data passed at source creation
+ *
+ * \return G_SOURCE_REMOVE to remove source, G_SOURCE_CONTINUE to keep it
+ */
+static gboolean
+crm_trigger_dispatch(GSource *source, GSourceFunc callback, gpointer userdata)
+{
+ gboolean rc = G_SOURCE_CONTINUE;
+ crm_trigger_t *trig = (crm_trigger_t *) source;
+
+ if (trig->running) {
+ /* Wait until the existing job is complete before starting the next one */
+ return G_SOURCE_CONTINUE;
+ }
+ trig->trigger = FALSE;
+
+ if (callback) {
+ int callback_rc = callback(trig->user_data);
+
+ if (callback_rc < 0) {
+ crm_trace("Trigger handler %p not yet complete", trig);
+ trig->running = TRUE;
+ } else if (callback_rc == 0) {
+ rc = G_SOURCE_REMOVE;
+ }
+ }
+ return rc;
+}
+
+static void
+crm_trigger_finalize(GSource * source)
+{
+ crm_trace("Trigger %p destroyed", source);
+}
+
+static GSourceFuncs crm_trigger_funcs = {
+ crm_trigger_prepare,
+ crm_trigger_check,
+ crm_trigger_dispatch,
+ crm_trigger_finalize,
+};
+
+static crm_trigger_t *
+mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data),
+ gpointer userdata)
+{
+ crm_trigger_t *trigger = NULL;
+
+ trigger = (crm_trigger_t *) source;
+
+ trigger->id = 0;
+ trigger->trigger = FALSE;
+ trigger->user_data = userdata;
+
+ if (dispatch) {
+ g_source_set_callback(source, dispatch, trigger, NULL);
+ }
+
+ g_source_set_priority(source, priority);
+ g_source_set_can_recurse(source, FALSE);
+
+ trigger->id = g_source_attach(source, NULL);
+ return trigger;
+}
+
+void
+mainloop_trigger_complete(crm_trigger_t * trig)
+{
+ crm_trace("Trigger handler %p complete", trig);
+ trig->running = FALSE;
+}
+
+/*!
+ * \brief Create a trigger to be used as a mainloop source
+ *
+ * \param[in] priority Relative priority of source (lower number is higher priority)
+ * \param[in] dispatch Trigger dispatch function (should return 0 to remove the
+ * trigger from the mainloop, -1 if the trigger should be
+ * kept but the job is still running and not complete, and
+ * 1 if the trigger should be kept and the job is complete)
+ * \param[in] userdata Pointer to pass to \p dispatch
+ *
+ * \return Newly allocated mainloop source for trigger
+ */
+crm_trigger_t *
+mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data),
+ gpointer userdata)
+{
+ GSource *source = NULL;
+
+ CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource));
+ source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
+ CRM_ASSERT(source != NULL);
+
+ return mainloop_setup_trigger(source, priority, dispatch, userdata);
+}
+
+void
+mainloop_set_trigger(crm_trigger_t * source)
+{
+ if(source) {
+ source->trigger = TRUE;
+ }
+}
+
+gboolean
+mainloop_destroy_trigger(crm_trigger_t * source)
+{
+ GSource *gs = NULL;
+
+ if(source == NULL) {
+ return TRUE;
+ }
+
+ gs = (GSource *)source;
+
+ g_source_destroy(gs); /* Remove from mainloop, ref_count-- */
+ g_source_unref(gs); /* The caller no longer carries a reference to source
+ *
+ * At this point the source should be free'd,
+ * unless we're currently processing said
+ * source, in which case mainloop holds an
+ * additional reference and it will be free'd
+ * once our processing completes
+ */
+ return TRUE;
+}
+
+// Define a custom glib source for signal handling
+
+// Data structure for custom glib source
+typedef struct signal_s {
+ crm_trigger_t trigger; // trigger that invoked source (must be first)
+ void (*handler) (int sig); // signal handler
+ int signal; // signal that was received
+} crm_signal_t;
+
+// Table to associate signal handlers with signal numbers
+static crm_signal_t *crm_signals[NSIG];
+
+/*!
+ * \internal
+ * \brief Dispatch an event from custom glib source for signals
+ *
+ * Given an signal event, clear the event trigger and call any registered
+ * signal handler.
+ *
+ * \param[in] source glib source that triggered this dispatch
+ * \param[in] callback (ignored)
+ * \param[in] userdata (ignored)
+ */
+static gboolean
+crm_signal_dispatch(GSource *source, GSourceFunc callback, gpointer userdata)
+{
+ crm_signal_t *sig = (crm_signal_t *) source;
+
+ if(sig->signal != SIGCHLD) {
+ crm_notice("Caught '%s' signal "CRM_XS" %d (%s handler)",
+ strsignal(sig->signal), sig->signal,
+ (sig->handler? "invoking" : "no"));
+ }
+
+ sig->trigger.trigger = FALSE;
+ if (sig->handler) {
+ sig->handler(sig->signal);
+ }
+ return TRUE;
+}
+
+/*!
+ * \internal
+ * \brief Handle a signal by setting a trigger for signal source
+ *
+ * \param[in] sig Signal number that was received
+ *
+ * \note This is the true signal handler for the mainloop signal source, and
+ * must be async-safe.
+ */
+static void
+mainloop_signal_handler(int sig)
+{
+ if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
+ mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
+ }
+}
+
+// Functions implementing our custom glib source for signal handling
+static GSourceFuncs crm_signal_funcs = {
+ crm_trigger_prepare,
+ crm_trigger_check,
+ crm_signal_dispatch,
+ crm_trigger_finalize,
+};
+
+/*!
+ * \internal
+ * \brief Set a true signal handler
+ *
+ * signal()-like interface to sigaction()
+ *
+ * \param[in] sig Signal number to register handler for
+ * \param[in] dispatch Signal handler
+ *
+ * \return The previous value of the signal handler, or SIG_ERR on error
+ * \note The dispatch function must be async-safe.
+ */
+sighandler_t
+crm_signal_handler(int sig, sighandler_t dispatch)
+{
+ sigset_t mask;
+ struct sigaction sa;
+ struct sigaction old;
+
+ if (sigemptyset(&mask) < 0) {
+ crm_err("Could not set handler for signal %d: %s",
+ sig, pcmk_rc_str(errno));
+ return SIG_ERR;
+ }
+
+ memset(&sa, 0, sizeof(struct sigaction));
+ sa.sa_handler = dispatch;
+ sa.sa_flags = SA_RESTART;
+ sa.sa_mask = mask;
+
+ if (sigaction(sig, &sa, &old) < 0) {
+ crm_err("Could not set handler for signal %d: %s",
+ sig, pcmk_rc_str(errno));
+ return SIG_ERR;
+ }
+ return old.sa_handler;
+}
+
+static void
+mainloop_destroy_signal_entry(int sig)
+{
+ crm_signal_t *tmp = crm_signals[sig];
+
+ crm_signals[sig] = NULL;
+
+ crm_trace("Destroying signal %d", sig);
+ mainloop_destroy_trigger((crm_trigger_t *) tmp);
+}
+
+/*!
+ * \internal
+ * \brief Add a signal handler to a mainloop
+ *
+ * \param[in] sig Signal number to handle
+ * \param[in] dispatch Signal handler function
+ *
+ * \note The true signal handler merely sets a mainloop trigger to call this
+ * dispatch function via the mainloop. Therefore, the dispatch function
+ * does not need to be async-safe.
+ */
+gboolean
+mainloop_add_signal(int sig, void (*dispatch) (int sig))
+{
+ GSource *source = NULL;
+ int priority = G_PRIORITY_HIGH - 1;
+
+ if (sig == SIGTERM) {
+ /* TERM is higher priority than other signals,
+ * signals are higher priority than other ipc.
+ * Yes, minus: smaller is "higher"
+ */
+ priority--;
+ }
+
+ if (sig >= NSIG || sig < 0) {
+ crm_err("Signal %d is out of range", sig);
+ return FALSE;
+
+ } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
+ crm_trace("Signal handler for %d is already installed", sig);
+ return TRUE;
+
+ } else if (crm_signals[sig] != NULL) {
+ crm_err("Different signal handler for %d is already installed", sig);
+ return FALSE;
+ }
+
+ CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource));
+ source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));
+
+ crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
+ CRM_ASSERT(crm_signals[sig] != NULL);
+
+ crm_signals[sig]->handler = dispatch;
+ crm_signals[sig]->signal = sig;
+
+ if (crm_signal_handler(sig, mainloop_signal_handler) == SIG_ERR) {
+ mainloop_destroy_signal_entry(sig);
+ return FALSE;
+ }
+#if 0
+ /* If we want signals to interrupt mainloop's poll(), instead of waiting for
+ * the timeout, then we should call siginterrupt() below
+ *
+ * For now, just enforce a low timeout
+ */
+ if (siginterrupt(sig, 1) < 0) {
+ crm_perror(LOG_INFO, "Could not enable system call interruptions for signal %d", sig);
+ }
+#endif
+
+ return TRUE;
+}
+
+gboolean
+mainloop_destroy_signal(int sig)
+{
+ if (sig >= NSIG || sig < 0) {
+ crm_err("Signal %d is out of range", sig);
+ return FALSE;
+
+ } else if (crm_signal_handler(sig, NULL) == SIG_ERR) {
+ crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig);
+ return FALSE;
+
+ } else if (crm_signals[sig] == NULL) {
+ return TRUE;
+ }
+ mainloop_destroy_signal_entry(sig);
+ return TRUE;
+}
+
+static qb_array_t *gio_map = NULL;
+
+void
+mainloop_cleanup(void)
+{
+ if (gio_map) {
+ qb_array_free(gio_map);
+ }
+
+ for (int sig = 0; sig < NSIG; ++sig) {
+ mainloop_destroy_signal_entry(sig);
+ }
+}
+
+/*
+ * libqb...
+ */
+struct gio_to_qb_poll {
+ int32_t is_used;
+ guint source;
+ int32_t events;
+ void *data;
+ qb_ipcs_dispatch_fn_t fn;
+ enum qb_loop_priority p;
+};
+
+static gboolean
+gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
+{
+ struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
+ gint fd = g_io_channel_unix_get_fd(gio);
+
+ crm_trace("%p.%d %d", data, fd, condition);
+
+ /* if this assert get's hit, then there is a race condition between
+ * when we destroy a fd and when mainloop actually gives it up */
+ CRM_ASSERT(adaptor->is_used > 0);
+
+ return (adaptor->fn(fd, condition, adaptor->data) == 0);
+}
+
+static void
+gio_poll_destroy(gpointer data)
+{
+ struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
+
+ adaptor->is_used--;
+ CRM_ASSERT(adaptor->is_used >= 0);
+
+ if (adaptor->is_used == 0) {
+ crm_trace("Marking adaptor %p unused", adaptor);
+ adaptor->source = 0;
+ }
+}
+
+/*!
+ * \internal
+ * \brief Convert libqb's poll priority into GLib's one
+ *
+ * \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback)
+ *
+ * \return best matching GLib's priority
+ */
+static gint
+conv_prio_libqb2glib(enum qb_loop_priority prio)
+{
+ switch (prio) {
+ case QB_LOOP_LOW: return G_PRIORITY_LOW;
+ case QB_LOOP_HIGH: return G_PRIORITY_HIGH;
+ default: return G_PRIORITY_DEFAULT; // QB_LOOP_MED
+ }
+}
+
+/*!
+ * \internal
+ * \brief Convert libqb's poll priority to rate limiting spec
+ *
+ * \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback)
+ *
+ * \return best matching rate limiting spec
+ * \note This is the inverse of libqb's qb_ipcs_request_rate_limit().
+ */
+static enum qb_ipcs_rate_limit
+conv_libqb_prio2ratelimit(enum qb_loop_priority prio)
+{
+ switch (prio) {
+ case QB_LOOP_LOW: return QB_IPCS_RATE_SLOW;
+ case QB_LOOP_HIGH: return QB_IPCS_RATE_FAST;
+ default: return QB_IPCS_RATE_NORMAL; // QB_LOOP_MED
+ }
+}
+
+static int32_t
+gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
+ void *data, qb_ipcs_dispatch_fn_t fn, int32_t add)
+{
+ struct gio_to_qb_poll *adaptor;
+ GIOChannel *channel;
+ int32_t res = 0;
+
+ res = qb_array_index(gio_map, fd, (void **)&adaptor);
+ if (res < 0) {
+ crm_err("Array lookup failed for fd=%d: %d", fd, res);
+ return res;
+ }
+
+ crm_trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor);
+
+ if (add && adaptor->source) {
+ crm_err("Adaptor for descriptor %d is still in-use", fd);
+ return -EEXIST;
+ }
+ if (!add && !adaptor->is_used) {
+ crm_err("Adaptor for descriptor %d is not in-use", fd);
+ return -ENOENT;
+ }
+
+ /* channel is created with ref_count = 1 */
+ channel = g_io_channel_unix_new(fd);
+ if (!channel) {
+ crm_err("No memory left to add fd=%d", fd);
+ return -ENOMEM;
+ }
+
+ if (adaptor->source) {
+ g_source_remove(adaptor->source);
+ adaptor->source = 0;
+ }
+
+ /* Because unlike the poll() API, glib doesn't tell us about HUPs by default */
+ evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);
+
+ adaptor->fn = fn;
+ adaptor->events = evts;
+ adaptor->data = data;
+ adaptor->p = p;
+ adaptor->is_used++;
+ adaptor->source =
+ g_io_add_watch_full(channel, conv_prio_libqb2glib(p), evts,
+ gio_read_socket, adaptor, gio_poll_destroy);
+
+ /* Now that mainloop now holds a reference to channel,
+ * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
+ *
+ * This means that channel will be free'd by:
+ * g_main_context_dispatch()
+ * -> g_source_destroy_internal()
+ * -> g_source_callback_unref()
+ * shortly after gio_poll_destroy() completes
+ */
+ g_io_channel_unref(channel);
+
+ crm_trace("Added to mainloop with gsource id=%d", adaptor->source);
+ if (adaptor->source > 0) {
+ return 0;
+ }
+
+ return -EINVAL;
+}
+
+static int32_t
+gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
+ void *data, qb_ipcs_dispatch_fn_t fn)
+{
+ return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE);
+}
+
+static int32_t
+gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
+ void *data, qb_ipcs_dispatch_fn_t fn)
+{
+ return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE);
+}
+
+static int32_t
+gio_poll_dispatch_del(int32_t fd)
+{
+ struct gio_to_qb_poll *adaptor;
+
+ crm_trace("Looking for fd=%d", fd);
+ if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
+ if (adaptor->source) {
+ g_source_remove(adaptor->source);
+ adaptor->source = 0;
+ }
+ }
+ return 0;
+}
+
+struct qb_ipcs_poll_handlers gio_poll_funcs = {
+ .job_add = NULL,
+ .dispatch_add = gio_poll_dispatch_add,
+ .dispatch_mod = gio_poll_dispatch_mod,
+ .dispatch_del = gio_poll_dispatch_del,
+};
+
+static enum qb_ipc_type
+pick_ipc_type(enum qb_ipc_type requested)
+{
+ const char *env = getenv("PCMK_ipc_type");
+
+ if (env && strcmp("shared-mem", env) == 0) {
+ return QB_IPC_SHM;
+ } else if (env && strcmp("socket", env) == 0) {
+ return QB_IPC_SOCKET;
+ } else if (env && strcmp("posix", env) == 0) {
+ return QB_IPC_POSIX_MQ;
+ } else if (env && strcmp("sysv", env) == 0) {
+ return QB_IPC_SYSV_MQ;
+ } else if (requested == QB_IPC_NATIVE) {
+ /* We prefer shared memory because the server never blocks on
+ * send. If part of a message fits into the socket, libqb
+ * needs to block until the remainder can be sent also.
+ * Otherwise the client will wait forever for the remaining
+ * bytes.
+ */
+ return QB_IPC_SHM;
+ }
+ return requested;
+}
+
+qb_ipcs_service_t *
+mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
+ struct qb_ipcs_service_handlers *callbacks)
+{
+ return mainloop_add_ipc_server_with_prio(name, type, callbacks, QB_LOOP_MED);
+}
+
+qb_ipcs_service_t *
+mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type,
+ struct qb_ipcs_service_handlers *callbacks,
+ enum qb_loop_priority prio)
+{
+ int rc = 0;
+ qb_ipcs_service_t *server = NULL;
+
+ if (gio_map == NULL) {
+ gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
+ }
+
+ server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
+
+ if (server == NULL) {
+ crm_err("Could not create %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
+ return NULL;
+ }
+
+ if (prio != QB_LOOP_MED) {
+ qb_ipcs_request_rate_limit(server, conv_libqb_prio2ratelimit(prio));
+ }
+
+ /* All clients should use at least ipc_buffer_max as their buffer size */
+ qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());
+ qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
+
+ rc = qb_ipcs_run(server);
+ if (rc < 0) {
+ crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
+ return NULL; // qb_ipcs_run() destroys server on failure
+ }
+
+ return server;
+}
+
+void
+mainloop_del_ipc_server(qb_ipcs_service_t * server)
+{
+ if (server) {
+ qb_ipcs_destroy(server);
+ }
+}
+
+struct mainloop_io_s {
+ char *name;
+ void *userdata;
+
+ int fd;
+ guint source;
+ crm_ipc_t *ipc;
+ GIOChannel *channel;
+
+ int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata);
+ int (*dispatch_fn_io) (gpointer userdata);
+ void (*destroy_fn) (gpointer userdata);
+
+};
+
+/*!
+ * \internal
+ * \brief I/O watch callback function (GIOFunc)
+ *
+ * \param[in] gio I/O channel being watched
+ * \param[in] condition I/O condition satisfied
+ * \param[in] data User data passed when source was created
+ *
+ * \return G_SOURCE_REMOVE to remove source, G_SOURCE_CONTINUE to keep it
+ */
+static gboolean
+mainloop_gio_callback(GIOChannel *gio, GIOCondition condition, gpointer data)
+{
+ gboolean rc = G_SOURCE_CONTINUE;
+ mainloop_io_t *client = data;
+
+ CRM_ASSERT(client->fd == g_io_channel_unix_get_fd(gio));
+
+ if (condition & G_IO_IN) {
+ if (client->ipc) {
+ long read_rc = 0L;
+ int max = 10;
+
+ do {
+ read_rc = crm_ipc_read(client->ipc);
+ if (read_rc <= 0) {
+ crm_trace("Could not read IPC message from %s: %s (%ld)",
+ client->name, pcmk_strerror(read_rc), read_rc);
+
+ } else if (client->dispatch_fn_ipc) {
+ const char *buffer = crm_ipc_buffer(client->ipc);
+
+ crm_trace("New %ld-byte IPC message from %s "
+ "after I/O condition %d",
+ read_rc, client->name, (int) condition);
+ if (client->dispatch_fn_ipc(buffer, read_rc, client->userdata) < 0) {
+ crm_trace("Connection to %s no longer required", client->name);
+ rc = G_SOURCE_REMOVE;
+ }
+ }
+
+ } while ((rc == G_SOURCE_CONTINUE) && (read_rc > 0) && --max > 0);
+
+ } else {
+ crm_trace("New I/O event for %s after I/O condition %d",
+ client->name, (int) condition);
+ if (client->dispatch_fn_io) {
+ if (client->dispatch_fn_io(client->userdata) < 0) {
+ crm_trace("Connection to %s no longer required", client->name);
+ rc = G_SOURCE_REMOVE;
+ }
+ }
+ }
+ }
+
+ if (client->ipc && !crm_ipc_connected(client->ipc)) {
+ crm_err("Connection to %s closed " CRM_XS "client=%p condition=%d",
+ client->name, client, condition);
+ rc = G_SOURCE_REMOVE;
+
+ } else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
+ crm_trace("The connection %s[%p] has been closed (I/O condition=%d)",
+ client->name, client, condition);
+ rc = G_SOURCE_REMOVE;
+
+ } else if ((condition & G_IO_IN) == 0) {
+ /*
+ #define GLIB_SYSDEF_POLLIN =1
+ #define GLIB_SYSDEF_POLLPRI =2
+ #define GLIB_SYSDEF_POLLOUT =4
+ #define GLIB_SYSDEF_POLLERR =8
+ #define GLIB_SYSDEF_POLLHUP =16
+ #define GLIB_SYSDEF_POLLNVAL =32
+
+ typedef enum
+ {
+ G_IO_IN GLIB_SYSDEF_POLLIN,
+ G_IO_OUT GLIB_SYSDEF_POLLOUT,
+ G_IO_PRI GLIB_SYSDEF_POLLPRI,
+ G_IO_ERR GLIB_SYSDEF_POLLERR,
+ G_IO_HUP GLIB_SYSDEF_POLLHUP,
+ G_IO_NVAL GLIB_SYSDEF_POLLNVAL
+ } GIOCondition;
+
+ A bitwise combination representing a condition to watch for on an event source.
+
+ G_IO_IN There is data to read.
+ G_IO_OUT Data can be written (without blocking).
+ G_IO_PRI There is urgent data to read.
+ G_IO_ERR Error condition.
+ G_IO_HUP Hung up (the connection has been broken, usually for pipes and sockets).
+ G_IO_NVAL Invalid request. The file descriptor is not open.
+ */
+ crm_err("Strange condition: %d", condition);
+ }
+
+ /* G_SOURCE_REMOVE results in mainloop_gio_destroy() being called
+ * just before the source is removed from mainloop
+ */
+ return rc;
+}
+
+static void
+mainloop_gio_destroy(gpointer c)
+{
+ mainloop_io_t *client = c;
+ char *c_name = strdup(client->name);
+
+ /* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c
+ * client->channel will still have ref_count > 0... should be == 1
+ */
+ crm_trace("Destroying client %s[%p]", c_name, c);
+
+ if (client->ipc) {
+ crm_ipc_close(client->ipc);
+ }
+
+ if (client->destroy_fn) {
+ void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
+
+ client->destroy_fn = NULL;
+ destroy_fn(client->userdata);
+ }
+
+ if (client->ipc) {
+ crm_ipc_t *ipc = client->ipc;
+
+ client->ipc = NULL;
+ crm_ipc_destroy(ipc);
+ }
+
+ crm_trace("Destroyed client %s[%p]", c_name, c);
+
+ free(client->name); client->name = NULL;
+ free(client);
+
+ free(c_name);
+}
+
+/*!
+ * \brief Connect to IPC and add it as a main loop source
+ *
+ * \param[in,out] ipc IPC connection to add
+ * \param[in] priority Event source priority to use for connection
+ * \param[in] userdata Data to register with callbacks
+ * \param[in] callbacks Dispatch and destroy callbacks for connection
+ * \param[out] source Newly allocated event source
+ *
+ * \return Standard Pacemaker return code
+ *
+ * \note On failure, the caller is still responsible for ipc. On success, the
+ * caller should call mainloop_del_ipc_client() when source is no longer
+ * needed, which will lead to the disconnection of the IPC later in the
+ * main loop if it is connected. However the IPC disconnects,
+ * mainloop_gio_destroy() will free ipc and source after calling the
+ * destroy callback.
+ */
+int
+pcmk__add_mainloop_ipc(crm_ipc_t *ipc, int priority, void *userdata,
+ const struct ipc_client_callbacks *callbacks,
+ mainloop_io_t **source)
+{
+ CRM_CHECK((ipc != NULL) && (callbacks != NULL), return EINVAL);
+
+ if (!crm_ipc_connect(ipc)) {
+ int rc = errno;
+ crm_debug("Connection to %s failed: %d", crm_ipc_name(ipc), errno);
+ return rc;
+ }
+ *source = mainloop_add_fd(crm_ipc_name(ipc), priority, crm_ipc_get_fd(ipc),
+ userdata, NULL);
+ if (*source == NULL) {
+ int rc = errno;
+
+ crm_ipc_close(ipc);
+ return rc;
+ }
+ (*source)->ipc = ipc;
+ (*source)->destroy_fn = callbacks->destroy;
+ (*source)->dispatch_fn_ipc = callbacks->dispatch;
+ return pcmk_rc_ok;
+}
+
+/*!
+ * \brief Get period for mainloop timer
+ *
+ * \param[in] timer Timer
+ *
+ * \return Period in ms
+ */
+guint
+pcmk__mainloop_timer_get_period(const mainloop_timer_t *timer)
+{
+ if (timer) {
+ return timer->period_ms;
+ }
+ return 0;
+}
+
+mainloop_io_t *
+mainloop_add_ipc_client(const char *name, int priority, size_t max_size,
+ void *userdata, struct ipc_client_callbacks *callbacks)
+{
+ crm_ipc_t *ipc = crm_ipc_new(name, max_size);
+ mainloop_io_t *source = NULL;
+ int rc = pcmk__add_mainloop_ipc(ipc, priority, userdata, callbacks,
+ &source);
+
+ if (rc != pcmk_rc_ok) {
+ if (crm_log_level == LOG_STDOUT) {
+ fprintf(stderr, "Connection to %s failed: %s",
+ name, pcmk_rc_str(rc));
+ }
+ crm_ipc_destroy(ipc);
+ if (rc > 0) {
+ errno = rc;
+ } else {
+ errno = ENOTCONN;
+ }
+ return NULL;
+ }
+ return source;
+}
+
+void
+mainloop_del_ipc_client(mainloop_io_t * client)
+{
+ mainloop_del_fd(client);
+}
+
+crm_ipc_t *
+mainloop_get_ipc_client(mainloop_io_t * client)
+{
+ if (client) {
+ return client->ipc;
+ }
+ return NULL;
+}
+
+mainloop_io_t *
+mainloop_add_fd(const char *name, int priority, int fd, void *userdata,
+ struct mainloop_fd_callbacks * callbacks)
+{
+ mainloop_io_t *client = NULL;
+
+ if (fd >= 0) {
+ client = calloc(1, sizeof(mainloop_io_t));
+ if (client == NULL) {
+ return NULL;
+ }
+ client->name = strdup(name);
+ client->userdata = userdata;
+
+ if (callbacks) {
+ client->destroy_fn = callbacks->destroy;
+ client->dispatch_fn_io = callbacks->dispatch;
+ }
+
+ client->fd = fd;
+ client->channel = g_io_channel_unix_new(fd);
+ client->source =
+ g_io_add_watch_full(client->channel, priority,
+ (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
+ client, mainloop_gio_destroy);
+
+ /* Now that mainloop now holds a reference to channel,
+ * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
+ *
+ * This means that channel will be free'd by:
+ * g_main_context_dispatch() or g_source_remove()
+ * -> g_source_destroy_internal()
+ * -> g_source_callback_unref()
+ * shortly after mainloop_gio_destroy() completes
+ */
+ g_io_channel_unref(client->channel);
+ crm_trace("Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
+ } else {
+ errno = EINVAL;
+ }
+
+ return client;
+}
+
+void
+mainloop_del_fd(mainloop_io_t * client)
+{
+ if (client != NULL) {
+ crm_trace("Removing client %s[%p]", client->name, client);
+ if (client->source) {
+ /* Results in mainloop_gio_destroy() being called just
+ * before the source is removed from mainloop
+ */
+ g_source_remove(client->source);
+ }
+ }
+}
+
+static GList *child_list = NULL;
+
+pid_t
+mainloop_child_pid(mainloop_child_t * child)
+{
+ return child->pid;
+}
+
+const char *
+mainloop_child_name(mainloop_child_t * child)
+{
+ return child->desc;
+}
+
+int
+mainloop_child_timeout(mainloop_child_t * child)
+{
+ return child->timeout;
+}
+
+void *
+mainloop_child_userdata(mainloop_child_t * child)
+{
+ return child->privatedata;
+}
+
+void
+mainloop_clear_child_userdata(mainloop_child_t * child)
+{
+ child->privatedata = NULL;
+}
+
+/* good function name */
+static void
+child_free(mainloop_child_t *child)
+{
+ if (child->timerid != 0) {
+ crm_trace("Removing timer %d", child->timerid);
+ g_source_remove(child->timerid);
+ child->timerid = 0;
+ }
+ free(child->desc);
+ free(child);
+}
+
+/* terrible function name */
+static int
+child_kill_helper(mainloop_child_t *child)
+{
+ int rc;
+ if (child->flags & mainloop_leave_pid_group) {
+ crm_debug("Kill pid %d only. leave group intact.", child->pid);
+ rc = kill(child->pid, SIGKILL);
+ } else {
+ crm_debug("Kill pid %d's group", child->pid);
+ rc = kill(-child->pid, SIGKILL);
+ }
+
+ if (rc < 0) {
+ if (errno != ESRCH) {
+ crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid);
+ }
+ return -errno;
+ }
+ return 0;
+}
+
+static gboolean
+child_timeout_callback(gpointer p)
+{
+ mainloop_child_t *child = p;
+ int rc = 0;
+
+ child->timerid = 0;
+ if (child->timeout) {
+ crm_warn("%s process (PID %d) will not die!", child->desc, (int)child->pid);
+ return FALSE;
+ }
+
+ rc = child_kill_helper(child);
+ if (rc == -ESRCH) {
+ /* Nothing left to do. pid doesn't exist */
+ return FALSE;
+ }
+
+ child->timeout = TRUE;
+ crm_debug("%s process (PID %d) timed out", child->desc, (int)child->pid);
+
+ child->timerid = g_timeout_add(5000, child_timeout_callback, child);
+ return FALSE;
+}
+
+static bool
+child_waitpid(mainloop_child_t *child, int flags)
+{
+ int rc = 0;
+ int core = 0;
+ int signo = 0;
+ int status = 0;
+ int exitcode = 0;
+ bool callback_needed = true;
+
+ rc = waitpid(child->pid, &status, flags);
+ if (rc == 0) { // WNOHANG in flags, and child status is not available
+ crm_trace("Child process %d (%s) still active",
+ child->pid, child->desc);
+ callback_needed = false;
+
+ } else if (rc != child->pid) {
+ /* According to POSIX, possible conditions:
+ * - child->pid was non-positive (process group or any child),
+ * and rc is specific child
+ * - errno ECHILD (pid does not exist or is not child)
+ * - errno EINVAL (invalid flags)
+ * - errno EINTR (caller interrupted by signal)
+ *
+ * @TODO Handle these cases more specifically.
+ */
+ signo = SIGCHLD;
+ exitcode = 1;
+ crm_notice("Wait for child process %d (%s) interrupted: %s",
+ child->pid, child->desc, pcmk_rc_str(errno));
+
+ } else if (WIFEXITED(status)) {
+ exitcode = WEXITSTATUS(status);
+ crm_trace("Child process %d (%s) exited with status %d",
+ child->pid, child->desc, exitcode);
+
+ } else if (WIFSIGNALED(status)) {
+ signo = WTERMSIG(status);
+ crm_trace("Child process %d (%s) exited with signal %d (%s)",
+ child->pid, child->desc, signo, strsignal(signo));
+
+#ifdef WCOREDUMP // AIX, SunOS, maybe others
+ } else if (WCOREDUMP(status)) {
+ core = 1;
+ crm_err("Child process %d (%s) dumped core",
+ child->pid, child->desc);
+#endif
+
+ } else { // flags must contain WUNTRACED and/or WCONTINUED to reach this
+ crm_trace("Child process %d (%s) stopped or continued",
+ child->pid, child->desc);
+ callback_needed = false;
+ }
+
+ if (callback_needed && child->callback) {
+ child->callback(child, child->pid, core, signo, exitcode);
+ }
+ return callback_needed;
+}
+
+static void
+child_death_dispatch(int signal)
+{
+ for (GList *iter = child_list; iter; ) {
+ GList *saved = iter;
+ mainloop_child_t *child = iter->data;
+
+ iter = iter->next;
+ if (child_waitpid(child, WNOHANG)) {
+ crm_trace("Removing completed process %d from child list",
+ child->pid);
+ child_list = g_list_remove_link(child_list, saved);
+ g_list_free(saved);
+ child_free(child);
+ }
+ }
+}
+
+static gboolean
+child_signal_init(gpointer p)
+{
+ crm_trace("Installed SIGCHLD handler");
+ /* Do NOT use g_child_watch_add() and friends, they rely on pthreads */
+ mainloop_add_signal(SIGCHLD, child_death_dispatch);
+
+ /* In case they terminated before the signal handler was installed */
+ child_death_dispatch(SIGCHLD);
+ return FALSE;
+}
+
+gboolean
+mainloop_child_kill(pid_t pid)
+{
+ GList *iter;
+ mainloop_child_t *child = NULL;
+ mainloop_child_t *match = NULL;
+ /* It is impossible to block SIGKILL, this allows us to
+ * call waitpid without WNOHANG flag.*/
+ int waitflags = 0, rc = 0;
+
+ for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
+ child = iter->data;
+ if (pid == child->pid) {
+ match = child;
+ }
+ }
+
+ if (match == NULL) {
+ return FALSE;
+ }
+
+ rc = child_kill_helper(match);
+ if(rc == -ESRCH) {
+ /* It's gone, but hasn't shown up in waitpid() yet. Wait until we get
+ * SIGCHLD and let handler clean it up as normal (so we get the correct
+ * return code/status). The blocking alternative would be to call
+ * child_waitpid(match, 0).
+ */
+ crm_trace("Waiting for signal that child process %d completed",
+ match->pid);
+ return TRUE;
+
+ } else if(rc != 0) {
+ /* If KILL for some other reason set the WNOHANG flag since we
+ * can't be certain what happened.
+ */
+ waitflags = WNOHANG;
+ }
+
+ if (!child_waitpid(match, waitflags)) {
+ /* not much we can do if this occurs */
+ return FALSE;
+ }
+
+ child_list = g_list_remove(child_list, match);
+ child_free(match);
+ return TRUE;
+}
+
+/* Create/Log a new tracked process
+ * To track a process group, use -pid
+ *
+ * @TODO Using a non-positive pid (i.e. any child, or process group) would
+ * likely not be useful since we will free the child after the first
+ * completed process.
+ */
+void
+mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags,
+ void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
+{
+ static bool need_init = TRUE;
+ mainloop_child_t *child = calloc(1, sizeof(mainloop_child_t));
+
+ child->pid = pid;
+ child->timerid = 0;
+ child->timeout = FALSE;
+ child->privatedata = privatedata;
+ child->callback = callback;
+ child->flags = flags;
+ pcmk__str_update(&child->desc, desc);
+
+ if (timeout) {
+ child->timerid = g_timeout_add(timeout, child_timeout_callback, child);
+ }
+
+ child_list = g_list_append(child_list, child);
+
+ if(need_init) {
+ need_init = FALSE;
+ /* SIGCHLD processing has to be invoked from mainloop.
+ * We do not want it to be possible to both add a child pid
+ * to mainloop, and have the pid's exit callback invoked within
+ * the same callstack. */
+ g_timeout_add(1, child_signal_init, NULL);
+ }
+}
+
+void
+mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata,
+ void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
+{
+ mainloop_child_add_with_flags(pid, timeout, desc, privatedata, 0, callback);
+}
+
+static gboolean
+mainloop_timer_cb(gpointer user_data)
+{
+ int id = 0;
+ bool repeat = FALSE;
+ struct mainloop_timer_s *t = user_data;
+
+ CRM_ASSERT(t != NULL);
+
+ id = t->id;
+ t->id = 0; /* Ensure it's unset during callbacks so that
+ * mainloop_timer_running() works as expected
+ */
+
+ if(t->cb) {
+ crm_trace("Invoking callbacks for timer %s", t->name);
+ repeat = t->repeat;
+ if(t->cb(t->userdata) == FALSE) {
+ crm_trace("Timer %s complete", t->name);
+ repeat = FALSE;
+ }
+ }
+
+ if(repeat) {
+ /* Restore if repeating */
+ t->id = id;
+ }
+
+ return repeat;
+}
+
+bool
+mainloop_timer_running(mainloop_timer_t *t)
+{
+ if(t && t->id != 0) {
+ return TRUE;
+ }
+ return FALSE;
+}
+
+void
+mainloop_timer_start(mainloop_timer_t *t)
+{
+ mainloop_timer_stop(t);
+ if(t && t->period_ms > 0) {
+ crm_trace("Starting timer %s", t->name);
+ t->id = g_timeout_add(t->period_ms, mainloop_timer_cb, t);
+ }
+}
+
+void
+mainloop_timer_stop(mainloop_timer_t *t)
+{
+ if(t && t->id != 0) {
+ crm_trace("Stopping timer %s", t->name);
+ g_source_remove(t->id);
+ t->id = 0;
+ }
+}
+
+guint
+mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
+{
+ guint last = 0;
+
+ if(t) {
+ last = t->period_ms;
+ t->period_ms = period_ms;
+ }
+
+ if(t && t->id != 0 && last != t->period_ms) {
+ mainloop_timer_start(t);
+ }
+ return last;
+}
+
+mainloop_timer_t *
+mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
+{
+ mainloop_timer_t *t = calloc(1, sizeof(mainloop_timer_t));
+
+ if(t) {
+ if(name) {
+ t->name = crm_strdup_printf("%s-%u-%d", name, period_ms, repeat);
+ } else {
+ t->name = crm_strdup_printf("%p-%u-%d", t, period_ms, repeat);
+ }
+ t->id = 0;
+ t->period_ms = period_ms;
+ t->repeat = repeat;
+ t->cb = cb;
+ t->userdata = userdata;
+ crm_trace("Created timer %s with %p %p", t->name, userdata, t->userdata);
+ }
+ return t;
+}
+
+void
+mainloop_timer_del(mainloop_timer_t *t)
+{
+ if(t) {
+ crm_trace("Destroying timer %s", t->name);
+ mainloop_timer_stop(t);
+ free(t->name);
+ free(t);
+ }
+}
+
+/*
+ * Helpers to make sure certain events aren't lost at shutdown
+ */
+
+static gboolean
+drain_timeout_cb(gpointer user_data)
+{
+ bool *timeout_popped = (bool*) user_data;
+
+ *timeout_popped = TRUE;
+ return FALSE;
+}
+
+/*!
+ * \brief Drain some remaining main loop events then quit it
+ *
+ * \param[in,out] mloop Main loop to drain and quit
+ * \param[in] n Drain up to this many pending events
+ */
+void
+pcmk_quit_main_loop(GMainLoop *mloop, unsigned int n)
+{
+ if ((mloop != NULL) && g_main_loop_is_running(mloop)) {
+ GMainContext *ctx = g_main_loop_get_context(mloop);
+
+ /* Drain up to n events in case some memory clean-up is pending
+ * (helpful to reduce noise in valgrind output).
+ */
+ for (int i = 0; (i < n) && g_main_context_pending(ctx); ++i) {
+ g_main_context_dispatch(ctx);
+ }
+ g_main_loop_quit(mloop);
+ }
+}
+
+/*!
+ * \brief Process main loop events while a certain condition is met
+ *
+ * \param[in,out] mloop Main loop to process
+ * \param[in] timer_ms Don't process longer than this amount of time
+ * \param[in] check Function that returns true if events should be
+ * processed
+ *
+ * \note This function is intended to be called at shutdown if certain important
+ * events should not be missed. The caller would likely quit the main loop
+ * or exit after calling this function. The check() function will be
+ * passed the remaining timeout in milliseconds.
+ */
+void
+pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool (*check)(guint))
+{
+ bool timeout_popped = FALSE;
+ guint timer = 0;
+ GMainContext *ctx = NULL;
+
+ CRM_CHECK(mloop && check, return);
+
+ ctx = g_main_loop_get_context(mloop);
+ if (ctx) {
+ time_t start_time = time(NULL);
+
+ timer = g_timeout_add(timer_ms, drain_timeout_cb, &timeout_popped);
+ while (!timeout_popped
+ && check(timer_ms - (time(NULL) - start_time) * 1000)) {
+ g_main_context_iteration(ctx, TRUE);
+ }
+ }
+ if (!timeout_popped && (timer > 0)) {
+ g_source_remove(timer);
+ }
+}
+
+// Deprecated functions kept only for backward API compatibility
+// LCOV_EXCL_START
+
+#include <crm/common/mainloop_compat.h>
+
+gboolean
+crm_signal(int sig, void (*dispatch) (int sig))
+{
+ return crm_signal_handler(sig, dispatch) != SIG_ERR;
+}
+
+// LCOV_EXCL_STOP
+// End deprecated API