summaryrefslogtreecommitdiffstats
path: root/lib/clplumbing/ipctransientclient.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 06:40:13 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 06:40:13 +0000
commite9be59e1502a41bab9891d96d753102a7dafef0b (patch)
treec3b2da87c414881f4b53d0964f407c83492d813e /lib/clplumbing/ipctransientclient.c
parentInitial commit. (diff)
downloadcluster-glue-upstream.tar.xz
cluster-glue-upstream.zip
Adding upstream version 1.0.12.upstream/1.0.12upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/clplumbing/ipctransientclient.c')
-rw-r--r--lib/clplumbing/ipctransientclient.c222
1 files changed, 222 insertions, 0 deletions
diff --git a/lib/clplumbing/ipctransientclient.c b/lib/clplumbing/ipctransientclient.c
new file mode 100644
index 0000000..080acf2
--- /dev/null
+++ b/lib/clplumbing/ipctransientclient.c
@@ -0,0 +1,222 @@
+/*
+ * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <ipctransient.h>
+
+#define MAX_MESSAGES 3
+static char *messages[MAX_MESSAGES];
+
+IPC_Message *create_simple_message(const char *text, IPC_Channel *ch);
+IPC_Channel *init_client_ipctest_comms(
+ const char *child, gboolean (*dispatch)(
+ IPC_Channel* source_data, gpointer user_data),
+ void *user_data);
+gboolean transient_client_callback(IPC_Channel* server, void* private_data);
+void client_send_message(
+ const char *message_text, IPC_Channel *server_channel, int iteration);
+
+#define MAXTSTMSG 1000
+
+int
+main(int argc, char ** argv)
+{
+ int lpc =0, iteration=0;
+ GMainLoop* client_main = NULL;
+ IPC_Channel *server_channel = NULL;
+
+ trans_getargs(argc, argv);
+
+ cl_log_set_entity(procname);
+ cl_log_enable_stderr(TRUE);
+
+ /* give the server a chance to start */
+ cl_log(LOG_INFO, "#--#--#--#--# Beginning test run %d against server %d...", lpc, iteration);
+ client_main = g_main_new(FALSE);
+
+ /* connect, send messages */
+ server_channel = init_client_ipctest_comms("echo", transient_client_callback, client_main);
+
+ if(server_channel == NULL) {
+ cl_log(LOG_ERR, "[Client %d] Could not connect to server", lpc);
+ return 1;
+ }
+
+ for(lpc = 0; lpc < MAX_MESSAGES; lpc++) {
+ messages[lpc] = (char *)malloc(sizeof(char)*MAXTSTMSG);
+ }
+ snprintf(messages[0], MAXTSTMSG
+ , "%s_%ld%c", "hello", (long)getpid(), '\0');
+ snprintf(messages[1], MAXTSTMSG
+ , "%s_%ld%c", "hello_world", (long)getpid(), '\0');
+ snprintf(messages[2], MAXTSTMSG
+ , "%s_%ld%c", "hello_world_again", (long)getpid(), '\0');
+
+ for(lpc = 0; lpc < MAX_MESSAGES; lpc++) {
+ client_send_message(messages[lpc], server_channel, lpc);
+ }
+
+ server_channel->ops->waitout(server_channel);
+
+ /* wait for the reply by creating a mainloop and running it until
+ * the callbacks are invoked...
+ */
+
+ cl_log(LOG_DEBUG, "Waiting for replies from the echo server");
+ g_main_run(client_main);
+ cl_log(LOG_INFO, "[Iteration %d] Client %d completed successfully", iteration, lpc);
+
+ return 0;
+}
+
+
+IPC_Channel *
+init_client_ipctest_comms(const char *child,
+ gboolean (*dispatch)(IPC_Channel* source_data
+ ,gpointer user_data),
+ void *user_data)
+{
+ IPC_Channel *ch;
+ GHashTable * attrs;
+ int local_sock_len = 2; /* 2 = '/' + '\0' */
+ char *commpath = NULL;
+ static char path[] = IPC_PATH_ATTR;
+
+ local_sock_len += strlen(child);
+ local_sock_len += strlen(commdir);
+
+ commpath = (char*)malloc(sizeof(char)*local_sock_len);
+ if (commpath == NULL){
+ cl_log(LOG_ERR, "%s: allocating memory failed", __FUNCTION__);
+ return NULL;
+ }
+ sprintf(commpath, "%s/%s", commdir, child);
+ commpath[local_sock_len - 1] = '\0';
+
+ cl_log(LOG_DEBUG, "[Client] Attempting to talk on: %s", commpath);
+
+ attrs = g_hash_table_new(g_str_hash,g_str_equal);
+ g_hash_table_insert(attrs, path, commpath);
+ ch = ipc_channel_constructor(IPC_ANYTYPE, attrs);
+ g_hash_table_destroy(attrs);
+
+ if (ch == NULL) {
+ cl_log(LOG_ERR, "[Client] Could not access channel on: %s", commpath);
+ return NULL;
+ } else if(ch->ops->initiate_connection(ch) != IPC_OK) {
+ cl_log(LOG_ERR, "[Client] Could not init comms on: %s", commpath);
+ return NULL;
+ }
+
+ G_main_add_IPC_Channel(G_PRIORITY_LOW,
+ ch, FALSE, dispatch, user_data,
+ default_ipctest_input_destroy);
+
+ return ch;
+}
+
+
+gboolean
+transient_client_callback(IPC_Channel* server, void* private_data)
+{
+ int lpc = 0;
+ IPC_Message *msg = NULL;
+ char *buffer = NULL;
+ static int received_responses = 0;
+
+ GMainLoop *mainloop = (GMainLoop*)private_data;
+
+ while(server->ops->is_message_pending(server) == TRUE) {
+ if (server->ch_status == IPC_DISCONNECT) {
+ /* The message which was pending for us is the
+ * new status of IPC_DISCONNECT */
+ break;
+ }
+ if(server->ops->recv(server, &msg) != IPC_OK) {
+ cl_log(LOG_ERR, "[Client] Error while invoking recv()");
+ perror("[Client] Receive failure:");
+ return FALSE;
+ }
+
+ if (msg != NULL) {
+ buffer = (char*)msg->msg_body;
+ cl_log(LOG_DEBUG, "[Client] Got text [text=%s]", buffer);
+ received_responses++;
+
+ if(lpc < MAX_MESSAGES && strcmp(messages[lpc], buffer) != 0)
+ {
+ cl_log(LOG_ERR, "[Client] Received someone else's message [%s] instead of [%s]", buffer, messages[lpc]);
+ }
+ else if(lpc >= MAX_MESSAGES)
+ {
+ cl_log(LOG_ERR, "[Client] Receivedan extra message [%s]", buffer);
+ }
+
+ lpc++;
+ msg->msg_done(msg);
+ } else {
+ cl_log(LOG_ERR, "[Client] No message this time");
+ }
+ }
+
+ if(server->ch_status == IPC_DISCONNECT) {
+ cl_log(LOG_ERR, "[Client] Client received HUP");
+ return FALSE;
+ }
+
+ cl_log(LOG_DEBUG, "[Client] Processed %d IPC messages this time, %d total", lpc, received_responses);
+
+ if(received_responses > 2) {
+ cl_log(LOG_INFO, "[Client] Processed %d IPC messages, all done.", received_responses);
+ received_responses = 0;
+ g_main_quit(mainloop);
+ cl_log(LOG_INFO, "[Client] Exiting.");
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+void
+client_send_message(const char *message_text,
+ IPC_Channel *server_channel,
+ int iteration)
+{
+ IPC_Message *a_message = NULL;
+ if(server_channel->ch_status != IPC_CONNECT) {
+ cl_log(LOG_WARNING, "[Client %d] Channel is in state %d before sending message [%s]",
+ iteration, server_channel->ch_status, message_text);
+ return;
+ }
+
+ a_message = create_simple_message(message_text, server_channel);
+ if(a_message == NULL) {
+ cl_log(LOG_ERR, "Could not create message to send");
+ } else {
+ cl_log(LOG_DEBUG, "[Client %d] Sending message: %s", iteration, (char*)a_message->msg_body);
+ while(server_channel->ops->send(server_channel, a_message) == IPC_FAIL) {
+ cl_log(LOG_ERR, "[Client %d] IPC channel is blocked", iteration);
+ cl_shortsleep();
+ }
+
+ if(server_channel->ch_status != IPC_CONNECT) {
+ cl_log(LOG_WARNING,
+ "[Client %d] Channel is in state %d after sending message [%s]",
+ iteration, server_channel->ch_status, message_text);
+ }
+ }
+}