summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/in_docker_events
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/in_docker_events')
-rw-r--r--src/fluent-bit/plugins/in_docker_events/CMakeLists.txt5
-rw-r--r--src/fluent-bit/plugins/in_docker_events/docker_events.c476
-rw-r--r--src/fluent-bit/plugins/in_docker_events/docker_events.h56
-rw-r--r--src/fluent-bit/plugins/in_docker_events/docker_events_config.c106
-rw-r--r--src/fluent-bit/plugins/in_docker_events/docker_events_config.h29
5 files changed, 0 insertions, 672 deletions
diff --git a/src/fluent-bit/plugins/in_docker_events/CMakeLists.txt b/src/fluent-bit/plugins/in_docker_events/CMakeLists.txt
deleted file mode 100644
index dee7c0f27..000000000
--- a/src/fluent-bit/plugins/in_docker_events/CMakeLists.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-set(src
- docker_events.c
- docker_events_config.c)
-
-FLB_PLUGIN(in_docker_events "${src}" "")
diff --git a/src/fluent-bit/plugins/in_docker_events/docker_events.c b/src/fluent-bit/plugins/in_docker_events/docker_events.c
deleted file mode 100644
index 7534eb1d6..000000000
--- a/src/fluent-bit/plugins/in_docker_events/docker_events.c
+++ /dev/null
@@ -1,476 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_input_plugin.h>
-#include <fluent-bit/flb_network.h>
-#include <fluent-bit/flb_pack.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-
-#include "docker_events.h"
-#include "docker_events_config.h"
-
-
-/**
- * Creates the connection to docker's unix socket and sends the
- * HTTP GET /events
- *
- * @param ctx Pointer to flb_in_de_config
- *
- * @return int 0 on success, -1 on failure
- */
-static int de_unix_create(struct flb_in_de_config *ctx)
-{
- ssize_t bytes;
- unsigned long len;
- size_t address_length;
- struct sockaddr_un address;
- char request[512];
-
- ctx->fd = flb_net_socket_create(AF_UNIX, FLB_FALSE);
- if (ctx->fd == -1) {
- return -1;
- }
-
- /* Prepare the unix socket path */
- len = strlen(ctx->unix_path);
- address.sun_family = AF_UNIX;
- sprintf(address.sun_path, "%s", ctx->unix_path);
- address_length = sizeof(address.sun_family) + len + 1;
- if (connect(ctx->fd, (struct sockaddr *)&address, address_length) == -1) {
- flb_errno();
- close(ctx->fd);
- return -1;
- }
-
- strcpy(request, "GET /events HTTP/1.0\r\n\r\n");
- flb_plg_trace(ctx->ins, "writing to socket %s", request);
- write(ctx->fd, request, strlen(request));
-
- /* Read the initial http response */
- bytes = read(ctx->fd, ctx->buf, ctx->buf_size - 1);
- if (bytes == -1) {
- flb_errno();
- }
- flb_plg_debug(ctx->ins, "read %zu bytes from socket", bytes);
-
- return 0;
-}
-
-static int in_de_collect(struct flb_input_instance *ins,
- struct flb_config *config, void *in_context);
-
-static int reconnect_docker_sock(struct flb_input_instance *ins,
- struct flb_config *config,
- struct flb_in_de_config *ctx)
-{
- int ret;
-
- /* remove old socket collector */
- if (ctx->coll_id >= 0) {
- ret = flb_input_collector_delete(ctx->coll_id, ins);
- if (ret < 0) {
- flb_plg_error(ctx->ins, "failed to pause event");
- return -1;
- }
- ctx->coll_id = -1;
- }
- if (ctx->fd > 0) {
- flb_plg_debug(ctx->ins, "close socket fd=%d", ctx->fd);
- close(ctx->fd);
- ctx->fd = -1;
- }
-
- /* create socket again */
- if (de_unix_create(ctx) < 0) {
- flb_plg_error(ctx->ins, "failed to re-initialize socket");
- if (ctx->fd > 0) {
- flb_plg_debug(ctx->ins, "close socket fd=%d", ctx->fd);
- close(ctx->fd);
- ctx->fd = -1;
- }
- return -1;
- }
- /* set event */
- ctx->coll_id = flb_input_set_collector_event(ins,
- in_de_collect,
- ctx->fd, config);
- if (ctx->coll_id < 0) {
- flb_plg_error(ctx->ins,
- "could not set collector for IN_DOCKER_EVENTS plugin");
- close(ctx->fd);
- ctx->fd = -1;
- return -1;
- }
- ret = flb_input_collector_start(ctx->coll_id, ins);
- if (ret < 0) {
- flb_plg_error(ctx->ins,
- "could not start collector for IN_DOCKER_EVENTS plugin");
- flb_input_collector_delete(ctx->coll_id, ins);
- close(ctx->fd);
- ctx->coll_id = -1;
- ctx->fd = -1;
- return -1;
- }
-
- flb_plg_info(ctx->ins, "Reconnect successful");
- return 0;
-}
-
-static int cb_reconnect(struct flb_input_instance *ins,
- struct flb_config *config,
- void *in_context)
-{
- struct flb_in_de_config *ctx = in_context;
- int ret;
-
- flb_plg_info(ctx->ins, "Retry(%d/%d)",
- ctx->current_retries, ctx->reconnect_retry_limits);
- ret = reconnect_docker_sock(ins, config, ctx);
- if (ret < 0) {
- /* Failed to reconnect */
- ctx->current_retries++;
- if (ctx->current_retries > ctx->reconnect_retry_limits) {
- /* give up */
- flb_plg_error(ctx->ins, "Failed to retry. Giving up...");
- goto cb_reconnect_end;
- }
- flb_plg_info(ctx->ins, "Failed. Waiting for next retry..");
- return 0;
- }
-
- cb_reconnect_end:
- if(flb_input_collector_delete(ctx->retry_coll_id, ins) < 0) {
- flb_plg_error(ctx->ins, "failed to delete timer event");
- }
- ctx->current_retries = 0;
- ctx->retry_coll_id = -1;
- return ret;
-}
-
-static int create_reconnect_event(struct flb_input_instance *ins,
- struct flb_config *config,
- struct flb_in_de_config *ctx)
-{
- int ret;
-
- if (ctx->retry_coll_id >= 0) {
- flb_plg_debug(ctx->ins, "already retring ?");
- return 0;
- }
-
- /* try before creating event to stop incoming event */
- ret = reconnect_docker_sock(ins, config, ctx);
- if (ret == 0) {
- return 0;
- }
-
- ctx->current_retries = 1;
- ctx->retry_coll_id = flb_input_set_collector_time(ins,
- cb_reconnect,
- ctx->reconnect_retry_interval,
- 0,
- config);
- if (ctx->retry_coll_id < 0) {
- flb_plg_error(ctx->ins, "failed to create timer event");
- return -1;
- }
- ret = flb_input_collector_start(ctx->retry_coll_id, ins);
- if (ret < 0) {
- flb_plg_error(ctx->ins, "failed to start timer event");
- flb_input_collector_delete(ctx->retry_coll_id, ins);
- ctx->retry_coll_id = -1;
- return -1;
- }
- flb_plg_info(ctx->ins, "create reconnect event. interval=%d second",
- ctx->reconnect_retry_interval);
-
- return 0;
-}
-
-static int is_recoverable_error(int error)
-{
- /* ENOTTY:
- It reports on Docker in Docker mode.
- https://github.com/fluent/fluent-bit/issues/3439#issuecomment-831424674
- */
- if (error == ENOTTY || error == EBADF) {
- return FLB_TRUE;
- }
- return FLB_FALSE;
-}
-
-
-/**
- * Callback function to process events recieved on the unix
- * socket.
- *
- * @param ins Pointer to flb_input_instance
- * @param config Pointer to flb_config
- * @param in_context void Pointer used to cast to
- * flb_in_de_config
- *
- * @return int Always returns success
- */
-static int in_de_collect(struct flb_input_instance *ins,
- struct flb_config *config, void *in_context)
-{
- int ret = 0;
- int error;
- size_t str_len = 0;
- struct flb_in_de_config *ctx = in_context;
-
- /* variables for parser */
- int parser_ret = -1;
- void *out_buf = NULL;
- size_t out_size = 0;
- struct flb_time out_time;
-
- ret = read(ctx->fd, ctx->buf, ctx->buf_size - 1);
-
- if (ret > 0) {
- str_len = ret;
- ctx->buf[str_len] = '\0';
-
- ret = flb_log_event_encoder_begin_record(&ctx->log_encoder);
-
- if (!ctx->parser) {
- /* Initialize local msgpack buffer */
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_set_current_timestamp(
- &ctx->log_encoder);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_append_body_values(
- &ctx->log_encoder,
- FLB_LOG_EVENT_CSTRING_VALUE(ctx->key),
- FLB_LOG_EVENT_STRING_VALUE(ctx->buf, str_len));
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_commit_record(&ctx->log_encoder);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- flb_input_log_append(ins, NULL, 0,
- ctx->log_encoder.output_buffer,
- ctx->log_encoder.output_length);
-
- }
- else {
- flb_plg_error(ctx->ins, "Error encoding record : %d", ret);
- }
- }
- else {
- flb_time_get(&out_time);
-
- parser_ret = flb_parser_do(ctx->parser, ctx->buf, str_len - 1,
- &out_buf, &out_size, &out_time);
- if (parser_ret >= 0) {
- if (flb_time_to_nanosec(&out_time) == 0L) {
- flb_time_get(&out_time);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_set_timestamp(
- &ctx->log_encoder,
- &out_time);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_set_body_from_raw_msgpack(
- &ctx->log_encoder,
- out_buf,
- out_size);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_commit_record(&ctx->log_encoder);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- flb_input_log_append(ins, NULL, 0,
- ctx->log_encoder.output_buffer,
- ctx->log_encoder.output_length);
-
- }
- else {
- flb_plg_error(ctx->ins, "Error encoding record : %d", ret);
- }
-
-
- flb_free(out_buf);
- }
- else {
- flb_plg_trace(ctx->ins, "tried to parse: %s", ctx->buf);
- flb_plg_trace(ctx->ins, "buf_size %zu", ctx->buf_size);
- flb_plg_error(ctx->ins, "parser returned an error: %d",
- parser_ret);
- }
- }
-
- flb_log_event_encoder_reset(&ctx->log_encoder);
- }
- else if (ret == 0) {
- /* EOF */
-
- /* docker service may be restarted */
- flb_plg_info(ctx->ins, "EOF detected. Re-initialize");
- if (ctx->reconnect_retry_limits > 0) {
- ret = create_reconnect_event(ins, config, ctx);
- if (ret < 0) {
- return ret;
- }
- }
- }
- else {
- error = errno;
- flb_plg_error(ctx->ins, "read returned error: %d, %s", error,
- strerror(error));
- if (is_recoverable_error(error)) {
- if (ctx->reconnect_retry_limits > 0) {
- ret = create_reconnect_event(ins, config, ctx);
- if (ret < 0) {
- return ret;
- }
- }
- }
- }
-
- return 0;
-}
-
-/**
- * Callback function to initialize docker events plugin
- *
- * @param ins Pointer to flb_input_instance
- * @param config Pointer to flb_config
- * @param data Unused
- *
- * @return int 0 on success, -1 on failure
- */
-static int in_de_init(struct flb_input_instance *ins,
- struct flb_config *config, void *data)
-{
- struct flb_in_de_config *ctx = NULL;
- (void) data;
-
- /* Allocate space for the configuration */
- ctx = de_config_init(ins, config);
- if (!ctx) {
- return -1;
- }
- ctx->ins = ins;
- ctx->retry_coll_id = -1;
- ctx->current_retries = 0;
-
- /* Set the context */
- flb_input_set_context(ins, ctx);
-
- if (de_unix_create(ctx) != 0) {
- flb_plg_error(ctx->ins, "could not listen on unix://%s",
- ctx->unix_path);
- de_config_destroy(ctx);
- return -1;
- }
-
- ctx->coll_id = flb_input_set_collector_event(ins, in_de_collect,
- ctx->fd, config);
- if(ctx->coll_id < 0){
- flb_plg_error(ctx->ins,
- "could not set collector for IN_DOCKER_EVENTS plugin");
- de_config_destroy(ctx);
- return -1;
- }
-
- flb_plg_info(ctx->ins, "listening for events on %s", ctx->unix_path);
- return 0;
-}
-
-/**
- * Callback exit function to cleanup plugin
- *
- * @param data Pointer cast to flb_in_de_config
- * @param config Unused
- *
- * @return int Always returns 0
- */
-static int in_de_exit(void *data, struct flb_config *config)
-{
- (void) config;
- struct flb_in_de_config *ctx = data;
-
- if (!ctx) {
- return 0;
- }
-
- de_config_destroy(ctx);
-
- return 0;
-}
-
-/* Configuration properties map */
-static struct flb_config_map config_map[] = {
- {
- FLB_CONFIG_MAP_STR, "unix_path", DEFAULT_UNIX_SOCKET_PATH,
- 0, FLB_TRUE, offsetof(struct flb_in_de_config, unix_path),
- "Define Docker unix socket path to read events"
- },
- {
- FLB_CONFIG_MAP_SIZE, "buffer_size", "8k",
- 0, FLB_TRUE, offsetof(struct flb_in_de_config, buf_size),
- "Set buffer size to read events"
- },
- {
- FLB_CONFIG_MAP_STR, "parser", NULL,
- 0, FLB_FALSE, 0,
- "Optional parser for records, if not set, records are packages under 'key'"
- },
- {
- FLB_CONFIG_MAP_STR, "key", DEFAULT_FIELD_NAME,
- 0, FLB_TRUE, offsetof(struct flb_in_de_config, key),
- "Set the key name to store unparsed Docker events"
- },
- {
- FLB_CONFIG_MAP_INT, "reconnect.retry_limits", "5",
- 0, FLB_TRUE, offsetof(struct flb_in_de_config, reconnect_retry_limits),
- "Maximum number to retry to connect docker socket"
- },
- {
- FLB_CONFIG_MAP_INT, "reconnect.retry_interval", "1",
- 0, FLB_TRUE, offsetof(struct flb_in_de_config, reconnect_retry_interval),
- "Retry interval to connect docker socket"
- },
- /* EOF */
- {0}
-};
-
-/* Plugin reference */
-struct flb_input_plugin in_docker_events_plugin = {
- .name = "docker_events",
- .description = "Docker events",
- .cb_init = in_de_init,
- .cb_pre_run = NULL,
- .cb_collect = in_de_collect,
- .cb_flush_buf = NULL,
- .cb_exit = in_de_exit,
- .config_map = config_map,
- .flags = FLB_INPUT_NET
-};
diff --git a/src/fluent-bit/plugins/in_docker_events/docker_events.h b/src/fluent-bit/plugins/in_docker_events/docker_events.h
deleted file mode 100644
index dc659d5ec..000000000
--- a/src/fluent-bit/plugins/in_docker_events/docker_events.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_IN_DE_H
-#define FLB_IN_DE_H
-
-#include <msgpack.h>
-#include <fluent-bit/flb_input.h>
-#include <fluent-bit/flb_parser.h>
-#include <fluent-bit/flb_log_event_encoder.h>
-
-#define DEFAULT_BUF_SIZE 8192
-#define MIN_BUF_SIZE 2048
-#define DEFAULT_FIELD_NAME "message"
-#define DEFAULT_UNIX_SOCKET_PATH "/var/run/docker.sock"
-
-struct flb_in_de_config
-{
- int fd; /* File descriptor */
- int coll_id; /* collector id */
- flb_sds_t unix_path; /* Unix path for socket */
- char *buf;
- size_t buf_size;
- flb_sds_t key;
-
- /* retries */
- int reconnect_retry_limits;
- int reconnect_retry_interval;
-
- /* retries (internal) */
- int current_retries;
- int retry_coll_id;
-
- struct flb_parser *parser;
- struct flb_log_event_encoder log_encoder;
- struct flb_input_instance *ins; /* Input plugin instace */
-
-};
-
-#endif
diff --git a/src/fluent-bit/plugins/in_docker_events/docker_events_config.c b/src/fluent-bit/plugins/in_docker_events/docker_events_config.c
deleted file mode 100644
index 8290686c1..000000000
--- a/src/fluent-bit/plugins/in_docker_events/docker_events_config.c
+++ /dev/null
@@ -1,106 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_input_plugin.h>
-#include <fluent-bit/flb_utils.h>
-
-#include "docker_events.h"
-#include "docker_events_config.h"
-
-/**
- * Function to initialize docker_events plugin.
- *
- * @param ins Pointer to flb_input_instance
- * @param config Pointer to flb_config
- *
- * @return struct flb_in_de_config* Pointer to the plugin's
- * structure on success, NULL on failure.
- */
-struct flb_in_de_config *de_config_init(struct flb_input_instance *ins,
- struct flb_config *config)
-{
- int ret;
- const char *tmp;
- struct flb_in_de_config *ctx;
-
- ctx = flb_calloc(1, sizeof(struct flb_in_de_config));
- if (!ctx) {
- flb_errno();
- return NULL;
- }
- ctx->ins = ins;
-
- /* Load the config map */
- ret = flb_input_config_map_set(ins, (void *) ctx);
- if (ret == -1) {
- flb_free(ctx);
- return NULL;
- }
-
- /* Allocate buffer for events */
- ctx->buf = flb_malloc(ctx->buf_size);
- if (!ctx->buf) {
- flb_errno();
- flb_free(ctx);
- return NULL;
- }
-
- tmp = flb_input_get_property("parser", ins);
- if (tmp) {
- ctx->parser = flb_parser_get(tmp, config);
- if (ctx->parser == NULL) {
- flb_plg_error(ctx->ins, "requested parser '%s' not found", tmp);
- flb_free(ctx->buf);
- flb_free(ctx);
- return NULL;
- }
- }
-
- ret = flb_log_event_encoder_init(&ctx->log_encoder,
- FLB_LOG_EVENT_FORMAT_DEFAULT);
-
- if (ret != FLB_EVENT_ENCODER_SUCCESS) {
- flb_plg_error(ctx->ins, "error initializing event encoder : %d", ret);
-
- de_config_destroy(ctx);
-
- ctx = NULL;
- }
-
- return ctx;
-}
-
-/**
- * Function to destroy docker_events plugin.
- *
- * @param ctx Pointer to flb_in_de_config
- *
- * @return int 0
- */
-int de_config_destroy(struct flb_in_de_config *ctx)
-{
- if (ctx->buf) {
- flb_free(ctx->buf);
- }
-
- flb_log_event_encoder_destroy(&ctx->log_encoder);
-
- flb_free(ctx);
- return 0;
-}
diff --git a/src/fluent-bit/plugins/in_docker_events/docker_events_config.h b/src/fluent-bit/plugins/in_docker_events/docker_events_config.h
deleted file mode 100644
index 94a6d87db..000000000
--- a/src/fluent-bit/plugins/in_docker_events/docker_events_config.h
+++ /dev/null
@@ -1,29 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_IN_DE_CONFIG_H
-#define FLB_IN_DE_CONFIG_H
-
-#include "docker_events.h"
-
-struct flb_in_de_config *de_config_init(struct flb_input_instance *ins,
- struct flb_config *config);
-int de_config_destroy(struct flb_in_de_config *config);
-
-#endif