diff options
Diffstat (limited to 'fluent-bit/src/fluent-bit.c')
-rw-r--r-- | fluent-bit/src/fluent-bit.c | 1417 |
1 files changed, 1417 insertions, 0 deletions
diff --git a/fluent-bit/src/fluent-bit.c b/fluent-bit/src/fluent-bit.c new file mode 100644 index 000000000..51b814cfd --- /dev/null +++ b/fluent-bit/src/fluent-bit.c @@ -0,0 +1,1417 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define _GNU_SOURCE + +#include <stdio.h> +#include <stdint.h> +#include <stdlib.h> +#include <string.h> +#include <signal.h> +#include <ctype.h> + +#include <cfl/cfl.h> +#include <cfl/cfl_array.h> +#include <cfl/cfl_kvlist.h> + +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_dump.h> +#include <fluent-bit/flb_stacktrace.h> +#include <fluent-bit/flb_env.h> +#include <fluent-bit/flb_macros.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_meta.h> +#include <fluent-bit/flb_config.h> +#include <fluent-bit/flb_version.h> +#include <fluent-bit/flb_error.h> +#include <fluent-bit/flb_custom.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_output.h> +#include <fluent-bit/flb_filter.h> +#include <fluent-bit/flb_processor.h> +#include <fluent-bit/flb_engine.h> +#include <fluent-bit/flb_str.h> +#include <fluent-bit/flb_slist.h> +#include <fluent-bit/flb_plugin.h> +#include <fluent-bit/flb_parser.h> +#include <fluent-bit/flb_lib.h> +#include <fluent-bit/flb_help.h> +#include <fluent-bit/flb_record_accessor.h> +#include <fluent-bit/flb_ra_key.h> +#include <fluent-bit/flb_kv.h> +#include <fluent-bit/flb_reload.h> +#include <fluent-bit/flb_config_format.h> + +#ifdef FLB_HAVE_MTRACE +#include <mcheck.h> +#endif + +#ifdef FLB_SYSTEM_WINDOWS +extern int win32_main(int, char**); +extern void win32_started(void); +#endif + +flb_ctx_t *ctx; +struct flb_config *config; +volatile sig_atomic_t exit_signal = 0; +volatile sig_atomic_t flb_bin_restarting = FLB_RELOAD_IDLE; + +#ifdef FLB_HAVE_LIBBACKTRACE +struct flb_stacktrace flb_st; +#endif + +#ifdef FLB_HAVE_CHUNK_TRACE + +#include <fluent-bit/flb_chunk_trace.h> + +#define FLB_LONG_TRACE (1024 + 1) +#define FLB_LONG_TRACE_INPUT (1024 + 2) +#define FLB_LONG_TRACE_OUTPUT (1024 + 3) +#define FLB_LONG_TRACE_OUTPUT_PROPERTY (1024 + 4) + +#endif + +#define FLB_HELP_TEXT 0 +#define FLB_HELP_JSON 1 + + +#define PLUGIN_CUSTOM 0 +#define PLUGIN_INPUT 1 +#define PLUGIN_OUTPUT 2 +#define PLUGIN_FILTER 3 + +#define print_opt(a, b) printf(" %-24s%s\n", a, b) +#define print_opt_i(a, b, c) printf(" %-24s%s (default: %i)\n", a, b, c) +#define print_opt_s(a, b, c) printf(" %-24s%s (default: %s)\n", a, b, c) + +#define get_key(a, b, c) mk_rconf_section_get_key(a, b, c) +#define n_get_key(a, b, c) (intptr_t) get_key(a, b, c) +#define s_get_key(a, b, c) (char *) get_key(a, b, c) + +static char *prog_name; + +static void flb_signal_init(); + +static void flb_help(int rc, struct flb_config *config) +{ + struct mk_list *head; + struct flb_input_plugin *in; + struct flb_output_plugin *out; + struct flb_filter_plugin *filter; + struct flb_processor_plugin *processor; + + printf("Usage: %s [OPTION]\n\n", prog_name); + printf("%sAvailable Options%s\n", ANSI_BOLD, ANSI_RESET); + print_opt("-b --storage_path=PATH", "specify a storage buffering path"); + print_opt("-c --config=FILE", "specify an optional configuration file"); +#ifdef FLB_HAVE_FORK + print_opt("-d, --daemon", "run Fluent Bit in background mode"); +#endif + print_opt("-D, --dry-run", "dry run"); + print_opt_i("-f, --flush=SECONDS", "flush timeout in seconds", + FLB_CONFIG_FLUSH_SECS); + print_opt("-C, --custom=CUSTOM", "enable a custom plugin"); + print_opt("-i, --input=INPUT", "set an input"); + print_opt("-F --filter=FILTER", "set a filter"); + print_opt("-m, --match=MATCH", "set plugin match, same as '-p match=abc'"); + print_opt("-o, --output=OUTPUT", "set an output"); + print_opt("-p, --prop=\"A=B\"", "set plugin configuration property"); +#ifdef FLB_HAVE_PARSER + print_opt("-R, --parser=FILE", "specify a parser configuration file"); +#endif + print_opt("-e, --plugin=FILE", "load an external plugin (shared lib)"); + print_opt("-l, --log_file=FILE", "write log info to a file"); + print_opt("-t, --tag=TAG", "set plugin tag, same as '-p tag=abc'"); +#ifdef FLB_HAVE_STREAM_PROCESSOR + print_opt("-T, --sp-task=SQL", "define a stream processor task"); +#endif + print_opt("-v, --verbose", "increase logging verbosity (default: info)"); +#ifdef FLB_TRACE + print_opt("-vv", "trace mode (available)"); +#endif +#ifdef FLB_HAVE_CHUNK_TRACE + print_opt("-Z, --enable-chunk-trace", "enable chunk tracing, it can be activated either through the http api or the command line"); + print_opt("--trace-input", "input to start tracing on startup."); + print_opt("--trace-output", "output to use for tracing on startup."); + print_opt("--trace-output-property", "set a property for output tracing on startup."); + print_opt("--trace", "setup a trace pipeline on startup. Uses a single line, ie: \"input=dummy.0 output=stdout output.format='json'\""); +#endif + print_opt("-w, --workdir", "set the working directory"); +#ifdef FLB_HAVE_HTTP_SERVER + print_opt("-H, --http", "enable monitoring HTTP server"); + print_opt_s("-P, --port", "set HTTP server TCP port", + FLB_CONFIG_HTTP_PORT); +#endif + print_opt_i("-s, --coro_stack_size", "set coroutines stack size in bytes", + config->coro_stack_size); + print_opt("-q, --quiet", "quiet mode"); + print_opt("-S, --sosreport", "support report for Enterprise customers"); + print_opt("-Y, --enable-hot-reload", "enable for hot reloading"); + print_opt("-W, --disable-thread-safety-on-hot-reloading", "disable thread safety on hot reloading"); + print_opt("-V, --version", "show version number"); + print_opt("-h, --help", "print this help"); + + printf("\n%sInputs%s\n", ANSI_BOLD, ANSI_RESET); + + /* Iterate each supported input */ + mk_list_foreach(head, &config->in_plugins) { + in = mk_list_entry(head, struct flb_input_plugin, _head); + if (strcmp(in->name, "lib") == 0 || (in->flags & FLB_INPUT_PRIVATE)) { + /* useless..., just skip it. */ + continue; + } + print_opt(in->name, in->description); + } + + printf("\n%sProcessors%s\n", ANSI_BOLD, ANSI_RESET); + mk_list_foreach(head, &config->processor_plugins) { + processor = mk_list_entry(head, struct flb_processor_plugin, _head); + print_opt(processor->name, processor->description); + } + + printf("\n%sFilters%s\n", ANSI_BOLD, ANSI_RESET); + mk_list_foreach(head, &config->filter_plugins) { + filter = mk_list_entry(head, struct flb_filter_plugin, _head); + print_opt(filter->name, filter->description); + } + + printf("\n%sOutputs%s\n", ANSI_BOLD, ANSI_RESET); + mk_list_foreach(head, &config->out_plugins) { + out = mk_list_entry(head, struct flb_output_plugin, _head); + if (strcmp(out->name, "lib") == 0 || (out->flags & FLB_OUTPUT_PRIVATE)) { + /* useless..., just skip it. */ + continue; + } + print_opt(out->name, out->description); + } + + printf("\n%sInternal%s\n", ANSI_BOLD, ANSI_RESET); + printf(" Event Loop = %s\n", mk_event_backend()); + printf(" Build Flags =%s\n", FLB_INFO_FLAGS); + exit(rc); +} + +/* + * If the description is larger than the allowed 80 chars including left + * padding, split the content in multiple lines and align it properly. + */ +static void help_plugin_description(int left_padding, flb_sds_t str) +{ + int len; + int max; + int line = 0; + char *c; + char *p; + char *end; + char fmt[32]; + + if (!str) { + printf("no description available\n"); + return; + } + + max = 90 - left_padding; + len = strlen(str); + + if (len <= max) { + printf("%s\n", str); + return; + } + + p = str; + len = flb_sds_len(str); + end = str + len; + + while (p < end) { + if ((p + max) > end) { + c = end; + } + else { + c = p + max; + while (*c != ' ' && c > p) { + c--; + } + } + + if (c == p) { + len = end - p; + } + else { + len = c - p; + } + + snprintf(fmt, sizeof(fmt) - 1, "%%*s%%.%is\n", len); + if (line == 0) { + printf(fmt, 0, "", p); + } + else { + printf(fmt, left_padding, " ", p); + } + line++; + p += len + 1; + } +} + +static flb_sds_t help_get_value(msgpack_object map, char *key) +{ + flb_sds_t k; + flb_sds_t val; + msgpack_object *o; + struct flb_ra_value *rval = NULL; + struct flb_record_accessor *ra = NULL; + + k = flb_sds_create(key); + ra = flb_ra_create(k, FLB_FALSE); + flb_sds_destroy(k); + if (!ra) { + return NULL; + } + + rval = flb_ra_get_value_object(ra, map); + if (!rval) { + flb_ra_destroy(ra); + return NULL; + } + + o = &rval->o; + val = flb_sds_create_len(o->via.str.ptr, o->via.str.size); + + flb_ra_key_value_destroy(rval); + flb_ra_destroy(ra); + + return val; +} + +static void help_print_property(int max, msgpack_object k, msgpack_object v) +{ + int i; + int len = 0; + char buf[32]; + char fmt[32]; + char fmt_prf[32]; + char def[32]; + msgpack_object map; + flb_sds_t tmp; + flb_sds_t name; + flb_sds_t type; + flb_sds_t desc; + flb_sds_t defv; + + /* Convert property type to uppercase and print it */ + for (i = 0; i < k.via.str.size; i++) { + buf[i] = toupper(k.via.str.ptr[i]); + } + buf[k.via.str.size] = '\0'; + printf(ANSI_BOLD "\n%s\n" ANSI_RESET, buf); + + snprintf(fmt, sizeof(fmt) - 1, "%%-%is", max); + snprintf(fmt_prf, sizeof(fmt_prf) - 1, "%%-%is", max); + snprintf(def, sizeof(def) - 1, "%%*s> default: %%s, type: "); + + for (i = 0; i < v.via.array.size; i++) { + map = v.via.array.ptr[i]; + + name = help_get_value(map, "$name"); + type = help_get_value(map, "$type"); + desc = help_get_value(map, "$description"); + defv = help_get_value(map, "$default"); + + if (strcmp(type, "prefix") == 0) { + len = flb_sds_len(name); + tmp = flb_sds_create_size(len + 2); + flb_sds_printf(&tmp, "%sN", name); + printf(fmt_prf, tmp); + flb_sds_destroy(tmp); + } + else { + printf(fmt, name); + } + + help_plugin_description(max, desc); + + if (defv) { + printf(def, max, " ", defv); + } + else { + printf("%*s> type: ", max, " "); + } + printf("%s", type); + printf("\n\n"); + } +} + +static void help_format_json(void *help_buf, size_t help_size) +{ + flb_sds_t json; + + json = flb_msgpack_raw_to_json_sds(help_buf, help_size); + printf("%s\n", json); + flb_sds_destroy(json); +} + +static void help_format_text(void *help_buf, size_t help_size) +{ + int i; + int x; + int max = 0; + int len = 0; + int ret; + size_t off = 0; + flb_sds_t name; + flb_sds_t type; + flb_sds_t desc; + msgpack_unpacked result; + msgpack_object map; + msgpack_object p; + msgpack_object k; + msgpack_object v; + + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, help_buf, help_size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + return; + } + map = result.data; + + type = help_get_value(map, "$type"); + name = help_get_value(map, "$name"); + desc = help_get_value(map, "$description"); + + printf("%sHELP%s\n%s %s plugin\n", ANSI_BOLD, ANSI_RESET, + name, type); + flb_sds_destroy(type); + flb_sds_destroy(name); + + if (desc) { + printf(ANSI_BOLD "\nDESCRIPTION\n" ANSI_RESET "%s\n", desc); + flb_sds_destroy(desc); + } + + /* Properties */ + p = map.via.map.ptr[3].val; + + /* Calculate padding */ + for (i = 0; i < p.via.map.size; i++) { + v = p.via.map.ptr[i].val; + for (x = 0; x < v.via.map.size; x++) { + msgpack_object ptr = v.via.array.ptr[x]; + name = help_get_value(ptr, "$name"); + len = flb_sds_len(name); + flb_sds_destroy(name); + if (len > max) { + max = len; + } + } + } + max += 2; + + /* Iterate each section of properties */ + for (i = 0; i < p.via.map.size; i++) { + k = p.via.map.ptr[i].key; + v = p.via.map.ptr[i].val; + help_print_property(max, k, v); + } +} + +static void flb_help_plugin(int rc, int format, + struct flb_config *config, int type, + struct flb_cf *cf, + struct flb_cf_section *s) +{ + struct flb_config_map *opt = NULL; + void *help_buf; + size_t help_size; + char *name; + struct flb_custom_instance *c = NULL; + struct flb_input_instance *i = NULL; + struct flb_filter_instance *f = NULL; + struct flb_output_instance *o = NULL; + + flb_version_banner(); + + name = flb_cf_section_property_get_string(cf, s, "name"); + if (!name) { + exit(EXIT_FAILURE); + } + + if (type == PLUGIN_CUSTOM) { + c = flb_custom_new(config, name, NULL); + if (!c) { + fprintf(stderr, "invalid custom plugin '%s'", name); + return; + } + opt = c->p->config_map; + flb_help_custom(c, &help_buf, &help_size); + flb_custom_instance_destroy(c); + } + else if (type == PLUGIN_INPUT) { + i = flb_input_new(config, name, 0, FLB_TRUE); + if (!i) { + fprintf(stderr, "invalid input plugin '%s'", name); + return; + } + opt = i->p->config_map; + flb_help_input(i, &help_buf, &help_size); + flb_input_instance_destroy(i); + } + else if (type == PLUGIN_FILTER) { + f = flb_filter_new(config, name, 0); + if (!f) { + fprintf(stderr, "invalid filter plugin '%s'", name); + return; + } + opt = f->p->config_map; + flb_help_filter(f, &help_buf, &help_size); + flb_filter_instance_destroy(f); + } + else if (type == PLUGIN_OUTPUT) { + o = flb_output_new(config, name, 0, FLB_TRUE); + if (!o) { + fprintf(stderr, "invalid output plugin '%s'", name); + return; + } + opt = o->p->config_map; + flb_help_output(o, &help_buf, &help_size); + flb_output_instance_destroy(o); + } + + if (!opt) { + exit(rc); + } + + if (format == FLB_HELP_TEXT) { + help_format_text(help_buf, help_size); + } + else if (format == FLB_HELP_JSON) { + help_format_json(help_buf, help_size); + } + + flb_free(help_buf); + exit(rc); +} + +#define flb_print_signal(X) case X: \ + write (STDERR_FILENO, #X ")\n", sizeof(#X ")\n")-1); \ + break; + +static void flb_signal_handler_break_loop(int signal) +{ + exit_signal = signal; +} + +static void flb_signal_exit(int signal) +{ + int len; + char ts[32]; + char s[] = "[engine] caught signal ("; + time_t now; + struct tm *cur; + + now = time(NULL); + cur = localtime(&now); + len = snprintf(ts, sizeof(ts) - 1, "[%i/%02i/%02i %02i:%02i:%02i] ", + cur->tm_year + 1900, + cur->tm_mon + 1, + cur->tm_mday, + cur->tm_hour, + cur->tm_min, + cur->tm_sec); + + /* write signal number */ + write(STDERR_FILENO, ts, len); + write(STDERR_FILENO, s, sizeof(s) - 1); + switch (signal) { + flb_print_signal(SIGINT); +#ifndef FLB_SYSTEM_WINDOWS + flb_print_signal(SIGQUIT); + flb_print_signal(SIGHUP); + flb_print_signal(SIGCONT); +#endif + flb_print_signal(SIGTERM); + flb_print_signal(SIGSEGV); + }; +} + +static void flb_signal_handler(int signal) +{ + int len; + char ts[32]; + char s[] = "[engine] caught signal ("; + time_t now; + struct tm *cur; + flb_ctx_t *ctx = flb_context_get(); + struct flb_cf *cf_opts = flb_cf_context_get(); + + now = time(NULL); + cur = localtime(&now); + len = snprintf(ts, sizeof(ts) - 1, "[%i/%02i/%02i %02i:%02i:%02i] ", + cur->tm_year + 1900, + cur->tm_mon + 1, + cur->tm_mday, + cur->tm_hour, + cur->tm_min, + cur->tm_sec); + + /* write signal number */ + write(STDERR_FILENO, ts, len); + write(STDERR_FILENO, s, sizeof(s) - 1); + switch (signal) { + flb_print_signal(SIGINT); +#ifndef FLB_SYSTEM_WINDOWS + flb_print_signal(SIGQUIT); + flb_print_signal(SIGHUP); + flb_print_signal(SIGCONT); +#endif + flb_print_signal(SIGTERM); + flb_print_signal(SIGSEGV); + flb_print_signal(SIGFPE); + }; + + flb_signal_init(); + + switch(signal) { + case SIGSEGV: + case SIGFPE: +#ifdef FLB_HAVE_LIBBACKTRACE + /* To preserve stacktrace */ + flb_stacktrace_print(&flb_st); +#endif + abort(); +#ifndef FLB_SYSTEM_WINDOWS + case SIGCONT: + flb_dump(ctx->config); + break; + case SIGHUP: +#ifndef FLB_HAVE_STATIC_CONF + if (flb_bin_restarting == FLB_RELOAD_IDLE) { + flb_bin_restarting = FLB_RELOAD_IN_PROGRESS; + /* reload by using same config files/path */ + flb_reload(ctx, cf_opts); + flb_bin_restarting = FLB_RELOAD_IDLE; + } + else { + flb_utils_error(FLB_ERR_RELOADING_IN_PROGRESS); + } + break; +#endif +#endif + } +} + +#ifdef FLB_SYSTEM_WINDOWS +#include <ConsoleApi.h> + +static flb_ctx_t *handler_ctx = NULL; +static struct flb_cf *handler_opts = NULL; +static int handler_signal = 0; + +void flb_console_handler_set_ctx(flb_ctx_t *ctx, struct flb_cf *cf_opts) +{ + handler_ctx = ctx; + handler_opts = cf_opts; +} + +static BOOL WINAPI flb_console_handler(DWORD evType) +{ + switch(evType) { + case 1 /* CTRL_BREAK_EVENT_1 */: + if (flb_bin_restarting == FLB_RELOAD_IDLE) { + flb_bin_restarting = FLB_RELOAD_IN_PROGRESS; + /* signal the main loop to execute reload. this is necessary since + * all signal handlers in win32 are executed on their own thread. + */ + handler_signal = 1; + flb_bin_restarting = FLB_RELOAD_IDLE; + } + else { + flb_utils_error(FLB_ERR_RELOADING_IN_PROGRESS); + } + break; + } + return 1; +} +#endif + +static void flb_signal_init() +{ + signal(SIGINT, &flb_signal_handler_break_loop); +#ifndef FLB_SYSTEM_WINDOWS + signal(SIGQUIT, &flb_signal_handler_break_loop); + signal(SIGHUP, &flb_signal_handler); + signal(SIGCONT, &flb_signal_handler); +#else + /* Use SetConsoleCtrlHandler on windows to simulate SIGHUP */ + SetConsoleCtrlHandler(flb_console_handler, 1); +#endif + signal(SIGTERM, &flb_signal_handler_break_loop); + signal(SIGSEGV, &flb_signal_handler); + signal(SIGFPE, &flb_signal_handler); +} + +static int set_property(struct flb_cf *cf, struct flb_cf_section *s, char *kv) +{ + int len; + int sep; + char *key; + char *value; + struct cfl_variant *tmp; + + len = strlen(kv); + sep = mk_string_char_search(kv, '=', len); + if (sep == -1) { + return -1; + } + + key = mk_string_copy_substr(kv, 0, sep); + value = kv + sep + 1; + + if (!key) { + return -1; + } + + tmp = flb_cf_section_property_add(cf, s->properties, key, 0, value, 0); + if (tmp == NULL) { + fprintf(stderr, "[error] setting up section '%s' plugin property '%s'\n", + s->name, key); + } + mk_mem_free(key); + return 0; +} + +static int flb_service_conf_path_set(struct flb_config *config, char *file) +{ + char *end; + char *path; + + path = realpath(file, NULL); + if (!path) { + return -1; + } + + /* lookup path ending and truncate */ + end = strrchr(path, FLB_DIRCHAR); + if (!end) { + free(path); + return -1; + } + + end++; + *end = '\0'; + config->conf_path = flb_strdup(path); + free(path); + + /* Store the relative file path */ + config->conf_path_file = flb_sds_create(file); + + return 0; +} + + +static struct flb_cf *service_configure(struct flb_cf *cf, + struct flb_config *config, char *file) +{ + int ret = -1; + +#ifdef FLB_HAVE_STATIC_CONF + cf = flb_config_static_open(file); +#else + if (file) { + cf = flb_cf_create_from_file(cf, file); + } +#endif + + if (!cf) { + return NULL; + } + + + /* Set configuration root path */ + if (file) { + flb_service_conf_path_set(config, file); + } + + ret = flb_config_load_config_format(config, cf); + if (ret != 0) { + return NULL; + } + + config->cf_main = cf; + return cf; +} + +#ifdef FLB_HAVE_CHUNK_TRACE +static struct flb_input_instance *find_input(flb_ctx_t *ctx, const char *name) +{ + struct mk_list *head; + struct flb_input_instance *in; + + + mk_list_foreach(head, &ctx->config->inputs) { + in = mk_list_entry(head, struct flb_input_instance, _head); + if (strcmp(name, in->name) == 0) { + return in; + } + if (in->alias) { + if (strcmp(name, in->alias) == 0) { + return in; + } + } + } + return NULL; +} + +static int enable_trace_input(flb_ctx_t *ctx, const char *name, const char *prefix, const char *output_name, struct mk_list *props) +{ + struct flb_input_instance *in; + + + in = find_input(ctx, name); + if (in == NULL) { + return FLB_ERROR; + } + + flb_chunk_trace_context_new(in, output_name, prefix, NULL, props); + return (in->chunk_trace_ctxt == NULL ? FLB_ERROR : FLB_OK); +} + +static int disable_trace_input(flb_ctx_t *ctx, const char *name) +{ + struct flb_input_instance *in; + + + in = find_input(ctx, name); + if (in == NULL) { + return FLB_ERROR; + } + + if (in->chunk_trace_ctxt != NULL) { + flb_chunk_trace_context_destroy(in); + } + return FLB_OK; +} + +static int set_trace_property(struct mk_list *props, char *kv) +{ + int len; + int sep; + char *key; + char *value; + + len = strlen(kv); + sep = mk_string_char_search(kv, '=', len); + if (sep == -1) { + return -1; + } + + key = mk_string_copy_substr(kv, 0, sep); + value = kv + sep + 1; + + if (!key) { + return -1; + } + + flb_kv_item_create_len(props, + (char *)key, strlen(key), + (char *)value, strlen(value)); + + mk_mem_free(key); + return 0; +} + +static int parse_trace_pipeline_prop(flb_ctx_t *ctx, const char *kv, char **key, char **value) +{ + int len; + int sep; + + len = strlen(kv); + sep = mk_string_char_search(kv, '=', len); + if (sep == -1) { + return FLB_ERROR; + } + + *key = mk_string_copy_substr(kv, 0, sep); + if (!key) { + return FLB_ERROR; + } + + *value = flb_strdup(kv + sep + 1); + return FLB_OK; +} + +static int parse_trace_pipeline(flb_ctx_t *ctx, const char *pipeline, char **trace_input, char **trace_output, struct mk_list **props) +{ + struct mk_list *parts = NULL; + struct mk_list *cur; + struct flb_split_entry *part; + char *key; + char *value; + const char *propname; + const char *propval; + + + parts = flb_utils_split(pipeline, (int)' ', 0); + if (parts == NULL) { + return FLB_ERROR; + } + + mk_list_foreach(cur, parts) { + key = NULL; + value = NULL; + + part = mk_list_entry(cur, struct flb_split_entry, _head); + + if (parse_trace_pipeline_prop(ctx, part->value, &key, &value) == FLB_ERROR) { + return FLB_ERROR; + } + + if (strcmp(key, "input") == 0) { + if (*trace_input != NULL) { + flb_free(*trace_input); + } + *trace_input = flb_strdup(value); + } + else if (strcmp(key, "output") == 0) { + if (*trace_output != NULL) { + flb_free(*trace_output); + } + *trace_output = flb_strdup(value); + } + else if (strncmp(key, "output.", strlen("output.")) == 0) { + propname = mk_string_copy_substr(key, strlen("output."), strlen(key)); + if (propname == NULL) { + return FLB_ERROR; + } + + propval = flb_strdup(value); + if (propval == NULL) { + return FLB_ERROR; + } + + if (*props == NULL) { + *props = flb_calloc(1, sizeof(struct mk_list)); + flb_kv_init(*props); + } + + flb_kv_item_create_len(*props, + (char *)propname, strlen(propname), + (char *)propval, strlen(propval)); + } + + if (key != NULL) { + mk_mem_free(key); + } + + if (value != NULL) { + flb_free(value); + } + } + + flb_utils_split_free(parts); + return FLB_OK; +} +#endif + +int flb_main(int argc, char **argv) +{ + int opt; + int ret; + flb_sds_t json; + + /* handle plugin properties: -1 = none, 0 = input, 1 = output */ + int last_plugin = -1; + + /* local variables to handle config options */ + char *cfg_file = NULL; + + /* config format context */ + struct flb_cf *cf; + struct flb_cf *tmp; + struct flb_cf_section *service; + struct flb_cf_section *s; + struct flb_cf_section *section; + struct flb_cf *cf_opts; + + prog_name = argv[0]; + + cf_opts = flb_cf_create(); + if (!cf_opts) { + exit(EXIT_FAILURE); + } + section = flb_cf_section_create(cf_opts, "service", 0); + if (!section) { + flb_cf_destroy(cf_opts); + exit(EXIT_FAILURE); + } + +#ifdef FLB_HAVE_LIBBACKTRACE + flb_stacktrace_init(argv[0], &flb_st); +#endif + +#ifdef FLB_HAVE_CHUNK_TRACE + char *trace_input = NULL; + char *trace_output = flb_strdup("stdout"); + struct mk_list *trace_props = NULL; +#endif + + /* Setup long-options */ + static const struct option long_opts[] = { + { "storage_path", required_argument, NULL, 'b' }, + { "config", required_argument, NULL, 'c' }, +#ifdef FLB_HAVE_FORK + { "daemon", no_argument , NULL, 'd' }, +#endif + { "dry-run", no_argument , NULL, 'D' }, + { "flush", required_argument, NULL, 'f' }, + { "http", no_argument , NULL, 'H' }, + { "log_file", required_argument, NULL, 'l' }, + { "port", required_argument, NULL, 'P' }, + { "custom", required_argument, NULL, 'C' }, + { "input", required_argument, NULL, 'i' }, + { "match", required_argument, NULL, 'm' }, + { "output", required_argument, NULL, 'o' }, + { "filter", required_argument, NULL, 'F' }, +#ifdef FLB_HAVE_PARSER + { "parser", required_argument, NULL, 'R' }, +#endif + { "prop", required_argument, NULL, 'p' }, + { "plugin", required_argument, NULL, 'e' }, + { "tag", required_argument, NULL, 't' }, +#ifdef FLB_HAVE_STREAM_PROCESSOR + { "sp-task", required_argument, NULL, 'T' }, +#endif + { "version", no_argument , NULL, 'V' }, + { "verbose", no_argument , NULL, 'v' }, + { "workdir", required_argument, NULL, 'w' }, + { "quiet", no_argument , NULL, 'q' }, + { "help", no_argument , NULL, 'h' }, + { "help-json", no_argument , NULL, 'J' }, + { "coro_stack_size", required_argument, NULL, 's' }, + { "sosreport", no_argument , NULL, 'S' }, +#ifdef FLB_HAVE_HTTP_SERVER + { "http_server", no_argument , NULL, 'H' }, + { "http_listen", required_argument, NULL, 'L' }, + { "http_port", required_argument, NULL, 'P' }, +#endif + { "enable-hot-reload", no_argument, NULL, 'Y' }, +#ifdef FLB_HAVE_CHUNK_TRACE + { "enable-chunk-trace", no_argument, NULL, 'Z' }, + { "trace", required_argument, NULL, FLB_LONG_TRACE }, + { "trace-input", required_argument, NULL, FLB_LONG_TRACE_INPUT }, + { "trace-output", required_argument, NULL, FLB_LONG_TRACE_OUTPUT }, + { "trace-output-property", required_argument, NULL, FLB_LONG_TRACE_OUTPUT_PROPERTY }, +#endif + { "disable-thread-safety-on-hot-reload", no_argument, NULL, 'W' }, + { NULL, 0, NULL, 0 } + }; + + /* Signal handler */ + flb_signal_init(); + + /* Initialize Monkey Core library */ + mk_core_init(); + + /* Create Fluent Bit context */ + ctx = flb_create(); + if (!ctx) { + exit(EXIT_FAILURE); + } + config = ctx->config; + cf = config->cf_main; + service = cf_opts->service; + +#ifdef FLB_SYSTEM_WINDOWS + flb_console_handler_set_ctx(ctx, cf_opts); +#endif + + /* Add reference for cf_opts */ + config->cf_opts = cf_opts; + +#ifndef FLB_HAVE_STATIC_CONF + + /* Parse the command line options */ + while ((opt = getopt_long(argc, argv, + "b:c:dDf:C:i:m:o:R:F:p:e:" + "t:T:l:vw:qVhJL:HP:s:SWYZ", + long_opts, NULL)) != -1) { + + switch (opt) { + case 'b': + flb_cf_section_property_add(cf_opts, service->properties, + "storage.path", 0, optarg, 0); + break; + case 'c': + cfg_file = flb_strdup(optarg); + break; +#ifdef FLB_HAVE_FORK + case 'd': + flb_cf_section_property_add(cf_opts, service->properties, + "daemon", 0, "on", 0); + config->daemon = FLB_TRUE; + break; +#endif + case 'D': + config->dry_run = FLB_TRUE; + break; + case 'e': + ret = flb_plugin_load_router(optarg, config); + if (ret == -1) { + exit(EXIT_FAILURE); + } + /* Store the relative file path for external plugin */ + flb_slist_add(&config->external_plugins, optarg); + break; + case 'f': + flb_cf_section_property_add(cf_opts, service->properties, + "flush", 0, optarg, 0); + break; + case 'C': + s = flb_cf_section_create(cf_opts, "custom", 0); + if (!s) { + flb_utils_error(FLB_ERR_CUSTOM_INVALID); + } + flb_cf_section_property_add(cf_opts, s->properties, "name", 0, optarg, 0); + last_plugin = PLUGIN_CUSTOM; + break; + case 'i': + s = flb_cf_section_create(cf_opts, "input", 0); + if (!s) { + flb_utils_error(FLB_ERR_INPUT_INVALID); + } + flb_cf_section_property_add(cf_opts, s->properties, "name", 0, optarg, 0); + last_plugin = PLUGIN_INPUT; + break; + case 'm': + if (last_plugin == PLUGIN_FILTER || last_plugin == PLUGIN_OUTPUT) { + flb_cf_section_property_add(cf_opts, s->properties, "match", 0, optarg, 0); + } + break; + case 'o': + s = flb_cf_section_create(cf_opts, "output", 0); + if (!s) { + flb_utils_error(FLB_ERR_OUTPUT_INVALID); + } + flb_cf_section_property_add(cf_opts, s->properties, "name", 0, optarg, 0); + last_plugin = PLUGIN_OUTPUT; + break; +#ifdef FLB_HAVE_PARSER + case 'R': + ret = flb_parser_conf_file_stat(optarg, config); + if (ret == -1) { + flb_cf_destroy(cf_opts); + flb_destroy(ctx); + exit(EXIT_FAILURE); + } + flb_cf_section_property_add(cf_opts, service->properties, FLB_CONF_STR_PARSERS_FILE, 0, optarg, 0); + break; +#endif + case 'F': + s = flb_cf_section_create(cf_opts, "filter", 0); + if (!s) { + flb_utils_error(FLB_ERR_FILTER_INVALID); + } + flb_cf_section_property_add(cf_opts, s->properties, "name", 0, optarg, 0); + last_plugin = PLUGIN_FILTER; + break; + case 'l': + flb_cf_section_property_add(cf_opts, service->properties, + "log_file", 0, optarg, 0); + break; + case 'p': + if (s) { + set_property(cf_opts, s, optarg); + } + break; + case 't': + if (s) { + flb_cf_section_property_add(cf_opts, s->properties, "tag", 0, optarg, 0); + } + break; +#ifdef FLB_HAVE_STREAM_PROCESSOR + case 'T': + flb_slist_add(&config->stream_processor_tasks, optarg); + break; +#endif + case 'h': + if (last_plugin == -1) { + flb_help(EXIT_SUCCESS, config); + } + else { + flb_help_plugin(EXIT_SUCCESS, FLB_HELP_TEXT, + config, + last_plugin, cf_opts, s); + } + break; + case 'J': + if (last_plugin == -1) { + json = flb_help_build_json_schema(config); + if (!json) { + exit(EXIT_FAILURE); + } + + printf("%s\n", json); + flb_sds_destroy(json); + exit(EXIT_SUCCESS); + } + else { + flb_help_plugin(EXIT_SUCCESS, FLB_HELP_JSON, config, + last_plugin, cf_opts, s); + } + break; +#ifdef FLB_HAVE_HTTP_SERVER + case 'H': + flb_cf_section_property_add(cf_opts, service->properties, "http_server", 0, "on", 0); + break; + case 'L': + flb_cf_section_property_add(cf_opts, service->properties, FLB_CONF_STR_HTTP_LISTEN, 0, optarg, 0); + break; + case 'P': + flb_cf_section_property_add(cf_opts, service->properties, FLB_CONF_STR_HTTP_PORT, 0, optarg, 0); + break; +#endif + case 'V': + flb_version(); + exit(EXIT_SUCCESS); + case 'v': + config->verbose++; + break; + case 'w': + config->workdir = flb_strdup(optarg); + break; + case 'q': + config->verbose = FLB_LOG_OFF; + break; + case 's': + flb_cf_section_property_add(cf_opts, service->properties, FLB_CONF_STR_CORO_STACK_SIZE, 0, optarg, 0); + break; + case 'S': + config->support_mode = FLB_TRUE; + break; + case 'Y': + flb_cf_section_property_add(cf_opts, service->properties, FLB_CONF_STR_HOT_RELOAD, 0, "on", 0); + break; + case 'W': + flb_cf_section_property_add(cf_opts, service->properties, + FLB_CONF_STR_HOT_RELOAD_ENSURE_THREAD_SAFETY, 0, "off", 0); + break; +#ifdef FLB_HAVE_CHUNK_TRACE + case 'Z': + flb_cf_section_property_add(cf_opts, service->properties, FLB_CONF_STR_ENABLE_CHUNK_TRACE, 0, "on", 0); + break; + case FLB_LONG_TRACE: + parse_trace_pipeline(ctx, optarg, &trace_input, &trace_output, &trace_props); + break; + case FLB_LONG_TRACE_INPUT: + if (trace_input != NULL) { + flb_free(trace_input); + } + trace_input = flb_strdup(optarg); + break; + case FLB_LONG_TRACE_OUTPUT: + if (trace_output != NULL) { + flb_free(trace_output); + } + trace_output = flb_strdup(optarg); + break; + case FLB_LONG_TRACE_OUTPUT_PROPERTY: + if (trace_props == NULL) { + trace_props = flb_calloc(1, sizeof(struct mk_list)); + flb_kv_init(trace_props); + } + set_trace_property(trace_props, optarg); + break; +#endif /* FLB_HAVE_CHUNK_TRACE */ + default: + flb_help(EXIT_FAILURE, config); + } + } +#endif /* !FLB_HAVE_STATIC_CONF */ + + set_log_level_from_env(config); + + if (config->verbose != FLB_LOG_OFF) { + flb_version_banner(); + } + + /* Program name */ + flb_config_set_program_name(config, argv[0]); + + /* Set the current directory */ + if (config->workdir) { + ret = chdir(config->workdir); + if (ret == -1) { + flb_cf_destroy(cf_opts); + flb_errno(); + return -1; + } + } + + /* Validate config file */ +#ifndef FLB_HAVE_STATIC_CONF + if (cfg_file) { + if (access(cfg_file, R_OK) != 0) { + flb_free(cfg_file); + flb_cf_destroy(cf_opts); + flb_utils_error(FLB_ERR_CFG_FILE); + } + } + + if (flb_reload_reconstruct_cf(cf_opts, cf) != 0) { + flb_free(cfg_file); + flb_cf_destroy(cf_opts); + fprintf(stderr, "reconstruct format context is failed\n"); + exit(EXIT_FAILURE); + } + + /* Load the service configuration file */ + tmp = service_configure(cf, config, cfg_file); + flb_free(cfg_file); + if (!tmp) { + flb_cf_destroy(cf_opts); + flb_utils_error(FLB_ERR_CFG_FILE_STOP); + } +#else + tmp = service_configure(cf, config, "fluent-bit.conf"); + if (!tmp) { + flb_cf_destroy(cf_opts); + flb_utils_error(FLB_ERR_CFG_FILE_STOP); + } + + /* destroy previous context and override */ + flb_cf_destroy(cf); + config->cf_main = tmp; + cf = tmp; +#endif + + /* Check co-routine stack size */ + if (config->coro_stack_size < getpagesize()) { + flb_cf_destroy(cf_opts); + flb_utils_error(FLB_ERR_CORO_STACK_SIZE); + } + + /* Validate flush time (seconds) */ + if (config->flush <= (double) 0.0) { + flb_cf_destroy(cf_opts); + flb_utils_error(FLB_ERR_CFG_FLUSH); + } + + /* debug or trace */ + if (config->verbose >= FLB_LOG_DEBUG) { + flb_utils_print_setup(config); + } + +#ifdef FLB_HAVE_FORK + /* Run in background/daemon mode */ + if (config->daemon == FLB_TRUE) { + flb_utils_set_daemon(config); + } +#endif + +#ifdef FLB_SYSTEM_WINDOWS + win32_started(); +#endif + + if (config->dry_run == FLB_TRUE) { + fprintf(stderr, "configuration test is successful\n"); + flb_cf_destroy(cf_opts); + flb_destroy(ctx); + exit(EXIT_SUCCESS); + } + + /* start Fluent Bit library */ + ret = flb_start(ctx); + if (ret != 0) { + flb_cf_destroy(cf_opts); + flb_destroy(ctx); + return ret; + } + + /* Store the current config format context from command line */ + flb_cf_context_set(cf_opts); + + /* + * Always re-set the original context that was started, note that during a flb_start() a 'reload' could happen so the context + * will be different. Use flb_context_get() to get the current context. + */ + ctx = flb_context_get(); + +#ifdef FLB_HAVE_CHUNK_TRACE + if (trace_input != NULL) { + enable_trace_input(ctx, trace_input, NULL /* prefix ... */, trace_output, trace_props); + } +#endif + + while (ctx->status == FLB_LIB_OK && exit_signal == 0) { + sleep(1); + +#ifdef FLB_SYSTEM_WINDOWS + if (handler_signal == 1) { + handler_signal = 0; + flb_reload(ctx, cf_opts); + } +#endif + + /* set the context again before checking the status again */ + ctx = flb_context_get(); + +#ifdef FLB_SYSTEM_WINDOWS + flb_console_handler_set_ctx(ctx, cf_opts); +#endif + } + + if (exit_signal) { + flb_signal_exit(exit_signal); + } + ret = config->exit_status_code; + + cf_opts = flb_cf_context_get(); + + if (cf_opts != NULL) { + flb_cf_destroy(cf_opts); + } + +#ifdef FLB_HAVE_CHUNK_TRACE + if (trace_input != NULL) { + disable_trace_input(ctx, trace_input); + flb_free(trace_input); + } + if (trace_output) { + flb_free(trace_output); + } + if (trace_props != NULL) { + flb_kv_release(trace_props); + flb_free(trace_props); + } +#endif + + flb_stop(ctx); + flb_destroy(ctx); + + return ret; +} + +int main(int argc, char **argv) +{ +#ifdef FLB_SYSTEM_WINDOWS + return win32_main(argc, argv); +#else + return flb_main(argc, argv); +#endif +} |