summaryrefslogtreecommitdiffstats
path: root/daemons/attrd/attrd_sync.c
diff options
context:
space:
mode:
Diffstat (limited to 'daemons/attrd/attrd_sync.c')
-rw-r--r--daemons/attrd/attrd_sync.c577
1 files changed, 577 insertions, 0 deletions
diff --git a/daemons/attrd/attrd_sync.c b/daemons/attrd/attrd_sync.c
new file mode 100644
index 0000000..d59ddd5
--- /dev/null
+++ b/daemons/attrd/attrd_sync.c
@@ -0,0 +1,577 @@
+/*
+ * Copyright 2022-2023 the Pacemaker project contributors
+ *
+ * The version control history for this file may have further details.
+ *
+ * This source code is licensed under the GNU General Public License version 2
+ * or later (GPLv2+) WITHOUT ANY WARRANTY.
+ */
+
+#include <crm_internal.h>
+
+#include <crm/msg_xml.h>
+#include <crm/common/attrd_internal.h>
+
+#include "pacemaker-attrd.h"
+
+/* A hash table storing clients that are waiting on a sync point to be reached.
+ * The key is waitlist_client - just a plain int. The obvious key would be
+ * the IPC client's ID, but this is not guaranteed to be unique. A single client
+ * could be waiting on a sync point for multiple attributes at the same time.
+ *
+ * It is not expected that this hash table will ever be especially large.
+ */
+static GHashTable *waitlist = NULL;
+static int waitlist_client = 0;
+
+struct waitlist_node {
+ /* What kind of sync point does this node describe? */
+ enum attrd_sync_point sync_point;
+
+ /* Information required to construct and send a reply to the client. */
+ char *client_id;
+ uint32_t ipc_id;
+ uint32_t flags;
+};
+
+/* A hash table storing information on in-progress IPC requests that are awaiting
+ * confirmations. These requests are currently being processed by peer attrds and
+ * we are waiting to receive confirmation messages from each peer indicating that
+ * processing is complete.
+ *
+ * Multiple requests could be waiting on confirmations at the same time.
+ *
+ * The key is the unique callid for the IPC request, and the value is a
+ * confirmation_action struct.
+ */
+static GHashTable *expected_confirmations = NULL;
+
+/*!
+ * \internal
+ * \brief A structure describing a single IPC request that is awaiting confirmations
+ */
+struct confirmation_action {
+ /*!
+ * \brief A list of peer attrds that we are waiting to receive confirmation
+ * messages from
+ *
+ * This list is dynamic - as confirmations arrive from peer attrds, they will
+ * be removed from this list. When the list is empty, all peers have processed
+ * the request and the associated confirmation action will be taken.
+ */
+ GList *respondents;
+
+ /*!
+ * \brief A timer that will be used to remove the client should it time out
+ * before receiving all confirmations
+ */
+ mainloop_timer_t *timer;
+
+ /*!
+ * \brief A function to run when all confirmations have been received
+ */
+ attrd_confirmation_action_fn fn;
+
+ /*!
+ * \brief Information required to construct and send a reply to the client
+ */
+ char *client_id;
+ uint32_t ipc_id;
+ uint32_t flags;
+
+ /*!
+ * \brief The XML request containing the callid associated with this action
+ */
+ void *xml;
+};
+
+static void
+next_key(void)
+{
+ do {
+ waitlist_client++;
+ if (waitlist_client < 0) {
+ waitlist_client = 1;
+ }
+ } while (g_hash_table_contains(waitlist, GINT_TO_POINTER(waitlist_client)));
+}
+
+static void
+free_waitlist_node(gpointer data)
+{
+ struct waitlist_node *wl = (struct waitlist_node *) data;
+
+ free(wl->client_id);
+ free(wl);
+}
+
+static const char *
+sync_point_str(enum attrd_sync_point sync_point)
+{
+ if (sync_point == attrd_sync_point_local) {
+ return PCMK__VALUE_LOCAL;
+ } else if (sync_point == attrd_sync_point_cluster) {
+ return PCMK__VALUE_CLUSTER;
+ } else {
+ return "unknown";
+ }
+}
+
+/*!
+ * \internal
+ * \brief Add a client to the attrd waitlist
+ *
+ * Typically, a client receives an ACK for its XML IPC request immediately. However,
+ * some clients want to wait until their request has been processed and taken effect.
+ * This is called a sync point. Any client placed on this waitlist will have its
+ * ACK message delayed until either its requested sync point is hit, or until it
+ * times out.
+ *
+ * The XML IPC request must specify the type of sync point it wants to wait for.
+ *
+ * \param[in,out] request The request describing the client to place on the waitlist.
+ */
+void
+attrd_add_client_to_waitlist(pcmk__request_t *request)
+{
+ const char *sync_point = attrd_request_sync_point(request->xml);
+ struct waitlist_node *wl = NULL;
+
+ if (sync_point == NULL) {
+ return;
+ }
+
+ if (waitlist == NULL) {
+ waitlist = pcmk__intkey_table(free_waitlist_node);
+ }
+
+ wl = calloc(sizeof(struct waitlist_node), 1);
+
+ CRM_ASSERT(wl != NULL);
+
+ wl->client_id = strdup(request->ipc_client->id);
+
+ CRM_ASSERT(wl->client_id);
+
+ if (pcmk__str_eq(sync_point, PCMK__VALUE_LOCAL, pcmk__str_none)) {
+ wl->sync_point = attrd_sync_point_local;
+ } else if (pcmk__str_eq(sync_point, PCMK__VALUE_CLUSTER, pcmk__str_none)) {
+ wl->sync_point = attrd_sync_point_cluster;
+ } else {
+ free_waitlist_node(wl);
+ return;
+ }
+
+ wl->ipc_id = request->ipc_id;
+ wl->flags = request->flags;
+
+ next_key();
+ pcmk__intkey_table_insert(waitlist, waitlist_client, wl);
+
+ crm_trace("Added client %s to waitlist for %s sync point",
+ wl->client_id, sync_point_str(wl->sync_point));
+ crm_trace("%d clients now on waitlist", g_hash_table_size(waitlist));
+
+ /* And then add the key to the request XML so we can uniquely identify
+ * it when it comes time to issue the ACK.
+ */
+ crm_xml_add_int(request->xml, XML_LRM_ATTR_CALLID, waitlist_client);
+}
+
+/*!
+ * \internal
+ * \brief Free all memory associated with the waitlist. This is most typically
+ * used when attrd shuts down.
+ */
+void
+attrd_free_waitlist(void)
+{
+ if (waitlist == NULL) {
+ return;
+ }
+
+ g_hash_table_destroy(waitlist);
+ waitlist = NULL;
+}
+
+/*!
+ * \internal
+ * \brief Unconditionally remove a client from the waitlist, such as when the client
+ * node disconnects from the cluster
+ *
+ * \param[in] client The client to remove
+ */
+void
+attrd_remove_client_from_waitlist(pcmk__client_t *client)
+{
+ GHashTableIter iter;
+ gpointer value;
+
+ if (waitlist == NULL) {
+ return;
+ }
+
+ g_hash_table_iter_init(&iter, waitlist);
+
+ while (g_hash_table_iter_next(&iter, NULL, &value)) {
+ struct waitlist_node *wl = (struct waitlist_node *) value;
+
+ if (pcmk__str_eq(wl->client_id, client->id, pcmk__str_none)) {
+ g_hash_table_iter_remove(&iter);
+ crm_trace("%d clients now on waitlist", g_hash_table_size(waitlist));
+ }
+ }
+}
+
+/*!
+ * \internal
+ * \brief Send an IPC ACK message to all awaiting clients
+ *
+ * This function will search the waitlist for all clients that are currently awaiting
+ * an ACK indicating their attrd operation is complete. Only those clients with a
+ * matching sync point type and callid from their original XML IPC request will be
+ * ACKed. Once they have received an ACK, they will be removed from the waitlist.
+ *
+ * \param[in] sync_point What kind of sync point have we hit?
+ * \param[in] xml The original XML IPC request.
+ */
+void
+attrd_ack_waitlist_clients(enum attrd_sync_point sync_point, const xmlNode *xml)
+{
+ int callid;
+ gpointer value;
+
+ if (waitlist == NULL) {
+ return;
+ }
+
+ if (crm_element_value_int(xml, XML_LRM_ATTR_CALLID, &callid) == -1) {
+ crm_warn("Could not get callid from request XML");
+ return;
+ }
+
+ value = pcmk__intkey_table_lookup(waitlist, callid);
+ if (value != NULL) {
+ struct waitlist_node *wl = (struct waitlist_node *) value;
+ pcmk__client_t *client = NULL;
+
+ if (wl->sync_point != sync_point) {
+ return;
+ }
+
+ crm_notice("Alerting client %s for reached %s sync point",
+ wl->client_id, sync_point_str(wl->sync_point));
+
+ client = pcmk__find_client_by_id(wl->client_id);
+ if (client == NULL) {
+ return;
+ }
+
+ attrd_send_ack(client, wl->ipc_id, wl->flags | crm_ipc_client_response);
+
+ /* And then remove the client so it doesn't get alerted again. */
+ pcmk__intkey_table_remove(waitlist, callid);
+
+ crm_trace("%d clients now on waitlist", g_hash_table_size(waitlist));
+ }
+}
+
+/*!
+ * \internal
+ * \brief Action to take when a cluster sync point is hit for a
+ * PCMK__ATTRD_CMD_UPDATE* message.
+ *
+ * \param[in] xml The request that should be passed along to
+ * attrd_ack_waitlist_clients. This should be the original
+ * IPC request containing the callid for this update message.
+ */
+int
+attrd_cluster_sync_point_update(xmlNode *xml)
+{
+ crm_trace("Hit cluster sync point for attribute update");
+ attrd_ack_waitlist_clients(attrd_sync_point_cluster, xml);
+ return pcmk_rc_ok;
+}
+
+/*!
+ * \internal
+ * \brief Return the sync point attribute for an IPC request
+ *
+ * This function will check both the top-level element of \p xml for a sync
+ * point attribute, as well as all of its \p op children, if any. The latter
+ * is useful for newer versions of attrd that can put multiple IPC requests
+ * into a single message.
+ *
+ * \param[in] xml An XML IPC request
+ *
+ * \note It is assumed that if one child element has a sync point attribute,
+ * all will have a sync point attribute and they will all be the same
+ * sync point. No other configuration is supported.
+ *
+ * \return The sync point attribute of \p xml, or NULL if none.
+ */
+const char *
+attrd_request_sync_point(xmlNode *xml)
+{
+ if (xml_has_children(xml)) {
+ xmlNode *child = pcmk__xe_match(xml, XML_ATTR_OP, PCMK__XA_ATTR_SYNC_POINT, NULL);
+
+ if (child) {
+ return crm_element_value(child, PCMK__XA_ATTR_SYNC_POINT);
+ } else {
+ return NULL;
+ }
+
+ } else {
+ return crm_element_value(xml, PCMK__XA_ATTR_SYNC_POINT);
+ }
+}
+
+/*!
+ * \internal
+ * \brief Does an IPC request contain any sync point attribute?
+ *
+ * \param[in] xml An XML IPC request
+ *
+ * \return true if there's a sync point attribute, false otherwise
+ */
+bool
+attrd_request_has_sync_point(xmlNode *xml)
+{
+ return attrd_request_sync_point(xml) != NULL;
+}
+
+static void
+free_action(gpointer data)
+{
+ struct confirmation_action *action = (struct confirmation_action *) data;
+ g_list_free_full(action->respondents, free);
+ mainloop_timer_del(action->timer);
+ free_xml(action->xml);
+ free(action->client_id);
+ free(action);
+}
+
+/* Remove an IPC request from the expected_confirmations table if the peer attrds
+ * don't respond before the timeout is hit. We set the timeout to 15s. The exact
+ * number isn't critical - we just want to make sure that the table eventually gets
+ * cleared of things that didn't complete.
+ */
+static gboolean
+confirmation_timeout_cb(gpointer data)
+{
+ struct confirmation_action *action = (struct confirmation_action *) data;
+
+ GHashTableIter iter;
+ gpointer value;
+
+ if (expected_confirmations == NULL) {
+ return G_SOURCE_REMOVE;
+ }
+
+ g_hash_table_iter_init(&iter, expected_confirmations);
+
+ while (g_hash_table_iter_next(&iter, NULL, &value)) {
+ if (value == action) {
+ pcmk__client_t *client = pcmk__find_client_by_id(action->client_id);
+ if (client == NULL) {
+ return G_SOURCE_REMOVE;
+ }
+
+ crm_trace("Timed out waiting for confirmations for client %s", client->id);
+ pcmk__ipc_send_ack(client, action->ipc_id, action->flags | crm_ipc_client_response,
+ "ack", ATTRD_PROTOCOL_VERSION, CRM_EX_TIMEOUT);
+
+ g_hash_table_iter_remove(&iter);
+ crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations));
+ break;
+ }
+ }
+
+ return G_SOURCE_REMOVE;
+}
+
+/*!
+ * \internal
+ * \brief When a peer disconnects from the cluster, no longer wait for its confirmation
+ * for any IPC action. If this peer is the last one being waited on, this will
+ * trigger the confirmation action.
+ *
+ * \param[in] host The disconnecting peer attrd's uname
+ */
+void
+attrd_do_not_expect_from_peer(const char *host)
+{
+ GList *keys = NULL;
+
+ if (expected_confirmations == NULL) {
+ return;
+ }
+
+ keys = g_hash_table_get_keys(expected_confirmations);
+
+ crm_trace("Removing peer %s from expected confirmations", host);
+
+ for (GList *node = keys; node != NULL; node = node->next) {
+ int callid = *(int *) node->data;
+ attrd_handle_confirmation(callid, host);
+ }
+
+ g_list_free(keys);
+}
+
+/*!
+ * \internal
+ * \brief When a client disconnects from the cluster, no longer wait on confirmations
+ * for it. Because the peer attrds may still be processing the original IPC
+ * message, they may still send us confirmations. However, we will take no
+ * action on them.
+ *
+ * \param[in] client The disconnecting client
+ */
+void
+attrd_do_not_wait_for_client(pcmk__client_t *client)
+{
+ GHashTableIter iter;
+ gpointer value;
+
+ if (expected_confirmations == NULL) {
+ return;
+ }
+
+ g_hash_table_iter_init(&iter, expected_confirmations);
+
+ while (g_hash_table_iter_next(&iter, NULL, &value)) {
+ struct confirmation_action *action = (struct confirmation_action *) value;
+
+ if (pcmk__str_eq(action->client_id, client->id, pcmk__str_none)) {
+ crm_trace("Removing client %s from expected confirmations", client->id);
+ g_hash_table_iter_remove(&iter);
+ crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations));
+ break;
+ }
+ }
+}
+
+/*!
+ * \internal
+ * \brief Register some action to be taken when IPC request confirmations are
+ * received
+ *
+ * When this function is called, a list of all peer attrds that support confirming
+ * requests is generated. As confirmations from these peer attrds are received,
+ * they are removed from this list. When the list is empty, the registered action
+ * will be called.
+ *
+ * \note This function should always be called before attrd_send_message is called
+ * to broadcast to the peers to ensure that we know what replies we are
+ * waiting on. Otherwise, it is possible the peer could finish and confirm
+ * before we know to expect it.
+ *
+ * \param[in] request The request that is awaiting confirmations
+ * \param[in] fn A function to be run after all confirmations are received
+ */
+void
+attrd_expect_confirmations(pcmk__request_t *request, attrd_confirmation_action_fn fn)
+{
+ struct confirmation_action *action = NULL;
+ GHashTableIter iter;
+ gpointer host, ver;
+ GList *respondents = NULL;
+ int callid;
+
+ if (expected_confirmations == NULL) {
+ expected_confirmations = pcmk__intkey_table((GDestroyNotify) free_action);
+ }
+
+ if (crm_element_value_int(request->xml, XML_LRM_ATTR_CALLID, &callid) == -1) {
+ crm_err("Could not get callid from xml");
+ return;
+ }
+
+ if (pcmk__intkey_table_lookup(expected_confirmations, callid)) {
+ crm_err("Already waiting on confirmations for call id %d", callid);
+ return;
+ }
+
+ g_hash_table_iter_init(&iter, peer_protocol_vers);
+ while (g_hash_table_iter_next(&iter, &host, &ver)) {
+ if (ATTRD_SUPPORTS_CONFIRMATION(GPOINTER_TO_INT(ver))) {
+ char *s = strdup((char *) host);
+
+ CRM_ASSERT(s != NULL);
+ respondents = g_list_prepend(respondents, s);
+ }
+ }
+
+ action = calloc(1, sizeof(struct confirmation_action));
+ CRM_ASSERT(action != NULL);
+
+ action->respondents = respondents;
+ action->fn = fn;
+ action->xml = copy_xml(request->xml);
+
+ action->client_id = strdup(request->ipc_client->id);
+ CRM_ASSERT(action->client_id != NULL);
+
+ action->ipc_id = request->ipc_id;
+ action->flags = request->flags;
+
+ action->timer = mainloop_timer_add(NULL, 15000, FALSE, confirmation_timeout_cb, action);
+ mainloop_timer_start(action->timer);
+
+ pcmk__intkey_table_insert(expected_confirmations, callid, action);
+ crm_trace("Callid %d now waiting on %d confirmations", callid, g_list_length(respondents));
+ crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations));
+}
+
+void
+attrd_free_confirmations(void)
+{
+ if (expected_confirmations != NULL) {
+ g_hash_table_destroy(expected_confirmations);
+ expected_confirmations = NULL;
+ }
+}
+
+/*!
+ * \internal
+ * \brief Process a confirmation message from a peer attrd
+ *
+ * This function is called every time a PCMK__ATTRD_CMD_CONFIRM message is
+ * received from a peer attrd. If this is the last confirmation we are waiting
+ * on for a given operation, the registered action will be called.
+ *
+ * \param[in] callid The unique callid for the XML IPC request
+ * \param[in] host The confirming peer attrd's uname
+ */
+void
+attrd_handle_confirmation(int callid, const char *host)
+{
+ struct confirmation_action *action = NULL;
+ GList *node = NULL;
+
+ if (expected_confirmations == NULL) {
+ return;
+ }
+
+ action = pcmk__intkey_table_lookup(expected_confirmations, callid);
+ if (action == NULL) {
+ return;
+ }
+
+ node = g_list_find_custom(action->respondents, host, (GCompareFunc) strcasecmp);
+
+ if (node == NULL) {
+ return;
+ }
+
+ action->respondents = g_list_remove(action->respondents, node->data);
+ crm_trace("Callid %d now waiting on %d confirmations", callid, g_list_length(action->respondents));
+
+ if (action->respondents == NULL) {
+ action->fn(action->xml);
+ pcmk__intkey_table_remove(expected_confirmations, callid);
+ crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations));
+ }
+}