summaryrefslogtreecommitdiffstats
path: root/lib/clplumbing/ipctest.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 06:40:13 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 06:40:13 +0000
commite9be59e1502a41bab9891d96d753102a7dafef0b (patch)
treec3b2da87c414881f4b53d0964f407c83492d813e /lib/clplumbing/ipctest.c
parentInitial commit. (diff)
downloadcluster-glue-e9be59e1502a41bab9891d96d753102a7dafef0b.tar.xz
cluster-glue-e9be59e1502a41bab9891d96d753102a7dafef0b.zip
Adding upstream version 1.0.12.upstream/1.0.12upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--lib/clplumbing/ipctest.c1377
1 files changed, 1377 insertions, 0 deletions
diff --git a/lib/clplumbing/ipctest.c b/lib/clplumbing/ipctest.c
new file mode 100644
index 0000000..333d3a0
--- /dev/null
+++ b/lib/clplumbing/ipctest.c
@@ -0,0 +1,1377 @@
+/*
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#undef _GNU_SOURCE /* in case it was defined on the command line */
+#define _GNU_SOURCE
+#include <lha_internal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+/* libgen.h: for 'basename()' on Solaris */
+#include <libgen.h>
+#include <glib.h>
+#include <clplumbing/cl_log.h>
+#include <clplumbing/cl_poll.h>
+#include <clplumbing/GSource.h>
+#include <clplumbing/ipc.h>
+
+#define MAXERRORS 1000
+#define MAXERRORS_RECV 10
+
+typedef int (*TestFunc_t)(IPC_Channel*chan, int count);
+
+static int channelpair(TestFunc_t client, TestFunc_t server, int count);
+#if 0
+static void clientserverpair(IPC_Channel* channels[2]);
+#endif
+
+static int echoserver(IPC_Channel*, int repcount);
+static int echoclient(IPC_Channel*, int repcount);
+static int asyn_echoserver(IPC_Channel*, int repcount);
+static int asyn_echoclient(IPC_Channel*, int repcount);
+static int mainloop_server(IPC_Channel* chan, int repcount);
+static int mainloop_client(IPC_Channel* chan, int repcount);
+
+static int checksock(IPC_Channel* channel);
+static void checkifblocked(IPC_Channel* channel);
+
+static int (*PollFunc)(struct pollfd * fds, unsigned int, int)
+= (int (*)(struct pollfd * fds, unsigned int, int)) poll;
+static gboolean checkmsg(IPC_Message* rmsg, const char * who, int rcount);
+
+static const char *procname;
+
+static const int iter_def = 10000; /* number of iterations */
+static int verbosity; /* verbosity level */
+
+/*
+ * The ipc interface can be invoked as either:
+ * 1. pair (pipe/socketpair);
+ * 2. separate connect/accept (like server with multiple independent clients).
+ *
+ * If number of clients is given as 0, the "pair" mechanism is used,
+ * otherwise the client/server mechanism.
+ */
+/* *** CLIENTS_MAX currently 1 while coding *** */
+#define CLIENTS_MAX 1 /* max. number of independent clients */
+static int clients_def; /* number of independent clients */
+
+static int
+channelpair(TestFunc_t clientfunc, TestFunc_t serverfunc, int count)
+{
+ IPC_Channel* channels[2];
+ int rc = 0;
+ int waitstat = 0;
+
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d: main process",
+ procname, (int)getpid(), __LINE__);
+ }
+ switch (fork()) {
+ case -1:
+ cl_perror("can't fork");
+ exit(1);
+ break;
+ default: /* Parent */
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d: main waiting...",
+ procname, (int)getpid(), __LINE__);
+ }
+ while (wait(&waitstat) > 0) {
+ if (WIFEXITED(waitstat)) {
+ rc += WEXITSTATUS(waitstat);
+ }else{
+ rc += 1;
+ }
+ }
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d: main ended rc: %d",
+ procname, (int)getpid(), __LINE__, rc);
+ }
+ if (rc > 127) {
+ rc = 127;
+ }
+ exit(rc);
+ break;
+ case 0: /* Child */
+ break;
+ }
+ /* Child continues here... */
+ if (ipc_channel_pair(channels) != IPC_OK) {
+ cl_perror("Can't create ipc channel pair");
+ exit(1);
+ }
+ checksock(channels[0]);
+ checksock(channels[1]);
+ switch (fork()) {
+ case -1:
+ cl_perror("can't fork");
+ exit(1);
+ break;
+
+ case 0: /* echo "client" Child */
+ channels[1]->ops->destroy(channels[1]);
+ channels[1] = NULL;
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d: client starting...",
+ procname, (int)getpid(), __LINE__);
+ }
+ rc = clientfunc(channels[0], count);
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d: client ended rc:%d",
+ procname, (int)getpid(), __LINE__, rc);
+ }
+ exit (rc > 127 ? 127 : rc);
+ break;
+
+ default:
+ break;
+ }
+ channels[0]->ops->destroy(channels[0]);
+ channels[0] = NULL;
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d: server starting...",
+ procname, (int)getpid(), __LINE__);
+ }
+ rc = serverfunc(channels[1], count);
+ wait(&waitstat);
+ if (WIFEXITED(waitstat)) {
+ rc += WEXITSTATUS(waitstat);
+ }else{
+ rc += 1;
+ }
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d: server ended rc:%d",
+ procname, (int)getpid(), __LINE__, rc);
+ }
+ return(rc);
+}
+
+/* server with many clients */
+static int
+clientserver(TestFunc_t clientfunc, TestFunc_t serverfunc, int count, int clients)
+{
+ IPC_Channel* channel;
+ int rc = 0;
+ int waitstat = 0;
+ struct IPC_WAIT_CONNECTION *wconn;
+ char path[] = IPC_PATH_ATTR;
+ char commpath[] = "/tmp/foobar"; /* *** CHECK/FIX: Is this OK? */
+ GHashTable * wattrs;
+ int i;
+ pid_t pid;
+
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d: main process",
+ procname, (int)getpid(), __LINE__);
+ }
+
+ switch (fork()) {
+ case -1:
+ cl_perror("can't fork");
+ exit(1);
+ break;
+ default: /* Parent */
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d: main waiting...",
+ procname, (int)getpid(), __LINE__);
+ }
+ while ((pid = wait(&waitstat)) > 0) {
+ if (WIFEXITED(waitstat)) {
+ rc += WEXITSTATUS(waitstat);
+ }else{
+ rc += 1;
+ }
+ }
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d: main ended rc: %d",
+ procname, (int)getpid(), __LINE__, rc);
+ }
+ if (rc > 127) {
+ rc = 127;
+ }
+ exit(rc);
+ break;
+ case 0: /* Child */
+ break;
+ }
+
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d:",
+ procname, (int)getpid(), __LINE__);
+ }
+
+ /* set up a server */
+ wattrs = g_hash_table_new(g_str_hash, g_str_equal);
+ if (! wattrs) {
+ cl_perror("g_hash_table_new() failed");
+ exit(1);
+ }
+ g_hash_table_insert(wattrs, path, commpath);
+
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d:",
+ procname, (int)getpid(), __LINE__);
+ }
+
+ wconn = ipc_wait_conn_constructor(IPC_ANYTYPE, wattrs);
+ if (! wconn) {
+ cl_perror("could not establish server");
+ exit(1);
+ }
+
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d:",
+ procname, (int)getpid(), __LINE__);
+ }
+
+ /* spawn the clients */
+ for (i = 1; i <= clients; i++) {
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d: fork client %d of %d",
+ procname, (int)getpid(), __LINE__, i, clients);
+ }
+ switch (fork()) {
+ case -1:
+ cl_perror("can't fork");
+ exit(1);
+ break;
+
+ case 0: /* echo "client" Child */
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d: client %d starting...",
+ procname, (int)getpid(), __LINE__, i);
+ }
+ channel = ipc_channel_constructor(IPC_ANYTYPE, wattrs);
+ if (channel == NULL) {
+ cl_perror("client: channel creation failed");
+ exit(1);
+ }
+
+ rc = channel->ops->initiate_connection(channel);
+ if (rc != IPC_OK) {
+ cl_perror("channel[1] failed to connect");
+ exit(1);
+ }
+ checksock(channel);
+ rc = clientfunc(channel, count);
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d: client %d ended rc:%d",
+ procname, (int)getpid(), __LINE__, rc, i);
+ }
+ exit (rc > 127 ? 127 : rc);
+ break;
+
+ default:
+ break;
+ }
+ }
+
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d: server starting...",
+ procname, (int)getpid(), __LINE__);
+ }
+ /* accept on server */
+ /* ***
+ * Two problems (or more) here:
+ * 1. What to do if no incoming call pending?
+ * At present, fudge by sleeping a little so client gets started.
+ * 2. How to handle multiple clients?
+ * Would need to be able to await both new connections and
+ * data on existing connections.
+ * At present, fudge CLIENTS_MAX as 1.
+ * ***
+ */
+ sleep(1); /* *** */
+ channel = wconn->ops->accept_connection(wconn, NULL);
+ if (channel == NULL) {
+ cl_perror("server: acceptance failed");
+ }
+
+ checksock(channel);
+
+ rc = serverfunc(channel, count);
+
+ /* server finished: tidy up */
+ wconn->ops->destroy(wconn);
+
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d: server ended rc:%d",
+ procname, (int)getpid(), __LINE__, rc);
+ }
+
+ /* reap the clients */
+ for (i = 1; i <= clients; i++) {
+ pid_t pid;
+
+ pid = wait(&waitstat);
+ if (verbosity >= 1) {
+ cl_log(LOG_DEBUG, "%s[%d]%d: client %d reaped:%d",
+ procname, (int)getpid(), __LINE__,
+ (int) pid, WIFEXITED(waitstat));
+ }
+ if (WIFEXITED(waitstat)) {
+ rc += WEXITSTATUS(waitstat);
+ }else{
+ rc += 1;
+ }
+ }
+
+ return(rc);
+}
+
+static void
+echomsgbody(void * body, int n, int niter, size_t * len)
+{
+ char *str = body;
+ int l;
+
+ l = snprintf(str, n-1, "String-%d", niter);
+ if (l < (n-1)) {
+ memset(&str[l], 'a', (n - (l+1)));
+ }
+ str[n-1] = '\0';
+ *len = n;
+}
+
+static void
+checkifblocked(IPC_Channel* chan)
+{
+ if (chan->ops->is_sending_blocked(chan)) {
+ cl_log(LOG_INFO, "Sending is blocked.");
+ chan->ops->resume_io(chan);
+ }
+}
+
+#ifdef CHEAT_CHECKS
+extern long SeqNums[32];
+#endif
+
+static int
+transport_tests(int iterations, int clients)
+{
+ int rc = 0;
+
+#ifdef CHEAT_CHECKS
+ memset(SeqNums, 0, sizeof(SeqNums));
+#endif
+ rc += (clients <= 0)
+ ? channelpair(echoclient, echoserver, iterations)
+ : clientserver(echoclient, echoserver, iterations, clients);
+
+#ifdef CHEAT_CHECKS
+ memset(SeqNums, 0, sizeof(SeqNums));
+#endif
+ rc += (clients <= 0)
+ ? channelpair(asyn_echoclient, asyn_echoserver, iterations)
+ : clientserver(asyn_echoclient, asyn_echoserver, iterations, clients);
+
+#ifdef CHEAT_CHECKS
+ memset(SeqNums, 0, sizeof(SeqNums));
+#endif
+ rc += (clients <= 0)
+ ? channelpair(mainloop_client, mainloop_server, iterations)
+ : clientserver(mainloop_client, mainloop_server, iterations, clients);
+
+ return rc;
+}
+
+static int data_size = 20;
+
+int
+main(int argc, char ** argv)
+{
+ int argflag, argerrs;
+ int iterations;
+ int clients;
+ int rc = 0;
+
+ /*
+ * Check and process arguments.
+ * -v: verbose
+ * -i: number of iterations
+ * -c: number of clients (invokes client/server mechanism)
+ * -s: data-size
+ */
+ procname = basename(argv[0]);
+
+ argerrs = 0;
+ iterations = iter_def;
+ clients = clients_def;
+ while ((argflag = getopt(argc, argv, "i:vuc:s:")) != EOF) {
+ switch (argflag) {
+ case 'i': /* iterations */
+ iterations = atoi(optarg);
+ break;
+ case 'v': /* verbosity */
+ verbosity++;
+ break;
+ case 'c': /* number of clients */
+ clients = atoi(optarg);
+ if (clients < 1 || clients > CLIENTS_MAX) {
+ fprintf(stderr, "number of clients out of range"
+ "(1 to %d)\n", CLIENTS_MAX);
+ argerrs++;
+ }
+ break;
+ case 's': /* data size */
+ data_size = atoi(optarg);
+ if (data_size < 0) {
+ fprintf(stderr, "data size must be >=0\n");
+ argerrs++;
+ }
+ if (data_size > MAXMSG) {
+ fprintf(stderr, "maximum data size is %d\n", MAXMSG);
+ argerrs++;
+ }
+ break;
+ default:
+ argerrs++;
+ break;
+ }
+ }
+ if (argerrs) {
+ fprintf(stderr,
+ "Usage: %s [-v] [-i iterations] [-c clients] [-s size]\n"
+ "\t-v : verbose\n"
+ "\t-i : iterations (default %d)\n"
+ "\t-c : number of clients (default %d; nonzero invokes client/server)\n"
+ "\t-s : data size (default 20 bytes)\n",
+ procname, iter_def, clients_def);
+ exit(1);
+ }
+
+ cl_log_set_entity(procname);
+ cl_log_enable_stderr(TRUE);
+
+
+
+ rc += transport_tests(iterations, clients);
+
+#if 0
+ /* Broken for the moment - need to fix it long term */
+ cl_log(LOG_INFO, "NOTE: Enabling poll(2) replacement code.");
+ PollFunc = cl_poll;
+ g_main_set_poll_func(cl_glibpoll);
+ ipc_set_pollfunc(cl_poll);
+
+ rc += transport_tests(5 * iterations, clients);
+#endif
+
+ cl_log(LOG_INFO, "TOTAL errors: %d", rc);
+
+ return (rc > 127 ? 127 : rc);
+}
+
+static int
+checksock(IPC_Channel* channel)
+{
+
+ if (!channel) {
+ cl_log(LOG_ERR, "Channel null");
+ return 1;
+ }
+ if (!IPC_ISRCONN(channel)) {
+ cl_log(LOG_ERR, "Channel status is %d"
+ ", not IPC_CONNECT", channel->ch_status);
+ return 1;
+ }
+ return 0;
+}
+
+static void
+EOFcheck(IPC_Channel* chan)
+{
+ int fd = chan->ops->get_recv_select_fd(chan);
+ struct pollfd pf[1];
+ int rc;
+
+ cl_log(LOG_INFO, "channel state: %d", chan->ch_status);
+
+ if (chan->recv_queue->current_qlen > 0) {
+ cl_log(LOG_INFO, "EOF Receive queue has %ld messages in it"
+ , (long)chan->recv_queue->current_qlen);
+ }
+ if (fd <= 0) {
+ cl_log(LOG_INFO, "EOF receive fd: %d", fd);
+ }
+
+
+ pf[0].fd = fd;
+ pf[0].events = POLLIN|POLLHUP;
+ pf[0].revents = 0;
+
+ rc = poll(pf, 1, 0);
+
+ if (rc < 0) {
+ cl_perror("failed poll(2) call in EOFcheck");
+ return;
+ }
+
+ /* Got input? */
+ if (pf[0].revents & POLLIN) {
+ cl_log(LOG_INFO, "EOF socket %d (still) has input ready (real poll)"
+ , fd);
+ }
+ if ((pf[0].revents & ~(POLLIN|POLLHUP)) != 0) {
+ cl_log(LOG_INFO, "EOFcheck poll(2) bits: 0x%lx"
+ , (unsigned long)pf[0].revents);
+ }
+ pf[0].fd = fd;
+ pf[0].events = POLLIN|POLLHUP;
+ pf[0].revents = 0;
+ rc = PollFunc(pf, 1, 0);
+ if (rc < 0) {
+ cl_perror("failed PollFunc() call in EOFcheck");
+ return;
+ }
+
+ /* Got input? */
+ if (pf[0].revents & POLLIN) {
+ cl_log(LOG_INFO, "EOF socket %d (still) has input ready (PollFunc())"
+ , fd);
+ }
+ if ((pf[0].revents & ~(POLLIN|POLLHUP)) != 0) {
+ cl_log(LOG_INFO, "EOFcheck PollFunc() bits: 0x%lx"
+ , (unsigned long)pf[0].revents);
+ }
+}
+
+static int
+echoserver(IPC_Channel* wchan, int repcount)
+{
+ char *str;
+ int j;
+ int errcount = 0;
+ IPC_Message wmsg;
+ IPC_Message* rmsg = NULL;
+
+ if (!(str = malloc(data_size))) {
+ cl_log(LOG_ERR, "Out of memory");
+ exit(1);
+ }
+
+ memset(&wmsg, 0, sizeof(wmsg));
+ wmsg.msg_private = NULL;
+ wmsg.msg_done = NULL;
+ wmsg.msg_body = str;
+ wmsg.msg_buf = NULL;
+ wmsg.msg_ch = wchan;
+
+ cl_log(LOG_INFO, "Echo server: %d reps pid %d.", repcount, getpid());
+ for (j=1; j <= repcount
+ ;++j, rmsg != NULL && (rmsg->msg_done(rmsg),1)) {
+ int rc;
+
+ echomsgbody(str, data_size, j, &(wmsg.msg_len));
+ if ((rc = wchan->ops->send(wchan, &wmsg)) != IPC_OK) {
+ cl_log(LOG_ERR
+ , "echotest: send failed %d rc iter %d"
+ , rc, j);
+ ++errcount;
+ continue;
+ }
+
+ /*fprintf(stderr, "+"); */
+ wchan->ops->waitout(wchan);
+ checkifblocked(wchan);
+ /*fprintf(stderr, "S"); */
+
+ /* Try and induce a failure... */
+ if (j == repcount) {
+ sleep(1);
+ }
+
+ while ((rc = wchan->ops->waitin(wchan)) == IPC_INTR);
+
+ if (rc != IPC_OK) {
+ cl_log(LOG_ERR
+ , "echotest server: waitin failed %d rc iter %d"
+ " errno=%d"
+ , rc, j, errno);
+ cl_perror("waitin");
+ exit(1);
+ }
+
+ /*fprintf(stderr, "-"); */
+ if ((rc = wchan->ops->recv(wchan, &rmsg)) != IPC_OK) {
+ cl_log(LOG_ERR
+ , "echotest server: recv failed %d rc iter %d"
+ " errno=%d"
+ , rc, j, errno);
+ cl_perror("recv");
+ ++errcount;
+ rmsg=NULL;
+ continue;
+ }
+ /*fprintf(stderr, "s"); */
+ if (rmsg->msg_len != wmsg.msg_len) {
+ cl_log(LOG_ERR
+ , "echotest: length mismatch [%lu,%lu] iter %d"
+ , (unsigned long)rmsg->msg_len
+ , (unsigned long)wmsg.msg_len, j);
+ ++errcount;
+ continue;
+ }
+ if (strncmp(rmsg->msg_body, wmsg.msg_body, wmsg.msg_len)
+ != 0) {
+ cl_log(LOG_ERR
+ , "echotest: data mismatch. iteration %d"
+ , j);
+ ++errcount;
+ continue;
+ }
+
+ }
+ cl_log(LOG_INFO, "echoserver: %d errors", errcount);
+#if 0
+ cl_log(LOG_INFO, "destroying channel 0x%lx", (unsigned long)wchan);
+#endif
+ wchan->ops->destroy(wchan); wchan = NULL;
+
+ free(str);
+
+ return errcount;
+}
+static int
+echoclient(IPC_Channel* rchan, int repcount)
+{
+ int j;
+ int errcount = 0;
+ IPC_Message* rmsg;
+
+
+
+ cl_log(LOG_INFO, "Echo client: %d reps pid %d."
+ , repcount, (int)getpid());
+ for (j=1; j <= repcount ;++j) {
+
+ int rc;
+
+ while ((rc = rchan->ops->waitin(rchan)) == IPC_INTR);
+
+ if (rc != IPC_OK) {
+ cl_log(LOG_ERR
+ , "echotest client: waitin failed %d rc iter %d"
+ " errno=%d"
+ , rc, j, errno);
+ cl_perror("waitin");
+ exit(1);
+ }
+ /*fprintf(stderr, "/"); */
+
+ if ((rc = rchan->ops->recv(rchan, &rmsg)) != IPC_OK) {
+ cl_log(LOG_ERR
+ , "echoclient: recv failed %d rc iter %d"
+ " errno=%d"
+ , rc, j, errno);
+ cl_perror("recv");
+ ++errcount;
+ if (errcount > MAXERRORS_RECV) {
+ cl_log(LOG_ERR,
+ "echoclient: errcount excessive: %d: abandoning",
+ errcount);
+ exit(1);
+ }
+ --j;
+ rmsg=NULL;
+ continue;
+ }
+ /*fprintf(stderr, "c"); */
+ if ((rc = rchan->ops->send(rchan, rmsg)) != IPC_OK) {
+ cl_log(LOG_ERR
+ , "echoclient: send failed %d rc iter %d"
+ , rc, j);
+ cl_log(LOG_INFO, "Message being sent: %s"
+ , (char*)rmsg->msg_body);
+ ++errcount;
+ continue;
+ }
+ /*fprintf(stderr, "%%"); */
+ rchan->ops->waitout(rchan);
+ checkifblocked(rchan);
+ /*fprintf(stderr, "C"); */
+ }
+ cl_log(LOG_INFO, "echoclient: %d errors", errcount);
+#if 0
+ cl_log(LOG_INFO, "destroying channel 0x%lx", (unsigned long)rchan);
+#endif
+ rchan->ops->destroy(rchan); rchan = NULL;
+ return errcount;
+}
+
+void dump_ipc_info(IPC_Channel* chan);
+
+static int
+checkinput(IPC_Channel* chan, const char * where, int* rdcount, int maxcount)
+{
+ IPC_Message* rmsg = NULL;
+ int errs = 0;
+ int rc;
+
+ while (chan->ops->is_message_pending(chan)
+ && errs < 10 && *rdcount < maxcount) {
+
+ if (chan->ch_status == IPC_DISCONNECT && *rdcount < maxcount){
+ cl_log(LOG_ERR
+ , "checkinput1[0x%lx %s]: EOF in iter %d"
+ , (unsigned long)chan, where, *rdcount);
+ EOFcheck(chan);
+ }
+
+ if (rmsg != NULL) {
+ rmsg->msg_done(rmsg);
+ rmsg = NULL;
+ }
+
+ if ((rc = chan->ops->recv(chan, &rmsg)) != IPC_OK) {
+ if (chan->ch_status == IPC_DISCONNECT) {
+ cl_log(LOG_ERR
+ , "checkinput2[0x%lx %s]: EOF in iter %d"
+ , (unsigned long)chan, where, *rdcount);
+ EOFcheck(chan);
+ return errs;
+ }
+ cl_log(LOG_ERR
+ , "checkinput[%s]: recv"
+ " failed: rc %d rdcount %d errno=%d"
+ , where, rc, *rdcount, errno);
+ cl_perror("recv");
+ rmsg=NULL;
+ ++errs;
+ continue;
+ }
+ *rdcount += 1;
+ if (!checkmsg(rmsg, where, *rdcount)) {
+ dump_ipc_info(chan);
+ ++errs;
+ }
+ if (*rdcount < maxcount && chan->ch_status == IPC_DISCONNECT){
+ cl_log(LOG_ERR
+ , "checkinput3[0x%lx %s]: EOF in iter %d"
+ , (unsigned long)chan, where, *rdcount);
+ EOFcheck(chan);
+ }
+
+ }
+ return errs;
+}
+
+static void
+async_high_flow_callback(IPC_Channel* ch, void* userdata)
+{
+ int* stopsending = userdata;
+
+ if (userdata == NULL){
+ cl_log(LOG_ERR, "userdata is NULL");
+ return;
+ }
+
+ *stopsending = 1;
+
+}
+
+static void
+async_low_flow_callback(IPC_Channel* ch, void* userdata)
+{
+
+ int* stopsending = userdata;
+
+ if (userdata == NULL){
+ cl_log(LOG_ERR, "userdata is NULL");
+ return;
+ }
+
+ *stopsending = 0;
+
+}
+
+
+static int
+asyn_echoserver(IPC_Channel* wchan, int repcount)
+{
+ int rdcount = 0;
+ int wrcount = 0;
+ int errcount = 0;
+ int blockedcount = 0;
+ IPC_Message* wmsg;
+ const char* w = "asyn_echoserver";
+ int stopsending = 0;
+
+ cl_log(LOG_INFO, "Asyn echo server: %d reps pid %d."
+ , repcount, (int)getpid());
+
+ (void)async_high_flow_callback;
+ (void)async_low_flow_callback;
+
+
+ wchan->ops->set_high_flow_callback(wchan, async_high_flow_callback, &stopsending);
+ wchan->ops->set_low_flow_callback(wchan, async_low_flow_callback, &stopsending);
+
+ wchan->low_flow_mark = 2;
+ wchan->high_flow_mark = 20;
+
+ while (rdcount < repcount) {
+ int rc;
+
+ while (wrcount < repcount && blockedcount < 10
+ && wchan->ch_status != IPC_DISCONNECT
+ ){
+
+ if (!stopsending){
+ ++wrcount;
+ if (wrcount > repcount) {
+ break;
+ }
+ wmsg = wchan->ops->new_ipcmsg(wchan, NULL, data_size, NULL);
+ echomsgbody(wmsg->msg_body, data_size, wrcount, &wmsg->msg_len);
+ if ((rc = wchan->ops->send(wchan, wmsg)) != IPC_OK){
+
+ cl_log(LOG_INFO, "channel sstatus in echo server is %d",
+ wchan->ch_status);
+ if (wchan->ch_status != IPC_CONNECT) {
+ cl_log(LOG_ERR
+ , "asyn_echoserver: send failed"
+ " %d rc iter %d"
+ , rc, wrcount);
+ ++errcount;
+ continue;
+ }else {/*send failed because of channel busy
+ * roll back
+ */
+ --wrcount;
+ }
+ }
+
+ if (wchan->ops->is_sending_blocked(wchan)) {
+ /* fprintf(stderr, "b"); */
+ ++blockedcount;
+ }else{
+ blockedcount = 0;
+ }
+ }
+
+
+ errcount += checkinput(wchan, w, &rdcount, repcount);
+ if (wrcount < repcount
+ && wchan->ch_status == IPC_DISCONNECT) {
+ ++errcount;
+ break;
+ }
+ }
+
+/* cl_log(LOG_INFO, "async_echoserver: wrcount =%d rdcount=%d B", wrcount, rdcount); */
+
+ wchan->ops->waitout(wchan);
+ errcount += checkinput(wchan, w, &rdcount, repcount);
+ if (wrcount >= repcount && rdcount < repcount) {
+ while ((rc = wchan->ops->waitin(wchan)) == IPC_INTR);
+
+ if (rc != IPC_OK) {
+ cl_log(LOG_ERR
+ , "asyn_echoserver: waitin()"
+ " failed %d rc rdcount %d errno=%d"
+ , rc, rdcount, errno);
+ cl_perror("waitin");
+ exit(1);
+ }
+ }
+ if (wchan->ch_status == IPC_DISCONNECT
+ && rdcount < repcount) {
+ cl_log(LOG_ERR,
+ "asyn_echoserver: EOF in iter %d (wrcount=%d)",
+ rdcount, wrcount);
+ EOFcheck(wchan);
+ ++errcount;
+ break;
+ }
+
+ blockedcount = 0;
+
+ }
+
+ cl_log(LOG_INFO, "asyn_echoserver: %d errors", errcount);
+#if 0
+ cl_log(LOG_INFO, "%d destroying channel 0x%lx", getpid(), (unsigned long)wchan);
+#endif
+ wchan->ops->destroy(wchan); wchan = NULL;
+ return errcount;
+}
+
+static int
+asyn_echoclient(IPC_Channel* chan, int repcount)
+{
+ int rdcount = 0;
+ int wrcount = 0;
+ int errcount = 0;
+ IPC_Message* rmsg;
+ int rfd = chan->ops->get_recv_select_fd(chan);
+ int wfd = chan->ops->get_send_select_fd(chan);
+ gboolean rdeqwr = (rfd == wfd);
+
+
+ cl_log(LOG_INFO, "Async Echo client: %d reps pid %d."
+ , repcount, (int)getpid());
+ ipc_set_pollfunc(PollFunc);
+
+ while (rdcount < repcount && errcount < repcount) {
+
+ int rc;
+ struct pollfd pf[2];
+ int nfd = 1;
+
+ pf[0].fd = rfd;
+ pf[0].events = POLLIN|POLLHUP;
+
+
+ if (chan->ops->is_sending_blocked(chan)) {
+ if (rdeqwr) {
+ pf[0].events |= POLLOUT;
+ }else{
+ nfd = 2;
+ pf[1].fd = wfd;
+ pf[1].events = POLLOUT|POLLHUP;
+ }
+ }
+
+ /* Have input? */
+ /* fprintf(stderr, "i"); */
+ while (chan->ops->is_message_pending(chan)
+ && rdcount < repcount) {
+ /*fprintf(stderr, "r"); */
+
+ if ((rc = chan->ops->recv(chan, &rmsg)) != IPC_OK) {
+ if (!IPC_ISRCONN(chan)) {
+ cl_log(LOG_ERR
+ , "Async echoclient: disconnect"
+ " iter %d", rdcount+1);
+ ++errcount;
+ return errcount;
+ }
+ cl_log(LOG_ERR
+ , "Async echoclient: recv"
+ " failed %d rc iter %d errno=%d"
+ , rc, rdcount+1, errno);
+ cl_perror("recv");
+ rmsg=NULL;
+ ++errcount;
+ cl_log(LOG_INFO, "sleep(1)");
+ sleep(1);
+ continue;
+ }
+ /*fprintf(stderr, "c"); */
+ ++rdcount;
+
+
+ do {
+ rc = chan->ops->send(chan, rmsg);
+
+ }while (rc != IPC_OK && chan->ch_status == IPC_CONNECT);
+
+ if (chan->ch_status != IPC_CONNECT){
+ ++errcount;
+ cl_perror("send");
+ cl_log(LOG_ERR
+ , "Async echoclient: send failed"
+ " rc %d, iter %d", rc, rdcount);
+ cl_log(LOG_INFO, "Message being sent: %s"
+ , (char*)rmsg->msg_body);
+ if (!IPC_ISRCONN(chan)) {
+ cl_log(LOG_ERR
+ , "Async echoclient: EOF(2)"
+ " iter %d", rdcount+1);
+ EOFcheck(chan);
+ return errcount;
+ }
+ continue;
+
+ }
+
+
+ ++wrcount;
+ /*fprintf(stderr, "x"); */
+ }
+ if (rdcount >= repcount) {
+ break;
+ }
+ /*
+ * At this point it is possible that the POLLOUT bit
+ * being on is no longer necessary, but this will only
+ * cause an extra (false) output poll iteration at worst...
+ * This is because (IIRC) both is_sending_blocked(), and
+ * is_message_pending() both perform a resume_io().
+ * This might be confusing, but -- oh well...
+ */
+
+ /*
+ fprintf(stderr, "P");
+ cl_log(LOG_INFO, "poll[%d, 0x%x]"
+ , pf[0].fd, pf[0].events);
+ cl_log(LOG_DEBUG, "poll[%d, 0x%x]..."
+ , pf[0].fd, pf[0].events);
+ fprintf(stderr, "%%");
+ cl_log(LOG_DEBUG, "CallingPollFunc()");
+ */
+ rc = PollFunc(pf, nfd, -1);
+
+ /* Bad poll? */
+ if (rc <= 0) {
+ cl_perror("Async echoclient: bad poll rc."
+ " %d rc iter %d", rc, rdcount);
+ ++errcount;
+ continue;
+ }
+
+ /* Error indication? */
+ if ((pf[0].revents & (POLLERR|POLLNVAL)) != 0) {
+ cl_log(LOG_ERR
+ , "Async echoclient: bad poll revents."
+ " revents: 0x%x iter %d", pf[0].revents, rdcount);
+ ++errcount;
+ continue;
+ }
+
+ /* HUP without input... Premature EOF... */
+ if ((pf[0].revents & POLLHUP)
+ && ((pf[0].revents&POLLIN) == 0)) {
+ cl_log(LOG_ERR
+ , "Async echoclient: premature pollhup."
+ " revents: 0x%x iter %d", pf[0].revents, rdcount);
+ EOFcheck(chan);
+ ++errcount;
+ continue;
+ }
+
+ /* Error indication? */
+ if (nfd > 1
+ && (pf[1].revents & (POLLERR|POLLNVAL)) != 0) {
+ cl_log(LOG_ERR
+ , "Async echoclient: bad poll revents[1]."
+ " revents: 0x%x iter %d", pf[1].revents, rdcount);
+ ++errcount;
+ continue;
+ }
+
+ /* Output unblocked (only) ? */
+ if (pf[nfd-1].revents & POLLOUT) {
+ /*fprintf(stderr, "R");*/
+ chan->ops->resume_io(chan);
+ }else if ((pf[0].revents & POLLIN) == 0) {
+ /* Neither I nor O available... */
+ cl_log(LOG_ERR
+ , "Async echoclient: bad events."
+ " revents: 0x%x iter %d", pf[0].revents, rdcount);
+ ++errcount;
+ }
+ }
+ cl_poll_ignore(rfd);
+ cl_poll_ignore(wfd);
+ cl_log(LOG_INFO, "Async echoclient: %d errors, %d reads, %d writes",
+ errcount, rdcount, wrcount);
+#if 0
+ cl_log(LOG_INFO, "%d destroying channel 0x%lx",getpid(), (unsigned long)chan);
+#endif
+
+
+ chan->ops->waitout(chan);
+
+ chan->ops->destroy(chan); chan = NULL;
+ return errcount;
+}
+
+
+struct iterinfo {
+ int wcount;
+ int rcount;
+ int errcount;
+ IPC_Channel* chan;
+ int max;
+ gboolean sendingsuspended;
+};
+
+static GMainLoop* loop = NULL;
+
+
+
+
+static gboolean
+s_send_msg(gpointer data)
+{
+ struct iterinfo*i = data;
+ IPC_Message* wmsg;
+ int rc;
+
+ ++i->wcount;
+
+ wmsg = i->chan->ops->new_ipcmsg(i->chan, NULL, data_size, NULL);
+ echomsgbody(wmsg->msg_body, data_size, i->wcount, &wmsg->msg_len);
+
+ /*cl_log(LOG_INFO, "s_send_msg: sending out %d", i->wcount);*/
+
+ if ((rc = i->chan->ops->send(i->chan, wmsg)) != IPC_OK) {
+ cl_log(LOG_ERR
+ , "s_send_msg: send failed"
+ " %d rc iter %d"
+ , rc, i->wcount);
+ cl_log(LOG_ERR
+ , "s_send_msg: channel status: %d qlen: %ld"
+ , i->chan->ch_status
+ , (long)i->chan->send_queue->current_qlen);
+ ++i->errcount;
+ if (i->chan->ch_status != IPC_CONNECT) {
+ cl_log(LOG_ERR, "s_send_msg: Exiting.");
+ return FALSE;
+ }
+ if (i->errcount >= MAXERRORS) {
+ g_main_quit(loop);
+ return FALSE;
+ }
+ }
+ return !i->sendingsuspended?i->wcount < i->max: FALSE;
+}
+
+
+
+
+static void
+mainloop_low_flow_callback(IPC_Channel* ch, void* userdata)
+{
+
+ struct iterinfo* i = (struct iterinfo*) userdata;
+
+ if (userdata == NULL){
+ cl_log(LOG_ERR, "userdata is NULL");
+ return;
+ }
+
+ if (i->sendingsuspended){
+ i->sendingsuspended = FALSE;
+ g_idle_add(s_send_msg, i);
+ }
+
+ return;
+
+}
+
+static void
+mainloop_high_flow_callback(IPC_Channel* ch, void* userdata)
+{
+ struct iterinfo* i = (struct iterinfo*) userdata;
+
+ if (userdata == NULL){
+ cl_log(LOG_ERR, "userdata is NULL");
+ return;
+ }
+
+ i->sendingsuspended = TRUE;
+
+}
+
+
+static gboolean
+s_rcv_msg(IPC_Channel* chan, gpointer data)
+{
+ struct iterinfo*i = data;
+
+ i->errcount += checkinput(chan, "s_rcv_msg", &i->rcount, i->max);
+
+ if (chan->ch_status == IPC_DISCONNECT
+ || i->rcount >= i->max || i->errcount > MAXERRORS) {
+ if (i->rcount < i->max) {
+ ++i->errcount;
+ cl_log(LOG_INFO, "Early exit from s_rcv_msg");
+ }
+ g_main_quit(loop);
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static gboolean
+checkmsg(IPC_Message* rmsg, const char * who, int rcount)
+{
+ char *str;
+ size_t len;
+
+ if (!(str = malloc(data_size))) {
+ cl_log(LOG_ERR, "Out of memory");
+ exit(1);
+ }
+
+ echomsgbody(str, data_size, rcount, &len);
+
+ if (rmsg->msg_len != len) {
+ cl_log(LOG_ERR
+ , "checkmsg[%s]: length mismatch"
+ " [expected %u, got %lu] iteration %d"
+ , who, (unsigned)len
+ , (unsigned long)rmsg->msg_len
+ , rcount);
+ cl_log(LOG_ERR
+ , "checkmsg[%s]: expecting [%s]"
+ , who, str);
+ cl_log(LOG_ERR
+ , "checkmsg[%s]: got [%s] instead"
+ , who, (const char *)rmsg->msg_body);
+ return FALSE;
+ }
+ if (strncmp(rmsg->msg_body, str, len) != 0) {
+ cl_log(LOG_ERR
+ , "checkmsg[%s]: data mismatch"
+ ". input iteration %d"
+ , who, rcount);
+ cl_log(LOG_ERR
+ , "checkmsg[%s]: expecting [%s]"
+ , who, str);
+ cl_log(LOG_ERR
+ , "checkmsg[%s]: got [%s] instead"
+ , who, (const char *)rmsg->msg_body);
+ return FALSE;
+#if 0
+ }else if (strcmp(who, "s_rcv_msg") == 0) {
+#if 0
+
+ || strcmp(who, "s_echo_msg") == 0) {
+#endif
+ cl_log(LOG_ERR
+ , "checkmsg[%s]: data Good"
+ "! input iteration %d"
+ , who, rcount);
+#endif
+ }
+
+ free(str);
+
+ return TRUE;
+}
+
+static gboolean
+s_echo_msg(IPC_Channel* chan, gpointer data)
+{
+ struct iterinfo* i = data;
+ int rc;
+ IPC_Message* rmsg;
+
+ while (chan->ops->is_message_pending(chan)) {
+ if (chan->ch_status == IPC_DISCONNECT) {
+ break;
+ }
+
+ if ((rc = chan->ops->recv(chan, &rmsg)) != IPC_OK) {
+ cl_log(LOG_ERR
+ , "s_echo_msg: recv failed %d rc iter %d"
+ " errno=%d"
+ , rc, i->rcount+1, errno);
+ cl_perror("recv");
+ ++i->errcount;
+ goto retout;
+ }
+ i->rcount++;
+ if (!checkmsg(rmsg, "s_echo_msg", i->rcount)) {
+ ++i->errcount;
+ }
+
+
+
+ /*cl_log(LOG_INFO, "s_echo_msg: rcount= %d, wcount =%d", i->rcount, i->wcount);*/
+
+
+ do {
+ rc = chan->ops->send(chan, rmsg);
+
+ }while (rc != IPC_OK && chan->ch_status == IPC_CONNECT);
+
+ if (chan->ch_status != IPC_CONNECT){
+ cl_log(LOG_ERR,
+ "s_echo_msg: send failed %d rc iter %d qlen %ld",
+ rc, i->rcount, (long)chan->send_queue->current_qlen);
+ cl_perror("send");
+ i->errcount ++;
+
+ }
+
+ i->wcount+=1;
+ /*cl_log(LOG_INFO, "s_echo_msg: end of this ite");*/
+ }
+ retout:
+ /*fprintf(stderr, "%%");*/
+ if (i->rcount >= i->max || chan->ch_status == IPC_DISCONNECT
+ || i->errcount > MAXERRORS) {
+ chan->ops->waitout(chan);
+ g_main_quit(loop);
+ return FALSE;
+ }
+ return TRUE;
+}
+
+static void
+init_iterinfo(struct iterinfo * i, IPC_Channel* chan, int max)
+{
+ memset(i, 0, sizeof(*i));
+ i->chan = chan;
+ i->max = max;
+ i->sendingsuspended = FALSE;
+}
+
+static int
+mainloop_server(IPC_Channel* chan, int repcount)
+{
+ struct iterinfo info;
+ guint sendmsgsrc;
+
+
+
+ loop = g_main_new(FALSE);
+ init_iterinfo(&info, chan, repcount);
+
+ chan->ops->set_high_flow_callback(chan, mainloop_high_flow_callback, &info);
+ chan->ops->set_low_flow_callback(chan, mainloop_low_flow_callback, &info);
+ chan->high_flow_mark = 20;
+ chan->low_flow_mark = 2;
+
+ sendmsgsrc = g_idle_add(s_send_msg, &info);
+ G_main_add_IPC_Channel(G_PRIORITY_DEFAULT, chan
+ , FALSE, s_rcv_msg, &info, NULL);
+ cl_log(LOG_INFO, "Mainloop echo server: %d reps pid %d.", repcount, (int)getpid());
+ g_main_run(loop);
+ g_main_destroy(loop);
+ g_source_remove(sendmsgsrc);
+ loop = NULL;
+ cl_log(LOG_INFO, "Mainloop echo server: %d errors", info.errcount);
+ return info.errcount;
+}
+static int
+mainloop_client(IPC_Channel* chan, int repcount)
+{
+ struct iterinfo info;
+ loop = g_main_new(FALSE);
+ init_iterinfo(&info, chan, repcount);
+ G_main_add_IPC_Channel(G_PRIORITY_DEFAULT, chan
+ , FALSE, s_echo_msg, &info, NULL);
+ cl_log(LOG_INFO, "Mainloop echo client: %d reps pid %d.", repcount, (int)getpid());
+ g_main_run(loop);
+ g_main_destroy(loop);
+ loop = NULL;
+ cl_log(LOG_INFO, "Mainloop echo client: %d errors, %d read %d written"
+ , info.errcount, info.rcount, info.wcount);
+ return info.errcount;
+}