diff options
Diffstat (limited to 'lib/clplumbing/cl_msg.c')
-rw-r--r-- | lib/clplumbing/cl_msg.c | 2537 |
1 files changed, 2537 insertions, 0 deletions
diff --git a/lib/clplumbing/cl_msg.c b/lib/clplumbing/cl_msg.c new file mode 100644 index 0000000..22f00e3 --- /dev/null +++ b/lib/clplumbing/cl_msg.c @@ -0,0 +1,2537 @@ +/* + * Heartbeat messaging object. + * + * Copyright (C) 2000 Alan Robertson <alanr@unix.sh> + * + * This software licensed under the GNU LGPL. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + */ + +#include <lha_internal.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <time.h> +#include <errno.h> +#include <sys/utsname.h> +#include <ha_msg.h> +#include <unistd.h> +#include <clplumbing/cl_log.h> +#include <clplumbing/ipc.h> +#include <clplumbing/base64.h> +#include <clplumbing/netstring.h> +#include <glib.h> +#include <clplumbing/cl_uuid.h> +#include <compress.h> +#include <clplumbing/timers.h> +#include <clplumbing/cl_signal.h> + +#define MAXMSGLINE 512 +#define MINFIELDS 30 +#define NEWLINE "\n" + + +#define NEEDAUTH 1 +#define NOAUTH 0 +#define MAX_INT_LEN 64 +#define MAX_NAME_LEN 255 +#define UUID_SLEN 64 +#define MAXCHILDMSGLEN 512 + +static int compression_threshold = (128*1024); + +static enum cl_msgfmt msgfmt = MSGFMT_NVPAIR; +static gboolean use_traditional_compression = FALSE; + +const char* +FT_strings[]={ + "0", + "1", + "2", + "3", + "4", + "5", + "6", + "7", + "8", + "9" +}; + +#undef DOAUDITS +#define DOAUDITS + +#undef DOPARANOIDAUDITS +/* #define DOPARANOIDAUDITS */ + +#ifdef DOAUDITS +void ha_msg_audit(const struct ha_msg* msg); +# define AUDITMSG(msg) ha_msg_audit(msg) +# ifdef DOPARANOIDAUDITS +# define PARANOIDAUDITMSG(msg) ha_msg_audit(msg) +# else +# define PARANOIDAUDITMSG(msg) /*nothing*/ +# endif +#else +# define AUDITMSG(msg) /*nothing*/ +# define PARANOIDAUDITMSG(msg) /*nothing*/ +#endif + + +static volatile hb_msg_stats_t* msgstats = NULL; + +gboolean cl_msg_quiet_fmterr = FALSE; + +extern int netstring_format; + +static struct ha_msg* wirefmt2msg_ll(const char* s, size_t length, int need_auth); + +struct ha_msg* string2msg_ll(const char * s, size_t length, int need_auth, int depth); + +extern int struct_stringlen(size_t namlen, size_t vallen, const void* value); +extern int struct_netstringlen(size_t namlen, size_t vallen, const void* value); +extern int process_netstring_nvpair(struct ha_msg* m, const char* nvpair, int nvlen); +static char* msg2wirefmt_ll(struct ha_msg*m, size_t* len, gboolean need_compress); +extern GHashTable* CompressFuncs; + + +void +cl_set_traditional_compression(gboolean value) +{ + use_traditional_compression = value; + if (use_traditional_compression && CompressFuncs) { + cl_log(LOG_WARNING + , "Traditional compression selected" + ". Realtime behavior will likely be impacted(!)"); + cl_log(LOG_INFO + , "See %s for more information." + , HAURL("Ha.cf#traditional_compression_-_controls_compression_mode")); + } +} + +void +cl_set_compression_threshold(size_t threadhold) +{ + compression_threshold = threadhold; + +} + +void +cl_msg_setstats(volatile hb_msg_stats_t* stats) +{ + msgstats = stats; +} + +static int msg_stats_fd = -1; + +static int +cl_msg_stats_open(const char* filename) +{ + if (filename == NULL){ + cl_log(LOG_ERR, "%s: filename is NULL", __FUNCTION__); + return -1; + } + + return open(filename, O_WRONLY|O_CREAT|O_APPEND, 0644); + +} + +static int +cl_msg_stats_close(void) +{ + if (msg_stats_fd > 0){ + close(msg_stats_fd); + } + + msg_stats_fd = -1; + + return HA_OK; +} + +#define STATSFILE "/var/log/ha_msg_stats" +int +cl_msg_stats_add(longclock_t time, int size) +{ + char buf[MAXLINE]; + int len; + + if (msg_stats_fd < 0){ + msg_stats_fd = cl_msg_stats_open(STATSFILE); + if (msg_stats_fd < 0){ + cl_log(LOG_ERR, "%s:opening file failed", + __FUNCTION__); + return HA_FAIL; + } + } + + + sprintf(buf, "%lld %d\n", (long long)time, size); + len = strnlen(buf, MAXLINE); + if (write(msg_stats_fd, buf, len) == len){ + cl_msg_stats_close(); + return HA_OK; + } + + cl_msg_stats_close(); + + return HA_FAIL;; + +} + + +/* Set default messaging format */ +void +cl_set_msg_format(enum cl_msgfmt mfmt) +{ + msgfmt = mfmt; +} + +void +cl_dump_msgstats(void) +{ + if (msgstats){ + cl_log(LOG_INFO, "dumping msg stats: " + "allocmsgs=%lu", + msgstats->allocmsgs); + } + return; +} +void +list_cleanup(GList* list) +{ + size_t i; + for (i = 0; i < g_list_length(list); i++){ + char* element = g_list_nth_data(list, i); + if (element == NULL){ + cl_log(LOG_WARNING, "list_cleanup:" + "element is NULL"); + continue; + } + free(element); + } + g_list_free(list); +} + + + +/* Create a new (empty) message */ +struct ha_msg * +ha_msg_new(int nfields) +{ + struct ha_msg * ret; + int nalloc; + + ret = MALLOCT(struct ha_msg); + if (ret) { + ret->nfields = 0; + + if (nfields > MINFIELDS) { + nalloc = nfields; + } else { + nalloc = MINFIELDS; + } + + ret->nalloc = nalloc; + ret->names = (char **)calloc(sizeof(char *), nalloc); + ret->nlens = (size_t *)calloc(sizeof(size_t), nalloc); + ret->values = (void **)calloc(sizeof(void *), nalloc); + ret->vlens = (size_t *)calloc(sizeof(size_t), nalloc); + ret->types = (int*)calloc(sizeof(int), nalloc); + + if (ret->names == NULL || ret->values == NULL + || ret->nlens == NULL || ret->vlens == NULL + || ret->types == NULL) { + + cl_log(LOG_ERR, "%s" + , "ha_msg_new: out of memory for ha_msg"); + /* It is safe to give this to ha_msg_del() */ + /* at this point. It's well-enough-formed */ + ha_msg_del(ret); /*violated property*/ + ret = NULL; + }else if (msgstats) { + msgstats->allocmsgs++; + msgstats->totalmsgs++; + msgstats->lastmsg = time_longclock(); + } + } + return(ret); +} + +/* Delete (destroy) a message */ +void +ha_msg_del(struct ha_msg *msg) +{ + if (msg) { + int j; + PARANOIDAUDITMSG(msg); + if (msgstats) { + msgstats->allocmsgs--; + } + if (msg->names) { + for (j=0; j < msg->nfields; ++j) { + if (msg->names[j]) { + free(msg->names[j]); + msg->names[j] = NULL; + } + } + free(msg->names); + msg->names = NULL; + } + if (msg->values) { + for (j=0; j < msg->nfields; ++j) { + + if (msg->values[j] == NULL){ + continue; + } + + if(msg->types[j] < DIMOF(fieldtypefuncs)){ + fieldtypefuncs[msg->types[j]].memfree(msg->values[j]); + } + } + free(msg->values); + msg->values = NULL; + } + if (msg->nlens) { + free(msg->nlens); + msg->nlens = NULL; + } + if (msg->vlens) { + free(msg->vlens); + msg->vlens = NULL; + } + if (msg->types){ + free(msg->types); + msg->types = NULL; + } + msg->nfields = -1; + msg->nalloc = -1; + free(msg); + } +} +struct ha_msg* +ha_msg_copy(const struct ha_msg *msg) +{ + struct ha_msg* ret; + int j; + + + PARANOIDAUDITMSG(msg); + if (msg == NULL || (ret = ha_msg_new(msg->nalloc)) == NULL) { + return NULL; + } + + ret->nfields = msg->nfields; + + memcpy(ret->nlens, msg->nlens, sizeof(msg->nlens[0])*msg->nfields); + memcpy(ret->vlens, msg->vlens, sizeof(msg->vlens[0])*msg->nfields); + memcpy(ret->types, msg->types, sizeof(msg->types[0])*msg->nfields); + + for (j=0; j < msg->nfields; ++j) { + + if ((ret->names[j] = malloc(msg->nlens[j]+1)) == NULL) { + goto freeandleave; + } + memcpy(ret->names[j], msg->names[j], msg->nlens[j]+1); + + + if(msg->types[j] < DIMOF(fieldtypefuncs)){ + ret->values[j] = fieldtypefuncs[msg->types[j]].dup(msg->values[j], + msg->vlens[j]); + if (!ret->values[j]){ + cl_log(LOG_ERR,"duplicating the message field failed"); + goto freeandleave; + } + } + } + return ret; + +freeandleave: + /* + * ha_msg_del nicely handles partially constructed ha_msgs + * so, there's not really a memory leak here at all, but BEAM + * thinks there is. + */ + ha_msg_del(ret);/* memory leak */ ret=NULL; + return ret; +} + +#ifdef DOAUDITS +void +ha_msg_audit(const struct ha_msg* msg) +{ + int doabort = FALSE; + int j; + + if (!msg) { + return; + } + if (!msg) { + cl_log(LOG_CRIT, "Message @ %p is not allocated" + , msg); + abort(); + } + if (msg->nfields < 0) { + cl_log(LOG_CRIT, "Message @ %p has negative fields (%d)" + , msg, msg->nfields); + doabort = TRUE; + } + if (msg->nalloc < 0) { + cl_log(LOG_CRIT, "Message @ %p has negative nalloc (%d)" + , msg, msg->nalloc); + doabort = TRUE; + } + + if (!msg->names) { + cl_log(LOG_CRIT + , "Message names @ %p is not allocated" + , msg->names); + doabort = TRUE; + } + if (!msg->values) { + cl_log(LOG_CRIT + , "Message values @ %p is not allocated" + , msg->values); + doabort = TRUE; + } + if (!msg->nlens) { + cl_log(LOG_CRIT + , "Message nlens @ %p is not allocated" + , msg->nlens); + doabort = TRUE; + } + if (!msg->vlens) { + cl_log(LOG_CRIT + , "Message vlens @ %p is not allocated" + , msg->vlens); + doabort = TRUE; + } + if (doabort) { + cl_log_message(LOG_INFO,msg); + abort(); + } + for (j=0; j < msg->nfields; ++j) { + + if (msg->nlens[j] == 0){ + cl_log(LOG_ERR, "zero namelen found in msg"); + abort(); + } + + if (msg->types[j] == FT_STRING){ + if (msg->vlens[j] != strlen(msg->values[j])){ + cl_log(LOG_ERR, "stringlen does not match"); + cl_log_message(LOG_INFO,msg); + abort(); + } + } + + if (!msg->names[j]) { + cl_log(LOG_CRIT, "Message name[%d] @ 0x%p" + " is not allocated." , + j, msg->names[j]); + abort(); + } + if (msg->types[j] != FT_LIST && !msg->values[j]) { + cl_log(LOG_CRIT, "Message value [%d] @ 0x%p" + " is not allocated.", j, msg->values[j]); + cl_log_message(LOG_INFO, msg); + abort(); + } + } +} +#endif + + + +int +ha_msg_expand(struct ha_msg* msg ) +{ + char ** names ; + size_t *nlens ; + void ** values ; + size_t* vlens ; + int * types ; + int nalloc; + + if(!msg){ + cl_log(LOG_ERR, "ha_msg_expand:" + "input msg is null"); + return HA_FAIL; + } + + names = msg->names; + nlens = msg->nlens; + values = msg->values; + vlens = msg->vlens; + types = msg->types; + + nalloc = msg->nalloc + MINFIELDS; + msg->names = (char **)calloc(sizeof(char *), nalloc); + msg->nlens = (size_t *)calloc(sizeof(size_t), nalloc); + msg->values = (void **)calloc(sizeof(void *), nalloc); + msg->vlens = (size_t *)calloc(sizeof(size_t), nalloc); + msg->types= (int*)calloc(sizeof(int), nalloc); + + if (msg->names == NULL || msg->values == NULL + || msg->nlens == NULL || msg->vlens == NULL + || msg->types == NULL) { + + cl_log(LOG_ERR, "%s" + , " out of memory for ha_msg"); + return(HA_FAIL); + } + + memcpy(msg->names, names, msg->nalloc*sizeof(char *)); + memcpy(msg->nlens, nlens, msg->nalloc*sizeof(size_t)); + memcpy(msg->values, values, msg->nalloc*sizeof(void *)); + memcpy(msg->vlens, vlens, msg->nalloc*sizeof(size_t)); + memcpy(msg->types, types, msg->nalloc*sizeof(int)); + + free(names); + free(nlens); + free(values); + free(vlens); + free(types); + + msg->nalloc = nalloc; + + return HA_OK; +} + +int +cl_msg_remove_value(struct ha_msg* msg, const void* value) +{ + int j; + + if (msg == NULL || value == NULL){ + cl_log(LOG_ERR, "cl_msg_remove: invalid argument"); + return HA_FAIL; + } + + for (j = 0; j < msg->nfields; ++j){ + if (value == msg->values[j]){ + break; + } + } + if (j == msg->nfields){ + cl_log(LOG_ERR, "cl_msg_remove: field %p not found", value); + return HA_FAIL; + } + return cl_msg_remove_offset(msg, j); + +} + + +int +cl_msg_remove(struct ha_msg* msg, const char* name) +{ + int j; + + if (msg == NULL || name == NULL){ + cl_log(LOG_ERR, "cl_msg_remove: invalid argument"); + return HA_FAIL; + } + + for (j = 0; j < msg->nfields; ++j){ + if (strcmp(name, msg->names[j]) == 0){ + break; + } + } + + if (j == msg->nfields){ + cl_log(LOG_ERR, "cl_msg_remove: field %s not found", name); + return HA_FAIL; + } + return cl_msg_remove_offset(msg, j); +} + +int +cl_msg_remove_offset(struct ha_msg* msg, int offset) +{ + int j = offset; + int i; + + if (j == msg->nfields){ + cl_log(LOG_ERR, "cl_msg_remove: field %d not found", j); + return HA_FAIL; + } + + free(msg->names[j]); + fieldtypefuncs[msg->types[j]].memfree(msg->values[j]); + + for (i= j + 1; i < msg->nfields ; i++){ + msg->names[i -1] = msg->names[i]; + msg->nlens[i -1] = msg->nlens[i]; + msg->values[i -1] = msg->values[i]; + msg->vlens[i-1] = msg->vlens[i]; + msg->types[i-1] = msg->types[i]; + } + msg->nfields--; + + + return HA_OK; +} + + + +/* low level implementation for ha_msg_add + the caller is responsible to allocate/free memories + for @name and @value. + +*/ + +static int +ha_msg_addraw_ll(struct ha_msg * msg, char * name, size_t namelen, + void * value, size_t vallen, int type, int depth) +{ + + size_t startlen = sizeof(MSG_START)-1; + + + int (*addfield) (struct ha_msg* msg, char* name, size_t namelen, + void* value, size_t vallen, int depth); + + if (!msg || msg->names == NULL || (msg->values == NULL) ) { + cl_log(LOG_ERR, "ha_msg_addraw_ll: cannot add field to ha_msg"); + return(HA_FAIL); + } + + if (msg->nfields >= msg->nalloc) { + if( ha_msg_expand(msg) != HA_OK){ + cl_log(LOG_ERR, "message expanding failed"); + return(HA_FAIL); + } + + } + + if (namelen >= startlen + && name[0] == '>' + && strncmp(name, MSG_START, startlen) == 0) { + if(!cl_msg_quiet_fmterr) { + cl_log(LOG_ERR, "ha_msg_addraw_ll: illegal field"); + } + return(HA_FAIL); + } + + if (name == NULL || (value == NULL) + || namelen <= 0 || vallen < 0) { + cl_log(LOG_ERR, "ha_msg_addraw_ll: " + "cannot add name/value to ha_msg"); + return(HA_FAIL); + } + + HA_MSG_ASSERT(type < DIMOF(fieldtypefuncs)); + + addfield = fieldtypefuncs[type].addfield; + if (!addfield || + addfield(msg, name, namelen, value, vallen,depth) != HA_OK){ + cl_log(LOG_ERR, "ha_msg_addraw_ll: addfield failed"); + return(HA_FAIL); + } + + PARANOIDAUDITMSG(msg); + + return(HA_OK); + + +} + +static int +ha_msg_addraw(struct ha_msg * msg, const char * name, size_t namelen, + const void * value, size_t vallen, int type, int depth) +{ + + char *cpvalue = NULL; + char *cpname = NULL; + int ret; + + + if (namelen == 0){ + cl_log(LOG_ERR, "%s: Adding a field with 0 name length", __FUNCTION__); + return HA_FAIL; + } + + if ((cpname = malloc(namelen+1)) == NULL) { + cl_log(LOG_ERR, "ha_msg_addraw: no memory for string (name)"); + return(HA_FAIL); + } + strncpy(cpname, name, namelen); + cpname[namelen] = EOS; + + HA_MSG_ASSERT(type < DIMOF(fieldtypefuncs)); + + if (fieldtypefuncs[type].dup){ + cpvalue = fieldtypefuncs[type].dup(value, vallen); + } + if (cpvalue == NULL){ + cl_log(LOG_ERR, "ha_msg_addraw: copying message failed"); + free(cpname); + return(HA_FAIL); + } + + ret = ha_msg_addraw_ll(msg, cpname, namelen, cpvalue, vallen + , type, depth); + + if (ret != HA_OK){ + cl_log(LOG_ERR, "ha_msg_addraw(): ha_msg_addraw_ll failed"); + free(cpname); + fieldtypefuncs[type].memfree(cpvalue); + } + + return(ret); + +} + +/*Add a null-terminated name and binary value to a message*/ +int +ha_msg_addbin(struct ha_msg * msg, const char * name, + const void * value, size_t vallen) +{ + + return(ha_msg_addraw(msg, name, strlen(name), + value, vallen, FT_BINARY, 0)); + +} + +int +ha_msg_adduuid(struct ha_msg* msg, const char *name, const cl_uuid_t* u) +{ + return(ha_msg_addraw(msg, name, strlen(name), + u, sizeof(cl_uuid_t), FT_BINARY, 0)); +} + +/*Add a null-terminated name and struct value to a message*/ +int +ha_msg_addstruct(struct ha_msg * msg, const char * name, const void * value) +{ + const struct ha_msg* childmsg = (const struct ha_msg*) value; + + if (get_netstringlen(childmsg) > MAXCHILDMSGLEN + || get_stringlen(childmsg) > MAXCHILDMSGLEN) { + /*cl_log(LOG_WARNING, + "%s: childmsg too big (name=%s, nslen=%d, len=%d)." + " Use ha_msg_addstruct_compress() instead.", + __FUNCTION__, name, get_netstringlen(childmsg), + get_stringlen(childmsg)); + */ + } + + return ha_msg_addraw(msg, name, strlen(name), value, + sizeof(struct ha_msg), FT_STRUCT, 0); +} + +int +ha_msg_addstruct_compress(struct ha_msg * msg, const char * name, const void * value) +{ + + if (use_traditional_compression){ + return ha_msg_addraw(msg, name, strlen(name), value, + sizeof(struct ha_msg), FT_STRUCT, 0); + }else{ + return ha_msg_addraw(msg, name, strlen(name), value, + sizeof(struct ha_msg), FT_UNCOMPRESS, 0); + } +} + +int +ha_msg_add_int(struct ha_msg * msg, const char * name, int value) +{ + char buf[MAX_INT_LEN]; + snprintf(buf, MAX_INT_LEN, "%d", value); + return (ha_msg_add(msg, name, buf)); +} + +int +ha_msg_mod_int(struct ha_msg * msg, const char * name, int value) +{ + char buf[MAX_INT_LEN]; + snprintf(buf, MAX_INT_LEN, "%d", value); + return (cl_msg_modstring(msg, name, buf)); +} + +int +ha_msg_value_int(const struct ha_msg * msg, const char * name, int* value) +{ + const char* svalue = ha_msg_value(msg, name); + if(NULL == svalue) { + return HA_FAIL; + } + *value = atoi(svalue); + return HA_OK; +} + +int +ha_msg_add_ul(struct ha_msg * msg, const char * name, unsigned long value) +{ + char buf[MAX_INT_LEN]; + snprintf(buf, MAX_INT_LEN, "%lu", value); + return (ha_msg_add(msg, name, buf)); +} + +int +ha_msg_mod_ul(struct ha_msg * msg, const char * name, unsigned long value) +{ + char buf[MAX_INT_LEN]; + snprintf(buf, MAX_INT_LEN, "%lu", value); + return (cl_msg_modstring(msg, name, buf)); +} + +int +ha_msg_value_ul(const struct ha_msg * msg, const char * name, unsigned long* value) +{ + const char* svalue = ha_msg_value(msg, name); + if(NULL == svalue) { + return HA_FAIL; + } + *value = strtoul(svalue, NULL, 10); + return HA_OK; +} + +/* + * ha_msg_value_str_list()/ha_msg_add_str_list(): + * transform a string list suitable for putting into an ha_msg is by a convention + * of naming the fields into the following format: + * listname1=foo + * listname2=bar + * listname3=stuff + * etc. + */ + +GList* +ha_msg_value_str_list(struct ha_msg * msg, const char * name) +{ + + int i = 1; + int len = 0; + const char* value; + char* element; + GList* list = NULL; + + + if( NULL==msg||NULL==name||strnlen(name, MAX_NAME_LEN)>=MAX_NAME_LEN ){ + return NULL; + } + len = cl_msg_list_length(msg,name); + for(i=0; i<len; i++) { + value = cl_msg_list_nth_data(msg,name,i); + if (NULL == value) { + break; + } + element = g_strdup(value); + list = g_list_append(list, element); + } + return list; +} + + + +static void +pair_to_msg(gpointer key, gpointer value, gpointer user_data) +{ + struct ha_msg* msg = (struct ha_msg*)user_data; + if( HA_OK != ha_msg_add(msg, key, value)) { + cl_log(LOG_ERR, "ha_msg_add in pair_to_msg failed"); + } +} + + +static struct ha_msg* +str_table_to_msg(GHashTable* hash_table) +{ + struct ha_msg* hash_msg; + + if ( NULL == hash_table) { + return NULL; + } + + hash_msg = ha_msg_new(5); + g_hash_table_foreach(hash_table, pair_to_msg, hash_msg); + return hash_msg; +} + + +static GHashTable* +msg_to_str_table(struct ha_msg * msg) +{ + int i; + GHashTable* hash_table; + + if ( NULL == msg) { + return NULL; + } + + hash_table = g_hash_table_new(g_str_hash, g_str_equal); + + for (i = 0; i < msg->nfields; i++) { + if( FT_STRING != msg->types[i] ) { + continue; + } + g_hash_table_insert(hash_table, + g_strndup(msg->names[i],msg->nlens[i]), + g_strndup(msg->values[i],msg->vlens[i])); + } + return hash_table; +} + +GHashTable* +ha_msg_value_str_table(struct ha_msg * msg, const char * name) +{ + struct ha_msg* hash_msg; + GHashTable * hash_table = NULL; + + if (NULL == msg || NULL == name) { + return NULL; + } + + hash_msg = cl_get_struct(msg, name); + if (NULL == hash_msg) { + return NULL; + } + hash_table = msg_to_str_table(hash_msg); + return hash_table; +} + +int +ha_msg_add_str_table(struct ha_msg * msg, const char * name, + GHashTable* hash_table) +{ + struct ha_msg* hash_msg; + if (NULL == msg || NULL == name || NULL == hash_table) { + return HA_FAIL; + } + + hash_msg = str_table_to_msg(hash_table); + if( HA_OK != ha_msg_addstruct(msg, name, hash_msg)) { + ha_msg_del(hash_msg); + cl_log(LOG_ERR + , "ha_msg_addstruct in ha_msg_add_str_table failed"); + return HA_FAIL; + } + ha_msg_del(hash_msg); + return HA_OK; +} + +int +ha_msg_mod_str_table(struct ha_msg * msg, const char * name, + GHashTable* hash_table) +{ + struct ha_msg* hash_msg; + if (NULL == msg || NULL == name || NULL == hash_table) { + return HA_FAIL; + } + + hash_msg = str_table_to_msg(hash_table); + if( HA_OK != cl_msg_modstruct(msg, name, hash_msg)) { + ha_msg_del(hash_msg); + cl_log(LOG_ERR + , "ha_msg_modstruct in ha_msg_mod_str_table failed"); + return HA_FAIL; + } + ha_msg_del(hash_msg); + return HA_OK; +} + +int +cl_msg_list_add_string(struct ha_msg* msg, const char* name, const char* value) +{ + GList* list = NULL; + int ret; + + if(!msg || !name || !value){ + cl_log(LOG_ERR, "cl_msg_list_add_string: input invalid"); + return HA_FAIL; + } + + + list = g_list_append(list, UNCONST_CAST_POINTER(gpointer, value)); + if (!list){ + cl_log(LOG_ERR, "cl_msg_list_add_string: append element to" + "a glist failed"); + return HA_FAIL; + } + + ret = ha_msg_addraw(msg, name, strlen(name), list, + string_list_pack_length(list), + FT_LIST, 0); + + g_list_free(list); + + return ret; + +} + +/* Add a null-terminated name and value to a message */ +int +ha_msg_add(struct ha_msg * msg, const char * name, const char * value) +{ + if(name == NULL || value == NULL) { + return HA_FAIL; + } + return(ha_msg_nadd(msg, name, strlen(name), value, strlen(value))); +} + +/* Add a name/value pair to a message (with sizes for name and value) */ +int +ha_msg_nadd(struct ha_msg * msg, const char * name, int namelen + , const char * value, int vallen) +{ + return(ha_msg_addraw(msg, name, namelen, value, vallen, FT_STRING, 0)); + +} + +/* Add a name/value/type to a message (with sizes for name and value) */ +int +ha_msg_nadd_type(struct ha_msg * msg, const char * name, int namelen + , const char * value, int vallen, int type) +{ + return(ha_msg_addraw(msg, name, namelen, value, vallen, type, 0)); + +} + + + +/* Add a "name=value" line to the name, value pairs in a message */ +static int +ha_msg_add_nv_depth(struct ha_msg* msg, const char * nvline, + const char * bufmax, int depth) +{ + int namelen; + const char * valp; + int vallen; + + if (!nvline) { + cl_log(LOG_ERR, "ha_msg_add_nv: NULL nvline"); + return(HA_FAIL); + } + /* How many characters before the '='? */ + if ((namelen = strcspn(nvline, EQUAL)) <= 0 + || nvline[namelen] != '=') { + if (!cl_msg_quiet_fmterr) { + cl_log(LOG_WARNING + , "ha_msg_add_nv_depth: line doesn't contain '='"); + cl_log(LOG_INFO, "%s", nvline); + } + return(HA_FAIL); + } + valp = nvline + namelen +1; /* Point just *past* the '=' */ + if (valp >= bufmax){ + return HA_FAIL; + } + vallen = strcspn(valp, NEWLINE); + if ((valp + vallen) >= bufmax){ + return HA_FAIL; + } + + if (vallen == 0){ + valp = NULL; + } + /* Call ha_msg_nadd to actually add the name/value pair */ + return(ha_msg_addraw(msg, nvline, namelen, valp, vallen + , FT_STRING, depth)); + +} + +int +ha_msg_add_nv(struct ha_msg* msg, const char * nvline, + const char * bufmax) +{ + + return(ha_msg_add_nv_depth(msg, nvline, bufmax, 0)); + +} + + +static void * +cl_get_value(const struct ha_msg * msg, const char * name, + size_t * vallen, int *type) +{ + + int j; + if (!msg || !msg->names || !msg->values) { + cl_log(LOG_ERR, "%s: wrong argument (%s)", + __FUNCTION__, name); + return(NULL); + } + + PARANOIDAUDITMSG(msg); + for (j=0; j < msg->nfields; ++j) { + const char *local_name = msg->names[j]; + if (name[0] == local_name[0] + && strcmp(name, local_name) == 0) { + if (vallen){ + *vallen = msg->vlens[j]; + } + if (type){ + *type = msg->types[j]; + } + return(msg->values[j]); + } + } + return(NULL); +} + +static void * +cl_get_value_mutate(struct ha_msg * msg, const char * name, + size_t * vallen, int *type) +{ + + int j; + if (!msg || !msg->names || !msg->values) { + cl_log(LOG_ERR, "%s: wrong argument", + __FUNCTION__); + return(NULL); + } + + AUDITMSG(msg); + for (j=0; j < msg->nfields; ++j) { + if (strcmp(name, msg->names[j]) == 0) { + int tp = msg->types[j]; + if (fieldtypefuncs[tp].pregetaction){ + fieldtypefuncs[tp].pregetaction(msg, j); + } + + if (vallen){ + *vallen = msg->vlens[j]; + } + if (type){ + *type = msg->types[j]; + } + return(msg->values[j]); + } + } + return(NULL); +} + + +const void * +cl_get_binary(const struct ha_msg *msg, + const char * name, size_t * vallen) +{ + + const void *ret; + int type; + + ret = cl_get_value( msg, name, vallen, &type); + + if (ret == NULL){ + /* + cl_log(LOG_WARNING, "field %s not found", name); + cl_log_message(msg); + */ + return(NULL); + } + if ( type != FT_BINARY){ + cl_log(LOG_WARNING, "field %s is not binary", name); + cl_log_message(LOG_WARNING, msg); + return(NULL); + } + + return(ret); +} + +/* UUIDs are stored with a machine-independent byte ordering (even though it's binary) */ +int +cl_get_uuid(const struct ha_msg *msg, const char * name, cl_uuid_t* retval) +{ + const void * vret; + size_t vretsize; + + cl_uuid_clear(retval); + + if ((vret = cl_get_binary(msg, name, &vretsize)/*discouraged function*/) == NULL) { + /* But perfectly portable in this case */ + return HA_FAIL; + } + if (vretsize != sizeof(cl_uuid_t)) { + cl_log(LOG_WARNING, "Binary field %s is not a uuid.", name); + cl_log(LOG_INFO, "expecting %d bytes, got %d bytes", + (int)sizeof(cl_uuid_t), (int)vretsize); + cl_log_message(LOG_INFO, msg); + return HA_FAIL; + } + memcpy(retval, vret, sizeof(cl_uuid_t)); + return HA_OK; +} + +const char * +cl_get_string(const struct ha_msg *msg, const char *name) +{ + + const void *ret; + int type; + ret = cl_get_value( msg, name, NULL, &type); + + if (ret == NULL || type != FT_STRING){ + return(NULL); + } + + return(ret); + +} + +int +cl_get_type(const struct ha_msg *msg, const char *name) +{ + + const void *ret; + int type; + + ret = cl_get_value( msg, name, NULL, &type); + + if (ret == NULL) { + return -1; + } + if (type < 0){ + cl_log(LOG_WARNING, "field %s not a valid type" + , name); + return(-1); + } + + return(type); + +} + +/* +struct ha_msg * +cl_get_struct(const struct ha_msg *msg, const char* name) +{ + struct ha_msg* ret; + int type; + size_t vallen; + + ret = cl_get_value(msg, name, &vallen, &type); + + if (ret == NULL ){ + return(NULL); + } + + switch(type){ + + case FT_STRUCT: + break; + + default: + cl_log(LOG_ERR, "%s: field %s is not a struct (%d)", + __FUNCTION__, name, type); + return NULL; + } + + return ret; +} +*/ + + +struct ha_msg * +cl_get_struct(struct ha_msg *msg, const char* name) +{ + struct ha_msg* ret; + int type = -1; + size_t vallen; + + ret = cl_get_value_mutate(msg, name, &vallen, &type); + + if (ret == NULL ){ + return(NULL); + } + + switch(type){ + + case FT_UNCOMPRESS: + case FT_STRUCT: + break; + + default: + cl_log(LOG_ERR, "%s: field %s is not a struct (%d)", + __FUNCTION__, name, type); + return NULL; + } + + return ret; +} + + +int +cl_msg_list_length(struct ha_msg* msg, const char* name) +{ + GList* ret; + int type; + + ret = cl_get_value( msg, name, NULL, &type); + + if ( ret == NULL || type != FT_LIST){ + return -1; + } + + return g_list_length(ret); + +} + + +void* +cl_msg_list_nth_data(struct ha_msg* msg, const char* name, int n) +{ + GList* ret; + int type; + + ret = cl_get_value( msg, name, NULL, &type); + + if ( ret == NULL || type != FT_LIST){ + cl_log(LOG_WARNING, "field %s not found " + " or type mismatch", name); + return NULL; + } + + return g_list_nth_data(ret, n); + +} + +int +cl_msg_add_list(struct ha_msg* msg, const char* name, GList* list) +{ + int ret; + + if(msg == NULL|| name ==NULL || list == NULL){ + cl_log(LOG_ERR, "cl_msg_add_list:" + "invalid arguments"); + return HA_FAIL; + } + + ret = ha_msg_addraw(msg, name, strlen(name), list, + string_list_pack_length(list), + FT_LIST, 0); + + return ret; +} + +GList* +cl_msg_get_list(struct ha_msg* msg, const char* name) +{ + GList* ret; + int type; + + ret = cl_get_value( msg, name, NULL, &type); + + if ( ret == NULL || type != FT_LIST){ + cl_log(LOG_WARNING, "field %s not found " + " or type mismatch", name); + return NULL; + } + + return ret; +} + + +int +cl_msg_add_list_str(struct ha_msg* msg, const char* name, + char** buf, size_t n) +{ + GList* list = NULL; + int i; + int ret = HA_FAIL; + + if (n <= 0 || buf == NULL|| name ==NULL ||msg == NULL){ + cl_log(LOG_ERR, "%s:" + "invalid parameter(%s)", + !n <= 0?"n is negative or zero": + !buf?"buf is NULL": + !name?"name is NULL": + "msg is NULL",__FUNCTION__); + return HA_FAIL; + } + + for ( i = 0; i < n; i++){ + if (buf[i] == NULL){ + cl_log(LOG_ERR, "%s: %dth element in buf is null", + __FUNCTION__, i); + goto free_and_out; + } + list = g_list_append(list, buf[i]); + if (list == NULL){ + cl_log(LOG_ERR, "%s:adding integer to list failed", + __FUNCTION__); + goto free_and_out; + } + } + + ret = ha_msg_addraw(msg, name, strlen(name), list, + string_list_pack_length(list), + FT_LIST, 0); + + free_and_out: + if (list){ + g_list_free(list); + list = NULL; + } + return ret; +} + +static void +list_element_free(gpointer data, gpointer userdata) +{ + if (data){ + g_free(data); + } +} + +int +cl_msg_add_list_int(struct ha_msg* msg, const char* name, + int* buf, size_t n) +{ + + GList* list = NULL; + size_t i; + int ret = HA_FAIL; + + if (n <= 0 || buf == NULL|| name ==NULL ||msg == NULL){ + cl_log(LOG_ERR, "cl_msg_add_list_int:" + "invalid parameter(%s)", + !n <= 0?"n is negative or zero": + !buf?"buf is NULL": + !name?"name is NULL": + "msg is NULL"); + goto free_and_out; + } + + for ( i = 0; i < n; i++){ + char intstr[MAX_INT_LEN]; + sprintf(intstr,"%d", buf[i]); + list = g_list_append(list, g_strdup(intstr)); + if (list == NULL){ + cl_log(LOG_ERR, "cl_msg_add_list_int:" + "adding integer to list failed"); + goto free_and_out; + } + } + + ret = ha_msg_addraw(msg, name, strlen(name), list, + string_list_pack_length(list), + FT_LIST, 0); + free_and_out: + if (list){ + g_list_foreach(list,list_element_free , NULL); + g_list_free(list); + list = NULL; + } + + return ret; +} +int +cl_msg_get_list_int(struct ha_msg* msg, const char* name, + int* buf, size_t* n) +{ + GList* list; + size_t len; + int i; + GList* list_element; + + + if (n == NULL || buf == NULL|| name ==NULL ||msg == NULL){ + cl_log(LOG_ERR, "cl_msg_get_list_int:" + "invalid parameter(%s)", + !n?"n is NULL": + !buf?"buf is NULL": + !name?"name is NULL": + "msg is NULL"); + return HA_FAIL; + } + + list = cl_msg_get_list(msg, name); + if (list == NULL){ + cl_log(LOG_ERR, "cl_msg_get_list_int:" + "list of integers %s not found", name); + return HA_FAIL; + } + + len = g_list_length(list); + if (len > *n){ + cl_log(LOG_ERR, "cl_msg_get_list_int:" + "buffer too small: *n=%ld, required len=%ld", + (long)*n, (long)len); + *n = len; + return HA_FAIL; + } + + *n = len; + i = 0; + list_element = g_list_first(list); + while( list_element != NULL){ + char* intstr = list_element->data; + if (intstr == NULL){ + cl_log(LOG_ERR, "cl_msg_get_list_int:" + "element data is NULL"); + return HA_FAIL; + } + + if (sscanf(intstr,"%d", &buf[i]) != 1){ + cl_log(LOG_ERR, "cl_msg_get_list_int:" + "element data is NULL"); + return HA_FAIL; + } + + i++; + list_element = g_list_next(list_element); + } + + return HA_OK; +} + +int +cl_msg_replace_value(struct ha_msg* msg, const void *old_value, + const void* value, size_t vlen, int type) +{ + int j; + + if (msg == NULL || old_value == NULL) { + cl_log(LOG_ERR, "cl_msg_replace: invalid argument"); + return HA_FAIL; + } + + for (j = 0; j < msg->nfields; ++j){ + if (old_value == msg->values[j]){ + break; + } + } + if (j == msg->nfields){ + cl_log(LOG_ERR, "cl_msg_replace: field %p not found", old_value); + return HA_FAIL; + } + return cl_msg_replace(msg, j, value, vlen, type); +} + +/*this function is for internal use only*/ +int +cl_msg_replace(struct ha_msg* msg, int index, + const void* value, size_t vlen, int type) +{ + void * newv ; + int newlen = vlen; + int oldtype; + + PARANOIDAUDITMSG(msg); + if (msg == NULL || value == NULL) { + cl_log(LOG_ERR, "%s: NULL input.", __FUNCTION__); + return HA_FAIL; + } + + if(type >= DIMOF(fieldtypefuncs)){ + cl_log(LOG_ERR, "%s:" + "invalid type(%d)",__FUNCTION__, type); + return HA_FAIL; + } + + if (index >= msg->nfields){ + cl_log(LOG_ERR, "%s: index(%d) out of range(%d)", + __FUNCTION__,index, msg->nfields); + return HA_FAIL; + } + + oldtype = msg->types[index]; + + newv = fieldtypefuncs[type].dup(value,vlen); + if (!newv){ + cl_log(LOG_ERR, "%s: duplicating message fields failed" + "value=%p, vlen=%d, msg->names[i]=%s", + __FUNCTION__,value, (int)vlen, msg->names[index]); + return HA_FAIL; + } + + fieldtypefuncs[oldtype].memfree(msg->values[index]); + + msg->values[index] = newv; + msg->vlens[index] = newlen; + msg->types[index] = type; + PARANOIDAUDITMSG(msg); + return(HA_OK); + +} + + +static int +cl_msg_mod(struct ha_msg * msg, const char * name, + const void* value, size_t vlen, int type) +{ + int j; + int rc; + + PARANOIDAUDITMSG(msg); + if (msg == NULL || name == NULL || value == NULL) { + cl_log(LOG_ERR, "cl_msg_mod: NULL input."); + return HA_FAIL; + } + + if(type >= DIMOF(fieldtypefuncs)){ + cl_log(LOG_ERR, "cl_msg_mod:" + "invalid type(%d)", type); + return HA_FAIL; + } + + for (j=0; j < msg->nfields; ++j) { + if (strcmp(name, msg->names[j]) == 0) { + + char * newv ; + int newlen = vlen; + + if (type != msg->types[j]){ + cl_log(LOG_ERR, "%s: type mismatch(%d %d)", + __FUNCTION__, type, msg->types[j]); + return HA_FAIL; + } + + newv = fieldtypefuncs[type].dup(value,vlen); + if (!newv){ + cl_log(LOG_ERR, "duplicating message fields failed" + "value=%p, vlen=%d, msg->names[j]=%s", + value, (int)vlen, msg->names[j]); + return HA_FAIL; + } + + fieldtypefuncs[type].memfree(msg->values[j]); + msg->values[j] = newv; + msg->vlens[j] = newlen; + PARANOIDAUDITMSG(msg); + return(HA_OK); + } + } + + rc = ha_msg_nadd_type(msg, name,strlen(name), value, vlen, type); + + PARANOIDAUDITMSG(msg); + return rc; +} + +int +cl_msg_modstruct(struct ha_msg * msg, const char* name, + const struct ha_msg* value) +{ + return cl_msg_mod(msg, name, value, 0, FT_STRUCT); +} + +int +cl_msg_modbin(struct ha_msg * msg, const char* name, + const void* value, size_t vlen) +{ + return cl_msg_mod(msg, name, value, vlen, FT_BINARY); + +} +int +cl_msg_moduuid(struct ha_msg * msg, const char* name, + const cl_uuid_t* uuid) +{ + return cl_msg_mod(msg, name, uuid, sizeof(cl_uuid_t), FT_BINARY); +} + + + +/* Modify the value associated with a particular name */ +int +cl_msg_modstring(struct ha_msg * msg, const char * name, const char * value) +{ + return cl_msg_mod(msg, name, value, strlen(value), FT_STRING); +} + + + +/* Return the next message found in the stream */ +struct ha_msg * +msgfromstream(FILE * f) +{ + char buf[MAXMSGLINE]; + char * getsret; + clearerr(f); + /* Skip until we find a MSG_START (hopefully we skip nothing) */ + while(1) { + getsret = fgets(buf, sizeof(buf), f); + if (!getsret) { + break; + } + if (strcmp(buf, MSG_START) == 0) { + return msgfromstream_string(f); + + } + if (strcmp(buf, MSG_START_NETSTRING) == 0){ + return msgfromstream_netstring(f); + } + + } + + return NULL; +} + +/* Return the next message found in the stream with string format */ +struct ha_msg * +msgfromstream_string(FILE * f) +{ + char buf[MAXMSGLINE]; + const char * bufmax = buf + sizeof(buf); + struct ha_msg* ret; + char * getsret; + + + if ((ret = ha_msg_new(0)) == NULL) { + /* Getting an error with EINTR is pretty normal */ + /* (so is EOF) */ + if ( (!ferror(f) || (errno != EINTR && errno != EAGAIN)) + && !feof(f)) { + cl_log(LOG_ERR, "msgfromstream: cannot get message"); + } + return(NULL); + } + + /* Add Name=value pairs until we reach MSG_END or EOF */ + while(1) { + getsret = fgets(buf, MAXMSGLINE, f); + if (!getsret) { + break; + } + + if (strnlen(buf, MAXMSGLINE) > MAXMSGLINE - 2) { + cl_log(LOG_DEBUG + , "msgfromstream: field too long [%s]" + , buf); + } + + if (!strcmp(buf, MSG_END)) { + break; + } + + + /* Add the "name=value" string on this line to the message */ + if (ha_msg_add_nv(ret, buf, bufmax) != HA_OK) { + cl_log(LOG_ERR, "NV failure (msgfromsteam): [%s]" + , buf); + ha_msg_del(ret); ret=NULL; + return(NULL); + } + } + return(ret); +} + + +/* Return the next message found in the stream with netstring format*/ + +struct ha_msg * +msgfromstream_netstring(FILE * f) +{ + struct ha_msg * ret; + + if ((ret = ha_msg_new(0)) == NULL) { + /* Getting an error with EINTR is pretty normal */ + /* (so is EOF) */ + if ( (!ferror(f) || (errno != EINTR && errno != EAGAIN)) + && !feof(f)) { + cl_log(LOG_ERR + , "msgfromstream_netstring(): cannot get message"); + } + return(NULL); + } + + while(1) { + char* nvpair; + int nvlen; + int n; + + if (fscanf(f, "%d:", &nvlen) <= 0 || nvlen <= 0){ + return(ret); + } + + nvpair = malloc(nvlen + 2); + + if ((n =fread(nvpair, 1, nvlen + 1, f)) != nvlen + 1){ + cl_log(LOG_WARNING, "msgfromstream_netstring()" + ": Can't get enough nvpair," + "expecting %d bytes long, got %d bytes", + nvlen + 1, n); + ha_msg_del(ret); + return(NULL); + } + + process_netstring_nvpair(ret, nvpair, nvlen); + + } + +} + +static gboolean ipc_timer_expired = FALSE; + +static void cl_sigalarm_handler(int signum) +{ + if (signum == SIGALRM) { + ipc_timer_expired = TRUE; + } +} + +int +cl_ipc_wait_timeout( + IPC_Channel *chan, int (*waitfun)(IPC_Channel *chan), unsigned int timeout) +{ + int rc = IPC_FAIL; + struct sigaction old_action; + + memset(&old_action, 0, sizeof(old_action)); + cl_signal_set_simple_handler(SIGALRM, cl_sigalarm_handler, &old_action); + + ipc_timer_expired = FALSE; + + alarm(timeout); + rc = waitfun(chan); + if (rc == IPC_INTR && ipc_timer_expired) { + rc = IPC_TIMEOUT; + } + + alarm(0); /* ensure it expires */ + cl_signal_set_simple_handler(SIGALRM, old_action.sa_handler, &old_action); + + + return rc; +} + +/* Return the next message found in the IPC channel */ +static struct ha_msg* +msgfromIPC_ll(IPC_Channel * ch, int flag, unsigned int timeout, int *rc_out) +{ + int rc; + IPC_Message* ipcmsg; + struct ha_msg* hmsg; + int need_auth = flag & MSG_NEEDAUTH; + int allow_intr = flag & MSG_ALLOWINTR; + + startwait: + if(timeout > 0) { + rc = cl_ipc_wait_timeout(ch, ch->ops->waitin, timeout); + } else { + rc = ch->ops->waitin(ch); + } + + if(rc_out) { + *rc_out = rc; + } + + switch(rc) { + default: + case IPC_FAIL: + cl_perror("msgfromIPC: waitin failure"); + return NULL; + + case IPC_TIMEOUT: + return NULL; + + case IPC_BROKEN: + sleep(1); + return NULL; + + case IPC_INTR: + if ( allow_intr){ + goto startwait; + }else{ + return NULL; + } + + case IPC_OK: + break; + } + + + ipcmsg = NULL; + rc = ch->ops->recv(ch, &ipcmsg); +#if 0 + if (DEBUGPKTCONT) { + cl_log(LOG_DEBUG, "msgfromIPC: recv returns %d ipcmsg = 0x%lx" + , rc, (unsigned long)ipcmsg); + } +#endif + if(rc_out) { + *rc_out = rc; + } + + if (rc != IPC_OK) { + return NULL; + } + + hmsg = wirefmt2msg_ll((char *)ipcmsg->msg_body, ipcmsg->msg_len, need_auth); + if (ipcmsg->msg_done) { + ipcmsg->msg_done(ipcmsg); + } + + AUDITMSG(hmsg); + return hmsg; +} + +/* Return the next message found in the IPC channel */ +struct ha_msg* +msgfromIPC_timeout(IPC_Channel *ch, int flag, unsigned int timeout, int *rc_out) +{ + return msgfromIPC_ll(ch, flag, timeout, rc_out); +} + +struct ha_msg* +msgfromIPC(IPC_Channel * ch, int flag) +{ + return msgfromIPC_ll(ch, flag, 0, NULL); +} + + +struct ha_msg* +msgfromIPC_noauth(IPC_Channel * ch) +{ + int flag = 0; + + flag |= MSG_ALLOWINTR; + return msgfromIPC_ll(ch, flag, 0, NULL); +} + +/* Return the next message found in the IPC channel */ +IPC_Message * +ipcmsgfromIPC(IPC_Channel * ch) +{ + int rc; + IPC_Message* ipcmsg; + + rc = ch->ops->waitin(ch); + + switch(rc) { + default: + case IPC_FAIL: + cl_perror("msgfromIPC: waitin failure"); + return NULL; + + case IPC_BROKEN: + sleep(1); + return NULL; + + case IPC_INTR: + return NULL; + + case IPC_OK: + break; + } + + + ipcmsg = NULL; + rc = ch->ops->recv(ch, &ipcmsg); +#if 0 + if (DEBUGPKTCONT) { + cl_log(LOG_DEBUG, "msgfromIPC: recv returns %d ipcmsg = 0x%lx" + , rc, (unsigned long)ipcmsg); + } +#endif + if (rc != IPC_OK) { + return NULL; + } + + + return(ipcmsg); +} + + +/* Writes a message into a stream - used for serial lines */ +int +msg2stream(struct ha_msg* m, FILE * f) +{ + size_t len; + char * s = msg2wirefmt(m, &len); + + if (s != NULL) { + int rc = HA_OK; + if (fputs(s, f) == EOF) { + rc = HA_FAIL; + cl_perror("msg2stream: fputs failure"); + } + if (fflush(f) == EOF) { + cl_perror("msg2stream: fflush failure"); + rc = HA_FAIL; + } + free(s); + return(rc); + }else{ + return(HA_FAIL); + } +} +static void ipcmsg_done(IPC_Message* m); + +static int clmsg_ipcmsg_allocated = 0; +static int clmsg_ipcmsg_freed = 0; + +void dump_clmsg_ipcmsg_stats(void); +void +dump_clmsg_ipcmsg_stats(void) +{ + cl_log(LOG_INFO, "clmsg ipcmsg allocated=%d, freed=%d, diff=%d", + clmsg_ipcmsg_allocated, + clmsg_ipcmsg_freed, + clmsg_ipcmsg_allocated - clmsg_ipcmsg_freed); + + return; +} + +static void +ipcmsg_done(IPC_Message* m) +{ + if (!m) { + return; + } + if (m->msg_buf) { + free(m->msg_buf); + } + free(m); + m = NULL; + clmsg_ipcmsg_freed ++; +} + + + +/* + * create an ipcmsg and copy the data + */ + +IPC_Message* +wirefmt2ipcmsg(void* p, size_t len, IPC_Channel* ch) +{ + IPC_Message* ret = NULL; + + if (p == NULL){ + return(NULL); + } + + ret = MALLOCT(IPC_Message); + if (!ret) { + return(NULL); + } + + memset(ret, 0, sizeof(IPC_Message)); + + if (NULL == (ret->msg_buf = malloc(len + ch->msgpad))) { + free(ret); + return NULL; + } + ret->msg_body = (char*)ret->msg_buf + ch->msgpad; + memcpy(ret->msg_body, p, len); + + ret->msg_done = ipcmsg_done; + ret->msg_private = NULL; + ret->msg_ch = ch; + ret->msg_len = len; + + clmsg_ipcmsg_allocated ++; + + return ret; + +} + +IPC_Message* +hamsg2ipcmsg(struct ha_msg* m, IPC_Channel* ch) +{ + size_t len; + char * s = msg2wirefmt_ll(m, &len, MSG_NEEDCOMPRESS); + IPC_Message* ret = NULL; + + if (s == NULL) { + return ret; + } + ret = MALLOCT(IPC_Message); + if (!ret) { + free(s); + return ret; + } + + memset(ret, 0, sizeof(IPC_Message)); + + if (NULL == (ret->msg_buf = malloc(len + ch->msgpad))) { + free(s); + free(ret); + return NULL; + } + ret->msg_body = (char*)ret->msg_buf + ch->msgpad; + memcpy(ret->msg_body, s, len); + free(s); + + ret->msg_done = ipcmsg_done; + ret->msg_private = NULL; + ret->msg_ch = ch; + ret->msg_len = len; + + clmsg_ipcmsg_allocated ++; + + return ret; +} + +struct ha_msg* +ipcmsg2hamsg(IPC_Message*m) +{ + struct ha_msg* ret = NULL; + + + ret = wirefmt2msg(m->msg_body, m->msg_len,MSG_NEEDAUTH); + return ret; +} + +int +msg2ipcchan(struct ha_msg*m, IPC_Channel*ch) +{ + IPC_Message* imsg; + + if (m == NULL || ch == NULL) { + cl_log(LOG_ERR, "Invalid msg2ipcchan argument"); + errno = EINVAL; + return HA_FAIL; + } + + if ((imsg = hamsg2ipcmsg(m, ch)) == NULL) { + cl_log(LOG_ERR, "hamsg2ipcmsg() failure"); + return HA_FAIL; + } + + if (ch->ops->send(ch, imsg) != IPC_OK) { + if (ch->ch_status == IPC_CONNECT) { + snprintf(ch->failreason,MAXFAILREASON, + "send failed,farside_pid=%d, sendq length=%ld(max is %ld)", + ch->farside_pid, (long)ch->send_queue->current_qlen, + (long)ch->send_queue->max_qlen); + } + imsg->msg_done(imsg); + return HA_FAIL; + } + return HA_OK; +} + +static gboolean (*msg_authentication_method)(const struct ha_msg* ret) = NULL; + + +void +cl_set_oldmsgauthfunc(gboolean (*authfunc)(const struct ha_msg*)) +{ + msg_authentication_method = authfunc; +} + + + +/* Converts a string (perhaps received via UDP) into a message */ +struct ha_msg * +string2msg_ll(const char * s, size_t length, int depth, int need_auth) +{ + struct ha_msg* ret; + int startlen; + int endlen; + const char * sp = s; + const char * smax = s + length; + + + if ((ret = ha_msg_new(0)) == NULL) { + cl_log(LOG_ERR, "%s: creating new msg failed", __FUNCTION__); + return(NULL); + } + + startlen = sizeof(MSG_START)-1; + if (strncmp(sp, MSG_START, startlen) != 0) { + /* This can happen if the sender gets killed */ + /* at just the wrong time... */ + if (!cl_msg_quiet_fmterr) { + cl_log(LOG_WARNING, "string2msg_ll: no MSG_START"); + cl_log(LOG_WARNING, "%s: s=%s", __FUNCTION__, s); + cl_log(LOG_WARNING, "depth=%d", depth); + } + ha_msg_del(ret); + return(NULL); + }else{ + sp += startlen; + } + + endlen = sizeof(MSG_END)-1; + + /* Add Name=value pairs until we reach MSG_END or end of string */ + + while (*sp != EOS && strncmp(sp, MSG_END, endlen) != 0) { + + if (sp >= smax) { + cl_log(LOG_ERR, "%s: buffer overflow(sp=%p, smax=%p)", + __FUNCTION__, sp, smax); + return(NULL); + } + /* Skip over initial CR/NL things */ + sp += strspn(sp, NEWLINE); + if (sp >= smax) { + cl_log(LOG_ERR, "%s: buffer overflow after NEWLINE(sp=%p, smax=%p)", + __FUNCTION__, sp, smax); + return(NULL); + } + /* End of message marker? */ + if (strncmp(sp, MSG_END, endlen) == 0) { + break; + } + /* Add the "name=value" string on this line to the message */ + if (ha_msg_add_nv_depth(ret, sp, smax, depth) != HA_OK) { + if (!cl_msg_quiet_fmterr) { + cl_log(LOG_ERR, "NV failure (string2msg_ll):"); + cl_log(LOG_ERR, "Input string: [%s]", s); + cl_log(LOG_ERR, "sp=%s", sp); + cl_log(LOG_ERR, "depth=%d", depth); + cl_log_message(LOG_ERR,ret); + } + ha_msg_del(ret); + return(NULL); + } + if (sp >= smax) { + cl_log(LOG_ERR, "%s: buffer overflow after adding field(sp=%p, smax=%p)", + __FUNCTION__, sp, smax); + return(NULL); + } + sp += strcspn(sp, NEWLINE); + } + + if (need_auth && msg_authentication_method + && !msg_authentication_method(ret)) { + const char* from = ha_msg_value(ret, F_ORIG); + if (!cl_msg_quiet_fmterr) { + cl_log(LOG_WARNING, + "string2msg_ll: node [%s]" + " failed authentication", from ? from : "?"); + } + ha_msg_del(ret); + ret = NULL; + } + return(ret); +} + + + +struct ha_msg * +string2msg(const char * s, size_t length) +{ + return(string2msg_ll(s, length, 0, MSG_NEEDAUTH)); +} + + + + + + +/* Converts a message into a string (for sending out UDP interface) + + used in two places: + + 1.called by msg2string as a implementation for computing string for a + message provided the buffer + + 2.called by is_authentic. In this case, there are no start/end string + and the "auth" field is not included in the string + +*/ + +#define NOROOM { \ + cl_log(LOG_ERR, "%s:%d: out of memory bound" \ + ", bp=%p, buf + len=%p, len=%ld" \ + , __FUNCTION__, __LINE__ \ + , bp, buf + len, (long)len); \ + cl_log_message(LOG_ERR, m); \ + return(HA_FAIL); \ + } + +#define CHECKROOM_CONST(c) CHECKROOM_INT(STRLEN_CONST(c)) +#define CHECKROOM_STRING(s) CHECKROOM_INT(strnlen(s, len)) +#define CHECKROOM_STRING_INT(s,i) CHECKROOM_INT(strnlen(s, len)+(i)) +#define CHECKROOM_INT(i) { \ + if ((bp + (i)) > maxp) { \ + NOROOM; \ + } \ + } + + +int +msg2string_buf(const struct ha_msg *m, char* buf, size_t len +, int depth,int needhead) +{ + + char * bp = NULL; + int j; + char* maxp = buf + len; + + buf[0]=0; + bp = buf; + + if (needhead){ + CHECKROOM_CONST(MSG_START); + strcpy(bp, MSG_START); + bp += STRLEN_CONST(MSG_START); + } + + for (j=0; j < m->nfields; ++j) { + + int truelen; + int (*tostring)(char*, char*, void*, size_t, int); + + if (needhead == NOHEAD && strcmp(m->names[j], F_AUTH) == 0) { + continue; + } + + if (m->types[j] != FT_STRING){ + CHECKROOM_STRING_INT(FT_strings[m->types[j]],2); + strcat(bp, "("); + bp++; + strcat(bp, FT_strings[m->types[j]]); + bp++; + strcat(bp,")"); + bp++; + } + + CHECKROOM_STRING_INT(m->names[j],1); + strcat(bp, m->names[j]); + bp += m->nlens[j]; + strcat(bp, "="); + bp++; + + if(m->types[j] < DIMOF(fieldtypefuncs)){ + tostring = fieldtypefuncs[m->types[j]].tostring; + } else { + cl_log(LOG_ERR, "type(%d) unrecognized", m->types[j]); + return HA_FAIL; + } + if (!tostring || + (truelen = tostring(bp, maxp, m->values[j], m->vlens[j], depth)) + < 0){ + cl_log(LOG_ERR, "tostring failed for field %d", j); + return HA_FAIL; + } + + CHECKROOM_INT(truelen+1); + bp +=truelen; + + strcat(bp,"\n"); + bp++; + } + if (needhead){ + CHECKROOM_CONST(MSG_END); + strcat(bp, MSG_END); + bp += strlen(MSG_END); + } + + CHECKROOM_INT(1); + bp[0] = EOS; + + return(HA_OK); +} + + +char * +msg2string(const struct ha_msg *m) +{ + void *buf; + int len; + + AUDITMSG(m); + if (m->nfields <= 0) { + cl_log(LOG_ERR, "msg2string: Message with zero fields"); + return(NULL); + } + + len = get_stringlen(m); + + buf = malloc(len); + + if (buf == NULL) { + cl_log(LOG_ERR, "msg2string: no memory for string"); + return(NULL); + } + + if (msg2string_buf(m, buf, len ,0, NEEDHEAD) != HA_OK){ + cl_log(LOG_ERR, "msg2string: msg2string_buf failed"); + free(buf); + return(NULL); + } + + return(buf); +} + +gboolean +must_use_netstring(const struct ha_msg* msg) +{ + int i; + + for ( i = 0; i < msg->nfields; i++){ + if (msg->types[i] == FT_COMPRESS + || msg->types[i] == FT_UNCOMPRESS + || msg->types[i] == FT_STRUCT){ + return TRUE; + } + } + + return FALSE; + +} + +#define use_netstring(m) (msgfmt == MSGFMT_NETSTRING || must_use_netstring(m)) + +static char* +msg2wirefmt_ll(struct ha_msg*m, size_t* len, int flag) +{ + + int wirefmtlen; + int i; + int netstg = use_netstring(m); + + wirefmtlen = netstg ? get_netstringlen(m) : get_stringlen(m); + if (use_traditional_compression + &&(flag & MSG_NEEDCOMPRESS) + && (wirefmtlen> compression_threshold) + && cl_get_compress_fns() != NULL){ + return cl_compressmsg(m, len); + } + + if (flag & MSG_NEEDCOMPRESS){ + for (i=0 ;i < m->nfields; i++){ + int type = m->types[i]; + if (fieldtypefuncs[type].prepackaction){ + fieldtypefuncs[type].prepackaction(m,i); + } + } + } + + wirefmtlen = netstg ? get_netstringlen(m) : get_stringlen(m); + if (wirefmtlen >= MAXMSG){ + if (flag&MSG_NEEDCOMPRESS) { + if (cl_get_compress_fns() != NULL) + return cl_compressmsg(m, len); + } + cl_log(LOG_ERR, "%s: msg too big(%d)", + __FUNCTION__, wirefmtlen); + return NULL; + } + if (flag & MSG_NEEDAUTH) { + return msg2netstring(m, len); + } + return msg2wirefmt_noac(m, len); +} + +char* +msg2wirefmt(struct ha_msg*m, size_t* len){ + return msg2wirefmt_ll(m, len, MSG_NEEDAUTH|MSG_NEEDCOMPRESS); +} + +char* +msg2wirefmt_noac(struct ha_msg*m, size_t* len) +{ + if (use_netstring(m)) { + return msg2netstring_noauth(m, len); + } else { + char *tmp; + + tmp = msg2string(m); + if(tmp == NULL){ + *len = 0; + return NULL; + } + *len = strlen(tmp) + 1; + return tmp; + } +} + +static struct ha_msg* +wirefmt2msg_ll(const char* s, size_t length, int need_auth) +{ + + size_t startlen; + struct ha_msg* msg = NULL; + + + startlen = sizeof(MSG_START)-1; + + if (startlen > length){ + return NULL; + } + + if (strncmp( s, MSG_START, startlen) == 0) { + msg = string2msg_ll(s, length, 0, need_auth); + goto out; + } + + startlen = sizeof(MSG_START_NETSTRING) - 1; + + if (startlen > length){ + return NULL; + } + + if (strncmp(s, MSG_START_NETSTRING, startlen) == 0) { + msg = netstring2msg(s, length, need_auth); + goto out; + } + +out: + if (msg && is_compressed_msg(msg)){ + struct ha_msg* ret; + if ((ret = cl_decompressmsg(msg))==NULL){ + cl_log(LOG_ERR, "decompress msg failed"); + ha_msg_del(msg); + return NULL; + } + ha_msg_del(msg); + return ret; + } + return msg; + +} + + + + +struct ha_msg* +wirefmt2msg(const char* s, size_t length, int flag) +{ + return wirefmt2msg_ll(s, length, flag& MSG_NEEDAUTH); + +} + + +void +cl_log_message (int log_level, const struct ha_msg *m) +{ + int j; + + if(m == NULL) { + cl_log(log_level, "MSG: No message to dump"); + return; + } + + cl_log(log_level, "MSG: Dumping message with %d fields", m->nfields); + + for (j=0; j < m->nfields; ++j) { + + if(m->types[j] < DIMOF(fieldtypefuncs)){ + fieldtypefuncs[m->types[j]].display(log_level, j, + m->names[j], + m->values[j], + m->vlens[j]); + } + } +} + + +#ifdef TESTMAIN_MSGS +int +main(int argc, char ** argv) +{ + struct ha_msg* m; + while (!feof(stdin)) { + if ((m=controlfifo2msg(stdin)) != NULL) { + fprintf(stderr, "Got message!\n"); + if (msg2stream(m, stdout) == HA_OK) { + fprintf(stderr, "Message output OK!\n"); + }else{ + fprintf(stderr, "Could not output Message!\n"); + } + }else{ + fprintf(stderr, "Could not get message!\n"); + } + } + return(0); +} +#endif |