diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-03 17:01:24 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-03 17:01:24 +0000 |
commit | 6dd3dfb79125cd02d02efbce435a6c82e5af92ef (patch) | |
tree | 45084fc83278586f6bbafcb935f92d53f71a6b03 /lib/sam.c | |
parent | Initial commit. (diff) | |
download | corosync-6dd3dfb79125cd02d02efbce435a6c82e5af92ef.tar.xz corosync-6dd3dfb79125cd02d02efbce435a6c82e5af92ef.zip |
Adding upstream version 3.1.8.upstream/3.1.8upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/sam.c')
-rw-r--r-- | lib/sam.c | 1489 |
1 files changed, 1489 insertions, 0 deletions
diff --git a/lib/sam.c b/lib/sam.c new file mode 100644 index 0000000..94dbf2a --- /dev/null +++ b/lib/sam.c @@ -0,0 +1,1489 @@ +/* + * Copyright (c) 2009-2011 Red Hat, Inc. + * + * All rights reserved. + * + * Author: Jan Friesse (jfriesse@redhat.com) + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the Red Hat, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * Provides a SAM API + */ + +#include <config.h> + +#include <limits.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <sys/time.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <errno.h> +#include <poll.h> + +#include <corosync/corotypes.h> +#include <qb/qbipcc.h> +#include <corosync/corodefs.h> +#include <corosync/cmap.h> +#include <corosync/hdb.h> +#include <corosync/quorum.h> + +#include <corosync/sam.h> + +#include "util.h" + +#include <stdio.h> +#include <sys/wait.h> +#include <signal.h> + +#define SAM_CMAP_S_FAILED "failed" +#define SAM_CMAP_S_REGISTERED "stopped" +#define SAM_CMAP_S_STARTED "running" +#define SAM_CMAP_S_Q_WAIT "waiting for quorum" + +#define SAM_RP_MASK_Q(pol) (pol & (~SAM_RECOVERY_POLICY_QUORUM)) +#define SAM_RP_MASK_C(pol) (pol & (~SAM_RECOVERY_POLICY_CMAP)) +#define SAM_RP_MASK(pol) (pol & (~(SAM_RECOVERY_POLICY_QUORUM | SAM_RECOVERY_POLICY_CMAP))) + +enum sam_internal_status_t { + SAM_INTERNAL_STATUS_NOT_INITIALIZED = 0, + SAM_INTERNAL_STATUS_INITIALIZED, + SAM_INTERNAL_STATUS_REGISTERED, + SAM_INTERNAL_STATUS_STARTED, + SAM_INTERNAL_STATUS_FINALIZED +}; + +enum sam_command_t { + SAM_COMMAND_START, + SAM_COMMAND_STOP, + SAM_COMMAND_HB, + SAM_COMMAND_DATA_STORE, + SAM_COMMAND_WARN_SIGNAL_SET, + SAM_COMMAND_MARK_FAILED, +}; + +enum sam_reply_t { + SAM_REPLY_OK, + SAM_REPLY_ERROR, +}; + +enum sam_parent_action_t { + SAM_PARENT_ACTION_ERROR, + SAM_PARENT_ACTION_RECOVERY, + SAM_PARENT_ACTION_QUIT, + SAM_PARENT_ACTION_CONTINUE +}; + +enum sam_cmap_key_t { + SAM_CMAP_KEY_RECOVERY, + SAM_CMAP_KEY_HC_PERIOD, + SAM_CMAP_KEY_LAST_HC, + SAM_CMAP_KEY_STATE, +}; + +static struct { + int time_interval; + sam_recovery_policy_t recovery_policy; + enum sam_internal_status_t internal_status; + unsigned int instance_id; + int child_fd_out; + int child_fd_in; + int term_send; + int warn_signal; + int am_i_child; + + sam_hc_callback_t hc_callback; + pthread_t cb_thread; + int cb_rpipe_fd, cb_wpipe_fd; + int cb_registered; + + void *user_data; + size_t user_data_size; + size_t user_data_allocated; + + pthread_mutex_t lock; + + quorum_handle_t quorum_handle; + uint32_t quorate; + int quorum_fd; + + cmap_handle_t cmap_handle; + char cmap_pid_path[CMAP_KEYNAME_MAXLEN]; +} sam_internal_data; + +extern const char *__progname; + +static cs_error_t sam_cmap_update_key (enum sam_cmap_key_t key, const char *value) +{ + cs_error_t err; + const char *svalue; + uint64_t hc_period, last_hc; + + const char *ssvalue[] = { [SAM_RECOVERY_POLICY_QUIT] = "quit", [SAM_RECOVERY_POLICY_RESTART] = "restart" }; + char key_name[CMAP_KEYNAME_MAXLEN]; + + switch (key) { + case SAM_CMAP_KEY_RECOVERY: + svalue = ssvalue[SAM_RP_MASK (sam_internal_data.recovery_policy)]; + + if (snprintf(key_name, CMAP_KEYNAME_MAXLEN, "%s%s", sam_internal_data.cmap_pid_path, + "recovery") >= CMAP_KEYNAME_MAXLEN) { + + err = CS_ERR_NAME_TOO_LONG; + goto exit_error; + } + + if ((err = cmap_set_string(sam_internal_data.cmap_handle, key_name, svalue)) != CS_OK) { + goto exit_error; + } + break; + case SAM_CMAP_KEY_HC_PERIOD: + hc_period = sam_internal_data.time_interval; + + if (snprintf(key_name, CMAP_KEYNAME_MAXLEN, "%s%s", sam_internal_data.cmap_pid_path, + "poll_period") >= CMAP_KEYNAME_MAXLEN) { + + err = CS_ERR_NAME_TOO_LONG; + goto exit_error; + } + + if ((err = cmap_set_uint64(sam_internal_data.cmap_handle, key_name, hc_period)) != CS_OK) { + goto exit_error; + } + break; + case SAM_CMAP_KEY_LAST_HC: + last_hc = cs_timestamp_get(); + + if (snprintf(key_name, CMAP_KEYNAME_MAXLEN, "%s%s", sam_internal_data.cmap_pid_path, + "last_updated") >= CMAP_KEYNAME_MAXLEN) { + + err = CS_ERR_NAME_TOO_LONG; + goto exit_error; + } + if ((err = cmap_set_uint64(sam_internal_data.cmap_handle, key_name, last_hc)) != CS_OK) { + goto exit_error; + } + break; + case SAM_CMAP_KEY_STATE: + svalue = value; + if (snprintf(key_name, CMAP_KEYNAME_MAXLEN, "%s%s", sam_internal_data.cmap_pid_path, + "state") >= CMAP_KEYNAME_MAXLEN) { + + err = CS_ERR_NAME_TOO_LONG; + goto exit_error; + } + + if ((err = cmap_set_string(sam_internal_data.cmap_handle, key_name, svalue)) != CS_OK) { + goto exit_error; + } + break; + } + + return (CS_OK); + +exit_error: + return (err); +} + +static cs_error_t sam_cmap_destroy_pid_path (void) +{ + cmap_iter_handle_t iter; + cs_error_t err; + char key_name[CMAP_KEYNAME_MAXLEN + 1]; + + err = cmap_iter_init(sam_internal_data.cmap_handle, sam_internal_data.cmap_pid_path, &iter); + if (err != CS_OK) { + goto error_exit; + } + + while ((err = cmap_iter_next(sam_internal_data.cmap_handle, iter, key_name, NULL, NULL)) == CS_OK) { + cmap_delete(sam_internal_data.cmap_handle, key_name); + } + + err = cmap_iter_finalize(sam_internal_data.cmap_handle, iter); + +error_exit: + return (err); +} + +static cs_error_t sam_cmap_register (void) +{ + cs_error_t err; + cmap_handle_t cmap_handle; + + if ((err = cmap_initialize (&cmap_handle)) != CS_OK) { + return (err); + } + + snprintf(sam_internal_data.cmap_pid_path, CMAP_KEYNAME_MAXLEN, "resources.process.%d.", getpid()); + + sam_internal_data.cmap_handle = cmap_handle; + + if ((err = sam_cmap_update_key (SAM_CMAP_KEY_RECOVERY, NULL)) != CS_OK) { + goto destroy_finalize_error; + } + + if ((err = sam_cmap_update_key (SAM_CMAP_KEY_HC_PERIOD, NULL)) != CS_OK) { + goto destroy_finalize_error; + } + + return (CS_OK); + +destroy_finalize_error: + sam_cmap_destroy_pid_path (); + cmap_finalize (cmap_handle); + return (err); +} + +static void quorum_notification_fn ( + quorum_handle_t handle, + uint32_t quorate, + uint64_t ring_id, + uint32_t view_list_entries, + uint32_t *view_list) +{ + sam_internal_data.quorate = quorate; +} + +cs_error_t sam_initialize ( + int time_interval, + sam_recovery_policy_t recovery_policy) +{ + quorum_callbacks_t quorum_callbacks; + uint32_t quorum_type; + cs_error_t err; + + if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_NOT_INITIALIZED) { + return (CS_ERR_BAD_HANDLE); + } + + if (SAM_RP_MASK (recovery_policy) != SAM_RECOVERY_POLICY_QUIT && + SAM_RP_MASK (recovery_policy) != SAM_RECOVERY_POLICY_RESTART) { + return (CS_ERR_INVALID_PARAM); + } + + if (recovery_policy & SAM_RECOVERY_POLICY_QUORUM) { + /* + * Initialize quorum + */ + quorum_callbacks.quorum_notify_fn = quorum_notification_fn; + if ((err = quorum_initialize (&sam_internal_data.quorum_handle, &quorum_callbacks, &quorum_type)) != CS_OK) { + goto exit_error; + } + + if ((err = quorum_trackstart (sam_internal_data.quorum_handle, CS_TRACK_CHANGES)) != CS_OK) { + goto exit_error_quorum; + } + + if ((err = quorum_fd_get (sam_internal_data.quorum_handle, &sam_internal_data.quorum_fd)) != CS_OK) { + goto exit_error_quorum; + } + + /* + * Dispatch initial quorate state + */ + if ((err = quorum_dispatch (sam_internal_data.quorum_handle, CS_DISPATCH_ONE)) != CS_OK) { + goto exit_error_quorum; + } + } + sam_internal_data.recovery_policy = recovery_policy; + + sam_internal_data.time_interval = time_interval; + + sam_internal_data.internal_status = SAM_INTERNAL_STATUS_INITIALIZED; + + sam_internal_data.warn_signal = SIGTERM; + + sam_internal_data.am_i_child = 0; + + sam_internal_data.user_data = NULL; + sam_internal_data.user_data_size = 0; + sam_internal_data.user_data_allocated = 0; + + pthread_mutex_init (&sam_internal_data.lock, NULL); + + return (CS_OK); + +exit_error_quorum: + quorum_finalize (sam_internal_data.quorum_handle); +exit_error: + return (err); +} + +/* + * Wrapper on top of write(2) function. It handles EAGAIN and EINTR states and sends whole buffer if possible. + */ +static size_t sam_safe_write ( + int d, + const void *buf, + size_t nbyte) +{ + ssize_t bytes_write; + ssize_t tmp_bytes_write; + + bytes_write = 0; + + do { + tmp_bytes_write = write (d, (const char *)buf + bytes_write, + (nbyte - bytes_write > SSIZE_MAX) ? SSIZE_MAX : nbyte - bytes_write); + + if (tmp_bytes_write == -1) { + if (!(errno == EAGAIN || errno == EINTR)) + return -1; + } else { + bytes_write += tmp_bytes_write; + } + } while (bytes_write != nbyte); + + return (bytes_write); +} + +/* + * Wrapper on top of read(2) function. It handles EAGAIN and EINTR states and reads whole buffer if possible. + */ +static size_t sam_safe_read ( + int d, + void *buf, + size_t nbyte) +{ + ssize_t bytes_read; + ssize_t tmp_bytes_read; + + bytes_read = 0; + + do { + tmp_bytes_read = read (d, (char *)buf + bytes_read, + (nbyte - bytes_read > SSIZE_MAX) ? SSIZE_MAX : nbyte - bytes_read); + + if (tmp_bytes_read == -1) { + if (!(errno == EAGAIN || errno == EINTR)) + return -1; + } else { + bytes_read += tmp_bytes_read; + } + + } while (bytes_read != nbyte && tmp_bytes_read != 0); + + return (bytes_read); +} + +static cs_error_t sam_read_reply ( + int child_fd_in) +{ + char reply; + cs_error_t err; + + if (sam_safe_read (sam_internal_data.child_fd_in, &reply, sizeof (reply)) != sizeof (reply)) { + return (CS_ERR_LIBRARY); + } + + switch (reply) { + case SAM_REPLY_ERROR: + /* + * Read error and return that + */ + if (sam_safe_read (sam_internal_data.child_fd_in, &err, sizeof (err)) != sizeof (err)) { + return (CS_ERR_LIBRARY); + } + + return (err); + break; + case SAM_REPLY_OK: + /* + * Everything correct + */ + break; + default: + return (CS_ERR_LIBRARY); + break; + } + + return (CS_OK); +} + +cs_error_t sam_data_getsize (size_t *size) +{ + if (size == NULL) { + return (CS_ERR_INVALID_PARAM); + } + + if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED && + sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED && + sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) { + + return (CS_ERR_BAD_HANDLE); + } + + pthread_mutex_lock (&sam_internal_data.lock); + + *size = sam_internal_data.user_data_size; + + pthread_mutex_unlock (&sam_internal_data.lock); + + return (CS_OK); +} + +cs_error_t sam_data_restore ( + void *data, + size_t size) +{ + cs_error_t err; + + err = CS_OK; + + if (data == NULL) { + return (CS_ERR_INVALID_PARAM); + } + + if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED && + sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED && + sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) { + + return (CS_ERR_BAD_HANDLE); + } + + pthread_mutex_lock (&sam_internal_data.lock); + + if (sam_internal_data.user_data_size == 0) { + err = CS_OK; + + goto error_unlock; + } + + if (size < sam_internal_data.user_data_size) { + err = CS_ERR_INVALID_PARAM; + + goto error_unlock; + } + + memcpy (data, sam_internal_data.user_data, sam_internal_data.user_data_size); + + pthread_mutex_unlock (&sam_internal_data.lock); + + return (CS_OK); + +error_unlock: + pthread_mutex_unlock (&sam_internal_data.lock); + + return (err); +} + +cs_error_t sam_data_store ( + const void *data, + size_t size) +{ + cs_error_t err; + char command; + char *new_data; + + if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED && + sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED && + sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) { + + return (CS_ERR_BAD_HANDLE); + } + + + if (data == NULL) { + size = 0; + } + + pthread_mutex_lock (&sam_internal_data.lock); + + if (sam_internal_data.am_i_child) { + /* + * We are child so we must send data to parent + */ + command = SAM_COMMAND_DATA_STORE; + if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command)) { + err = CS_ERR_LIBRARY; + + goto error_unlock; + } + + if (sam_safe_write (sam_internal_data.child_fd_out, &size, sizeof (size)) != sizeof (size)) { + err = CS_ERR_LIBRARY; + + goto error_unlock; + } + + if (data != NULL && sam_safe_write (sam_internal_data.child_fd_out, data, size) != size) { + err = CS_ERR_LIBRARY; + + goto error_unlock; + } + + /* + * And wait for reply + */ + if ((err = sam_read_reply (sam_internal_data.child_fd_in)) != CS_OK) { + goto error_unlock; + } + } + + /* + * We are parent or we received OK reply from parent -> do required action + */ + if (data == NULL) { + free (sam_internal_data.user_data); + sam_internal_data.user_data = NULL; + sam_internal_data.user_data_allocated = 0; + sam_internal_data.user_data_size = 0; + } else { + if (sam_internal_data.user_data_allocated < size) { + if ((new_data = realloc (sam_internal_data.user_data, size)) == NULL) { + err = CS_ERR_NO_MEMORY; + + goto error_unlock; + } + + sam_internal_data.user_data_allocated = size; + } else { + new_data = sam_internal_data.user_data; + } + sam_internal_data.user_data = new_data; + sam_internal_data.user_data_size = size; + + memcpy (sam_internal_data.user_data, data, size); + } + + pthread_mutex_unlock (&sam_internal_data.lock); + + return (CS_OK); + +error_unlock: + pthread_mutex_unlock (&sam_internal_data.lock); + + return (err); +} + +cs_error_t sam_start (void) +{ + char command; + cs_error_t err; + sam_recovery_policy_t recpol; + + if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED) { + return (CS_ERR_BAD_HANDLE); + } + + recpol = sam_internal_data.recovery_policy; + + if (recpol & SAM_RECOVERY_POLICY_QUORUM || recpol & SAM_RECOVERY_POLICY_CMAP) { + pthread_mutex_lock (&sam_internal_data.lock); + } + + command = SAM_COMMAND_START; + + if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command)) { + if (recpol & SAM_RECOVERY_POLICY_QUORUM || recpol & SAM_RECOVERY_POLICY_CMAP) { + pthread_mutex_unlock (&sam_internal_data.lock); + } + + return (CS_ERR_LIBRARY); + } + + if (recpol & SAM_RECOVERY_POLICY_QUORUM || recpol & SAM_RECOVERY_POLICY_CMAP) { + /* + * Wait for parent reply + */ + if ((err = sam_read_reply (sam_internal_data.child_fd_in)) != CS_OK) { + pthread_mutex_unlock (&sam_internal_data.lock); + + return (err); + } + + pthread_mutex_unlock (&sam_internal_data.lock); + } + + if (sam_internal_data.hc_callback) + if (sam_safe_write (sam_internal_data.cb_wpipe_fd, &command, sizeof (command)) != sizeof (command)) + return (CS_ERR_LIBRARY); + + sam_internal_data.internal_status = SAM_INTERNAL_STATUS_STARTED; + + return (CS_OK); +} + +cs_error_t sam_stop (void) +{ + char command; + cs_error_t err; + + if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) { + return (CS_ERR_BAD_HANDLE); + } + + command = SAM_COMMAND_STOP; + + if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CMAP) { + pthread_mutex_lock (&sam_internal_data.lock); + } + + if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command)) { + if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CMAP) { + pthread_mutex_unlock (&sam_internal_data.lock); + } + + return (CS_ERR_LIBRARY); + } + + if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CMAP) { + /* + * Wait for parent reply + */ + if ((err = sam_read_reply (sam_internal_data.child_fd_in)) != CS_OK) { + pthread_mutex_unlock (&sam_internal_data.lock); + + return (err); + } + + pthread_mutex_unlock (&sam_internal_data.lock); + } + + if (sam_internal_data.hc_callback) + if (sam_safe_write (sam_internal_data.cb_wpipe_fd, &command, sizeof (command)) != sizeof (command)) + return (CS_ERR_LIBRARY); + + sam_internal_data.internal_status = SAM_INTERNAL_STATUS_REGISTERED; + + return (CS_OK); +} + +cs_error_t sam_hc_send (void) +{ + char command; + + if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) { + return (CS_ERR_BAD_HANDLE); + } + + command = SAM_COMMAND_HB; + + if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command)) + return (CS_ERR_LIBRARY); + + return (CS_OK); +} + +cs_error_t sam_finalize (void) +{ + cs_error_t error; + + if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED && + sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED && + sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) { + return (CS_ERR_BAD_HANDLE); + } + + if (sam_internal_data.internal_status == SAM_INTERNAL_STATUS_STARTED) { + error = sam_stop (); + if (error != CS_OK) + goto exit_error; + } + + sam_internal_data.internal_status = SAM_INTERNAL_STATUS_FINALIZED; + + free (sam_internal_data.user_data); + +exit_error: + return (CS_OK); +} + +cs_error_t sam_mark_failed (void) +{ + char command; + + if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED && + sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED) { + return (CS_ERR_BAD_HANDLE); + } + + if (!(sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CMAP)) { + return (CS_ERR_INVALID_PARAM); + } + + command = SAM_COMMAND_MARK_FAILED; + + if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command)) + return (CS_ERR_LIBRARY); + + return (CS_OK); +} + +cs_error_t sam_warn_signal_set (int warn_signal) +{ + char command; + cs_error_t err; + + if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED && + sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED && + sam_internal_data.internal_status != SAM_INTERNAL_STATUS_STARTED) { + return (CS_ERR_BAD_HANDLE); + } + + pthread_mutex_lock (&sam_internal_data.lock); + + if (sam_internal_data.am_i_child) { + /* + * We are child so we must send data to parent + */ + command = SAM_COMMAND_WARN_SIGNAL_SET; + if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) != sizeof (command)) { + err = CS_ERR_LIBRARY; + + goto error_unlock; + } + + if (sam_safe_write (sam_internal_data.child_fd_out, &warn_signal, sizeof (warn_signal)) != + sizeof (warn_signal)) { + err = CS_ERR_LIBRARY; + + goto error_unlock; + } + + /* + * And wait for reply + */ + if ((err = sam_read_reply (sam_internal_data.child_fd_in)) != CS_OK) { + goto error_unlock; + } + } + + /* + * We are parent or we received OK reply from parent -> do required action + */ + sam_internal_data.warn_signal = warn_signal; + + pthread_mutex_unlock (&sam_internal_data.lock); + + return (CS_OK); + +error_unlock: + pthread_mutex_unlock (&sam_internal_data.lock); + + return (err); +} + +static cs_error_t sam_parent_reply_send ( + cs_error_t err, + int parent_fd_in, + int parent_fd_out) +{ + char reply; + + if (err == CS_OK) { + reply = SAM_REPLY_OK; + + if (sam_safe_write (parent_fd_out, &reply, sizeof (reply)) != sizeof (reply)) { + err = CS_ERR_LIBRARY; + goto error_reply; + } + + return (CS_OK); + } + +error_reply: + reply = SAM_REPLY_ERROR; + if (sam_safe_write (parent_fd_out, &reply, sizeof (reply)) != sizeof (reply)) { + return (CS_ERR_LIBRARY); + } + if (sam_safe_write (parent_fd_out, &err, sizeof (err)) != sizeof (err)) { + return (CS_ERR_LIBRARY); + } + + return (err); +} + + +static cs_error_t sam_parent_warn_signal_set ( + int parent_fd_in, + int parent_fd_out) +{ + int warn_signal; + cs_error_t err; + + err = CS_OK; + + if (sam_safe_read (parent_fd_in, &warn_signal, sizeof (warn_signal)) != sizeof (warn_signal)) { + err = CS_ERR_LIBRARY; + goto error_reply; + } + + err = sam_warn_signal_set (warn_signal); + if (err != CS_OK) { + goto error_reply; + } + + + return (sam_parent_reply_send (CS_OK, parent_fd_in, parent_fd_out)); + +error_reply: + return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out)); +} + +static cs_error_t sam_parent_wait_for_quorum ( + int parent_fd_in, + int parent_fd_out) +{ + cs_error_t err; + struct pollfd pfds[2]; + int poll_err; + + if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CMAP) { + if ((err = sam_cmap_update_key (SAM_CMAP_KEY_STATE, SAM_CMAP_S_Q_WAIT)) != CS_OK) { + goto error_reply; + } + } + + /* + * Update current quorum + */ + if ((err = quorum_dispatch (sam_internal_data.quorum_handle, CS_DISPATCH_ALL)) != CS_OK) { + goto error_reply; + } + + /* + * Wait for quorum + */ + while (!sam_internal_data.quorate) { + pfds[0].fd = parent_fd_in; + pfds[0].events = 0; + pfds[0].revents = 0; + + pfds[1].fd = sam_internal_data.quorum_fd; + pfds[1].events = POLLIN; + pfds[1].revents = 0; + + poll_err = poll (pfds, 2, -1); + + if (poll_err == -1) { + /* + * Error in poll + * If it is EINTR, continue, otherwise QUIT + */ + if (errno != EINTR) { + err = CS_ERR_LIBRARY; + goto error_reply; + } + } + + if (pfds[0].revents != 0) { + if (pfds[0].revents == POLLERR || pfds[0].revents == POLLHUP ||pfds[0].revents == POLLNVAL) { + /* + * Child has exited + */ + return (CS_OK); + } + } + + if (pfds[1].revents != 0) { + if ((err = quorum_dispatch (sam_internal_data.quorum_handle, CS_DISPATCH_ONE)) != CS_OK) { + goto error_reply; + } + } + } + + if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CMAP) { + if ((err = sam_cmap_update_key (SAM_CMAP_KEY_STATE, SAM_CMAP_S_STARTED)) != CS_OK) { + goto error_reply; + } + } + + return (sam_parent_reply_send (CS_OK, parent_fd_in, parent_fd_out)); + +error_reply: + if (sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_CMAP) { + sam_cmap_update_key (SAM_CMAP_KEY_STATE, SAM_CMAP_S_REGISTERED); + } + + return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out)); +} + +static cs_error_t sam_parent_cmap_state_set ( + int parent_fd_in, + int parent_fd_out, + int state) +{ + cs_error_t err; + const char *state_s; + + if (state == 1) { + state_s = SAM_CMAP_S_STARTED; + } else { + state_s = SAM_CMAP_S_REGISTERED; + } + + if ((err = sam_cmap_update_key (SAM_CMAP_KEY_STATE, state_s)) != CS_OK) { + goto error_reply; + } + + return (sam_parent_reply_send (CS_OK, parent_fd_in, parent_fd_out)); + +error_reply: + return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out)); +} + +static cs_error_t sam_parent_kill_child ( + int *action, + pid_t child_pid) +{ + /* + * Kill child process + */ + if (!sam_internal_data.term_send) { + /* + * We didn't send warn_signal yet. + */ + kill (child_pid, sam_internal_data.warn_signal); + + sam_internal_data.term_send = 1; + } else { + /* + * We sent child warning. Now, we will not be so nice + */ + kill (child_pid, SIGKILL); + *action = SAM_PARENT_ACTION_RECOVERY; + } + + return (CS_OK); +} + +static cs_error_t sam_parent_mark_child_failed ( + int *action, + pid_t child_pid) +{ + sam_recovery_policy_t recpol; + + recpol = sam_internal_data.recovery_policy; + + sam_internal_data.term_send = 1; + sam_internal_data.recovery_policy = SAM_RECOVERY_POLICY_QUIT | + (SAM_RP_MASK_C (recpol) ? SAM_RECOVERY_POLICY_CMAP : 0) | + (SAM_RP_MASK_Q (recpol) ? SAM_RECOVERY_POLICY_QUORUM : 0); + + return (sam_parent_kill_child (action, child_pid)); +} + +static cs_error_t sam_parent_data_store ( + int parent_fd_in, + int parent_fd_out) +{ + char *user_data; + ssize_t size; + cs_error_t err; + + err = CS_OK; + user_data = NULL; + + if (sam_safe_read (parent_fd_in, &size, sizeof (size)) != sizeof (size)) { + err = CS_ERR_LIBRARY; + goto error_reply; + } + + if (size > 0) { + user_data = malloc (size); + if (user_data == NULL) { + err = CS_ERR_NO_MEMORY; + goto error_reply; + } + + if (sam_safe_read (parent_fd_in, user_data, size) != size) { + err = CS_ERR_LIBRARY; + goto free_error_reply; + } + } + + err = sam_data_store (user_data, size); + if (err != CS_OK) { + goto free_error_reply; + } + + free (user_data); + + return (sam_parent_reply_send (CS_OK, parent_fd_in, parent_fd_out)); + +free_error_reply: + free (user_data); +error_reply: + return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out)); +} + +static enum sam_parent_action_t sam_parent_handler ( + int parent_fd_in, + int parent_fd_out, + pid_t child_pid) +{ + int poll_error; + int action; + int status; + ssize_t bytes_read; + char command; + int time_interval; + struct pollfd pfds[2]; + nfds_t nfds; + cs_error_t err; + sam_recovery_policy_t recpol; + + status = 0; + + action = SAM_PARENT_ACTION_CONTINUE; + recpol = sam_internal_data.recovery_policy; + + while (action == SAM_PARENT_ACTION_CONTINUE) { + pfds[0].fd = parent_fd_in; + pfds[0].events = POLLIN; + pfds[0].revents = 0; + nfds = 1; + + if (status == 1 && sam_internal_data.time_interval != 0) { + time_interval = sam_internal_data.time_interval; + } else { + time_interval = -1; + } + + if (recpol & SAM_RECOVERY_POLICY_QUORUM) { + pfds[nfds].fd = sam_internal_data.quorum_fd; + pfds[nfds].events = POLLIN; + pfds[nfds].revents = 0; + nfds++; + } + + poll_error = poll (pfds, nfds, time_interval); + + if (poll_error == -1) { + /* + * Error in poll + * If it is EINTR, continue, otherwise QUIT + */ + if (errno != EINTR) { + action = SAM_PARENT_ACTION_ERROR; + } + } + + if (poll_error == 0) { + /* + * Time limit expires + */ + if (status == 0) { + action = SAM_PARENT_ACTION_QUIT; + } else { + sam_parent_kill_child (&action, child_pid); + } + } + + if (poll_error > 0) { + if (pfds[0].revents != 0) { + /* + * We have EOF or command in pipe + */ + bytes_read = sam_safe_read (parent_fd_in, &command, 1); + + if (bytes_read == 0) { + /* + * Handle EOF -> Take recovery action or quit if sam_start wasn't called + */ + if (status == 0) + action = SAM_PARENT_ACTION_QUIT; + else + action = SAM_PARENT_ACTION_RECOVERY; + + continue; + } + + if (bytes_read == -1) { + action = SAM_PARENT_ACTION_ERROR; + goto action_exit; + } + + if (recpol & SAM_RECOVERY_POLICY_CMAP) { + sam_cmap_update_key (SAM_CMAP_KEY_LAST_HC, NULL); + } + + /* + * We have read command + */ + switch (command) { + case SAM_COMMAND_START: + if (status == 0) { + /* + * Not started yet + */ + if (recpol & SAM_RECOVERY_POLICY_QUORUM) { + if (sam_parent_wait_for_quorum (parent_fd_in, + parent_fd_out) != CS_OK) { + continue; + } + } + + if (recpol & SAM_RECOVERY_POLICY_CMAP) { + if (sam_parent_cmap_state_set (parent_fd_in, + parent_fd_out, 1) != CS_OK) { + continue; + } + } + + status = 1; + } + break; + case SAM_COMMAND_STOP: + if (status == 1) { + /* + * Started + */ + if (recpol & SAM_RECOVERY_POLICY_CMAP) { + if (sam_parent_cmap_state_set (parent_fd_in, + parent_fd_out, 0) != CS_OK) { + continue; + } + } + + status = 0; + } + break; + case SAM_COMMAND_DATA_STORE: + sam_parent_data_store (parent_fd_in, parent_fd_out); + break; + case SAM_COMMAND_WARN_SIGNAL_SET: + sam_parent_warn_signal_set (parent_fd_in, parent_fd_out); + break; + case SAM_COMMAND_MARK_FAILED: + status = 1; + sam_parent_mark_child_failed (&action, child_pid); + break; + } + } /* if (pfds[0].revents != 0) */ + + if ((sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_QUORUM) && + pfds[1].revents != 0) { + /* + * Handle quorum change + */ + err = quorum_dispatch (sam_internal_data.quorum_handle, CS_DISPATCH_ALL); + + if (status == 1 && + (!sam_internal_data.quorate || (err != CS_ERR_TRY_AGAIN && err != CS_OK))) { + sam_parent_kill_child (&action, child_pid); + } + } + } /* select_error > 0 */ + } /* action == SAM_PARENT_ACTION_CONTINUE */ + +action_exit: + return action; +} + +cs_error_t sam_register ( + unsigned int *instance_id) +{ + cs_error_t error; + pid_t pid; + int pipe_error; + int pipe_fd_out[2], pipe_fd_in[2]; + enum sam_parent_action_t action, old_action; + int child_status; + sam_recovery_policy_t recpol; + + if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_INITIALIZED) { + return (CS_ERR_BAD_HANDLE); + } + + recpol = sam_internal_data.recovery_policy; + + if (recpol & SAM_RECOVERY_POLICY_CMAP) { + /* + * Register to cmap + */ + if ((error = sam_cmap_register ()) != CS_OK) { + goto error_exit; + } + } + + error = CS_OK; + + while (1) { + if ((pipe_error = pipe (pipe_fd_out)) != 0) { + error = CS_ERR_LIBRARY; + goto error_exit; + } + + if ((pipe_error = pipe (pipe_fd_in)) != 0) { + close (pipe_fd_out[0]); + close (pipe_fd_out[1]); + + error = CS_ERR_LIBRARY; + goto error_exit; + } + + if (recpol & SAM_RECOVERY_POLICY_CMAP) { + if ((error = sam_cmap_update_key (SAM_CMAP_KEY_STATE, SAM_CMAP_S_REGISTERED)) != CS_OK) { + goto error_exit; + } + } + + sam_internal_data.instance_id++; + + sam_internal_data.term_send = 0; + + pid = fork (); + + if (pid == -1) { + /* + * Fork error + */ + sam_internal_data.instance_id--; + + error = CS_ERR_LIBRARY; + goto error_exit; + } + + if (pid == 0) { + /* + * Child process + */ + close (pipe_fd_out[0]); + close (pipe_fd_in[1]); + + sam_internal_data.child_fd_out = pipe_fd_out[1]; + sam_internal_data.child_fd_in = pipe_fd_in[0]; + + if (instance_id) + *instance_id = sam_internal_data.instance_id; + + sam_internal_data.am_i_child = 1; + sam_internal_data.internal_status = SAM_INTERNAL_STATUS_REGISTERED; + + pthread_mutex_init (&sam_internal_data.lock, NULL); + + goto error_exit; + } else { + /* + * Parent process + */ + close (pipe_fd_out[1]); + close (pipe_fd_in[0]); + + action = sam_parent_handler (pipe_fd_out[0], pipe_fd_in[1], pid); + + close (pipe_fd_out[0]); + close (pipe_fd_in[1]); + + if (action == SAM_PARENT_ACTION_ERROR) { + error = CS_ERR_LIBRARY; + goto error_exit; + } + + /* + * We really don't like zombies + */ + while (waitpid (pid, &child_status, 0) == -1 && errno == EINTR) + ; + + old_action = action; + + if (action == SAM_PARENT_ACTION_RECOVERY) { + if (SAM_RP_MASK (sam_internal_data.recovery_policy) == SAM_RECOVERY_POLICY_QUIT) + action = SAM_PARENT_ACTION_QUIT; + } + + + if (action == SAM_PARENT_ACTION_QUIT) { + if (recpol & SAM_RECOVERY_POLICY_QUORUM) { + quorum_finalize (sam_internal_data.quorum_handle); + } + + if (recpol & SAM_RECOVERY_POLICY_CMAP) { + if (old_action == SAM_PARENT_ACTION_RECOVERY) { + /* + * Mark as failed + */ + sam_cmap_update_key (SAM_CMAP_KEY_STATE, SAM_CMAP_S_FAILED); + } else { + sam_cmap_destroy_pid_path (); + } + } + + exit (WEXITSTATUS (child_status)); + } + + + } + } + +error_exit: + return (error); +} + +static void *hc_callback_thread (void *unused_param) +{ + int poll_error; + int status; + ssize_t bytes_readed; + char command; + int time_interval, tmp_time_interval; + int counter; + struct pollfd pfds; + + status = 0; + counter = 0; + + time_interval = sam_internal_data.time_interval >> 2; + + while (1) { + pfds.fd = sam_internal_data.cb_rpipe_fd; + pfds.events = POLLIN; + pfds.revents = 0; + + if (status == 1) { + tmp_time_interval = time_interval; + } else { + tmp_time_interval = -1; + } + + poll_error = poll (&pfds, 1, tmp_time_interval); + + if (poll_error == 0) { + if (sam_hc_send () == CS_OK) { + counter++; + } + + if (counter >= 4) { + if (sam_internal_data.hc_callback () != 0) { + status = 3; + } + + counter = 0; + } + } + + if (poll_error > 0) { + bytes_readed = sam_safe_read (sam_internal_data.cb_rpipe_fd, &command, 1); + + if (bytes_readed > 0) { + if (status == 0 && command == SAM_COMMAND_START) + status = 1; + + if (status == 1 && command == SAM_COMMAND_STOP) + status = 0; + + } + } + } + + /* + * This makes compiler happy, it's same as return (NULL); + */ + return (unused_param); +} + +cs_error_t sam_hc_callback_register (sam_hc_callback_t cb) +{ + cs_error_t error = CS_OK; + pthread_attr_t thread_attr; + int pipe_error; + int pipe_fd[2]; + + if (sam_internal_data.internal_status != SAM_INTERNAL_STATUS_REGISTERED) { + return (CS_ERR_BAD_HANDLE); + } + + if (sam_internal_data.time_interval == 0) { + return (CS_ERR_INVALID_PARAM); + } + + if (sam_internal_data.cb_registered) { + sam_internal_data.hc_callback = cb; + + return (CS_OK); + } + + /* + * We know, this is first registration + */ + + if (cb == NULL) { + return (CS_ERR_INVALID_PARAM); + } + + pipe_error = pipe (pipe_fd); + + if (pipe_error != 0) { + /* + * Pipe creation error + */ + error = CS_ERR_LIBRARY; + goto error_exit; + } + + sam_internal_data.cb_rpipe_fd = pipe_fd[0]; + sam_internal_data.cb_wpipe_fd = pipe_fd[1]; + + /* + * Create thread attributes + */ + error = pthread_attr_init (&thread_attr); + if (error != 0) { + error = CS_ERR_LIBRARY; + goto error_close_fd_exit; + } + + + pthread_attr_setdetachstate (&thread_attr, PTHREAD_CREATE_DETACHED); + pthread_attr_setstacksize (&thread_attr, 32768); + + /* + * Create thread + */ + error = pthread_create (&sam_internal_data.cb_thread, &thread_attr, hc_callback_thread, NULL); + + if (error != 0) { + error = CS_ERR_LIBRARY; + goto error_attr_destroy_exit; + } + + /* + * Cleanup + */ + pthread_attr_destroy(&thread_attr); + + sam_internal_data.cb_registered = 1; + sam_internal_data.hc_callback = cb; + + return (CS_OK); + +error_attr_destroy_exit: + pthread_attr_destroy(&thread_attr); +error_close_fd_exit: + sam_internal_data.cb_rpipe_fd = sam_internal_data.cb_wpipe_fd = 0; + close (pipe_fd[0]); + close (pipe_fd[1]); +error_exit: + return (error); +} |