diff options
Diffstat (limited to 'src/fluent-bit/plugins/in_docker_events')
5 files changed, 0 insertions, 672 deletions
diff --git a/src/fluent-bit/plugins/in_docker_events/CMakeLists.txt b/src/fluent-bit/plugins/in_docker_events/CMakeLists.txt deleted file mode 100644 index dee7c0f27..000000000 --- a/src/fluent-bit/plugins/in_docker_events/CMakeLists.txt +++ /dev/null @@ -1,5 +0,0 @@ -set(src - docker_events.c - docker_events_config.c) - -FLB_PLUGIN(in_docker_events "${src}" "") diff --git a/src/fluent-bit/plugins/in_docker_events/docker_events.c b/src/fluent-bit/plugins/in_docker_events/docker_events.c deleted file mode 100644 index 7534eb1d6..000000000 --- a/src/fluent-bit/plugins/in_docker_events/docker_events.c +++ /dev/null @@ -1,476 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_network.h> -#include <fluent-bit/flb_pack.h> -#include <sys/socket.h> -#include <sys/un.h> - -#include "docker_events.h" -#include "docker_events_config.h" - - -/** - * Creates the connection to docker's unix socket and sends the - * HTTP GET /events - * - * @param ctx Pointer to flb_in_de_config - * - * @return int 0 on success, -1 on failure - */ -static int de_unix_create(struct flb_in_de_config *ctx) -{ - ssize_t bytes; - unsigned long len; - size_t address_length; - struct sockaddr_un address; - char request[512]; - - ctx->fd = flb_net_socket_create(AF_UNIX, FLB_FALSE); - if (ctx->fd == -1) { - return -1; - } - - /* Prepare the unix socket path */ - len = strlen(ctx->unix_path); - address.sun_family = AF_UNIX; - sprintf(address.sun_path, "%s", ctx->unix_path); - address_length = sizeof(address.sun_family) + len + 1; - if (connect(ctx->fd, (struct sockaddr *)&address, address_length) == -1) { - flb_errno(); - close(ctx->fd); - return -1; - } - - strcpy(request, "GET /events HTTP/1.0\r\n\r\n"); - flb_plg_trace(ctx->ins, "writing to socket %s", request); - write(ctx->fd, request, strlen(request)); - - /* Read the initial http response */ - bytes = read(ctx->fd, ctx->buf, ctx->buf_size - 1); - if (bytes == -1) { - flb_errno(); - } - flb_plg_debug(ctx->ins, "read %zu bytes from socket", bytes); - - return 0; -} - -static int in_de_collect(struct flb_input_instance *ins, - struct flb_config *config, void *in_context); - -static int reconnect_docker_sock(struct flb_input_instance *ins, - struct flb_config *config, - struct flb_in_de_config *ctx) -{ - int ret; - - /* remove old socket collector */ - if (ctx->coll_id >= 0) { - ret = flb_input_collector_delete(ctx->coll_id, ins); - if (ret < 0) { - flb_plg_error(ctx->ins, "failed to pause event"); - return -1; - } - ctx->coll_id = -1; - } - if (ctx->fd > 0) { - flb_plg_debug(ctx->ins, "close socket fd=%d", ctx->fd); - close(ctx->fd); - ctx->fd = -1; - } - - /* create socket again */ - if (de_unix_create(ctx) < 0) { - flb_plg_error(ctx->ins, "failed to re-initialize socket"); - if (ctx->fd > 0) { - flb_plg_debug(ctx->ins, "close socket fd=%d", ctx->fd); - close(ctx->fd); - ctx->fd = -1; - } - return -1; - } - /* set event */ - ctx->coll_id = flb_input_set_collector_event(ins, - in_de_collect, - ctx->fd, config); - if (ctx->coll_id < 0) { - flb_plg_error(ctx->ins, - "could not set collector for IN_DOCKER_EVENTS plugin"); - close(ctx->fd); - ctx->fd = -1; - return -1; - } - ret = flb_input_collector_start(ctx->coll_id, ins); - if (ret < 0) { - flb_plg_error(ctx->ins, - "could not start collector for IN_DOCKER_EVENTS plugin"); - flb_input_collector_delete(ctx->coll_id, ins); - close(ctx->fd); - ctx->coll_id = -1; - ctx->fd = -1; - return -1; - } - - flb_plg_info(ctx->ins, "Reconnect successful"); - return 0; -} - -static int cb_reconnect(struct flb_input_instance *ins, - struct flb_config *config, - void *in_context) -{ - struct flb_in_de_config *ctx = in_context; - int ret; - - flb_plg_info(ctx->ins, "Retry(%d/%d)", - ctx->current_retries, ctx->reconnect_retry_limits); - ret = reconnect_docker_sock(ins, config, ctx); - if (ret < 0) { - /* Failed to reconnect */ - ctx->current_retries++; - if (ctx->current_retries > ctx->reconnect_retry_limits) { - /* give up */ - flb_plg_error(ctx->ins, "Failed to retry. Giving up..."); - goto cb_reconnect_end; - } - flb_plg_info(ctx->ins, "Failed. Waiting for next retry.."); - return 0; - } - - cb_reconnect_end: - if(flb_input_collector_delete(ctx->retry_coll_id, ins) < 0) { - flb_plg_error(ctx->ins, "failed to delete timer event"); - } - ctx->current_retries = 0; - ctx->retry_coll_id = -1; - return ret; -} - -static int create_reconnect_event(struct flb_input_instance *ins, - struct flb_config *config, - struct flb_in_de_config *ctx) -{ - int ret; - - if (ctx->retry_coll_id >= 0) { - flb_plg_debug(ctx->ins, "already retring ?"); - return 0; - } - - /* try before creating event to stop incoming event */ - ret = reconnect_docker_sock(ins, config, ctx); - if (ret == 0) { - return 0; - } - - ctx->current_retries = 1; - ctx->retry_coll_id = flb_input_set_collector_time(ins, - cb_reconnect, - ctx->reconnect_retry_interval, - 0, - config); - if (ctx->retry_coll_id < 0) { - flb_plg_error(ctx->ins, "failed to create timer event"); - return -1; - } - ret = flb_input_collector_start(ctx->retry_coll_id, ins); - if (ret < 0) { - flb_plg_error(ctx->ins, "failed to start timer event"); - flb_input_collector_delete(ctx->retry_coll_id, ins); - ctx->retry_coll_id = -1; - return -1; - } - flb_plg_info(ctx->ins, "create reconnect event. interval=%d second", - ctx->reconnect_retry_interval); - - return 0; -} - -static int is_recoverable_error(int error) -{ - /* ENOTTY: - It reports on Docker in Docker mode. - https://github.com/fluent/fluent-bit/issues/3439#issuecomment-831424674 - */ - if (error == ENOTTY || error == EBADF) { - return FLB_TRUE; - } - return FLB_FALSE; -} - - -/** - * Callback function to process events recieved on the unix - * socket. - * - * @param ins Pointer to flb_input_instance - * @param config Pointer to flb_config - * @param in_context void Pointer used to cast to - * flb_in_de_config - * - * @return int Always returns success - */ -static int in_de_collect(struct flb_input_instance *ins, - struct flb_config *config, void *in_context) -{ - int ret = 0; - int error; - size_t str_len = 0; - struct flb_in_de_config *ctx = in_context; - - /* variables for parser */ - int parser_ret = -1; - void *out_buf = NULL; - size_t out_size = 0; - struct flb_time out_time; - - ret = read(ctx->fd, ctx->buf, ctx->buf_size - 1); - - if (ret > 0) { - str_len = ret; - ctx->buf[str_len] = '\0'; - - ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); - - if (!ctx->parser) { - /* Initialize local msgpack buffer */ - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_current_timestamp( - &ctx->log_encoder); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_values( - &ctx->log_encoder, - FLB_LOG_EVENT_CSTRING_VALUE(ctx->key), - FLB_LOG_EVENT_STRING_VALUE(ctx->buf, str_len)); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - flb_input_log_append(ins, NULL, 0, - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - - } - else { - flb_plg_error(ctx->ins, "Error encoding record : %d", ret); - } - } - else { - flb_time_get(&out_time); - - parser_ret = flb_parser_do(ctx->parser, ctx->buf, str_len - 1, - &out_buf, &out_size, &out_time); - if (parser_ret >= 0) { - if (flb_time_to_nanosec(&out_time) == 0L) { - flb_time_get(&out_time); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_timestamp( - &ctx->log_encoder, - &out_time); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_raw_msgpack( - &ctx->log_encoder, - out_buf, - out_size); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - flb_input_log_append(ins, NULL, 0, - ctx->log_encoder.output_buffer, - ctx->log_encoder.output_length); - - } - else { - flb_plg_error(ctx->ins, "Error encoding record : %d", ret); - } - - - flb_free(out_buf); - } - else { - flb_plg_trace(ctx->ins, "tried to parse: %s", ctx->buf); - flb_plg_trace(ctx->ins, "buf_size %zu", ctx->buf_size); - flb_plg_error(ctx->ins, "parser returned an error: %d", - parser_ret); - } - } - - flb_log_event_encoder_reset(&ctx->log_encoder); - } - else if (ret == 0) { - /* EOF */ - - /* docker service may be restarted */ - flb_plg_info(ctx->ins, "EOF detected. Re-initialize"); - if (ctx->reconnect_retry_limits > 0) { - ret = create_reconnect_event(ins, config, ctx); - if (ret < 0) { - return ret; - } - } - } - else { - error = errno; - flb_plg_error(ctx->ins, "read returned error: %d, %s", error, - strerror(error)); - if (is_recoverable_error(error)) { - if (ctx->reconnect_retry_limits > 0) { - ret = create_reconnect_event(ins, config, ctx); - if (ret < 0) { - return ret; - } - } - } - } - - return 0; -} - -/** - * Callback function to initialize docker events plugin - * - * @param ins Pointer to flb_input_instance - * @param config Pointer to flb_config - * @param data Unused - * - * @return int 0 on success, -1 on failure - */ -static int in_de_init(struct flb_input_instance *ins, - struct flb_config *config, void *data) -{ - struct flb_in_de_config *ctx = NULL; - (void) data; - - /* Allocate space for the configuration */ - ctx = de_config_init(ins, config); - if (!ctx) { - return -1; - } - ctx->ins = ins; - ctx->retry_coll_id = -1; - ctx->current_retries = 0; - - /* Set the context */ - flb_input_set_context(ins, ctx); - - if (de_unix_create(ctx) != 0) { - flb_plg_error(ctx->ins, "could not listen on unix://%s", - ctx->unix_path); - de_config_destroy(ctx); - return -1; - } - - ctx->coll_id = flb_input_set_collector_event(ins, in_de_collect, - ctx->fd, config); - if(ctx->coll_id < 0){ - flb_plg_error(ctx->ins, - "could not set collector for IN_DOCKER_EVENTS plugin"); - de_config_destroy(ctx); - return -1; - } - - flb_plg_info(ctx->ins, "listening for events on %s", ctx->unix_path); - return 0; -} - -/** - * Callback exit function to cleanup plugin - * - * @param data Pointer cast to flb_in_de_config - * @param config Unused - * - * @return int Always returns 0 - */ -static int in_de_exit(void *data, struct flb_config *config) -{ - (void) config; - struct flb_in_de_config *ctx = data; - - if (!ctx) { - return 0; - } - - de_config_destroy(ctx); - - return 0; -} - -/* Configuration properties map */ -static struct flb_config_map config_map[] = { - { - FLB_CONFIG_MAP_STR, "unix_path", DEFAULT_UNIX_SOCKET_PATH, - 0, FLB_TRUE, offsetof(struct flb_in_de_config, unix_path), - "Define Docker unix socket path to read events" - }, - { - FLB_CONFIG_MAP_SIZE, "buffer_size", "8k", - 0, FLB_TRUE, offsetof(struct flb_in_de_config, buf_size), - "Set buffer size to read events" - }, - { - FLB_CONFIG_MAP_STR, "parser", NULL, - 0, FLB_FALSE, 0, - "Optional parser for records, if not set, records are packages under 'key'" - }, - { - FLB_CONFIG_MAP_STR, "key", DEFAULT_FIELD_NAME, - 0, FLB_TRUE, offsetof(struct flb_in_de_config, key), - "Set the key name to store unparsed Docker events" - }, - { - FLB_CONFIG_MAP_INT, "reconnect.retry_limits", "5", - 0, FLB_TRUE, offsetof(struct flb_in_de_config, reconnect_retry_limits), - "Maximum number to retry to connect docker socket" - }, - { - FLB_CONFIG_MAP_INT, "reconnect.retry_interval", "1", - 0, FLB_TRUE, offsetof(struct flb_in_de_config, reconnect_retry_interval), - "Retry interval to connect docker socket" - }, - /* EOF */ - {0} -}; - -/* Plugin reference */ -struct flb_input_plugin in_docker_events_plugin = { - .name = "docker_events", - .description = "Docker events", - .cb_init = in_de_init, - .cb_pre_run = NULL, - .cb_collect = in_de_collect, - .cb_flush_buf = NULL, - .cb_exit = in_de_exit, - .config_map = config_map, - .flags = FLB_INPUT_NET -}; diff --git a/src/fluent-bit/plugins/in_docker_events/docker_events.h b/src/fluent-bit/plugins/in_docker_events/docker_events.h deleted file mode 100644 index dc659d5ec..000000000 --- a/src/fluent-bit/plugins/in_docker_events/docker_events.h +++ /dev/null @@ -1,56 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_IN_DE_H -#define FLB_IN_DE_H - -#include <msgpack.h> -#include <fluent-bit/flb_input.h> -#include <fluent-bit/flb_parser.h> -#include <fluent-bit/flb_log_event_encoder.h> - -#define DEFAULT_BUF_SIZE 8192 -#define MIN_BUF_SIZE 2048 -#define DEFAULT_FIELD_NAME "message" -#define DEFAULT_UNIX_SOCKET_PATH "/var/run/docker.sock" - -struct flb_in_de_config -{ - int fd; /* File descriptor */ - int coll_id; /* collector id */ - flb_sds_t unix_path; /* Unix path for socket */ - char *buf; - size_t buf_size; - flb_sds_t key; - - /* retries */ - int reconnect_retry_limits; - int reconnect_retry_interval; - - /* retries (internal) */ - int current_retries; - int retry_coll_id; - - struct flb_parser *parser; - struct flb_log_event_encoder log_encoder; - struct flb_input_instance *ins; /* Input plugin instace */ - -}; - -#endif diff --git a/src/fluent-bit/plugins/in_docker_events/docker_events_config.c b/src/fluent-bit/plugins/in_docker_events/docker_events_config.c deleted file mode 100644 index 8290686c1..000000000 --- a/src/fluent-bit/plugins/in_docker_events/docker_events_config.c +++ /dev/null @@ -1,106 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_utils.h> - -#include "docker_events.h" -#include "docker_events_config.h" - -/** - * Function to initialize docker_events plugin. - * - * @param ins Pointer to flb_input_instance - * @param config Pointer to flb_config - * - * @return struct flb_in_de_config* Pointer to the plugin's - * structure on success, NULL on failure. - */ -struct flb_in_de_config *de_config_init(struct flb_input_instance *ins, - struct flb_config *config) -{ - int ret; - const char *tmp; - struct flb_in_de_config *ctx; - - ctx = flb_calloc(1, sizeof(struct flb_in_de_config)); - if (!ctx) { - flb_errno(); - return NULL; - } - ctx->ins = ins; - - /* Load the config map */ - ret = flb_input_config_map_set(ins, (void *) ctx); - if (ret == -1) { - flb_free(ctx); - return NULL; - } - - /* Allocate buffer for events */ - ctx->buf = flb_malloc(ctx->buf_size); - if (!ctx->buf) { - flb_errno(); - flb_free(ctx); - return NULL; - } - - tmp = flb_input_get_property("parser", ins); - if (tmp) { - ctx->parser = flb_parser_get(tmp, config); - if (ctx->parser == NULL) { - flb_plg_error(ctx->ins, "requested parser '%s' not found", tmp); - flb_free(ctx->buf); - flb_free(ctx); - return NULL; - } - } - - ret = flb_log_event_encoder_init(&ctx->log_encoder, - FLB_LOG_EVENT_FORMAT_DEFAULT); - - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_error(ctx->ins, "error initializing event encoder : %d", ret); - - de_config_destroy(ctx); - - ctx = NULL; - } - - return ctx; -} - -/** - * Function to destroy docker_events plugin. - * - * @param ctx Pointer to flb_in_de_config - * - * @return int 0 - */ -int de_config_destroy(struct flb_in_de_config *ctx) -{ - if (ctx->buf) { - flb_free(ctx->buf); - } - - flb_log_event_encoder_destroy(&ctx->log_encoder); - - flb_free(ctx); - return 0; -} diff --git a/src/fluent-bit/plugins/in_docker_events/docker_events_config.h b/src/fluent-bit/plugins/in_docker_events/docker_events_config.h deleted file mode 100644 index 94a6d87db..000000000 --- a/src/fluent-bit/plugins/in_docker_events/docker_events_config.h +++ /dev/null @@ -1,29 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_IN_DE_CONFIG_H -#define FLB_IN_DE_CONFIG_H - -#include "docker_events.h" - -struct flb_in_de_config *de_config_init(struct flb_input_instance *ins, - struct flb_config *config); -int de_config_destroy(struct flb_in_de_config *config); - -#endif |