summaryrefslogtreecommitdiffstats
path: root/lib/common/ipc_server.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 06:53:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 06:53:20 +0000
commite5a812082ae033afb1eed82c0f2df3d0f6bdc93f (patch)
treea6716c9275b4b413f6c9194798b34b91affb3cc7 /lib/common/ipc_server.c
parentInitial commit. (diff)
downloadpacemaker-e5a812082ae033afb1eed82c0f2df3d0f6bdc93f.tar.xz
pacemaker-e5a812082ae033afb1eed82c0f2df3d0f6bdc93f.zip
Adding upstream version 2.1.6.upstream/2.1.6
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/common/ipc_server.c')
-rw-r--r--lib/common/ipc_server.c1008
1 files changed, 1008 insertions, 0 deletions
diff --git a/lib/common/ipc_server.c b/lib/common/ipc_server.c
new file mode 100644
index 0000000..60f20fb
--- /dev/null
+++ b/lib/common/ipc_server.c
@@ -0,0 +1,1008 @@
+/*
+ * Copyright 2004-2022 the Pacemaker project contributors
+ *
+ * The version control history for this file may have further details.
+ *
+ * This source code is licensed under the GNU Lesser General Public License
+ * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
+ */
+
+#include <crm_internal.h>
+
+#include <stdio.h>
+#include <errno.h>
+#include <bzlib.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#include <crm/crm.h>
+#include <crm/msg_xml.h>
+#include <crm/common/ipc.h>
+#include <crm/common/ipc_internal.h>
+#include "crmcommon_private.h"
+
+/* Evict clients whose event queue grows this large (by default) */
+#define PCMK_IPC_DEFAULT_QUEUE_MAX 500
+
+static GHashTable *client_connections = NULL;
+
+/*!
+ * \internal
+ * \brief Count IPC clients
+ *
+ * \return Number of active IPC client connections
+ */
+guint
+pcmk__ipc_client_count(void)
+{
+ return client_connections? g_hash_table_size(client_connections) : 0;
+}
+
+/*!
+ * \internal
+ * \brief Execute a function for each active IPC client connection
+ *
+ * \param[in] func Function to call
+ * \param[in,out] user_data Pointer to pass to function
+ *
+ * \note The parameters are the same as for g_hash_table_foreach().
+ */
+void
+pcmk__foreach_ipc_client(GHFunc func, gpointer user_data)
+{
+ if ((func != NULL) && (client_connections != NULL)) {
+ g_hash_table_foreach(client_connections, func, user_data);
+ }
+}
+
+pcmk__client_t *
+pcmk__find_client(const qb_ipcs_connection_t *c)
+{
+ if (client_connections) {
+ return g_hash_table_lookup(client_connections, c);
+ }
+
+ crm_trace("No client found for %p", c);
+ return NULL;
+}
+
+pcmk__client_t *
+pcmk__find_client_by_id(const char *id)
+{
+ if ((client_connections != NULL) && (id != NULL)) {
+ gpointer key;
+ pcmk__client_t *client = NULL;
+ GHashTableIter iter;
+
+ g_hash_table_iter_init(&iter, client_connections);
+ while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) {
+ if (strcmp(client->id, id) == 0) {
+ return client;
+ }
+ }
+ }
+ crm_trace("No client found with id='%s'", pcmk__s(id, ""));
+ return NULL;
+}
+
+/*!
+ * \internal
+ * \brief Get a client identifier for use in log messages
+ *
+ * \param[in] c Client
+ *
+ * \return Client's name, client's ID, or a string literal, as available
+ * \note This is intended to be used in format strings like "client %s".
+ */
+const char *
+pcmk__client_name(const pcmk__client_t *c)
+{
+ if (c == NULL) {
+ return "(unspecified)";
+
+ } else if (c->name != NULL) {
+ return c->name;
+
+ } else if (c->id != NULL) {
+ return c->id;
+
+ } else {
+ return "(unidentified)";
+ }
+}
+
+void
+pcmk__client_cleanup(void)
+{
+ if (client_connections != NULL) {
+ int active = g_hash_table_size(client_connections);
+
+ if (active > 0) {
+ crm_warn("Exiting with %d active IPC client%s",
+ active, pcmk__plural_s(active));
+ }
+ g_hash_table_destroy(client_connections);
+ client_connections = NULL;
+ }
+}
+
+void
+pcmk__drop_all_clients(qb_ipcs_service_t *service)
+{
+ qb_ipcs_connection_t *c = NULL;
+
+ if (service == NULL) {
+ return;
+ }
+
+ c = qb_ipcs_connection_first_get(service);
+
+ while (c != NULL) {
+ qb_ipcs_connection_t *last = c;
+
+ c = qb_ipcs_connection_next_get(service, last);
+
+ /* There really shouldn't be anyone connected at this point */
+ crm_notice("Disconnecting client %p, pid=%d...",
+ last, pcmk__client_pid(last));
+ qb_ipcs_disconnect(last);
+ qb_ipcs_connection_unref(last);
+ }
+}
+
+/*!
+ * \internal
+ * \brief Allocate a new pcmk__client_t object based on an IPC connection
+ *
+ * \param[in] c IPC connection (NULL to allocate generic client)
+ * \param[in] key Connection table key (NULL to use sane default)
+ * \param[in] uid_client UID corresponding to c (ignored if c is NULL)
+ *
+ * \return Pointer to new pcmk__client_t (or NULL on error)
+ */
+static pcmk__client_t *
+client_from_connection(qb_ipcs_connection_t *c, void *key, uid_t uid_client)
+{
+ pcmk__client_t *client = calloc(1, sizeof(pcmk__client_t));
+
+ if (client == NULL) {
+ crm_perror(LOG_ERR, "Allocating client");
+ return NULL;
+ }
+
+ if (c) {
+ client->user = pcmk__uid2username(uid_client);
+ if (client->user == NULL) {
+ client->user = strdup("#unprivileged");
+ CRM_CHECK(client->user != NULL, free(client); return NULL);
+ crm_err("Unable to enforce ACLs for user ID %d, assuming unprivileged",
+ uid_client);
+ }
+ client->ipcs = c;
+ pcmk__set_client_flags(client, pcmk__client_ipc);
+ client->pid = pcmk__client_pid(c);
+ if (key == NULL) {
+ key = c;
+ }
+ }
+
+ client->id = crm_generate_uuid();
+ if (key == NULL) {
+ key = client->id;
+ }
+ if (client_connections == NULL) {
+ crm_trace("Creating IPC client table");
+ client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
+ }
+ g_hash_table_insert(client_connections, key, client);
+ return client;
+}
+
+/*!
+ * \brief Allocate a new pcmk__client_t object and generate its ID
+ *
+ * \param[in] key What to use as connections hash table key (NULL to use ID)
+ *
+ * \return Pointer to new pcmk__client_t (asserts on failure)
+ */
+pcmk__client_t *
+pcmk__new_unauth_client(void *key)
+{
+ pcmk__client_t *client = client_from_connection(NULL, key, 0);
+
+ CRM_ASSERT(client != NULL);
+ return client;
+}
+
+pcmk__client_t *
+pcmk__new_client(qb_ipcs_connection_t *c, uid_t uid_client, gid_t gid_client)
+{
+ gid_t uid_cluster = 0;
+ gid_t gid_cluster = 0;
+
+ pcmk__client_t *client = NULL;
+
+ CRM_CHECK(c != NULL, return NULL);
+
+ if (pcmk_daemon_user(&uid_cluster, &gid_cluster) < 0) {
+ static bool need_log = TRUE;
+
+ if (need_log) {
+ crm_warn("Could not find user and group IDs for user %s",
+ CRM_DAEMON_USER);
+ need_log = FALSE;
+ }
+ }
+
+ if (uid_client != 0) {
+ crm_trace("Giving group %u access to new IPC connection", gid_cluster);
+ /* Passing -1 to chown(2) means don't change */
+ qb_ipcs_connection_auth_set(c, -1, gid_cluster, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
+ }
+
+ /* TODO: Do our own auth checking, return NULL if unauthorized */
+ client = client_from_connection(c, NULL, uid_client);
+ if (client == NULL) {
+ return NULL;
+ }
+
+ if ((uid_client == 0) || (uid_client == uid_cluster)) {
+ /* Remember when a connection came from root or hacluster */
+ pcmk__set_client_flags(client, pcmk__client_privileged);
+ }
+
+ crm_debug("New IPC client %s for PID %u with uid %d and gid %d",
+ client->id, client->pid, uid_client, gid_client);
+ return client;
+}
+
+static struct iovec *
+pcmk__new_ipc_event(void)
+{
+ struct iovec *iov = calloc(2, sizeof(struct iovec));
+
+ CRM_ASSERT(iov != NULL);
+ return iov;
+}
+
+/*!
+ * \brief Free an I/O vector created by pcmk__ipc_prepare_iov()
+ *
+ * \param[in,out] event I/O vector to free
+ */
+void
+pcmk_free_ipc_event(struct iovec *event)
+{
+ if (event != NULL) {
+ free(event[0].iov_base);
+ free(event[1].iov_base);
+ free(event);
+ }
+}
+
+static void
+free_event(gpointer data)
+{
+ pcmk_free_ipc_event((struct iovec *) data);
+}
+
+static void
+add_event(pcmk__client_t *c, struct iovec *iov)
+{
+ if (c->event_queue == NULL) {
+ c->event_queue = g_queue_new();
+ }
+ g_queue_push_tail(c->event_queue, iov);
+}
+
+void
+pcmk__free_client(pcmk__client_t *c)
+{
+ if (c == NULL) {
+ return;
+ }
+
+ if (client_connections) {
+ if (c->ipcs) {
+ crm_trace("Destroying %p/%p (%d remaining)",
+ c, c->ipcs, g_hash_table_size(client_connections) - 1);
+ g_hash_table_remove(client_connections, c->ipcs);
+
+ } else {
+ crm_trace("Destroying remote connection %p (%d remaining)",
+ c, g_hash_table_size(client_connections) - 1);
+ g_hash_table_remove(client_connections, c->id);
+ }
+ }
+
+ if (c->event_timer) {
+ g_source_remove(c->event_timer);
+ }
+
+ if (c->event_queue) {
+ crm_debug("Destroying %d events", g_queue_get_length(c->event_queue));
+ g_queue_free_full(c->event_queue, free_event);
+ }
+
+ free(c->id);
+ free(c->name);
+ free(c->user);
+ if (c->remote) {
+ if (c->remote->auth_timeout) {
+ g_source_remove(c->remote->auth_timeout);
+ }
+ free(c->remote->buffer);
+ free(c->remote);
+ }
+ free(c);
+}
+
+/*!
+ * \internal
+ * \brief Raise IPC eviction threshold for a client, if allowed
+ *
+ * \param[in,out] client Client to modify
+ * \param[in] qmax New threshold (as non-NULL string)
+ *
+ * \return true if change was allowed, false otherwise
+ */
+bool
+pcmk__set_client_queue_max(pcmk__client_t *client, const char *qmax)
+{
+ if (pcmk_is_set(client->flags, pcmk__client_privileged)) {
+ long long qmax_ll;
+
+ if ((pcmk__scan_ll(qmax, &qmax_ll, 0LL) == pcmk_rc_ok)
+ && (qmax_ll > 0LL) && (qmax_ll <= UINT_MAX)) {
+ client->queue_max = (unsigned int) qmax_ll;
+ return true;
+ }
+ }
+ return false;
+}
+
+int
+pcmk__client_pid(qb_ipcs_connection_t *c)
+{
+ struct qb_ipcs_connection_stats stats;
+
+ stats.client_pid = 0;
+ qb_ipcs_connection_stats_get(c, &stats, 0);
+ return stats.client_pid;
+}
+
+/*!
+ * \internal
+ * \brief Retrieve message XML from data read from client IPC
+ *
+ * \param[in,out] c IPC client connection
+ * \param[in] data Data read from client connection
+ * \param[out] id Where to store message ID from libqb header
+ * \param[out] flags Where to store flags from libqb header
+ *
+ * \return Message XML on success, NULL otherwise
+ */
+xmlNode *
+pcmk__client_data2xml(pcmk__client_t *c, void *data, uint32_t *id,
+ uint32_t *flags)
+{
+ xmlNode *xml = NULL;
+ char *uncompressed = NULL;
+ char *text = ((char *)data) + sizeof(pcmk__ipc_header_t);
+ pcmk__ipc_header_t *header = data;
+
+ if (!pcmk__valid_ipc_header(header)) {
+ return NULL;
+ }
+
+ if (id) {
+ *id = ((struct qb_ipc_response_header *)data)->id;
+ }
+ if (flags) {
+ *flags = header->flags;
+ }
+
+ if (pcmk_is_set(header->flags, crm_ipc_proxied)) {
+ /* Mark this client as being the endpoint of a proxy connection.
+ * Proxy connections responses are sent on the event channel, to avoid
+ * blocking the controller serving as proxy.
+ */
+ pcmk__set_client_flags(c, pcmk__client_proxied);
+ }
+
+ if (header->size_compressed) {
+ int rc = 0;
+ unsigned int size_u = 1 + header->size_uncompressed;
+ uncompressed = calloc(1, size_u);
+
+ crm_trace("Decompressing message data %u bytes into %u bytes",
+ header->size_compressed, size_u);
+
+ rc = BZ2_bzBuffToBuffDecompress(uncompressed, &size_u, text, header->size_compressed, 1, 0);
+ text = uncompressed;
+
+ if (rc != BZ_OK) {
+ crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
+ bz2_strerror(rc), rc);
+ free(uncompressed);
+ return NULL;
+ }
+ }
+
+ CRM_ASSERT(text[header->size_uncompressed - 1] == 0);
+
+ xml = string2xml(text);
+ crm_log_xml_trace(xml, "[IPC received]");
+
+ free(uncompressed);
+ return xml;
+}
+
+static int crm_ipcs_flush_events(pcmk__client_t *c);
+
+static gboolean
+crm_ipcs_flush_events_cb(gpointer data)
+{
+ pcmk__client_t *c = data;
+
+ c->event_timer = 0;
+ crm_ipcs_flush_events(c);
+ return FALSE;
+}
+
+/*!
+ * \internal
+ * \brief Add progressive delay before next event queue flush
+ *
+ * \param[in,out] c Client connection to add delay to
+ * \param[in] queue_len Current event queue length
+ */
+static inline void
+delay_next_flush(pcmk__client_t *c, unsigned int queue_len)
+{
+ /* Delay a maximum of 1.5 seconds */
+ guint delay = (queue_len < 5)? (1000 + 100 * queue_len) : 1500;
+
+ c->event_timer = g_timeout_add(delay, crm_ipcs_flush_events_cb, c);
+}
+
+/*!
+ * \internal
+ * \brief Send client any messages in its queue
+ *
+ * \param[in,out] c Client to flush
+ *
+ * \return Standard Pacemaker return value
+ */
+static int
+crm_ipcs_flush_events(pcmk__client_t *c)
+{
+ int rc = pcmk_rc_ok;
+ ssize_t qb_rc = 0;
+ unsigned int sent = 0;
+ unsigned int queue_len = 0;
+
+ if (c == NULL) {
+ return rc;
+
+ } else if (c->event_timer) {
+ /* There is already a timer, wait until it goes off */
+ crm_trace("Timer active for %p - %d", c->ipcs, c->event_timer);
+ return rc;
+ }
+
+ if (c->event_queue) {
+ queue_len = g_queue_get_length(c->event_queue);
+ }
+ while (sent < 100) {
+ pcmk__ipc_header_t *header = NULL;
+ struct iovec *event = NULL;
+
+ if (c->event_queue) {
+ // We don't pop unless send is successful
+ event = g_queue_peek_head(c->event_queue);
+ }
+ if (event == NULL) { // Queue is empty
+ break;
+ }
+
+ qb_rc = qb_ipcs_event_sendv(c->ipcs, event, 2);
+ if (qb_rc < 0) {
+ rc = (int) -qb_rc;
+ break;
+ }
+ event = g_queue_pop_head(c->event_queue);
+
+ sent++;
+ header = event[0].iov_base;
+ if (header->size_compressed) {
+ crm_trace("Event %d to %p[%d] (%lld compressed bytes) sent",
+ header->qb.id, c->ipcs, c->pid, (long long) qb_rc);
+ } else {
+ crm_trace("Event %d to %p[%d] (%lld bytes) sent: %.120s",
+ header->qb.id, c->ipcs, c->pid, (long long) qb_rc,
+ (char *) (event[1].iov_base));
+ }
+ pcmk_free_ipc_event(event);
+ }
+
+ queue_len -= sent;
+ if (sent > 0 || queue_len) {
+ crm_trace("Sent %d events (%d remaining) for %p[%d]: %s (%lld)",
+ sent, queue_len, c->ipcs, c->pid,
+ pcmk_rc_str(rc), (long long) qb_rc);
+ }
+
+ if (queue_len) {
+
+ /* Allow clients to briefly fall behind on processing incoming messages,
+ * but drop completely unresponsive clients so the connection doesn't
+ * consume resources indefinitely.
+ */
+ if (queue_len > QB_MAX(c->queue_max, PCMK_IPC_DEFAULT_QUEUE_MAX)) {
+ if ((c->queue_backlog <= 1) || (queue_len < c->queue_backlog)) {
+ /* Don't evict for a new or shrinking backlog */
+ crm_warn("Client with process ID %u has a backlog of %u messages "
+ CRM_XS " %p", c->pid, queue_len, c->ipcs);
+ } else {
+ crm_err("Evicting client with process ID %u due to backlog of %u messages "
+ CRM_XS " %p", c->pid, queue_len, c->ipcs);
+ c->queue_backlog = 0;
+ qb_ipcs_disconnect(c->ipcs);
+ return rc;
+ }
+ }
+
+ c->queue_backlog = queue_len;
+ delay_next_flush(c, queue_len);
+
+ } else {
+ /* Event queue is empty, there is no backlog */
+ c->queue_backlog = 0;
+ }
+
+ return rc;
+}
+
+/*!
+ * \internal
+ * \brief Create an I/O vector for sending an IPC XML message
+ *
+ * \param[in] request Identifier for libqb response header
+ * \param[in,out] message XML message to send
+ * \param[in] max_send_size If 0, default IPC buffer size is used
+ * \param[out] result Where to store prepared I/O vector
+ * \param[out] bytes Size of prepared data in bytes
+ *
+ * \return Standard Pacemaker return code
+ */
+int
+pcmk__ipc_prepare_iov(uint32_t request, xmlNode *message,
+ uint32_t max_send_size, struct iovec **result,
+ ssize_t *bytes)
+{
+ static unsigned int biggest = 0;
+ struct iovec *iov;
+ unsigned int total = 0;
+ char *compressed = NULL;
+ char *buffer = NULL;
+ pcmk__ipc_header_t *header = NULL;
+
+ if ((message == NULL) || (result == NULL)) {
+ return EINVAL;
+ }
+
+ header = calloc(1, sizeof(pcmk__ipc_header_t));
+ if (header == NULL) {
+ return ENOMEM; /* errno mightn't be set by allocator */
+ }
+
+ buffer = dump_xml_unformatted(message);
+
+ if (max_send_size == 0) {
+ max_send_size = crm_ipc_default_buffer_size();
+ }
+ CRM_LOG_ASSERT(max_send_size != 0);
+
+ *result = NULL;
+ iov = pcmk__new_ipc_event();
+ iov[0].iov_len = sizeof(pcmk__ipc_header_t);
+ iov[0].iov_base = header;
+
+ header->version = PCMK__IPC_VERSION;
+ header->size_uncompressed = 1 + strlen(buffer);
+ total = iov[0].iov_len + header->size_uncompressed;
+
+ if (total < max_send_size) {
+ iov[1].iov_base = buffer;
+ iov[1].iov_len = header->size_uncompressed;
+
+ } else {
+ unsigned int new_size = 0;
+
+ if (pcmk__compress(buffer, (unsigned int) header->size_uncompressed,
+ (unsigned int) max_send_size, &compressed,
+ &new_size) == pcmk_rc_ok) {
+
+ pcmk__set_ipc_flags(header->flags, "send data", crm_ipc_compressed);
+ header->size_compressed = new_size;
+
+ iov[1].iov_len = header->size_compressed;
+ iov[1].iov_base = compressed;
+
+ free(buffer);
+
+ biggest = QB_MAX(header->size_compressed, biggest);
+
+ } else {
+ crm_log_xml_trace(message, "EMSGSIZE");
+ biggest = QB_MAX(header->size_uncompressed, biggest);
+
+ crm_err("Could not compress %u-byte message into less than IPC "
+ "limit of %u bytes; set PCMK_ipc_buffer to higher value "
+ "(%u bytes suggested)",
+ header->size_uncompressed, max_send_size, 4 * biggest);
+
+ free(compressed);
+ free(buffer);
+ pcmk_free_ipc_event(iov);
+ return EMSGSIZE;
+ }
+ }
+
+ header->qb.size = iov[0].iov_len + iov[1].iov_len;
+ header->qb.id = (int32_t)request; /* Replying to a specific request */
+
+ *result = iov;
+ CRM_ASSERT(header->qb.size > 0);
+ if (bytes != NULL) {
+ *bytes = header->qb.size;
+ }
+ return pcmk_rc_ok;
+}
+
+int
+pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags)
+{
+ int rc = pcmk_rc_ok;
+ static uint32_t id = 1;
+ pcmk__ipc_header_t *header = iov[0].iov_base;
+
+ if (c->flags & pcmk__client_proxied) {
+ /* _ALL_ replies to proxied connections need to be sent as events */
+ if (!pcmk_is_set(flags, crm_ipc_server_event)) {
+ /* The proxied flag lets us know this was originally meant to be a
+ * response, even though we're sending it over the event channel.
+ */
+ pcmk__set_ipc_flags(flags, "server event",
+ crm_ipc_server_event
+ |crm_ipc_proxied_relay_response);
+ }
+ }
+
+ pcmk__set_ipc_flags(header->flags, "server event", flags);
+ if (flags & crm_ipc_server_event) {
+ header->qb.id = id++; /* We don't really use it, but doesn't hurt to set one */
+
+ if (flags & crm_ipc_server_free) {
+ crm_trace("Sending the original to %p[%d]", c->ipcs, c->pid);
+ add_event(c, iov);
+
+ } else {
+ struct iovec *iov_copy = pcmk__new_ipc_event();
+
+ crm_trace("Sending a copy to %p[%d]", c->ipcs, c->pid);
+ iov_copy[0].iov_len = iov[0].iov_len;
+ iov_copy[0].iov_base = malloc(iov[0].iov_len);
+ memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);
+
+ iov_copy[1].iov_len = iov[1].iov_len;
+ iov_copy[1].iov_base = malloc(iov[1].iov_len);
+ memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);
+
+ add_event(c, iov_copy);
+ }
+
+ } else {
+ ssize_t qb_rc;
+
+ CRM_LOG_ASSERT(header->qb.id != 0); /* Replying to a specific request */
+
+ qb_rc = qb_ipcs_response_sendv(c->ipcs, iov, 2);
+ if (qb_rc < header->qb.size) {
+ if (qb_rc < 0) {
+ rc = (int) -qb_rc;
+ }
+ crm_notice("Response %d to pid %d failed: %s "
+ CRM_XS " bytes=%u rc=%lld ipcs=%p",
+ header->qb.id, c->pid, pcmk_rc_str(rc),
+ header->qb.size, (long long) qb_rc, c->ipcs);
+
+ } else {
+ crm_trace("Response %d sent, %lld bytes to %p[%d]",
+ header->qb.id, (long long) qb_rc, c->ipcs, c->pid);
+ }
+
+ if (flags & crm_ipc_server_free) {
+ pcmk_free_ipc_event(iov);
+ }
+ }
+
+ if (flags & crm_ipc_server_event) {
+ rc = crm_ipcs_flush_events(c);
+ } else {
+ crm_ipcs_flush_events(c);
+ }
+
+ if ((rc == EPIPE) || (rc == ENOTCONN)) {
+ crm_trace("Client %p disconnected", c->ipcs);
+ }
+ return rc;
+}
+
+int
+pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, xmlNode *message,
+ uint32_t flags)
+{
+ struct iovec *iov = NULL;
+ int rc = pcmk_rc_ok;
+
+ if (c == NULL) {
+ return EINVAL;
+ }
+ rc = pcmk__ipc_prepare_iov(request, message, crm_ipc_default_buffer_size(),
+ &iov, NULL);
+ if (rc == pcmk_rc_ok) {
+ pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free);
+ rc = pcmk__ipc_send_iov(c, iov, flags);
+ } else {
+ pcmk_free_ipc_event(iov);
+ crm_notice("IPC message to pid %d failed: %s " CRM_XS " rc=%d",
+ c->pid, pcmk_rc_str(rc), rc);
+ }
+ return rc;
+}
+
+/*!
+ * \internal
+ * \brief Create an acknowledgement with a status code to send to a client
+ *
+ * \param[in] function Calling function
+ * \param[in] line Source file line within calling function
+ * \param[in] flags IPC flags to use when sending
+ * \param[in] tag Element name to use for acknowledgement
+ * \param[in] ver IPC protocol version (can be NULL)
+ * \param[in] status Exit status code to add to ack
+ *
+ * \return Newly created XML for ack
+ * \note The caller is responsible for freeing the return value with free_xml().
+ */
+xmlNode *
+pcmk__ipc_create_ack_as(const char *function, int line, uint32_t flags,
+ const char *tag, const char *ver, crm_exit_t status)
+{
+ xmlNode *ack = NULL;
+
+ if (pcmk_is_set(flags, crm_ipc_client_response)) {
+ ack = create_xml_node(NULL, tag);
+ crm_xml_add(ack, "function", function);
+ crm_xml_add_int(ack, "line", line);
+ crm_xml_add_int(ack, "status", (int) status);
+ crm_xml_add(ack, PCMK__XA_IPC_PROTO_VERSION, ver);
+ }
+ return ack;
+}
+
+/*!
+ * \internal
+ * \brief Send an acknowledgement with a status code to a client
+ *
+ * \param[in] function Calling function
+ * \param[in] line Source file line within calling function
+ * \param[in] c Client to send ack to
+ * \param[in] request Request ID being replied to
+ * \param[in] flags IPC flags to use when sending
+ * \param[in] tag Element name to use for acknowledgement
+ * \param[in] ver IPC protocol version (can be NULL)
+ * \param[in] status Status code to send with acknowledgement
+ *
+ * \return Standard Pacemaker return code
+ */
+int
+pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c,
+ uint32_t request, uint32_t flags, const char *tag,
+ const char *ver, crm_exit_t status)
+{
+ int rc = pcmk_rc_ok;
+ xmlNode *ack = pcmk__ipc_create_ack_as(function, line, flags, tag, ver, status);
+
+ if (ack != NULL) {
+ crm_trace("Ack'ing IPC message from client %s as <%s status=%d>",
+ pcmk__client_name(c), tag, status);
+ c->request_id = 0;
+ rc = pcmk__ipc_send_xml(c, request, ack, flags);
+ free_xml(ack);
+ }
+ return rc;
+}
+
+/*!
+ * \internal
+ * \brief Add an IPC server to the main loop for the pacemaker-based API
+ *
+ * \param[out] ipcs_ro New IPC server for read-only pacemaker-based API
+ * \param[out] ipcs_rw New IPC server for read/write pacemaker-based API
+ * \param[out] ipcs_shm New IPC server for shared-memory pacemaker-based API
+ * \param[in] ro_cb IPC callbacks for read-only API
+ * \param[in] rw_cb IPC callbacks for read/write and shared-memory APIs
+ *
+ * \note This function exits fatally if unable to create the servers.
+ */
+void pcmk__serve_based_ipc(qb_ipcs_service_t **ipcs_ro,
+ qb_ipcs_service_t **ipcs_rw,
+ qb_ipcs_service_t **ipcs_shm,
+ struct qb_ipcs_service_handlers *ro_cb,
+ struct qb_ipcs_service_handlers *rw_cb)
+{
+ *ipcs_ro = mainloop_add_ipc_server(PCMK__SERVER_BASED_RO,
+ QB_IPC_NATIVE, ro_cb);
+
+ *ipcs_rw = mainloop_add_ipc_server(PCMK__SERVER_BASED_RW,
+ QB_IPC_NATIVE, rw_cb);
+
+ *ipcs_shm = mainloop_add_ipc_server(PCMK__SERVER_BASED_SHM,
+ QB_IPC_SHM, rw_cb);
+
+ if (*ipcs_ro == NULL || *ipcs_rw == NULL || *ipcs_shm == NULL) {
+ crm_err("Failed to create the CIB manager: exiting and inhibiting respawn");
+ crm_warn("Verify pacemaker and pacemaker_remote are not both enabled");
+ crm_exit(CRM_EX_FATAL);
+ }
+}
+
+/*!
+ * \internal
+ * \brief Destroy IPC servers for pacemaker-based API
+ *
+ * \param[out] ipcs_ro IPC server for read-only pacemaker-based API
+ * \param[out] ipcs_rw IPC server for read/write pacemaker-based API
+ * \param[out] ipcs_shm IPC server for shared-memory pacemaker-based API
+ *
+ * \note This is a convenience function for calling qb_ipcs_destroy() for each
+ * argument.
+ */
+void
+pcmk__stop_based_ipc(qb_ipcs_service_t *ipcs_ro,
+ qb_ipcs_service_t *ipcs_rw,
+ qb_ipcs_service_t *ipcs_shm)
+{
+ qb_ipcs_destroy(ipcs_ro);
+ qb_ipcs_destroy(ipcs_rw);
+ qb_ipcs_destroy(ipcs_shm);
+}
+
+/*!
+ * \internal
+ * \brief Add an IPC server to the main loop for the pacemaker-controld API
+ *
+ * \param[in] cb IPC callbacks
+ *
+ * \return Newly created IPC server
+ */
+qb_ipcs_service_t *
+pcmk__serve_controld_ipc(struct qb_ipcs_service_handlers *cb)
+{
+ return mainloop_add_ipc_server(CRM_SYSTEM_CRMD, QB_IPC_NATIVE, cb);
+}
+
+/*!
+ * \internal
+ * \brief Add an IPC server to the main loop for the pacemaker-attrd API
+ *
+ * \param[out] ipcs Where to store newly created IPC server
+ * \param[in] cb IPC callbacks
+ *
+ * \note This function exits fatally if unable to create the servers.
+ */
+void
+pcmk__serve_attrd_ipc(qb_ipcs_service_t **ipcs,
+ struct qb_ipcs_service_handlers *cb)
+{
+ *ipcs = mainloop_add_ipc_server(T_ATTRD, QB_IPC_NATIVE, cb);
+
+ if (*ipcs == NULL) {
+ crm_err("Failed to create pacemaker-attrd server: exiting and inhibiting respawn");
+ crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
+ crm_exit(CRM_EX_FATAL);
+ }
+}
+
+/*!
+ * \internal
+ * \brief Add an IPC server to the main loop for the pacemaker-fenced API
+ *
+ * \param[out] ipcs Where to store newly created IPC server
+ * \param[in] cb IPC callbacks
+ *
+ * \note This function exits fatally if unable to create the servers.
+ */
+void
+pcmk__serve_fenced_ipc(qb_ipcs_service_t **ipcs,
+ struct qb_ipcs_service_handlers *cb)
+{
+ *ipcs = mainloop_add_ipc_server_with_prio("stonith-ng", QB_IPC_NATIVE, cb,
+ QB_LOOP_HIGH);
+
+ if (*ipcs == NULL) {
+ crm_err("Failed to create fencer: exiting and inhibiting respawn.");
+ crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
+ crm_exit(CRM_EX_FATAL);
+ }
+}
+
+/*!
+ * \internal
+ * \brief Add an IPC server to the main loop for the pacemakerd API
+ *
+ * \param[out] ipcs Where to store newly created IPC server
+ * \param[in] cb IPC callbacks
+ *
+ * \note This function exits with CRM_EX_OSERR if unable to create the servers.
+ */
+void
+pcmk__serve_pacemakerd_ipc(qb_ipcs_service_t **ipcs,
+ struct qb_ipcs_service_handlers *cb)
+{
+ *ipcs = mainloop_add_ipc_server(CRM_SYSTEM_MCP, QB_IPC_NATIVE, cb);
+
+ if (*ipcs == NULL) {
+ crm_err("Couldn't start pacemakerd IPC server");
+ crm_warn("Verify pacemaker and pacemaker_remote are not both enabled.");
+ /* sub-daemons are observed by pacemakerd. Thus we exit CRM_EX_FATAL
+ * if we want to prevent pacemakerd from restarting them.
+ * With pacemakerd we leave the exit-code shown to e.g. systemd
+ * to what it was prior to moving the code here from pacemakerd.c
+ */
+ crm_exit(CRM_EX_OSERR);
+ }
+}
+
+/*!
+ * \internal
+ * \brief Add an IPC server to the main loop for the pacemaker-schedulerd API
+ *
+ * \param[in] cb IPC callbacks
+ *
+ * \return Newly created IPC server
+ * \note This function exits fatally if unable to create the servers.
+ */
+qb_ipcs_service_t *
+pcmk__serve_schedulerd_ipc(struct qb_ipcs_service_handlers *cb)
+{
+ return mainloop_add_ipc_server(CRM_SYSTEM_PENGINE, QB_IPC_NATIVE, cb);
+}
+
+/*!
+ * \brief Check whether string represents a client name used by cluster daemons
+ *
+ * \param[in] name String to check
+ *
+ * \return true if name is standard client name used by daemons, false otherwise
+ *
+ * \note This is provided by the client, and so cannot be used by itself as a
+ * secure means of authentication.
+ */
+bool
+crm_is_daemon_name(const char *name)
+{
+ name = pcmk__message_name(name);
+ return (!strcmp(name, CRM_SYSTEM_CRMD)
+ || !strcmp(name, CRM_SYSTEM_STONITHD)
+ || !strcmp(name, "stonith-ng")
+ || !strcmp(name, "attrd")
+ || !strcmp(name, CRM_SYSTEM_CIB)
+ || !strcmp(name, CRM_SYSTEM_MCP)
+ || !strcmp(name, CRM_SYSTEM_DC)
+ || !strcmp(name, CRM_SYSTEM_TENGINE)
+ || !strcmp(name, CRM_SYSTEM_LRMD));
+}