From 5da14042f70711ea5cf66e034699730335462f66 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 5 May 2024 14:08:03 +0200 Subject: Merging upstream version 1.45.3+dfsg. Signed-off-by: Daniel Baumann --- .../plugins/in_docker_events/CMakeLists.txt | 5 + .../plugins/in_docker_events/docker_events.c | 476 +++++++++++++++++++++ .../plugins/in_docker_events/docker_events.h | 56 +++ .../in_docker_events/docker_events_config.c | 106 +++++ .../in_docker_events/docker_events_config.h | 29 ++ 5 files changed, 672 insertions(+) create mode 100644 src/fluent-bit/plugins/in_docker_events/CMakeLists.txt create mode 100644 src/fluent-bit/plugins/in_docker_events/docker_events.c create mode 100644 src/fluent-bit/plugins/in_docker_events/docker_events.h create mode 100644 src/fluent-bit/plugins/in_docker_events/docker_events_config.c create mode 100644 src/fluent-bit/plugins/in_docker_events/docker_events_config.h (limited to 'src/fluent-bit/plugins/in_docker_events') diff --git a/src/fluent-bit/plugins/in_docker_events/CMakeLists.txt b/src/fluent-bit/plugins/in_docker_events/CMakeLists.txt new file mode 100644 index 000000000..dee7c0f27 --- /dev/null +++ b/src/fluent-bit/plugins/in_docker_events/CMakeLists.txt @@ -0,0 +1,5 @@ +set(src + docker_events.c + docker_events_config.c) + +FLB_PLUGIN(in_docker_events "${src}" "") diff --git a/src/fluent-bit/plugins/in_docker_events/docker_events.c b/src/fluent-bit/plugins/in_docker_events/docker_events.c new file mode 100644 index 000000000..7534eb1d6 --- /dev/null +++ b/src/fluent-bit/plugins/in_docker_events/docker_events.c @@ -0,0 +1,476 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "docker_events.h" +#include "docker_events_config.h" + + +/** + * Creates the connection to docker's unix socket and sends the + * HTTP GET /events + * + * @param ctx Pointer to flb_in_de_config + * + * @return int 0 on success, -1 on failure + */ +static int de_unix_create(struct flb_in_de_config *ctx) +{ + ssize_t bytes; + unsigned long len; + size_t address_length; + struct sockaddr_un address; + char request[512]; + + ctx->fd = flb_net_socket_create(AF_UNIX, FLB_FALSE); + if (ctx->fd == -1) { + return -1; + } + + /* Prepare the unix socket path */ + len = strlen(ctx->unix_path); + address.sun_family = AF_UNIX; + sprintf(address.sun_path, "%s", ctx->unix_path); + address_length = sizeof(address.sun_family) + len + 1; + if (connect(ctx->fd, (struct sockaddr *)&address, address_length) == -1) { + flb_errno(); + close(ctx->fd); + return -1; + } + + strcpy(request, "GET /events HTTP/1.0\r\n\r\n"); + flb_plg_trace(ctx->ins, "writing to socket %s", request); + write(ctx->fd, request, strlen(request)); + + /* Read the initial http response */ + bytes = read(ctx->fd, ctx->buf, ctx->buf_size - 1); + if (bytes == -1) { + flb_errno(); + } + flb_plg_debug(ctx->ins, "read %zu bytes from socket", bytes); + + return 0; +} + +static int in_de_collect(struct flb_input_instance *ins, + struct flb_config *config, void *in_context); + +static int reconnect_docker_sock(struct flb_input_instance *ins, + struct flb_config *config, + struct flb_in_de_config *ctx) +{ + int ret; + + /* remove old socket collector */ + if (ctx->coll_id >= 0) { + ret = flb_input_collector_delete(ctx->coll_id, ins); + if (ret < 0) { + flb_plg_error(ctx->ins, "failed to pause event"); + return -1; + } + ctx->coll_id = -1; + } + if (ctx->fd > 0) { + flb_plg_debug(ctx->ins, "close socket fd=%d", ctx->fd); + close(ctx->fd); + ctx->fd = -1; + } + + /* create socket again */ + if (de_unix_create(ctx) < 0) { + flb_plg_error(ctx->ins, "failed to re-initialize socket"); + if (ctx->fd > 0) { + flb_plg_debug(ctx->ins, "close socket fd=%d", ctx->fd); + close(ctx->fd); + ctx->fd = -1; + } + return -1; + } + /* set event */ + ctx->coll_id = flb_input_set_collector_event(ins, + in_de_collect, + ctx->fd, config); + if (ctx->coll_id < 0) { + flb_plg_error(ctx->ins, + "could not set collector for IN_DOCKER_EVENTS plugin"); + close(ctx->fd); + ctx->fd = -1; + return -1; + } + ret = flb_input_collector_start(ctx->coll_id, ins); + if (ret < 0) { + flb_plg_error(ctx->ins, + "could not start collector for IN_DOCKER_EVENTS plugin"); + flb_input_collector_delete(ctx->coll_id, ins); + close(ctx->fd); + ctx->coll_id = -1; + ctx->fd = -1; + return -1; + } + + flb_plg_info(ctx->ins, "Reconnect successful"); + return 0; +} + +static int cb_reconnect(struct flb_input_instance *ins, + struct flb_config *config, + void *in_context) +{ + struct flb_in_de_config *ctx = in_context; + int ret; + + flb_plg_info(ctx->ins, "Retry(%d/%d)", + ctx->current_retries, ctx->reconnect_retry_limits); + ret = reconnect_docker_sock(ins, config, ctx); + if (ret < 0) { + /* Failed to reconnect */ + ctx->current_retries++; + if (ctx->current_retries > ctx->reconnect_retry_limits) { + /* give up */ + flb_plg_error(ctx->ins, "Failed to retry. Giving up..."); + goto cb_reconnect_end; + } + flb_plg_info(ctx->ins, "Failed. Waiting for next retry.."); + return 0; + } + + cb_reconnect_end: + if(flb_input_collector_delete(ctx->retry_coll_id, ins) < 0) { + flb_plg_error(ctx->ins, "failed to delete timer event"); + } + ctx->current_retries = 0; + ctx->retry_coll_id = -1; + return ret; +} + +static int create_reconnect_event(struct flb_input_instance *ins, + struct flb_config *config, + struct flb_in_de_config *ctx) +{ + int ret; + + if (ctx->retry_coll_id >= 0) { + flb_plg_debug(ctx->ins, "already retring ?"); + return 0; + } + + /* try before creating event to stop incoming event */ + ret = reconnect_docker_sock(ins, config, ctx); + if (ret == 0) { + return 0; + } + + ctx->current_retries = 1; + ctx->retry_coll_id = flb_input_set_collector_time(ins, + cb_reconnect, + ctx->reconnect_retry_interval, + 0, + config); + if (ctx->retry_coll_id < 0) { + flb_plg_error(ctx->ins, "failed to create timer event"); + return -1; + } + ret = flb_input_collector_start(ctx->retry_coll_id, ins); + if (ret < 0) { + flb_plg_error(ctx->ins, "failed to start timer event"); + flb_input_collector_delete(ctx->retry_coll_id, ins); + ctx->retry_coll_id = -1; + return -1; + } + flb_plg_info(ctx->ins, "create reconnect event. interval=%d second", + ctx->reconnect_retry_interval); + + return 0; +} + +static int is_recoverable_error(int error) +{ + /* ENOTTY: + It reports on Docker in Docker mode. + https://github.com/fluent/fluent-bit/issues/3439#issuecomment-831424674 + */ + if (error == ENOTTY || error == EBADF) { + return FLB_TRUE; + } + return FLB_FALSE; +} + + +/** + * Callback function to process events recieved on the unix + * socket. + * + * @param ins Pointer to flb_input_instance + * @param config Pointer to flb_config + * @param in_context void Pointer used to cast to + * flb_in_de_config + * + * @return int Always returns success + */ +static int in_de_collect(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + int ret = 0; + int error; + size_t str_len = 0; + struct flb_in_de_config *ctx = in_context; + + /* variables for parser */ + int parser_ret = -1; + void *out_buf = NULL; + size_t out_size = 0; + struct flb_time out_time; + + ret = read(ctx->fd, ctx->buf, ctx->buf_size - 1); + + if (ret > 0) { + str_len = ret; + ctx->buf[str_len] = '\0'; + + ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); + + if (!ctx->parser) { + /* Initialize local msgpack buffer */ + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_current_timestamp( + &ctx->log_encoder); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_values( + &ctx->log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE(ctx->key), + FLB_LOG_EVENT_STRING_VALUE(ctx->buf, str_len)); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + flb_input_log_append(ins, NULL, 0, + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + + } + else { + flb_plg_error(ctx->ins, "Error encoding record : %d", ret); + } + } + else { + flb_time_get(&out_time); + + parser_ret = flb_parser_do(ctx->parser, ctx->buf, str_len - 1, + &out_buf, &out_size, &out_time); + if (parser_ret >= 0) { + if (flb_time_to_nanosec(&out_time) == 0L) { + flb_time_get(&out_time); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_timestamp( + &ctx->log_encoder, + &out_time); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_body_from_raw_msgpack( + &ctx->log_encoder, + out_buf, + out_size); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + flb_input_log_append(ins, NULL, 0, + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + + } + else { + flb_plg_error(ctx->ins, "Error encoding record : %d", ret); + } + + + flb_free(out_buf); + } + else { + flb_plg_trace(ctx->ins, "tried to parse: %s", ctx->buf); + flb_plg_trace(ctx->ins, "buf_size %zu", ctx->buf_size); + flb_plg_error(ctx->ins, "parser returned an error: %d", + parser_ret); + } + } + + flb_log_event_encoder_reset(&ctx->log_encoder); + } + else if (ret == 0) { + /* EOF */ + + /* docker service may be restarted */ + flb_plg_info(ctx->ins, "EOF detected. Re-initialize"); + if (ctx->reconnect_retry_limits > 0) { + ret = create_reconnect_event(ins, config, ctx); + if (ret < 0) { + return ret; + } + } + } + else { + error = errno; + flb_plg_error(ctx->ins, "read returned error: %d, %s", error, + strerror(error)); + if (is_recoverable_error(error)) { + if (ctx->reconnect_retry_limits > 0) { + ret = create_reconnect_event(ins, config, ctx); + if (ret < 0) { + return ret; + } + } + } + } + + return 0; +} + +/** + * Callback function to initialize docker events plugin + * + * @param ins Pointer to flb_input_instance + * @param config Pointer to flb_config + * @param data Unused + * + * @return int 0 on success, -1 on failure + */ +static int in_de_init(struct flb_input_instance *ins, + struct flb_config *config, void *data) +{ + struct flb_in_de_config *ctx = NULL; + (void) data; + + /* Allocate space for the configuration */ + ctx = de_config_init(ins, config); + if (!ctx) { + return -1; + } + ctx->ins = ins; + ctx->retry_coll_id = -1; + ctx->current_retries = 0; + + /* Set the context */ + flb_input_set_context(ins, ctx); + + if (de_unix_create(ctx) != 0) { + flb_plg_error(ctx->ins, "could not listen on unix://%s", + ctx->unix_path); + de_config_destroy(ctx); + return -1; + } + + ctx->coll_id = flb_input_set_collector_event(ins, in_de_collect, + ctx->fd, config); + if(ctx->coll_id < 0){ + flb_plg_error(ctx->ins, + "could not set collector for IN_DOCKER_EVENTS plugin"); + de_config_destroy(ctx); + return -1; + } + + flb_plg_info(ctx->ins, "listening for events on %s", ctx->unix_path); + return 0; +} + +/** + * Callback exit function to cleanup plugin + * + * @param data Pointer cast to flb_in_de_config + * @param config Unused + * + * @return int Always returns 0 + */ +static int in_de_exit(void *data, struct flb_config *config) +{ + (void) config; + struct flb_in_de_config *ctx = data; + + if (!ctx) { + return 0; + } + + de_config_destroy(ctx); + + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "unix_path", DEFAULT_UNIX_SOCKET_PATH, + 0, FLB_TRUE, offsetof(struct flb_in_de_config, unix_path), + "Define Docker unix socket path to read events" + }, + { + FLB_CONFIG_MAP_SIZE, "buffer_size", "8k", + 0, FLB_TRUE, offsetof(struct flb_in_de_config, buf_size), + "Set buffer size to read events" + }, + { + FLB_CONFIG_MAP_STR, "parser", NULL, + 0, FLB_FALSE, 0, + "Optional parser for records, if not set, records are packages under 'key'" + }, + { + FLB_CONFIG_MAP_STR, "key", DEFAULT_FIELD_NAME, + 0, FLB_TRUE, offsetof(struct flb_in_de_config, key), + "Set the key name to store unparsed Docker events" + }, + { + FLB_CONFIG_MAP_INT, "reconnect.retry_limits", "5", + 0, FLB_TRUE, offsetof(struct flb_in_de_config, reconnect_retry_limits), + "Maximum number to retry to connect docker socket" + }, + { + FLB_CONFIG_MAP_INT, "reconnect.retry_interval", "1", + 0, FLB_TRUE, offsetof(struct flb_in_de_config, reconnect_retry_interval), + "Retry interval to connect docker socket" + }, + /* EOF */ + {0} +}; + +/* Plugin reference */ +struct flb_input_plugin in_docker_events_plugin = { + .name = "docker_events", + .description = "Docker events", + .cb_init = in_de_init, + .cb_pre_run = NULL, + .cb_collect = in_de_collect, + .cb_flush_buf = NULL, + .cb_exit = in_de_exit, + .config_map = config_map, + .flags = FLB_INPUT_NET +}; diff --git a/src/fluent-bit/plugins/in_docker_events/docker_events.h b/src/fluent-bit/plugins/in_docker_events/docker_events.h new file mode 100644 index 000000000..dc659d5ec --- /dev/null +++ b/src/fluent-bit/plugins/in_docker_events/docker_events.h @@ -0,0 +1,56 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_DE_H +#define FLB_IN_DE_H + +#include +#include +#include +#include + +#define DEFAULT_BUF_SIZE 8192 +#define MIN_BUF_SIZE 2048 +#define DEFAULT_FIELD_NAME "message" +#define DEFAULT_UNIX_SOCKET_PATH "/var/run/docker.sock" + +struct flb_in_de_config +{ + int fd; /* File descriptor */ + int coll_id; /* collector id */ + flb_sds_t unix_path; /* Unix path for socket */ + char *buf; + size_t buf_size; + flb_sds_t key; + + /* retries */ + int reconnect_retry_limits; + int reconnect_retry_interval; + + /* retries (internal) */ + int current_retries; + int retry_coll_id; + + struct flb_parser *parser; + struct flb_log_event_encoder log_encoder; + struct flb_input_instance *ins; /* Input plugin instace */ + +}; + +#endif diff --git a/src/fluent-bit/plugins/in_docker_events/docker_events_config.c b/src/fluent-bit/plugins/in_docker_events/docker_events_config.c new file mode 100644 index 000000000..8290686c1 --- /dev/null +++ b/src/fluent-bit/plugins/in_docker_events/docker_events_config.c @@ -0,0 +1,106 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "docker_events.h" +#include "docker_events_config.h" + +/** + * Function to initialize docker_events plugin. + * + * @param ins Pointer to flb_input_instance + * @param config Pointer to flb_config + * + * @return struct flb_in_de_config* Pointer to the plugin's + * structure on success, NULL on failure. + */ +struct flb_in_de_config *de_config_init(struct flb_input_instance *ins, + struct flb_config *config) +{ + int ret; + const char *tmp; + struct flb_in_de_config *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_in_de_config)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + + /* Load the config map */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + /* Allocate buffer for events */ + ctx->buf = flb_malloc(ctx->buf_size); + if (!ctx->buf) { + flb_errno(); + flb_free(ctx); + return NULL; + } + + tmp = flb_input_get_property("parser", ins); + if (tmp) { + ctx->parser = flb_parser_get(tmp, config); + if (ctx->parser == NULL) { + flb_plg_error(ctx->ins, "requested parser '%s' not found", tmp); + flb_free(ctx->buf); + flb_free(ctx); + return NULL; + } + } + + ret = flb_log_event_encoder_init(&ctx->log_encoder, + FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, "error initializing event encoder : %d", ret); + + de_config_destroy(ctx); + + ctx = NULL; + } + + return ctx; +} + +/** + * Function to destroy docker_events plugin. + * + * @param ctx Pointer to flb_in_de_config + * + * @return int 0 + */ +int de_config_destroy(struct flb_in_de_config *ctx) +{ + if (ctx->buf) { + flb_free(ctx->buf); + } + + flb_log_event_encoder_destroy(&ctx->log_encoder); + + flb_free(ctx); + return 0; +} diff --git a/src/fluent-bit/plugins/in_docker_events/docker_events_config.h b/src/fluent-bit/plugins/in_docker_events/docker_events_config.h new file mode 100644 index 000000000..94a6d87db --- /dev/null +++ b/src/fluent-bit/plugins/in_docker_events/docker_events_config.h @@ -0,0 +1,29 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_DE_CONFIG_H +#define FLB_IN_DE_CONFIG_H + +#include "docker_events.h" + +struct flb_in_de_config *de_config_init(struct flb_input_instance *ins, + struct flb_config *config); +int de_config_destroy(struct flb_in_de_config *config); + +#endif -- cgit v1.2.3