Diffstat (limited to 'src/fluent-bit/plugins/filter_kubernetes/kubernetes.c')
-rw-r--r--  src/fluent-bit/plugins/filter_kubernetes/kubernetes.c  1000
1 file changed, 1000 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/filter_kubernetes/kubernetes.c b/src/fluent-bit/plugins/filter_kubernetes/kubernetes.c
new file mode 100644
index 000000000..f54e08483
--- /dev/null
+++ b/src/fluent-bit/plugins/filter_kubernetes/kubernetes.c
@@ -0,0 +1,1000 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_filter_plugin.h>
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_parser.h>
+#include <fluent-bit/flb_unescape.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+
+#include "kube_conf.h"
+#include "kube_meta.h"
+#include "kube_regex.h"
+#include "kube_property.h"
+
+#include <stdio.h>
+#include <msgpack.h>
+
+/* Merge status used by merge_log_handler() */
+#define MERGE_NONE 0 /* no merge; unescaped string kept in temporary buffer */
+#define MERGE_PARSED 1 /* merge parsed string (log_buf) */
+#define MERGE_MAP 2 /* merge direct binary object (v) */
+
+static int get_stream(msgpack_object_map map)
+{
+ int i;
+ msgpack_object k;
+ msgpack_object v;
+
+ for (i = 0; i < map.size; i++) {
+ k = map.ptr[i].key;
+ v = map.ptr[i].val;
+
+ if (k.type == MSGPACK_OBJECT_STR &&
+ strncmp(k.via.str.ptr, "stream", k.via.str.size) == 0) {
+ if (strncmp(v.via.str.ptr, "stdout", v.via.str.size) == 0) {
+ return FLB_KUBE_PROP_STREAM_STDOUT;
+ }
+ else if (strncmp(v.via.str.ptr, "stderr", v.via.str.size) == 0) {
+ return FLB_KUBE_PROP_STREAM_STDERR;
+ }
+ else {
+ return FLB_KUBE_PROP_STREAM_UNKNOWN;
+ }
+ }
+ }
+
+ return FLB_KUBE_PROP_NO_STREAM;
+}
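+
+/*
+ * Illustrative record (example only) as produced by the Docker 'json-file'
+ * log driver; get_stream() inspects the 'stream' value to decide which
+ * per-stream parser/exclusion properties apply:
+ *
+ *   {"log": "hello world\n", "stream": "stdout", "time": "2019-01-01T00:00:00Z"}
+ */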
+
+static int value_trim_size(msgpack_object o)
+{
+ int i;
+ int size = o.via.str.size;
+
+ for (i = size - 1; i > 0; i--) {
+ if (o.via.str.ptr[i] == '\n') {
+ size -= 1;
+ continue;
+ }
+
+ if (o.via.str.ptr[i - 1] == '\\' &&
+ (o.via.str.ptr[i] == 'n' || o.via.str.ptr[i] == 'r')) {
+ size -= 2;
+ i--;
+ }
+ else {
+ break;
+ }
+ }
+
+ return size;
+}
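+
+/*
+ * Examples (illustrative): value_trim_size() returns the string length
+ * without trailing newline sequences, so "hello\n" (raw newline),
+ * "hello\\n" (escaped sequence) and "hello\\r\\n" all yield 5.
+ */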
+
+static int merge_log_handler(msgpack_object o,
+ struct flb_parser *parser,
+ void **out_buf, size_t *out_size,
+ struct flb_time *log_time,
+ struct flb_kube *ctx)
+{
+ int ret;
+ int new_size;
+ int root_type;
+ int records = 0;
+ char *tmp;
+
+ /* Reset vars */
+ *out_buf = NULL;
+ *out_size = 0;
+
+ /* Allocate more space if required */
+ if (o.via.str.size >= ctx->unesc_buf_size) {
+ new_size = o.via.str.size + 1;
+ tmp = flb_realloc(ctx->unesc_buf, new_size);
+ if (tmp) {
+ ctx->unesc_buf = tmp;
+ ctx->unesc_buf_size = new_size;
+ }
+ else {
+ flb_errno();
+ return -1;
+ }
+ }
+
+ /* Copy the string value and append the required NULL byte */
+ ctx->unesc_buf_len = (int) o.via.str.size;
+ memcpy(ctx->unesc_buf, o.via.str.ptr, o.via.str.size);
+ ctx->unesc_buf[ctx->unesc_buf_len] = '\0';
+
+ ret = -1;
+
+ /* Parser set by Annotation */
+ if (parser) {
+ ret = flb_parser_do(parser, ctx->unesc_buf, ctx->unesc_buf_len,
+ out_buf, out_size, log_time);
+ if (ret >= 0) {
+ if (flb_time_to_nanosec(log_time) == 0L) {
+ flb_time_get(log_time);
+ }
+ return MERGE_PARSED;
+ }
+ }
+ else if (ctx->merge_parser) { /* Custom parser 'merge_parser' option */
+ ret = flb_parser_do(ctx->merge_parser,
+ ctx->unesc_buf, ctx->unesc_buf_len,
+ out_buf, out_size, log_time);
+ if (ret >= 0) {
+ if (flb_time_to_nanosec(log_time) == 0L) {
+ flb_time_get(log_time);
+ }
+ return MERGE_PARSED;
+ }
+ }
+ else { /* Default JSON parser */
+ ret = flb_pack_json_recs(ctx->unesc_buf, ctx->unesc_buf_len,
+ (char **) out_buf, out_size, &root_type,
+ &records, NULL);
+ if (ret == 0 && root_type != FLB_PACK_JSON_OBJECT) {
+ flb_plg_debug(ctx->ins, "could not merge JSON, root_type=%i",
+ root_type);
+ flb_free(*out_buf);
+ return MERGE_NONE;
+ }
+
+ if (ret == 0 && records != 1) {
+ flb_plg_debug(ctx->ins,
+ "could not merge JSON, invalid number of records: %i",
+ records);
+ flb_free(*out_buf);
+ return MERGE_NONE;
+ }
+ }
+
+ if (ret == -1) {
+ return MERGE_NONE;
+ }
+
+ return MERGE_PARSED;
+}
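+
+/*
+ * Illustrative behaviour (default JSON path, no parser configured):
+ *   - input "{\"key\": \"val\"}"  -> MERGE_PARSED, *out_buf holds the packed map
+ *   - input "plain text line"     -> MERGE_NONE, unescaped copy left in ctx->unesc_buf
+ */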
+
+static int cb_kube_init(struct flb_filter_instance *f_ins,
+ struct flb_config *config,
+ void *data)
+{
+ int ret;
+ struct flb_kube *ctx;
+ (void) data;
+
+ /* Create configuration context */
+ ctx = flb_kube_conf_create(f_ins, config);
+ if (!ctx) {
+ return -1;
+ }
+
+ /* Initialize regex context */
+ ret = flb_kube_regex_init(ctx);
+ if (ret == -1) {
+ flb_kube_conf_destroy(ctx);
+ return -1;
+ }
+
+ /* Set context */
+ flb_filter_set_context(f_ins, ctx);
+
+ /*
+ * Get Kubernetes Metadata: we gather this at the beginning
+ * as we need this information to process logs in a Kubernetes
+ * environment; otherwise the service should not start.
+ */
+ flb_kube_meta_init(ctx, config);
+
+ return 0;
+}
+
+static int pack_map_content(struct flb_log_event_encoder *log_encoder,
+ msgpack_object source_map,
+ const char *kube_buf, size_t kube_size,
+ struct flb_kube_meta *meta,
+ struct flb_time *time_lookup,
+ struct flb_parser *parser,
+ struct flb_kube *ctx)
+{
+ int append_original_objects;
+ int scope_opened;
+ int ret;
+ int i;
+ int map_size = 0;
+ int merge_status = -1;
+ int log_index = -1;
+ int log_buf_entries = 0;
+ size_t off = 0;
+ void *log_buf = NULL;
+ size_t log_size = 0;
+ msgpack_unpacked result;
+ msgpack_object k;
+ msgpack_object v;
+ msgpack_object root;
+ struct flb_time log_time;
+
+ /* Original map size */
+ map_size = source_map.via.map.size;
+
+ /* If merge_log is enabled, we need to look up the 'log' field */
+ if (ctx->merge_log == FLB_TRUE) {
+ for (i = 0; i < map_size; i++) {
+ k = source_map.via.map.ptr[i].key;
+
+ /* Validate 'log' field */
+ if (k.via.str.size == 3 &&
+ strncmp(k.via.str.ptr, "log", 3) == 0) {
+ log_index = i;
+ break;
+ }
+ }
+ }
+
+ /* reset */
+ flb_time_zero(&log_time);
+
+ /*
+ * If a log_index exists, the application log content inside the
+ * Docker JSON map is an escaped string. Proceed to reserve a temporary
+ * buffer and create an unescaped version.
+ */
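+ /*
+  * Illustrative (hypothetical) input: the 'log' value carries the
+  * application's JSON payload as a string, which merge_log_handler()
+  * copies into the temporary buffer and parses:
+  *
+  *   {"log": "{\"msg\": \"hello\"}", "stream": "stdout", "time": "..."}
+  */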
+ if (log_index != -1) {
+ v = source_map.via.map.ptr[log_index].val;
+ if (v.type == MSGPACK_OBJECT_MAP) {
+ /* This is the easiest way, no extra processing required */
+ merge_status = MERGE_MAP;
+ }
+ else if (v.type == MSGPACK_OBJECT_STR) {
+ merge_status = merge_log_handler(v, parser,
+ &log_buf, &log_size,
+ &log_time, ctx);
+ }
+ }
+
+ /* Append record timestamp */
+ if (merge_status == MERGE_PARSED) {
+ if (flb_time_to_nanosec(&log_time) == 0L) {
+ ret = flb_log_event_encoder_set_timestamp(
+ log_encoder, time_lookup);
+ }
+ else {
+ ret = flb_log_event_encoder_set_timestamp(
+ log_encoder, &log_time);
+ }
+ }
+ else {
+ ret = flb_log_event_encoder_set_timestamp(
+ log_encoder, time_lookup);
+ }
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ return -1;
+ }
+
+ /* If a merged status exists, check the number of entries to merge */
+ if (log_index != -1) {
+ if (merge_status == MERGE_PARSED) {
+ off = 0;
+ msgpack_unpacked_init(&result);
+ msgpack_unpack_next(&result, log_buf, log_size, &off);
+ root = result.data;
+ if (root.type == MSGPACK_OBJECT_MAP) {
+ log_buf_entries = root.via.map.size;
+ }
+ msgpack_unpacked_destroy(&result);
+ }
+ else if (merge_status == MERGE_MAP) {
+ /* object 'v' represents the original binary log */
+ log_buf_entries = v.via.map.size;
+ }
+ }
+
+ if ((merge_status == MERGE_PARSED || merge_status == MERGE_MAP) &&
+ ctx->keep_log == FLB_FALSE) {
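+ /*
+  * No extra work is needed here: when the log was merged successfully
+  * and keep_log is off, the original 'log' key is simply not appended
+  * in the loop below.
+  */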
+ }
+
+ /* Original map */
+ for (i = 0;
+ i < map_size &&
+ ret == FLB_EVENT_ENCODER_SUCCESS;
+ i++) {
+ k = source_map.via.map.ptr[i].key;
+ v = source_map.via.map.ptr[i].val;
+
+ /*
+ * If log_index is set, merging the log is required, but the outcome
+ * depends on merge_status: if parsing failed we cannot merge, so we
+ * keep the original 'log' key/value.
+ */
+ append_original_objects = FLB_FALSE;
+
+ if (log_index == i) {
+ if (ctx->keep_log == FLB_TRUE) {
+ if (merge_status == MERGE_NONE || merge_status == MERGE_PARSED){
+ ret = flb_log_event_encoder_append_body_values(
+ log_encoder,
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&k),
+ FLB_LOG_EVENT_STRING_VALUE(ctx->unesc_buf,
+ ctx->unesc_buf_len));
+ }
+ else {
+ append_original_objects = FLB_TRUE;
+ }
+ }
+ else if (merge_status == MERGE_NONE) {
+ append_original_objects = FLB_TRUE;
+ }
+ }
+ else {
+ append_original_objects = FLB_TRUE;
+ }
+
+ if (append_original_objects) {
+ ret = flb_log_event_encoder_append_body_values(
+ log_encoder,
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&k),
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&v));
+ }
+ }
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ return -2;
+ }
+
+ scope_opened = FLB_FALSE;
+ /* Merge Log */
+ if (log_index != -1) {
+ if (merge_status == MERGE_PARSED) {
+ if (ctx->merge_log_key && log_buf_entries > 0) {
+ ret = flb_log_event_encoder_append_body_string(
+ log_encoder,
+ ctx->merge_log_key,
+ flb_sds_len(ctx->merge_log_key));
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_body_begin_map(log_encoder);
+ }
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ return -3;
+ }
+
+ scope_opened = FLB_TRUE;
+ }
+
+ off = 0;
+ msgpack_unpacked_init(&result);
+ msgpack_unpack_next(&result, log_buf, log_size, &off);
+ root = result.data;
+
+ for (i = 0;
+ i < log_buf_entries &&
+ ret == FLB_EVENT_ENCODER_SUCCESS;
+ i++) {
+ k = root.via.map.ptr[i].key;
+
+ ret = flb_log_event_encoder_append_body_msgpack_object(
+ log_encoder, &k);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ return -4;
+ }
+
+ v = root.via.map.ptr[i].val;
+
+ /*
+ * For string values, trim any trailing line break or
+ * carriage return characters.
+ */
+ if (v.type == MSGPACK_OBJECT_STR &&
+ ctx->merge_log_trim == FLB_TRUE) {
+ ret = flb_log_event_encoder_append_body_string(
+ log_encoder,
+ (char *) v.via.str.ptr,
+ value_trim_size(v));
+ }
+ else {
+ ret = flb_log_event_encoder_append_body_msgpack_object(
+ log_encoder, &v);
+ }
+ }
+
+ msgpack_unpacked_destroy(&result);
+
+ flb_free(log_buf);
+
+ if (scope_opened && ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_body_commit_map(log_encoder);
+ }
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ return -5;
+ }
+ }
+ else if (merge_status == MERGE_MAP) {
+ msgpack_object map;
+
+ if (ctx->merge_log_key && log_buf_entries > 0) {
+ ret = flb_log_event_encoder_append_body_string(
+ log_encoder,
+ ctx->merge_log_key,
+ flb_sds_len(ctx->merge_log_key));
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_body_begin_map(log_encoder);
+ }
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ return -6;
+ }
+
+ scope_opened = FLB_TRUE;
+ }
+
+ map = source_map.via.map.ptr[log_index].val;
+ for (i = 0;
+ i < map.via.map.size &&
+ ret == FLB_EVENT_ENCODER_SUCCESS;
+ i++) {
+ k = map.via.map.ptr[i].key;
+ v = map.via.map.ptr[i].val;
+
+ ret = flb_log_event_encoder_append_body_values(
+ log_encoder,
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&k),
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&v));
+ }
+
+ if (scope_opened && ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_body_commit_map(log_encoder);
+ }
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ return -7;
+ }
+ }
+ }
+
+ /* Kubernetes */
+ if (kube_buf && kube_size > 0) {
+ ret = flb_log_event_encoder_append_body_cstring(
+ log_encoder,
+ "kubernetes");
+
+ off = 0;
+ msgpack_unpacked_init(&result);
+ msgpack_unpack_next(&result, kube_buf, kube_size, &off);
+
+ if (kube_size != off) {
+ /* This buffer should contain a single map, and we shouldn't
+ * have to unpack it just to ensure that we append a single
+ * map. However, since the current code only appends the first
+ * entry without taking any further action, we should warn the
+ * user when there is more than one entry, so that in the
+ * future we can remove the unpack code and just use
+ * flb_log_event_encoder_append_body_raw_msgpack with
+ * kube_size.
+ */
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_raw_msgpack(log_encoder,
+ (char *) kube_buf, off);
+ }
+
+ msgpack_unpacked_destroy(&result);
+ }
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ return -8;
+ }
+
+ return 0;
+}
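+
+/*
+ * Illustrative (hypothetical) record produced by pack_map_content() with
+ * merge_log on, merge_log_key set to 'log_processed' and keep_log on;
+ * field names under 'kubernetes' and all values are examples only:
+ *
+ *   {"log": "{\"msg\": \"hi\"}", "stream": "stdout",
+ *    "log_processed": {"msg": "hi"},
+ *    "kubernetes": {"pod_name": "...", "namespace_name": "...",
+ *                   "labels": {...}, "annotations": {...}}}
+ */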
+
+static int cb_kube_filter(const void *data, size_t bytes,
+ const char *tag, int tag_len,
+ void **out_buf, size_t *out_bytes,
+ struct flb_filter_instance *f_ins,
+ struct flb_input_instance *i_ins,
+ void *filter_context,
+ struct flb_config *config)
+{
+ int ret;
+ size_t pre = 0;
+ size_t off = 0;
+ char *dummy_cache_buf = NULL;
+ const char *cache_buf = NULL;
+ size_t cache_size = 0;
+ msgpack_object map;
+ struct flb_parser *parser = NULL;
+ struct flb_kube *ctx = filter_context;
+ struct flb_kube_meta meta = {0};
+ struct flb_kube_props props = {0};
+ struct flb_log_event_encoder log_encoder;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+
+ (void) f_ins;
+ (void) i_ins;
+ (void) config;
+
+ if (ctx->use_journal == FLB_FALSE || ctx->dummy_meta == FLB_TRUE) {
+ if (ctx->dummy_meta == FLB_TRUE) {
+ ret = flb_kube_dummy_meta_get(&dummy_cache_buf, &cache_size);
+ cache_buf = dummy_cache_buf;
+ }
+ else {
+ /* Check if we have some cached metadata for the incoming events */
+ ret = flb_kube_meta_get(ctx,
+ tag, tag_len,
+ data, bytes,
+ &cache_buf, &cache_size, &meta, &props);
+ }
+ if (ret == -1) {
+ return FLB_FILTER_NOTOUCH;
+ }
+ }
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ flb_kube_meta_release(&meta);
+ flb_kube_prop_destroy(&props);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ ret = flb_log_event_encoder_init(&log_encoder,
+ FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event encoder initialization error : %d", ret);
+
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_kube_meta_release(&meta);
+ flb_kube_prop_destroy(&props);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ off = log_decoder.offset;
+ /*
+ * Journal entries can originate from different Pods, so we are forced
+ * to parse and check their metadata.
+ *
+ * note: when the source is in_tail the situation is different since all
+ * records passed to the filter have a unique source log file.
+ */
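+ /*
+  * Illustrative in_tail tag (example only) from which metadata can be
+  * derived once and then cached for the whole stream:
+  *
+  *   kube.var.log.containers.apache-5d8c7_default_apache-0123abcd.log
+  */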
+ if (ctx->use_journal == FLB_TRUE && ctx->dummy_meta == FLB_FALSE) {
+ ret = flb_kube_meta_get(ctx,
+ tag, tag_len,
+ (char *) data + pre, off - pre,
+ &cache_buf, &cache_size, &meta, &props);
+ if (ret == -1) {
+ continue;
+ }
+
+ pre = off;
+ }
+
+ parser = NULL;
+
+ switch (get_stream(log_event.body->via.map)) {
+ case FLB_KUBE_PROP_STREAM_STDOUT:
+ {
+ if (props.stdout_exclude == FLB_TRUE) {
+ /* Skip this record */
+ if (ctx->use_journal == FLB_TRUE) {
+ flb_kube_meta_release(&meta);
+ flb_kube_prop_destroy(&props);
+ }
+ continue;
+ }
+ if (props.stdout_parser != NULL) {
+ parser = flb_parser_get(props.stdout_parser, config);
+ }
+ }
+ break;
+ case FLB_KUBE_PROP_STREAM_STDERR:
+ {
+ if (props.stderr_exclude == FLB_TRUE) {
+ /* Skip this record */
+ if (ctx->use_journal == FLB_TRUE) {
+ flb_kube_meta_release(&meta);
+ flb_kube_prop_destroy(&props);
+ }
+ continue;
+ }
+ if (props.stderr_parser != NULL) {
+ parser = flb_parser_get(props.stderr_parser, config);
+ }
+ }
+ break;
+ default:
+ {
+ if (props.stdout_exclude == props.stderr_exclude &&
+ props.stderr_exclude == FLB_TRUE) {
+ continue;
+ }
+ if (props.stdout_parser == props.stderr_parser &&
+ props.stderr_parser != NULL) {
+ parser = flb_parser_get(props.stdout_parser, config);
+ }
+ }
+ break;
+ }
+
+ /* get records map */
+ map = *log_event.body;
+
+ ret = flb_log_event_encoder_begin_record(&log_encoder);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ break;
+ }
+
+ ret = pack_map_content(&log_encoder,
+ map,
+ cache_buf, cache_size,
+ &meta, &log_event.timestamp, parser, ctx);
+ if (ret != 0) {
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_log_event_encoder_destroy(&log_encoder);
+
+ if (ctx->dummy_meta == FLB_TRUE) {
+ flb_free(dummy_cache_buf);
+ }
+
+ flb_kube_meta_release(&meta);
+ flb_kube_prop_destroy(&props);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ ret = flb_log_event_encoder_commit_record(&log_encoder);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_log_event_encoder_rollback_record(&log_encoder);
+
+ break;
+ }
+
+ if (ctx->use_journal == FLB_TRUE) {
+ flb_kube_meta_release(&meta);
+ flb_kube_prop_destroy(&props);
+ }
+ }
+
+ /* Release meta fields */
+ if (ctx->use_journal == FLB_FALSE) {
+ flb_kube_meta_release(&meta);
+ flb_kube_prop_destroy(&props);
+ }
+
+ if (ctx->dummy_meta == FLB_TRUE) {
+ flb_free(dummy_cache_buf);
+ }
+
+ *out_buf = log_encoder.output_buffer;
+ *out_bytes = log_encoder.output_length;
+
+ flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder);
+
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_log_event_encoder_destroy(&log_encoder);
+
+ return FLB_FILTER_MODIFIED;
+}
+
+static int cb_kube_exit(void *data, struct flb_config *config)
+{
+ struct flb_kube *ctx;
+
+ ctx = data;
+ flb_kube_conf_destroy(ctx);
+
+ return 0;
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] = {
+
+ /* Buffer size for HTTP Client when reading responses from API Server */
+ {
+ FLB_CONFIG_MAP_SIZE, "buffer_size", "32K",
+ 0, FLB_TRUE, offsetof(struct flb_kube, buffer_size),
+ "buffer size to receive response from API server",
+ },
+
+ /* TLS: set debug 'level' */
+ {
+ FLB_CONFIG_MAP_INT, "tls.debug", "0",
+ 0, FLB_TRUE, offsetof(struct flb_kube, tls_debug),
+ "set TLS debug level: 0 (no debug), 1 (error), "
+ "2 (state change), 3 (info) and 4 (verbose)"
+ },
+
+ /* TLS: enable verification */
+ {
+ FLB_CONFIG_MAP_BOOL, "tls.verify", "true",
+ 0, FLB_TRUE, offsetof(struct flb_kube, tls_verify),
+ "enable or disable verification of TLS peer certificate"
+ },
+
+ /* TLS: set tls.vhost feature */
+ {
+ FLB_CONFIG_MAP_STR, "tls.vhost", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_kube, tls_vhost),
+ "set optional TLS virtual host"
+ },
+
+ /* Merge structured record as independent keys */
+ {
+ FLB_CONFIG_MAP_BOOL, "merge_log", "false",
+ 0, FLB_TRUE, offsetof(struct flb_kube, merge_log),
+ "merge 'log' key content as individual keys"
+ },
+
+ /* Optional parser for 'log' key content */
+ {
+ FLB_CONFIG_MAP_STR, "merge_parser", NULL,
+ 0, FLB_FALSE, 0,
+ "specify a 'parser' name to parse the 'log' key content"
+ },
+
+ /* New key name to merge the structured content of 'log' */
+ {
+ FLB_CONFIG_MAP_STR, "merge_log_key", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_kube, merge_log_key),
+ "set the 'key' name where the content of 'key' will be placed. Only "
+ "used if the option 'merge_log' is enabled"
+ },
+
+ /* On merge, trim field values (remove possible ending \n or \r) */
+ {
+ FLB_CONFIG_MAP_BOOL, "merge_log_trim", "true",
+ 0, FLB_TRUE, offsetof(struct flb_kube, merge_log_trim),
+ "remove ending '\\n' or '\\r' characters from the log content"
+ },
+
+ /* Keep original log key after successful merging/parsing */
+ {
+ FLB_CONFIG_MAP_BOOL, "keep_log", "true",
+ 0, FLB_TRUE, offsetof(struct flb_kube, keep_log),
+ "keep original log content if it was successfully parsed and merged"
+ },
+
+ /* Full Kubernetes API server URL */
+ {
+ FLB_CONFIG_MAP_STR, "kube_url", "https://kubernetes.default.svc",
+ 0, FLB_FALSE, 0,
+ "Kubernetes API server URL"
+ },
+
+ /*
+ * If set, metadata will be loaded from files in this directory,
+ * falling back to the API server if they do not exist.
+ */
+ {
+ FLB_CONFIG_MAP_STR, "kube_meta_preload_cache_dir", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_kube, meta_preload_cache_dir),
+ "set directory with metadata files"
+ },
+
+ /* Kubernetes TLS: CA file */
+ {
+ FLB_CONFIG_MAP_STR, "kube_ca_file", FLB_KUBE_CA,
+ 0, FLB_TRUE, offsetof(struct flb_kube, tls_ca_file),
+ "Kubernetes TLS CA file"
+ },
+
+ /* Kubernetes TLS: CA certs path */
+ {
+ FLB_CONFIG_MAP_STR, "kube_ca_path", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_kube, tls_ca_path),
+ "Kubernetes TLS ca path"
+ },
+
+ /* Kubernetes Tag prefix */
+ {
+ FLB_CONFIG_MAP_STR, "kube_tag_prefix", FLB_KUBE_TAG_PREFIX,
+ 0, FLB_TRUE, offsetof(struct flb_kube, kube_tag_prefix),
+ "prefix used in tag by the input plugin"
+ },
+
+ /* Kubernetes Token file */
+ {
+ FLB_CONFIG_MAP_STR, "kube_token_file", FLB_KUBE_TOKEN,
+ 0, FLB_TRUE, offsetof(struct flb_kube, token_file),
+ "Kubernetes authorization token file"
+ },
+
+ /* Kubernetes Token command */
+ {
+ FLB_CONFIG_MAP_STR, "kube_token_command", NULL,
+ 0, FLB_FALSE, 0,
+ "command to get Kubernetes authorization token"
+ },
+
+ /* Include Kubernetes labels in the final record? */
+ {
+ FLB_CONFIG_MAP_BOOL, "labels", "true",
+ 0, FLB_TRUE, offsetof(struct flb_kube, labels),
+ "include Kubernetes labels on every record"
+ },
+
+ /* Include Kubernetes annotations in the final record? */
+ {
+ FLB_CONFIG_MAP_BOOL, "annotations", "true",
+ 0, FLB_TRUE, offsetof(struct flb_kube, annotations),
+ "include Kubernetes annotations on every record"
+ },
+
+ /*
+ * The Application may 'propose' special configuration keys
+ * to the logging agent (Fluent Bit) through the annotations
+ * set in the Pod definition, e.g:
+ *
+ * "annotations": {
+ * "logging": {"parser": "apache"}
+ * }
+ *
+ * As of now, Fluent Bit/filter_kubernetes supports the following
+ * options under the 'logging' map value:
+ *
+ * - k8s-logging.parser: suggest that Fluent Bit parse the content
+ * using the pre-defined parser named in the
+ * value (e.g: apache).
+ * - k8s-logging.exclude: allow Pods to exclude themselves from the
+ * logging pipeline.
+ *
+ * By default all options are disabled, so each option needs to
+ * be enabled manually.
+ */
+ {
+ FLB_CONFIG_MAP_BOOL, "k8s-logging.parser", "false",
+ 0, FLB_TRUE, offsetof(struct flb_kube, k8s_logging_parser),
+ "allow Pods to suggest a parser"
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "k8s-logging.exclude", "false",
+ 0, FLB_TRUE, offsetof(struct flb_kube, k8s_logging_exclude),
+ "allow Pods to exclude themselves from the logging pipeline"
+ },
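+
+ /*
+  * Illustrative Pod metadata using the annotations gated by the two
+  * options above (annotation names follow the upstream documentation and
+  * are shown here as an example only):
+  *
+  *   metadata:
+  *     annotations:
+  *       fluentbit.io/parser: "apache"
+  *       fluentbit.io/exclude: "true"
+  */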
+
+ /* Use Systemd Journal mode? */
+ {
+ FLB_CONFIG_MAP_BOOL, "use_journal", "false",
+ 0, FLB_TRUE, offsetof(struct flb_kube, use_journal),
+ "use Journald (Systemd) mode"
+ },
+
+ /* Custom Tag Regex */
+ {
+ FLB_CONFIG_MAP_STR, "regex_parser", NULL,
+ 0, FLB_FALSE, 0,
+ "optional regex parser to extract metadata from container name or container log file name"
+ },
+
+ /* Generate dummy metadata (only for test/dev purposes) */
+ {
+ FLB_CONFIG_MAP_BOOL, "dummy_meta", "false",
+ 0, FLB_TRUE, offsetof(struct flb_kube, dummy_meta),
+ "use 'dummy' metadata, do not talk to API server"
+ },
+
+ /*
+ * Poll DNS status to mitigate unreliable network issues.
+ * See fluent/fluent-bit/2144.
+ */
+ {
+ FLB_CONFIG_MAP_INT, "dns_retries", "6",
+ 0, FLB_TRUE, offsetof(struct flb_kube, dns_retries),
+ "dns lookup retries N times until the network start working"
+ },
+
+ {
+ FLB_CONFIG_MAP_TIME, "dns_wait_time", "30",
+ 0, FLB_TRUE, offsetof(struct flb_kube, dns_wait_time),
+ "dns interval between network status checks"
+ },
+ /* Fetch K8s meta when docker_id has changed */
+ {
+ FLB_CONFIG_MAP_BOOL, "cache_use_docker_id", "false",
+ 0, FLB_TRUE, offsetof(struct flb_kube, cache_use_docker_id),
+ "fetch K8s meta when docker_id is changed"
+ },
+
+ {
+ FLB_CONFIG_MAP_BOOL, "use_tag_for_meta", "false",
+ 0, FLB_TRUE, offsetof(struct flb_kube, use_tag_for_meta),
+ "use tag associated to retrieve metadata instead of kube-server"
+ },
+
+ /*
+ * Enable the feature for using kubelet to get pods information
+ */
+ {
+ FLB_CONFIG_MAP_BOOL, "use_kubelet", "false",
+ 0, FLB_TRUE, offsetof(struct flb_kube, use_kubelet),
+ "use kubelet to get metadata instead of kube-server"
+ },
+ /*
+ * The kubelet host for the /pods endpoint, default is 127.0.0.1.
+ * Only checked when the "use_kubelet" option is set to true.
+ */
+ {
+ FLB_CONFIG_MAP_STR, "kubelet_host", "127.0.0.1",
+ 0, FLB_TRUE, offsetof(struct flb_kube, kubelet_host),
+ "kubelet host to connect with when using kubelet"
+ },
+ /*
+ * The kubelet port for the /pods endpoint, default is 10250.
+ * Only checked when the "use_kubelet" option is set to true.
+ */
+ {
+ FLB_CONFIG_MAP_INT, "kubelet_port", "10250",
+ 0, FLB_TRUE, offsetof(struct flb_kube, kubelet_port),
+ "kubelet port to connect with when using kubelet"
+ },
+ {
+ FLB_CONFIG_MAP_TIME, "kube_token_ttl", "10m",
+ 0, FLB_TRUE, offsetof(struct flb_kube, kube_token_ttl),
+ "kubernetes token ttl, until it is reread from the token file. Default: 10m"
+ },
+ /*
+ * Set TTL for K8s cached metadata
+ */
+ {
+ FLB_CONFIG_MAP_TIME, "kube_meta_cache_ttl", "0",
+ 0, FLB_TRUE, offsetof(struct flb_kube, kube_meta_cache_ttl),
+ "configurable TTL for K8s cached metadata. "
+ "By default, it is set to 0 which means TTL for cache entries is disabled and "
+ "cache entries are evicted at random when capacity is reached. "
+ "In order to enable this option, you should set the number to a time interval. "
+ "For example, set this value to 60 or 60s and cache entries "
+ "which have been created more than 60s will be evicted"
+ },
+ /* EOF */
+ {0}
+};
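+
+/*
+ * Illustrative classic-mode configuration exercising a few of the options
+ * defined in config_map[] above (values are examples only, not defaults):
+ *
+ *   [FILTER]
+ *       name                 kubernetes
+ *       match                kube.*
+ *       kube_url             https://kubernetes.default.svc
+ *       merge_log            on
+ *       merge_log_key        log_processed
+ *       keep_log             off
+ *       k8s-logging.parser   on
+ *       k8s-logging.exclude  on
+ */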
+
+struct flb_filter_plugin filter_kubernetes_plugin = {
+ .name = "kubernetes",
+ .description = "Filter to append Kubernetes metadata",
+ .cb_init = cb_kube_init,
+ .cb_filter = cb_kube_filter,
+ .cb_exit = cb_kube_exit,
+ .config_map = config_map,
+ .flags = 0
+};