summaryrefslogtreecommitdiffstats
path: root/lib/clplumbing/replytrack.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--lib/clplumbing/replytrack.c643
1 files changed, 643 insertions, 0 deletions
diff --git a/lib/clplumbing/replytrack.c b/lib/clplumbing/replytrack.c
new file mode 100644
index 0000000..8c7c38e
--- /dev/null
+++ b/lib/clplumbing/replytrack.c
@@ -0,0 +1,643 @@
+
+/*
+ * Reply tracking library.
+ *
+ * Copyright (c) 2007 Alan Robertson
+ * Author: Alan Robertson <alanr@unix.sh>
+ *
+ ******************************************************************
+ * This library is useful for tracking replies to multicast messages
+ * sent to cluster members. It tracks incremental membership changes
+ * according to any desired criteria, and then keeps track of when
+ * the last expected reply is received according to the dynamically
+ * updated membership as of when the message was sent out.
+ ******************************************************************
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ */
+
+#include <lha_internal.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <sys/wait.h>
+#include <sys/types.h>
+#include <signal.h>
+#include <memory.h>
+#include <clplumbing/cl_log.h>
+#include <clplumbing/replytrack.h>
+#include <clplumbing/Gmain_timeout.h>
+
+/*
+ * These are the only data items that go in our GHashTables
+ */
+struct rt_node_info {
+ char * nodename;
+ cl_uuid_t nodeid;
+};
+
+struct node_tables {
+
+ GHashTable* uuidmap; /* Keyed by uuid */
+ int uuidcount;
+ GHashTable* namemap; /* Keyed by nodename*/
+ int namecount;
+};
+struct _nodetrack {
+ struct node_tables nt;
+ int refcount;
+ nodetrack_callback_t callback;
+ gpointer user_data;
+ nodetrack_callback_t extra_callback;
+ gpointer ext_data;
+};
+
+/*
+ * Things we use to track outstanding replies
+ * This is the structure used by the replytrack_t typedef
+ */
+struct _replytrack {
+ replytrack_callback_t callback;
+ gpointer user_data;
+ unsigned timerid;
+ struct node_tables tables;
+ gboolean expectingmore;
+ nodetrack_t* membership;
+};
+
+struct _nodetrack_intersection {
+ nodetrack_t** tables;
+ int ntables;
+ nodetrack_callback_t callback;
+ gpointer user_data;
+ nodetrack_t* intersection;
+};
+
+static cl_uuid_t nulluuid;
+static int nodetrack_t_count = 0;
+static int replytrack_t_count = 0;
+static int replytrack_intersection_t_count = 0;
+
+static struct rt_node_info *
+rt_node_info_new(const char * nodename, cl_uuid_t nodeid)
+{
+ struct rt_node_info* ret;
+
+ if (!nodename) {
+ return NULL;
+ }
+ ret = MALLOCT(struct rt_node_info);
+
+ if (!ret) {
+ return ret;
+ }
+ ret->nodename = strdup(nodename);
+ if (!ret->nodename) {
+ free(ret);
+ ret = NULL;
+ return ret;
+ }
+ ret->nodeid = nodeid;
+ return ret;
+}
+
+static void
+rt_node_info_del(struct rt_node_info * ni)
+{
+ if (ni != NULL) {
+ if (ni->nodename != NULL) {
+ free(ni->nodename);
+ }
+ memset(ni, 0, sizeof(*ni));
+ free(ni);
+ }
+}
+
+/*
+ * namehash cannot be NULL, idhash cannot be NULL, and nodename cannot be NULL
+ *
+ * 'id' can be a NULL uuid, in which case it goes into only the name table
+ * 'nodename' can change over time - in which case we update our tables.
+ * It is possible for one nodename to have more than one uuid.
+ * We allow for that.
+ *
+ * Changing the uuid but keeping the nodename the same is considered to be
+ * adding a new node with the same nodename.
+ * Exception: A node with a null uuid is presumed to have acquired a proper
+ * uuid if it is later seen with a non-null UUID
+ */
+
+static gboolean
+del_node_from_hashtables(struct node_tables *t
+, const char * nodename, cl_uuid_t id)
+{
+ struct rt_node_info * entry;
+ if (cl_uuid_is_null(&id)) {
+ if ((entry = g_hash_table_lookup(t->namemap,nodename))!=NULL){
+ g_hash_table_remove(t->namemap, nodename);
+ rt_node_info_del(entry);
+ t->namecount--;
+ }
+ return TRUE;
+ }
+ if ((entry=g_hash_table_lookup(t->uuidmap, &id)) != NULL) {
+ g_hash_table_remove(t->uuidmap, &id);
+ rt_node_info_del(entry);
+ t->uuidcount--;
+ }
+ return TRUE;
+}
+
+
+static gboolean
+add_node_to_hashtables(struct node_tables * t
+, const char * nodename, cl_uuid_t id)
+{
+ struct rt_node_info* idinfo = NULL;
+
+ if (cl_uuid_is_null(&id)) {
+ /* Supplied uuid is the null UUID - insert in name table */
+ struct rt_node_info* ninfo;
+ if (g_hash_table_lookup(t->namemap, nodename) == NULL) {
+ if (NULL == (ninfo = rt_node_info_new(nodename, id))){
+ goto outofmem;
+ }
+ g_hash_table_insert(t->namemap,ninfo->nodename,ninfo);
+ t->namecount++;
+ }
+ return TRUE;
+ }
+
+ /* Supplied uuid is not the null UUID */
+
+ if (g_hash_table_lookup(t->uuidmap,&id) == NULL) {
+ /* See if a corresponding name is in name map */
+ /* If so, delete it - assume uuid was missing before */
+
+ if (g_hash_table_lookup(t->namemap, nodename) != NULL) {
+ del_node_from_hashtables(t, nodename, nulluuid);
+ }
+ /* Not yet in our uuid hash table */
+ idinfo = rt_node_info_new(nodename, id);
+ if (idinfo == NULL) {
+ goto outofmem;
+ }
+ g_hash_table_insert(t->uuidmap, &idinfo->nodeid, idinfo);
+ t->uuidcount++;
+ }
+ return TRUE;
+outofmem:
+ cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);
+ return FALSE;
+}
+
+static gboolean
+create_new_hashtables(struct node_tables*t)
+{
+ t->namemap = g_hash_table_new(g_str_hash, g_str_equal);
+ if (t->namemap == NULL) {
+ return FALSE;
+ }
+ t->uuidmap = g_hash_table_new(cl_uuid_g_hash, cl_uuid_g_equal);
+ if (t->uuidmap == NULL) {
+ g_hash_table_destroy(t->namemap);
+ t->namemap = NULL;
+ return FALSE;
+ }
+ return TRUE;
+}
+
+static gboolean
+hashtable_destroy_rt_node_info(gpointer key, gpointer rti, gpointer unused)
+{
+ rt_node_info_del(rti);
+ rti = key = NULL;
+ return TRUE;
+}
+
+static void
+destroy_map_hashtable(GHashTable*t)
+{
+ g_hash_table_foreach_remove(t, hashtable_destroy_rt_node_info,NULL);
+ g_hash_table_destroy(t);
+ t = NULL;
+}
+
+struct tablehelp {
+ struct node_tables* t;
+ gboolean ret;
+};
+
+static void
+copy_hashtables_helper(gpointer key_unused, gpointer value
+, gpointer user_data)
+{
+ struct tablehelp * th = user_data;
+ struct rt_node_info* ni = value;
+ if (!add_node_to_hashtables(th->t, ni->nodename, ni->nodeid)) {
+ th->ret = FALSE;
+ }
+}
+
+static gboolean
+copy_hashtables(struct node_tables* tin, struct node_tables* tout)
+{
+ struct tablehelp newtables;
+ if (!create_new_hashtables(tout)){
+ return FALSE;
+ }
+ newtables.t = tout;
+ newtables.ret = TRUE;
+
+ g_hash_table_foreach(tout->namemap,copy_hashtables_helper,&newtables);
+ if (!newtables.ret) {
+ return FALSE;
+ }
+ g_hash_table_foreach(tout->uuidmap,copy_hashtables_helper,&newtables);
+ return newtables.ret;
+}
+
+static gboolean mbr_inityet = FALSE;
+static void
+init_global_membership(void)
+{
+ if (mbr_inityet) {
+ return;
+ }
+ mbr_inityet = TRUE;
+ memset(&nulluuid, 0, sizeof(nulluuid));
+}
+
+gboolean /* Call us when an expected replier joins / comes up */
+nodetrack_nodeup(nodetrack_t * mbr, const char * node, cl_uuid_t uuid)
+{
+ gboolean ret;
+ ret = add_node_to_hashtables(&mbr->nt, node, uuid);
+ if (ret && mbr->callback) {
+ mbr->callback(mbr, node, uuid, NODET_UP, mbr->user_data);
+ }
+ if (mbr->extra_callback) {
+ mbr->extra_callback(mbr, node, uuid, NODET_UP,mbr->ext_data);
+ }
+ return ret;
+}
+
+gboolean /* Call us when an expected replier goes down / away */
+nodetrack_nodedown(nodetrack_t* mbr, const char* node, cl_uuid_t uuid)
+{
+ if (mbr->callback) {
+ mbr->callback(mbr, node, uuid, NODET_DOWN, mbr->user_data);
+ }
+ if (mbr->extra_callback) {
+ mbr->extra_callback(mbr, node,uuid,NODET_DOWN,mbr->ext_data);
+ }
+ return del_node_from_hashtables(&mbr->nt, node, uuid);
+}
+
+/* This function calls the user's timeout callback */
+static gboolean
+replytrack_timeout_helper(gpointer rldata)
+{
+ replytrack_t* rl = rldata;
+ rl->expectingmore = FALSE;
+ rl->timerid = 0;
+ if (rl->callback) {
+ rl->callback(rl, rl->user_data, REPLYT_TIMEOUT);
+ }
+ return FALSE;
+}
+
+replytrack_t* /* replytrack_t constructor */
+replytrack_new(nodetrack_t * membership
+, replytrack_callback_t callback
+, unsigned long timeout_ms
+, gpointer user_data)
+{
+ replytrack_t* ret = MALLOCT(replytrack_t);
+ if (!ret) {
+ return ret;
+ }
+ if (!copy_hashtables(&membership->nt, &ret->tables)) {
+ free(ret);
+ ret = NULL;
+ return ret;
+ }
+ replytrack_t_count++;
+ ret->membership = membership;
+ ret->membership->refcount++;
+ ret->callback = callback;
+ ret->user_data = user_data;
+ ret->expectingmore = TRUE;
+ ret->timerid = 0;
+ if (timeout_ms != 0 && callback != NULL) {
+ ret->timerid = Gmain_timeout_add(timeout_ms
+ , replytrack_timeout_helper, ret);
+ }
+ return ret;
+}
+
+void /* replytrack_t destructor */
+replytrack_del(replytrack_t * rl)
+{
+ rl->membership->refcount--;
+ replytrack_t_count++;
+ if (rl->expectingmore && rl->timerid > 0) {
+ cl_log(LOG_INFO
+ , "%s: destroying replytrack while still expecting"
+ " %d replies"
+ , __FUNCTION__
+ , (rl->tables.namecount + rl->tables.uuidcount));
+ }
+ if (rl->timerid > 0) {
+ g_source_remove(rl->timerid);
+ rl->timerid = 0;
+ }
+ destroy_map_hashtable(rl->tables.namemap);
+ rl->tables.namemap=NULL;
+ destroy_map_hashtable(rl->tables.uuidmap);
+ rl->tables.uuidmap=NULL;
+ memset(&rl, 0, sizeof(rl));
+ free(rl);
+ rl=NULL;
+}
+
+gboolean /* Call replytrack_gotreply when you receive an expected reply */
+replytrack_gotreply(replytrack_t*rl, const char * node, cl_uuid_t uuid)
+{
+ gboolean lastone;
+ del_node_from_hashtables(&rl->tables, node, uuid);
+ lastone = (rl->tables.namecount + rl->tables.uuidcount) == 0;
+ if (lastone) {
+ rl->expectingmore = FALSE;
+ if (rl->timerid > 0) {
+ g_source_remove(rl->timerid);
+ rl->timerid = 0;
+ }
+ if (rl->callback){
+ rl->callback(rl, rl->user_data, REPLYT_ALLRCVD);
+ }
+ }
+ return lastone;
+}
+
+struct replytrack_iterator_data {
+ replytrack_t* rlist;
+ replytrack_iterator_t f;
+ int count;
+ gpointer user_data;
+};
+
+
+static void /* g_hash_table user-level iteration helper */
+replytrack_iterator_helper(gpointer key_unused, gpointer entry
+, gpointer user_data)
+{
+ struct replytrack_iterator_data* ri = user_data;
+ struct rt_node_info* ni = entry;
+ if (ri && ri->rlist) {
+ ++ri->count;
+ if (ri->f) {
+ ri->f(ri->rlist, ri->user_data
+ , ni->nodename, ni->nodeid);
+ }
+ }
+}
+
+
+
+int /* iterate through the outstanding expected replies */
+replytrack_outstanding_iterate(replytrack_t* rl
+, replytrack_iterator_t i, gpointer user_data)
+{
+ struct replytrack_iterator_data id;
+ id.rlist = rl;
+ id.f = i;
+ id.count = 0;
+ id.user_data = user_data;
+ g_hash_table_foreach(rl->tables.namemap, replytrack_iterator_helper
+ , &id);
+ g_hash_table_foreach(rl->tables.uuidmap, replytrack_iterator_helper
+ , &id);
+ if (id.count != (rl->tables.namecount + rl->tables.uuidcount)) {
+ cl_log(LOG_ERR
+ , "%s: iteration count %d disagrees with"
+ " (namecount %d+uuidcount %d)"
+ , __FUNCTION__, id.count
+ , rl->tables.namecount,rl->tables.uuidcount);
+ }
+ return id.count;
+}
+int /* return count of outstanding expected replies */
+replytrack_outstanding_count(replytrack_t* rl)
+{
+ return (rl->tables.namecount + rl->tables.uuidcount);
+}
+
+nodetrack_t*
+nodetrack_new(nodetrack_callback_t callback, gpointer user_data)
+{
+ nodetrack_t* ret = MALLOCT(nodetrack_t);
+ if (!mbr_inityet) {
+ init_global_membership();
+ }
+ if (!ret) {
+ return ret;
+ }
+ nodetrack_t_count++;
+ ret->refcount = 0;
+ if (!create_new_hashtables(&ret->nt)) {
+ free(ret);
+ ret = NULL;
+ }
+ ret->user_data = user_data;
+ ret->callback = callback;
+ ret->extra_callback = NULL;
+ ret->ext_data = NULL;
+ return ret;
+}
+void
+nodetrack_del(nodetrack_t * np)
+{
+ if (np->refcount) {
+ cl_log(LOG_ERR
+ , "%s: reply tracking reference count is %d"
+ , __FUNCTION__, np->refcount);
+ }
+ nodetrack_t_count--;
+ destroy_map_hashtable(np->nt.namemap);
+ np->nt.namemap=NULL;
+ destroy_map_hashtable(np->nt.uuidmap);
+ np->nt.uuidmap=NULL;
+ memset(np, 0, sizeof(*np));
+ free(np);
+}
+
+gboolean
+nodetrack_ismember(nodetrack_t* mbr, const char * name, cl_uuid_t u)
+{
+ if (cl_uuid_is_null(&u)) {
+ return(g_hash_table_lookup(mbr->nt.namemap, name) != NULL);
+ }
+ return (g_hash_table_lookup(mbr->nt.uuidmap, &u) != NULL);
+}
+
+struct nodetrack_iterator_data {
+ nodetrack_t* rlist;
+ nodetrack_iterator_t f;
+ int count;
+ gpointer user_data;
+};
+static void /* g_hash_table user-level iteration helper */
+nodetrack_iterator_helper(gpointer key_unused, gpointer entry
+, gpointer user_data)
+{
+ struct nodetrack_iterator_data* ri = user_data;
+ struct rt_node_info* ni = entry;
+ if (ri && ri->rlist) {
+ ++ri->count;
+ if (ri->f) {
+ ri->f(ri->rlist, ri->user_data
+ , ni->nodename, ni->nodeid);
+ }
+ }
+}
+
+int /* iterate through the outstanding expected replies */
+nodetrack_iterate(nodetrack_t* rl
+, nodetrack_iterator_t i, gpointer user_data)
+{
+ struct nodetrack_iterator_data id;
+ id.rlist = rl;
+ id.f = i;
+ id.count = 0;
+ id.user_data = user_data;
+ g_hash_table_foreach(rl->nt.namemap, nodetrack_iterator_helper
+ , &id);
+ g_hash_table_foreach(rl->nt.uuidmap, nodetrack_iterator_helper
+ , &id);
+ if (id.count != (rl->nt.namecount + rl->nt.uuidcount)) {
+ cl_log(LOG_ERR
+ , "%s: iteration count %d disagrees with"
+ " (namecount %d+uuidcount %d)"
+ , __FUNCTION__, id.count
+ , rl->nt.namecount,rl->nt.uuidcount);
+ }
+ return id.count;
+}
+static void
+intersection_callback
+( nodetrack_t * mbr
+, const char * node
+, cl_uuid_t u
+, nodetrack_change_t reason
+, gpointer user_data)
+{
+ nodetrack_intersection_t* it = user_data;
+ int j;
+ gboolean allfound = TRUE;
+
+ if (reason == NODET_DOWN) {
+ if (nodetrack_ismember(it->intersection, node, u)) {
+ nodetrack_nodedown(it->intersection,node,u);
+ }
+ return;
+ }
+ for (j=0; j < it->ntables && allfound; ++j) {
+ if (nodetrack_ismember(it->tables[j], node, u)) {
+ allfound = FALSE;
+ }
+ }
+ if (allfound) {
+ nodetrack_nodeup(it->intersection, node, u);
+ }
+}
+
+struct li_helper {
+ nodetrack_intersection_t* i;
+ gboolean result;
+};
+
+static void
+intersection_init_iterator(nodetrack_t* nt
+, gpointer ghelp
+, const char* node
+, cl_uuid_t uuid)
+{
+ struct li_helper* help = ghelp;
+ gboolean allfound = TRUE;
+ int j;
+
+ for (j=1; allfound && j < help->i->ntables; ++j) {
+ if (!nodetrack_ismember(help->i->tables[j]
+ , node, uuid)) {
+ allfound = FALSE;
+ }
+ }
+ if (allfound) {
+ nodetrack_nodeup(help->i->intersection, node, uuid);
+ }
+}
+
+nodetrack_intersection_t*
+nodetrack_intersection_new(nodetrack_t** tables, int ntables
+, nodetrack_callback_t callback, gpointer user_data)
+{
+ nodetrack_intersection_t* ret;
+ int j;
+ ret = MALLOCT(nodetrack_intersection_t);
+ if (!ret) {
+ return ret;
+ }
+ ret->intersection = nodetrack_new(callback, user_data);
+ if (!ret->intersection) {
+ free(ret);
+ ret = NULL;
+ return ret;
+ }
+ ret->tables = tables;
+ ret->ntables = ntables;
+ ret->callback = callback;
+ ret->user_data = user_data;
+ for (j=0; j < ntables; ++j) {
+ tables[j]->refcount ++;
+ tables[j]->ext_data = ret;
+ tables[j]->extra_callback = intersection_callback;
+ }
+ /* Initialize the intersection membership list */
+ nodetrack_iterate(tables[0], intersection_init_iterator, ret);
+ replytrack_intersection_t_count++;
+ return ret;
+}
+void
+nodetrack_intersection_del(nodetrack_intersection_t* p)
+{
+ int j;
+
+ for (j=0; j < p->ntables; ++j) {
+ p->tables[j]->refcount ++;
+ }
+ nodetrack_del(p->intersection);
+ p->intersection = NULL;
+ memset(p, 0, sizeof(*p));
+ free(p);
+ p = NULL;
+ replytrack_intersection_t_count--;
+}
+
+nodetrack_t*
+nodetrack_intersection_table(nodetrack_intersection_t*p)
+{
+ return p->intersection;
+}