diff options
Diffstat (limited to 'src/fluent-bit/plugins/filter_kubernetes/kubernetes.c')
-rw-r--r-- | src/fluent-bit/plugins/filter_kubernetes/kubernetes.c | 1000 |
1 files changed, 1000 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/filter_kubernetes/kubernetes.c b/src/fluent-bit/plugins/filter_kubernetes/kubernetes.c new file mode 100644 index 000000000..f54e08483 --- /dev/null +++ b/src/fluent-bit/plugins/filter_kubernetes/kubernetes.c @@ -0,0 +1,1000 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_filter_plugin.h> +#include <fluent-bit/flb_filter.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_parser.h> +#include <fluent-bit/flb_unescape.h> +#include <fluent-bit/flb_log_event_decoder.h> +#include <fluent-bit/flb_log_event_encoder.h> + +#include "kube_conf.h" +#include "kube_meta.h" +#include "kube_regex.h" +#include "kube_property.h" + +#include <stdio.h> +#include <msgpack.h> + +/* Merge status used by merge_log_handler() */ +#define MERGE_NONE 0 /* merge unescaped string in temporary buffer */ +#define MERGE_PARSED 1 /* merge parsed string (log_buf) */ +#define MERGE_MAP 2 /* merge direct binary object (v) */ + +static int get_stream(msgpack_object_map map) +{ + int i; + msgpack_object k; + msgpack_object v; + + for (i = 0; i < map.size; i++) { + k = map.ptr[i].key; + v = map.ptr[i].val; + + if (k.type == MSGPACK_OBJECT_STR && + strncmp(k.via.str.ptr, "stream", k.via.str.size) == 0) { + if (strncmp(v.via.str.ptr, "stdout", v.via.str.size) == 0) { + return FLB_KUBE_PROP_STREAM_STDOUT; + } + else if (strncmp(v.via.str.ptr, "stderr", v.via.str.size) == 0) { + return FLB_KUBE_PROP_STREAM_STDERR; + } + else { + return FLB_KUBE_PROP_STREAM_UNKNOWN; + } + } + } + + return FLB_KUBE_PROP_NO_STREAM; +} + +static int value_trim_size(msgpack_object o) +{ + int i; + int size = o.via.str.size; + + for (i = size - 1; i > 0; i--) { + if (o.via.str.ptr[i] == '\n') { + size -= 1; + continue; + } + + if (o.via.str.ptr[i - 1] == '\\' && + (o.via.str.ptr[i] == 'n' || o.via.str.ptr[i] == 'r')) { + size -= 2; + i--; + } + else { + break; + } + } + + return size; +} + +static int merge_log_handler(msgpack_object o, + struct flb_parser *parser, + void **out_buf, size_t *out_size, + struct flb_time *log_time, + struct flb_kube *ctx) +{ + int ret; + int new_size; + int root_type; + int records = 0; + char *tmp; + + /* Reset vars */ + *out_buf = NULL; + *out_size = 0; + + /* Allocate more space if required */ + if (o.via.str.size >= ctx->unesc_buf_size) { + new_size = o.via.str.size + 1; + tmp = flb_realloc(ctx->unesc_buf, new_size); + if (tmp) { + ctx->unesc_buf = tmp; + ctx->unesc_buf_size = new_size; + } + else { + flb_errno(); + return -1; + } + } + + /* Copy the string value and append the required NULL byte */ + ctx->unesc_buf_len = (int) o.via.str.size; + memcpy(ctx->unesc_buf, o.via.str.ptr, o.via.str.size); + ctx->unesc_buf[ctx->unesc_buf_len] = '\0'; + + ret = -1; + + /* Parser set by Annotation */ + if (parser) { + ret = flb_parser_do(parser, ctx->unesc_buf, ctx->unesc_buf_len, + out_buf, out_size, log_time); + if (ret >= 0) { + if (flb_time_to_nanosec(log_time) == 0L) { + flb_time_get(log_time); + } + return MERGE_PARSED; + } + } + else if (ctx->merge_parser) { /* Custom parser 'merge_parser' option */ + ret = flb_parser_do(ctx->merge_parser, + ctx->unesc_buf, ctx->unesc_buf_len, + out_buf, out_size, log_time); + if (ret >= 0) { + if (flb_time_to_nanosec(log_time) == 0L) { + flb_time_get(log_time); + } + return MERGE_PARSED; + } + } + else { /* Default JSON parser */ + ret = flb_pack_json_recs(ctx->unesc_buf, ctx->unesc_buf_len, + (char **) out_buf, out_size, &root_type, + &records, NULL); + if (ret == 0 && root_type != FLB_PACK_JSON_OBJECT) { + flb_plg_debug(ctx->ins, "could not merge JSON, root_type=%i", + root_type); + flb_free(*out_buf); + return MERGE_NONE; + } + + if (ret == 0 && records != 1) { + flb_plg_debug(ctx->ins, + "could not merge JSON, invalid number of records: %i", + records); + flb_free(*out_buf); + return MERGE_NONE; + } + } + + if (ret == -1) { + return MERGE_NONE; + } + + return MERGE_PARSED; +} + +static int cb_kube_init(struct flb_filter_instance *f_ins, + struct flb_config *config, + void *data) +{ + int ret; + struct flb_kube *ctx; + (void) data; + + /* Create configuration context */ + ctx = flb_kube_conf_create(f_ins, config); + if (!ctx) { + return -1; + } + + /* Initialize regex context */ + ret = flb_kube_regex_init(ctx); + if (ret == -1) { + flb_kube_conf_destroy(ctx); + return -1; + } + + /* Set context */ + flb_filter_set_context(f_ins, ctx); + + /* + * Get Kubernetes Metadata: we gather this at the beginning + * as we need this information to process logs in Kubernetes + * environment, otherwise the service should not start. + */ + flb_kube_meta_init(ctx, config); + + return 0; +} + +static int pack_map_content(struct flb_log_event_encoder *log_encoder, + msgpack_object source_map, + const char *kube_buf, size_t kube_size, + struct flb_kube_meta *meta, + struct flb_time *time_lookup, + struct flb_parser *parser, + struct flb_kube *ctx) +{ + int append_original_objects; + int scope_opened; + int ret; + int i; + int map_size = 0; + int merge_status = -1; + int log_index = -1; + int log_buf_entries = 0; + size_t off = 0; + void *log_buf = NULL; + size_t log_size = 0; + msgpack_unpacked result; + msgpack_object k; + msgpack_object v; + msgpack_object root; + struct flb_time log_time; + + /* Original map size */ + map_size = source_map.via.map.size; + + /* If merge_log is enabled, we need to lookup the 'log' field */ + if (ctx->merge_log == FLB_TRUE) { + for (i = 0; i < map_size; i++) { + k = source_map.via.map.ptr[i].key; + + /* Validate 'log' field */ + if (k.via.str.size == 3 && + strncmp(k.via.str.ptr, "log", 3) == 0) { + log_index = i; + break; + } + } + } + + /* reset */ + flb_time_zero(&log_time); + + /* + * If a log_index exists, the application log content inside the + * Docker JSON map is a escaped string. Proceed to reserve a temporary + * buffer and create an unescaped version. + */ + if (log_index != -1) { + v = source_map.via.map.ptr[log_index].val; + if (v.type == MSGPACK_OBJECT_MAP) { + /* This is the easiest way, no extra processing required */ + merge_status = MERGE_MAP; + } + else if (v.type == MSGPACK_OBJECT_STR) { + merge_status = merge_log_handler(v, parser, + &log_buf, &log_size, + &log_time, ctx); + } + } + + /* Append record timestamp */ + if (merge_status == MERGE_PARSED) { + if (flb_time_to_nanosec(&log_time) == 0L) { + ret = flb_log_event_encoder_set_timestamp( + log_encoder, time_lookup); + } + else { + ret = flb_log_event_encoder_set_timestamp( + log_encoder, &log_time); + } + } + else { + ret = flb_log_event_encoder_set_timestamp( + log_encoder, time_lookup); + } + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + /* If a merged status exists, check the number of entries to merge */ + if (log_index != -1) { + if (merge_status == MERGE_PARSED) { + off = 0; + msgpack_unpacked_init(&result); + msgpack_unpack_next(&result, log_buf, log_size, &off); + root = result.data; + if (root.type == MSGPACK_OBJECT_MAP) { + log_buf_entries = root.via.map.size; + } + msgpack_unpacked_destroy(&result); + } + else if (merge_status == MERGE_MAP) { + /* object 'v' represents the original binary log */ + log_buf_entries = v.via.map.size; + } + } + + if ((merge_status == MERGE_PARSED || merge_status == MERGE_MAP) && + ctx->keep_log == FLB_FALSE) { + } + + /* Original map */ + for (i = 0; + i < map_size && + ret == FLB_EVENT_ENCODER_SUCCESS; + i++) { + k = source_map.via.map.ptr[i].key; + v = source_map.via.map.ptr[i].val; + + /* + * If log_index is set, means a merge log is a requirement but + * will depend on merge_status. If the parsing failed we cannot + * merge so we keep the 'log' key/value. + */ + append_original_objects = FLB_FALSE; + + if (log_index == i) { + if (ctx->keep_log == FLB_TRUE) { + if (merge_status == MERGE_NONE || merge_status == MERGE_PARSED){ + ret = flb_log_event_encoder_append_body_values( + log_encoder, + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&k), + FLB_LOG_EVENT_STRING_VALUE(ctx->unesc_buf, + ctx->unesc_buf_len)); + } + else { + append_original_objects = FLB_TRUE; + } + } + else if (merge_status == MERGE_NONE) { + append_original_objects = FLB_TRUE; + } + } + else { + append_original_objects = FLB_TRUE; + } + + if (append_original_objects) { + ret = flb_log_event_encoder_append_body_values( + log_encoder, + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&k), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&v)); + } + } + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -2; + } + + scope_opened = FLB_FALSE; + /* Merge Log */ + if (log_index != -1) { + if (merge_status == MERGE_PARSED) { + if (ctx->merge_log_key && log_buf_entries > 0) { + ret = flb_log_event_encoder_append_body_string( + log_encoder, + ctx->merge_log_key, + flb_sds_len(ctx->merge_log_key)); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_body_begin_map(log_encoder); + } + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -3; + } + + scope_opened = FLB_TRUE; + } + + off = 0; + msgpack_unpacked_init(&result); + msgpack_unpack_next(&result, log_buf, log_size, &off); + root = result.data; + + for (i = 0; + i < log_buf_entries && + ret == FLB_EVENT_ENCODER_SUCCESS; + i++) { + k = root.via.map.ptr[i].key; + + ret = flb_log_event_encoder_append_body_msgpack_object( + log_encoder, &k); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -4; + } + + v = root.via.map.ptr[i].val; + + /* + * If this is the last string value, trim any remaining + * break line or return carrier character. + */ + if (v.type == MSGPACK_OBJECT_STR && + ctx->merge_log_trim == FLB_TRUE) { + ret = flb_log_event_encoder_append_body_string( + log_encoder, + (char *) v.via.str.ptr, + value_trim_size(v)); + } + else { + ret = flb_log_event_encoder_append_body_msgpack_object( + log_encoder, &v); + } + } + + msgpack_unpacked_destroy(&result); + + flb_free(log_buf); + + if (scope_opened && ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_body_commit_map(log_encoder); + } + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -5; + } + } + else if (merge_status == MERGE_MAP) { + msgpack_object map; + + if (ctx->merge_log_key && log_buf_entries > 0) { + ret = flb_log_event_encoder_append_body_string( + log_encoder, + ctx->merge_log_key, + flb_sds_len(ctx->merge_log_key)); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_body_begin_map(log_encoder); + } + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -6; + } + + scope_opened = FLB_TRUE; + } + + map = source_map.via.map.ptr[log_index].val; + for (i = 0; + i < map.via.map.size && + ret == FLB_EVENT_ENCODER_SUCCESS; + i++) { + k = map.via.map.ptr[i].key; + v = map.via.map.ptr[i].val; + + ret = flb_log_event_encoder_append_body_values( + log_encoder, + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&k), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&v)); + } + + if (scope_opened && ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_body_commit_map(log_encoder); + } + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -7; + } + } + } + + /* Kubernetes */ + if (kube_buf && kube_size > 0) { + ret = flb_log_event_encoder_append_body_cstring( + log_encoder, + "kubernetes"); + + off = 0; + msgpack_unpacked_init(&result); + msgpack_unpack_next(&result, kube_buf, kube_size, &off); + + if (kube_size != off) { + /* This buffer should contain a single map and we shouldn't + * have to unpack it in order to ensure that we are appending + * a single map but considering that the current code only + * appends the first entry without taking any actions I think + * we should warn the user if there is more than one entry in + * it so in the future we can remove the unpack code and just + * use flb_log_event_encoder_append_body_raw_msgpack with + * kube_size. + */ + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_raw_msgpack(log_encoder, + (char *) kube_buf, off); + } + + msgpack_unpacked_destroy(&result); + } + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -8; + } + + return 0; +} + +static int cb_kube_filter(const void *data, size_t bytes, + const char *tag, int tag_len, + void **out_buf, size_t *out_bytes, + struct flb_filter_instance *f_ins, + struct flb_input_instance *i_ins, + void *filter_context, + struct flb_config *config) +{ + int ret; + size_t pre = 0; + size_t off = 0; + char *dummy_cache_buf = NULL; + const char *cache_buf = NULL; + size_t cache_size = 0; + msgpack_object map; + struct flb_parser *parser = NULL; + struct flb_kube *ctx = filter_context; + struct flb_kube_meta meta = {0}; + struct flb_kube_props props = {0}; + struct flb_log_event_encoder log_encoder; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + + (void) f_ins; + (void) i_ins; + (void) config; + + if (ctx->use_journal == FLB_FALSE || ctx->dummy_meta == FLB_TRUE) { + if (ctx->dummy_meta == FLB_TRUE) { + ret = flb_kube_dummy_meta_get(&dummy_cache_buf, &cache_size); + cache_buf = dummy_cache_buf; + } + else { + /* Check if we have some cached metadata for the incoming events */ + ret = flb_kube_meta_get(ctx, + tag, tag_len, + data, bytes, + &cache_buf, &cache_size, &meta, &props); + } + if (ret == -1) { + return FLB_FILTER_NOTOUCH; + } + } + + ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); + + flb_kube_meta_release(&meta); + flb_kube_prop_destroy(&props); + + return FLB_FILTER_NOTOUCH; + } + + ret = flb_log_event_encoder_init(&log_encoder, + FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event encoder initialization error : %d", ret); + + flb_log_event_decoder_destroy(&log_decoder); + flb_kube_meta_release(&meta); + flb_kube_prop_destroy(&props); + + return FLB_FILTER_NOTOUCH; + } + + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + off = log_decoder.offset; + /* + * Journal entries can be origined by different Pods, so we are forced + * to parse and check it metadata. + * + * note: when the source is in_tail the situation is different since all + * records passed to the filter have a unique source log file. + */ + if (ctx->use_journal == FLB_TRUE && ctx->dummy_meta == FLB_FALSE) { + ret = flb_kube_meta_get(ctx, + tag, tag_len, + (char *) data + pre, off - pre, + &cache_buf, &cache_size, &meta, &props); + if (ret == -1) { + continue; + } + + pre = off; + } + + parser = NULL; + + switch (get_stream(log_event.body->via.map)) { + case FLB_KUBE_PROP_STREAM_STDOUT: + { + if (props.stdout_exclude == FLB_TRUE) { + /* Skip this record */ + if (ctx->use_journal == FLB_TRUE) { + flb_kube_meta_release(&meta); + flb_kube_prop_destroy(&props); + } + continue; + } + if (props.stdout_parser != NULL) { + parser = flb_parser_get(props.stdout_parser, config); + } + } + break; + case FLB_KUBE_PROP_STREAM_STDERR: + { + if (props.stderr_exclude == FLB_TRUE) { + /* Skip this record */ + if (ctx->use_journal == FLB_TRUE) { + flb_kube_meta_release(&meta); + flb_kube_prop_destroy(&props); + } + continue; + } + if (props.stderr_parser != NULL) { + parser = flb_parser_get(props.stderr_parser, config); + } + } + break; + default: + { + if (props.stdout_exclude == props.stderr_exclude && + props.stderr_exclude == FLB_TRUE) { + continue; + } + if (props.stdout_parser == props.stderr_parser && + props.stderr_parser != NULL) { + parser = flb_parser_get(props.stdout_parser, config); + } + } + break; + } + + /* get records map */ + map = *log_event.body; + + ret = flb_log_event_encoder_begin_record(&log_encoder); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + break; + } + + ret = pack_map_content(&log_encoder, + map, + cache_buf, cache_size, + &meta, &log_event.timestamp, parser, ctx); + if (ret != 0) { + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + + if (ctx->dummy_meta == FLB_TRUE) { + flb_free(dummy_cache_buf); + } + + flb_kube_meta_release(&meta); + flb_kube_prop_destroy(&props); + + return FLB_FILTER_NOTOUCH; + } + + ret = flb_log_event_encoder_commit_record(&log_encoder); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_rollback_record(&log_encoder); + + break; + } + + if (ctx->use_journal == FLB_TRUE) { + flb_kube_meta_release(&meta); + flb_kube_prop_destroy(&props); + } + } + + /* Release meta fields */ + if (ctx->use_journal == FLB_FALSE) { + flb_kube_meta_release(&meta); + flb_kube_prop_destroy(&props); + } + + if (ctx->dummy_meta == FLB_TRUE) { + flb_free(dummy_cache_buf); + } + + *out_buf = log_encoder.output_buffer; + *out_bytes = log_encoder.output_length; + + flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); + + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + + return FLB_FILTER_MODIFIED; +} + +static int cb_kube_exit(void *data, struct flb_config *config) +{ + struct flb_kube *ctx; + + ctx = data; + flb_kube_conf_destroy(ctx); + + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + + /* Buffer size for HTTP Client when reading responses from API Server */ + { + FLB_CONFIG_MAP_SIZE, "buffer_size", "32K", + 0, FLB_TRUE, offsetof(struct flb_kube, buffer_size), + "buffer size to receive response from API server", + }, + + /* TLS: set debug 'level' */ + { + FLB_CONFIG_MAP_INT, "tls.debug", "0", + 0, FLB_TRUE, offsetof(struct flb_kube, tls_debug), + "set TLS debug level: 0 (no debug), 1 (error), " + "2 (state change), 3 (info) and 4 (verbose)" + }, + + /* TLS: enable verification */ + { + FLB_CONFIG_MAP_BOOL, "tls.verify", "true", + 0, FLB_TRUE, offsetof(struct flb_kube, tls_verify), + "enable or disable verification of TLS peer certificate" + }, + + /* TLS: set tls.vhost feature */ + { + FLB_CONFIG_MAP_STR, "tls.vhost", NULL, + 0, FLB_TRUE, offsetof(struct flb_kube, tls_vhost), + "set optional TLS virtual host" + }, + + /* Merge structured record as independent keys */ + { + FLB_CONFIG_MAP_BOOL, "merge_log", "false", + 0, FLB_TRUE, offsetof(struct flb_kube, merge_log), + "merge 'log' key content as individual keys" + }, + + /* Optional parser for 'log' key content */ + { + FLB_CONFIG_MAP_STR, "merge_parser", NULL, + 0, FLB_FALSE, 0, + "specify a 'parser' name to parse the 'log' key content" + }, + + /* New key name to merge the structured content of 'log' */ + { + FLB_CONFIG_MAP_STR, "merge_log_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_kube, merge_log_key), + "set the 'key' name where the content of 'key' will be placed. Only " + "used if the option 'merge_log' is enabled" + }, + + /* On merge, trim field values (remove possible ending \n or \r) */ + { + FLB_CONFIG_MAP_BOOL, "merge_log_trim", "true", + 0, FLB_TRUE, offsetof(struct flb_kube, merge_log_trim), + "remove ending '\\n' or '\\r' characters from the log content" + }, + + /* Keep original log key after successful merging/parsing */ + { + FLB_CONFIG_MAP_BOOL, "keep_log", "true", + 0, FLB_TRUE, offsetof(struct flb_kube, keep_log), + "keep original log content if it was successfully parsed and merged" + }, + + /* Full Kubernetes API server URL */ + { + FLB_CONFIG_MAP_STR, "kube_url", "https://kubernetes.default.svc", + 0, FLB_FALSE, 0, + "Kubernetes API server URL" + }, + + /* + * If set, meta-data load will be attempted from files in this dir, + * falling back to API if not existing. + */ + { + FLB_CONFIG_MAP_STR, "kube_meta_preload_cache_dir", NULL, + 0, FLB_TRUE, offsetof(struct flb_kube, meta_preload_cache_dir), + "set directory with metadata files" + }, + + /* Kubernetes TLS: CA file */ + { + FLB_CONFIG_MAP_STR, "kube_ca_file", FLB_KUBE_CA, + 0, FLB_TRUE, offsetof(struct flb_kube, tls_ca_file), + "Kubernetes TLS CA file" + }, + + /* Kubernetes TLS: CA certs path */ + { + FLB_CONFIG_MAP_STR, "kube_ca_path", NULL, + 0, FLB_TRUE, offsetof(struct flb_kube, tls_ca_path), + "Kubernetes TLS ca path" + }, + + /* Kubernetes Tag prefix */ + { + FLB_CONFIG_MAP_STR, "kube_tag_prefix", FLB_KUBE_TAG_PREFIX, + 0, FLB_TRUE, offsetof(struct flb_kube, kube_tag_prefix), + "prefix used in tag by the input plugin" + }, + + /* Kubernetes Token file */ + { + FLB_CONFIG_MAP_STR, "kube_token_file", FLB_KUBE_TOKEN, + 0, FLB_TRUE, offsetof(struct flb_kube, token_file), + "Kubernetes authorization token file" + }, + + /* Kubernetes Token command */ + { + FLB_CONFIG_MAP_STR, "kube_token_command", NULL, + 0, FLB_FALSE, 0, + "command to get Kubernetes authorization token" + }, + + /* Include Kubernetes Labels in the final record ? */ + { + FLB_CONFIG_MAP_BOOL, "labels", "true", + 0, FLB_TRUE, offsetof(struct flb_kube, labels), + "include Kubernetes labels on every record" + }, + + /* Include Kubernetes Annotations in the final record ? */ + { + FLB_CONFIG_MAP_BOOL, "annotations", "true", + 0, FLB_TRUE, offsetof(struct flb_kube, annotations), + "include Kubernetes annotations on every record" + }, + + /* + * The Application may 'propose' special configuration keys + * to the logging agent (Fluent Bit) through the annotations + * set in the Pod definition, e.g: + * + * "annotations": { + * "logging": {"parser": "apache"} + * } + * + * As of now, Fluent Bit/filter_kubernetes supports the following + * options under the 'logging' map value: + * + * - k8s-logging.parser: propose Fluent Bit to parse the content + * using the pre-defined parser in the + * value (e.g: apache). + * - k8s-logging.exclude: Fluent Bit allows Pods to exclude themselves + * + * By default all options are disabled, so each option needs to + * be enabled manually. + */ + { + FLB_CONFIG_MAP_BOOL, "k8s-logging.parser", "false", + 0, FLB_TRUE, offsetof(struct flb_kube, k8s_logging_parser), + "allow Pods to suggest a parser" + }, + { + FLB_CONFIG_MAP_BOOL, "k8s-logging.exclude", "false", + 0, FLB_TRUE, offsetof(struct flb_kube, k8s_logging_exclude), + "allow Pods to exclude themselves from the logging pipeline" + }, + + /* Use Systemd Journal mode ? */ + { + FLB_CONFIG_MAP_BOOL, "use_journal", "false", + 0, FLB_TRUE, offsetof(struct flb_kube, use_journal), + "use Journald (Systemd) mode" + }, + + /* Custom Tag Regex */ + { + FLB_CONFIG_MAP_STR, "regex_parser", NULL, + 0, FLB_FALSE, 0, + "optional regex parser to extract metadata from container name or container log file name" + }, + + /* Generate dummy metadata (only for test/dev purposes) */ + { + FLB_CONFIG_MAP_BOOL, "dummy_meta", "false", + 0, FLB_TRUE, offsetof(struct flb_kube, dummy_meta), + "use 'dummy' metadata, do not talk to API server" + }, + + /* + * Poll DNS status to mitigate unreliable network issues. + * See fluent/fluent-bit/2144. + */ + { + FLB_CONFIG_MAP_INT, "dns_retries", "6", + 0, FLB_TRUE, offsetof(struct flb_kube, dns_retries), + "dns lookup retries N times until the network start working" + }, + + { + FLB_CONFIG_MAP_TIME, "dns_wait_time", "30", + 0, FLB_TRUE, offsetof(struct flb_kube, dns_wait_time), + "dns interval between network status checks" + }, + /* Fetch K8s meta when docker_id has changed */ + { + FLB_CONFIG_MAP_BOOL, "cache_use_docker_id", "false", + 0, FLB_TRUE, offsetof(struct flb_kube, cache_use_docker_id), + "fetch K8s meta when docker_id is changed" + }, + + { + FLB_CONFIG_MAP_BOOL, "use_tag_for_meta", "false", + 0, FLB_TRUE, offsetof(struct flb_kube, use_tag_for_meta), + "use tag associated to retrieve metadata instead of kube-server" + }, + + /* + * Enable the feature for using kubelet to get pods information + */ + { + FLB_CONFIG_MAP_BOOL, "use_kubelet", "false", + 0, FLB_TRUE, offsetof(struct flb_kube, use_kubelet), + "use kubelet to get metadata instead of kube-server" + }, + /* + * The kubelet host for /pods endpoint, default is 127.0.0.1 + * Will only check when "use_kubelet" config is set to true + */ + { + FLB_CONFIG_MAP_STR, "kubelet_host", "127.0.0.1", + 0, FLB_TRUE, offsetof(struct flb_kube, kubelet_host), + "kubelet host to connect with when using kubelet" + }, + /* + * The kubelet port for /pods endpoint, default is 10250 + * Will only check when "use_kubelet" config is set to true + */ + { + FLB_CONFIG_MAP_INT, "kubelet_port", "10250", + 0, FLB_TRUE, offsetof(struct flb_kube, kubelet_port), + "kubelet port to connect with when using kubelet" + }, + { + FLB_CONFIG_MAP_TIME, "kube_token_ttl", "10m", + 0, FLB_TRUE, offsetof(struct flb_kube, kube_token_ttl), + "kubernetes token ttl, until it is reread from the token file. Default: 10m" + }, + /* + * Set TTL for K8s cached metadata + */ + { + FLB_CONFIG_MAP_TIME, "kube_meta_cache_ttl", "0", + 0, FLB_TRUE, offsetof(struct flb_kube, kube_meta_cache_ttl), + "configurable TTL for K8s cached metadata. " + "By default, it is set to 0 which means TTL for cache entries is disabled and " + "cache entries are evicted at random when capacity is reached. " + "In order to enable this option, you should set the number to a time interval. " + "For example, set this value to 60 or 60s and cache entries " + "which have been created more than 60s will be evicted" + }, + /* EOF */ + {0} +}; + +struct flb_filter_plugin filter_kubernetes_plugin = { + .name = "kubernetes", + .description = "Filter to append Kubernetes metadata", + .cb_init = cb_kube_init, + .cb_filter = cb_kube_filter, + .cb_exit = cb_kube_exit, + .config_map = config_map, + .flags = 0 +}; |