diff options
Diffstat (limited to 'daemons/attrd/attrd_sync.c')
-rw-r--r-- | daemons/attrd/attrd_sync.c | 577 |
1 files changed, 577 insertions, 0 deletions
diff --git a/daemons/attrd/attrd_sync.c b/daemons/attrd/attrd_sync.c new file mode 100644 index 0000000..d59ddd5 --- /dev/null +++ b/daemons/attrd/attrd_sync.c @@ -0,0 +1,577 @@ +/* + * Copyright 2022-2023 the Pacemaker project contributors + * + * The version control history for this file may have further details. + * + * This source code is licensed under the GNU General Public License version 2 + * or later (GPLv2+) WITHOUT ANY WARRANTY. + */ + +#include <crm_internal.h> + +#include <crm/msg_xml.h> +#include <crm/common/attrd_internal.h> + +#include "pacemaker-attrd.h" + +/* A hash table storing clients that are waiting on a sync point to be reached. + * The key is waitlist_client - just a plain int. The obvious key would be + * the IPC client's ID, but this is not guaranteed to be unique. A single client + * could be waiting on a sync point for multiple attributes at the same time. + * + * It is not expected that this hash table will ever be especially large. + */ +static GHashTable *waitlist = NULL; +static int waitlist_client = 0; + +struct waitlist_node { + /* What kind of sync point does this node describe? */ + enum attrd_sync_point sync_point; + + /* Information required to construct and send a reply to the client. */ + char *client_id; + uint32_t ipc_id; + uint32_t flags; +}; + +/* A hash table storing information on in-progress IPC requests that are awaiting + * confirmations. These requests are currently being processed by peer attrds and + * we are waiting to receive confirmation messages from each peer indicating that + * processing is complete. + * + * Multiple requests could be waiting on confirmations at the same time. + * + * The key is the unique callid for the IPC request, and the value is a + * confirmation_action struct. + */ +static GHashTable *expected_confirmations = NULL; + +/*! + * \internal + * \brief A structure describing a single IPC request that is awaiting confirmations + */ +struct confirmation_action { + /*! + * \brief A list of peer attrds that we are waiting to receive confirmation + * messages from + * + * This list is dynamic - as confirmations arrive from peer attrds, they will + * be removed from this list. When the list is empty, all peers have processed + * the request and the associated confirmation action will be taken. + */ + GList *respondents; + + /*! + * \brief A timer that will be used to remove the client should it time out + * before receiving all confirmations + */ + mainloop_timer_t *timer; + + /*! + * \brief A function to run when all confirmations have been received + */ + attrd_confirmation_action_fn fn; + + /*! + * \brief Information required to construct and send a reply to the client + */ + char *client_id; + uint32_t ipc_id; + uint32_t flags; + + /*! + * \brief The XML request containing the callid associated with this action + */ + void *xml; +}; + +static void +next_key(void) +{ + do { + waitlist_client++; + if (waitlist_client < 0) { + waitlist_client = 1; + } + } while (g_hash_table_contains(waitlist, GINT_TO_POINTER(waitlist_client))); +} + +static void +free_waitlist_node(gpointer data) +{ + struct waitlist_node *wl = (struct waitlist_node *) data; + + free(wl->client_id); + free(wl); +} + +static const char * +sync_point_str(enum attrd_sync_point sync_point) +{ + if (sync_point == attrd_sync_point_local) { + return PCMK__VALUE_LOCAL; + } else if (sync_point == attrd_sync_point_cluster) { + return PCMK__VALUE_CLUSTER; + } else { + return "unknown"; + } +} + +/*! + * \internal + * \brief Add a client to the attrd waitlist + * + * Typically, a client receives an ACK for its XML IPC request immediately. However, + * some clients want to wait until their request has been processed and taken effect. + * This is called a sync point. Any client placed on this waitlist will have its + * ACK message delayed until either its requested sync point is hit, or until it + * times out. + * + * The XML IPC request must specify the type of sync point it wants to wait for. + * + * \param[in,out] request The request describing the client to place on the waitlist. + */ +void +attrd_add_client_to_waitlist(pcmk__request_t *request) +{ + const char *sync_point = attrd_request_sync_point(request->xml); + struct waitlist_node *wl = NULL; + + if (sync_point == NULL) { + return; + } + + if (waitlist == NULL) { + waitlist = pcmk__intkey_table(free_waitlist_node); + } + + wl = calloc(sizeof(struct waitlist_node), 1); + + CRM_ASSERT(wl != NULL); + + wl->client_id = strdup(request->ipc_client->id); + + CRM_ASSERT(wl->client_id); + + if (pcmk__str_eq(sync_point, PCMK__VALUE_LOCAL, pcmk__str_none)) { + wl->sync_point = attrd_sync_point_local; + } else if (pcmk__str_eq(sync_point, PCMK__VALUE_CLUSTER, pcmk__str_none)) { + wl->sync_point = attrd_sync_point_cluster; + } else { + free_waitlist_node(wl); + return; + } + + wl->ipc_id = request->ipc_id; + wl->flags = request->flags; + + next_key(); + pcmk__intkey_table_insert(waitlist, waitlist_client, wl); + + crm_trace("Added client %s to waitlist for %s sync point", + wl->client_id, sync_point_str(wl->sync_point)); + crm_trace("%d clients now on waitlist", g_hash_table_size(waitlist)); + + /* And then add the key to the request XML so we can uniquely identify + * it when it comes time to issue the ACK. + */ + crm_xml_add_int(request->xml, XML_LRM_ATTR_CALLID, waitlist_client); +} + +/*! + * \internal + * \brief Free all memory associated with the waitlist. This is most typically + * used when attrd shuts down. + */ +void +attrd_free_waitlist(void) +{ + if (waitlist == NULL) { + return; + } + + g_hash_table_destroy(waitlist); + waitlist = NULL; +} + +/*! + * \internal + * \brief Unconditionally remove a client from the waitlist, such as when the client + * node disconnects from the cluster + * + * \param[in] client The client to remove + */ +void +attrd_remove_client_from_waitlist(pcmk__client_t *client) +{ + GHashTableIter iter; + gpointer value; + + if (waitlist == NULL) { + return; + } + + g_hash_table_iter_init(&iter, waitlist); + + while (g_hash_table_iter_next(&iter, NULL, &value)) { + struct waitlist_node *wl = (struct waitlist_node *) value; + + if (pcmk__str_eq(wl->client_id, client->id, pcmk__str_none)) { + g_hash_table_iter_remove(&iter); + crm_trace("%d clients now on waitlist", g_hash_table_size(waitlist)); + } + } +} + +/*! + * \internal + * \brief Send an IPC ACK message to all awaiting clients + * + * This function will search the waitlist for all clients that are currently awaiting + * an ACK indicating their attrd operation is complete. Only those clients with a + * matching sync point type and callid from their original XML IPC request will be + * ACKed. Once they have received an ACK, they will be removed from the waitlist. + * + * \param[in] sync_point What kind of sync point have we hit? + * \param[in] xml The original XML IPC request. + */ +void +attrd_ack_waitlist_clients(enum attrd_sync_point sync_point, const xmlNode *xml) +{ + int callid; + gpointer value; + + if (waitlist == NULL) { + return; + } + + if (crm_element_value_int(xml, XML_LRM_ATTR_CALLID, &callid) == -1) { + crm_warn("Could not get callid from request XML"); + return; + } + + value = pcmk__intkey_table_lookup(waitlist, callid); + if (value != NULL) { + struct waitlist_node *wl = (struct waitlist_node *) value; + pcmk__client_t *client = NULL; + + if (wl->sync_point != sync_point) { + return; + } + + crm_notice("Alerting client %s for reached %s sync point", + wl->client_id, sync_point_str(wl->sync_point)); + + client = pcmk__find_client_by_id(wl->client_id); + if (client == NULL) { + return; + } + + attrd_send_ack(client, wl->ipc_id, wl->flags | crm_ipc_client_response); + + /* And then remove the client so it doesn't get alerted again. */ + pcmk__intkey_table_remove(waitlist, callid); + + crm_trace("%d clients now on waitlist", g_hash_table_size(waitlist)); + } +} + +/*! + * \internal + * \brief Action to take when a cluster sync point is hit for a + * PCMK__ATTRD_CMD_UPDATE* message. + * + * \param[in] xml The request that should be passed along to + * attrd_ack_waitlist_clients. This should be the original + * IPC request containing the callid for this update message. + */ +int +attrd_cluster_sync_point_update(xmlNode *xml) +{ + crm_trace("Hit cluster sync point for attribute update"); + attrd_ack_waitlist_clients(attrd_sync_point_cluster, xml); + return pcmk_rc_ok; +} + +/*! + * \internal + * \brief Return the sync point attribute for an IPC request + * + * This function will check both the top-level element of \p xml for a sync + * point attribute, as well as all of its \p op children, if any. The latter + * is useful for newer versions of attrd that can put multiple IPC requests + * into a single message. + * + * \param[in] xml An XML IPC request + * + * \note It is assumed that if one child element has a sync point attribute, + * all will have a sync point attribute and they will all be the same + * sync point. No other configuration is supported. + * + * \return The sync point attribute of \p xml, or NULL if none. + */ +const char * +attrd_request_sync_point(xmlNode *xml) +{ + if (xml_has_children(xml)) { + xmlNode *child = pcmk__xe_match(xml, XML_ATTR_OP, PCMK__XA_ATTR_SYNC_POINT, NULL); + + if (child) { + return crm_element_value(child, PCMK__XA_ATTR_SYNC_POINT); + } else { + return NULL; + } + + } else { + return crm_element_value(xml, PCMK__XA_ATTR_SYNC_POINT); + } +} + +/*! + * \internal + * \brief Does an IPC request contain any sync point attribute? + * + * \param[in] xml An XML IPC request + * + * \return true if there's a sync point attribute, false otherwise + */ +bool +attrd_request_has_sync_point(xmlNode *xml) +{ + return attrd_request_sync_point(xml) != NULL; +} + +static void +free_action(gpointer data) +{ + struct confirmation_action *action = (struct confirmation_action *) data; + g_list_free_full(action->respondents, free); + mainloop_timer_del(action->timer); + free_xml(action->xml); + free(action->client_id); + free(action); +} + +/* Remove an IPC request from the expected_confirmations table if the peer attrds + * don't respond before the timeout is hit. We set the timeout to 15s. The exact + * number isn't critical - we just want to make sure that the table eventually gets + * cleared of things that didn't complete. + */ +static gboolean +confirmation_timeout_cb(gpointer data) +{ + struct confirmation_action *action = (struct confirmation_action *) data; + + GHashTableIter iter; + gpointer value; + + if (expected_confirmations == NULL) { + return G_SOURCE_REMOVE; + } + + g_hash_table_iter_init(&iter, expected_confirmations); + + while (g_hash_table_iter_next(&iter, NULL, &value)) { + if (value == action) { + pcmk__client_t *client = pcmk__find_client_by_id(action->client_id); + if (client == NULL) { + return G_SOURCE_REMOVE; + } + + crm_trace("Timed out waiting for confirmations for client %s", client->id); + pcmk__ipc_send_ack(client, action->ipc_id, action->flags | crm_ipc_client_response, + "ack", ATTRD_PROTOCOL_VERSION, CRM_EX_TIMEOUT); + + g_hash_table_iter_remove(&iter); + crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations)); + break; + } + } + + return G_SOURCE_REMOVE; +} + +/*! + * \internal + * \brief When a peer disconnects from the cluster, no longer wait for its confirmation + * for any IPC action. If this peer is the last one being waited on, this will + * trigger the confirmation action. + * + * \param[in] host The disconnecting peer attrd's uname + */ +void +attrd_do_not_expect_from_peer(const char *host) +{ + GList *keys = NULL; + + if (expected_confirmations == NULL) { + return; + } + + keys = g_hash_table_get_keys(expected_confirmations); + + crm_trace("Removing peer %s from expected confirmations", host); + + for (GList *node = keys; node != NULL; node = node->next) { + int callid = *(int *) node->data; + attrd_handle_confirmation(callid, host); + } + + g_list_free(keys); +} + +/*! + * \internal + * \brief When a client disconnects from the cluster, no longer wait on confirmations + * for it. Because the peer attrds may still be processing the original IPC + * message, they may still send us confirmations. However, we will take no + * action on them. + * + * \param[in] client The disconnecting client + */ +void +attrd_do_not_wait_for_client(pcmk__client_t *client) +{ + GHashTableIter iter; + gpointer value; + + if (expected_confirmations == NULL) { + return; + } + + g_hash_table_iter_init(&iter, expected_confirmations); + + while (g_hash_table_iter_next(&iter, NULL, &value)) { + struct confirmation_action *action = (struct confirmation_action *) value; + + if (pcmk__str_eq(action->client_id, client->id, pcmk__str_none)) { + crm_trace("Removing client %s from expected confirmations", client->id); + g_hash_table_iter_remove(&iter); + crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations)); + break; + } + } +} + +/*! + * \internal + * \brief Register some action to be taken when IPC request confirmations are + * received + * + * When this function is called, a list of all peer attrds that support confirming + * requests is generated. As confirmations from these peer attrds are received, + * they are removed from this list. When the list is empty, the registered action + * will be called. + * + * \note This function should always be called before attrd_send_message is called + * to broadcast to the peers to ensure that we know what replies we are + * waiting on. Otherwise, it is possible the peer could finish and confirm + * before we know to expect it. + * + * \param[in] request The request that is awaiting confirmations + * \param[in] fn A function to be run after all confirmations are received + */ +void +attrd_expect_confirmations(pcmk__request_t *request, attrd_confirmation_action_fn fn) +{ + struct confirmation_action *action = NULL; + GHashTableIter iter; + gpointer host, ver; + GList *respondents = NULL; + int callid; + + if (expected_confirmations == NULL) { + expected_confirmations = pcmk__intkey_table((GDestroyNotify) free_action); + } + + if (crm_element_value_int(request->xml, XML_LRM_ATTR_CALLID, &callid) == -1) { + crm_err("Could not get callid from xml"); + return; + } + + if (pcmk__intkey_table_lookup(expected_confirmations, callid)) { + crm_err("Already waiting on confirmations for call id %d", callid); + return; + } + + g_hash_table_iter_init(&iter, peer_protocol_vers); + while (g_hash_table_iter_next(&iter, &host, &ver)) { + if (ATTRD_SUPPORTS_CONFIRMATION(GPOINTER_TO_INT(ver))) { + char *s = strdup((char *) host); + + CRM_ASSERT(s != NULL); + respondents = g_list_prepend(respondents, s); + } + } + + action = calloc(1, sizeof(struct confirmation_action)); + CRM_ASSERT(action != NULL); + + action->respondents = respondents; + action->fn = fn; + action->xml = copy_xml(request->xml); + + action->client_id = strdup(request->ipc_client->id); + CRM_ASSERT(action->client_id != NULL); + + action->ipc_id = request->ipc_id; + action->flags = request->flags; + + action->timer = mainloop_timer_add(NULL, 15000, FALSE, confirmation_timeout_cb, action); + mainloop_timer_start(action->timer); + + pcmk__intkey_table_insert(expected_confirmations, callid, action); + crm_trace("Callid %d now waiting on %d confirmations", callid, g_list_length(respondents)); + crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations)); +} + +void +attrd_free_confirmations(void) +{ + if (expected_confirmations != NULL) { + g_hash_table_destroy(expected_confirmations); + expected_confirmations = NULL; + } +} + +/*! + * \internal + * \brief Process a confirmation message from a peer attrd + * + * This function is called every time a PCMK__ATTRD_CMD_CONFIRM message is + * received from a peer attrd. If this is the last confirmation we are waiting + * on for a given operation, the registered action will be called. + * + * \param[in] callid The unique callid for the XML IPC request + * \param[in] host The confirming peer attrd's uname + */ +void +attrd_handle_confirmation(int callid, const char *host) +{ + struct confirmation_action *action = NULL; + GList *node = NULL; + + if (expected_confirmations == NULL) { + return; + } + + action = pcmk__intkey_table_lookup(expected_confirmations, callid); + if (action == NULL) { + return; + } + + node = g_list_find_custom(action->respondents, host, (GCompareFunc) strcasecmp); + + if (node == NULL) { + return; + } + + action->respondents = g_list_remove(action->respondents, node->data); + crm_trace("Callid %d now waiting on %d confirmations", callid, g_list_length(action->respondents)); + + if (action->respondents == NULL) { + action->fn(action->xml); + pcmk__intkey_table_remove(expected_confirmations, callid); + crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations)); + } +} |