diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:48 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:20:02 +0000 |
commit | 58daab21cd043e1dc37024a7f99b396788372918 (patch) | |
tree | 96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/plugins/in_collectd | |
parent | Releasing debian version 1.43.2-1. (diff) | |
download | netdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz netdata-58daab21cd043e1dc37024a7f99b396788372918.zip |
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/plugins/in_collectd')
-rw-r--r-- | fluent-bit/plugins/in_collectd/CMakeLists.txt | 7 | ||||
-rw-r--r-- | fluent-bit/plugins/in_collectd/in_collectd.c | 226 | ||||
-rw-r--r-- | fluent-bit/plugins/in_collectd/in_collectd.h | 46 | ||||
-rw-r--r-- | fluent-bit/plugins/in_collectd/netprot.c | 308 | ||||
-rw-r--r-- | fluent-bit/plugins/in_collectd/netprot.h | 22 | ||||
-rw-r--r-- | fluent-bit/plugins/in_collectd/typesdb.c | 223 | ||||
-rw-r--r-- | fluent-bit/plugins/in_collectd/typesdb.h | 45 | ||||
-rw-r--r-- | fluent-bit/plugins/in_collectd/typesdb_parser.c | 214 | ||||
-rw-r--r-- | fluent-bit/plugins/in_collectd/typesdb_parser.h | 20 |
9 files changed, 1111 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_collectd/CMakeLists.txt b/fluent-bit/plugins/in_collectd/CMakeLists.txt new file mode 100644 index 000000000..1f8d64074 --- /dev/null +++ b/fluent-bit/plugins/in_collectd/CMakeLists.txt @@ -0,0 +1,7 @@ +set(src + typesdb.c + typesdb_parser.c + netprot.c + in_collectd.c) + +FLB_PLUGIN(in_collectd "${src}" "") diff --git a/fluent-bit/plugins/in_collectd/in_collectd.c b/fluent-bit/plugins/in_collectd/in_collectd.c new file mode 100644 index 000000000..06ef5ae8a --- /dev/null +++ b/fluent-bit/plugins/in_collectd/in_collectd.c @@ -0,0 +1,226 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_utils.h> +#include <msgpack.h> + +#include "in_collectd.h" +#include "netprot.h" +#include "typesdb.h" + +/* + * Max payload size. By default, Collectd sends up to 1452 bytes + * per a UDP packet, but the limit can be increased up to 65535 + * bytes through a configuration parameter. + * + * See network_config_set_buffer_size() in collectd/src/network.c. + */ +#define BUFFER_SIZE 65535 + +#define DEFAULT_LISTEN "0.0.0.0" +#define DEFAULT_PORT 25826 + +/* This is where most Linux systems places a default TypesDB */ +#define DEFAULT_TYPESDB "/usr/share/collectd/types.db" + +static int in_collectd_callback(struct flb_input_instance *i_ins, + struct flb_config *config, void *in_context); + +static int in_collectd_init(struct flb_input_instance *in, + struct flb_config *config, void *data) +{ + int ret; + struct flb_in_collectd_config *ctx; + struct mk_list *tdb; + char *listen = DEFAULT_LISTEN; + int port = DEFAULT_PORT; + + /* Initialize context */ + ctx = flb_calloc(1, sizeof(struct flb_in_collectd_config)); + if (!ctx) { + flb_errno(); + return -1; + } + ctx->ins = in; + + ctx->bufsize = BUFFER_SIZE; + ctx->buf = flb_malloc(ctx->bufsize); + if (!ctx->buf) { + flb_errno(); + flb_free(ctx); + return -1; + } + + /* Load the config map */ + ret = flb_input_config_map_set(in, (void *)ctx); + if (ret == -1) { + flb_plg_error(in, "unable to load configuration"); + flb_free(ctx); + return -1; + } + + /* Listening address */ + if (in->host.listen) { + listen = in->host.listen; + } + + if (strlen(listen) > sizeof(ctx->listen) - 1) { + flb_plg_error(ctx->ins, "too long address '%s'", listen); + flb_free(ctx); + return -1; + } + strcpy(ctx->listen, listen); + + /* Listening port */ + if (in->host.port) { + port = in->host.port; + } + snprintf(ctx->port, sizeof(ctx->port), "%hu", (unsigned short) port); + + flb_plg_debug(ctx->ins, "Loading TypesDB from %s", ctx->types_db); + + tdb = typesdb_load_all(ctx, ctx->types_db); + if (!tdb) { + flb_plg_error(ctx->ins, "failed to load '%s'", ctx->types_db); + flb_free(ctx->buf); + flb_free(ctx); + return -1; + } + ctx->tdb = tdb; + + /* Set the context */ + flb_input_set_context(in, ctx); + + ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen); + if (ctx->server_fd < 0) { + flb_plg_error(ctx->ins, "failed to bind to %s:%s", ctx->listen, + ctx->port); + typesdb_destroy(ctx->tdb); + flb_free(ctx->buf); + flb_free(ctx); + return -1; + } + + /* Set the collector */ + ret = flb_input_set_collector_socket(in, + in_collectd_callback, + ctx->server_fd, + config); + if (ret == -1) { + flb_plg_error(ctx->ins, "failed set up a collector"); + flb_socket_close(ctx->server_fd); + typesdb_destroy(ctx->tdb); + flb_free(ctx->buf); + flb_free(ctx); + return -1; + } + ctx->coll_fd = ret; + + ret = flb_log_event_encoder_init(&ctx->log_encoder, + FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, "error initializing event encoder : %d", ret); + + flb_socket_close(ctx->server_fd); + typesdb_destroy(ctx->tdb); + flb_free(ctx->buf); + flb_free(ctx); + + return -1; + } + + flb_plg_info(ctx->ins, "start listening to %s:%s", + ctx->listen, ctx->port); + + return 0; +} + +static int in_collectd_callback(struct flb_input_instance *i_ins, + struct flb_config *config, void *in_context) +{ + int len; + struct flb_in_collectd_config *ctx = in_context; + + len = recv(ctx->server_fd, ctx->buf, ctx->bufsize, 0); + if (len < 0) { + flb_errno(); + return -1; + } + if (len == 0) { + return 0; + } + + flb_log_event_encoder_reset(&ctx->log_encoder); + + if (netprot_to_msgpack(ctx->buf, len, ctx->tdb, &ctx->log_encoder)) { + flb_plg_error(ctx->ins, "netprot_to_msgpack fails"); + + return -1; + } + + if (ctx->log_encoder.output_length > 0) { + flb_input_log_append(i_ins, NULL, 0, + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + + return 0; +} + +static int in_collectd_exit(void *data, struct flb_config *config) +{ + struct flb_in_collectd_config *ctx = data; + + flb_log_event_encoder_destroy(&ctx->log_encoder); + flb_socket_close(ctx->server_fd); + flb_pipe_close(ctx->coll_fd); + typesdb_destroy(ctx->tdb); + flb_free(ctx->buf); + flb_free(ctx); + + return 0; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "typesdb", DEFAULT_TYPESDB, + 0, FLB_TRUE, offsetof(struct flb_in_collectd_config, types_db), + "Set the types database filename" + }, + /* EOF */ + {0} +}; + +struct flb_input_plugin in_collectd_plugin = { + .name = "collectd", + .description = "collectd input plugin", + .cb_init = in_collectd_init, + .cb_pre_run = NULL, + .cb_collect = NULL, + .cb_flush_buf = NULL, + .cb_pause = NULL, + .cb_resume = NULL, + .config_map = config_map, + .flags = FLB_INPUT_NET_SERVER, + .cb_exit = in_collectd_exit +}; diff --git a/fluent-bit/plugins/in_collectd/in_collectd.h b/fluent-bit/plugins/in_collectd/in_collectd.h new file mode 100644 index 000000000..8750c7bf5 --- /dev/null +++ b/fluent-bit/plugins/in_collectd/in_collectd.h @@ -0,0 +1,46 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_COLLECTD_H +#define FLB_IN_COLLECTD_H + +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_log_event_encoder.h> + +struct flb_in_collectd_config { + char *buf; + int bufsize; + + /* Server */ + char listen[256]; /* RFC-2181 */ + char port[6]; /* RFC-793 */ + + /* Sockets */ + flb_sockfd_t server_fd; + flb_pipefd_t coll_fd; + + flb_sds_t types_db; + struct mk_list *tdb; + struct flb_log_event_encoder log_encoder; + + /* Plugin input instance */ + struct flb_input_instance *ins; +}; + +#endif diff --git a/fluent-bit/plugins/in_collectd/netprot.c b/fluent-bit/plugins/in_collectd/netprot.c new file mode 100644 index 000000000..005db2edb --- /dev/null +++ b/fluent-bit/plugins/in_collectd/netprot.c @@ -0,0 +1,308 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This module implements the binary network protocol of collectd. + * (https://collectd.org/wiki/index.php/Binary_protocol) + * + * The only interface you need to care is netprot_to_msgpack() that + * parses a UDP packet and converts it into MessagePack format. + */ + +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_endian.h> +#include <fluent-bit/flb_log_event_encoder.h> +#include <msgpack.h> +#include "netprot.h" +#include "typesdb.h" + +#define be16read(x) (be16toh(*(uint16_t *) (x))) +#define be32read(x) (be32toh(*(uint32_t *) (x))) +#define be64read(x) (be64toh(*(uint64_t *) (x))) + +#define le16read(x) (le16toh(*(uint16_t *) (x))) +#define le32read(x) (le32toh(*(uint32_t *) (x))) +#define le64read(x) (le64toh(*(uint64_t *) (x))) + +/* Convert a high-resolution time into a normal UNIX time. */ +#define hr2time(x) ((double) (x) / 1073741824) + +/* Basic data field definitions for collectd */ +#define PART_HOST 0x0000 +#define PART_TIME 0x0001 +#define PART_PLUGIN 0x0002 +#define PART_PLUGIN_INSTANCE 0x0003 +#define PART_TYPE 0x0004 +#define PART_TYPE_INSTANCE 0x0005 +#define PART_VALUE 0x0006 +#define PART_INTERVAL 0x0007 + +#define PART_TIME_HR 0x0008 +#define PART_INTERVAL_HR 0x0009 + +/* + * The "DS_TYPE_*" are value types for PART_VALUE fields. + * + * Read https://collectd.org/wiki/index.php/Data_source for what + * these types mean. + */ +#define DS_TYPE_COUNTER 0 +#define DS_TYPE_GAUGE 1 +#define DS_TYPE_DERIVE 2 +#define DS_TYPE_ABSOLUTE 3 + +struct netprot_header +{ + double time; + double interval; + char *host; + char *plugin; + char *plugin_instance; + char *type; + char *type_instance; +}; + +static int netprot_pack_value(char *ptr, int size, struct netprot_header *hdr, + struct mk_list *tdb, + struct flb_log_event_encoder *encoder) +{ + int i; + char type; + char *pval; + uint16_t count; + struct typesdb_node *node; + int result; + + if (hdr->type == NULL) { + flb_error("[in_collectd] invalid data (type is NULL)"); + return -1; + } + + /* + * Since each value uses (1 + 8) bytes, the total buffer size must + * be 2-byte header plus <count * 9> bytes. + */ + count = be16read(ptr); + if (size != 2 + count * 9) { + flb_error("[in_collectd] data corrupted (size=%i, count=%i)", + size, count); + return -1; + } + + /* + * We need to query against TypesDB in order to get field names + * for the data set values. + */ + node = typesdb_find_node(tdb, hdr->type); + if (!node) { + flb_error("[in_collectd] no such type found '%s'", hdr->type); + return -1; + } + if (node->count != count) { + flb_error("[in_collectd] invalid value for '%s' (%i != %i)", + hdr->type, node->count, count); + return -1; + } + + result = flb_log_event_encoder_begin_record(encoder); + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_set_current_timestamp(encoder); + } + + if (hdr->type != NULL && + result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + encoder, + FLB_LOG_EVENT_CSTRING_VALUE("type"), + FLB_LOG_EVENT_CSTRING_VALUE(hdr->type)); + } + + if (hdr->type_instance != NULL && + result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + encoder, + FLB_LOG_EVENT_CSTRING_VALUE("type_instance"), + FLB_LOG_EVENT_CSTRING_VALUE(hdr->type_instance)); + } + + if (hdr->time > 0 && + result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + encoder, + FLB_LOG_EVENT_CSTRING_VALUE("time"), + FLB_LOG_EVENT_DOUBLE_VALUE(hdr->time)); + } + + if (hdr->interval > 0 && + result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + encoder, + FLB_LOG_EVENT_CSTRING_VALUE("interval"), + FLB_LOG_EVENT_DOUBLE_VALUE(hdr->interval)); + } + + if (hdr->plugin != NULL && + result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + encoder, + FLB_LOG_EVENT_CSTRING_VALUE("plugin"), + FLB_LOG_EVENT_CSTRING_VALUE(hdr->plugin)); + } + + if (hdr->plugin_instance != NULL && + result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + encoder, + FLB_LOG_EVENT_CSTRING_VALUE("plugin_instance"), + FLB_LOG_EVENT_CSTRING_VALUE(hdr->plugin_instance)); + } + + if (hdr->host != NULL && + result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_values( + encoder, + FLB_LOG_EVENT_CSTRING_VALUE("host"), + FLB_LOG_EVENT_CSTRING_VALUE(hdr->host)); + } + + for (i = 0; i < count && result == FLB_EVENT_ENCODER_SUCCESS ; i++) { + pval = ptr + 2 + count + 8 * i; + type = ptr[2 + i]; + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_cstring( + encoder, node->fields[i]); + } + + switch (type) { + case DS_TYPE_COUNTER: + result = flb_log_event_encoder_append_body_uint64( + encoder, be64read(pval)); + break; + case DS_TYPE_GAUGE: + result = flb_log_event_encoder_append_body_double( + encoder, *((double *) pval)); + break; + case DS_TYPE_DERIVE: + result = flb_log_event_encoder_append_body_int64( + encoder, (int64_t) be64read(pval)); + break; + case DS_TYPE_ABSOLUTE: + result = flb_log_event_encoder_append_body_uint64( + encoder, be64read(pval)); + break; + default: + flb_error("[in_collectd] unknown data type %i", type); + + result = FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT; + } + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_commit_record(encoder); + } + else { + flb_log_event_encoder_rollback_record(encoder); + } + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + return 0; +} + +/* + * Entry point function + */ +int netprot_to_msgpack(char *buf, int len, struct mk_list *tdb, + struct flb_log_event_encoder *encoder) +{ + uint16_t part_type; + uint16_t part_len; + int size; + char *ptr; + struct netprot_header hdr = {0}; + + while (len >= 4) { + part_type = be16read(buf); + part_len = be16read(buf + 2); + + if (len < part_len) { + flb_error("[in_collectd] data truncated (%i < %i)", len, part_len); + return -1; + } + ptr = buf + 4; + size = part_len - 4; + + switch (part_type) { + case PART_HOST: + if (ptr[size] == '\0') { + hdr.host = ptr; + } + break; + case PART_TIME: + hdr.time = (double) be64read(ptr); + break; + case PART_TIME_HR: + hdr.time = hr2time(be64read(ptr)); + break; + case PART_PLUGIN: + if (ptr[size] == '\0') { + hdr.plugin = ptr; + } + break; + case PART_PLUGIN_INSTANCE: + if (ptr[size] == '\0') { + hdr.plugin_instance = ptr; + } + break; + case PART_TYPE: + if (ptr[size] == '\0') { + hdr.type = ptr; + } + break; + case PART_TYPE_INSTANCE: + if (ptr[size] == '\0') { + hdr.type_instance = ptr; + } + break; + case PART_VALUE: + if (netprot_pack_value(ptr, size, &hdr, tdb, encoder)) { + return -1; + } + break; + case PART_INTERVAL: + hdr.interval = (double) be64read(ptr); + break; + case PART_INTERVAL_HR: + hdr.interval = hr2time(be64read(ptr)); + break; + default: + flb_debug("[in_collectd] skip unknown type %x", part_type); + break; + } + len -= part_len; + buf += part_len; + } + return 0; +} diff --git a/fluent-bit/plugins/in_collectd/netprot.h b/fluent-bit/plugins/in_collectd/netprot.h new file mode 100644 index 000000000..c9292640c --- /dev/null +++ b/fluent-bit/plugins/in_collectd/netprot.h @@ -0,0 +1,22 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* Convert a binary buffer into MessagePack format */ +int netprot_to_msgpack(char *buf, int len, struct mk_list *tdb, + struct flb_log_event_encoder *encoder); diff --git a/fluent-bit/plugins/in_collectd/typesdb.c b/fluent-bit/plugins/in_collectd/typesdb.c new file mode 100644 index 000000000..ef579583c --- /dev/null +++ b/fluent-bit/plugins/in_collectd/typesdb.c @@ -0,0 +1,223 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_str.h> + +#include "in_collectd.h" +#include "typesdb.h" +#include "typesdb_parser.h" + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + + +/* Internal function to load from a single TypesDB */ +static int typesdb_load(struct flb_in_collectd_config *ctx, + struct mk_list *tdb, const char *path) +{ + int fd = open(path, O_RDONLY); + if (fd < 0) { + flb_errno(); + flb_plg_error(ctx->ins, "failed to open '%s'", path); + return -1; + } + + if (typesdb_parse(tdb, fd)) { + flb_plg_error(ctx->ins, "failed to parse '%s'", path); + close(fd); + return -1; + } + close(fd); + return 0; +} + +/* + * Load multiple TypesDB files at once. The return value is + * a linked list of typesdb_node objects. + * + * "paths" is a comma-separated list of file names. + */ +struct mk_list *typesdb_load_all(struct flb_in_collectd_config *ctx, + const char *paths) +{ + char *buf; + char *state; + char *path; + struct mk_list *tdb; + + buf = flb_strdup(paths); + if (!buf) { + flb_errno(); + return NULL; + } + + tdb = flb_malloc(sizeof(struct mk_list)); + if (!tdb) { + flb_errno(); + flb_free(buf); + return NULL; + } + mk_list_init(tdb); + + path = strtok_r(buf, ",", &state); + while (path) { + if (typesdb_load(ctx, tdb, path)) { + typesdb_destroy(tdb); + flb_free(buf); + return NULL; + } + path = strtok_r(NULL, ",", &state); + } + flb_free(buf); + return tdb; +} + +void typesdb_destroy(struct mk_list *tdb) +{ + struct typesdb_node *node; + struct mk_list *head; + struct mk_list *tmp; + + mk_list_foreach_safe(head, tmp, tdb) { + node = mk_list_entry(head, struct typesdb_node, _head); + typesdb_destroy_node(node); + } + flb_free(tdb); +} + +struct typesdb_node *typesdb_find_node(struct mk_list *tdb, const char *type) +{ + struct typesdb_node *node; + struct mk_list *head; + + if (type == NULL) { + return NULL; + } + + /* + * Search the linked list from the tail so that later entries + * take precedence over earlier ones. + */ + mk_list_foreach_r(head, tdb) { + node = mk_list_entry(head, struct typesdb_node, _head); + if (strcmp(node->type, type) == 0) { + return node; + } + } + return NULL; +} + +struct typesdb_node *typesdb_last_node(struct mk_list *tdb) +{ + return mk_list_entry_last(tdb, struct typesdb_node, _head); +} + +/* + * The folloings are API functions to modify a TypesDB instance. + */ +int typesdb_add_node(struct mk_list *tdb, const char *type) +{ + struct typesdb_node *node; + + node = flb_calloc(1, sizeof(struct typesdb_node)); + if (!node) { + flb_errno(); + return -1; + } + + node->type = flb_strdup(type); + if (!node->type) { + flb_errno(); + flb_free(node); + return -1; + } + + mk_list_add(&node->_head, tdb); + return 0; +} + +void typesdb_destroy_node(struct typesdb_node *node) +{ + int i; + + flb_free(node->type); + + if (node->fields) { + for (i = 0; i < node->count; i++) { + flb_free(node->fields[i]); + } + flb_free(node->fields); + } + mk_list_del(&node->_head); + flb_free(node); +} + +int typesdb_add_field(struct typesdb_node *node, const char *field) +{ + char *end; + int alloc; + char **fields; + + end = strchr(field, ':'); + if (!end) { + return -1; + } + + if (node->count >= node->alloc) { + alloc = node->alloc > 0 ? node->alloc * 2 : 1; + fields = flb_realloc(node->fields, alloc * sizeof(char *)); + if (!fields) { + flb_errno(); + return -1; + } + node->alloc = alloc; + node->fields = fields; + } + + node->fields[node->count] = flb_strndup(field, end - field); + if (!node->fields[node->count]) { + flb_errno(); + return -1; + } + node->count++; + return 0; +} + +/* A debug function to see the content of TypesDB */ +void typesdb_dump(struct mk_list *tdb) +{ + struct mk_list *head; + struct typesdb_node *node; + int i; + + mk_list_foreach(head, tdb) { + node = mk_list_entry(head, struct typesdb_node, _head); + + printf("%s", node->type); + for (i = 0; i < node->count; i++) { + printf("\t%s", node->fields[i]); + } + putchar('\n'); + } +} diff --git a/fluent-bit/plugins/in_collectd/typesdb.h b/fluent-bit/plugins/in_collectd/typesdb.h new file mode 100644 index 000000000..7da131be0 --- /dev/null +++ b/fluent-bit/plugins/in_collectd/typesdb.h @@ -0,0 +1,45 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "in_collectd.h" + +struct typesdb_node { + char *type; + int alloc; + int count; + char **fields; + struct mk_list _head; +}; + +/* Load and destroy TypesDB */ +struct mk_list *typesdb_load_all(struct flb_in_collectd_config *ctx, + const char *paths); +void typesdb_destroy(struct mk_list *tdb); + +/* Find a node in TypesDB */ +struct typesdb_node *typesdb_find_node(struct mk_list *tdb, const char *type); +struct typesdb_node *typesdb_last_node(struct mk_list *tdb); + +/* Modify a TypesDB instance (used in typesdb_parser.c) */ +int typesdb_add_node(struct mk_list *tdb, const char *type); +void typesdb_destroy_node(struct typesdb_node *node); +int typesdb_add_field(struct typesdb_node *node, const char *field); + +/* For debugging */ +void typesdb_dump(struct mk_list *tdb); diff --git a/fluent-bit/plugins/in_collectd/typesdb_parser.c b/fluent-bit/plugins/in_collectd/typesdb_parser.c new file mode 100644 index 000000000..5e237ffaf --- /dev/null +++ b/fluent-bit/plugins/in_collectd/typesdb_parser.c @@ -0,0 +1,214 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This file implements a collectd 5.x compatible parser for types.db(5). + * + * Note: it internally implements a finite state machine that consumes a + * single char at once, and pushes parsed tokens via typesdb_* methods. + */ + +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_str.h> + +#include "typesdb.h" +#include "typesdb_parser.h" + +#define TDB_INVALID -1 +#define TDB_INIT 0 +#define TDB_LEFT 1 +#define TDB_SEP 2 +#define TDB_RIGHT 3 +#define TDB_RIGHT_SEP 4 +#define TDB_COMMENT 5 + +/* See collectd/src/daemon/types_list.c */ +#define MAX_LINE_SIZE 4096 + +/* + * tdb_* are state functions that take a single character as input. + * They do some action based on the input and return the next state. + */ +static int tdb_init(char c, struct mk_list *tdb, char *buf) +{ + switch (c) { + case '#': + return TDB_COMMENT; + case '\r': + case '\n': + return TDB_INIT; + default: + buf[0] = c; + buf[1] = '\0'; + return TDB_LEFT; + } +} + +static int tdb_left(char c, struct mk_list *tdb, char *buf) +{ + int len; + + switch (c) { + case ' ': + if (typesdb_add_node(tdb, buf)) { + return TDB_INVALID; + } + return TDB_SEP; + case '\r': + case '\n': + return TDB_INVALID; + default: + len = strlen(buf); + if (len >= MAX_LINE_SIZE - 1) { + return TDB_INVALID; + } + buf[len] = c; + buf[++len] = '\0'; + return TDB_LEFT; + } +} + +static int tdb_sep(char c, struct mk_list *tdb, char *buf) +{ + switch (c) { + case ' ': + return TDB_SEP; + case '\r': + case '\n': + return TDB_INVALID; + default: + buf[0] = c; + buf[1] = '\0'; + return TDB_RIGHT; + } +} + +static int tdb_right(char c, struct mk_list *tdb, char *buf) +{ + int len; + struct typesdb_node *node = typesdb_last_node(tdb); + + switch (c) { + case ' ': + case ',': + if (typesdb_add_field(node, buf)) { + flb_error("[in_collectd] cannot add value '%s'", buf); + return TDB_INVALID; + } + return TDB_RIGHT_SEP; + case '\r': + case '\n': + if (typesdb_add_field(node, buf)) { + flb_error("[in_collectd] cannot add value '%s'", buf); + return TDB_INVALID; + } + return TDB_INIT; + default: + len = strlen(buf); + if (len >= MAX_LINE_SIZE - 1) { + flb_error("[in_collectd] line too long > %i", MAX_LINE_SIZE); + return TDB_INVALID; + } + buf[len] = c; + buf[++len] = '\0'; + return TDB_RIGHT; + } +} + +static int tdb_right_sep(char c, struct mk_list *tdb, char *buf) +{ + switch (c) { + case ' ': + case ',': + return TDB_RIGHT_SEP; + case '\r': + case '\n': + return TDB_INIT; + default: + buf[0] = c; + buf[1] = '\0'; + return TDB_RIGHT; + } +} + +static int tdb_comment(char c, struct mk_list *tdb, char *buf) +{ + switch (c) { + case '\r': + case '\n': + return TDB_INIT; + default: + return TDB_COMMENT; + } +} + +/* + * Entry point function + */ +int typesdb_parse(struct mk_list *tdb, int fp) +{ + char tmp[1024]; + char buf[MAX_LINE_SIZE]; + char c; + int i; + int bytes; + int state = TDB_INIT; + + while (1) { + bytes = read(fp, tmp, 1024); + if (bytes < 0) { + flb_errno(); + return bytes; + } + if (bytes == 0) { + return 0; + } + for (i = 0; i < bytes; i++) { + c = tmp[i]; + switch (state) { + case TDB_INVALID: + return -1; + case TDB_INIT: + state = tdb_init(c, tdb, buf); + break; + case TDB_LEFT: + state = tdb_left(c, tdb, buf); + break; + case TDB_SEP: + state = tdb_sep(c, tdb, buf); + break; + case TDB_RIGHT: + state = tdb_right(c, tdb, buf); + break; + case TDB_RIGHT_SEP: + state = tdb_right_sep(c, tdb, buf); + break; + case TDB_COMMENT: + state = tdb_comment(c, tdb, buf); + break; + default: + flb_error("[in_collectd] unknown state %i", state); + return -1; + } + } + } + return 0; +} diff --git a/fluent-bit/plugins/in_collectd/typesdb_parser.h b/fluent-bit/plugins/in_collectd/typesdb_parser.h new file mode 100644 index 000000000..985570134 --- /dev/null +++ b/fluent-bit/plugins/in_collectd/typesdb_parser.h @@ -0,0 +1,20 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +int typesdb_parse(struct mk_list *tdb, int fp); |