summaryrefslogtreecommitdiffstats
path: root/contrib/imczmq/imczmq.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--contrib/imczmq/imczmq.c632
1 files changed, 632 insertions, 0 deletions
diff --git a/contrib/imczmq/imczmq.c b/contrib/imczmq/imczmq.c
new file mode 100644
index 0000000..77674d4
--- /dev/null
+++ b/contrib/imczmq/imczmq.c
@@ -0,0 +1,632 @@
+/* imczmq.c
+ * Copyright (C) 2016 Brian Knox
+ * Copyright (C) 2014 Rainer Gerhards
+ *
+ * Author: Brian Knox <bknox@digitalocean.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include "rsyslog.h"
+#include <assert.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include "cfsysline.h"
+#include "dirty.h"
+#include "errmsg.h"
+#include "glbl.h"
+#include "module-template.h"
+#include "msg.h"
+#include "net.h"
+#include "parser.h"
+#include "prop.h"
+#include "ruleset.h"
+#include "srUtils.h"
+#include "unicode-helper.h"
+#include <czmq.h>
+
+MODULE_TYPE_INPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("imczmq");
+
+DEF_IMOD_STATIC_DATA
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(prop)
+DEFobjCurrIf(ruleset)
+
+static struct cnfparamdescr modpdescr[] = {
+ { "authenticator", eCmdHdlrBinary, 0 },
+ { "authtype", eCmdHdlrString, 0 },
+ { "servercertpath", eCmdHdlrString, 0 },
+ { "clientcertpath", eCmdHdlrString, 0 },
+};
+
+static struct cnfparamblk modpblk = {
+ CNFPARAMBLK_VERSION,
+ sizeof(modpdescr)/sizeof(struct cnfparamdescr),
+ modpdescr
+};
+
+struct modConfData_s {
+ rsconf_t *pConf;
+ instanceConf_t *root;
+ instanceConf_t *tail;
+ int authenticator;
+ char *authType;
+ char *serverCertPath;
+ char *clientCertPath;
+};
+
+struct instanceConf_s {
+ bool serverish;
+ int sockType;
+ char *sockEndpoints;
+ char *topics;
+ uchar *pszBindRuleset;
+ ruleset_t *pBindRuleset;
+ struct instanceConf_s *next;
+};
+
+struct listener_t {
+ zsock_t *sock;
+ ruleset_t *ruleset;
+};
+
+static zlist_t *listenerList;
+static modConfData_t *runModConf = NULL;
+static prop_t *s_namep = NULL;
+
+static struct cnfparamdescr inppdescr[] = {
+ { "endpoints", eCmdHdlrGetWord, 1 },
+ { "socktype", eCmdHdlrGetWord, 1 },
+ { "ruleset", eCmdHdlrGetWord, 0 },
+ { "topics", eCmdHdlrGetWord, 0 },
+};
+
+#include "im-helper.h"
+
+static struct cnfparamblk inppblk = {
+ CNFPARAMBLK_VERSION,
+ sizeof(inppdescr) / sizeof(struct cnfparamdescr),
+ inppdescr
+};
+
+static void setDefaults(instanceConf_t* iconf) {
+ iconf->serverish = true;
+ iconf->sockType = -1;
+ iconf->sockEndpoints = NULL;
+ iconf->topics = NULL;
+ iconf->pszBindRuleset = NULL;
+ iconf->pBindRuleset = NULL;
+ iconf->next = NULL;
+};
+
+static rsRetVal createInstance(instanceConf_t** pinst) {
+ DEFiRet;
+ instanceConf_t* inst;
+ CHKmalloc(inst = malloc(sizeof(instanceConf_t)));
+
+ setDefaults(inst);
+
+ if(runModConf->root == NULL || runModConf->tail == NULL) {
+ runModConf->tail = runModConf->root = inst;
+ }
+ else {
+ runModConf->tail->next = inst;
+ runModConf->tail = inst;
+ }
+ *pinst = inst;
+finalize_it:
+ RETiRet;
+}
+
+static rsRetVal addListener(instanceConf_t* iconf){
+ DEFiRet;
+
+ DBGPRINTF("imczmq: addListener called..\n");
+ struct listener_t* pData = NULL;
+ CHKmalloc(pData=(struct listener_t*)malloc(sizeof(struct listener_t)));
+ pData->ruleset = iconf->pBindRuleset;
+
+ pData->sock = zsock_new(iconf->sockType);
+ if(!pData->sock) {
+ LogError(0, RS_RET_NO_ERRCODE,
+ "imczmq: new socket failed for endpoints: %s",
+ iconf->sockEndpoints);
+ ABORT_FINALIZE(RS_RET_NO_ERRCODE);
+ }
+
+ DBGPRINTF("imczmq: created socket of type %d..\n", iconf->sockType);
+
+ if(runModConf->authType) {
+ if(!strcmp(runModConf->authType, "CURVESERVER")) {
+ DBGPRINTF("imczmq: we are a CURVESERVER\n");
+ zcert_t *serverCert = zcert_load(runModConf->serverCertPath);
+ if(!serverCert) {
+ LogError(0, NO_ERRCODE, "could not load cert %s",
+ runModConf->serverCertPath);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ zsock_set_zap_domain(pData->sock, "global");
+ zsock_set_curve_server(pData->sock, 1);
+ zcert_apply(serverCert, pData->sock);
+ zcert_destroy(&serverCert);
+ }
+ else if(!strcmp(runModConf->authType, "CURVECLIENT")) {
+ DBGPRINTF("imczmq: we are a CURVECLIENT\n");
+ zcert_t *serverCert = zcert_load(runModConf->serverCertPath);
+ if(!serverCert) {
+ LogError(0, NO_ERRCODE, "could not load cert %s",
+ runModConf->serverCertPath);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ const char *server_key = zcert_public_txt(serverCert);
+ zcert_destroy(&serverCert);
+ zsock_set_curve_serverkey(pData->sock, server_key);
+
+ zcert_t *clientCert = zcert_load(runModConf->clientCertPath);
+ if(!clientCert) {
+ LogError(0, NO_ERRCODE, "could not load cert %s",
+ runModConf->clientCertPath);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ zcert_apply(clientCert, pData->sock);
+ zcert_destroy(&clientCert);
+ }
+
+ }
+
+ switch(iconf->sockType) {
+ case ZMQ_SUB:
+#if defined(ZMQ_DISH)
+ case ZMQ_DISH:
+#endif
+ iconf->serverish = false;
+ break;
+ case ZMQ_PULL:
+#if defined(ZMQ_GATHER)
+ case ZMQ_GATHER:
+#endif
+ case ZMQ_ROUTER:
+#if defined(ZMQ_SERVER)
+ case ZMQ_SERVER:
+#endif
+ iconf->serverish = true;
+ break;
+ }
+
+ if(iconf->topics) {
+ // A zero-length topic means subscribe to everything
+ if(!*iconf->topics && iconf->sockType == ZMQ_SUB) {
+ DBGPRINTF("imczmq: subscribing to all topics\n");
+ zsock_set_subscribe(pData->sock, "");
+ }
+
+ char topic[256];
+ while(*iconf->topics) {
+ char *delimiter = strchr(iconf->topics, ',');
+ if(!delimiter) {
+ delimiter = iconf->topics + strlen(iconf->topics);
+ }
+ memcpy (topic, iconf->topics, delimiter - iconf->topics);
+ topic[delimiter-iconf->topics] = 0;
+ DBGPRINTF("imczmq: subscribing to %s\n", topic);
+ if(iconf->sockType == ZMQ_SUB) {
+ zsock_set_subscribe (pData->sock, topic);
+ }
+#if defined(ZMQ_DISH)
+ else if(iconf->sockType == ZMQ_DISH) {
+ int rc = zsock_join (pData->sock, topic);
+ if(rc != 0) {
+ LogError(0, NO_ERRCODE, "could not join group %s", topic);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ }
+#endif
+ if(*delimiter == 0) {
+ break;
+ }
+ iconf->topics = delimiter + 1;
+ }
+ }
+
+ int rc = zsock_attach(pData->sock, (const char*)iconf->sockEndpoints,
+ iconf->serverish);
+ if (rc == -1) {
+ LogError(0, NO_ERRCODE, "zsock_attach to %s failed",
+ iconf->sockEndpoints);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ DBGPRINTF("imczmq: attached socket to %s\n", iconf->sockEndpoints);
+
+ rc = zlist_append(listenerList, (void *)pData);
+ if(rc != 0) {
+ LogError(0, NO_ERRCODE, "could not append listener");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ free(pData);
+ }
+ RETiRet;
+}
+
+static rsRetVal rcvData(void){
+ DEFiRet;
+
+ if(!listenerList) {
+ listenerList = zlist_new();
+ if(!listenerList) {
+ LogError(0, NO_ERRCODE, "could not allocate list");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ }
+
+ zactor_t *authActor = NULL;
+
+ if(runModConf->authenticator == 1) {
+ authActor = zactor_new(zauth, NULL);
+ zstr_sendx(authActor, "CURVE", runModConf->clientCertPath, NULL);
+ zsock_wait(authActor);
+ }
+
+ instanceConf_t *inst;
+ for(inst = runModConf->root; inst != NULL; inst=inst->next) {
+ CHKiRet(addListener(inst));
+ }
+
+ zpoller_t *poller = zpoller_new(NULL);
+ if(!poller) {
+ LogError(0, NO_ERRCODE, "could not create poller");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ DBGPRINTF("imczmq: created poller\n");
+
+ struct listener_t *pData;
+
+ pData = zlist_first(listenerList);
+ if(!pData) {
+ LogError(0, NO_ERRCODE, "imczmq: no listeners were "
+ "started, input not activated.\n");
+ ABORT_FINALIZE(RS_RET_NO_RUN);
+ }
+
+ while(pData) {
+ int rc = zpoller_add(poller, pData->sock);
+ if(rc != 0) {
+ LogError(0, NO_ERRCODE, "imczmq: could not add "
+ "socket to poller, input not activated.\n");
+ ABORT_FINALIZE(RS_RET_NO_RUN);
+ }
+ pData = zlist_next(listenerList);
+ }
+
+ zsock_t *which = (zsock_t *)zpoller_wait(poller, -1);
+ while(which) {
+ if (zpoller_terminated(poller)) {
+ break;
+ }
+ pData = zlist_first(listenerList);
+ while(pData->sock != which) {
+ pData = zlist_next(listenerList);
+ }
+
+ if(which == pData->sock) {
+ DBGPRINTF("imczmq: found matching socket\n");
+ }
+
+ zframe_t *frame = zframe_recv(which);
+ char *buf = NULL;
+
+ if (frame != NULL)
+ buf = zframe_strdup(frame);
+
+ zframe_destroy(&frame);
+
+ if(buf == NULL) {
+ DBGPRINTF("imczmq: null buffer\n");
+ continue;
+ }
+ smsg_t *pMsg;
+ if(msgConstruct(&pMsg) == RS_RET_OK) {
+ MsgSetRawMsg(pMsg, buf, strlen(buf));
+ MsgSetInputName(pMsg, s_namep);
+ MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName()));
+ MsgSetRcvFrom(pMsg, glbl.GetLocalHostNameProp());
+ MsgSetRcvFromIP(pMsg, glbl.GetLocalHostIP());
+ MsgSetMSGoffs(pMsg, 0);
+ MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
+ MsgSetRuleset(pMsg, pData->ruleset);
+ pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
+ submitMsg2(pMsg);
+ }
+
+ free(buf);
+ which = (zsock_t *)zpoller_wait(poller, -1);
+ }
+finalize_it:
+ zpoller_destroy(&poller);
+ pData = zlist_first(listenerList);
+ while(pData) {
+ zsock_destroy(&pData->sock);
+ free(pData->ruleset);
+ pData = zlist_next(listenerList);
+ }
+ zlist_destroy(&listenerList);
+ zactor_destroy(&authActor);
+ RETiRet;
+}
+
+BEGINrunInput
+CODESTARTrunInput
+ iRet = rcvData();
+ENDrunInput
+
+
+BEGINwillRun
+CODESTARTwillRun
+ CHKiRet(prop.Construct(&s_namep));
+ CHKiRet(prop.SetString(s_namep,
+ UCHAR_CONSTANT("imczmq"),
+ sizeof("imczmq") - 1));
+
+ CHKiRet(prop.ConstructFinalize(s_namep));
+
+finalize_it:
+ENDwillRun
+
+
+BEGINafterRun
+CODESTARTafterRun
+ if(s_namep != NULL) {
+ prop.Destruct(&s_namep);
+ }
+ENDafterRun
+
+
+BEGINmodExit
+CODESTARTmodExit
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(prop, CORE_COMPONENT);
+ objRelease(ruleset, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURENonCancelInputTermination) {
+ iRet = RS_RET_OK;
+ }
+ENDisCompatibleWithFeature
+
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ runModConf = pModConf;
+ runModConf->pConf = pConf;
+ runModConf->authenticator = 0;
+ runModConf->authType = NULL;
+ runModConf->serverCertPath = NULL;
+ runModConf->clientCertPath = NULL;
+ENDbeginCnfLoad
+
+
+BEGINsetModCnf
+ struct cnfparamvals* pvals = NULL;
+ int i;
+CODESTARTsetModCnf
+ pvals = nvlstGetParams(lst, &modpblk, NULL);
+ if(NULL == pvals) {
+ LogError(0, RS_RET_MISSING_CNFPARAMS,
+ "imczmq: error processing module "
+ "config parameters ['module(...)']");
+
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ for(i=0; i < modpblk.nParams; ++i) {
+ if(!pvals[i].bUsed) {
+ continue;
+ }
+ if(!strcmp(modpblk.descr[i].name, "authenticator")) {
+ runModConf->authenticator = (int)pvals[i].val.d.n;
+ }
+ else if(!strcmp(modpblk.descr[i].name, "authtype")) {
+ runModConf->authType = es_str2cstr(pvals[i].val.d.estr, NULL);
+ }
+ else if(!strcmp(modpblk.descr[i].name, "servercertpath")) {
+ runModConf->serverCertPath = es_str2cstr(pvals[i].val.d.estr, NULL);
+ }
+ else if(!strcmp(modpblk.descr[i].name, "clientcertpath")) {
+ runModConf->clientCertPath = es_str2cstr(pvals[i].val.d.estr, NULL);
+ }
+ else {
+ LogError(0, RS_RET_INVALID_PARAMS,
+ "imczmq: config error, unknown "
+ "param %s in setModCnf\n",
+ modpblk.descr[i].name);
+ }
+ }
+
+ DBGPRINTF("imczmq: authenticator set to %d\n", runModConf->authenticator);
+ DBGPRINTF("imczmq: authType set to %s\n", runModConf->authType);
+ DBGPRINTF("imczmq: serverCertPath set to %s\n", runModConf->serverCertPath);
+ DBGPRINTF("imczmq: clientCertPath set to %s\n", runModConf->clientCertPath);
+
+finalize_it:
+ if(pvals != NULL) {
+ cnfparamvalsDestruct(pvals, &modpblk);
+ }
+ENDsetModCnf
+
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ENDendCnfLoad
+
+
+static inline void
+std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, instanceConf_t *inst)
+{
+ LogError(0, NO_ERRCODE,
+ "imczmq: ruleset '%s' for socket %s not found - "
+ "using default ruleset instead", inst->pszBindRuleset,
+ inst->sockEndpoints);
+}
+
+
+BEGINcheckCnf
+instanceConf_t* inst;
+CODESTARTcheckCnf
+ for(inst = pModConf->root; inst!=NULL; inst=inst->next) {
+ std_checkRuleset(pModConf, inst);
+ }
+ENDcheckCnf
+
+
+BEGINactivateCnfPrePrivDrop
+CODESTARTactivateCnfPrePrivDrop
+ runModConf = pModConf;
+ putenv((char*)"ZSYS_SIGHANDLER=false");
+ENDactivateCnfPrePrivDrop
+
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ENDactivateCnf
+
+
+BEGINfreeCnf
+ instanceConf_t *inst, *inst_r;
+CODESTARTfreeCnf
+ free(pModConf->authType);
+ free(pModConf->serverCertPath);
+ free(pModConf->clientCertPath);
+ for (inst = pModConf->root ; inst != NULL ; ) {
+ free(inst->pszBindRuleset);
+ free(inst->sockEndpoints);
+ inst_r = inst;
+ inst = inst->next;
+ free(inst_r);
+ }
+
+ENDfreeCnf
+
+
+BEGINnewInpInst
+ struct cnfparamvals* pvals;
+ instanceConf_t* inst;
+ int i;
+CODESTARTnewInpInst
+ DBGPRINTF("newInpInst (imczmq)\n");
+
+ pvals = nvlstGetParams(lst, &inppblk, NULL);
+ if(NULL==pvals) {
+ LogError(0, RS_RET_MISSING_CNFPARAMS,
+ "imczmq: required parameters are missing\n");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(Debug) {
+ DBGPRINTF("imczmq: input param blk:\n");
+ cnfparamsPrint(&inppblk, pvals);
+ }
+
+ CHKiRet(createInstance(&inst));
+
+ for(i = 0 ; i < inppblk.nParams ; ++i) {
+ if(!pvals[i].bUsed) {
+ continue;
+ }
+
+ if(!strcmp(inppblk.descr[i].name, "ruleset")) {
+ inst->pszBindRuleset = (uchar *)es_str2cstr(pvals[i].val.d.estr, NULL);
+ }
+ else if(!strcmp(inppblk.descr[i].name, "endpoints")) {
+ inst->sockEndpoints = es_str2cstr(pvals[i].val.d.estr, NULL);
+ }
+ else if(!strcmp(inppblk.descr[i].name, "topics")) {
+ inst->topics = es_str2cstr(pvals[i].val.d.estr, NULL);
+ }
+ else if(!strcmp(inppblk.descr[i].name, "socktype")){
+ char *stringType = es_str2cstr(pvals[i].val.d.estr, NULL);
+ if( NULL == stringType ){
+ LogError(0, RS_RET_CONFIG_ERROR,
+ "imczmq: out of memory error copying sockType param");
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+
+ if(!strcmp("PULL", stringType)) {
+ inst->sockType = ZMQ_PULL;
+ }
+#if defined(ZMQ_GATHER)
+ else if(!strcmp("GATHER", stringType)) {
+ inst->sockType = ZMQ_GATHER;
+ }
+#endif
+ else if(!strcmp("SUB", stringType)) {
+ inst->sockType = ZMQ_SUB;
+ }
+#if defined(ZMQ_DISH)
+ else if(!strcmp("DISH", stringType)) {
+ inst->sockType = ZMQ_DISH;
+ }
+#endif
+ else if(!strcmp("ROUTER", stringType)) {
+ inst->sockType = ZMQ_ROUTER;
+ }
+#if defined(ZMQ_SERVER)
+ else if(!strcmp("SERVER", stringType)) {
+ inst->sockType = ZMQ_SERVER;
+ }
+#endif
+ free(stringType);
+
+ }
+ else {
+ LogError(0, NO_ERRCODE,
+ "imczmq: program error, non-handled "
+ "param '%s'\n", inppblk.descr[i].name);
+ }
+ }
+finalize_it:
+CODE_STD_FINALIZERnewInpInst
+ cnfparamvalsDestruct(pvals, &inppblk);
+ENDnewInpInst
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
+CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES
+CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION;
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(ruleset, CORE_COMPONENT));
+ENDmodInit
+