summaryrefslogtreecommitdiffstats
path: root/contrib/mmdarwin/mmdarwin.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--contrib/mmdarwin/mmdarwin.c953
1 files changed, 953 insertions, 0 deletions
diff --git a/contrib/mmdarwin/mmdarwin.c b/contrib/mmdarwin/mmdarwin.c
new file mode 100644
index 0000000..e36dbde
--- /dev/null
+++ b/contrib/mmdarwin/mmdarwin.c
@@ -0,0 +1,953 @@
+/* Copyright 2019 Advens
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <signal.h>
+#include <errno.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <pthread.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+#include "parserif.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/un.h>
+#include <sys/socket.h>
+#include <uuid/uuid.h>
+#include <json.h>
+
+#include "protocol.h" /* custom file written for Darwin */
+
+#define JSON_DEFAULT_CONTAINER "!mmdarwin"
+#define JSON_DARWIN_ID "darwin_id"
+#define INVLD_SOCK -1
+#define INITIAL_BUFFER_SIZE 32
+#define BUFFER_DEFAULT_MAX_SIZE 65536
+
+MODULE_TYPE_OUTPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("mmdarwin")
+
+DEFobjCurrIf(glbl)
+DEF_OMOD_STATIC_DATA
+
+typedef struct dyn_buffer_t
+{
+ char *buffer;
+ size_t bufferAllocSize;
+ size_t bufferMsgSize;
+ size_t bufferMaxSize;
+} dyn_buffer;
+
+/* config variables */
+typedef struct _instanceData
+{
+ char *pUUIDKey; /* the key to the UUID generated by an mmdarwin instance */
+ char *pCertitudeKey; /* the key name to save in the enriched log
+ line the certitude obtained from Darwin */
+ uchar *pSockName; /* the socket path of the filter which will be used by
+ Darwin */
+ unsigned long long int filterCode; /* the filter code associated to the filter which will be used
+ by Darwin */
+ enum darwin_filter_response_type response; /* the type of response for Darwin: no / back / darwin / both */
+ struct
+ {
+ int nmemb;
+ char **name;
+ char **varname;
+ } fieldList; /* our keys (fields) to be extracted from the JSON-parsed log line */
+ unsigned int socketMaxUse;
+ sbool sendPartial;
+} instanceData;
+
+typedef struct wrkrInstanceData
+{
+ instanceData *pData;
+ int sock; /* the socket of the filter which will be used by Darwin */
+ struct sockaddr_un addr; /* the sockaddr_un used to connect to the Darwin filter */
+ uint8_t pktSentSocket;
+ dyn_buffer darwinBody; /* the body object used (and reused) to hold data to send to Darwin */
+ dyn_buffer fieldBuffer;
+} wrkrInstanceData_t;
+
+struct modConfData_s
+{
+ /* our overall config object */
+ rsconf_t *pConf;
+ const char *container;
+};
+
+/* modConf ptr to use for the current load process */
+static modConfData_t *loadModConf = NULL;
+/* modConf ptr to use for the current exec process */
+static modConfData_t *runModConf = NULL;
+
+/* module-global parameters */
+static struct cnfparamdescr modpdescr[] = {
+ {"container", eCmdHdlrGetWord, 0},
+};
+static struct cnfparamblk modpblk =
+ {CNFPARAMBLK_VERSION,
+ sizeof(modpdescr) / sizeof(struct cnfparamdescr),
+ modpdescr};
+
+/* tables for interfacing with the v6 config system
+ * action (instance) parameters */
+static struct cnfparamdescr actpdescr[] = {
+ {"key", eCmdHdlrGetWord, CNFPARAM_REQUIRED},
+ {"socketpath", eCmdHdlrGetWord, CNFPARAM_REQUIRED},
+ {"fields", eCmdHdlrArray, CNFPARAM_REQUIRED},
+ {"filtercode", eCmdHdlrGetWord, 0}, /* optional parameter */
+ {"response", eCmdHdlrGetWord, 0}, /* optional parameter */
+ {"send_partial", eCmdHdlrBinary, 0}, /* optional parameter */
+ {"socket_max_use", eCmdHdlrNonNegInt, 0}, /* optional parameter - will disappear in future updates */
+};
+static struct cnfparamblk actpblk = {
+ CNFPARAMBLK_VERSION,
+ sizeof(actpdescr) / sizeof(struct cnfparamdescr),
+ actpdescr};
+
+/* custom functions */
+#define min(a, b) \
+ ({ __typeof__ (a) _a = (a); \
+ __typeof__ (b) _b = (b); \
+ _a < _b ? _a : _b; })
+
+static rsRetVal openSocket(wrkrInstanceData_t *pWrkrData);
+static rsRetVal closeSocket(wrkrInstanceData_t *pWrkrData);
+static rsRetVal doTryResume(wrkrInstanceData_t *pWrkrData);
+
+static rsRetVal sendMsg(wrkrInstanceData_t *pWrkrData, void *msg, size_t len);
+static rsRetVal receiveMsg(wrkrInstanceData_t *pWrkrData, void *response, size_t len);
+
+const char* get_uuid_object(smsg_t *const pMsg);
+int get_field(smsg_t *const pMsg, const char *pFieldName, char **ppRetString);
+int expand_buffer(dyn_buffer *pBody, size_t new_size);
+int add_field_to_body(dyn_buffer *pBody, const char *field, size_t size);
+int start_new_line(dyn_buffer *pBody);
+int end_body(dyn_buffer *pBody);
+
+/* open socket to remote system
+ */
+static rsRetVal openSocket(wrkrInstanceData_t *pWrkrData)
+{
+ DEFiRet;
+ assert(pWrkrData->sock == INVLD_SOCK);
+
+ if ((pWrkrData->sock = socket(AF_UNIX, SOCK_STREAM, 0)) == -1)
+ {
+ char errStr[1024];
+ int eno = errno;
+ DBGPRINTF("mmdarwin::openSocket:: error %d creating AF_UNIX/SOCK_STREAM: %s.\n",
+ eno, rs_strerror_r(eno, errStr, sizeof(errStr)));
+ pWrkrData->sock = INVLD_SOCK;
+ ABORT_FINALIZE(RS_RET_NO_SOCKET);
+ }
+
+ memset(&pWrkrData->addr, 0, sizeof(struct sockaddr_un));
+ pWrkrData->addr.sun_family = AF_UNIX;
+ strncpy(pWrkrData->addr.sun_path, (char *)pWrkrData->pData->pSockName, sizeof(pWrkrData->addr.sun_path) - 1);
+
+ DBGPRINTF("mmdarwin::openSocket:: connecting to Darwin...\n");
+
+ if (connect(pWrkrData->sock, (struct sockaddr *)&pWrkrData->addr, sizeof(struct sockaddr_un)) == -1)
+ {
+ LogError(errno, RS_RET_NO_SOCKET, "mmdarwin::openSocket:: error connecting to Darwin "
+ "via socket '%s'",
+ pWrkrData->pData->pSockName);
+
+ pWrkrData->sock = INVLD_SOCK;
+ ABORT_FINALIZE(RS_RET_NO_SOCKET);
+ }
+
+ DBGPRINTF("mmdarwin::openSocket:: connected !\n");
+finalize_it:
+ if (iRet != RS_RET_OK)
+ {
+ closeSocket(pWrkrData);
+ }
+ RETiRet;
+}
+
+/* close socket to remote system
+ */
+static rsRetVal closeSocket(wrkrInstanceData_t *pWrkrData)
+{
+ DEFiRet;
+ if (pWrkrData->sock != INVLD_SOCK)
+ {
+ if (close(pWrkrData->sock) != 0)
+ {
+ char errStr[1024];
+ int eno = errno;
+ DBGPRINTF("mmdarwin::closeSocket:: error %d closing the socket: %s.\n",
+ eno, rs_strerror_r(eno, errStr, sizeof(errStr)));
+ }
+ pWrkrData->sock = INVLD_SOCK;
+ }
+ RETiRet;
+}
+
+/* try to resume connection if it is not ready
+ */
+static rsRetVal doTryResume(wrkrInstanceData_t *pWrkrData)
+{
+ DEFiRet;
+
+ DBGPRINTF("mmdarwin::doTryResume:: trying to resume\n");
+ closeSocket(pWrkrData);
+ iRet = openSocket(pWrkrData);
+
+ if (iRet != RS_RET_OK)
+ {
+ iRet = RS_RET_SUSPENDED;
+ }
+
+ RETiRet;
+}
+
+/* send a message via TCP
+ * inspired by rgehards, 2007-12-20
+ */
+static rsRetVal sendMsg(wrkrInstanceData_t *pWrkrData, void *msg, size_t len)
+{
+ DEFiRet;
+
+ DBGPRINTF("mmdarwin::sendMsg:: sending message to Darwin...\n");
+
+ if (pWrkrData->sock == INVLD_SOCK)
+ {
+ CHKiRet(doTryResume(pWrkrData));
+ }
+
+ if (pWrkrData->sock != INVLD_SOCK)
+ {
+ if (send(pWrkrData->sock, msg, len, 0) == -1)
+ {
+ char errStr[1024];
+ DBGPRINTF("mmdarwin::sendData:: error while sending data: error[%d] -> %s\n",
+ errno, rs_strerror_r(errno, errStr, sizeof(errStr)));
+ iRet = RS_RET_SUSPENDED;
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+/* receive a message via TCP
+ * inspired by rgehards, 2007-12-20
+ */
+static rsRetVal receiveMsg(wrkrInstanceData_t *pWrkrData, void *response, size_t len)
+{
+ DEFiRet;
+
+ DBGPRINTF("mmdarwin::receiveMsg:: receiving message from Darwin...\n");
+
+ if (pWrkrData->sock == INVLD_SOCK)
+ {
+ CHKiRet(doTryResume(pWrkrData));
+ }
+
+ if (pWrkrData->sock != INVLD_SOCK)
+ {
+ if (recv(pWrkrData->sock, response, len, MSG_WAITALL) <= 0)
+ {
+ char errStr[1024];
+ DBGPRINTF("mmdarwin::receiveMsg:: error while receiving data: error[%d] -> %s\n",
+ errno, rs_strerror_r(errno, errStr, sizeof(errStr)));
+ iRet = RS_RET_NONE;
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+/**
+ * Get the string corresponding to a field supposedly present in the provided message
+ *
+ * params:
+ * - pMsg: a pointer to the rsyslog message where the field should be
+ * - pFieldName: a nul-terminated pointer to string representing the name of the field to search for
+ * - ppRetString: the pointer to contain the potential return string
+ *
+ * return: 1 if a string was put in ppRetString, 0 otherwise
+ *
+ * note: the string placed in ppRetString should be freed by the caller
+ */
+int get_field(smsg_t *const pMsg, const char *pFieldName, char **ppRetString)
+{
+ DBGPRINTF("mmdarwin::get_field:: getting key '%s' in msg\n", pFieldName);
+ struct json_object *pJson = NULL;
+ char *pFieldString = NULL;
+ int retVal = 0;
+
+ msgPropDescr_t propDesc;
+ msgPropDescrFill(&propDesc, (uchar *)pFieldName, strlen(pFieldName));
+ msgGetJSONPropJSONorString(pMsg, &propDesc, &pJson, (uchar **)&pFieldString);
+
+ if (pFieldString)
+ {
+ *ppRetString = pFieldString;
+ DBGPRINTF("mmdarwin::get_field:: got string\n");
+ retVal = 1;
+ }
+ else if (pJson)
+ {
+ pFieldString = (char *)json_object_get_string(pJson);
+ if (pFieldString)
+ {
+ *ppRetString = strdup(pFieldString);
+ retVal = 1;
+ DBGPRINTF("mmdarwin::get_field:: got string from json\n");
+ json_object_put(pJson);
+ }
+ }
+
+ msgPropDescrDestruct(&propDesc);
+ return retVal;
+}
+
+/**
+ * expands the buffer object in the dyn_buffer object
+ *
+ * params:
+ * - pBody: a pointer to the concerned structure to expand
+ * - new_size: the new size to give to the underlying buffer
+ *
+ * return: 0 if the expansion was successful, -1 otherwise
+ */
+int expand_buffer(dyn_buffer *pBody, size_t new_size)
+{
+ /* return error if new_size tries to exceed max defined size */
+ if (new_size > pBody->bufferMaxSize)
+ return -1;
+ while (pBody->bufferAllocSize < new_size)
+ pBody->bufferAllocSize += INITIAL_BUFFER_SIZE;
+
+ DBGPRINTF("mmdarwin::expand_buffer:: expanding buffer to %zu\n", pBody->bufferAllocSize);
+
+ char *tmp = realloc(pBody->buffer, pBody->bufferAllocSize * sizeof(char));
+
+ if (!tmp)
+ {
+ DBGPRINTF("mmdarwin::expand_buffer:: could not resize buffer\n");
+ return -1;
+ }
+
+ pBody->buffer = tmp;
+ return 0;
+}
+
+/**
+ * adds a field to the dyn_buffer buffer
+ *
+ * params:
+ * - pBody: the pointer on the dyn_buffer structure
+ * - field: the potentially not null-terminated string to add as a field to the dyn_buffer
+ * - size: the size of the string (without the '\0' character)
+ *
+ * return: 0 if the field was indeed added to the dyn_buffer, -1 otherwise
+ */
+int add_field_to_body(dyn_buffer *pBody, const char *field, size_t size)
+{
+ /* get required additional size for field, quotes, colon, and \0
+ and potentially also for the beginning of the message structure */
+ int beginning = (pBody->bufferMsgSize == 0) ? 2 : 0;
+ size_t requiredBodySize = pBody->bufferMsgSize + size + 4 + beginning;
+
+ /* resize body buffer if necessary */
+ if (requiredBodySize > pBody->bufferAllocSize)
+ {
+ if (expand_buffer(pBody, requiredBodySize) != 0)
+ {
+ return -1;
+ }
+ }
+
+ /* add message structure beginning if current message is empty */
+ if (!pBody->bufferMsgSize)
+ {
+ pBody->buffer[0] = '[';
+ pBody->buffer[1] = '[';
+ pBody->bufferMsgSize += 2;
+ }
+
+ /* add field with quotes and colon */
+ pBody->buffer[pBody->bufferMsgSize++] = '\"';
+ memcpy((void *)&pBody->buffer[pBody->bufferMsgSize], (const void *)field, size);
+ pBody->bufferMsgSize += size;
+ pBody->buffer[pBody->bufferMsgSize++] = '\"';
+ pBody->buffer[pBody->bufferMsgSize++] = ',';
+
+ return 0;
+}
+
+/**
+ * small helper function to start a new input line (used for bulk-calls) in the dyn_buffer.
+ * will close current line with a ']' and start the next with a '['.
+ * will also remove leading ',' in fields list.
+ *
+ * params:
+ * - pBody: the pointer on the dyn_buffer on which to start a new input line
+ *
+ * return: 0 if successful, -1 otherwise
+ */
+int start_new_line(dyn_buffer *pBody)
+{
+ /* don't if the message is empty */
+ if (!pBody->bufferMsgSize)
+ {
+ return -1;
+ }
+
+ DBGPRINTF("mmdarwin::start_new_line:: starting new line entry in body\n");
+
+ if (pBody->bufferAllocSize < pBody->bufferMsgSize + 2)
+ {
+ if (expand_buffer(pBody, pBody->bufferAllocSize + 2) != 0)
+ {
+ return -1;
+ }
+ }
+
+ pBody->buffer[pBody->bufferMsgSize - 1] = ']';
+ pBody->buffer[pBody->bufferMsgSize++] = ',';
+ pBody->buffer[pBody->bufferMsgSize++] = '[';
+ return 0;
+}
+
+/**
+ * small helper function to close the dyn_buffer structure.
+ * will close the line list with two ']' and will remove the leading ',' in the fields list
+ *
+ * params:
+ * - pBody: the pointer on the dyn_buffer on which to start a new input line
+ *
+ * return: 0 if successful, -1 otherwise
+ */
+int end_body(dyn_buffer *pBody)
+{
+ /* don't if the message is empty */
+ if (!pBody->bufferMsgSize)
+ {
+ return -1;
+ }
+
+ DBGPRINTF("mmdarwin::end_body:: finishing body structure\n");
+
+ if (pBody->bufferAllocSize < pBody->bufferMsgSize + 2)
+ {
+ if (expand_buffer(pBody, pBody->bufferAllocSize + 2) != 0)
+ {
+ return -1;
+ }
+ }
+
+ pBody->buffer[pBody->bufferMsgSize - 1] = ']';
+ pBody->buffer[pBody->bufferMsgSize++] = ']';
+ pBody->buffer[pBody->bufferMsgSize++] = '\0';
+ return 0;
+}
+
+/**
+ * Get the potential existing uuid put by previous mmdarwin call in a json
+ *
+ * params:
+ * - pJson: the pointer on the json
+ *
+ * return: a valid json_object pointer if found, NULL otherwise
+ */
+const char* get_uuid_object(smsg_t *const pMsg) {
+ struct json_object *mmdarwin_object = NULL;
+ const char *result = NULL, *key = NULL;
+
+ msgPropDescr_t propDesc;
+ msgPropDescrFill(&propDesc, (uchar *)runModConf->container, strlen(runModConf->container));
+ msgGetJSONPropJSON(pMsg, &propDesc, &mmdarwin_object);
+
+ if(mmdarwin_object) {
+ struct json_object_iterator it = json_object_iter_begin(mmdarwin_object);
+ struct json_object_iterator itEnd = json_object_iter_end(mmdarwin_object);
+
+ while(!json_object_iter_equal(&it, &itEnd)) {
+ key = json_object_iter_peek_name(&it);
+
+ if(!strcmp(key, JSON_DARWIN_ID)) {
+ // should always be a (non-empty) null-terminated string, safe to use with strdup()
+ result = strdup(json_object_get_string(json_object_iter_peek_value(&it)));
+ break;
+ }
+
+ json_object_iter_next(&it);
+ }
+ json_object_put(mmdarwin_object);
+ }
+
+ msgPropDescrDestruct(&propDesc);
+ return result;
+}
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+pModConf->pConf = pConf;
+ENDbeginCnfLoad
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ENDendCnfLoad
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ runModConf = pModConf;
+ENDactivateCnf
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ free((void *)pModConf->container);
+ENDfreeCnf
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ DBGPRINTF("%s\n", pData->pSockName);
+ENDdbgPrintInstInfo
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ pWrkrData->pktSentSocket = 0;
+ pWrkrData->darwinBody.bufferAllocSize = 0;
+ pWrkrData->darwinBody.bufferMaxSize = BUFFER_DEFAULT_MAX_SIZE;
+ pWrkrData->darwinBody.bufferMsgSize = 0;
+ pWrkrData->sock = INVLD_SOCK;
+ENDcreateWrkrInstance
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ENDisCompatibleWithFeature
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ if (pData->fieldList.name != NULL)
+ {
+ for (int i = 0; i < pData->fieldList.nmemb; ++i)
+ {
+ free(pData->fieldList.name[i]);
+ free(pData->fieldList.varname[i]);
+ }
+ free(pData->fieldList.name);
+ free(pData->fieldList.varname);
+ }
+ free(pData->pUUIDKey);
+ free(pData->pCertitudeKey);
+ free(pData->pSockName);
+ENDfreeInstance
+
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ closeSocket(pWrkrData);
+ free(pWrkrData->darwinBody.buffer);
+ENDfreeWrkrInstance
+
+BEGINsetModCnf
+struct cnfparamvals *pvals = NULL;
+int i;
+CODESTARTsetModCnf
+ loadModConf->container = NULL;
+ pvals = nvlstGetParams(lst, &modpblk, NULL);
+ if (pvals == NULL)
+ {
+ LogError(0, RS_RET_MISSING_CNFPARAMS,
+ "mmdarwin: error processing module config parameters missing [module(...)]");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+ if (Debug)
+ {
+ DBGPRINTF("mmdarwin::setModCnf:: module (global) param blk for mmdarwin:\n");
+ cnfparamsPrint(&modpblk, pvals);
+ }
+
+ for (i = 0; i < modpblk.nParams; ++i)
+ {
+ if (!pvals[i].bUsed)
+ continue;
+ if (!strcmp(modpblk.descr[i].name, "container"))
+ {
+ loadModConf->container = es_str2cstr(pvals[i].val.d.estr, NULL);
+ if(loadModConf->container[0] != '!' && loadModConf->container[0] != '.') {
+ LogError(0, RS_RET_INVALID_PARAMS, "mmdarwin: container should either"
+ " begin with '!' or '.'\n");
+ ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
+ }
+ }
+ else
+ {
+ DBGPRINTF("mmdarwin::setModCnf:: program error, non-handled "
+ "param '%s'\n",
+ modpblk.descr[i].name);
+ }
+ }
+
+ if (loadModConf->container == NULL)
+ {
+ CHKmalloc(loadModConf->container = strdup(JSON_DEFAULT_CONTAINER));
+ }
+
+finalize_it :
+ if (pvals != NULL)
+ cnfparamvalsDestruct(pvals, &modpblk);
+ENDsetModCnf
+
+static inline void setInstParamDefaults(instanceData *pData)
+{
+ DBGPRINTF("mmdarwin::setInstParamDefaults::\n");
+ pData->pUUIDKey = NULL;
+ pData->pCertitudeKey = NULL;
+ pData->pSockName = NULL;
+ pData->fieldList.nmemb = 0;
+ pData->filterCode = DARWIN_FILTER_CODE_NO;
+ pData->response = DARWIN_RESPONSE_SEND_NO;
+ pData->socketMaxUse = 0;
+ pData->sendPartial = 0;
+}
+
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ int i;
+CODESTARTnewActInst
+ DBGPRINTF("mmdarwin::newActInst::\n");
+ if ((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL)
+ {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ CODE_STD_STRING_REQUESTnewActInst(1)
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
+
+ for (i = 0; i < actpblk.nParams; ++i)
+ {
+ if (!pvals[i].bUsed)
+ continue;
+
+ if (!strcmp(actpblk.descr[i].name, "key"))
+ {
+ char *key = es_str2cstr(pvals[i].val.d.estr, NULL);
+ char vnamebuf[1024];
+ snprintf(vnamebuf, sizeof(vnamebuf), "%s!%s", loadModConf->container, key);
+ CHKmalloc(pData->pCertitudeKey = strdup(vnamebuf));
+ free(key);
+ DBGPRINTF("mmdarwin::newActInst:: certitudeKey is %s\n", pData->pCertitudeKey);
+ }
+ else if (!strcmp(actpblk.descr[i].name, "socketpath"))
+ {
+ pData->pSockName = (uchar *)es_str2cstr(pvals[i].val.d.estr, NULL);
+ DBGPRINTF("mmdarwin::newActInst:: sockName is %s\n", pData->pSockName);
+ }
+ else if (!strcmp(actpblk.descr[i].name, "socket_max_use"))
+ {
+ pData->socketMaxUse = (uint32_t)pvals[i].val.d.n;
+ DBGPRINTF("mmdarwin::newActInst:: socketMaxUse is %d\n", pData->socketMaxUse);
+ }
+ else if (!strcmp(actpblk.descr[i].name, "send_partial"))
+ {
+ pData->sendPartial = (sbool)pvals[i].val.d.n;
+ if (pData->sendPartial)
+ {
+ DBGPRINTF("mmdarwin::newActInst:: sending bodies even if fields are missing\n");
+ }
+ else
+ {
+ DBGPRINTF("mmdarwin::newActInst:: only sending complete bodies\n");
+ }
+ }
+ else if (!strcmp(actpblk.descr[i].name, "response"))
+ {
+ char *response = es_str2cstr(pvals[i].val.d.estr, NULL);
+
+ if (!strcmp(response, "no"))
+ {
+ pData->response = DARWIN_RESPONSE_SEND_NO;
+ DBGPRINTF("mmdarwin::newActInst:: response type is 'no'\n");
+ }
+ else if (!strcmp(response, "back"))
+ {
+ pData->response = DARWIN_RESPONSE_SEND_BACK;
+ DBGPRINTF("mmdarwin::newActInst:: response type is 'back'\n");
+ }
+ else if (!strcmp(response, "darwin"))
+ {
+ pData->response = DARWIN_RESPONSE_SEND_DARWIN;
+ DBGPRINTF("mmdarwin::newActInst:: response type is 'darwin'\n");
+ }
+ else if (!strcmp(response, "both"))
+ {
+ pData->response = DARWIN_RESPONSE_SEND_BOTH;
+ DBGPRINTF("mmdarwin::newActInst:: response type is 'both'\n");
+ }
+ else
+ {
+ DBGPRINTF(
+ "mmdarwin::newActInst:: invalid 'response' value: %s. 'No response' set.\n",
+ response);
+
+ pData->response = DARWIN_RESPONSE_SEND_NO;
+ DBGPRINTF("mmdarwin::newActInst:: response type is 'no'\n");
+ }
+
+ free(response);
+ }
+ else if (!strcmp(actpblk.descr[i].name, "filtercode"))
+ {
+ char *filterCode = es_str2cstr(pvals[i].val.d.estr, NULL);
+ pData->filterCode = strtoull(filterCode, NULL, 16);
+ free(filterCode);
+ }
+ else if (!strcmp(actpblk.descr[i].name, "fields"))
+ {
+ pData->fieldList.nmemb = pvals[i].val.d.ar->nmemb;
+ CHKmalloc(pData->fieldList.name = calloc(pData->fieldList.nmemb, sizeof(char *)));
+ CHKmalloc(pData->fieldList.varname = calloc(pData->fieldList.nmemb, sizeof(char *)));
+
+ for (int j = 0; j < pData->fieldList.nmemb; ++j)
+ {
+ char *const param = es_str2cstr(pvals[i].val.d.ar->arr[j], NULL);
+ char *varname = NULL;
+ char *name;
+ if (*param == ':')
+ {
+ char *b = strchr(param + 1, ':');
+ if (b == NULL)
+ {
+ parser_errmsg(
+ "mmdarwin::newActInst:: missing closing colon: '%s'", param);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ *b = '\0'; /* split name & varname */
+ varname = param + 1;
+ name = b + 1;
+ }
+ else
+ {
+ name = param;
+ }
+ CHKmalloc(pData->fieldList.name[j] = strdup(name));
+ char vnamebuf[1024];
+ snprintf(vnamebuf, sizeof(vnamebuf),
+ "%s!%s", loadModConf->container,
+ (varname == NULL) ? name : varname);
+ CHKmalloc(pData->fieldList.varname[j] = strdup(vnamebuf));
+ free(param);
+ DBGPRINTF("mmdarwin::newActInst:: will look for field %s\n", pData->fieldList.name[j]);
+ }
+ }
+ else
+ {
+ DBGPRINTF(
+ "mmdarwin::newActInst:: program error, non-handled param '%s'\n", actpblk.descr[i].name);
+ }
+ }
+
+ // reserve space for 'container!key\0'
+ size_t sizeKey = strlen(loadModConf->container) + strlen(JSON_DARWIN_ID) + 2;
+ pData->pUUIDKey = malloc(sizeKey);
+ snprintf(pData->pUUIDKey, sizeKey, "%s!%s", loadModConf->container, JSON_DARWIN_ID);
+ DBGPRINTF("mmdarwin:: uuid key is %s\n", pData->pUUIDKey);
+
+CODE_STD_FINALIZERnewActInst
+ cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
+
+BEGINtryResume
+CODESTARTtryResume
+ iRet = doTryResume(pWrkrData);
+ENDtryResume
+
+BEGINdoAction_NoStrings
+ smsg_t **ppMsg = (smsg_t **)pMsgData; /* the raw data */
+ smsg_t *pMsg = ppMsg[0]; /* the raw log line */
+ instanceData *pData = pWrkrData->pData; /* the parameters given for the plugin */
+ char *pFieldValue = NULL; /* ponter to the found field value */
+ int fieldsNum = 0; /* number of fields retrieved */
+
+CODESTARTdoAction
+ DBGPRINTF("mmdarwin::doAction:: beggining action\n");
+ pWrkrData->darwinBody.bufferMsgSize = 0;
+ fieldsNum = 0;
+
+ for (int i = 0; i < pData->fieldList.nmemb; i++)
+ {
+ DBGPRINTF("mmdarwin::doAction:: processing field '%s'\n", pData->fieldList.name[i]);
+ pFieldValue = NULL;
+
+ /* case 1: static field. We simply forward it to Darwin */
+ if (pData->fieldList.name[i][0] != '!' && pData->fieldList.name[i][0] != '.')
+ {
+ pFieldValue = strdup(pData->fieldList.name[i]);
+ }
+ /* case 2: dynamic field. We retrieve its value from the JSON logline and forward it to
+ * Darwin */
+ else
+ {
+ if (!get_field(pMsg, pData->fieldList.name[i], &pFieldValue))
+ {
+ DBGPRINTF("mmdarwin::doAction:: \
+could not extract field '%s' from message\n", pData->fieldList.name[i]);
+ continue;
+ }
+ }
+
+ DBGPRINTF(
+ "mmdarwin::doAction:: got value of field '%s': '%s'\n", pData->fieldList.name[i], pFieldValue);
+
+ if (add_field_to_body(&(pWrkrData->darwinBody), pFieldValue, strlen(pFieldValue)) != 0)
+ {
+ DBGPRINTF("mmdarwin::doAction:: could not add field to body, aborting\n");
+ free(pFieldValue);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ fieldsNum++;
+ free(pFieldValue);
+ }
+
+ if (fieldsNum)
+ {
+ if (!pData->sendPartial && fieldsNum != pData->fieldList.nmemb)
+ {
+ DBGPRINTF("mmdarwin::doAction:: not all fields could be retrieved, not sending partial message."
+ " (if you wish to send partial messages anyway, set 'send_partial' to 'on' in instance parameters)\n");
+ FINALIZE;
+ }
+ if (end_body(&(pWrkrData->darwinBody)) != 0)
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ else
+ {
+ DBGPRINTF("mmdarwin::doAction:: no fields retrieved, finalizing\n");
+ FINALIZE;
+ }
+
+ DBGPRINTF("mmdarwin::doAction:: body to send: '%s'\n", pWrkrData->darwinBody.buffer);
+
+ if (pData->socketMaxUse)
+ {
+ /* need to rotate socket connections */
+ if (!pWrkrData->pktSentSocket)
+ {
+ DBGPRINTF("mmdarwin::doAction:: opening a new connection\n");
+ CHKiRet(doTryResume(pWrkrData));
+ }
+ pWrkrData->pktSentSocket = (pWrkrData->pktSentSocket + 1) % pData->socketMaxUse;
+ }
+
+ /* the Darwin header to be sent to the filter */
+ darwin_filter_packet_t header = {
+ .type = DARWIN_PACKET_OTHER,
+ .response = pData->response,
+ .filter_code = pData->filterCode,
+ .body_size = pWrkrData->darwinBody.bufferMsgSize};
+
+ const char *uuid = get_uuid_object(pMsg);
+ if(uuid) {
+ DBGPRINTF("mmdarwin: using existing UUID = %s\n", uuid);
+ if(uuid_parse(uuid, header.evt_id))
+ LogError(0, RS_RET_ERR, "mmdarwin:: failed to parse existing UUID: %s\n", uuid);
+ free((void*)uuid);
+ }
+ else {
+ uuid_generate(header.evt_id);
+ char uuidStr[40];
+ uuid_unparse(header.evt_id, uuidStr);
+ DBGPRINTF("mmdarwin: generated new UUID = %s\n", uuidStr);
+ msgAddJSON(pMsg, (uchar *)pData->pUUIDKey, json_object_new_string(uuidStr), 0, 0);
+ }
+
+ DBGPRINTF("mmdarwin::doAction:: sending header to Darwin\n");
+ CHKiRet(sendMsg(pWrkrData, &header, sizeof(darwin_filter_packet_t)));
+
+ DBGPRINTF("mmdarwin::doAction:: sending body to Darwin\n");
+ CHKiRet(sendMsg(pWrkrData, (void *)(pWrkrData->darwinBody.buffer), pWrkrData->darwinBody.bufferMsgSize));
+
+ /* there is no need to wait for a response that will never come */
+ if (pData->response == DARWIN_RESPONSE_SEND_NO || pData->response == DARWIN_RESPONSE_SEND_DARWIN)
+ {
+ DBGPRINTF("mmdarwin::doAction:: no response will be sent back "
+ "(darwin response type is set to 'no' or 'darwin')\n");
+ goto finalize_it;
+ }
+
+ darwin_filter_packet_t response;
+ memset(&response, 0, sizeof(response));
+ DBGPRINTF("mmdarwin::doAction:: receiving from Darwin\n");
+ CHKiRet(receiveMsg(pWrkrData, &response, sizeof(response)));
+
+ unsigned int certitude = response.certitude_list[0];
+ DBGPRINTF("mmdarwin::doAction:: end of the transaction, certitude is %d\n", certitude);
+
+ msgAddJSON(pMsg, (uchar *)pData->pCertitudeKey, json_object_new_int(certitude), 0, 0);
+
+finalize_it :
+ DBGPRINTF("mmdarwin::doAction:: finished processing log line\n");
+
+ENDdoAction
+
+NO_LEGACY_CONF_parseSelectorAct
+
+BEGINmodExit
+CODESTARTmodExit
+ objRelease(glbl, CORE_COMPONENT);
+ENDmodExit
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+ENDqueryEtryPt
+
+BEGINmodInit()
+CODESTARTmodInit
+ /* we only support the current interface specification */
+ *ipIFVersProvided = CURR_MOD_IF_VERSION;
+CODEmodInit_QueryRegCFSLineHdlr
+ DBGPRINTF("mmdarwin::modInit:: module compiled with rsyslog version %s.\n", VERSION);
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ENDmodInit