diff options
Diffstat (limited to 'zebra/zebra_opaque.c')
-rw-r--r-- | zebra/zebra_opaque.c | 979 |
1 files changed, 979 insertions, 0 deletions
diff --git a/zebra/zebra_opaque.c b/zebra/zebra_opaque.c new file mode 100644 index 0000000..9053395 --- /dev/null +++ b/zebra/zebra_opaque.c @@ -0,0 +1,979 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * Zebra opaque message handler module + * Copyright (c) 2020 Volta Networks, Inc. + */ + + +#include <zebra.h> +#include "lib/debug.h" +#include "lib/frr_pthread.h" +#include "lib/stream.h" +#include "zebra/debug.h" +#include "zebra/zserv.h" +#include "zebra/zebra_opaque.h" +#include "zebra/rib.h" + +/* Mem type */ +DEFINE_MTYPE_STATIC(ZEBRA, OPQ, "ZAPI Opaque Information"); + +/* Hash to hold message registration info from zapi clients */ +PREDECL_HASH(opq_regh); + +/* Registered client info */ +struct opq_client_reg { + int proto; + int instance; + uint32_t session_id; + + int flags; + + struct opq_client_reg *next; + struct opq_client_reg *prev; +}; + +/* Registration is for receiving or for notifications */ +#define OPQ_CLIENT_FLAG_RECV 0x01 +#define OPQ_CLIENT_FLAG_NOTIFY 0x02 + +/* Opaque message registration info */ +struct opq_msg_reg { + struct opq_regh_item item; + + /* Message type */ + uint32_t type; + + struct opq_client_reg *clients; +}; + +/* Registration helper prototypes */ +static uint32_t registration_hash(const struct opq_msg_reg *reg); +static int registration_compare(const struct opq_msg_reg *reg1, + const struct opq_msg_reg *reg2); + +DECLARE_HASH(opq_regh, struct opq_msg_reg, item, registration_compare, + registration_hash); + +static struct opq_regh_head opq_reg_hash; + +/* + * Globals + */ +static struct zebra_opaque_globals { + + /* Sentinel for run or start of shutdown */ + _Atomic uint32_t run; + + /* Limit number of pending, unprocessed updates */ + _Atomic uint32_t max_queued_updates; + + /* Limit number of new messages dequeued at once, to pace an + * incoming burst. + */ + uint32_t msgs_per_cycle; + + /* Stats: counters of incoming messages, errors, and yields (when + * the limit has been reached.) + */ + _Atomic uint32_t msgs_in; + _Atomic uint32_t msg_errors; + _Atomic uint32_t yields; + + /* pthread */ + struct frr_pthread *pthread; + + /* Event-delivery context 'master' for the module */ + struct event_loop *master; + + /* Event/'thread' pointer for queued zapi messages */ + struct event *t_msgs; + + /* Input fifo queue to the module, and lock to protect it. */ + pthread_mutex_t mutex; + struct stream_fifo in_fifo; + +} zo_info; + +/* Name string for debugs/logs */ +static const char LOG_NAME[] = "Zebra Opaque"; + +/* Prototypes */ + +/* Main event loop, processing incoming message queue */ +static void process_messages(struct event *event); +static int handle_opq_registration(const struct zmsghdr *hdr, + struct stream *msg); +static int handle_opq_unregistration(const struct zmsghdr *hdr, + struct stream *msg); +static int handle_opq_notif_req(const struct zmsghdr *hdr, struct stream *msg); +static int handle_opq_notif_unreg(const struct zapi_opaque_notif_info *info); +static int dispatch_opq_messages(struct stream_fifo *msg_fifo); +static struct opq_msg_reg *opq_reg_lookup(uint32_t type); +static bool opq_client_match(const struct opq_client_reg *client, + const struct zapi_opaque_reg_info *info); +static bool opq_client_notif_match(const struct opq_client_reg *client, + const struct zapi_opaque_notif_info *info); +static struct opq_msg_reg *opq_reg_alloc(uint32_t type); +static void opq_reg_free(struct opq_msg_reg **reg); +static struct opq_client_reg *opq_client_alloc(uint8_t proto, uint16_t instance, + uint32_t session_id); +static void opq_client_free(struct opq_client_reg **client); +static const char *opq_client2str(char *buf, size_t buflen, + const struct opq_client_reg *client); + +/* + * Initialize the module at startup + */ +void zebra_opaque_init(void) +{ + memset(&zo_info, 0, sizeof(zo_info)); + + pthread_mutex_init(&zo_info.mutex, NULL); + stream_fifo_init(&zo_info.in_fifo); + + zo_info.msgs_per_cycle = ZEBRA_OPAQUE_MSG_LIMIT; +} + +/* + * Start the module pthread. This step is run later than the + * 'init' step, in case zebra has fork-ed. + */ +void zebra_opaque_start(void) +{ + struct frr_pthread_attr pattr = { + .start = frr_pthread_attr_default.start, + .stop = frr_pthread_attr_default.stop + }; + + if (IS_ZEBRA_DEBUG_EVENT) + zlog_debug("%s module starting", LOG_NAME); + + /* Start pthread */ + zo_info.pthread = frr_pthread_new(&pattr, "Zebra Opaque thread", + "zebra_opaque"); + + /* Associate event 'master' */ + zo_info.master = zo_info.pthread->master; + + atomic_store_explicit(&zo_info.run, 1, memory_order_relaxed); + + /* Enqueue an initial event for the pthread */ + event_add_event(zo_info.master, process_messages, NULL, 0, + &zo_info.t_msgs); + + /* And start the pthread */ + frr_pthread_run(zo_info.pthread, NULL); +} + +/* + * Module stop, halting the dedicated pthread; called from the main pthread. + */ +void zebra_opaque_stop(void) +{ + if (IS_ZEBRA_DEBUG_EVENT) + zlog_debug("%s module stop", LOG_NAME); + + atomic_store_explicit(&zo_info.run, 0, memory_order_relaxed); + + frr_pthread_stop(zo_info.pthread, NULL); + + frr_pthread_destroy(zo_info.pthread); + + if (IS_ZEBRA_DEBUG_EVENT) + zlog_debug("%s module stop complete", LOG_NAME); +} + +/* + * Module final cleanup, called from the zebra main pthread. + */ +void zebra_opaque_finish(void) +{ + struct opq_msg_reg *reg; + struct opq_client_reg *client; + + if (IS_ZEBRA_DEBUG_EVENT) + zlog_debug("%s module shutdown", LOG_NAME); + + /* Clear out registration info */ + while ((reg = opq_regh_pop(&opq_reg_hash)) != NULL) { + client = reg->clients; + while (client) { + reg->clients = client->next; + opq_client_free(&client); + client = reg->clients; + } + + opq_reg_free(®); + } + + opq_regh_fini(&opq_reg_hash); + + pthread_mutex_destroy(&zo_info.mutex); + stream_fifo_deinit(&zo_info.in_fifo); +} + +/* + * Does this module handle (intercept) the specified zapi message type? + */ +bool zebra_opaque_handles_msgid(uint16_t id) +{ + bool ret = false; + + switch (id) { + case ZEBRA_OPAQUE_MESSAGE: + case ZEBRA_OPAQUE_REGISTER: + case ZEBRA_OPAQUE_UNREGISTER: + case ZEBRA_OPAQUE_NOTIFY: + ret = true; + break; + default: + break; + } + + return ret; +} + +/* + * Enqueue a batch of messages for processing - this is the public api + * used from the zapi processing threads. + */ +uint32_t zebra_opaque_enqueue_batch(struct stream_fifo *batch) +{ + uint32_t counter = 0; + struct stream *msg; + + /* Dequeue messages from the incoming batch, and save them + * on the module fifo. + */ + frr_with_mutex (&zo_info.mutex) { + msg = stream_fifo_pop(batch); + while (msg) { + stream_fifo_push(&zo_info.in_fifo, msg); + counter++; + msg = stream_fifo_pop(batch); + } + } + + /* Schedule module's pthread to process the batch */ + if (counter > 0) { + if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL) + zlog_debug("%s: received %u messages", + __func__, counter); + event_add_event(zo_info.master, process_messages, NULL, 0, + &zo_info.t_msgs); + } + + return counter; +} + +/* + * Pthread event loop, process the incoming message queue. + */ +static void process_messages(struct event *event) +{ + struct stream_fifo fifo; + struct stream *msg; + uint32_t i; + bool need_resched = false; + + stream_fifo_init(&fifo); + + /* Check for zebra shutdown */ + if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0) + goto done; + + /* + * Dequeue some messages from the incoming queue, temporarily + * save them on the local fifo + */ + frr_with_mutex (&zo_info.mutex) { + + for (i = 0; i < zo_info.msgs_per_cycle; i++) { + msg = stream_fifo_pop(&zo_info.in_fifo); + if (msg == NULL) + break; + + stream_fifo_push(&fifo, msg); + } + + /* + * We may need to reschedule, if there are still + * queued messages + */ + if (stream_fifo_head(&zo_info.in_fifo) != NULL) + need_resched = true; + } + + /* Update stats */ + atomic_fetch_add_explicit(&zo_info.msgs_in, i, memory_order_relaxed); + + /* Check for zebra shutdown */ + if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0) { + need_resched = false; + goto done; + } + + if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL) + zlog_debug("%s: processing %u messages", __func__, i); + + /* + * Process the messages from the temporary fifo. We send the whole + * fifo so that we can take advantage of batching internally. Note + * that registration/deregistration messages are handled here also. + */ + dispatch_opq_messages(&fifo); + +done: + + if (need_resched) { + atomic_fetch_add_explicit(&zo_info.yields, 1, + memory_order_relaxed); + event_add_event(zo_info.master, process_messages, NULL, 0, + &zo_info.t_msgs); + } + + /* This will also free any leftover messages, in the shutdown case */ + stream_fifo_deinit(&fifo); +} + +/* + * Helper to acquire/lock a client session and send the message in 's'. + * Note that 's' is enqueued for an io pthread, so don't free it + * or touch it if this returns 'true'. + */ +static bool opq_send_message(uint8_t proto, uint16_t instance, + uint32_t session_id, struct stream *s) +{ + bool ret = false; + struct zserv *zclient; + + /* + * TODO -- this isn't ideal: we're going through an + * acquire/release cycle for each client for each + * message. Replace this with a batching version. + */ + zclient = zserv_acquire_client(proto, instance, session_id); + if (zclient) { + /* + * Sending a message actually means enqueuing + * it for a zapi io pthread to send - so we + * don't touch the message after this call. + */ + zserv_send_message(zclient, s); + + zserv_release_client(zclient); + ret = true; + } + + return ret; +} + +/* + * Process (dispatch) or drop opaque messages. + */ +static int dispatch_opq_messages(struct stream_fifo *msg_fifo) +{ + struct stream *msg, *dup; + struct zmsghdr hdr; + struct zapi_opaque_msg info; + struct opq_msg_reg *reg; + int ret; + struct opq_client_reg *client; + char buf[50]; + + while ((msg = stream_fifo_pop(msg_fifo)) != NULL) { + zapi_parse_header(msg, &hdr); + hdr.length -= ZEBRA_HEADER_SIZE; + + /* Handle client registration messages */ + if (hdr.command == ZEBRA_OPAQUE_REGISTER) { + handle_opq_registration(&hdr, msg); + continue; + } else if (hdr.command == ZEBRA_OPAQUE_UNREGISTER) { + handle_opq_unregistration(&hdr, msg); + continue; + } else if (hdr.command == ZEBRA_OPAQUE_NOTIFY) { + handle_opq_notif_req(&hdr, msg); + continue; + } + + /* We only process OPAQUE messages - drop anything else */ + if (hdr.command != ZEBRA_OPAQUE_MESSAGE) + goto drop_it; + + /* Dispatch to any registered ZAPI client(s) */ + + /* Extract subtype and flags */ + ret = zclient_opaque_decode(msg, &info); + if (ret != 0) + goto drop_it; + + /* Look up registered ZAPI client(s) */ + reg = opq_reg_lookup(info.type); + if (reg == NULL) { + if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL) + zlog_debug("%s: no registrations for opaque type %u, flags %#x", + __func__, info.type, info.flags); + goto drop_it; + } + + /* Reset read pointer, since we'll be re-sending message */ + stream_set_getp(msg, 0); + + /* Send a copy of the message to all registered clients */ + for (client = reg->clients; client; client = client->next) { + dup = NULL; + + if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST)) { + + if (client->proto != info.dest_proto || + client->instance != info.dest_instance || + client->session_id != info.dest_session_id) + continue; + + if (IS_ZEBRA_DEBUG_RECV && + IS_ZEBRA_DEBUG_DETAIL) + zlog_debug("%s: found matching unicast client %s", + __func__, + opq_client2str(buf, + sizeof(buf), + client)); + + } else { + /* Copy message if more clients */ + if (client->next) + dup = stream_dup(msg); + } + + if (IS_ZEBRA_DEBUG_SEND && IS_ZEBRA_DEBUG_DETAIL) + zlog_debug("%s: sending %s to client %s", + __func__, (dup ? "dup" : "msg"), + opq_client2str(buf, sizeof(buf), + client)); + + /* + * TODO -- this isn't ideal: we're going through an + * acquire/release cycle for each client for each + * message. Replace this with a batching version. + */ + if (opq_send_message(client->proto, client->instance, + client->session_id, + (dup ? dup : msg))) { + /* Message is gone - don't touch it */ + if (dup) + dup = NULL; + else + msg = NULL; + } else { + if (IS_ZEBRA_DEBUG_RECV && + IS_ZEBRA_DEBUG_DETAIL) + zlog_debug("%s: type %u: no zclient for %s", + __func__, info.type, + opq_client2str(buf, + sizeof(buf), + client)); + /* Registered but gone? */ + if (dup) + stream_free(dup); + } + + /* If unicast, we're done */ + if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST)) + break; + } + +drop_it: + + if (msg) + stream_free(msg); + } + + return 0; +} + +/* Enqueue registration client object */ +static void opq_enqueue_client(struct opq_msg_reg *reg, + struct opq_client_reg *client) +{ + client->next = reg->clients; + if (reg->clients) + reg->clients->prev = client; + reg->clients = client; +} + +/* Dequeue registration client object */ +static void opq_dequeue_client(struct opq_msg_reg *reg, + struct opq_client_reg *client) +{ + if (client->prev) + client->prev->next = client->next; + if (client->next) + client->next->prev = client->prev; + if (reg->clients == client) + reg->clients = client->next; +} + +/* + * Send notification messages to any interested clients in 'reg', + * about 'server'; the sense is 'registered' (or not). + * The 'server' is not required for un-registrations. + */ +static void opq_send_notifications(const struct opq_msg_reg *reg, + const struct opq_client_reg *server, + bool registered) +{ + const struct opq_client_reg *client; + struct stream *msg = NULL; + + /* If there are any notification clients, send them a message */ + for (client = reg->clients; client; client = client->next) { + if (CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY)) { + msg = stream_new(ZEBRA_SMALL_PACKET_SIZE); + + if (registered) { + zclient_opaque_notif_encode(msg, reg->type, + registered, + server->proto, + server->instance, + server->session_id); + } else { + zclient_opaque_notif_encode(msg, reg->type, + registered, 0, 0, 0); + } + + /* Locate zebra client and enqueue message to it */ + if (!opq_send_message(client->proto, client->instance, + client->session_id, msg)) { + /* Error - need to free the message */ + stream_free(msg); + } + } + } +} + +/* + * Process a register/unregister message + */ +static int handle_opq_registration(const struct zmsghdr *hdr, + struct stream *msg) +{ + int ret = 0; + struct zapi_opaque_reg_info info; + struct opq_client_reg *client; + struct opq_msg_reg key, *reg; + char buf[50]; + + memset(&info, 0, sizeof(info)); + + if (zapi_opaque_reg_decode(msg, &info) < 0) { + ret = -1; + goto done; + } + + memset(&key, 0, sizeof(key)); + + key.type = info.type; + + reg = opq_regh_find(&opq_reg_hash, &key); + if (reg) { + /* Look for dup client */ + for (client = reg->clients; client != NULL; + client = client->next) { + if (opq_client_match(client, &info)) + break; + } + + if (client) { + /* Oops - duplicate registration? */ + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: duplicate opq reg for client %s", + __func__, + opq_client2str(buf, sizeof(buf), + client)); + goto done; + } + + client = opq_client_alloc(info.proto, info.instance, + info.session_id); + SET_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV); + + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: client %s registers for %u", + __func__, + opq_client2str(buf, sizeof(buf), client), + info.type); + + /* Link client into registration */ + opq_enqueue_client(reg, client); + + /* Send notifications to any clients who want them */ + opq_send_notifications(reg, client, true); + + } else { + /* + * No existing registrations - create one, add the + * client, and add registration to hash. + */ + reg = opq_reg_alloc(info.type); + client = opq_client_alloc(info.proto, info.instance, + info.session_id); + SET_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV); + + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: client %s registers for new reg %u", + __func__, + opq_client2str(buf, sizeof(buf), client), + info.type); + + reg->clients = client; + + opq_regh_add(&opq_reg_hash, reg); + } + +done: + + stream_free(msg); + return ret; +} + +/* + * Process a register/unregister message + */ +static int handle_opq_unregistration(const struct zmsghdr *hdr, + struct stream *msg) +{ + int ret = 0; + struct zapi_opaque_reg_info info; + struct opq_client_reg *client, *tclient; + struct opq_msg_reg key, *reg; + int scount; + char buf[50]; + + memset(&info, 0, sizeof(info)); + + if (zapi_opaque_reg_decode(msg, &info) < 0) { + ret = -1; + goto done; + } + + memset(&key, 0, sizeof(key)); + + key.type = info.type; + + reg = opq_regh_find(&opq_reg_hash, &key); + if (reg == NULL) { + /* Weird: unregister for unknown message? */ + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: unknown client %s/%u/%u unregisters for unknown type %u", + __func__, + zebra_route_string(info.proto), + info.instance, info.session_id, info.type); + goto done; + } + + /* Look for client info, count servers and notif clients too */ + client = NULL; + scount = 0; + + for (tclient = reg->clients; tclient != NULL; tclient = tclient->next) { + if (opq_client_match(tclient, &info)) + client = tclient; + + if (CHECK_FLAG(tclient->flags, OPQ_CLIENT_FLAG_RECV)) + scount++; + } + + if (client == NULL) { + /* Oops - unregister for unknown client? */ + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: unknown client %s/%u/%u unregisters for %u", + __func__, zebra_route_string(info.proto), + info.instance, info.session_id, info.type); + goto done; + } + + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: client %s unregisters for %u", + __func__, opq_client2str(buf, sizeof(buf), client), + info.type); + + opq_dequeue_client(reg, client); + opq_client_free(&client); + scount--; + + /* Is registration empty now? */ + if (reg->clients == NULL) { + + opq_regh_del(&opq_reg_hash, reg); + opq_reg_free(®); + } else if (scount == 0) { + /* Send notifications if no more servers for the message. */ + opq_send_notifications(reg, NULL, false); + } + +done: + + stream_free(msg); + return ret; +} + +/* + * Handle requests about opaque notifications. + */ +static int handle_opq_notif_req(const struct zmsghdr *hdr, struct stream *msg) +{ + int ret; + struct zapi_opaque_notif_info info = {}; + struct opq_client_reg *client; + struct opq_msg_reg key, *reg; + char buf[50]; + + ret = zclient_opaque_notif_decode(msg, &info); + if (ret < 0) + goto done; + + /* Handle deregistration */ + if (!info.reg) { + ret = handle_opq_notif_unreg(&info); + goto done; + } + + memset(&key, 0, sizeof(key)); + + key.type = info.msg_type; + + reg = opq_regh_find(&opq_reg_hash, &key); + if (reg) { + /* Look for dup client */ + for (client = reg->clients; client != NULL; + client = client->next) { + if (opq_client_notif_match(client, &info)) + break; + } + + if (client) { + /* Oops - duplicate ? */ + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: duplicate opq notif reg client %s", + __func__, opq_client2str(buf, + sizeof(buf), + client)); + goto done; + } + + client = opq_client_alloc(info.proto, info.instance, + info.session_id); + SET_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY); + + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: client %s registers for notif %u", + __func__, + opq_client2str(buf, sizeof(buf), client), + info.msg_type); + + /* Link client into registration */ + opq_enqueue_client(reg, client); + + /* Send notification if any registered servers */ + /* Look for a server */ + for (client = reg->clients; client != NULL; + client = client->next) { + if (CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV)) + break; + } + if (client) + opq_send_notifications(reg, client, true); + + } else if (info.reg) { + /* + * No existing registrations - create one, add the + * client, and add registration to hash. + */ + reg = opq_reg_alloc(info.msg_type); + client = opq_client_alloc(info.proto, info.instance, + info.session_id); + SET_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY); + + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: client %s registers for new notif %u", + __func__, + opq_client2str(buf, sizeof(buf), client), + info.msg_type); + + reg->clients = client; + + opq_regh_add(&opq_reg_hash, reg); + } + +done: + stream_free(msg); + return ret; +} + +/* + * Unregister notification + */ +static int handle_opq_notif_unreg(const struct zapi_opaque_notif_info *info) +{ + int ret = 0; + struct opq_client_reg *client; + struct opq_msg_reg key, *reg; + char buf[50]; + + memset(&key, 0, sizeof(key)); + + key.type = info->msg_type; + + reg = opq_regh_find(&opq_reg_hash, &key); + if (reg == NULL) { + /* Weird: unregister for unknown message? */ + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: unknown client %s/%u/%u unregisters notif for unknown type %u", + __func__, zebra_route_string(info->proto), + info->instance, info->session_id, + info->msg_type); + goto done; + } + + /* Look for client */ + for (client = reg->clients; client != NULL; client = client->next) { + if (opq_client_notif_match(client, info)) + break; + } + + if (client == NULL) { + /* Oops - unregister for unknown client? */ + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: unknown client %s/%u/%u unregisters notif for %u", + __func__, zebra_route_string(info->proto), + info->instance, info->session_id, + info->msg_type); + goto done; + } + + if (IS_ZEBRA_DEBUG_RECV) + zlog_debug("%s: client %s unregisters notif for %u", __func__, + opq_client2str(buf, sizeof(buf), client), + info->msg_type); + + /* Dequeue client object */ + opq_dequeue_client(reg, client); + + opq_client_free(&client); + + /* Is registration empty now? */ + if (reg->clients == NULL) { + opq_regh_del(&opq_reg_hash, reg); + opq_reg_free(®); + } + +done: + + return ret; +} + +/* Compare utility for registered clients */ +static bool opq_client_match(const struct opq_client_reg *client, + const struct zapi_opaque_reg_info *info) +{ + /* look for matching client, skip notifications */ + if (client->proto == info->proto && client->instance == info->instance && + client->session_id == info->session_id && + CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_RECV)) + return true; + else + return false; +} + +/* Compare helper for clients registered for notifications */ +static bool opq_client_notif_match(const struct opq_client_reg *client, + const struct zapi_opaque_notif_info *info) +{ + /* look for matching client, only for notifications */ + if (client->proto == info->proto && client->instance == info->instance && + client->session_id == info->session_id && + CHECK_FLAG(client->flags, OPQ_CLIENT_FLAG_NOTIFY)) + return true; + else + return false; +} + +static struct opq_msg_reg *opq_reg_lookup(uint32_t type) +{ + struct opq_msg_reg key, *reg; + + memset(&key, 0, sizeof(key)); + + key.type = type; + + reg = opq_regh_find(&opq_reg_hash, &key); + + return reg; +} + +static struct opq_msg_reg *opq_reg_alloc(uint32_t type) +{ + struct opq_msg_reg *reg; + + reg = XCALLOC(MTYPE_OPQ, sizeof(struct opq_msg_reg)); + + reg->type = type; + INIT_HASH(®->item); + + return reg; +} + +static void opq_reg_free(struct opq_msg_reg **reg) +{ + XFREE(MTYPE_OPQ, (*reg)); +} + +static struct opq_client_reg *opq_client_alloc(uint8_t proto, uint16_t instance, + uint32_t session_id) +{ + struct opq_client_reg *client; + + client = XCALLOC(MTYPE_OPQ, sizeof(struct opq_client_reg)); + + client->proto = proto; + client->instance = instance; + client->session_id = session_id; + + return client; +} + +static void opq_client_free(struct opq_client_reg **client) +{ + XFREE(MTYPE_OPQ, (*client)); +} + +static const char *opq_client2str(char *buf, size_t buflen, + const struct opq_client_reg *client) +{ + char sbuf[20]; + + snprintf(buf, buflen, "%s/%u", zebra_route_string(client->proto), + client->instance); + if (client->session_id > 0) { + snprintf(sbuf, sizeof(sbuf), "/%u", client->session_id); + strlcat(buf, sbuf, buflen); + } + + return buf; +} + +/* Hash function for clients registered for messages */ +static uint32_t registration_hash(const struct opq_msg_reg *reg) +{ + return reg->type; +} + +/* Comparison function for client registrations */ +static int registration_compare(const struct opq_msg_reg *reg1, + const struct opq_msg_reg *reg2) +{ + if (reg1->type == reg2->type) + return 0; + else + return -1; +} |