diff options
Diffstat (limited to '')
-rw-r--r-- | lib/frr_zmq.c | 356 |
1 files changed, 356 insertions, 0 deletions
diff --git a/lib/frr_zmq.c b/lib/frr_zmq.c new file mode 100644 index 0000000..b28dd7f --- /dev/null +++ b/lib/frr_zmq.c @@ -0,0 +1,356 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * libzebra ZeroMQ bindings + * Copyright (C) 2015 David Lamparter + */ + +/* + * IF YOU MODIFY THIS FILE PLEASE RUN `make check` and ensure that + * the test_zmq.c unit test is still working. There are dependencies + * between the two that are extremely fragile. My understanding + * is that there is specialized ownership of the cb pointer based + * upon what is happening. Those assumptions are supposed to be + * tested in the test_zmq.c + */ +#include <zebra.h> +#include <zmq.h> + +#include "frrevent.h" +#include "memory.h" +#include "frr_zmq.h" +#include "log.h" +#include "lib_errors.h" + +DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback"); + +/* libzmq's context */ +void *frrzmq_context = NULL; +static unsigned frrzmq_initcount = 0; + +void frrzmq_init(void) +{ + if (frrzmq_initcount++ == 0) { + frrzmq_context = zmq_ctx_new(); + zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1); + } +} + +void frrzmq_finish(void) +{ + if (--frrzmq_initcount == 0) { + zmq_ctx_term(frrzmq_context); + frrzmq_context = NULL; + } +} + +static void frrzmq_read_msg(struct event *t) +{ + struct frrzmq_cb **cbp = EVENT_ARG(t); + struct frrzmq_cb *cb; + zmq_msg_t msg; + unsigned partno; + unsigned char read = 0; + int ret, more; + size_t moresz; + + if (!cbp) + return; + cb = (*cbp); + if (!cb || !cb->zmqsock) + return; + + while (1) { + zmq_pollitem_t polli = {.socket = cb->zmqsock, + .events = ZMQ_POLLIN}; + ret = zmq_poll(&polli, 1, 0); + + if (ret < 0) + goto out_err; + + if (!(polli.revents & ZMQ_POLLIN)) + break; + + if (cb->read.cb_msg) { + cb->in_cb = true; + cb->read.cb_msg(cb->read.arg, cb->zmqsock); + cb->in_cb = false; + + read = 1; + + if (cb->read.cancelled) { + frrzmq_check_events(cbp, &cb->write, + ZMQ_POLLOUT); + cb->read.thread = NULL; + if (cb->write.cancelled && !cb->write.thread) + XFREE(MTYPE_ZEROMQ_CB, *cbp); + + return; + } + continue; + } + + partno = 0; + if (zmq_msg_init(&msg)) + goto out_err; + do { + ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK); + if (ret < 0) { + if (errno == EAGAIN) + break; + + zmq_msg_close(&msg); + goto out_err; + } + read = 1; + + cb->in_cb = true; + cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg, + partno); + cb->in_cb = false; + + if (cb->read.cancelled) { + zmq_msg_close(&msg); + frrzmq_check_events(cbp, &cb->write, + ZMQ_POLLOUT); + cb->read.thread = NULL; + if (cb->write.cancelled && !cb->write.thread) + XFREE(MTYPE_ZEROMQ_CB, *cbp); + + return; + } + + /* cb_part may have read additional parts of the + * message; don't use zmq_msg_more here */ + moresz = sizeof(more); + more = 0; + ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more, + &moresz); + if (ret < 0) { + zmq_msg_close(&msg); + goto out_err; + } + + partno++; + } while (more); + zmq_msg_close(&msg); + } + + if (read) + frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT); + + event_add_read(t->master, frrzmq_read_msg, cbp, cb->fd, + &cb->read.thread); + return; + +out_err: + flog_err(EC_LIB_ZMQ, "ZeroMQ read error: %s(%d)", strerror(errno), + errno); + if (cb->read.cb_error) + cb->read.cb_error(cb->read.arg, cb->zmqsock); +} + +int _frrzmq_event_add_read(const struct xref_eventsched *xref, + struct event_loop *master, + void (*msgfunc)(void *arg, void *zmqsock), + void (*partfunc)(void *arg, void *zmqsock, + zmq_msg_t *msg, unsigned partnum), + void (*errfunc)(void *arg, void *zmqsock), void *arg, + void *zmqsock, struct frrzmq_cb **cbp) +{ + int fd, events; + size_t len; + struct frrzmq_cb *cb; + + if (!cbp) + return -1; + if (!(msgfunc || partfunc) || (msgfunc && partfunc)) + return -1; + len = sizeof(fd); + if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len)) + return -1; + len = sizeof(events); + if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len)) + return -1; + + if (*cbp) + cb = *cbp; + else { + cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb)); + cb->write.cancelled = true; + *cbp = cb; + } + + cb->zmqsock = zmqsock; + cb->fd = fd; + cb->read.arg = arg; + cb->read.cb_msg = msgfunc; + cb->read.cb_part = partfunc; + cb->read.cb_error = errfunc; + cb->read.cancelled = false; + cb->in_cb = false; + + if (events & ZMQ_POLLIN) { + event_cancel(&cb->read.thread); + + event_add_event(master, frrzmq_read_msg, cbp, fd, + &cb->read.thread); + } else + event_add_read(master, frrzmq_read_msg, cbp, fd, + &cb->read.thread); + return 0; +} + +static void frrzmq_write_msg(struct event *t) +{ + struct frrzmq_cb **cbp = EVENT_ARG(t); + struct frrzmq_cb *cb; + unsigned char written = 0; + int ret; + + if (!cbp) + return; + cb = (*cbp); + if (!cb || !cb->zmqsock) + return; + + while (1) { + zmq_pollitem_t polli = {.socket = cb->zmqsock, + .events = ZMQ_POLLOUT}; + ret = zmq_poll(&polli, 1, 0); + + if (ret < 0) + goto out_err; + + if (!(polli.revents & ZMQ_POLLOUT)) + break; + + if (cb->write.cb_msg) { + cb->in_cb = true; + cb->write.cb_msg(cb->write.arg, cb->zmqsock); + cb->in_cb = false; + + written = 1; + + if (cb->write.cancelled) { + frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN); + cb->write.thread = NULL; + if (cb->read.cancelled && !cb->read.thread) + XFREE(MTYPE_ZEROMQ_CB, *cbp); + + return; + } + continue; + } + } + + if (written) + frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN); + + event_add_write(t->master, frrzmq_write_msg, cbp, cb->fd, + &cb->write.thread); + return; + +out_err: + flog_err(EC_LIB_ZMQ, "ZeroMQ write error: %s(%d)", strerror(errno), + errno); + if (cb->write.cb_error) + cb->write.cb_error(cb->write.arg, cb->zmqsock); +} + +int _frrzmq_event_add_write(const struct xref_eventsched *xref, + struct event_loop *master, + void (*msgfunc)(void *arg, void *zmqsock), + void (*errfunc)(void *arg, void *zmqsock), + void *arg, void *zmqsock, struct frrzmq_cb **cbp) +{ + int fd, events; + size_t len; + struct frrzmq_cb *cb; + + if (!cbp) + return -1; + if (!msgfunc) + return -1; + len = sizeof(fd); + if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len)) + return -1; + len = sizeof(events); + if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len)) + return -1; + + if (*cbp) + cb = *cbp; + else { + cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb)); + cb->read.cancelled = true; + *cbp = cb; + } + + cb->zmqsock = zmqsock; + cb->fd = fd; + cb->write.arg = arg; + cb->write.cb_msg = msgfunc; + cb->write.cb_part = NULL; + cb->write.cb_error = errfunc; + cb->write.cancelled = false; + cb->in_cb = false; + + if (events & ZMQ_POLLOUT) { + event_cancel(&cb->write.thread); + + _event_add_event(xref, master, frrzmq_write_msg, cbp, fd, + &cb->write.thread); + } else + event_add_write(master, frrzmq_write_msg, cbp, fd, + &cb->write.thread); + return 0; +} + +void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core) +{ + if (!cb || !*cb) + return; + core->cancelled = true; + event_cancel(&core->thread); + + /* If cancelled from within a callback, don't try to free memory + * in this path. + */ + if ((*cb)->in_cb) + return; + + /* Ok to free the callback context if no more ... context. */ + if ((*cb)->read.cancelled && !(*cb)->read.thread + && (*cb)->write.cancelled && ((*cb)->write.thread == NULL)) + XFREE(MTYPE_ZEROMQ_CB, *cb); +} + +void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core, + int event) +{ + struct frrzmq_cb *cb; + int events; + size_t len; + + if (!cbp) + return; + cb = (*cbp); + if (!cb || !cb->zmqsock) + return; + + len = sizeof(events); + if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len)) + return; + if ((events & event) && core->thread && !core->cancelled) { + struct event_loop *tm = core->thread->master; + + event_cancel(&core->thread); + + if (event == ZMQ_POLLIN) + event_add_event(tm, frrzmq_read_msg, cbp, cb->fd, + &core->thread); + else + event_add_event(tm, frrzmq_write_msg, cbp, cb->fd, + &core->thread); + } +} |