summaryrefslogtreecommitdiffstats
path: root/lib/cluster/cpg.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-03 13:39:29 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-03 13:39:29 +0000
commitb41961d74fe7ff2d4d4abaca92454e87c561e49f (patch)
treeb34e3826a7b649dafdbd05081140c990c96d736d /lib/cluster/cpg.c
parentReleasing progress-linux version 2.1.7-1~progress7.99u1. (diff)
downloadpacemaker-b41961d74fe7ff2d4d4abaca92454e87c561e49f.tar.xz
pacemaker-b41961d74fe7ff2d4d4abaca92454e87c561e49f.zip
Merging upstream version 2.1.8~rc1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--lib/cluster/cpg.c472
1 files changed, 292 insertions, 180 deletions
diff --git a/lib/cluster/cpg.c b/lib/cluster/cpg.c
index d1decc6..62d39a6 100644
--- a/lib/cluster/cpg.c
+++ b/lib/cluster/cpg.c
@@ -1,5 +1,5 @@
/*
- * Copyright 2004-2023 the Pacemaker project contributors
+ * Copyright 2004-2024 the Pacemaker project contributors
*
* The version control history for this file may have further details.
*
@@ -8,37 +8,40 @@
*/
#include <crm_internal.h>
-#include <bzlib.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
+
#include <arpa/inet.h>
+#include <inttypes.h> // PRIu32
#include <netdb.h>
-
-#include <crm/common/ipc.h>
-#include <crm/cluster/internal.h>
-#include <crm/common/mainloop.h>
+#include <netinet/in.h>
+#include <stdbool.h>
+#include <stdint.h> // uint32_t
+#include <sys/socket.h>
+#include <sys/types.h> // size_t
#include <sys/utsname.h>
-#include <qb/qbipc_common.h>
-#include <qb/qbipcc.h>
-#include <qb/qbutil.h>
-
+#include <bzlib.h>
#include <corosync/corodefs.h>
#include <corosync/corotypes.h>
#include <corosync/hdb.h>
#include <corosync/cpg.h>
+#include <qb/qbipc_common.h>
+#include <qb/qbipcc.h>
+#include <qb/qbutil.h>
-#include <crm/msg_xml.h>
+#include <crm/cluster/internal.h>
+#include <crm/common/ipc.h>
+#include <crm/common/ipc_internal.h> // PCMK__SPECIAL_PID
+#include <crm/common/mainloop.h>
+#include <crm/common/xml.h>
-#include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */
#include "crmcluster_private.h"
-/* @TODO Once we can update the public API to require crm_cluster_t* in more
+/* @TODO Once we can update the public API to require pcmk_cluster_t* in more
* functions, we can ditch this in favor of cluster->cpg_handle.
*/
static cpg_handle_t pcmk_cpg_handle = 0;
-// @TODO These could be moved to crm_cluster_t* at that time as well
+// @TODO These could be moved to pcmk_cluster_t* at that time as well
static bool cpg_evicted = false;
static GList *cs_message_queue = NULL;
static int cs_message_timer = 0;
@@ -87,26 +90,7 @@ static void crm_cs_flush(gpointer data);
} while (counter < max)
/*!
- * \brief Disconnect from Corosync CPG
- *
- * \param[in,out] cluster Cluster to disconnect
- */
-void
-cluster_disconnect_cpg(crm_cluster_t *cluster)
-{
- pcmk_cpg_handle = 0;
- if (cluster->cpg_handle) {
- crm_trace("Disconnecting CPG");
- cpg_leave(cluster->cpg_handle, &cluster->group);
- cpg_finalize(cluster->cpg_handle);
- cluster->cpg_handle = 0;
-
- } else {
- crm_info("No CPG connection");
- }
-}
-
-/*!
+ * \internal
* \brief Get the local Corosync node ID (via CPG)
*
* \param[in] handle CPG connection to use (or 0 to use new connection)
@@ -114,7 +98,7 @@ cluster_disconnect_cpg(crm_cluster_t *cluster)
* \return Corosync ID of local node (or 0 if not known)
*/
uint32_t
-get_local_nodeid(cpg_handle_t handle)
+pcmk__cpg_local_nodeid(cpg_handle_t handle)
{
cs_error_t rc = CS_OK;
int retries = 0;
@@ -125,15 +109,18 @@ get_local_nodeid(cpg_handle_t handle)
uid_t found_uid = 0;
gid_t found_gid = 0;
pid_t found_pid = 0;
- int rv;
+ int rv = 0;
- if(local_nodeid != 0) {
+ if (local_nodeid != 0) {
return local_nodeid;
}
- if(handle == 0) {
+ if (handle == 0) {
crm_trace("Creating connection");
- cs_repeat(rc, retries, 5, cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
+ cs_repeat(rc, retries, 5,
+ cpg_model_initialize(&local_handle, CPG_MODEL_V1,
+ (cpg_model_data_t *) &cpg_model_info,
+ NULL));
if (rc != CS_OK) {
crm_err("Could not connect to the CPG API: %s (%d)",
cs_strerror(rc), rc);
@@ -147,14 +134,16 @@ get_local_nodeid(cpg_handle_t handle)
goto bail;
}
- /* CPG provider run as root (in given user namespace, anyway)? */
- if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
- &found_uid, &found_gid))) {
+ // CPG provider run as root (at least in given user namespace)?
+ rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid,
+ &found_uid, &found_gid);
+ if (rv == 0) {
crm_err("CPG provider is not authentic:"
" process %lld (uid: %lld, gid: %lld)",
(long long) PCMK__SPECIAL_PID_AS_0(found_pid),
(long long) found_uid, (long long) found_gid);
goto bail;
+
} else if (rv < 0) {
crm_err("Could not verify authenticity of CPG provider: %s (%d)",
strerror(-rv), -rv);
@@ -174,7 +163,7 @@ get_local_nodeid(cpg_handle_t handle)
}
bail:
- if(handle == 0) {
+ if (handle == 0) {
crm_trace("Closing connection");
cpg_finalize(local_handle);
}
@@ -279,7 +268,7 @@ static int
pcmk_cpg_dispatch(gpointer user_data)
{
cs_error_t rc = CS_OK;
- crm_cluster_t *cluster = (crm_cluster_t *) user_data;
+ pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data;
rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
if (rc != CS_OK) {
@@ -422,59 +411,64 @@ check_message_sanity(const pcmk__cpg_msg_t *msg)
}
/*!
+ * \internal
* \brief Extract text data from a Corosync CPG message
*
- * \param[in] handle CPG connection (to get local node ID if not known)
- * \param[in] nodeid Corosync ID of node that sent message
- * \param[in] pid Process ID of message sender (for logging only)
- * \param[in,out] content CPG message
- * \param[out] kind If not NULL, will be set to CPG header ID
- * (which should be an enum crm_ais_msg_class value,
- * currently always crm_class_cluster)
- * \param[out] from If not NULL, will be set to sender uname
- * (valid for the lifetime of \p content)
+ * \param[in] handle CPG connection (to get local node ID if not known)
+ * \param[in] sender_id Corosync ID of node that sent message
+ * \param[in] pid Process ID of message sender (for logging only)
+ * \param[in,out] content CPG message
+ * \param[out] kind If not \c NULL, will be set to CPG header ID
+ * (which should be an <tt>enum crm_ais_msg_class</tt>
+ * value, currently always \c crm_class_cluster)
+ * \param[out] from If not \c NULL, will be set to sender uname
+ * (valid for the lifetime of \p content)
*
* \return Newly allocated string with message data
- * \note It is the caller's responsibility to free the return value with free().
+ *
+ * \note The caller is responsible for freeing the return value using \c free().
*/
char *
-pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
- uint32_t *kind, const char **from)
+pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid,
+ void *content, uint32_t *kind, const char **from)
{
char *data = NULL;
- pcmk__cpg_msg_t *msg = (pcmk__cpg_msg_t *) content;
+ pcmk__cpg_msg_t *msg = content;
- if(handle) {
+ if (handle != 0) {
// Do filtering and field massaging
- uint32_t local_nodeid = get_local_nodeid(handle);
- const char *local_name = get_local_node_name();
+ uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
+ const char *local_name = pcmk__cluster_local_node_name();
- if (msg->sender.id > 0 && msg->sender.id != nodeid) {
- crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
+ if ((msg->sender.id != 0) && (msg->sender.id != sender_id)) {
+ crm_err("Nodeid mismatch from %" PRIu32 ".%" PRIu32
+ ": claimed nodeid=%" PRIu32,
+ sender_id, pid, msg->sender.id);
return NULL;
-
- } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
- /* Not for us */
- crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
+ }
+ if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
+ crm_trace("Not for us: %" PRIu32" != %" PRIu32,
+ msg->host.id, local_nodeid);
return NULL;
- } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
- /* Not for us */
+ }
+ if ((msg->host.size > 0)
+ && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
+
crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
return NULL;
}
- msg->sender.id = nodeid;
+ msg->sender.id = sender_id;
if (msg->sender.size == 0) {
- crm_node_t *peer = crm_get_peer(nodeid, NULL);
-
- if (peer == NULL) {
- crm_err("Peer with nodeid=%u is unknown", nodeid);
+ const crm_node_t *peer =
+ pcmk__get_node(sender_id, NULL, NULL,
+ pcmk__node_search_cluster_member);
- } else if (peer->uname == NULL) {
- crm_err("No uname for peer with nodeid=%u", nodeid);
+ if (peer->uname == NULL) {
+ crm_err("No uname for peer with nodeid=%u", sender_id);
} else {
- crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
+ crm_notice("Fixing uname for peer with nodeid=%u", sender_id);
msg->sender.size = strlen(peer->uname);
memset(msg->sender.uname, 0, MAX_NAME);
memcpy(msg->sender.uname, peer->uname, msg->sender.size);
@@ -493,7 +487,7 @@ pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void
*from = msg->sender.uname;
}
- if (msg->is_compressed && msg->size > 0) {
+ if (msg->is_compressed && (msg->size > 0)) {
int rc = BZ_OK;
char *uncompressed = NULL;
unsigned int new_size = msg->size + 1;
@@ -503,13 +497,15 @@ pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void
}
crm_trace("Decompressing message data");
- uncompressed = calloc(1, new_size);
- rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
+ uncompressed = pcmk__assert_alloc(1, new_size);
+ rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
+ msg->compressed_size, 1, 0);
rc = pcmk__bzlib2rc(rc);
if (rc != pcmk_rc_ok) {
- crm_err("Decompression failed: %s " CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
+ crm_err("Decompression failed: %s " CRM_XS " rc=%d",
+ pcmk_rc_str(rc), rc);
free(uncompressed);
goto badmsg;
}
@@ -526,7 +522,8 @@ pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void
}
// Is this necessary?
- crm_get_peer(msg->sender.id, msg->sender.uname);
+ pcmk__get_node(msg->sender.id, msg->sender.uname, NULL,
+ pcmk__node_search_cluster_member);
crm_trace("Payload: %.200s", data);
return data;
@@ -627,8 +624,9 @@ node_left(const char *cpg_group_name, int event_counter,
const struct cpg_address **sorted_member_list,
size_t member_list_entries)
{
- crm_node_t *peer = pcmk__search_cluster_node_cache(cpg_peer->nodeid,
- NULL, NULL);
+ crm_node_t *peer =
+ pcmk__search_node_caches(cpg_peer->nodeid, NULL,
+ pcmk__node_search_cluster_member);
const struct cpg_address **rival = NULL;
/* Most CPG-related Pacemaker code assumes that only one process on a node
@@ -656,7 +654,7 @@ node_left(const char *cpg_group_name, int event_counter,
cpgreason2str(cpg_peer->reason));
if (peer != NULL) {
crm_update_peer_proc(__func__, peer, crm_proc_cpg,
- OFFLINESTATUS);
+ PCMK_VALUE_OFFLINE);
}
} else if (cpg_peer->nodeid == local_nodeid) {
crm_warn("Group %s event %d: duplicate local pid %u left%s",
@@ -672,72 +670,81 @@ node_left(const char *cpg_group_name, int event_counter,
}
/*!
+ * \internal
* \brief Handle a CPG configuration change event
*
* \param[in] handle CPG connection
- * \param[in] cpg_name CPG group name
+ * \param[in] group_name CPG group name
* \param[in] member_list List of current CPG members
* \param[in] member_list_entries Number of entries in \p member_list
* \param[in] left_list List of CPG members that left
* \param[in] left_list_entries Number of entries in \p left_list
* \param[in] joined_list List of CPG members that joined
* \param[in] joined_list_entries Number of entries in \p joined_list
+ *
+ * \note This is of type \c cpg_confchg_fn_t, intended to be used in a
+ * \c cpg_callbacks_t object.
*/
void
-pcmk_cpg_membership(cpg_handle_t handle,
- const struct cpg_name *groupName,
- const struct cpg_address *member_list, size_t member_list_entries,
- const struct cpg_address *left_list, size_t left_list_entries,
- const struct cpg_address *joined_list, size_t joined_list_entries)
+pcmk__cpg_confchg_cb(cpg_handle_t handle,
+ const struct cpg_name *group_name,
+ const struct cpg_address *member_list,
+ size_t member_list_entries,
+ const struct cpg_address *left_list,
+ size_t left_list_entries,
+ const struct cpg_address *joined_list,
+ size_t joined_list_entries)
{
- int i;
- gboolean found = FALSE;
static int counter = 0;
- uint32_t local_nodeid = get_local_nodeid(handle);
- const struct cpg_address **sorted;
- sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
- CRM_ASSERT(sorted != NULL);
+ bool found = false;
+ uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
+ const struct cpg_address **sorted = NULL;
+
+ sorted = pcmk__assert_alloc(member_list_entries,
+ sizeof(const struct cpg_address *));
for (size_t iter = 0; iter < member_list_entries; iter++) {
sorted[iter] = member_list + iter;
}
- /* so that the cross-matching multiply-subscribed nodes is then cheap */
+
+ // So that the cross-matching of multiply-subscribed nodes is then cheap
qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
cmp_member_list_nodeid);
- for (i = 0; i < left_list_entries; i++) {
- node_left(groupName->value, counter, local_nodeid, &left_list[i],
+ for (int i = 0; i < left_list_entries; i++) {
+ node_left(group_name->value, counter, local_nodeid, &left_list[i],
sorted, member_list_entries);
}
free(sorted);
sorted = NULL;
- for (i = 0; i < joined_list_entries; i++) {
+ for (int i = 0; i < joined_list_entries; i++) {
crm_info("Group %s event %d: node %u pid %u joined%s",
- groupName->value, counter, joined_list[i].nodeid,
+ group_name->value, counter, joined_list[i].nodeid,
joined_list[i].pid, cpgreason2str(joined_list[i].reason));
}
- for (i = 0; i < member_list_entries; i++) {
- crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
+ for (int i = 0; i < member_list_entries; i++) {
+ crm_node_t *peer = pcmk__get_node(member_list[i].nodeid, NULL, NULL,
+ pcmk__node_search_cluster_member);
if (member_list[i].nodeid == local_nodeid
&& member_list[i].pid != getpid()) {
// See the note in node_left()
crm_warn("Group %s event %d: detected duplicate local pid %u",
- groupName->value, counter, member_list[i].pid);
+ group_name->value, counter, member_list[i].pid);
continue;
}
crm_info("Group %s event %d: %s (node %u pid %u) is member",
- groupName->value, counter, peer_name(peer),
+ group_name->value, counter, peer_name(peer),
member_list[i].nodeid, member_list[i].pid);
/* If the caller left auto-reaping enabled, this will also update the
* state to member.
*/
peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
- ONLINESTATUS);
+ PCMK_VALUE_ONLINE);
if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
/* The node is a CPG member, but we currently think it's not a
@@ -755,19 +762,20 @@ pcmk_cpg_membership(cpg_handle_t handle,
} else if (now > (peer->when_lost + 60)) {
// If it persists for more than a minute, update the state
- crm_warn("Node %u is member of group %s but was believed offline",
- member_list[i].nodeid, groupName->value);
+ crm_warn("Node %u is member of group %s but was believed "
+ "offline",
+ member_list[i].nodeid, group_name->value);
pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
}
}
if (local_nodeid == member_list[i].nodeid) {
- found = TRUE;
+ found = true;
}
}
if (!found) {
- crm_err("Local node was evicted from group %s", groupName->value);
+ crm_err("Local node was evicted from group %s", group_name->value);
cpg_evicted = true;
}
@@ -775,14 +783,50 @@ pcmk_cpg_membership(cpg_handle_t handle,
}
/*!
- * \brief Connect to Corosync CPG
+ * \brief Set the CPG deliver callback function for a cluster object
*
* \param[in,out] cluster Cluster object
+ * \param[in] fn Deliver callback function to set
*
- * \return TRUE on success, otherwise FALSE
+ * \return Standard Pacemaker return code
*/
-gboolean
-cluster_connect_cpg(crm_cluster_t *cluster)
+int
+pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
+{
+ if (cluster == NULL) {
+ return EINVAL;
+ }
+ cluster->cpg.cpg_deliver_fn = fn;
+ return pcmk_rc_ok;
+}
+
+/*!
+ * \brief Set the CPG config change callback function for a cluster object
+ *
+ * \param[in,out] cluster Cluster object
+ * \param[in] fn Configuration change callback function to set
+ *
+ * \return Standard Pacemaker return code
+ */
+int
+pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
+{
+ if (cluster == NULL) {
+ return EINVAL;
+ }
+ cluster->cpg.cpg_confchg_fn = fn;
+ return pcmk_rc_ok;
+}
+
+/*!
+ * \brief Connect to Corosync CPG
+ *
+ * \param[in,out] cluster Initialized cluster object to connect
+ *
+ * \return Standard Pacemaker return code
+ */
+int
+pcmk__cpg_connect(pcmk_cluster_t *cluster)
{
cs_error_t rc;
int fd = -1;
@@ -848,7 +892,7 @@ cluster_connect_cpg(crm_cluster_t *cluster)
goto bail;
}
- id = get_local_nodeid(handle);
+ id = pcmk__cpg_local_nodeid(handle);
if (id == 0) {
crm_err("Could not get local node id from the CPG API");
goto bail;
@@ -870,54 +914,52 @@ cluster_connect_cpg(crm_cluster_t *cluster)
bail:
if (rc != CS_OK) {
cpg_finalize(handle);
- return FALSE;
+ // @TODO Map rc to more specific Pacemaker return code
+ return ENOTCONN;
}
- peer = crm_get_peer(id, NULL);
- crm_update_peer_proc(__func__, peer, crm_proc_cpg, ONLINESTATUS);
- return TRUE;
+ peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member);
+ crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_ONLINE);
+ return pcmk_rc_ok;
}
/*!
* \internal
- * \brief Send an XML message via Corosync CPG
- *
- * \param[in] msg XML message to send
- * \param[in] node Cluster node to send message to
- * \param[in] dest Type of message to send
+ * \brief Disconnect from Corosync CPG
*
- * \return TRUE on success, otherwise FALSE
+ * \param[in,out] cluster Cluster object to disconnect
*/
-bool
-pcmk__cpg_send_xml(const xmlNode *msg, const crm_node_t *node,
- enum crm_ais_msg_types dest)
+void
+pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
{
- bool rc = true;
- char *data = NULL;
+ pcmk_cpg_handle = 0;
+ if (cluster->cpg_handle != 0) {
+ crm_trace("Disconnecting CPG");
+ cpg_leave(cluster->cpg_handle, &cluster->group);
+ cpg_finalize(cluster->cpg_handle);
+ cluster->cpg_handle = 0;
- data = dump_xml_unformatted(msg);
- rc = send_cluster_text(crm_class_cluster, data, FALSE, node, dest);
- free(data);
- return rc;
+ } else {
+ crm_info("No CPG connection");
+ }
}
/*!
* \internal
* \brief Send string data via Corosync CPG
*
- * \param[in] msg_class Message class (to set as CPG header ID)
- * \param[in] data Data to send
- * \param[in] local What to set as host "local" value (which is never used)
- * \param[in] node Cluster node to send message to
- * \param[in] dest Type of message to send
+ * \param[in] data Data to send
+ * \param[in] local What to set as host "local" value (which is never used)
+ * \param[in] node Cluster node to send message to
+ * \param[in] dest Type of message to send
*
- * \return TRUE on success, otherwise FALSE
+ * \return \c true on success, or \c false otherwise
*/
-gboolean
-send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
- gboolean local, const crm_node_t *node,
- enum crm_ais_msg_types dest)
+static bool
+send_cpg_text(const char *data, bool local, const crm_node_t *node,
+ enum crm_ais_msg_types dest)
{
+ // @COMPAT Drop local argument when send_cluster_text is dropped
static int msg_id = 0;
static int local_pid = 0;
static int local_name_len = 0;
@@ -926,20 +968,11 @@ send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
char *target = NULL;
struct iovec *iov;
pcmk__cpg_msg_t *msg = NULL;
- enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
- switch (msg_class) {
- case crm_class_cluster:
- break;
- default:
- crm_err("Invalid message class: %d", msg_class);
- return FALSE;
- }
-
- CRM_CHECK(dest != crm_msg_ais, return FALSE);
+ CRM_CHECK(dest != crm_msg_ais, return false);
if (local_name == NULL) {
- local_name = get_local_node_name();
+ local_name = pcmk__cluster_local_node_name();
}
if ((local_name_len == 0) && (local_name != NULL)) {
local_name_len = strlen(local_name);
@@ -953,39 +986,38 @@ send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
local_pid = getpid();
}
- if (sender == crm_msg_none) {
- sender = local_pid;
- }
-
- msg = calloc(1, sizeof(pcmk__cpg_msg_t));
+ msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t));
msg_id++;
msg->id = msg_id;
- msg->header.id = msg_class;
+ msg->header.id = crm_class_cluster;
msg->header.error = CS_OK;
msg->host.type = dest;
msg->host.local = local;
- if (node) {
- if (node->uname) {
- target = strdup(node->uname);
+ if (node != NULL) {
+ if (node->uname != NULL) {
+ target = pcmk__str_copy(node->uname);
msg->host.size = strlen(node->uname);
memset(msg->host.uname, 0, MAX_NAME);
memcpy(msg->host.uname, node->uname, msg->host.size);
+
} else {
target = crm_strdup_printf("%u", node->id);
}
msg->host.id = node->id;
+
} else {
- target = strdup("all");
+ target = pcmk__str_copy("all");
}
msg->sender.id = 0;
- msg->sender.type = sender;
+ msg->sender.type = pcmk__cluster_parse_msg_type(crm_system_name);
msg->sender.pid = local_pid;
msg->sender.size = local_name_len;
memset(msg->sender.uname, 0, MAX_NAME);
+
if ((local_name != NULL) && (msg->sender.size != 0)) {
memcpy(msg->sender.uname, local_name, msg->sender.size);
}
@@ -1000,10 +1032,9 @@ send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
} else {
char *compressed = NULL;
unsigned int new_size = 0;
- char *uncompressed = strdup(data);
- if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
- &compressed, &new_size) == pcmk_rc_ok) {
+ if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed,
+ &new_size) == pcmk_rc_ok) {
msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
msg = pcmk__realloc(msg, msg->header.size);
@@ -1019,38 +1050,116 @@ send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
memcpy(msg->data, data, msg->size);
}
- free(uncompressed);
free(compressed);
}
- iov = calloc(1, sizeof(struct iovec));
+ iov = pcmk__assert_alloc(1, sizeof(struct iovec));
iov->iov_base = msg;
iov->iov_len = msg->header.size;
- if (msg->compressed_size) {
- crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
+ if (msg->compressed_size > 0) {
+ crm_trace("Queueing CPG message %u to %s "
+ "(%llu bytes, %d bytes compressed payload): %.200s",
msg->id, target, (unsigned long long) iov->iov_len,
msg->compressed_size, data);
} else {
- crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
+ crm_trace("Queueing CPG message %u to %s "
+ "(%llu bytes, %d bytes payload): %.200s",
msg->id, target, (unsigned long long) iov->iov_len,
msg->size, data);
}
+
free(target);
cs_message_queue = g_list_append(cs_message_queue, iov);
crm_cs_flush(&pcmk_cpg_handle);
- return TRUE;
+ return true;
}
/*!
- * \brief Get the message type equivalent of a string
+ * \internal
+ * \brief Send an XML message via Corosync CPG
*
- * \param[in] text String of message type
+ * \param[in] msg XML message to send
+ * \param[in] node Cluster node to send message to
+ * \param[in] dest Type of message to send
*
- * \return Message type equivalent of \p text
+ * \return TRUE on success, otherwise FALSE
*/
+bool
+pcmk__cpg_send_xml(const xmlNode *msg, const crm_node_t *node,
+ enum crm_ais_msg_types dest)
+{
+ bool rc = true;
+ GString *data = g_string_sized_new(1024);
+
+ pcmk__xml_string(msg, 0, data, 0);
+
+ rc = send_cpg_text(data->str, false, node, dest);
+ g_string_free(data, TRUE);
+ return rc;
+}
+
+// Deprecated functions kept only for backward API compatibility
+// LCOV_EXCL_START
+
+#include <crm/cluster/compat.h>
+
+gboolean
+cluster_connect_cpg(pcmk_cluster_t *cluster)
+{
+ return pcmk__cpg_connect(cluster) == pcmk_rc_ok;
+}
+
+void
+cluster_disconnect_cpg(pcmk_cluster_t *cluster)
+{
+ pcmk__cpg_disconnect(cluster);
+}
+
+uint32_t
+get_local_nodeid(cpg_handle_t handle)
+{
+ return pcmk__cpg_local_nodeid(handle);
+}
+
+void
+pcmk_cpg_membership(cpg_handle_t handle,
+ const struct cpg_name *group_name,
+ const struct cpg_address *member_list,
+ size_t member_list_entries,
+ const struct cpg_address *left_list,
+ size_t left_list_entries,
+ const struct cpg_address *joined_list,
+ size_t joined_list_entries)
+{
+ pcmk__cpg_confchg_cb(handle, group_name, member_list, member_list_entries,
+ left_list, left_list_entries,
+ joined_list, joined_list_entries);
+}
+
+gboolean
+send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
+ gboolean local, const crm_node_t *node,
+ enum crm_ais_msg_types dest)
+{
+ switch (msg_class) {
+ case crm_class_cluster:
+ return send_cpg_text(data, local, node, dest);
+ default:
+ crm_err("Invalid message class: %d", msg_class);
+ return FALSE;
+ }
+}
+
+char *
+pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid,
+ void *content, uint32_t *kind, const char **from)
+{
+ return pcmk__cpg_message_data(handle, nodeid, pid, content, kind, from);
+}
+
enum crm_ais_msg_types
text2msg_type(const char *text)
{
@@ -1090,3 +1199,6 @@ text2msg_type(const char *text)
}
return type;
}
+
+// LCOV_EXCL_STOP
+// End deprecated API