summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/in_collectd/netprot.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/in_collectd/netprot.c')
-rw-r--r--src/fluent-bit/plugins/in_collectd/netprot.c308
1 files changed, 308 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/in_collectd/netprot.c b/src/fluent-bit/plugins/in_collectd/netprot.c
new file mode 100644
index 000000000..005db2edb
--- /dev/null
+++ b/src/fluent-bit/plugins/in_collectd/netprot.c
@@ -0,0 +1,308 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This module implements the binary network protocol of collectd.
+ * (https://collectd.org/wiki/index.php/Binary_protocol)
+ *
+ * The only interface you need to care is netprot_to_msgpack() that
+ * parses a UDP packet and converts it into MessagePack format.
+ */
+
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_endian.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+#include <msgpack.h>
+#include "netprot.h"
+#include "typesdb.h"
+
+#define be16read(x) (be16toh(*(uint16_t *) (x)))
+#define be32read(x) (be32toh(*(uint32_t *) (x)))
+#define be64read(x) (be64toh(*(uint64_t *) (x)))
+
+#define le16read(x) (le16toh(*(uint16_t *) (x)))
+#define le32read(x) (le32toh(*(uint32_t *) (x)))
+#define le64read(x) (le64toh(*(uint64_t *) (x)))
+
+/* Convert a high-resolution time into a normal UNIX time. */
+#define hr2time(x) ((double) (x) / 1073741824)
+
+/* Basic data field definitions for collectd */
+#define PART_HOST 0x0000
+#define PART_TIME 0x0001
+#define PART_PLUGIN 0x0002
+#define PART_PLUGIN_INSTANCE 0x0003
+#define PART_TYPE 0x0004
+#define PART_TYPE_INSTANCE 0x0005
+#define PART_VALUE 0x0006
+#define PART_INTERVAL 0x0007
+
+#define PART_TIME_HR 0x0008
+#define PART_INTERVAL_HR 0x0009
+
+/*
+ * The "DS_TYPE_*" are value types for PART_VALUE fields.
+ *
+ * Read https://collectd.org/wiki/index.php/Data_source for what
+ * these types mean.
+ */
+#define DS_TYPE_COUNTER 0
+#define DS_TYPE_GAUGE 1
+#define DS_TYPE_DERIVE 2
+#define DS_TYPE_ABSOLUTE 3
+
+struct netprot_header
+{
+ double time;
+ double interval;
+ char *host;
+ char *plugin;
+ char *plugin_instance;
+ char *type;
+ char *type_instance;
+};
+
+static int netprot_pack_value(char *ptr, int size, struct netprot_header *hdr,
+ struct mk_list *tdb,
+ struct flb_log_event_encoder *encoder)
+{
+ int i;
+ char type;
+ char *pval;
+ uint16_t count;
+ struct typesdb_node *node;
+ int result;
+
+ if (hdr->type == NULL) {
+ flb_error("[in_collectd] invalid data (type is NULL)");
+ return -1;
+ }
+
+ /*
+ * Since each value uses (1 + 8) bytes, the total buffer size must
+ * be 2-byte header plus <count * 9> bytes.
+ */
+ count = be16read(ptr);
+ if (size != 2 + count * 9) {
+ flb_error("[in_collectd] data corrupted (size=%i, count=%i)",
+ size, count);
+ return -1;
+ }
+
+ /*
+ * We need to query against TypesDB in order to get field names
+ * for the data set values.
+ */
+ node = typesdb_find_node(tdb, hdr->type);
+ if (!node) {
+ flb_error("[in_collectd] no such type found '%s'", hdr->type);
+ return -1;
+ }
+ if (node->count != count) {
+ flb_error("[in_collectd] invalid value for '%s' (%i != %i)",
+ hdr->type, node->count, count);
+ return -1;
+ }
+
+ result = flb_log_event_encoder_begin_record(encoder);
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_set_current_timestamp(encoder);
+ }
+
+ if (hdr->type != NULL &&
+ result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE("type"),
+ FLB_LOG_EVENT_CSTRING_VALUE(hdr->type));
+ }
+
+ if (hdr->type_instance != NULL &&
+ result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE("type_instance"),
+ FLB_LOG_EVENT_CSTRING_VALUE(hdr->type_instance));
+ }
+
+ if (hdr->time > 0 &&
+ result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE("time"),
+ FLB_LOG_EVENT_DOUBLE_VALUE(hdr->time));
+ }
+
+ if (hdr->interval > 0 &&
+ result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE("interval"),
+ FLB_LOG_EVENT_DOUBLE_VALUE(hdr->interval));
+ }
+
+ if (hdr->plugin != NULL &&
+ result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE("plugin"),
+ FLB_LOG_EVENT_CSTRING_VALUE(hdr->plugin));
+ }
+
+ if (hdr->plugin_instance != NULL &&
+ result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE("plugin_instance"),
+ FLB_LOG_EVENT_CSTRING_VALUE(hdr->plugin_instance));
+ }
+
+ if (hdr->host != NULL &&
+ result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_values(
+ encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE("host"),
+ FLB_LOG_EVENT_CSTRING_VALUE(hdr->host));
+ }
+
+ for (i = 0; i < count && result == FLB_EVENT_ENCODER_SUCCESS ; i++) {
+ pval = ptr + 2 + count + 8 * i;
+ type = ptr[2 + i];
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_append_body_cstring(
+ encoder, node->fields[i]);
+ }
+
+ switch (type) {
+ case DS_TYPE_COUNTER:
+ result = flb_log_event_encoder_append_body_uint64(
+ encoder, be64read(pval));
+ break;
+ case DS_TYPE_GAUGE:
+ result = flb_log_event_encoder_append_body_double(
+ encoder, *((double *) pval));
+ break;
+ case DS_TYPE_DERIVE:
+ result = flb_log_event_encoder_append_body_int64(
+ encoder, (int64_t) be64read(pval));
+ break;
+ case DS_TYPE_ABSOLUTE:
+ result = flb_log_event_encoder_append_body_uint64(
+ encoder, be64read(pval));
+ break;
+ default:
+ flb_error("[in_collectd] unknown data type %i", type);
+
+ result = FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT;
+ }
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_commit_record(encoder);
+ }
+ else {
+ flb_log_event_encoder_rollback_record(encoder);
+ }
+
+ if (result != FLB_EVENT_ENCODER_SUCCESS) {
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ * Entry point function
+ */
+int netprot_to_msgpack(char *buf, int len, struct mk_list *tdb,
+ struct flb_log_event_encoder *encoder)
+{
+ uint16_t part_type;
+ uint16_t part_len;
+ int size;
+ char *ptr;
+ struct netprot_header hdr = {0};
+
+ while (len >= 4) {
+ part_type = be16read(buf);
+ part_len = be16read(buf + 2);
+
+ if (len < part_len) {
+ flb_error("[in_collectd] data truncated (%i < %i)", len, part_len);
+ return -1;
+ }
+ ptr = buf + 4;
+ size = part_len - 4;
+
+ switch (part_type) {
+ case PART_HOST:
+ if (ptr[size] == '\0') {
+ hdr.host = ptr;
+ }
+ break;
+ case PART_TIME:
+ hdr.time = (double) be64read(ptr);
+ break;
+ case PART_TIME_HR:
+ hdr.time = hr2time(be64read(ptr));
+ break;
+ case PART_PLUGIN:
+ if (ptr[size] == '\0') {
+ hdr.plugin = ptr;
+ }
+ break;
+ case PART_PLUGIN_INSTANCE:
+ if (ptr[size] == '\0') {
+ hdr.plugin_instance = ptr;
+ }
+ break;
+ case PART_TYPE:
+ if (ptr[size] == '\0') {
+ hdr.type = ptr;
+ }
+ break;
+ case PART_TYPE_INSTANCE:
+ if (ptr[size] == '\0') {
+ hdr.type_instance = ptr;
+ }
+ break;
+ case PART_VALUE:
+ if (netprot_pack_value(ptr, size, &hdr, tdb, encoder)) {
+ return -1;
+ }
+ break;
+ case PART_INTERVAL:
+ hdr.interval = (double) be64read(ptr);
+ break;
+ case PART_INTERVAL_HR:
+ hdr.interval = hr2time(be64read(ptr));
+ break;
+ default:
+ flb_debug("[in_collectd] skip unknown type %x", part_type);
+ break;
+ }
+ len -= part_len;
+ buf += part_len;
+ }
+ return 0;
+}