diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-03 13:39:29 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-03 13:39:29 +0000 |
commit | b41961d74fe7ff2d4d4abaca92454e87c561e49f (patch) | |
tree | b34e3826a7b649dafdbd05081140c990c96d736d /lib/cluster/cpg.c | |
parent | Releasing progress-linux version 2.1.7-1~progress7.99u1. (diff) | |
download | pacemaker-b41961d74fe7ff2d4d4abaca92454e87c561e49f.tar.xz pacemaker-b41961d74fe7ff2d4d4abaca92454e87c561e49f.zip |
Merging upstream version 2.1.8~rc1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | lib/cluster/cpg.c | 472 |
1 files changed, 292 insertions, 180 deletions
diff --git a/lib/cluster/cpg.c b/lib/cluster/cpg.c index d1decc6..62d39a6 100644 --- a/lib/cluster/cpg.c +++ b/lib/cluster/cpg.c @@ -1,5 +1,5 @@ /* - * Copyright 2004-2023 the Pacemaker project contributors + * Copyright 2004-2024 the Pacemaker project contributors * * The version control history for this file may have further details. * @@ -8,37 +8,40 @@ */ #include <crm_internal.h> -#include <bzlib.h> -#include <sys/socket.h> -#include <netinet/in.h> + #include <arpa/inet.h> +#include <inttypes.h> // PRIu32 #include <netdb.h> - -#include <crm/common/ipc.h> -#include <crm/cluster/internal.h> -#include <crm/common/mainloop.h> +#include <netinet/in.h> +#include <stdbool.h> +#include <stdint.h> // uint32_t +#include <sys/socket.h> +#include <sys/types.h> // size_t #include <sys/utsname.h> -#include <qb/qbipc_common.h> -#include <qb/qbipcc.h> -#include <qb/qbutil.h> - +#include <bzlib.h> #include <corosync/corodefs.h> #include <corosync/corotypes.h> #include <corosync/hdb.h> #include <corosync/cpg.h> +#include <qb/qbipc_common.h> +#include <qb/qbipcc.h> +#include <qb/qbutil.h> -#include <crm/msg_xml.h> +#include <crm/cluster/internal.h> +#include <crm/common/ipc.h> +#include <crm/common/ipc_internal.h> // PCMK__SPECIAL_PID +#include <crm/common/mainloop.h> +#include <crm/common/xml.h> -#include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */ #include "crmcluster_private.h" -/* @TODO Once we can update the public API to require crm_cluster_t* in more +/* @TODO Once we can update the public API to require pcmk_cluster_t* in more * functions, we can ditch this in favor of cluster->cpg_handle. */ static cpg_handle_t pcmk_cpg_handle = 0; -// @TODO These could be moved to crm_cluster_t* at that time as well +// @TODO These could be moved to pcmk_cluster_t* at that time as well static bool cpg_evicted = false; static GList *cs_message_queue = NULL; static int cs_message_timer = 0; @@ -87,26 +90,7 @@ static void crm_cs_flush(gpointer data); } while (counter < max) /*! - * \brief Disconnect from Corosync CPG - * - * \param[in,out] cluster Cluster to disconnect - */ -void -cluster_disconnect_cpg(crm_cluster_t *cluster) -{ - pcmk_cpg_handle = 0; - if (cluster->cpg_handle) { - crm_trace("Disconnecting CPG"); - cpg_leave(cluster->cpg_handle, &cluster->group); - cpg_finalize(cluster->cpg_handle); - cluster->cpg_handle = 0; - - } else { - crm_info("No CPG connection"); - } -} - -/*! + * \internal * \brief Get the local Corosync node ID (via CPG) * * \param[in] handle CPG connection to use (or 0 to use new connection) @@ -114,7 +98,7 @@ cluster_disconnect_cpg(crm_cluster_t *cluster) * \return Corosync ID of local node (or 0 if not known) */ uint32_t -get_local_nodeid(cpg_handle_t handle) +pcmk__cpg_local_nodeid(cpg_handle_t handle) { cs_error_t rc = CS_OK; int retries = 0; @@ -125,15 +109,18 @@ get_local_nodeid(cpg_handle_t handle) uid_t found_uid = 0; gid_t found_gid = 0; pid_t found_pid = 0; - int rv; + int rv = 0; - if(local_nodeid != 0) { + if (local_nodeid != 0) { return local_nodeid; } - if(handle == 0) { + if (handle == 0) { crm_trace("Creating connection"); - cs_repeat(rc, retries, 5, cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL)); + cs_repeat(rc, retries, 5, + cpg_model_initialize(&local_handle, CPG_MODEL_V1, + (cpg_model_data_t *) &cpg_model_info, + NULL)); if (rc != CS_OK) { crm_err("Could not connect to the CPG API: %s (%d)", cs_strerror(rc), rc); @@ -147,14 +134,16 @@ get_local_nodeid(cpg_handle_t handle) goto bail; } - /* CPG provider run as root (in given user namespace, anyway)? */ - if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid, - &found_uid, &found_gid))) { + // CPG provider run as root (at least in given user namespace)? + rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid, + &found_uid, &found_gid); + if (rv == 0) { crm_err("CPG provider is not authentic:" " process %lld (uid: %lld, gid: %lld)", (long long) PCMK__SPECIAL_PID_AS_0(found_pid), (long long) found_uid, (long long) found_gid); goto bail; + } else if (rv < 0) { crm_err("Could not verify authenticity of CPG provider: %s (%d)", strerror(-rv), -rv); @@ -174,7 +163,7 @@ get_local_nodeid(cpg_handle_t handle) } bail: - if(handle == 0) { + if (handle == 0) { crm_trace("Closing connection"); cpg_finalize(local_handle); } @@ -279,7 +268,7 @@ static int pcmk_cpg_dispatch(gpointer user_data) { cs_error_t rc = CS_OK; - crm_cluster_t *cluster = (crm_cluster_t *) user_data; + pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data; rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE); if (rc != CS_OK) { @@ -422,59 +411,64 @@ check_message_sanity(const pcmk__cpg_msg_t *msg) } /*! + * \internal * \brief Extract text data from a Corosync CPG message * - * \param[in] handle CPG connection (to get local node ID if not known) - * \param[in] nodeid Corosync ID of node that sent message - * \param[in] pid Process ID of message sender (for logging only) - * \param[in,out] content CPG message - * \param[out] kind If not NULL, will be set to CPG header ID - * (which should be an enum crm_ais_msg_class value, - * currently always crm_class_cluster) - * \param[out] from If not NULL, will be set to sender uname - * (valid for the lifetime of \p content) + * \param[in] handle CPG connection (to get local node ID if not known) + * \param[in] sender_id Corosync ID of node that sent message + * \param[in] pid Process ID of message sender (for logging only) + * \param[in,out] content CPG message + * \param[out] kind If not \c NULL, will be set to CPG header ID + * (which should be an <tt>enum crm_ais_msg_class</tt> + * value, currently always \c crm_class_cluster) + * \param[out] from If not \c NULL, will be set to sender uname + * (valid for the lifetime of \p content) * * \return Newly allocated string with message data - * \note It is the caller's responsibility to free the return value with free(). + * + * \note The caller is responsible for freeing the return value using \c free(). */ char * -pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, - uint32_t *kind, const char **from) +pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid, + void *content, uint32_t *kind, const char **from) { char *data = NULL; - pcmk__cpg_msg_t *msg = (pcmk__cpg_msg_t *) content; + pcmk__cpg_msg_t *msg = content; - if(handle) { + if (handle != 0) { // Do filtering and field massaging - uint32_t local_nodeid = get_local_nodeid(handle); - const char *local_name = get_local_node_name(); + uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle); + const char *local_name = pcmk__cluster_local_node_name(); - if (msg->sender.id > 0 && msg->sender.id != nodeid) { - crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id); + if ((msg->sender.id != 0) && (msg->sender.id != sender_id)) { + crm_err("Nodeid mismatch from %" PRIu32 ".%" PRIu32 + ": claimed nodeid=%" PRIu32, + sender_id, pid, msg->sender.id); return NULL; - - } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) { - /* Not for us */ - crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid); + } + if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) { + crm_trace("Not for us: %" PRIu32" != %" PRIu32, + msg->host.id, local_nodeid); return NULL; - } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) { - /* Not for us */ + } + if ((msg->host.size > 0) + && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) { + crm_trace("Not for us: %s != %s", msg->host.uname, local_name); return NULL; } - msg->sender.id = nodeid; + msg->sender.id = sender_id; if (msg->sender.size == 0) { - crm_node_t *peer = crm_get_peer(nodeid, NULL); - - if (peer == NULL) { - crm_err("Peer with nodeid=%u is unknown", nodeid); + const crm_node_t *peer = + pcmk__get_node(sender_id, NULL, NULL, + pcmk__node_search_cluster_member); - } else if (peer->uname == NULL) { - crm_err("No uname for peer with nodeid=%u", nodeid); + if (peer->uname == NULL) { + crm_err("No uname for peer with nodeid=%u", sender_id); } else { - crm_notice("Fixing uname for peer with nodeid=%u", nodeid); + crm_notice("Fixing uname for peer with nodeid=%u", sender_id); msg->sender.size = strlen(peer->uname); memset(msg->sender.uname, 0, MAX_NAME); memcpy(msg->sender.uname, peer->uname, msg->sender.size); @@ -493,7 +487,7 @@ pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *from = msg->sender.uname; } - if (msg->is_compressed && msg->size > 0) { + if (msg->is_compressed && (msg->size > 0)) { int rc = BZ_OK; char *uncompressed = NULL; unsigned int new_size = msg->size + 1; @@ -503,13 +497,15 @@ pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void } crm_trace("Decompressing message data"); - uncompressed = calloc(1, new_size); - rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0); + uncompressed = pcmk__assert_alloc(1, new_size); + rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, + msg->compressed_size, 1, 0); rc = pcmk__bzlib2rc(rc); if (rc != pcmk_rc_ok) { - crm_err("Decompression failed: %s " CRM_XS " rc=%d", pcmk_rc_str(rc), rc); + crm_err("Decompression failed: %s " CRM_XS " rc=%d", + pcmk_rc_str(rc), rc); free(uncompressed); goto badmsg; } @@ -526,7 +522,8 @@ pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void } // Is this necessary? - crm_get_peer(msg->sender.id, msg->sender.uname); + pcmk__get_node(msg->sender.id, msg->sender.uname, NULL, + pcmk__node_search_cluster_member); crm_trace("Payload: %.200s", data); return data; @@ -627,8 +624,9 @@ node_left(const char *cpg_group_name, int event_counter, const struct cpg_address **sorted_member_list, size_t member_list_entries) { - crm_node_t *peer = pcmk__search_cluster_node_cache(cpg_peer->nodeid, - NULL, NULL); + crm_node_t *peer = + pcmk__search_node_caches(cpg_peer->nodeid, NULL, + pcmk__node_search_cluster_member); const struct cpg_address **rival = NULL; /* Most CPG-related Pacemaker code assumes that only one process on a node @@ -656,7 +654,7 @@ node_left(const char *cpg_group_name, int event_counter, cpgreason2str(cpg_peer->reason)); if (peer != NULL) { crm_update_peer_proc(__func__, peer, crm_proc_cpg, - OFFLINESTATUS); + PCMK_VALUE_OFFLINE); } } else if (cpg_peer->nodeid == local_nodeid) { crm_warn("Group %s event %d: duplicate local pid %u left%s", @@ -672,72 +670,81 @@ node_left(const char *cpg_group_name, int event_counter, } /*! + * \internal * \brief Handle a CPG configuration change event * * \param[in] handle CPG connection - * \param[in] cpg_name CPG group name + * \param[in] group_name CPG group name * \param[in] member_list List of current CPG members * \param[in] member_list_entries Number of entries in \p member_list * \param[in] left_list List of CPG members that left * \param[in] left_list_entries Number of entries in \p left_list * \param[in] joined_list List of CPG members that joined * \param[in] joined_list_entries Number of entries in \p joined_list + * + * \note This is of type \c cpg_confchg_fn_t, intended to be used in a + * \c cpg_callbacks_t object. */ void -pcmk_cpg_membership(cpg_handle_t handle, - const struct cpg_name *groupName, - const struct cpg_address *member_list, size_t member_list_entries, - const struct cpg_address *left_list, size_t left_list_entries, - const struct cpg_address *joined_list, size_t joined_list_entries) +pcmk__cpg_confchg_cb(cpg_handle_t handle, + const struct cpg_name *group_name, + const struct cpg_address *member_list, + size_t member_list_entries, + const struct cpg_address *left_list, + size_t left_list_entries, + const struct cpg_address *joined_list, + size_t joined_list_entries) { - int i; - gboolean found = FALSE; static int counter = 0; - uint32_t local_nodeid = get_local_nodeid(handle); - const struct cpg_address **sorted; - sorted = malloc(member_list_entries * sizeof(const struct cpg_address *)); - CRM_ASSERT(sorted != NULL); + bool found = false; + uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle); + const struct cpg_address **sorted = NULL; + + sorted = pcmk__assert_alloc(member_list_entries, + sizeof(const struct cpg_address *)); for (size_t iter = 0; iter < member_list_entries; iter++) { sorted[iter] = member_list + iter; } - /* so that the cross-matching multiply-subscribed nodes is then cheap */ + + // So that the cross-matching of multiply-subscribed nodes is then cheap qsort(sorted, member_list_entries, sizeof(const struct cpg_address *), cmp_member_list_nodeid); - for (i = 0; i < left_list_entries; i++) { - node_left(groupName->value, counter, local_nodeid, &left_list[i], + for (int i = 0; i < left_list_entries; i++) { + node_left(group_name->value, counter, local_nodeid, &left_list[i], sorted, member_list_entries); } free(sorted); sorted = NULL; - for (i = 0; i < joined_list_entries; i++) { + for (int i = 0; i < joined_list_entries; i++) { crm_info("Group %s event %d: node %u pid %u joined%s", - groupName->value, counter, joined_list[i].nodeid, + group_name->value, counter, joined_list[i].nodeid, joined_list[i].pid, cpgreason2str(joined_list[i].reason)); } - for (i = 0; i < member_list_entries; i++) { - crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL); + for (int i = 0; i < member_list_entries; i++) { + crm_node_t *peer = pcmk__get_node(member_list[i].nodeid, NULL, NULL, + pcmk__node_search_cluster_member); if (member_list[i].nodeid == local_nodeid && member_list[i].pid != getpid()) { // See the note in node_left() crm_warn("Group %s event %d: detected duplicate local pid %u", - groupName->value, counter, member_list[i].pid); + group_name->value, counter, member_list[i].pid); continue; } crm_info("Group %s event %d: %s (node %u pid %u) is member", - groupName->value, counter, peer_name(peer), + group_name->value, counter, peer_name(peer), member_list[i].nodeid, member_list[i].pid); /* If the caller left auto-reaping enabled, this will also update the * state to member. */ peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg, - ONLINESTATUS); + PCMK_VALUE_ONLINE); if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) { /* The node is a CPG member, but we currently think it's not a @@ -755,19 +762,20 @@ pcmk_cpg_membership(cpg_handle_t handle, } else if (now > (peer->when_lost + 60)) { // If it persists for more than a minute, update the state - crm_warn("Node %u is member of group %s but was believed offline", - member_list[i].nodeid, groupName->value); + crm_warn("Node %u is member of group %s but was believed " + "offline", + member_list[i].nodeid, group_name->value); pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0); } } if (local_nodeid == member_list[i].nodeid) { - found = TRUE; + found = true; } } if (!found) { - crm_err("Local node was evicted from group %s", groupName->value); + crm_err("Local node was evicted from group %s", group_name->value); cpg_evicted = true; } @@ -775,14 +783,50 @@ pcmk_cpg_membership(cpg_handle_t handle, } /*! - * \brief Connect to Corosync CPG + * \brief Set the CPG deliver callback function for a cluster object * * \param[in,out] cluster Cluster object + * \param[in] fn Deliver callback function to set * - * \return TRUE on success, otherwise FALSE + * \return Standard Pacemaker return code */ -gboolean -cluster_connect_cpg(crm_cluster_t *cluster) +int +pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn) +{ + if (cluster == NULL) { + return EINVAL; + } + cluster->cpg.cpg_deliver_fn = fn; + return pcmk_rc_ok; +} + +/*! + * \brief Set the CPG config change callback function for a cluster object + * + * \param[in,out] cluster Cluster object + * \param[in] fn Configuration change callback function to set + * + * \return Standard Pacemaker return code + */ +int +pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn) +{ + if (cluster == NULL) { + return EINVAL; + } + cluster->cpg.cpg_confchg_fn = fn; + return pcmk_rc_ok; +} + +/*! + * \brief Connect to Corosync CPG + * + * \param[in,out] cluster Initialized cluster object to connect + * + * \return Standard Pacemaker return code + */ +int +pcmk__cpg_connect(pcmk_cluster_t *cluster) { cs_error_t rc; int fd = -1; @@ -848,7 +892,7 @@ cluster_connect_cpg(crm_cluster_t *cluster) goto bail; } - id = get_local_nodeid(handle); + id = pcmk__cpg_local_nodeid(handle); if (id == 0) { crm_err("Could not get local node id from the CPG API"); goto bail; @@ -870,54 +914,52 @@ cluster_connect_cpg(crm_cluster_t *cluster) bail: if (rc != CS_OK) { cpg_finalize(handle); - return FALSE; + // @TODO Map rc to more specific Pacemaker return code + return ENOTCONN; } - peer = crm_get_peer(id, NULL); - crm_update_peer_proc(__func__, peer, crm_proc_cpg, ONLINESTATUS); - return TRUE; + peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member); + crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_ONLINE); + return pcmk_rc_ok; } /*! * \internal - * \brief Send an XML message via Corosync CPG - * - * \param[in] msg XML message to send - * \param[in] node Cluster node to send message to - * \param[in] dest Type of message to send + * \brief Disconnect from Corosync CPG * - * \return TRUE on success, otherwise FALSE + * \param[in,out] cluster Cluster object to disconnect */ -bool -pcmk__cpg_send_xml(const xmlNode *msg, const crm_node_t *node, - enum crm_ais_msg_types dest) +void +pcmk__cpg_disconnect(pcmk_cluster_t *cluster) { - bool rc = true; - char *data = NULL; + pcmk_cpg_handle = 0; + if (cluster->cpg_handle != 0) { + crm_trace("Disconnecting CPG"); + cpg_leave(cluster->cpg_handle, &cluster->group); + cpg_finalize(cluster->cpg_handle); + cluster->cpg_handle = 0; - data = dump_xml_unformatted(msg); - rc = send_cluster_text(crm_class_cluster, data, FALSE, node, dest); - free(data); - return rc; + } else { + crm_info("No CPG connection"); + } } /*! * \internal * \brief Send string data via Corosync CPG * - * \param[in] msg_class Message class (to set as CPG header ID) - * \param[in] data Data to send - * \param[in] local What to set as host "local" value (which is never used) - * \param[in] node Cluster node to send message to - * \param[in] dest Type of message to send + * \param[in] data Data to send + * \param[in] local What to set as host "local" value (which is never used) + * \param[in] node Cluster node to send message to + * \param[in] dest Type of message to send * - * \return TRUE on success, otherwise FALSE + * \return \c true on success, or \c false otherwise */ -gboolean -send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, - gboolean local, const crm_node_t *node, - enum crm_ais_msg_types dest) +static bool +send_cpg_text(const char *data, bool local, const crm_node_t *node, + enum crm_ais_msg_types dest) { + // @COMPAT Drop local argument when send_cluster_text is dropped static int msg_id = 0; static int local_pid = 0; static int local_name_len = 0; @@ -926,20 +968,11 @@ send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, char *target = NULL; struct iovec *iov; pcmk__cpg_msg_t *msg = NULL; - enum crm_ais_msg_types sender = text2msg_type(crm_system_name); - switch (msg_class) { - case crm_class_cluster: - break; - default: - crm_err("Invalid message class: %d", msg_class); - return FALSE; - } - - CRM_CHECK(dest != crm_msg_ais, return FALSE); + CRM_CHECK(dest != crm_msg_ais, return false); if (local_name == NULL) { - local_name = get_local_node_name(); + local_name = pcmk__cluster_local_node_name(); } if ((local_name_len == 0) && (local_name != NULL)) { local_name_len = strlen(local_name); @@ -953,39 +986,38 @@ send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, local_pid = getpid(); } - if (sender == crm_msg_none) { - sender = local_pid; - } - - msg = calloc(1, sizeof(pcmk__cpg_msg_t)); + msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t)); msg_id++; msg->id = msg_id; - msg->header.id = msg_class; + msg->header.id = crm_class_cluster; msg->header.error = CS_OK; msg->host.type = dest; msg->host.local = local; - if (node) { - if (node->uname) { - target = strdup(node->uname); + if (node != NULL) { + if (node->uname != NULL) { + target = pcmk__str_copy(node->uname); msg->host.size = strlen(node->uname); memset(msg->host.uname, 0, MAX_NAME); memcpy(msg->host.uname, node->uname, msg->host.size); + } else { target = crm_strdup_printf("%u", node->id); } msg->host.id = node->id; + } else { - target = strdup("all"); + target = pcmk__str_copy("all"); } msg->sender.id = 0; - msg->sender.type = sender; + msg->sender.type = pcmk__cluster_parse_msg_type(crm_system_name); msg->sender.pid = local_pid; msg->sender.size = local_name_len; memset(msg->sender.uname, 0, MAX_NAME); + if ((local_name != NULL) && (msg->sender.size != 0)) { memcpy(msg->sender.uname, local_name, msg->sender.size); } @@ -1000,10 +1032,9 @@ send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, } else { char *compressed = NULL; unsigned int new_size = 0; - char *uncompressed = strdup(data); - if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0, - &compressed, &new_size) == pcmk_rc_ok) { + if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed, + &new_size) == pcmk_rc_ok) { msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size; msg = pcmk__realloc(msg, msg->header.size); @@ -1019,38 +1050,116 @@ send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, memcpy(msg->data, data, msg->size); } - free(uncompressed); free(compressed); } - iov = calloc(1, sizeof(struct iovec)); + iov = pcmk__assert_alloc(1, sizeof(struct iovec)); iov->iov_base = msg; iov->iov_len = msg->header.size; - if (msg->compressed_size) { - crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s", + if (msg->compressed_size > 0) { + crm_trace("Queueing CPG message %u to %s " + "(%llu bytes, %d bytes compressed payload): %.200s", msg->id, target, (unsigned long long) iov->iov_len, msg->compressed_size, data); } else { - crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s", + crm_trace("Queueing CPG message %u to %s " + "(%llu bytes, %d bytes payload): %.200s", msg->id, target, (unsigned long long) iov->iov_len, msg->size, data); } + free(target); cs_message_queue = g_list_append(cs_message_queue, iov); crm_cs_flush(&pcmk_cpg_handle); - return TRUE; + return true; } /*! - * \brief Get the message type equivalent of a string + * \internal + * \brief Send an XML message via Corosync CPG * - * \param[in] text String of message type + * \param[in] msg XML message to send + * \param[in] node Cluster node to send message to + * \param[in] dest Type of message to send * - * \return Message type equivalent of \p text + * \return TRUE on success, otherwise FALSE */ +bool +pcmk__cpg_send_xml(const xmlNode *msg, const crm_node_t *node, + enum crm_ais_msg_types dest) +{ + bool rc = true; + GString *data = g_string_sized_new(1024); + + pcmk__xml_string(msg, 0, data, 0); + + rc = send_cpg_text(data->str, false, node, dest); + g_string_free(data, TRUE); + return rc; +} + +// Deprecated functions kept only for backward API compatibility +// LCOV_EXCL_START + +#include <crm/cluster/compat.h> + +gboolean +cluster_connect_cpg(pcmk_cluster_t *cluster) +{ + return pcmk__cpg_connect(cluster) == pcmk_rc_ok; +} + +void +cluster_disconnect_cpg(pcmk_cluster_t *cluster) +{ + pcmk__cpg_disconnect(cluster); +} + +uint32_t +get_local_nodeid(cpg_handle_t handle) +{ + return pcmk__cpg_local_nodeid(handle); +} + +void +pcmk_cpg_membership(cpg_handle_t handle, + const struct cpg_name *group_name, + const struct cpg_address *member_list, + size_t member_list_entries, + const struct cpg_address *left_list, + size_t left_list_entries, + const struct cpg_address *joined_list, + size_t joined_list_entries) +{ + pcmk__cpg_confchg_cb(handle, group_name, member_list, member_list_entries, + left_list, left_list_entries, + joined_list, joined_list_entries); +} + +gboolean +send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, + gboolean local, const crm_node_t *node, + enum crm_ais_msg_types dest) +{ + switch (msg_class) { + case crm_class_cluster: + return send_cpg_text(data, local, node, dest); + default: + crm_err("Invalid message class: %d", msg_class); + return FALSE; + } +} + +char * +pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, + void *content, uint32_t *kind, const char **from) +{ + return pcmk__cpg_message_data(handle, nodeid, pid, content, kind, from); +} + enum crm_ais_msg_types text2msg_type(const char *text) { @@ -1090,3 +1199,6 @@ text2msg_type(const char *text) } return type; } + +// LCOV_EXCL_STOP +// End deprecated API |