summaryrefslogtreecommitdiffstats
path: root/vqsim/vqsim_vq_engine.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-03 17:01:24 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-03 17:01:24 +0000
commit6dd3dfb79125cd02d02efbce435a6c82e5af92ef (patch)
tree45084fc83278586f6bbafcb935f92d53f71a6b03 /vqsim/vqsim_vq_engine.c
parentInitial commit. (diff)
downloadcorosync-6dd3dfb79125cd02d02efbce435a6c82e5af92ef.tar.xz
corosync-6dd3dfb79125cd02d02efbce435a6c82e5af92ef.zip
Adding upstream version 3.1.8.upstream/3.1.8upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vqsim/vqsim_vq_engine.c')
-rw-r--r--vqsim/vqsim_vq_engine.c479
1 files changed, 479 insertions, 0 deletions
diff --git a/vqsim/vqsim_vq_engine.c b/vqsim/vqsim_vq_engine.c
new file mode 100644
index 0000000..e0bb0bd
--- /dev/null
+++ b/vqsim/vqsim_vq_engine.c
@@ -0,0 +1,479 @@
+
+/* This is the bit of VQSIM that runs in the forked process.
+ It represents a single votequorum instance or, if you like,
+ a 'node' in the cluster.
+*/
+
+#include <sys/types.h>
+#include <qb/qblog.h>
+#include <qb/qbloop.h>
+#include <qb/qbipc_common.h>
+#include <netinet/in.h>
+#include <sys/poll.h>
+#include <sys/socket.h>
+#include <stdio.h>
+
+#include "../exec/votequorum.h"
+#include "../exec/service.h"
+#include "../include/corosync/corotypes.h"
+#include "../include/corosync/votequorum.h"
+#include "../include/corosync/ipc_votequorum.h"
+#include <corosync/logsys.h>
+#include <corosync/coroapi.h>
+
+#include "icmap.h"
+#include "vqsim.h"
+
+#define QDEVICE_NAME "VQsim_qdevice"
+
+/* Static variables here are per-instance because we are forked */
+static struct corosync_service_engine *engine;
+static int parent_socket; /* Our end of the socket */
+static char buffer[8192];
+static int our_nodeid;
+static char *private_data;
+static qb_loop_t *poll_loop;
+static qb_loop_timer_handle sync_timer;
+static qb_loop_timer_handle qdevice_timer;
+static int we_are_quorate;
+static void *fake_conn = (void*)1;
+static cs_error_t last_lib_error;
+static struct memb_ring_id current_ring_id;
+static int qdevice_registered;
+static unsigned int qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
+
+/* 'Keep the compiler happy' time */
+char *get_run_dir(void);
+
+int api_timer_add_duration (
+ unsigned long long nanosec_duration,
+ void *data,
+ void (*timer_fn) (void *data),
+ corosync_timer_handle_t *handle);
+
+static void api_error_memory_failure(void) __attribute__((noreturn));
+static void api_error_memory_failure()
+{
+ fprintf(stderr, "Out of memory error\n");
+ exit(-1);
+}
+static void api_timer_delete(corosync_timer_handle_t th)
+{
+ qb_loop_timer_del(poll_loop, th);
+}
+
+int api_timer_add_duration (
+ unsigned long long nanosec_duration,
+ void *data,
+ void (*timer_fn) (void *data),
+ corosync_timer_handle_t *handle)
+{
+ return qb_loop_timer_add(poll_loop,
+ QB_LOOP_MED,
+ nanosec_duration,
+ data,
+ timer_fn,
+ handle);
+}
+
+static unsigned int api_totem_nodeid_get(void)
+{
+ return our_nodeid;
+}
+
+static int api_totem_mcast(const struct iovec *iov, unsigned int iovlen, unsigned int type)
+{
+ struct vqsim_msg_header header;
+ struct iovec iovec[iovlen+1];
+ int total = sizeof(header);
+ int res;
+ int i;
+
+ header.type = VQMSG_EXEC;
+ header.from_nodeid = our_nodeid;
+ header.param = 0;
+
+ iovec[0].iov_base = &header;
+ iovec[0].iov_len = sizeof(header);
+ for (i=0; i<iovlen; i++) {
+ iovec[i+1].iov_base = iov[i].iov_base;
+ iovec[i+1].iov_len = iov[i].iov_len;
+ total += iov[i].iov_len;
+ }
+
+ res = writev(parent_socket, iovec, iovlen+1);
+ if (res != total) {
+ fprintf(stderr, "writev wrote only %d of %d bytes\n", res, total);
+ }
+ return 0;
+}
+static void *api_ipc_private_data_get(void *conn)
+{
+ return private_data;
+}
+static int api_ipc_response_send(void *conn, const void *msg, size_t len)
+{
+ struct qb_ipc_response_header *qb_header = (void*)msg;
+
+ /* Save the error so we can return it */
+ last_lib_error = qb_header->error;
+ return 0;
+}
+
+static struct corosync_api_v1 corosync_api = {
+ .error_memory_failure = api_error_memory_failure,
+ .timer_delete = api_timer_delete,
+ .timer_add_duration = api_timer_add_duration,
+ .totem_nodeid_get = api_totem_nodeid_get,
+ .totem_mcast = api_totem_mcast,
+ .ipc_private_data_get = api_ipc_private_data_get,
+ .ipc_response_send = api_ipc_response_send,
+};
+
+/* -------------------- Above is all for providing the corosync_api support routines --------------------------------------------*/
+/* They need to be in the same file as the engine as they use the local 'poll_loop' variable which is per-process */
+
+static void start_qdevice_poll(int longwait);
+static void start_sync_timer(void);
+
+/* Callback from Votequorum to tell us about the quorum state */
+static void quorum_fn(const unsigned int *view_list,
+ size_t view_list_entries,
+ int quorate, struct memb_ring_id *ring_id)
+{
+ char msgbuf[8192];
+ int len;
+ struct vqsim_quorum_msg *quorum_msg = (void*) msgbuf;
+
+ we_are_quorate = quorate;
+
+ /* Send back to parent */
+ quorum_msg->header.type = VQMSG_QUORUM;
+ quorum_msg->header.from_nodeid = our_nodeid;
+ quorum_msg->header.param = 0;
+ quorum_msg->quorate = quorate;
+ memcpy(&quorum_msg->ring_id, ring_id, sizeof(*ring_id));
+ quorum_msg->view_list_entries = view_list_entries;
+
+ memcpy(quorum_msg->view_list, view_list, sizeof(unsigned int)*view_list_entries);
+
+ if ( (len=write(parent_socket, msgbuf, sizeof(*quorum_msg) + sizeof(unsigned int)*view_list_entries)) <= 0) {
+ perror("write (view list to parent) failed");
+ }
+ memcpy(&current_ring_id, ring_id, sizeof(*ring_id));
+}
+
+char *corosync_service_link_and_init(struct corosync_api_v1 *api,
+ struct default_service *service_engine)
+{
+ /* dummy */
+ return NULL;
+}
+
+/* For votequorum */
+char *get_run_dir()
+{
+ static char cwd_buffer[PATH_MAX];
+
+ return getcwd(cwd_buffer, PATH_MAX);
+}
+
+/* This is different to the one in totemconfig.c in that we already
+ * know the 'local' node ID, so we can just search for that.
+ * It needs to be here rather than at main config read time as it's
+ * (obviously) going to be different for each instance.
+ */
+static void set_local_node_pos(struct corosync_api_v1 *api)
+{
+ icmap_iter_t iter;
+ uint32_t node_pos;
+ char name_str[ICMAP_KEYNAME_MAXLEN];
+ uint32_t nodeid;
+ const char *iter_key;
+ int res;
+ int found = 0;
+
+ iter = icmap_iter_init("nodelist.node.");
+ while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) {
+ res = sscanf(iter_key, "nodelist.node.%u.%s", &node_pos, name_str);
+ if (res != 2) {
+ continue;
+ }
+ if (strcmp(name_str, "nodeid")) {
+ continue;
+ }
+
+ res = icmap_get_uint32(iter_key, &nodeid);
+ if (res == CS_OK) {
+ if (nodeid == our_nodeid) {
+ found = 1;
+ res = icmap_set_uint32("nodelist.local_node_pos", node_pos);
+ assert(res == CS_OK);
+ }
+ }
+ }
+ if (!found) {
+ /* This probably indicates a dynamically-added node
+ * set the pos to zero and use the votes of the
+ * first node in corosync.conf
+ */
+ res = icmap_set_uint32("nodelist.local_node_pos", 0);
+ assert(res == CS_OK);
+ }
+}
+
+static int load_quorum_instance(struct corosync_api_v1 *api)
+{
+ const char *error_string;
+ int res;
+
+ error_string = votequorum_init(api, quorum_fn);
+ if (error_string) {
+ fprintf(stderr, "Votequorum init failed: %s\n", error_string);
+ return -1;
+ }
+
+ engine = votequorum_get_service_engine_ver0();
+ error_string = engine->exec_init_fn(api);
+ if (error_string) {
+ fprintf(stderr, "votequorum exec init failed: %s\n", error_string);
+ return -1;
+ }
+
+ private_data = malloc(engine->private_data_size);
+ if (!private_data) {
+ perror("Malloc in child failed");
+ return -1;
+ }
+
+ res = engine->lib_init_fn(fake_conn);
+
+ return res;
+}
+
+static void sync_dispatch_fn(void *data)
+{
+ if (engine->sync_process()) {
+ start_sync_timer();
+ }
+ else {
+ engine->sync_activate();
+ }
+}
+
+static void start_sync_timer()
+{
+ qb_loop_timer_add(poll_loop,
+ QB_LOOP_MED,
+ 10000000,
+ NULL,
+ sync_dispatch_fn,
+ &sync_timer);
+}
+
+static void send_sync(char *buf, int len)
+{
+ struct vqsim_sync_msg *msg = (void*)buf;
+
+ /* Votequorum doesn't use the transitional node list :-) */
+ engine->sync_init(NULL, 0,
+ msg->view_list, msg->view_list_entries,
+ &msg->ring_id);
+
+ start_sync_timer();
+}
+
+static void send_exec_msg(char *buf, int len)
+{
+ struct vqsim_exec_msg *execmsg = (void*)buf;
+ struct qb_ipc_request_header *qb_header = (void*)execmsg->execmsg;
+
+ engine->exec_engine[qb_header->id & 0xFFFF].exec_handler_fn(execmsg->execmsg, execmsg->header.from_nodeid);
+}
+
+static int send_lib_msg(int type, void *msg)
+{
+ /* Clear this as not all lib functions return a response immediately */
+ last_lib_error = CS_OK;
+
+ engine->lib_engine[type].lib_handler_fn(fake_conn, msg);
+
+ return last_lib_error;
+}
+
+static int poll_qdevice(int onoff)
+{
+ struct req_lib_votequorum_qdevice_poll pollmsg;
+ int res;
+
+ pollmsg.cast_vote = onoff;
+ pollmsg.ring_id.nodeid = current_ring_id.nodeid;
+ pollmsg.ring_id.seq = current_ring_id.seq;
+ strcpy(pollmsg.name, QDEVICE_NAME);
+
+ res = send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_POLL, &pollmsg);
+ if (res != CS_OK) {
+ fprintf(stderr, CS_PRI_NODE_ID ": qdevice poll failed: %d\n", our_nodeid, res);
+ }
+ return res;
+}
+
+static void qdevice_dispatch_fn(void *data)
+{
+ if (poll_qdevice(1) == CS_OK) {
+ start_qdevice_poll(0);
+ }
+}
+
+static void start_qdevice_poll(int longwait)
+{
+ unsigned long long timeout;
+
+ timeout = (unsigned long long)qdevice_timeout*500000; /* Half the corosync timeout */
+ if (longwait) {
+ timeout *= 2;
+ }
+
+ qb_loop_timer_add(poll_loop,
+ QB_LOOP_MED,
+ timeout,
+ NULL,
+ qdevice_dispatch_fn,
+ &qdevice_timer);
+}
+
+static void stop_qdevice_poll(void)
+{
+ qb_loop_timer_del(poll_loop, qdevice_timer);
+ qdevice_timer = 0;
+}
+
+static void do_qdevice(int onoff)
+{
+ int res;
+
+ if (onoff) {
+ if (!qdevice_registered) {
+ struct req_lib_votequorum_qdevice_register regmsg;
+
+ strcpy(regmsg.name, QDEVICE_NAME);
+ if ( (res=send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_REGISTER, &regmsg)) == CS_OK) {
+ qdevice_registered = 1;
+ start_qdevice_poll(1);
+ }
+ else {
+ fprintf(stderr, CS_PRI_NODE_ID ": qdevice registration failed: %d\n", our_nodeid, res);
+ }
+ }
+ else {
+ if (!qdevice_timer) {
+ start_qdevice_poll(0);
+ }
+ }
+ }
+ else {
+ poll_qdevice(0);
+ stop_qdevice_poll();
+ }
+}
+
+
+/* From controller */
+static int parent_pipe_read_fn(int32_t fd, int32_t revents, void *data)
+{
+ struct vqsim_msg_header *header = (void*)buffer;
+ int len;
+
+ len = read(fd, buffer, sizeof(buffer));
+ if (len > 0) {
+ /* Check header and route */
+ switch (header->type) {
+ case VQMSG_QUIT:
+ exit(0);
+ break;
+ case VQMSG_EXEC: /* For votequorum exec messages */
+ send_exec_msg(buffer, len);
+ break;
+ case VQMSG_SYNC:
+ send_sync(buffer, len);
+ break;
+ case VQMSG_QDEVICE:
+ do_qdevice(header->param);
+ break;
+ case VQMSG_QUORUMQUIT:
+ if (!we_are_quorate) {
+ exit(1);
+ }
+ break;
+ case VQMSG_QUORUM:
+ /* not used here */
+ break;
+ }
+ }
+ return 0;
+}
+
+static void initial_sync(int nodeid)
+{
+ unsigned int trans_list[1] = {nodeid};
+ unsigned int member_list[1] = {nodeid};
+ struct memb_ring_id ring_id;
+
+ ring_id.nodeid = our_nodeid;
+ ring_id.seq = 1;
+
+ /* cluster with just us in it */
+ engine->sync_init(trans_list, 1,
+ member_list, 1,
+ &ring_id);
+ start_sync_timer();
+}
+
+/* Return pipe FDs & child PID if sucessful */
+int fork_new_instance(int nodeid, int *vq_sock, pid_t *childpid)
+{
+ int pipes[2];
+ pid_t pid;
+
+ if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0, pipes)) {
+ return -1;
+ }
+ parent_socket = pipes[0];
+
+ switch ( (pid=fork()) ) {
+ case -1:
+ perror("fork failed");
+ return -1;
+ case 0:
+ /* child process - continue below */
+ break;
+ default:
+ /* parent process */
+ *vq_sock = pipes[1];
+ *childpid = pid;
+ return 0;
+ }
+
+ our_nodeid = nodeid;
+ poll_loop = qb_loop_create();
+
+ if (icmap_get_uint32("quorum.device.timeout", &qdevice_timeout) != CS_OK) {
+ qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
+ }
+
+ set_local_node_pos(&corosync_api);
+ load_quorum_instance(&corosync_api);
+
+ qb_loop_poll_add(poll_loop,
+ QB_LOOP_MED,
+ parent_socket,
+ POLLIN,
+ NULL,
+ parent_pipe_read_fn);
+
+ /* Start it up! */
+ initial_sync(nodeid);
+ qb_loop_run(poll_loop);
+
+ return 0;
+}