diff options
Diffstat (limited to '')
-rw-r--r-- | lib/common/remote.c | 1270 |
1 files changed, 1270 insertions, 0 deletions
diff --git a/lib/common/remote.c b/lib/common/remote.c new file mode 100644 index 0000000..8c5969a --- /dev/null +++ b/lib/common/remote.c @@ -0,0 +1,1270 @@ +/* + * Copyright 2008-2022 the Pacemaker project contributors + * + * The version control history for this file may have further details. + * + * This source code is licensed under the GNU Lesser General Public License + * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY. + */ + +#include <crm_internal.h> +#include <crm/crm.h> + +#include <sys/param.h> +#include <stdio.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include <sys/socket.h> +#include <arpa/inet.h> +#include <netinet/in.h> +#include <netinet/ip.h> +#include <netinet/tcp.h> +#include <netdb.h> +#include <stdlib.h> +#include <errno.h> +#include <inttypes.h> // PRIx32 + +#include <glib.h> +#include <bzlib.h> + +#include <crm/common/ipc_internal.h> +#include <crm/common/xml.h> +#include <crm/common/mainloop.h> +#include <crm/common/remote_internal.h> + +#ifdef HAVE_GNUTLS_GNUTLS_H +# include <gnutls/gnutls.h> +#endif + +/* Swab macros from linux/swab.h */ +#ifdef HAVE_LINUX_SWAB_H +# include <linux/swab.h> +#else +/* + * casts are necessary for constants, because we never know how for sure + * how U/UL/ULL map to __u16, __u32, __u64. At least not in a portable way. + */ +#define __swab16(x) ((uint16_t)( \ + (((uint16_t)(x) & (uint16_t)0x00ffU) << 8) | \ + (((uint16_t)(x) & (uint16_t)0xff00U) >> 8))) + +#define __swab32(x) ((uint32_t)( \ + (((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) | \ + (((uint32_t)(x) & (uint32_t)0x0000ff00UL) << 8) | \ + (((uint32_t)(x) & (uint32_t)0x00ff0000UL) >> 8) | \ + (((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24))) + +#define __swab64(x) ((uint64_t)( \ + (((uint64_t)(x) & (uint64_t)0x00000000000000ffULL) << 56) | \ + (((uint64_t)(x) & (uint64_t)0x000000000000ff00ULL) << 40) | \ + (((uint64_t)(x) & (uint64_t)0x0000000000ff0000ULL) << 24) | \ + (((uint64_t)(x) & (uint64_t)0x00000000ff000000ULL) << 8) | \ + (((uint64_t)(x) & (uint64_t)0x000000ff00000000ULL) >> 8) | \ + (((uint64_t)(x) & (uint64_t)0x0000ff0000000000ULL) >> 24) | \ + (((uint64_t)(x) & (uint64_t)0x00ff000000000000ULL) >> 40) | \ + (((uint64_t)(x) & (uint64_t)0xff00000000000000ULL) >> 56))) +#endif + +#define REMOTE_MSG_VERSION 1 +#define ENDIAN_LOCAL 0xBADADBBD + +struct remote_header_v0 { + uint32_t endian; /* Detect messages from hosts with different endian-ness */ + uint32_t version; + uint64_t id; + uint64_t flags; + uint32_t size_total; + uint32_t payload_offset; + uint32_t payload_compressed; + uint32_t payload_uncompressed; + + /* New fields get added here */ + +} __attribute__ ((packed)); + +/*! + * \internal + * \brief Retrieve remote message header, in local endianness + * + * Return a pointer to the header portion of a remote connection's message + * buffer, converting the header to local endianness if needed. + * + * \param[in,out] remote Remote connection with new message + * + * \return Pointer to message header, localized if necessary + */ +static struct remote_header_v0 * +localized_remote_header(pcmk__remote_t *remote) +{ + struct remote_header_v0 *header = (struct remote_header_v0 *)remote->buffer; + if(remote->buffer_offset < sizeof(struct remote_header_v0)) { + return NULL; + + } else if(header->endian != ENDIAN_LOCAL) { + uint32_t endian = __swab32(header->endian); + + CRM_LOG_ASSERT(endian == ENDIAN_LOCAL); + if(endian != ENDIAN_LOCAL) { + crm_err("Invalid message detected, endian mismatch: %" PRIx32 + " is neither %" PRIx32 " nor the swab'd %" PRIx32, + ENDIAN_LOCAL, header->endian, endian); + return NULL; + } + + header->id = __swab64(header->id); + header->flags = __swab64(header->flags); + header->endian = __swab32(header->endian); + + header->version = __swab32(header->version); + header->size_total = __swab32(header->size_total); + header->payload_offset = __swab32(header->payload_offset); + header->payload_compressed = __swab32(header->payload_compressed); + header->payload_uncompressed = __swab32(header->payload_uncompressed); + } + + return header; +} + +#ifdef HAVE_GNUTLS_GNUTLS_H + +int +pcmk__tls_client_handshake(pcmk__remote_t *remote, int timeout_ms) +{ + int rc = 0; + int pollrc = 0; + time_t time_limit = time(NULL) + timeout_ms / 1000; + + do { + rc = gnutls_handshake(*remote->tls_session); + if ((rc == GNUTLS_E_INTERRUPTED) || (rc == GNUTLS_E_AGAIN)) { + pollrc = pcmk__remote_ready(remote, 1000); + if ((pollrc != pcmk_rc_ok) && (pollrc != ETIME)) { + /* poll returned error, there is no hope */ + crm_trace("TLS handshake poll failed: %s (%d)", + pcmk_strerror(pollrc), pollrc); + return pcmk_legacy2rc(pollrc); + } + } else if (rc < 0) { + crm_trace("TLS handshake failed: %s (%d)", + gnutls_strerror(rc), rc); + return EPROTO; + } else { + return pcmk_rc_ok; + } + } while (time(NULL) < time_limit); + return ETIME; +} + +/*! + * \internal + * \brief Set minimum prime size required by TLS client + * + * \param[in] session TLS session to affect + */ +static void +set_minimum_dh_bits(const gnutls_session_t *session) +{ + int dh_min_bits; + + pcmk__scan_min_int(getenv("PCMK_dh_min_bits"), &dh_min_bits, 0); + + /* This function is deprecated since GnuTLS 3.1.7, in favor of letting + * the priority string imply the DH requirements, but this is the only + * way to give the user control over compatibility with older servers. + */ + if (dh_min_bits > 0) { + crm_info("Requiring server use a Diffie-Hellman prime of at least %d bits", + dh_min_bits); + gnutls_dh_set_prime_bits(*session, dh_min_bits); + } +} + +static unsigned int +get_bound_dh_bits(unsigned int dh_bits) +{ + int dh_min_bits; + int dh_max_bits; + + pcmk__scan_min_int(getenv("PCMK_dh_min_bits"), &dh_min_bits, 0); + pcmk__scan_min_int(getenv("PCMK_dh_max_bits"), &dh_max_bits, 0); + if ((dh_max_bits > 0) && (dh_max_bits < dh_min_bits)) { + crm_warn("Ignoring PCMK_dh_max_bits less than PCMK_dh_min_bits"); + dh_max_bits = 0; + } + if ((dh_min_bits > 0) && (dh_bits < dh_min_bits)) { + return dh_min_bits; + } + if ((dh_max_bits > 0) && (dh_bits > dh_max_bits)) { + return dh_max_bits; + } + return dh_bits; +} + +/*! + * \internal + * \brief Initialize a new TLS session + * + * \param[in] csock Connected socket for TLS session + * \param[in] conn_type GNUTLS_SERVER or GNUTLS_CLIENT + * \param[in] cred_type GNUTLS_CRD_ANON or GNUTLS_CRD_PSK + * \param[in] credentials TLS session credentials + * + * \return Pointer to newly created session object, or NULL on error + */ +gnutls_session_t * +pcmk__new_tls_session(int csock, unsigned int conn_type, + gnutls_credentials_type_t cred_type, void *credentials) +{ + int rc = GNUTLS_E_SUCCESS; + const char *prio_base = NULL; + char *prio = NULL; + gnutls_session_t *session = NULL; + + /* Determine list of acceptable ciphers, etc. Pacemaker always adds the + * values required for its functionality. + * + * For an example of anonymous authentication, see: + * http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication + */ + + prio_base = getenv("PCMK_tls_priorities"); + if (prio_base == NULL) { + prio_base = PCMK_GNUTLS_PRIORITIES; + } + prio = crm_strdup_printf("%s:%s", prio_base, + (cred_type == GNUTLS_CRD_ANON)? "+ANON-DH" : "+DHE-PSK:+PSK"); + + session = gnutls_malloc(sizeof(gnutls_session_t)); + if (session == NULL) { + rc = GNUTLS_E_MEMORY_ERROR; + goto error; + } + + rc = gnutls_init(session, conn_type); + if (rc != GNUTLS_E_SUCCESS) { + goto error; + } + + /* @TODO On the server side, it would be more efficient to cache the + * priority with gnutls_priority_init2() and set it with + * gnutls_priority_set() for all sessions. + */ + rc = gnutls_priority_set_direct(*session, prio, NULL); + if (rc != GNUTLS_E_SUCCESS) { + goto error; + } + if (conn_type == GNUTLS_CLIENT) { + set_minimum_dh_bits(session); + } + + gnutls_transport_set_ptr(*session, + (gnutls_transport_ptr_t) GINT_TO_POINTER(csock)); + + rc = gnutls_credentials_set(*session, cred_type, credentials); + if (rc != GNUTLS_E_SUCCESS) { + goto error; + } + free(prio); + return session; + +error: + crm_err("Could not initialize %s TLS %s session: %s " + CRM_XS " rc=%d priority='%s'", + (cred_type == GNUTLS_CRD_ANON)? "anonymous" : "PSK", + (conn_type == GNUTLS_SERVER)? "server" : "client", + gnutls_strerror(rc), rc, prio); + free(prio); + if (session != NULL) { + gnutls_free(session); + } + return NULL; +} + +/*! + * \internal + * \brief Initialize Diffie-Hellman parameters for a TLS server + * + * \param[out] dh_params Parameter object to initialize + * + * \return Standard Pacemaker return code + * \todo The current best practice is to allow the client and server to + * negotiate the Diffie-Hellman parameters via a TLS extension (RFC 7919). + * However, we have to support both older versions of GnuTLS (<3.6) that + * don't support the extension on our side, and older Pacemaker versions + * that don't support the extension on the other side. The next best + * practice would be to use a known good prime (see RFC 5114 section 2.2), + * possibly stored in a file distributed with Pacemaker. + */ +int +pcmk__init_tls_dh(gnutls_dh_params_t *dh_params) +{ + int rc = GNUTLS_E_SUCCESS; + unsigned int dh_bits = 0; + + rc = gnutls_dh_params_init(dh_params); + if (rc != GNUTLS_E_SUCCESS) { + goto error; + } + + dh_bits = gnutls_sec_param_to_pk_bits(GNUTLS_PK_DH, + GNUTLS_SEC_PARAM_NORMAL); + if (dh_bits == 0) { + rc = GNUTLS_E_DH_PRIME_UNACCEPTABLE; + goto error; + } + dh_bits = get_bound_dh_bits(dh_bits); + + crm_info("Generating Diffie-Hellman parameters with %u-bit prime for TLS", + dh_bits); + rc = gnutls_dh_params_generate2(*dh_params, dh_bits); + if (rc != GNUTLS_E_SUCCESS) { + goto error; + } + + return pcmk_rc_ok; + +error: + crm_err("Could not initialize Diffie-Hellman parameters for TLS: %s " + CRM_XS " rc=%d", gnutls_strerror(rc), rc); + return EPROTO; +} + +/*! + * \internal + * \brief Process handshake data from TLS client + * + * Read as much TLS handshake data as is available. + * + * \param[in] client Client connection + * + * \return Standard Pacemaker return code (of particular interest, EAGAIN + * if some data was successfully read but more data is needed) + */ +int +pcmk__read_handshake_data(const pcmk__client_t *client) +{ + int rc = 0; + + CRM_ASSERT(client && client->remote && client->remote->tls_session); + + do { + rc = gnutls_handshake(*client->remote->tls_session); + } while (rc == GNUTLS_E_INTERRUPTED); + + if (rc == GNUTLS_E_AGAIN) { + /* No more data is available at the moment. This function should be + * invoked again once the client sends more. + */ + return EAGAIN; + } else if (rc != GNUTLS_E_SUCCESS) { + crm_err("TLS handshake with remote client failed: %s " + CRM_XS " rc=%d", gnutls_strerror(rc), rc); + return EPROTO; + } + return pcmk_rc_ok; +} + +// \return Standard Pacemaker return code +static int +send_tls(gnutls_session_t *session, struct iovec *iov) +{ + const char *unsent = iov->iov_base; + size_t unsent_len = iov->iov_len; + ssize_t gnutls_rc; + + if (unsent == NULL) { + return EINVAL; + } + + crm_trace("Sending TLS message of %llu bytes", + (unsigned long long) unsent_len); + while (true) { + gnutls_rc = gnutls_record_send(*session, unsent, unsent_len); + + if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) { + crm_trace("Retrying to send %llu bytes remaining", + (unsigned long long) unsent_len); + + } else if (gnutls_rc < 0) { + // Caller can log as error if necessary + crm_info("TLS connection terminated: %s " CRM_XS " rc=%lld", + gnutls_strerror((int) gnutls_rc), + (long long) gnutls_rc); + return ECONNABORTED; + + } else if (gnutls_rc < unsent_len) { + crm_trace("Sent %lld of %llu bytes remaining", + (long long) gnutls_rc, (unsigned long long) unsent_len); + unsent_len -= gnutls_rc; + unsent += gnutls_rc; + } else { + crm_trace("Sent all %lld bytes remaining", (long long) gnutls_rc); + break; + } + } + return pcmk_rc_ok; +} +#endif + +// \return Standard Pacemaker return code +static int +send_plaintext(int sock, struct iovec *iov) +{ + const char *unsent = iov->iov_base; + size_t unsent_len = iov->iov_len; + ssize_t write_rc; + + if (unsent == NULL) { + return EINVAL; + } + + crm_debug("Sending plaintext message of %llu bytes to socket %d", + (unsigned long long) unsent_len, sock); + while (true) { + write_rc = write(sock, unsent, unsent_len); + if (write_rc < 0) { + int rc = errno; + + if ((errno == EINTR) || (errno == EAGAIN)) { + crm_trace("Retrying to send %llu bytes remaining to socket %d", + (unsigned long long) unsent_len, sock); + continue; + } + + // Caller can log as error if necessary + crm_info("Could not send message: %s " CRM_XS " rc=%d socket=%d", + pcmk_rc_str(rc), rc, sock); + return rc; + + } else if (write_rc < unsent_len) { + crm_trace("Sent %lld of %llu bytes remaining", + (long long) write_rc, (unsigned long long) unsent_len); + unsent += write_rc; + unsent_len -= write_rc; + continue; + + } else { + crm_trace("Sent all %lld bytes remaining: %.100s", + (long long) write_rc, (char *) (iov->iov_base)); + break; + } + } + return pcmk_rc_ok; +} + +// \return Standard Pacemaker return code +static int +remote_send_iovs(pcmk__remote_t *remote, struct iovec *iov, int iovs) +{ + int rc = pcmk_rc_ok; + + for (int lpc = 0; (lpc < iovs) && (rc == pcmk_rc_ok); lpc++) { +#ifdef HAVE_GNUTLS_GNUTLS_H + if (remote->tls_session) { + rc = send_tls(remote->tls_session, &(iov[lpc])); + continue; + } +#endif + if (remote->tcp_socket) { + rc = send_plaintext(remote->tcp_socket, &(iov[lpc])); + } else { + rc = ESOCKTNOSUPPORT; + } + } + return rc; +} + +/*! + * \internal + * \brief Send an XML message over a Pacemaker Remote connection + * + * \param[in,out] remote Pacemaker Remote connection to use + * \param[in] msg XML to send + * + * \return Standard Pacemaker return code + */ +int +pcmk__remote_send_xml(pcmk__remote_t *remote, xmlNode *msg) +{ + int rc = pcmk_rc_ok; + static uint64_t id = 0; + char *xml_text = NULL; + + struct iovec iov[2]; + struct remote_header_v0 *header; + + CRM_CHECK((remote != NULL) && (msg != NULL), return EINVAL); + + xml_text = dump_xml_unformatted(msg); + CRM_CHECK(xml_text != NULL, return EINVAL); + + header = calloc(1, sizeof(struct remote_header_v0)); + CRM_ASSERT(header != NULL); + + iov[0].iov_base = header; + iov[0].iov_len = sizeof(struct remote_header_v0); + + iov[1].iov_base = xml_text; + iov[1].iov_len = 1 + strlen(xml_text); + + id++; + header->id = id; + header->endian = ENDIAN_LOCAL; + header->version = REMOTE_MSG_VERSION; + header->payload_offset = iov[0].iov_len; + header->payload_uncompressed = iov[1].iov_len; + header->size_total = iov[0].iov_len + iov[1].iov_len; + + rc = remote_send_iovs(remote, iov, 2); + if (rc != pcmk_rc_ok) { + crm_err("Could not send remote message: %s " CRM_XS " rc=%d", + pcmk_rc_str(rc), rc); + } + + free(iov[0].iov_base); + free(iov[1].iov_base); + return rc; +} + +/*! + * \internal + * \brief Obtain the XML from the currently buffered remote connection message + * + * \param[in,out] remote Remote connection possibly with message available + * + * \return Newly allocated XML object corresponding to message data, or NULL + * \note This effectively removes the message from the connection buffer. + */ +xmlNode * +pcmk__remote_message_xml(pcmk__remote_t *remote) +{ + xmlNode *xml = NULL; + struct remote_header_v0 *header = localized_remote_header(remote); + + if (header == NULL) { + return NULL; + } + + /* Support compression on the receiving end now, in case we ever want to add it later */ + if (header->payload_compressed) { + int rc = 0; + unsigned int size_u = 1 + header->payload_uncompressed; + char *uncompressed = calloc(1, header->payload_offset + size_u); + + crm_trace("Decompressing message data %d bytes into %d bytes", + header->payload_compressed, size_u); + + rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u, + remote->buffer + header->payload_offset, + header->payload_compressed, 1, 0); + + if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) { + crm_warn("Couldn't decompress v%d message, we only understand v%d", + header->version, REMOTE_MSG_VERSION); + free(uncompressed); + return NULL; + + } else if (rc != BZ_OK) { + crm_err("Decompression failed: %s " CRM_XS " bzerror=%d", + bz2_strerror(rc), rc); + free(uncompressed); + return NULL; + } + + CRM_ASSERT(size_u == header->payload_uncompressed); + + memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */ + remote->buffer_size = header->payload_offset + size_u; + + free(remote->buffer); + remote->buffer = uncompressed; + header = localized_remote_header(remote); + } + + /* take ownership of the buffer */ + remote->buffer_offset = 0; + + CRM_LOG_ASSERT(remote->buffer[sizeof(struct remote_header_v0) + header->payload_uncompressed - 1] == 0); + + xml = string2xml(remote->buffer + header->payload_offset); + if (xml == NULL && header->version > REMOTE_MSG_VERSION) { + crm_warn("Couldn't parse v%d message, we only understand v%d", + header->version, REMOTE_MSG_VERSION); + + } else if (xml == NULL) { + crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset); + } + + return xml; +} + +static int +get_remote_socket(const pcmk__remote_t *remote) +{ +#ifdef HAVE_GNUTLS_GNUTLS_H + if (remote->tls_session) { + void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session); + + return GPOINTER_TO_INT(sock_ptr); + } +#endif + + if (remote->tcp_socket) { + return remote->tcp_socket; + } + + crm_err("Remote connection type undetermined (bug?)"); + return -1; +} + +/*! + * \internal + * \brief Wait for a remote session to have data to read + * + * \param[in] remote Connection to check + * \param[in] timeout_ms Maximum time (in ms) to wait + * + * \return Standard Pacemaker return code (of particular interest, pcmk_rc_ok if + * there is data ready to be read, and ETIME if there is no data within + * the specified timeout) + */ +int +pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms) +{ + struct pollfd fds = { 0, }; + int sock = 0; + int rc = 0; + time_t start; + int timeout = timeout_ms; + + sock = get_remote_socket(remote); + if (sock <= 0) { + crm_trace("No longer connected"); + return ENOTCONN; + } + + start = time(NULL); + errno = 0; + do { + fds.fd = sock; + fds.events = POLLIN; + + /* If we got an EINTR while polling, and we have a + * specific timeout we are trying to honor, attempt + * to adjust the timeout to the closest second. */ + if (errno == EINTR && (timeout > 0)) { + timeout = timeout_ms - ((time(NULL) - start) * 1000); + if (timeout < 1000) { + timeout = 1000; + } + } + + rc = poll(&fds, 1, timeout); + } while (rc < 0 && errno == EINTR); + + if (rc < 0) { + return errno; + } + return (rc == 0)? ETIME : pcmk_rc_ok; +} + +/*! + * \internal + * \brief Read bytes from non-blocking remote connection + * + * \param[in,out] remote Remote connection to read + * + * \return Standard Pacemaker return code (of particular interest, pcmk_rc_ok if + * a full message has been received, or EAGAIN for a partial message) + * \note Use only with non-blocking sockets after polling the socket. + * \note This function will return when the socket read buffer is empty or an + * error is encountered. + */ +static int +read_available_remote_data(pcmk__remote_t *remote) +{ + int rc = pcmk_rc_ok; + size_t read_len = sizeof(struct remote_header_v0); + struct remote_header_v0 *header = localized_remote_header(remote); + bool received = false; + ssize_t read_rc; + + if(header) { + /* Stop at the end of the current message */ + read_len = header->size_total; + } + + /* automatically grow the buffer when needed */ + if(remote->buffer_size < read_len) { + remote->buffer_size = 2 * read_len; + crm_trace("Expanding buffer to %llu bytes", + (unsigned long long) remote->buffer_size); + remote->buffer = pcmk__realloc(remote->buffer, remote->buffer_size + 1); + } + +#ifdef HAVE_GNUTLS_GNUTLS_H + if (!received && remote->tls_session) { + read_rc = gnutls_record_recv(*(remote->tls_session), + remote->buffer + remote->buffer_offset, + remote->buffer_size - remote->buffer_offset); + if (read_rc == GNUTLS_E_INTERRUPTED) { + rc = EINTR; + } else if (read_rc == GNUTLS_E_AGAIN) { + rc = EAGAIN; + } else if (read_rc < 0) { + crm_debug("TLS receive failed: %s (%lld)", + gnutls_strerror(read_rc), (long long) read_rc); + rc = EIO; + } + received = true; + } +#endif + + if (!received && remote->tcp_socket) { + read_rc = read(remote->tcp_socket, + remote->buffer + remote->buffer_offset, + remote->buffer_size - remote->buffer_offset); + if (read_rc < 0) { + rc = errno; + } + received = true; + } + + if (!received) { + crm_err("Remote connection type undetermined (bug?)"); + return ESOCKTNOSUPPORT; + } + + /* process any errors. */ + if (read_rc > 0) { + remote->buffer_offset += read_rc; + /* always null terminate buffer, the +1 to alloc always allows for this. */ + remote->buffer[remote->buffer_offset] = '\0'; + crm_trace("Received %lld more bytes (%llu total)", + (long long) read_rc, + (unsigned long long) remote->buffer_offset); + + } else if ((rc == EINTR) || (rc == EAGAIN)) { + crm_trace("No data available for non-blocking remote read: %s (%d)", + pcmk_rc_str(rc), rc); + + } else if (read_rc == 0) { + crm_debug("End of remote data encountered after %llu bytes", + (unsigned long long) remote->buffer_offset); + return ENOTCONN; + + } else { + crm_debug("Error receiving remote data after %llu bytes: %s (%d)", + (unsigned long long) remote->buffer_offset, + pcmk_rc_str(rc), rc); + return ENOTCONN; + } + + header = localized_remote_header(remote); + if(header) { + if(remote->buffer_offset < header->size_total) { + crm_trace("Read partial remote message (%llu of %u bytes)", + (unsigned long long) remote->buffer_offset, + header->size_total); + } else { + crm_trace("Read full remote message of %llu bytes", + (unsigned long long) remote->buffer_offset); + return pcmk_rc_ok; + } + } + + return EAGAIN; +} + +/*! + * \internal + * \brief Read one message from a remote connection + * + * \param[in,out] remote Remote connection to read + * \param[in] timeout_ms Fail if message not read in this many milliseconds + * (10s will be used if 0, and 60s if negative) + * + * \return Standard Pacemaker return code + */ +int +pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms) +{ + int rc = pcmk_rc_ok; + time_t start = time(NULL); + int remaining_timeout = 0; + + if (timeout_ms == 0) { + timeout_ms = 10000; + } else if (timeout_ms < 0) { + timeout_ms = 60000; + } + + remaining_timeout = timeout_ms; + while (remaining_timeout > 0) { + + crm_trace("Waiting for remote data (%d ms of %d ms timeout remaining)", + remaining_timeout, timeout_ms); + rc = pcmk__remote_ready(remote, remaining_timeout); + + if (rc == ETIME) { + crm_err("Timed out (%d ms) while waiting for remote data", + remaining_timeout); + return rc; + + } else if (rc != pcmk_rc_ok) { + crm_debug("Wait for remote data aborted (will retry): %s " + CRM_XS " rc=%d", pcmk_rc_str(rc), rc); + + } else { + rc = read_available_remote_data(remote); + if (rc == pcmk_rc_ok) { + return rc; + } else if (rc == EAGAIN) { + crm_trace("Waiting for more remote data"); + } else { + crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d", + pcmk_rc_str(rc), rc); + } + } + + // Don't waste time retrying after fatal errors + if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) { + return rc; + } + + remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000); + } + return ETIME; +} + +struct tcp_async_cb_data { + int sock; + int timeout_ms; + time_t start; + void *userdata; + void (*callback) (void *userdata, int rc, int sock); +}; + +// \return TRUE if timer should be rescheduled, FALSE otherwise +static gboolean +check_connect_finished(gpointer userdata) +{ + struct tcp_async_cb_data *cb_data = userdata; + int rc; + + fd_set rset, wset; + struct timeval ts = { 0, }; + + if (cb_data->start == 0) { + // Last connect() returned success immediately + rc = pcmk_rc_ok; + goto dispatch_done; + } + + // If the socket is ready for reading or writing, the connect succeeded + FD_ZERO(&rset); + FD_SET(cb_data->sock, &rset); + wset = rset; + rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts); + + if (rc < 0) { // select() error + rc = errno; + if ((rc == EINPROGRESS) || (rc == EAGAIN)) { + if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) { + return TRUE; // There is time left, so reschedule timer + } else { + rc = ETIMEDOUT; + } + } + crm_trace("Could not check socket %d for connection success: %s (%d)", + cb_data->sock, pcmk_rc_str(rc), rc); + + } else if (rc == 0) { // select() timeout + if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) { + return TRUE; // There is time left, so reschedule timer + } + crm_debug("Timed out while waiting for socket %d connection success", + cb_data->sock); + rc = ETIMEDOUT; + + // select() returned number of file descriptors that are ready + + } else if (FD_ISSET(cb_data->sock, &rset) + || FD_ISSET(cb_data->sock, &wset)) { + + // The socket is ready; check it for connection errors + int error = 0; + socklen_t len = sizeof(error); + + if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) { + rc = errno; + crm_trace("Couldn't check socket %d for connection errors: %s (%d)", + cb_data->sock, pcmk_rc_str(rc), rc); + } else if (error != 0) { + rc = error; + crm_trace("Socket %d connected with error: %s (%d)", + cb_data->sock, pcmk_rc_str(rc), rc); + } else { + rc = pcmk_rc_ok; + } + + } else { // Should not be possible + crm_trace("select() succeeded, but socket %d not in resulting " + "read/write sets", cb_data->sock); + rc = EAGAIN; + } + + dispatch_done: + if (rc == pcmk_rc_ok) { + crm_trace("Socket %d is connected", cb_data->sock); + } else { + close(cb_data->sock); + cb_data->sock = -1; + } + + if (cb_data->callback) { + cb_data->callback(cb_data->userdata, rc, cb_data->sock); + } + free(cb_data); + return FALSE; // Do not reschedule timer +} + +/*! + * \internal + * \brief Attempt to connect socket, calling callback when done + * + * Set a given socket non-blocking, then attempt to connect to it, + * retrying periodically until success or a timeout is reached. + * Call a caller-supplied callback function when completed. + * + * \param[in] sock Newly created socket + * \param[in] addr Socket address information for connect + * \param[in] addrlen Size of socket address information in bytes + * \param[in] timeout_ms Fail if not connected within this much time + * \param[out] timer_id If not NULL, store retry timer ID here + * \param[in] userdata User data to pass to callback + * \param[in] callback Function to call when connection attempt completes + * + * \return Standard Pacemaker return code + */ +static int +connect_socket_retry(int sock, const struct sockaddr *addr, socklen_t addrlen, + int timeout_ms, int *timer_id, void *userdata, + void (*callback) (void *userdata, int rc, int sock)) +{ + int rc = 0; + int interval = 500; + int timer; + struct tcp_async_cb_data *cb_data = NULL; + + rc = pcmk__set_nonblocking(sock); + if (rc != pcmk_rc_ok) { + crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d", + pcmk_rc_str(rc), rc); + return rc; + } + + rc = connect(sock, addr, addrlen); + if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) { + rc = errno; + crm_warn("Could not connect socket: %s " CRM_XS " rc=%d", + pcmk_rc_str(rc), rc); + return rc; + } + + cb_data = calloc(1, sizeof(struct tcp_async_cb_data)); + cb_data->userdata = userdata; + cb_data->callback = callback; + cb_data->sock = sock; + cb_data->timeout_ms = timeout_ms; + + if (rc == 0) { + /* The connect was successful immediately, we still return to mainloop + * and let this callback get called later. This avoids the user of this api + * to have to account for the fact the callback could be invoked within this + * function before returning. */ + cb_data->start = 0; + interval = 1; + } else { + cb_data->start = time(NULL); + } + + /* This timer function does a non-blocking poll on the socket to see if we + * can use it. Once we can, the connect has completed. This method allows us + * to connect without blocking the mainloop. + * + * @TODO Use a mainloop fd callback for this instead of polling. Something + * about the way mainloop is currently polling prevents this from + * working at the moment though. (See connect(2) regarding EINPROGRESS + * for possible new handling needed.) + */ + crm_trace("Scheduling check in %dms for whether connect to fd %d finished", + interval, sock); + timer = g_timeout_add(interval, check_connect_finished, cb_data); + if (timer_id) { + *timer_id = timer; + } + + // timer callback should be taking care of cb_data + // cppcheck-suppress memleak + return pcmk_rc_ok; +} + +/*! + * \internal + * \brief Attempt once to connect socket and set it non-blocking + * + * \param[in] sock Newly created socket + * \param[in] addr Socket address information for connect + * \param[in] addrlen Size of socket address information in bytes + * + * \return Standard Pacemaker return code + */ +static int +connect_socket_once(int sock, const struct sockaddr *addr, socklen_t addrlen) +{ + int rc = connect(sock, addr, addrlen); + + if (rc < 0) { + rc = errno; + crm_warn("Could not connect socket: %s " CRM_XS " rc=%d", + pcmk_rc_str(rc), rc); + return rc; + } + + rc = pcmk__set_nonblocking(sock); + if (rc != pcmk_rc_ok) { + crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d", + pcmk_rc_str(rc), rc); + return rc; + } + + return pcmk_ok; +} + +/*! + * \internal + * \brief Connect to server at specified TCP port + * + * \param[in] host Name of server to connect to + * \param[in] port Server port to connect to + * \param[in] timeout_ms If asynchronous, fail if not connected in this time + * \param[out] timer_id If asynchronous and this is non-NULL, retry timer ID + * will be put here (for ease of cancelling by caller) + * \param[out] sock_fd Where to store socket file descriptor + * \param[in] userdata If asynchronous, data to pass to callback + * \param[in] callback If NULL, attempt a single synchronous connection, + * otherwise retry asynchronously then call this + * + * \return Standard Pacemaker return code + */ +int +pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id, + int *sock_fd, void *userdata, + void (*callback) (void *userdata, int rc, int sock)) +{ + char buffer[INET6_ADDRSTRLEN]; + struct addrinfo *res = NULL; + struct addrinfo *rp = NULL; + struct addrinfo hints; + const char *server = host; + int rc; + int sock = -1; + + CRM_CHECK((host != NULL) && (sock_fd != NULL), return EINVAL); + + // Get host's IP address(es) + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_CANONNAME; + rc = getaddrinfo(server, NULL, &hints, &res); + if (rc != 0) { + crm_err("Unable to get IP address info for %s: %s", + server, gai_strerror(rc)); + rc = ENOTCONN; + goto async_cleanup; + } + if (!res || !res->ai_addr) { + crm_err("Unable to get IP address info for %s: no result", server); + rc = ENOTCONN; + goto async_cleanup; + } + + // getaddrinfo() returns a list of host's addresses, try them in order + for (rp = res; rp != NULL; rp = rp->ai_next) { + struct sockaddr *addr = rp->ai_addr; + + if (!addr) { + continue; + } + + if (rp->ai_canonname) { + server = res->ai_canonname; + } + crm_debug("Got canonical name %s for %s", server, host); + + sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP); + if (sock == -1) { + rc = errno; + crm_warn("Could not create socket for remote connection to %s:%d: " + "%s " CRM_XS " rc=%d", server, port, pcmk_rc_str(rc), rc); + continue; + } + + /* Set port appropriately for address family */ + /* (void*) casts avoid false-positive compiler alignment warnings */ + if (addr->sa_family == AF_INET6) { + ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port); + } else { + ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port); + } + + memset(buffer, 0, PCMK__NELEM(buffer)); + pcmk__sockaddr2str(addr, buffer); + crm_info("Attempting remote connection to %s:%d", buffer, port); + + if (callback) { + if (connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen, timeout, + timer_id, userdata, callback) == pcmk_rc_ok) { + goto async_cleanup; /* Success for now, we'll hear back later in the callback */ + } + + } else if (connect_socket_once(sock, rp->ai_addr, + rp->ai_addrlen) == pcmk_rc_ok) { + break; /* Success */ + } + + // Connect failed + close(sock); + sock = -1; + rc = ENOTCONN; + } + +async_cleanup: + + if (res) { + freeaddrinfo(res); + } + *sock_fd = sock; + return rc; +} + +/*! + * \internal + * \brief Convert an IP address (IPv4 or IPv6) to a string for logging + * + * \param[in] sa Socket address for IP + * \param[out] s Storage for at least INET6_ADDRSTRLEN bytes + * + * \note sa The socket address can be a pointer to struct sockaddr_in (IPv4), + * struct sockaddr_in6 (IPv6) or struct sockaddr_storage (either), + * as long as its sa_family member is set correctly. + */ +void +pcmk__sockaddr2str(const void *sa, char *s) +{ + switch (((const struct sockaddr *) sa)->sa_family) { + case AF_INET: + inet_ntop(AF_INET, &(((const struct sockaddr_in *) sa)->sin_addr), + s, INET6_ADDRSTRLEN); + break; + + case AF_INET6: + inet_ntop(AF_INET6, + &(((const struct sockaddr_in6 *) sa)->sin6_addr), + s, INET6_ADDRSTRLEN); + break; + + default: + strcpy(s, "<invalid>"); + } +} + +/*! + * \internal + * \brief Accept a client connection on a remote server socket + * + * \param[in] ssock Server socket file descriptor being listened on + * \param[out] csock Where to put new client socket's file descriptor + * + * \return Standard Pacemaker return code + */ +int +pcmk__accept_remote_connection(int ssock, int *csock) +{ + int rc; + struct sockaddr_storage addr; + socklen_t laddr = sizeof(addr); + char addr_str[INET6_ADDRSTRLEN]; + + /* accept the connection */ + memset(&addr, 0, sizeof(addr)); + *csock = accept(ssock, (struct sockaddr *)&addr, &laddr); + if (*csock == -1) { + rc = errno; + crm_err("Could not accept remote client connection: %s " + CRM_XS " rc=%d", pcmk_rc_str(rc), rc); + return rc; + } + pcmk__sockaddr2str(&addr, addr_str); + crm_info("Accepted new remote client connection from %s", addr_str); + + rc = pcmk__set_nonblocking(*csock); + if (rc != pcmk_rc_ok) { + crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d", + pcmk_rc_str(rc), rc); + close(*csock); + *csock = -1; + return rc; + } + +#ifdef TCP_USER_TIMEOUT + if (pcmk__get_sbd_timeout() > 0) { + // Time to fail and retry before watchdog + unsigned int optval = (unsigned int) pcmk__get_sbd_timeout() / 2; + + rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT, + &optval, sizeof(optval)); + if (rc < 0) { + rc = errno; + crm_err("Could not set TCP timeout to %d ms on remote connection: " + "%s " CRM_XS " rc=%d", optval, pcmk_rc_str(rc), rc); + close(*csock); + *csock = -1; + return rc; + } + } +#endif + + return rc; +} + +/*! + * \brief Get the default remote connection TCP port on this host + * + * \return Remote connection TCP port number + */ +int +crm_default_remote_port(void) +{ + static int port = 0; + + if (port == 0) { + const char *env = getenv("PCMK_remote_port"); + + if (env) { + errno = 0; + port = strtol(env, NULL, 10); + if (errno || (port < 1) || (port > 65535)) { + crm_warn("Environment variable PCMK_remote_port has invalid value '%s', using %d instead", + env, DEFAULT_REMOTE_PORT); + port = DEFAULT_REMOTE_PORT; + } + } else { + port = DEFAULT_REMOTE_PORT; + } + } + return port; +} |