diff options
Diffstat (limited to 'fluent-bit/plugins/in_tail/tail_dockermode.c')
-rw-r--r-- | fluent-bit/plugins/in_tail/tail_dockermode.c | 459 |
1 files changed, 459 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_tail/tail_dockermode.c b/fluent-bit/plugins/in_tail/tail_dockermode.c new file mode 100644 index 000000000..a8f20f9cd --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_dockermode.c @@ -0,0 +1,459 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_unescape.h> + +#include "tail_config.h" +#include "tail_dockermode.h" +#include "tail_file_internal.h" + +int flb_tail_dmode_create(struct flb_tail_config *ctx, + struct flb_input_instance *ins, + struct flb_config *config) +{ + const char *tmp; + + if (ctx->multiline == FLB_TRUE) { + flb_plg_error(ctx->ins, "Docker mode cannot be enabled when multiline " + "is enabled"); + return -1; + } + +#ifdef FLB_HAVE_REGEX + /* First line Parser */ + tmp = flb_input_get_property("docker_mode_parser", ins); + if (tmp) { + ctx->docker_mode_parser = flb_parser_get(tmp, config); + if (!ctx->docker_mode_parser) { + flb_plg_error(ctx->ins, "parser '%s' is not registered", tmp); + } + } + else { + ctx->docker_mode_parser = NULL; + } +#endif + + tmp = flb_input_get_property("docker_mode_flush", ins); + if (!tmp) { + ctx->docker_mode_flush = FLB_TAIL_DMODE_FLUSH; + } + else { + ctx->docker_mode_flush = atoi(tmp); + if (ctx->docker_mode_flush <= 0) { + ctx->docker_mode_flush = 1; + } + } + + return 0; +} + +static int modify_json_cond(char *js, size_t js_len, + char **val, size_t *val_len, + char **out, size_t *out_len, + int cond(char*, size_t), + int mod(char*, size_t, char**, size_t*, void*), void *data) +{ + int ret; + struct flb_pack_state state; + jsmntok_t *t; + jsmntok_t *t_val = NULL; + int i; + int i_root = -1; + int i_key = -1; + char *old_val; + size_t old_val_len; + char *new_val = NULL; + size_t new_val_len = 0; + size_t mod_len; + + ret = flb_pack_state_init(&state); + if (ret != 0) { + ret = -1; + goto modify_json_cond_end; + } + + ret = flb_json_tokenise(js, js_len, &state); + if (ret != 0 || state.tokens_count == 0) { + ret = -1; + goto modify_json_cond_end; + } + + for (i = 0; i < state.tokens_count; i++) { + t = &state.tokens[i]; + + if (i_key >= 0) { + if (t->parent == i_key) { + if (t->type == JSMN_STRING) { + t_val = t; + } + break; + } + continue; + } + + if (t->start == 0 && t->parent == -1 && t->type == JSMN_OBJECT) { + i_root = i; + continue; + } + if (i_root == -1) { + continue; + } + + if (t->parent == i_root && t->type == JSMN_STRING && t->end - t->start == 3 && strncmp(js + t->start, "log", 3) == 0) { + i_key = i; + } + } + + if (!t_val) { + ret = -1; + goto modify_json_cond_end; + } + + *out = js; + *out_len = js_len; + + if (val) { + *val = js + t_val->start; + } + if (val_len) { + *val_len = t_val->end - t_val->start; + } + + if (!cond || cond(js + t_val->start, t_val->end - t_val->start)) { + old_val = js + t_val->start; + old_val_len = t_val->end - t_val->start; + ret = mod(old_val, old_val_len, &new_val, &new_val_len, data); + if (ret != 0) { + ret = -1; + goto modify_json_cond_end; + } + + ret = 1; + + if (new_val == old_val) { + goto modify_json_cond_end; + } + + mod_len = js_len + new_val_len - old_val_len; + *out = flb_malloc(mod_len); + if (!*out) { + flb_errno(); + flb_free(new_val); + ret = -1; + goto modify_json_cond_end; + } + *out_len = mod_len; + + memcpy(*out, js, t_val->start); + memcpy(*out + t_val->start, new_val, new_val_len); + memcpy(*out + t_val->start + new_val_len, js + t_val->end, js_len - t_val->end); + + flb_free(new_val); + } + + modify_json_cond_end: + flb_pack_state_reset(&state); + if (ret < 0) { + *out = NULL; + } + return ret; +} + +static int unesc_ends_with_nl(char *str, size_t len) +{ + char* unesc; + int unesc_len; + int nl; + + unesc = flb_malloc(len + 1); + if (!unesc) { + flb_errno(); + return FLB_FALSE; + } + unesc_len = flb_unescape_string(str, len, &unesc); + nl = unesc[unesc_len - 1] == '\n'; + flb_free(unesc); + return nl; +} + +static int prepend_sds_to_str(char *str, size_t len, char **out, size_t *out_len, void *data) +{ + flb_sds_t sds = data; + + if (flb_sds_len(sds) == 0) { + *out = str; + *out_len = len; + return 0; + } + + size_t mod_len = flb_sds_len(sds) + len; + *out = flb_malloc(mod_len); + if (!*out) { + flb_errno(); + return -1; + } + *out_len = mod_len; + + memcpy(*out, sds, flb_sds_len(sds)); + memcpy(*out + flb_sds_len(sds), str, len); + return 0; +} + +static int use_sds(char *str, size_t len, char **out, size_t *out_len, void *data) +{ + flb_sds_t sds = data; + size_t mod_len = flb_sds_len(sds); + *out = flb_malloc(mod_len); + if (!*out) { + flb_errno(); + return -1; + } + *out_len = mod_len; + + memcpy(*out, sds, flb_sds_len(sds)); + return 0; +} + +int flb_tail_dmode_process_content(time_t now, + char* line, size_t line_len, + char **repl_line, size_t *repl_line_len, + struct flb_tail_file *file, + struct flb_tail_config *ctx) +{ + char* val = NULL; + size_t val_len; + int ret; + void *out_buf = NULL; + size_t out_size; + struct flb_time out_time = {0}; + *repl_line = NULL; + *repl_line_len = 0; + flb_sds_t tmp; + flb_sds_t tmp_copy; + +#ifdef FLB_HAVE_REGEX + if (ctx->docker_mode_parser) { + ret = flb_parser_do(ctx->docker_mode_parser, line, line_len, + &out_buf, &out_size, &out_time); + flb_free(out_buf); + + /* + * Set dmode_firstline if the line meets the first-line requirement + */ + if(ret >= 0) { + file->dmode_firstline = true; + } + + /* + * Process if buffer contains full log line + */ + if (flb_sds_len(file->dmode_lastline) > 0 && file->dmode_complete) { + /* + * Buffered log should be flushed out + * as current line meets first-line requirement + */ + if(ret >= 0) { + flb_tail_dmode_flush(file, ctx); + } + + /* + * Flush the buffer if multiline has not been detected yet + */ + if (!file->dmode_firstline) { + flb_tail_dmode_flush(file, ctx); + } + } + } +#endif + + ret = modify_json_cond(line, line_len, + &val, &val_len, + repl_line, repl_line_len, + unesc_ends_with_nl, + prepend_sds_to_str, file->dmode_buf); + if (ret >= 0) { + /* line is a valid json */ + flb_sds_len_set(file->dmode_lastline, 0); + + /* concatenate current log line with buffered one */ + tmp = flb_sds_cat(file->dmode_buf, val, val_len); + if (!tmp) { + flb_errno(); + return -1; + } + file->dmode_buf = tmp; + + tmp_copy = flb_sds_copy(file->dmode_lastline, line, line_len); + if (!tmp_copy) { + flb_errno(); + return -1; + } + + file->dmode_lastline = tmp_copy; + file->dmode_flush_timeout = now + (ctx->docker_mode_flush - 1); + + if (ret == 0) { + /* Line not ended with newline */ + file->dmode_complete = false; + } + else { + /* Line ended with newline */ + file->dmode_complete = true; +#ifdef FLB_HAVE_REGEX + if (!ctx->docker_mode_parser) { + flb_tail_dmode_flush(file, ctx); + } +#else + flb_tail_dmode_flush(file, ctx); +#endif + } + } + return ret; +} + +void flb_tail_dmode_flush(struct flb_tail_file *file, struct flb_tail_config *ctx) +{ + int ret; + char *repl_line = NULL; + size_t repl_line_len = 0; + void *out_buf = NULL; + size_t out_size; + struct flb_time out_time = {0}; + time_t now = time(NULL); + + if (flb_sds_len(file->dmode_lastline) == 0) { + return; + } + + flb_time_zero(&out_time); + + ret = modify_json_cond(file->dmode_lastline, + flb_sds_len(file->dmode_lastline), + NULL, NULL, + &repl_line, &repl_line_len, + NULL, + use_sds, file->dmode_buf); + if (ret < 0) { + return; + } + + flb_sds_len_set(file->dmode_buf, 0); + flb_sds_len_set(file->dmode_lastline, 0); + file->dmode_flush_timeout = 0; + +#ifdef FLB_HAVE_REGEX + if (ctx->parser) { + ret = flb_parser_do(ctx->parser, repl_line, repl_line_len, + &out_buf, &out_size, &out_time); + if (ret >= 0) { + if (flb_time_to_double(&out_time) == 0) { + flb_time_get(&out_time); + } + if (ctx->ignore_older > 0 && (now - ctx->ignore_older) > out_time.tm.tv_sec) { + goto dmode_flush_end; + } + + flb_tail_pack_line_map(&out_time, (char**) &out_buf, &out_size, file, 0); + + goto dmode_flush_end; + } + } +#endif + + flb_tail_file_pack_line(NULL, repl_line, repl_line_len, file, 0); + + dmode_flush_end: + flb_free(repl_line); + flb_free(out_buf); +} + +static void file_pending_flush(struct flb_tail_config *ctx, + struct flb_tail_file *file, time_t now) +{ + if (file->dmode_flush_timeout > now) { + return; + } + + if (flb_sds_len(file->dmode_lastline) == 0) { + return; + } + + flb_tail_dmode_flush(file, ctx); + + if (file->sl_log_event_encoder->output_length > 0) { + flb_input_log_append(ctx->ins, + file->tag_buf, + file->tag_len, + file->sl_log_event_encoder->output_buffer, + file->sl_log_event_encoder->output_length); + + flb_log_event_encoder_reset(file->sl_log_event_encoder); + } +} + +int flb_tail_dmode_pending_flush_all(struct flb_tail_config *ctx) +{ + time_t expired; + struct mk_list *head; + struct flb_tail_file *file; + + expired = time(NULL) + 3600; + + /* Iterate promoted event files with pending bytes */ + mk_list_foreach(head, &ctx->files_static) { + file = mk_list_entry(head, struct flb_tail_file, _head); + file_pending_flush(ctx, file, expired); + } + + /* Iterate promoted event files with pending bytes */ + mk_list_foreach(head, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + file_pending_flush(ctx, file, expired); + } + + return 0; +} + +int flb_tail_dmode_pending_flush(struct flb_input_instance *ins, + struct flb_config *config, void *context) +{ + time_t now; + struct mk_list *head; + struct flb_tail_file *file; + struct flb_tail_config *ctx = context; + + now = time(NULL); + + /* Iterate static event files with pending bytes */ + mk_list_foreach(head, &ctx->files_static) { + file = mk_list_entry(head, struct flb_tail_file, _head); + file_pending_flush(ctx, file, now); + } + + /* Iterate promoted event files with pending bytes */ + mk_list_foreach(head, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + file_pending_flush(ctx, file, now); + } + + return 0; +} |