summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/in_collectd
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/in_collectd')
-rw-r--r--src/fluent-bit/plugins/in_collectd/CMakeLists.txt7
-rw-r--r--src/fluent-bit/plugins/in_collectd/in_collectd.c226
-rw-r--r--src/fluent-bit/plugins/in_collectd/in_collectd.h46
-rw-r--r--src/fluent-bit/plugins/in_collectd/netprot.c308
-rw-r--r--src/fluent-bit/plugins/in_collectd/netprot.h22
-rw-r--r--src/fluent-bit/plugins/in_collectd/typesdb.c223
-rw-r--r--src/fluent-bit/plugins/in_collectd/typesdb.h45
-rw-r--r--src/fluent-bit/plugins/in_collectd/typesdb_parser.c214
-rw-r--r--src/fluent-bit/plugins/in_collectd/typesdb_parser.h20
9 files changed, 1111 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/in_collectd/CMakeLists.txt b/src/fluent-bit/plugins/in_collectd/CMakeLists.txt
new file mode 100644
index 000000000..1f8d64074
--- /dev/null
+++ b/src/fluent-bit/plugins/in_collectd/CMakeLists.txt
@@ -0,0 +1,7 @@
+set(src
+ typesdb.c
+ typesdb_parser.c
+ netprot.c
+ in_collectd.c)
+
+FLB_PLUGIN(in_collectd "${src}" "")
diff --git a/src/fluent-bit/plugins/in_collectd/in_collectd.c b/src/fluent-bit/plugins/in_collectd/in_collectd.c
new file mode 100644
index 000000000..06ef5ae8a
--- /dev/null
+++ b/src/fluent-bit/plugins/in_collectd/in_collectd.c
@@ -0,0 +1,226 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_utils.h>
+#include <msgpack.h>
+
+#include "in_collectd.h"
+#include "netprot.h"
+#include "typesdb.h"
+
+/*
+ * Max payload size. By default, Collectd sends up to 1452 bytes
+ * per a UDP packet, but the limit can be increased up to 65535
+ * bytes through a configuration parameter.
+ *
+ * See network_config_set_buffer_size() in collectd/src/network.c.
+ */
+#define BUFFER_SIZE 65535
+
+#define DEFAULT_LISTEN "0.0.0.0"
+#define DEFAULT_PORT 25826
+
+/* This is where most Linux systems places a default TypesDB */
+#define DEFAULT_TYPESDB "/usr/share/collectd/types.db"
+
+static int in_collectd_callback(struct flb_input_instance *i_ins,
+ struct flb_config *config, void *in_context);
+
+static int in_collectd_init(struct flb_input_instance *in,
+ struct flb_config *config, void *data)
+{
+ int ret;
+ struct flb_in_collectd_config *ctx;
+ struct mk_list *tdb;
+ char *listen = DEFAULT_LISTEN;
+ int port = DEFAULT_PORT;
+
+ /* Initialize context */
+ ctx = flb_calloc(1, sizeof(struct flb_in_collectd_config));
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+ ctx->ins = in;
+
+ ctx->bufsize = BUFFER_SIZE;
+ ctx->buf = flb_malloc(ctx->bufsize);
+ if (!ctx->buf) {
+ flb_errno();
+ flb_free(ctx);
+ return -1;
+ }
+
+ /* Load the config map */
+ ret = flb_input_config_map_set(in, (void *)ctx);
+ if (ret == -1) {
+ flb_plg_error(in, "unable to load configuration");
+ flb_free(ctx);
+ return -1;
+ }
+
+ /* Listening address */
+ if (in->host.listen) {
+ listen = in->host.listen;
+ }
+
+ if (strlen(listen) > sizeof(ctx->listen) - 1) {
+ flb_plg_error(ctx->ins, "too long address '%s'", listen);
+ flb_free(ctx);
+ return -1;
+ }
+ strcpy(ctx->listen, listen);
+
+ /* Listening port */
+ if (in->host.port) {
+ port = in->host.port;
+ }
+ snprintf(ctx->port, sizeof(ctx->port), "%hu", (unsigned short) port);
+
+ flb_plg_debug(ctx->ins, "Loading TypesDB from %s", ctx->types_db);
+
+ tdb = typesdb_load_all(ctx, ctx->types_db);
+ if (!tdb) {
+ flb_plg_error(ctx->ins, "failed to load '%s'", ctx->types_db);
+ flb_free(ctx->buf);
+ flb_free(ctx);
+ return -1;
+ }
+ ctx->tdb = tdb;
+
+ /* Set the context */
+ flb_input_set_context(in, ctx);
+
+ ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen);
+ if (ctx->server_fd < 0) {
+ flb_plg_error(ctx->ins, "failed to bind to %s:%s", ctx->listen,
+ ctx->port);
+ typesdb_destroy(ctx->tdb);
+ flb_free(ctx->buf);
+ flb_free(ctx);
+ return -1;
+ }
+
+ /* Set the collector */
+ ret = flb_input_set_collector_socket(in,
+ in_collectd_callback,
+ ctx->server_fd,
+ config);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "failed set up a collector");
+ flb_socket_close(ctx->server_fd);
+ typesdb_destroy(ctx->tdb);
+ flb_free(ctx->buf);
+ flb_free(ctx);
+ return -1;
+ }
+ ctx->coll_fd = ret;
+
+ ret = flb_log_event_encoder_init(&ctx->log_encoder,
+ FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(ctx->ins, "error initializing event encoder : %d", ret);
+
+ flb_socket_close(ctx->server_fd);
+ typesdb_destroy(ctx->tdb);
+ flb_free(ctx->buf);
+ flb_free(ctx);
+
+ return -1;
+ }
+
+ flb_plg_info(ctx->ins, "start listening to %s:%s",
+ ctx->listen, ctx->port);
+
+ return 0;
+}
+
+static int in_collectd_callback(struct flb_input_instance *i_ins,
+ struct flb_config *config, void *in_context)
+{
+ int len;
+ struct flb_in_collectd_config *ctx = in_context;
+
+ len = recv(ctx->server_fd, ctx->buf, ctx->bufsize, 0);
+ if (len < 0) {
+ flb_errno();
+ return -1;
+ }
+ if (len == 0) {
+ return 0;
+ }
+
+ flb_log_event_encoder_reset(&ctx->log_encoder);
+
+ if (netprot_to_msgpack(ctx->buf, len, ctx->tdb, &ctx->log_encoder)) {
+ flb_plg_error(ctx->ins, "netprot_to_msgpack fails");
+
+ return -1;
+ }
+
+ if (ctx->log_encoder.output_length > 0) {
+ flb_input_log_append(i_ins, NULL, 0,
+ ctx->log_encoder.output_buffer,
+ ctx->log_encoder.output_length);
+ }
+
+ return 0;
+}
+
+static int in_collectd_exit(void *data, struct flb_config *config)
+{
+ struct flb_in_collectd_config *ctx = data;
+
+ flb_log_event_encoder_destroy(&ctx->log_encoder);
+ flb_socket_close(ctx->server_fd);
+ flb_pipe_close(ctx->coll_fd);
+ typesdb_destroy(ctx->tdb);
+ flb_free(ctx->buf);
+ flb_free(ctx);
+
+ return 0;
+}
+
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "typesdb", DEFAULT_TYPESDB,
+ 0, FLB_TRUE, offsetof(struct flb_in_collectd_config, types_db),
+ "Set the types database filename"
+ },
+ /* EOF */
+ {0}
+};
+
+struct flb_input_plugin in_collectd_plugin = {
+ .name = "collectd",
+ .description = "collectd input plugin",
+ .cb_init = in_collectd_init,
+ .cb_pre_run = NULL,
+ .cb_collect = NULL,
+ .cb_flush_buf = NULL,
+ .cb_pause = NULL,
+ .cb_resume = NULL,
+ .config_map = config_map,
+ .flags = FLB_INPUT_NET_SERVER,
+ .cb_exit = in_collectd_exit
+};
diff --git a/src/fluent-bit/plugins/in_collectd/in_collectd.h b/src/fluent-bit/plugins/in_collectd/in_collectd.h
new file mode 100644
index 000000000..8750c7bf5
--- /dev/null
+++ b/src/fluent-bit/plugins/in_collectd/in_collectd.h
@@ -0,0 +1,46 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_COLLECTD_H
+#define FLB_IN_COLLECTD_H
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+
+struct flb_in_collectd_config {
+ char *buf;
+ int bufsize;
+
+ /* Server */
+ char listen[256]; /* RFC-2181 */
+ char port[6]; /* RFC-793 */
+
+ /* Sockets */
+ flb_sockfd_t server_fd;
+ flb_pipefd_t coll_fd;
+
+ flb_sds_t types_db;
+ struct mk_list *tdb;
+ struct flb_log_event_encoder log_encoder;
+
+ /* Plugin input instance */
+ struct flb_input_instance *ins;
+};
+
+#endif
diff --git a/src/fluent-bit/plugins/in_collectd/netprot.c b/src/fluent-bit/plugins/in_collectd/netprot.c
new file mode 100644
index 000000000..005db2edb
--- /dev/null
+++ b/src/fluent-bit/plugins/in_collectd/netprot.c
@@ -0,0 +1,308 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This module implements the binary network protocol of collectd.
+ * (https://collectd.org/wiki/index.php/Binary_protocol)
+ *
+ * The only interface you need to care is netprot_to_msgpack() that
+ * parses a UDP packet and converts it into MessagePack format.
+ */
+
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_endian.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+#include <msgpack.h>
+#include "netprot.h"
+#include "typesdb.h"
+
+#define be16read(x) (be16toh(*(uint16_t *) (x)))
+#define be32read(x) (be32toh(*(uint32_t *) (x)))
+#define be64read(x) (be64toh(*(uint64_t *) (x)))
+
+#define le16read(x) (le16toh(*(uint16_t *) (x)))
+#define le32read(x) (le32toh(*(uint32_t *) (x)))
+#define le64read(x) (le64toh(*(uint64_t *) (x)))
+
+/* Convert a high-resolution time into a normal UNIX time. */
+#define hr2time(x) ((double) (x) / 1073741824)
+
+/* Basic data field definitions for collectd */
+#define PART_HOST 0x0000
+#define PART_TIME 0x0001
+#define PART_PLUGIN 0x0002
+#define PART_PLUGIN_INSTANCE 0x0003
+#define PART_TYPE 0x0004
+#define PART_TYPE_INSTANCE 0x0005
+#define PART_VALUE 0x0006
+#define PART_INTERVAL 0x0007
+
+#define PART_TIME_HR 0x0008
+#define PART_INTERVAL_HR 0x0009
+
+/*
+ * The "DS_TYPE_*" are value types for PART_VALUE fields.
+ *
+ * Read https://collectd.org/wiki/index.php/Data_source for what
+ * these types mean.
+ */
+#define DS_TYPE_COUNTER 0
+#define DS_TYPE_GAUGE 1
+#define DS_TYPE_DERIVE 2
+#define DS_TYPE_ABSOLUTE 3
+
+struct netprot_header
+{
+ double time;
+ double interval;
+ char *host;
+ char *plugin;
+ char *plugin_instance;
+ char *type;
+ char *type_instance;
+};
+
+static int netprot_pack_value(char *ptr, int size, struct netprot_header *hdr,
+ struct mk_list *tdb,
+ struct flb_log_event_encoder *encoder)
+{
+ int i;
+ char type;
+ char *pval;
+ uint16_t count;
+ struct typesdb_node *node;
+ int result;
+
+ if (hdr->type == NULL) {
+ flb_error("[in_collectd] invalid data (type is NULL)");
+ return -1;
+ }
+
+ /*
+ * Since each value uses (1 + 8) bytes, the total buffer size must
+ * be 2-byte header plus <count * 9> bytes.
+ */
+ count = be16read(ptr);
+ if (size != 2 + count * 9) {
+ flb_error("[in_collectd] data corrupted (size=%i, count=%i)",
+ size, count);
+ return -1;
+ }
+
+ /*
+ * We need to query against TypesDB in order to get field names
+ * for the data set values.
+ */
+ node = typesdb_find_node(tdb, hdr->type);
+ if (!node) {
+ flb_error("[in_collectd] no such type found '%s'", hdr->type);
+ return -1;
+ }
+ if (node->count != count) {
+ flb_error("[in_collectd] invalid value for '%s' (%i != %i)",
+ hdr->type, node->count, count);
+ return -1;
+ }
+
+ result = flb_log_event_encoder_begin_record(encoder);
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_set_current_timestamp(encoder);
+ }
+
+ if (hdr->type != NULL &&
+ result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE("type"),
+ FLB_LOG_EVENT_CSTRING_VALUE(hdr->type));
+ }
+
+ if (hdr->type_instance != NULL &&
+ result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE("type_instance"),
+ FLB_LOG_EVENT_CSTRING_VALUE(hdr->type_instance));
+ }
+
+ if (hdr->time > 0 &&
+ result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE("time"),
+ FLB_LOG_EVENT_DOUBLE_VALUE(hdr->time));
+ }
+
+ if (hdr->interval > 0 &&
+ result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE("interval"),
+ FLB_LOG_EVENT_DOUBLE_VALUE(hdr->interval));
+ }
+
+ if (hdr->plugin != NULL &&
+ result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE("plugin"),
+ FLB_LOG_EVENT_CSTRING_VALUE(hdr->plugin));
+ }
+
+ if (hdr->plugin_instance != NULL &&
+ result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE("plugin_instance"),
+ FLB_LOG_EVENT_CSTRING_VALUE(hdr->plugin_instance));
+ }
+
+ if (hdr->host != NULL &&
+ result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE("host"),
+ FLB_LOG_EVENT_CSTRING_VALUE(hdr->host));
+ }
+
+ for (i = 0; i < count && result == FLB_EVENT_ENCODER_SUCCESS ; i++) {
+ pval = ptr + 2 + count + 8 * i;
+ type = ptr[2 + i];
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_cstring(
+ encoder, node->fields[i]);
+ }
+
+ switch (type) {
+ case DS_TYPE_COUNTER:
+ result = flb_log_event_encoder_append_body_uint64(
+ encoder, be64read(pval));
+ break;
+ case DS_TYPE_GAUGE:
+ result = flb_log_event_encoder_append_body_double(
+ encoder, *((double *) pval));
+ break;
+ case DS_TYPE_DERIVE:
+ result = flb_log_event_encoder_append_body_int64(
+ encoder, (int64_t) be64read(pval));
+ break;
+ case DS_TYPE_ABSOLUTE:
+ result = flb_log_event_encoder_append_body_uint64(
+ encoder, be64read(pval));
+ break;
+ default:
+ flb_error("[in_collectd] unknown data type %i", type);
+
+ result = FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT;
+ }
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_commit_record(encoder);
+ }
+ else {
+ flb_log_event_encoder_rollback_record(encoder);
+ }
+
+ if (result != FLB_EVENT_ENCODER_SUCCESS) {
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ * Entry point function
+ */
+int netprot_to_msgpack(char *buf, int len, struct mk_list *tdb,
+ struct flb_log_event_encoder *encoder)
+{
+ uint16_t part_type;
+ uint16_t part_len;
+ int size;
+ char *ptr;
+ struct netprot_header hdr = {0};
+
+ while (len >= 4) {
+ part_type = be16read(buf);
+ part_len = be16read(buf + 2);
+
+ if (len < part_len) {
+ flb_error("[in_collectd] data truncated (%i < %i)", len, part_len);
+ return -1;
+ }
+ ptr = buf + 4;
+ size = part_len - 4;
+
+ switch (part_type) {
+ case PART_HOST:
+ if (ptr[size] == '\0') {
+ hdr.host = ptr;
+ }
+ break;
+ case PART_TIME:
+ hdr.time = (double) be64read(ptr);
+ break;
+ case PART_TIME_HR:
+ hdr.time = hr2time(be64read(ptr));
+ break;
+ case PART_PLUGIN:
+ if (ptr[size] == '\0') {
+ hdr.plugin = ptr;
+ }
+ break;
+ case PART_PLUGIN_INSTANCE:
+ if (ptr[size] == '\0') {
+ hdr.plugin_instance = ptr;
+ }
+ break;
+ case PART_TYPE:
+ if (ptr[size] == '\0') {
+ hdr.type = ptr;
+ }
+ break;
+ case PART_TYPE_INSTANCE:
+ if (ptr[size] == '\0') {
+ hdr.type_instance = ptr;
+ }
+ break;
+ case PART_VALUE:
+ if (netprot_pack_value(ptr, size, &hdr, tdb, encoder)) {
+ return -1;
+ }
+ break;
+ case PART_INTERVAL:
+ hdr.interval = (double) be64read(ptr);
+ break;
+ case PART_INTERVAL_HR:
+ hdr.interval = hr2time(be64read(ptr));
+ break;
+ default:
+ flb_debug("[in_collectd] skip unknown type %x", part_type);
+ break;
+ }
+ len -= part_len;
+ buf += part_len;
+ }
+ return 0;
+}
diff --git a/src/fluent-bit/plugins/in_collectd/netprot.h b/src/fluent-bit/plugins/in_collectd/netprot.h
new file mode 100644
index 000000000..c9292640c
--- /dev/null
+++ b/src/fluent-bit/plugins/in_collectd/netprot.h
@@ -0,0 +1,22 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* Convert a binary buffer into MessagePack format */
+int netprot_to_msgpack(char *buf, int len, struct mk_list *tdb,
+ struct flb_log_event_encoder *encoder);
diff --git a/src/fluent-bit/plugins/in_collectd/typesdb.c b/src/fluent-bit/plugins/in_collectd/typesdb.c
new file mode 100644
index 000000000..ef579583c
--- /dev/null
+++ b/src/fluent-bit/plugins/in_collectd/typesdb.c
@@ -0,0 +1,223 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_str.h>
+
+#include "in_collectd.h"
+#include "typesdb.h"
+#include "typesdb_parser.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+
+/* Internal function to load from a single TypesDB */
+static int typesdb_load(struct flb_in_collectd_config *ctx,
+ struct mk_list *tdb, const char *path)
+{
+ int fd = open(path, O_RDONLY);
+ if (fd < 0) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "failed to open '%s'", path);
+ return -1;
+ }
+
+ if (typesdb_parse(tdb, fd)) {
+ flb_plg_error(ctx->ins, "failed to parse '%s'", path);
+ close(fd);
+ return -1;
+ }
+ close(fd);
+ return 0;
+}
+
+/*
+ * Load multiple TypesDB files at once. The return value is
+ * a linked list of typesdb_node objects.
+ *
+ * "paths" is a comma-separated list of file names.
+ */
+struct mk_list *typesdb_load_all(struct flb_in_collectd_config *ctx,
+ const char *paths)
+{
+ char *buf;
+ char *state;
+ char *path;
+ struct mk_list *tdb;
+
+ buf = flb_strdup(paths);
+ if (!buf) {
+ flb_errno();
+ return NULL;
+ }
+
+ tdb = flb_malloc(sizeof(struct mk_list));
+ if (!tdb) {
+ flb_errno();
+ flb_free(buf);
+ return NULL;
+ }
+ mk_list_init(tdb);
+
+ path = strtok_r(buf, ",", &state);
+ while (path) {
+ if (typesdb_load(ctx, tdb, path)) {
+ typesdb_destroy(tdb);
+ flb_free(buf);
+ return NULL;
+ }
+ path = strtok_r(NULL, ",", &state);
+ }
+ flb_free(buf);
+ return tdb;
+}
+
+void typesdb_destroy(struct mk_list *tdb)
+{
+ struct typesdb_node *node;
+ struct mk_list *head;
+ struct mk_list *tmp;
+
+ mk_list_foreach_safe(head, tmp, tdb) {
+ node = mk_list_entry(head, struct typesdb_node, _head);
+ typesdb_destroy_node(node);
+ }
+ flb_free(tdb);
+}
+
+struct typesdb_node *typesdb_find_node(struct mk_list *tdb, const char *type)
+{
+ struct typesdb_node *node;
+ struct mk_list *head;
+
+ if (type == NULL) {
+ return NULL;
+ }
+
+ /*
+ * Search the linked list from the tail so that later entries
+ * take precedence over earlier ones.
+ */
+ mk_list_foreach_r(head, tdb) {
+ node = mk_list_entry(head, struct typesdb_node, _head);
+ if (strcmp(node->type, type) == 0) {
+ return node;
+ }
+ }
+ return NULL;
+}
+
+struct typesdb_node *typesdb_last_node(struct mk_list *tdb)
+{
+ return mk_list_entry_last(tdb, struct typesdb_node, _head);
+}
+
+/*
+ * The folloings are API functions to modify a TypesDB instance.
+ */
+int typesdb_add_node(struct mk_list *tdb, const char *type)
+{
+ struct typesdb_node *node;
+
+ node = flb_calloc(1, sizeof(struct typesdb_node));
+ if (!node) {
+ flb_errno();
+ return -1;
+ }
+
+ node->type = flb_strdup(type);
+ if (!node->type) {
+ flb_errno();
+ flb_free(node);
+ return -1;
+ }
+
+ mk_list_add(&node->_head, tdb);
+ return 0;
+}
+
+void typesdb_destroy_node(struct typesdb_node *node)
+{
+ int i;
+
+ flb_free(node->type);
+
+ if (node->fields) {
+ for (i = 0; i < node->count; i++) {
+ flb_free(node->fields[i]);
+ }
+ flb_free(node->fields);
+ }
+ mk_list_del(&node->_head);
+ flb_free(node);
+}
+
+int typesdb_add_field(struct typesdb_node *node, const char *field)
+{
+ char *end;
+ int alloc;
+ char **fields;
+
+ end = strchr(field, ':');
+ if (!end) {
+ return -1;
+ }
+
+ if (node->count >= node->alloc) {
+ alloc = node->alloc > 0 ? node->alloc * 2 : 1;
+ fields = flb_realloc(node->fields, alloc * sizeof(char *));
+ if (!fields) {
+ flb_errno();
+ return -1;
+ }
+ node->alloc = alloc;
+ node->fields = fields;
+ }
+
+ node->fields[node->count] = flb_strndup(field, end - field);
+ if (!node->fields[node->count]) {
+ flb_errno();
+ return -1;
+ }
+ node->count++;
+ return 0;
+}
+
+/* A debug function to see the content of TypesDB */
+void typesdb_dump(struct mk_list *tdb)
+{
+ struct mk_list *head;
+ struct typesdb_node *node;
+ int i;
+
+ mk_list_foreach(head, tdb) {
+ node = mk_list_entry(head, struct typesdb_node, _head);
+
+ printf("%s", node->type);
+ for (i = 0; i < node->count; i++) {
+ printf("\t%s", node->fields[i]);
+ }
+ putchar('\n');
+ }
+}
diff --git a/src/fluent-bit/plugins/in_collectd/typesdb.h b/src/fluent-bit/plugins/in_collectd/typesdb.h
new file mode 100644
index 000000000..7da131be0
--- /dev/null
+++ b/src/fluent-bit/plugins/in_collectd/typesdb.h
@@ -0,0 +1,45 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "in_collectd.h"
+
+struct typesdb_node {
+ char *type;
+ int alloc;
+ int count;
+ char **fields;
+ struct mk_list _head;
+};
+
+/* Load and destroy TypesDB */
+struct mk_list *typesdb_load_all(struct flb_in_collectd_config *ctx,
+ const char *paths);
+void typesdb_destroy(struct mk_list *tdb);
+
+/* Find a node in TypesDB */
+struct typesdb_node *typesdb_find_node(struct mk_list *tdb, const char *type);
+struct typesdb_node *typesdb_last_node(struct mk_list *tdb);
+
+/* Modify a TypesDB instance (used in typesdb_parser.c) */
+int typesdb_add_node(struct mk_list *tdb, const char *type);
+void typesdb_destroy_node(struct typesdb_node *node);
+int typesdb_add_field(struct typesdb_node *node, const char *field);
+
+/* For debugging */
+void typesdb_dump(struct mk_list *tdb);
diff --git a/src/fluent-bit/plugins/in_collectd/typesdb_parser.c b/src/fluent-bit/plugins/in_collectd/typesdb_parser.c
new file mode 100644
index 000000000..5e237ffaf
--- /dev/null
+++ b/src/fluent-bit/plugins/in_collectd/typesdb_parser.c
@@ -0,0 +1,214 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This file implements a collectd 5.x compatible parser for types.db(5).
+ *
+ * Note: it internally implements a finite state machine that consumes a
+ * single char at once, and pushes parsed tokens via typesdb_* methods.
+ */
+
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_str.h>
+
+#include "typesdb.h"
+#include "typesdb_parser.h"
+
+#define TDB_INVALID -1
+#define TDB_INIT 0
+#define TDB_LEFT 1
+#define TDB_SEP 2
+#define TDB_RIGHT 3
+#define TDB_RIGHT_SEP 4
+#define TDB_COMMENT 5
+
+/* See collectd/src/daemon/types_list.c */
+#define MAX_LINE_SIZE 4096
+
+/*
+ * tdb_* are state functions that take a single character as input.
+ * They do some action based on the input and return the next state.
+ */
+static int tdb_init(char c, struct mk_list *tdb, char *buf)
+{
+ switch (c) {
+ case '#':
+ return TDB_COMMENT;
+ case '\r':
+ case '\n':
+ return TDB_INIT;
+ default:
+ buf[0] = c;
+ buf[1] = '\0';
+ return TDB_LEFT;
+ }
+}
+
+static int tdb_left(char c, struct mk_list *tdb, char *buf)
+{
+ int len;
+
+ switch (c) {
+ case ' ':
+ if (typesdb_add_node(tdb, buf)) {
+ return TDB_INVALID;
+ }
+ return TDB_SEP;
+ case '\r':
+ case '\n':
+ return TDB_INVALID;
+ default:
+ len = strlen(buf);
+ if (len >= MAX_LINE_SIZE - 1) {
+ return TDB_INVALID;
+ }
+ buf[len] = c;
+ buf[++len] = '\0';
+ return TDB_LEFT;
+ }
+}
+
+static int tdb_sep(char c, struct mk_list *tdb, char *buf)
+{
+ switch (c) {
+ case ' ':
+ return TDB_SEP;
+ case '\r':
+ case '\n':
+ return TDB_INVALID;
+ default:
+ buf[0] = c;
+ buf[1] = '\0';
+ return TDB_RIGHT;
+ }
+}
+
+static int tdb_right(char c, struct mk_list *tdb, char *buf)
+{
+ int len;
+ struct typesdb_node *node = typesdb_last_node(tdb);
+
+ switch (c) {
+ case ' ':
+ case ',':
+ if (typesdb_add_field(node, buf)) {
+ flb_error("[in_collectd] cannot add value '%s'", buf);
+ return TDB_INVALID;
+ }
+ return TDB_RIGHT_SEP;
+ case '\r':
+ case '\n':
+ if (typesdb_add_field(node, buf)) {
+ flb_error("[in_collectd] cannot add value '%s'", buf);
+ return TDB_INVALID;
+ }
+ return TDB_INIT;
+ default:
+ len = strlen(buf);
+ if (len >= MAX_LINE_SIZE - 1) {
+ flb_error("[in_collectd] line too long > %i", MAX_LINE_SIZE);
+ return TDB_INVALID;
+ }
+ buf[len] = c;
+ buf[++len] = '\0';
+ return TDB_RIGHT;
+ }
+}
+
+static int tdb_right_sep(char c, struct mk_list *tdb, char *buf)
+{
+ switch (c) {
+ case ' ':
+ case ',':
+ return TDB_RIGHT_SEP;
+ case '\r':
+ case '\n':
+ return TDB_INIT;
+ default:
+ buf[0] = c;
+ buf[1] = '\0';
+ return TDB_RIGHT;
+ }
+}
+
+static int tdb_comment(char c, struct mk_list *tdb, char *buf)
+{
+ switch (c) {
+ case '\r':
+ case '\n':
+ return TDB_INIT;
+ default:
+ return TDB_COMMENT;
+ }
+}
+
+/*
+ * Entry point function
+ */
+int typesdb_parse(struct mk_list *tdb, int fp)
+{
+ char tmp[1024];
+ char buf[MAX_LINE_SIZE];
+ char c;
+ int i;
+ int bytes;
+ int state = TDB_INIT;
+
+ while (1) {
+ bytes = read(fp, tmp, 1024);
+ if (bytes < 0) {
+ flb_errno();
+ return bytes;
+ }
+ if (bytes == 0) {
+ return 0;
+ }
+ for (i = 0; i < bytes; i++) {
+ c = tmp[i];
+ switch (state) {
+ case TDB_INVALID:
+ return -1;
+ case TDB_INIT:
+ state = tdb_init(c, tdb, buf);
+ break;
+ case TDB_LEFT:
+ state = tdb_left(c, tdb, buf);
+ break;
+ case TDB_SEP:
+ state = tdb_sep(c, tdb, buf);
+ break;
+ case TDB_RIGHT:
+ state = tdb_right(c, tdb, buf);
+ break;
+ case TDB_RIGHT_SEP:
+ state = tdb_right_sep(c, tdb, buf);
+ break;
+ case TDB_COMMENT:
+ state = tdb_comment(c, tdb, buf);
+ break;
+ default:
+ flb_error("[in_collectd] unknown state %i", state);
+ return -1;
+ }
+ }
+ }
+ return 0;
+}
diff --git a/src/fluent-bit/plugins/in_collectd/typesdb_parser.h b/src/fluent-bit/plugins/in_collectd/typesdb_parser.h
new file mode 100644
index 000000000..985570134
--- /dev/null
+++ b/src/fluent-bit/plugins/in_collectd/typesdb_parser.h
@@ -0,0 +1,20 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+int typesdb_parse(struct mk_list *tdb, int fp);