diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-03 17:01:24 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-03 17:01:24 +0000 |
commit | 6dd3dfb79125cd02d02efbce435a6c82e5af92ef (patch) | |
tree | 45084fc83278586f6bbafcb935f92d53f71a6b03 /exec/totemudpu.c | |
parent | Initial commit. (diff) | |
download | corosync-6dd3dfb79125cd02d02efbce435a6c82e5af92ef.tar.xz corosync-6dd3dfb79125cd02d02efbce435a6c82e5af92ef.zip |
Adding upstream version 3.1.8.upstream/3.1.8upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'exec/totemudpu.c')
-rw-r--r-- | exec/totemudpu.c | 1453 |
1 files changed, 1453 insertions, 0 deletions
diff --git a/exec/totemudpu.c b/exec/totemudpu.c new file mode 100644 index 0000000..399b47b --- /dev/null +++ b/exec/totemudpu.c @@ -0,0 +1,1453 @@ +/* + * Copyright (c) 2005 MontaVista Software, Inc. + * Copyright (c) 2006-2018 Red Hat, Inc. + * + * All rights reserved. + * + * Author: Steven Dake (sdake@redhat.com) + + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include <config.h> + +#include <assert.h> +#include <sys/mman.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/socket.h> +#include <netdb.h> +#include <sys/un.h> +#include <sys/ioctl.h> +#include <sys/param.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <unistd.h> +#include <fcntl.h> +#include <stdlib.h> +#include <stdio.h> +#include <errno.h> +#include <sched.h> +#include <time.h> +#include <sys/time.h> +#include <sys/poll.h> +#include <sys/uio.h> +#include <limits.h> + +#include <qb/qblist.h> +#include <qb/qbdefs.h> +#include <qb/qbloop.h> + +#include <corosync/sq.h> +#include <corosync/swab.h> +#define LOGSYS_UTILS_ONLY 1 +#include <corosync/logsys.h> +#include "totemudpu.h" + +#include "util.h" + +#ifndef MSG_NOSIGNAL +#define MSG_NOSIGNAL 0 +#endif + +#define MCAST_SOCKET_BUFFER_SIZE (TRANSMITS_ALLOWED * UDP_RECEIVE_FRAME_SIZE_MAX) +#define NETIF_STATE_REPORT_UP 1 +#define NETIF_STATE_REPORT_DOWN 2 + +#define BIND_STATE_UNBOUND 0 +#define BIND_STATE_REGULAR 1 +#define BIND_STATE_LOOPBACK 2 + +struct totemudpu_member { + struct qb_list_head list; + struct totem_ip_address member; + int fd; + int active; +}; + +struct totemudpu_instance { + qb_loop_t *totemudpu_poll_handle; + + struct totem_interface *totem_interface; + + int netif_state_report; + + int netif_bind_state; + + void *context; + + int (*totemudpu_deliver_fn) ( + void *context, + const void *msg, + unsigned int msg_len, + const struct sockaddr_storage *system_from); + + int (*totemudpu_iface_change_fn) ( + void *context, + const struct totem_ip_address *iface_address, + unsigned int ring_no); + + void (*totemudpu_target_set_completed) (void *context); + + /* + * Function and data used to log messages + */ + int totemudpu_log_level_security; + + int totemudpu_log_level_error; + + int totemudpu_log_level_warning; + + int totemudpu_log_level_notice; + + int totemudpu_log_level_debug; + + int totemudpu_subsys_id; + + void (*totemudpu_log_printf) ( + int level, + int subsys, + const char *function, + const char *file, + int line, + const char *format, + ...)__attribute__((format(printf, 6, 7))); + + void *udpu_context; + + char iov_buffer[UDP_RECEIVE_FRAME_SIZE_MAX]; + + struct iovec totemudpu_iov_recv; + + struct qb_list_head member_list; + + int stats_sent; + + int stats_recv; + + int stats_delv; + + int stats_remcasts; + + int stats_orf_token; + + struct timeval stats_tv_start; + + struct totem_ip_address my_id; + + int firstrun; + + qb_loop_timer_handle timer_netif_check_timeout; + + unsigned int my_memb_entries; + + struct totem_config *totem_config; + + totemsrp_stats_t *stats; + + struct totem_ip_address token_target; + + int token_socket; + + int local_loop_sock[2]; + + qb_loop_timer_handle timer_merge_detect_timeout; + + int send_merge_detect_message; + + unsigned int merge_detect_messages_sent_before_timeout; +}; + +struct work_item { + const void *msg; + unsigned int msg_len; + struct totemudpu_instance *instance; +}; + +static int totemudpu_build_sockets ( + struct totemudpu_instance *instance, + struct totem_ip_address *bindnet_address, + struct totem_ip_address *bound_to); + +static int totemudpu_create_sending_socket( + void *udpu_context, + const struct totem_ip_address *member); + +int totemudpu_member_list_rebind_ip ( + void *udpu_context); + +static void totemudpu_start_merge_detect_timeout( + void *udpu_context); + +static void totemudpu_stop_merge_detect_timeout( + void *udpu_context); + +static void totemudpu_instance_initialize (struct totemudpu_instance *instance) +{ + memset (instance, 0, sizeof (struct totemudpu_instance)); + + instance->netif_state_report = NETIF_STATE_REPORT_UP | NETIF_STATE_REPORT_DOWN; + + instance->totemudpu_iov_recv.iov_base = instance->iov_buffer; + + instance->totemudpu_iov_recv.iov_len = UDP_RECEIVE_FRAME_SIZE_MAX; //sizeof (instance->iov_buffer); + + /* + * There is always atleast 1 processor + */ + instance->my_memb_entries = 1; + + qb_list_init (&instance->member_list); +} + +#define log_printf(level, format, args...) \ +do { \ + instance->totemudpu_log_printf ( \ + level, instance->totemudpu_subsys_id, \ + __FUNCTION__, __FILE__, __LINE__, \ + (const char *)format, ##args); \ +} while (0); +#define LOGSYS_PERROR(err_num, level, fmt, args...) \ +do { \ + char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \ + const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \ + instance->totemudpu_log_printf ( \ + level, instance->totemudpu_subsys_id, \ + __FUNCTION__, __FILE__, __LINE__, \ + fmt ": %s (%d)", ##args, _error_ptr, err_num); \ + } while(0) + +int totemudpu_crypto_set ( + void *udpu_context, + const char *cipher_type, + const char *hash_type) +{ + + return (0); +} + + +static inline void ucast_sendmsg ( + struct totemudpu_instance *instance, + struct totem_ip_address *system_to, + const void *msg, + unsigned int msg_len) +{ + struct msghdr msg_ucast; + int res = 0; + struct sockaddr_storage sockaddr; + struct iovec iovec; + int addrlen; + int send_sock; + + iovec.iov_base = (void *)msg; + iovec.iov_len = msg_len; + + /* + * Build unicast message + */ + totemip_totemip_to_sockaddr_convert(system_to, + instance->totem_interface->ip_port, &sockaddr, &addrlen); + memset(&msg_ucast, 0, sizeof(msg_ucast)); + msg_ucast.msg_name = &sockaddr; + msg_ucast.msg_namelen = addrlen; + msg_ucast.msg_iov = (void *)&iovec; + msg_ucast.msg_iovlen = 1; +#ifdef HAVE_MSGHDR_CONTROL + msg_ucast.msg_control = 0; +#endif +#ifdef HAVE_MSGHDR_CONTROLLEN + msg_ucast.msg_controllen = 0; +#endif +#ifdef HAVE_MSGHDR_FLAGS + msg_ucast.msg_flags = 0; +#endif +#ifdef HAVE_MSGHDR_ACCRIGHTS + msg_ucast.msg_accrights = NULL; +#endif +#ifdef HAVE_MSGHDR_ACCRIGHTSLEN + msg_ucast.msg_accrightslen = 0; +#endif + + if (instance->netif_bind_state == BIND_STATE_REGULAR) { + send_sock = instance->token_socket; + } else { + send_sock = instance->local_loop_sock[1]; + msg_ucast.msg_name = NULL; + msg_ucast.msg_namelen = 0; + } + + + /* + * Transmit unicast message + * An error here is recovered by totemsrp + */ + res = sendmsg (send_sock, &msg_ucast, MSG_NOSIGNAL); + if (res < 0) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug, + "sendmsg(ucast) failed (non-critical)"); + } +} + +static inline void mcast_sendmsg ( + struct totemudpu_instance *instance, + const void *msg, + unsigned int msg_len, + int only_active) +{ + struct msghdr msg_mcast; + int res = 0; + struct iovec iovec; + struct sockaddr_storage sockaddr; + int addrlen; + struct qb_list_head *list; + struct totemudpu_member *member; + + iovec.iov_base = (void *)msg; + iovec.iov_len = msg_len; + + memset(&msg_mcast, 0, sizeof(msg_mcast)); + /* + * Build multicast message + */ + if (instance->netif_bind_state == BIND_STATE_REGULAR) { + qb_list_for_each(list, &(instance->member_list)) { + member = qb_list_entry (list, + struct totemudpu_member, + list); + /* + * Do not send multicast message if message is not "flush", member + * is inactive and timeout for sending merge message didn't expired. + */ + if (only_active && !member->active && !instance->send_merge_detect_message) + continue ; + + totemip_totemip_to_sockaddr_convert(&member->member, + instance->totem_interface->ip_port, &sockaddr, &addrlen); + msg_mcast.msg_name = &sockaddr; + msg_mcast.msg_namelen = addrlen; + msg_mcast.msg_iov = (void *)&iovec; + msg_mcast.msg_iovlen = 1; + #ifdef HAVE_MSGHDR_CONTROL + msg_mcast.msg_control = 0; + #endif + #ifdef HAVE_MSGHDR_CONTROLLEN + msg_mcast.msg_controllen = 0; + #endif + #ifdef HAVE_MSGHDR_FLAGS + msg_mcast.msg_flags = 0; + #endif + #ifdef HAVE_MSGHDR_ACCRIGHTS + msg_mcast.msg_accrights = NULL; + #endif + #ifdef HAVE_MSGHDR_ACCRIGHTSLEN + msg_mcast.msg_accrightslen = 0; + #endif + + /* + * Transmit multicast message + * An error here is recovered by totemsrp + */ + res = sendmsg (member->fd, &msg_mcast, MSG_NOSIGNAL); + if (res < 0) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug, + "sendmsg(mcast) failed (non-critical)"); + } + } + + if (!only_active || instance->send_merge_detect_message) { + /* + * Current message was sent to all nodes + */ + instance->merge_detect_messages_sent_before_timeout++; + instance->send_merge_detect_message = 0; + } + } else { + /* + * Transmit multicast message to local unix mcast loop + * An error here is recovered by totemsrp + */ + msg_mcast.msg_name = NULL; + msg_mcast.msg_namelen = 0; + msg_mcast.msg_iov = (void *)&iovec; + msg_mcast.msg_iovlen = 1; + #ifdef HAVE_MSGHDR_CONTROL + msg_mcast.msg_control = 0; + #endif + #ifdef HAVE_MSGHDR_CONTROLLEN + msg_mcast.msg_controllen = 0; + #endif + #ifdef HAVE_MSGHDR_FLAGS + msg_mcast.msg_flags = 0; + #endif + #ifdef HAVE_MSGHDR_ACCRIGHTS + msg_mcast.msg_accrights = NULL; + #endif + #ifdef HAVE_MSGHDR_ACCRIGHTSLEN + msg_mcast.msg_accrightslen = 0; + #endif + + res = sendmsg (instance->local_loop_sock[1], &msg_mcast, + MSG_NOSIGNAL); + if (res < 0) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug, + "sendmsg(local mcast loop) failed (non-critical)"); + } + } +} + +int totemudpu_finalize ( + void *udpu_context) +{ + struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; + int res = 0; + + if (instance->token_socket > 0) { + qb_loop_poll_del (instance->totemudpu_poll_handle, + instance->token_socket); + close (instance->token_socket); + } + + if (instance->local_loop_sock[0] > 0) { + qb_loop_poll_del (instance->totemudpu_poll_handle, + instance->local_loop_sock[0]); + close (instance->local_loop_sock[0]); + close (instance->local_loop_sock[1]); + } + + totemudpu_stop_merge_detect_timeout(instance); + + return (res); +} + +static struct totemudpu_member *find_member_by_sockaddr( + const void *udpu_context, + const struct sockaddr *sa) +{ + struct qb_list_head *list; + struct totemudpu_member *member; + struct totemudpu_member *res_member; + const struct totemudpu_instance *instance = (const struct totemudpu_instance *)udpu_context; + + res_member = NULL; + + qb_list_for_each(list, &(instance->member_list)) { + member = qb_list_entry (list, + struct totemudpu_member, + list); + + if (totemip_sa_equal(&member->member, sa)) { + res_member = member; + break ; + } + } + + return (res_member); +} + + +static int net_deliver_fn ( + int fd, + int revents, + void *data) +{ + struct totemudpu_instance *instance = (struct totemudpu_instance *)data; + struct msghdr msg_recv; + struct iovec *iovec; + struct sockaddr_storage system_from; + int bytes_received; + int truncated_packet; + + iovec = &instance->totemudpu_iov_recv; + + /* + * Receive datagram + */ + msg_recv.msg_name = &system_from; + msg_recv.msg_namelen = sizeof (struct sockaddr_storage); + msg_recv.msg_iov = iovec; + msg_recv.msg_iovlen = 1; +#ifdef HAVE_MSGHDR_CONTROL + msg_recv.msg_control = 0; +#endif +#ifdef HAVE_MSGHDR_CONTROLLEN + msg_recv.msg_controllen = 0; +#endif +#ifdef HAVE_MSGHDR_FLAGS + msg_recv.msg_flags = 0; +#endif +#ifdef HAVE_MSGHDR_ACCRIGHTS + msg_recv.msg_accrights = NULL; +#endif +#ifdef HAVE_MSGHDR_ACCRIGHTSLEN + msg_recv.msg_accrightslen = 0; +#endif + + bytes_received = recvmsg (fd, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT); + if (bytes_received == -1) { + return (0); + } else { + instance->stats_recv += bytes_received; + } + + truncated_packet = 0; + +#ifdef HAVE_MSGHDR_FLAGS + if (msg_recv.msg_flags & MSG_TRUNC) { + truncated_packet = 1; + } +#else + /* + * We don't have MSGHDR_FLAGS, but we can (hopefully) safely make assumption that + * if bytes_received == UDP_RECEIVE_FRAME_SIZE_MAX then packet is truncated + */ + if (bytes_received == UDP_RECEIVE_FRAME_SIZE_MAX) { + truncated_packet = 1; + } +#endif + + if (truncated_packet) { + log_printf (instance->totemudpu_log_level_error, + "Received too big message. This may be because something bad is happening" + "on the network (attack?), or you tried join more nodes than corosync is" + "compiled with (%u) or bug in the code (bad estimation of " + "the UDP_RECEIVE_FRAME_SIZE_MAX). Dropping packet.", PROCESSOR_COUNT_MAX); + return (0); + } + + if (instance->totem_config->block_unlisted_ips && + instance->netif_bind_state == BIND_STATE_REGULAR && + find_member_by_sockaddr(instance, (const struct sockaddr *)&system_from) == NULL) { + log_printf(instance->totemudpu_log_level_debug, "Packet rejected from %s", + totemip_sa_print((const struct sockaddr *)&system_from)); + + return (0); + } + + iovec->iov_len = bytes_received; + + /* + * Handle incoming message + */ + instance->totemudpu_deliver_fn ( + instance->context, + iovec->iov_base, + iovec->iov_len, + &system_from); + + iovec->iov_len = UDP_RECEIVE_FRAME_SIZE_MAX; + return (0); +} + +static int netif_determine ( + struct totemudpu_instance *instance, + struct totem_ip_address *bindnet, + struct totem_ip_address *bound_to, + int *interface_up, + int *interface_num) +{ + int res; + + res = totemip_iface_check (bindnet, bound_to, + interface_up, interface_num, + instance->totem_config->clear_node_high_bit); + + + return (res); +} + + +/* + * If the interface is up, the sockets for totem are built. If the interface is down + * this function is requeued in the timer list to retry building the sockets later. + */ +static void timer_function_netif_check_timeout ( + void *data) +{ + struct totemudpu_instance *instance = (struct totemudpu_instance *)data; + int interface_up; + int interface_num; + + /* + * Build sockets for every interface + */ + netif_determine (instance, + &instance->totem_interface->bindnet, + &instance->totem_interface->boundto, + &interface_up, &interface_num); + /* + * If the network interface isn't back up and we are already + * in loopback mode, add timer to check again and return + */ + if ((instance->netif_bind_state == BIND_STATE_LOOPBACK && + interface_up == 0) || + + (instance->my_memb_entries == 1 && + instance->netif_bind_state == BIND_STATE_REGULAR && + interface_up == 1)) { + + qb_loop_timer_add (instance->totemudpu_poll_handle, + QB_LOOP_MED, + instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC, + (void *)instance, + timer_function_netif_check_timeout, + &instance->timer_netif_check_timeout); + + /* + * Add a timer to check for a downed regular interface + */ + return; + } + + if (instance->token_socket > 0) { + qb_loop_poll_del (instance->totemudpu_poll_handle, + instance->token_socket); + close (instance->token_socket); + instance->token_socket = -1; + } + + if (interface_up == 0) { + if (instance->netif_bind_state == BIND_STATE_UNBOUND) { + log_printf (instance->totemudpu_log_level_error, + "One of your ip addresses are now bound to localhost. " + "Corosync would not work correctly."); + exit(COROSYNC_DONE_FATAL_ERR); + } + + /* + * Interface is not up + */ + instance->netif_bind_state = BIND_STATE_LOOPBACK; + + /* + * Add a timer to retry building interfaces and request memb_gather_enter + */ + qb_loop_timer_add (instance->totemudpu_poll_handle, + QB_LOOP_MED, + instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC, + (void *)instance, + timer_function_netif_check_timeout, + &instance->timer_netif_check_timeout); + } else { + /* + * Interface is up + */ + instance->netif_bind_state = BIND_STATE_REGULAR; + } + /* + * Create and bind the multicast and unicast sockets + */ + totemudpu_build_sockets (instance, + &instance->totem_interface->bindnet, + &instance->totem_interface->boundto); + + if (instance->netif_bind_state == BIND_STATE_REGULAR) { + qb_loop_poll_add (instance->totemudpu_poll_handle, + QB_LOOP_MED, + instance->token_socket, + POLLIN, instance, net_deliver_fn); + } + + totemip_copy (&instance->my_id, &instance->totem_interface->boundto); + + /* + * This reports changes in the interface to the user and totemsrp + */ + if (instance->netif_bind_state == BIND_STATE_REGULAR) { + if (instance->netif_state_report & NETIF_STATE_REPORT_UP) { + log_printf (instance->totemudpu_log_level_notice, + "The network interface [%s] is now up.", + totemip_print (&instance->totem_interface->boundto)); + instance->netif_state_report = NETIF_STATE_REPORT_DOWN; + instance->totemudpu_iface_change_fn (instance->context, &instance->my_id, 0); + } + /* + * Add a timer to check for interface going down in single membership + */ + if (instance->my_memb_entries == 1) { + qb_loop_timer_add (instance->totemudpu_poll_handle, + QB_LOOP_MED, + instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC, + (void *)instance, + timer_function_netif_check_timeout, + &instance->timer_netif_check_timeout); + } + + } else { + if (instance->netif_state_report & NETIF_STATE_REPORT_DOWN) { + log_printf (instance->totemudpu_log_level_notice, + "The network interface is down."); + instance->totemudpu_iface_change_fn (instance->context, &instance->my_id, 0); + } + instance->netif_state_report = NETIF_STATE_REPORT_UP; + + } +} + +/* Set the socket priority to INTERACTIVE to ensure + that our messages don't get queued behind anything else */ +static void totemudpu_traffic_control_set(struct totemudpu_instance *instance, int sock) +{ +#ifdef SO_PRIORITY + int prio = 6; /* TC_PRIO_INTERACTIVE */ + + if (setsockopt(sock, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(int))) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, + "Could not set traffic priority"); + } +#endif +} + +static int totemudpu_build_sockets_ip ( + struct totemudpu_instance *instance, + struct totem_ip_address *bindnet_address, + struct totem_ip_address *bound_to, + int interface_num) +{ + struct sockaddr_storage sockaddr; + int addrlen; + int res; + unsigned int recvbuf_size; + unsigned int optlen = sizeof (recvbuf_size); + unsigned int retries = 0; + + /* + * Setup unicast socket + */ + instance->token_socket = socket (bindnet_address->family, SOCK_DGRAM, 0); + if (instance->token_socket == -1) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, + "socket() failed"); + return (-1); + } + + totemip_nosigpipe (instance->token_socket); + res = fcntl (instance->token_socket, F_SETFL, O_NONBLOCK); + if (res == -1) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, + "Could not set non-blocking operation on token socket"); + return (-1); + } + + /* + * Bind to unicast socket used for token send/receives + * This has the side effect of binding to the correct interface + */ + totemip_totemip_to_sockaddr_convert(bound_to, instance->totem_interface->ip_port, &sockaddr, &addrlen); + while (1) { + res = bind (instance->token_socket, (struct sockaddr *)&sockaddr, addrlen); + if (res == 0) { + break; + } + LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, + "bind token socket failed"); + if (++retries > BIND_MAX_RETRIES) { + break; + } + + /* + * Wait for a while + */ + (void)poll(NULL, 0, BIND_RETRIES_INTERVAL * retries); + } + + if (res == -1) { + return (-1); + } + + /* + * the token_socket can receive many messages. Allow a large number + * of receive messages on this socket + */ + recvbuf_size = MCAST_SOCKET_BUFFER_SIZE; + res = setsockopt (instance->token_socket, SOL_SOCKET, SO_RCVBUF, + &recvbuf_size, optlen); + if (res == -1) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_notice, + "Could not set recvbuf size"); + } + + return 0; +} + +int totemudpu_nodestatus_get (void *udpu_context, unsigned int nodeid, + struct totem_node_status *node_status) +{ + struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; + struct qb_list_head *list; + struct totemudpu_member *member; + + qb_list_for_each(list, &(instance->member_list)) { + member = qb_list_entry (list, + struct totemudpu_member, + list); + + if (member->member.nodeid == nodeid) { + node_status->nodeid = nodeid; + /* reachable is filled in by totemsrp */ + if (instance->netif_bind_state == BIND_STATE_REGULAR) { + node_status->link_status[0].enabled = 1; + } else { + node_status->link_status[0].enabled = 0; + } + node_status->link_status[0].connected = node_status->reachable; + node_status->link_status[0].mtu = instance->totem_config->net_mtu; + strncpy(node_status->link_status[0].src_ipaddr, totemip_print(&member->member), KNET_MAX_HOST_LEN-1); + } + } + return (0); +} + +int totemudpu_ifaces_get ( + void *net_context, + char ***status, + unsigned int *iface_count) +{ + static char *statuses[INTERFACE_MAX] = {(char*)"OK"}; + + if (status) { + *status = statuses; + } + *iface_count = 1; + + return (0); +} + + +static int totemudpu_build_local_sockets( + struct totemudpu_instance *instance) +{ + int i; + unsigned int sendbuf_size; + unsigned int recvbuf_size; + unsigned int optlen = sizeof (sendbuf_size); + int res; + + /* + * Create local multicast loop socket + */ + if (socketpair(AF_UNIX, SOCK_DGRAM, 0, instance->local_loop_sock) == -1) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, + "socket() failed"); + return (-1); + } + + for (i = 0; i < 2; i++) { + totemip_nosigpipe (instance->local_loop_sock[i]); + res = fcntl (instance->local_loop_sock[i], F_SETFL, O_NONBLOCK); + if (res == -1) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, + "Could not set non-blocking operation on multicast socket"); + return (-1); + } + } + + recvbuf_size = MCAST_SOCKET_BUFFER_SIZE; + sendbuf_size = MCAST_SOCKET_BUFFER_SIZE; + + res = setsockopt (instance->local_loop_sock[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen); + if (res == -1) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug, + "Unable to set SO_RCVBUF size on UDP local mcast loop socket"); + return (-1); + } + res = setsockopt (instance->local_loop_sock[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen); + if (res == -1) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug, + "Unable to set SO_SNDBUF size on UDP local mcast loop socket"); + return (-1); + } + + res = getsockopt (instance->local_loop_sock[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen); + if (res == 0) { + log_printf (instance->totemudpu_log_level_debug, + "Local receive multicast loop socket recv buffer size (%d bytes).", recvbuf_size); + } + + res = getsockopt (instance->local_loop_sock[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen); + if (res == 0) { + log_printf (instance->totemudpu_log_level_debug, + "Local transmit multicast loop socket send buffer size (%d bytes).", sendbuf_size); + } + + return (0); +} + +static int totemudpu_build_sockets ( + struct totemudpu_instance *instance, + struct totem_ip_address *bindnet_address, + struct totem_ip_address *bound_to) +{ + int interface_num; + int interface_up; + int res; + + /* + * Determine the ip address bound to and the interface name + */ + res = netif_determine (instance, + bindnet_address, + bound_to, + &interface_up, + &interface_num); + + if (res == -1) { + return (-1); + } + + totemip_copy(&instance->my_id, bound_to); + + res = totemudpu_build_sockets_ip (instance, + bindnet_address, bound_to, interface_num); + + if (res == -1) { + /* if we get here, corosync won't work anyway, so better leaving than faking to work */ + LOGSYS_PERROR (errno, instance->totemudpu_log_level_error, + "Unable to create sockets, exiting"); + exit(EXIT_FAILURE); + } + + /* We only send out of the token socket */ + totemudpu_traffic_control_set(instance, instance->token_socket); + + /* + * Rebind all members to new ips + */ + totemudpu_member_list_rebind_ip(instance); + + return res; +} + +/* + * Totem Network interface + * depends on poll abstraction, POSIX, IPV4 + */ + +/* + * Create an instance + */ +int totemudpu_initialize ( + qb_loop_t *poll_handle, + void **udpu_context, + struct totem_config *totem_config, + totemsrp_stats_t *stats, + void *context, + + int (*deliver_fn) ( + void *context, + const void *msg, + unsigned int msg_len, + const struct sockaddr_storage *system_from), + + int (*iface_change_fn) ( + void *context, + const struct totem_ip_address *iface_address, + unsigned int ring_no), + + void (*mtu_changed) ( + void *context, + int net_mtu), + + void (*target_set_completed) ( + void *context)) +{ + struct totemudpu_instance *instance; + + instance = malloc (sizeof (struct totemudpu_instance)); + if (instance == NULL) { + return (-1); + } + + totemudpu_instance_initialize (instance); + + instance->totem_config = totem_config; + instance->stats = stats; + + /* + * Configure logging + */ + instance->totemudpu_log_level_security = 1; //totem_config->totem_logging_configuration.log_level_security; + instance->totemudpu_log_level_error = totem_config->totem_logging_configuration.log_level_error; + instance->totemudpu_log_level_warning = totem_config->totem_logging_configuration.log_level_warning; + instance->totemudpu_log_level_notice = totem_config->totem_logging_configuration.log_level_notice; + instance->totemudpu_log_level_debug = totem_config->totem_logging_configuration.log_level_debug; + instance->totemudpu_subsys_id = totem_config->totem_logging_configuration.log_subsys_id; + instance->totemudpu_log_printf = totem_config->totem_logging_configuration.log_printf; + + /* + * Initialize local variables for totemudpu + */ + instance->totem_interface = &totem_config->interfaces[0]; + memset (instance->iov_buffer, 0, UDP_RECEIVE_FRAME_SIZE_MAX); + + instance->totemudpu_poll_handle = poll_handle; + + instance->totem_interface->bindnet.nodeid = instance->totem_config->node_id; + + instance->context = context; + instance->totemudpu_deliver_fn = deliver_fn; + + instance->totemudpu_iface_change_fn = iface_change_fn; + + instance->totemudpu_target_set_completed = target_set_completed; + + /* + * Create static local mcast sockets + */ + if (totemudpu_build_local_sockets(instance) == -1) { + free(instance); + return (-1); + } + + qb_loop_poll_add ( + instance->totemudpu_poll_handle, + QB_LOOP_MED, + instance->local_loop_sock[0], + POLLIN, instance, net_deliver_fn); + + /* + * RRP layer isn't ready to receive message because it hasn't + * initialized yet. Add short timer to check the interfaces. + */ + qb_loop_timer_add (instance->totemudpu_poll_handle, + QB_LOOP_MED, + 100*QB_TIME_NS_IN_MSEC, + (void *)instance, + timer_function_netif_check_timeout, + &instance->timer_netif_check_timeout); + + totemudpu_start_merge_detect_timeout((void*)instance); + + *udpu_context = instance; + return (0); +} + +void *totemudpu_buffer_alloc (void) +{ + return malloc (FRAME_SIZE_MAX); +} + +void totemudpu_buffer_release (void *ptr) +{ + return free (ptr); +} + +int totemudpu_processor_count_set ( + void *udpu_context, + int processor_count) +{ + struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; + int res = 0; + + instance->my_memb_entries = processor_count; + qb_loop_timer_del (instance->totemudpu_poll_handle, + instance->timer_netif_check_timeout); + if (processor_count == 1) { + qb_loop_timer_add (instance->totemudpu_poll_handle, + QB_LOOP_MED, + instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC, + (void *)instance, + timer_function_netif_check_timeout, + &instance->timer_netif_check_timeout); + } + + return (res); +} + +int totemudpu_recv_flush (void *udpu_context) +{ + int res = 0; + + return (res); +} + +int totemudpu_send_flush (void *udpu_context) +{ + int res = 0; + + return (res); +} + +int totemudpu_token_send ( + void *udpu_context, + const void *msg, + unsigned int msg_len) +{ + struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; + int res = 0; + + ucast_sendmsg (instance, &instance->token_target, msg, msg_len); + + return (res); +} +int totemudpu_mcast_flush_send ( + void *udpu_context, + const void *msg, + unsigned int msg_len) +{ + struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; + int res = 0; + + mcast_sendmsg (instance, msg, msg_len, 0); + + return (res); +} + +int totemudpu_mcast_noflush_send ( + void *udpu_context, + const void *msg, + unsigned int msg_len) +{ + struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; + int res = 0; + + mcast_sendmsg (instance, msg, msg_len, 1); + + return (res); +} + +extern int totemudpu_iface_check (void *udpu_context) +{ + struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; + int res = 0; + + timer_function_netif_check_timeout (instance); + + return (res); +} + +extern void totemudpu_net_mtu_adjust (void *udpu_context, struct totem_config *totem_config) +{ + totem_config->net_mtu -= totemip_udpip_header_size(totem_config->interfaces[0].bindnet.family); +} + + +int totemudpu_token_target_set ( + void *udpu_context, + unsigned int nodeid) +{ + + struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; + struct qb_list_head *list; + struct totemudpu_member *member; + int res = 0; + + qb_list_for_each(list, &(instance->member_list)) { + member = qb_list_entry (list, + struct totemudpu_member, + list); + + if (member->member.nodeid == nodeid) { + memcpy (&instance->token_target, &member->member, + sizeof (struct totem_ip_address)); + + instance->totemudpu_target_set_completed (instance->context); + break; + } + } + return (res); +} + +extern int totemudpu_recv_mcast_empty ( + void *udpu_context) +{ + struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; + unsigned int res; + struct sockaddr_storage system_from; + struct msghdr msg_recv; + struct pollfd ufd; + int nfds, i; + int msg_processed = 0; + int sock; + + /* + * Receive datagram + */ + msg_recv.msg_name = &system_from; + msg_recv.msg_namelen = sizeof (struct sockaddr_storage); + msg_recv.msg_iov = &instance->totemudpu_iov_recv; + msg_recv.msg_iovlen = 1; +#ifdef HAVE_MSGHDR_CONTROL + msg_recv.msg_control = 0; +#endif +#ifdef HAVE_MSGHDR_CONTROLLEN + msg_recv.msg_controllen = 0; +#endif +#ifdef HAVE_MSGHDR_FLAGS + msg_recv.msg_flags = 0; +#endif +#ifdef HAVE_MSGHDR_ACCRIGHTS + msg_recv.msg_accrights = NULL; +#endif +#ifdef HAVE_MSGHDR_ACCRIGHTSLEN + msg_recv.msg_accrightslen = 0; +#endif + + for (i = 0; i < 2; i++) { + sock = -1; + if (i == 0) { + if (instance->netif_bind_state == BIND_STATE_REGULAR) { + sock = instance->token_socket; + } else { + continue; + } + } + if (i == 1) { + sock = instance->local_loop_sock[0]; + } + assert(sock != -1); + + do { + ufd.fd = sock; + ufd.events = POLLIN; + nfds = poll (&ufd, 1, 0); + if (nfds == 1 && ufd.revents & POLLIN) { + res = recvmsg (sock, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT); + if (res != -1) { + msg_processed = 1; + } else { + msg_processed = -1; + } + } + } while (nfds == 1); + } + + return (msg_processed); +} + +static int totemudpu_create_sending_socket( + void *udpu_context, + const struct totem_ip_address *member) +{ + struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; + int fd; + int res; + unsigned int sendbuf_size; + unsigned int optlen = sizeof (sendbuf_size); + struct sockaddr_storage sockaddr; + int addrlen; + + fd = socket (member->family, SOCK_DGRAM, 0); + if (fd == -1) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, + "Could not create socket for new member"); + return (-1); + } + totemip_nosigpipe (fd); + res = fcntl (fd, F_SETFL, O_NONBLOCK); + if (res == -1) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, + "Could not set non-blocking operation on token socket"); + goto error_close_fd; + } + + /* + * These sockets are used to send multicast messages, so their buffers + * should be large + */ + sendbuf_size = MCAST_SOCKET_BUFFER_SIZE; + res = setsockopt (fd, SOL_SOCKET, SO_SNDBUF, + &sendbuf_size, optlen); + if (res == -1) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_notice, + "Could not set sendbuf size"); + /* + * Fail in setting sendbuf size is not fatal -> don't exit + */ + } + + /* + * Bind to sending interface + */ + totemip_totemip_to_sockaddr_convert(&instance->my_id, 0, &sockaddr, &addrlen); + res = bind (fd, (struct sockaddr *)&sockaddr, addrlen); + if (res == -1) { + LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, + "bind token socket failed"); + goto error_close_fd; + } + + return (fd); + +error_close_fd: + close(fd); + return (-1); +} + +int totemudpu_iface_set (void *net_context, + const struct totem_ip_address *local_addr, + unsigned short ip_port, + unsigned int iface_no) +{ + /* Not supported */ + return (-1); +} + +int totemudpu_member_add ( + void *udpu_context, + const struct totem_ip_address *local, + const struct totem_ip_address *member, + int ring_no) +{ + struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; + + struct totemudpu_member *new_member; + + new_member = malloc (sizeof (struct totemudpu_member)); + if (new_member == NULL) { + return (-1); + } + + memset(new_member, 0, sizeof(*new_member)); + + log_printf (LOGSYS_LEVEL_NOTICE, "adding new UDPU member {%s}", + totemip_print(member)); + qb_list_init (&new_member->list); + qb_list_add_tail (&new_member->list, &instance->member_list); + memcpy (&new_member->member, member, sizeof (struct totem_ip_address)); + new_member->fd = totemudpu_create_sending_socket(udpu_context, member); + new_member->active = 1; + + return (0); +} + +int totemudpu_member_remove ( + void *udpu_context, + const struct totem_ip_address *token_target, + int ring_no) +{ + int found = 0; + struct qb_list_head *list; + struct totemudpu_member *member; + + struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; + + /* + * Find the member to remove and close its socket + */ + qb_list_for_each(list, &(instance->member_list)) { + member = qb_list_entry (list, + struct totemudpu_member, + list); + + if (totemip_compare (token_target, &member->member)==0) { + log_printf(LOGSYS_LEVEL_NOTICE, + "removing UDPU member {%s}", + totemip_print(&member->member)); + + if (member->fd > 0) { + log_printf(LOGSYS_LEVEL_DEBUG, + "Closing socket to: {%s}", + totemip_print(&member->member)); + qb_loop_poll_del (instance->totemudpu_poll_handle, + member->fd); + close (member->fd); + } + found = 1; + break; + } + } + + /* + * Delete the member from the list + */ + if (found) { + qb_list_del (list); + } + + instance = NULL; + return (0); +} + +int totemudpu_member_list_rebind_ip ( + void *udpu_context) +{ + struct qb_list_head *list; + struct totemudpu_member *member; + + struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; + + qb_list_for_each(list, &(instance->member_list)) { + member = qb_list_entry (list, + struct totemudpu_member, + list); + + if (member->fd > 0) { + close (member->fd); + } + + member->fd = totemudpu_create_sending_socket(udpu_context, &member->member); + } + + return (0); +} + + +static void timer_function_merge_detect_timeout ( + void *data) +{ + struct totemudpu_instance *instance = (struct totemudpu_instance *)data; + + if (instance->merge_detect_messages_sent_before_timeout == 0) { + instance->send_merge_detect_message = 1; + } + + instance->merge_detect_messages_sent_before_timeout = 0; + + totemudpu_start_merge_detect_timeout(instance); +} + +static void totemudpu_start_merge_detect_timeout( + void *udpu_context) +{ + struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; + + qb_loop_timer_add(instance->totemudpu_poll_handle, + QB_LOOP_MED, + instance->totem_config->merge_timeout * 2 * QB_TIME_NS_IN_MSEC, + (void *)instance, + timer_function_merge_detect_timeout, + &instance->timer_merge_detect_timeout); + +} + +static void totemudpu_stop_merge_detect_timeout( + void *udpu_context) +{ + struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; + + qb_loop_timer_del(instance->totemudpu_poll_handle, + instance->timer_merge_detect_timeout); +} + +int totemudpu_reconfigure ( + void *udpu_context, + struct totem_config *totem_config) +{ + /* Not supported */ + return (-1); +} |