summaryrefslogtreecommitdiffstats
path: root/tests/lib/test_zmq.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-09 13:16:35 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-09 13:16:35 +0000
commite2bbf175a2184bd76f6c54ccf8456babeb1a46fc (patch)
treef0b76550d6e6f500ada964a3a4ee933a45e5a6f1 /tests/lib/test_zmq.c
parentInitial commit. (diff)
downloadfrr-e2bbf175a2184bd76f6c54ccf8456babeb1a46fc.tar.xz
frr-e2bbf175a2184bd76f6c54ccf8456babeb1a46fc.zip
Adding upstream version 9.1.upstream/9.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/lib/test_zmq.c')
-rw-r--r--tests/lib/test_zmq.c312
1 files changed, 312 insertions, 0 deletions
diff --git a/tests/lib/test_zmq.c b/tests/lib/test_zmq.c
new file mode 100644
index 0000000..2cd9d47
--- /dev/null
+++ b/tests/lib/test_zmq.c
@@ -0,0 +1,312 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * ZeroMQ event test
+ * Copyright (C) 2017 David Lamparter, for NetDEF, Inc.
+ */
+
+#include <zebra.h>
+#include "memory.h"
+#include "sigevent.h"
+#include "frr_zmq.h"
+
+DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer");
+DEFINE_MTYPE_STATIC(LIB, ZMQMSG, "zmq message");
+
+static struct event_loop *master;
+
+static void msg_buf_free(void *data, void *hint)
+{
+ XFREE(MTYPE_TESTBUF, data);
+}
+
+static int recv_delim(void *zmqsock)
+{
+ /* receive delim */
+ zmq_msg_t zdelim;
+ int more;
+ zmq_msg_init(&zdelim);
+ zmq_msg_recv(&zdelim, zmqsock, 0);
+ more = zmq_msg_more(&zdelim);
+ zmq_msg_close(&zdelim);
+ return more;
+}
+static void send_delim(void *zmqsock)
+{
+ /* Send delim */
+ zmq_msg_t zdelim;
+ zmq_msg_init(&zdelim);
+ zmq_msg_send(&zdelim, zmqsock, ZMQ_SNDMORE);
+ zmq_msg_close(&zdelim);
+}
+static void run_client(int syncfd)
+{
+ int i, j;
+ char buf[32];
+ char dummy;
+ void *zmqctx = NULL;
+ void *zmqsock;
+ int more;
+
+ read(syncfd, &dummy, 1);
+
+ zmqctx = zmq_ctx_new();
+ zmq_ctx_set(zmqctx, ZMQ_IPV6, 1);
+
+ zmqsock = zmq_socket(zmqctx, ZMQ_DEALER);
+ if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) {
+ perror("zmq_connect");
+ exit(1);
+ }
+
+ /* single-part */
+ for (i = 0; i < 8; i++) {
+ snprintf(buf, sizeof(buf), "msg #%d %c%c%c", i, 'a' + i,
+ 'b' + i, 'c' + i);
+ printf("client send: %s\n", buf);
+ fflush(stdout);
+ send_delim(zmqsock);
+ zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
+ more = recv_delim(zmqsock);
+ while (more) {
+ zmq_recv(zmqsock, buf, sizeof(buf), 0);
+ printf("client recv: %s\n", buf);
+ size_t len = sizeof(more);
+ if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
+ break;
+ }
+ }
+
+ /* multipart */
+ for (i = 2; i < 5; i++) {
+ printf("---\n");
+ send_delim(zmqsock);
+ zmq_msg_t part;
+ for (j = 1; j <= i; j++) {
+ char *dyn = XMALLOC(MTYPE_TESTBUF, 32);
+
+ snprintf(dyn, 32, "part %d/%d", j, i);
+ printf("client send: %s\n", dyn);
+ fflush(stdout);
+
+ zmq_msg_init_data(&part, dyn, strlen(dyn) + 1,
+ msg_buf_free, NULL);
+ zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0);
+ }
+
+ recv_delim(zmqsock);
+ do {
+ char *data;
+
+ zmq_msg_recv(&part, zmqsock, 0);
+ data = zmq_msg_data(&part);
+ more = zmq_msg_more(&part);
+ printf("client recv (more: %d): %s\n", more, data);
+ } while (more);
+ zmq_msg_close(&part);
+ }
+
+ /* write callback */
+ printf("---\n");
+ snprintf(buf, sizeof(buf), "Done receiving");
+ printf("client send: %s\n", buf);
+ fflush(stdout);
+ send_delim(zmqsock);
+ zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
+ /* wait for message from server */
+ more = recv_delim(zmqsock);
+ while (more) {
+ zmq_recv(zmqsock, buf, sizeof(buf), 0);
+ printf("client recv: %s\n", buf);
+ size_t len = sizeof(more);
+ if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
+ break;
+ }
+
+ zmq_close(zmqsock);
+ zmq_ctx_term(zmqctx);
+}
+
+static struct frrzmq_cb *cb;
+
+static void recv_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
+{
+ /* receive id */
+ zmq_msg_init(msg_id);
+ zmq_msg_recv(msg_id, zmqsock, 0);
+ /* receive delim */
+ recv_delim(zmqsock);
+}
+static void send_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
+{
+ /* Send Id */
+ zmq_msg_send(msg_id, zmqsock, ZMQ_SNDMORE);
+ send_delim(zmqsock);
+}
+static void serverwritefn(void *arg, void *zmqsock)
+{
+ zmq_msg_t *msg_id = (zmq_msg_t *)arg;
+ char buf[32] = "Test write callback";
+ size_t i;
+
+ for (i = 0; i < strlen(buf); i++)
+ buf[i] = toupper(buf[i]);
+ printf("server send: %s\n", buf);
+ fflush(stdout);
+ send_id_and_delim(zmqsock, msg_id);
+ zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
+
+ /* send just once */
+ frrzmq_thread_cancel(&cb, &cb->write);
+
+ zmq_msg_close(msg_id);
+ XFREE(MTYPE_ZMQMSG, msg_id);
+}
+static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg,
+ unsigned partnum)
+{
+ static int num = 0;
+ int more = zmq_msg_more(msg);
+ char *in = zmq_msg_data(msg);
+ size_t i;
+ zmq_msg_t reply;
+ char *out;
+
+ /* Id */
+ if (partnum == 0) {
+ send_id_and_delim(zmqsock, msg);
+ return;
+ }
+ /* Delim */
+ if (partnum == 1)
+ return;
+
+
+ printf("server recv part %u (more: %d): %s\n", partnum, more, in);
+ fflush(stdout);
+
+ out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1);
+ for (i = 0; i < strlen(in); i++)
+ out[i] = toupper(in[i]);
+ out[i] = '\0';
+ zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
+ zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE);
+
+ if (more)
+ return;
+
+ out = XMALLOC(MTYPE_TESTBUF, 32);
+ snprintf(out, 32, "msg# was %u", partnum);
+ zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
+ zmq_msg_send(&reply, zmqsock, 0);
+
+ zmq_msg_close(&reply);
+
+ if (++num < 7)
+ return;
+
+ /* write callback test */
+ char buf[32];
+ zmq_msg_t *msg_id = XMALLOC(MTYPE_ZMQMSG, sizeof(zmq_msg_t));
+ recv_id_and_delim(zmqsock, msg_id);
+ zmq_recv(zmqsock, buf, sizeof(buf), 0);
+ printf("server recv: %s\n", buf);
+ fflush(stdout);
+
+ frrzmq_event_add_write_msg(master, serverwritefn, NULL, msg_id, zmqsock,
+ &cb);
+}
+
+static void serverfn(void *arg, void *zmqsock)
+{
+ static int num = 0;
+
+ zmq_msg_t msg_id;
+ char buf[32];
+ size_t i;
+
+ recv_id_and_delim(zmqsock, &msg_id);
+ zmq_recv(zmqsock, buf, sizeof(buf), 0);
+
+ printf("server recv: %s\n", buf);
+ fflush(stdout);
+ for (i = 0; i < strlen(buf); i++)
+ buf[i] = toupper(buf[i]);
+ send_id_and_delim(zmqsock, &msg_id);
+ zmq_msg_close(&msg_id);
+ zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
+
+ if (++num < 4)
+ return;
+
+ /* change to multipart callback */
+ frrzmq_thread_cancel(&cb, &cb->read);
+ frrzmq_thread_cancel(&cb, &cb->write);
+
+ frrzmq_event_add_read_part(master, serverpartfn, NULL, NULL, zmqsock,
+ &cb);
+}
+
+static void sigchld(void)
+{
+ printf("child exited.\n");
+ frrzmq_thread_cancel(&cb, &cb->read);
+ frrzmq_thread_cancel(&cb, &cb->write);
+}
+
+static struct frr_signal_t sigs[] = {
+ {
+ .signal = SIGCHLD,
+ .handler = sigchld,
+ },
+};
+
+static void run_server(int syncfd)
+{
+ void *zmqsock;
+ char dummy = 0;
+ struct event t;
+
+ master = event_master_create(NULL);
+ signal_init(master, array_size(sigs), sigs);
+ frrzmq_init();
+
+ zmqsock = zmq_socket(frrzmq_context, ZMQ_ROUTER);
+ if (zmq_bind(zmqsock, "tcp://*:17171")) {
+ perror("zmq_bind");
+ exit(1);
+ }
+
+ frrzmq_event_add_read_msg(master, serverfn, NULL, NULL, zmqsock, &cb);
+
+ write(syncfd, &dummy, sizeof(dummy));
+ while (event_fetch(master, &t))
+ event_call(&t);
+
+ zmq_close(zmqsock);
+ frrzmq_finish();
+ event_master_free(master);
+ log_memstats_stderr("test");
+}
+
+int main(void)
+{
+ int syncpipe[2];
+ pid_t child;
+
+ if (pipe(syncpipe)) {
+ perror("pipe");
+ exit(1);
+ }
+
+ child = fork();
+ if (child < 0) {
+ perror("fork");
+ exit(1);
+ } else if (child == 0) {
+ run_client(syncpipe[0]);
+ exit(0);
+ }
+
+ run_server(syncpipe[1]);
+ exit(0);
+}