diff options
Diffstat (limited to 'src/fluent-bit/plugins/in_forward')
-rw-r--r-- | src/fluent-bit/plugins/in_forward/CMakeLists.txt | 7 | ||||
-rw-r--r-- | src/fluent-bit/plugins/in_forward/fw.c | 325 | ||||
-rw-r--r-- | src/fluent-bit/plugins/in_forward/fw.h | 52 | ||||
-rw-r--r-- | src/fluent-bit/plugins/in_forward/fw_config.c | 120 | ||||
-rw-r--r-- | src/fluent-bit/plugins/in_forward/fw_config.h | 28 | ||||
-rw-r--r-- | src/fluent-bit/plugins/in_forward/fw_conn.c | 199 | ||||
-rw-r--r-- | src/fluent-bit/plugins/in_forward/fw_conn.h | 57 | ||||
-rw-r--r-- | src/fluent-bit/plugins/in_forward/fw_prot.c | 846 | ||||
-rw-r--r-- | src/fluent-bit/plugins/in_forward/fw_prot.h | 28 |
9 files changed, 0 insertions, 1662 deletions
diff --git a/src/fluent-bit/plugins/in_forward/CMakeLists.txt b/src/fluent-bit/plugins/in_forward/CMakeLists.txt deleted file mode 100644 index ce4f62728..000000000 --- a/src/fluent-bit/plugins/in_forward/CMakeLists.txt +++ /dev/null @@ -1,7 +0,0 @@ -set(src - fw.c - fw_conn.c - fw_prot.c - fw_config.c) - -FLB_PLUGIN(in_forward "${src}" "") diff --git a/src/fluent-bit/plugins/in_forward/fw.c b/src/fluent-bit/plugins/in_forward/fw.c deleted file mode 100644 index b85b198c6..000000000 --- a/src/fluent-bit/plugins/in_forward/fw.c +++ /dev/null @@ -1,325 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input.h> -#include <fluent-bit/flb_engine.h> -#include <fluent-bit/flb_downstream.h> -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_network.h> -#include <msgpack.h> - -#ifdef FLB_HAVE_UNIX_SOCKET -#include <sys/socket.h> -#include <sys/un.h> -#include <sys/stat.h> -#endif - -#include "fw.h" -#include "fw_conn.h" -#include "fw_config.h" - -#ifdef FLB_HAVE_UNIX_SOCKET -static int remove_existing_socket_file(char *socket_path) -{ - struct stat file_data; - int result; - - result = stat(socket_path, &file_data); - - if (result == -1) { - if (errno == ENOENT) { - return 0; - } - - flb_errno(); - - return -1; - } - - if (S_ISSOCK(file_data.st_mode) == 0) { - return -2; - } - - result = unlink(socket_path); - - if (result != 0) { - return -3; - } - - return 0; -} - -static int fw_unix_create(struct flb_in_fw_config *ctx) -{ - int ret; - - ret = remove_existing_socket_file(ctx->unix_path); - - if (ret != 0) { - if (ret == -2) { - flb_plg_error(ctx->ins, - "%s exists and it is not a unix socket. Aborting", - ctx->unix_path); - } - else { - flb_plg_error(ctx->ins, - "could not remove existing unix socket %s. Aborting", - ctx->unix_path); - } - - return -1; - } - - ctx->downstream = flb_downstream_create(FLB_TRANSPORT_UNIX_STREAM, - ctx->ins->flags, - ctx->unix_path, - 0, - ctx->ins->tls, - ctx->ins->config, - &ctx->ins->net_setup); - - if (ctx->downstream == NULL) { - return -1; - } - - if (ctx->unix_perm_str) { - if (chmod(ctx->unix_path, ctx->unix_perm)) { - flb_errno(); - - flb_plg_error(ctx->ins, "cannot set permission on '%s' to %04o", - ctx->unix_path, ctx->unix_perm); - - return -1; - } - } - - return 0; -} -#endif - -/* - * For a server event, the collection event means a new client have arrived, we - * accept the connection and create a new FW instance which will wait for - * MessagePack records. - */ -static int in_fw_collect(struct flb_input_instance *ins, - struct flb_config *config, void *in_context) -{ - struct flb_connection *connection; - struct fw_conn *conn; - struct flb_in_fw_config *ctx; - - ctx = in_context; - - connection = flb_downstream_conn_get(ctx->downstream); - - if (connection == NULL) { - flb_plg_error(ctx->ins, "could not accept new connection"); - - return -1; - } - - if (!config->is_ingestion_active) { - flb_downstream_conn_release(connection); - - return -1; - } - - flb_plg_trace(ins, "new TCP connection arrived FD=%i", connection->fd); - - conn = fw_conn_add(connection, ctx); - - if (!conn) { - return -1; - } - - return 0; -} - -/* Initialize plugin */ -static int in_fw_init(struct flb_input_instance *ins, - struct flb_config *config, void *data) -{ - unsigned short int port; - int ret; - struct flb_in_fw_config *ctx; - - (void) data; - - /* Allocate space for the configuration */ - ctx = fw_config_init(ins); - if (!ctx) { - return -1; - } - - ctx->coll_fd = -1; - ctx->ins = ins; - mk_list_init(&ctx->connections); - - /* Set the context */ - flb_input_set_context(ins, ctx); - - /* Unix Socket mode */ - if (ctx->unix_path) { -#ifndef FLB_HAVE_UNIX_SOCKET - flb_plg_error(ctx->ins, "unix address is not supported %s:%s. Aborting", - ctx->listen, ctx->tcp_port); - fw_config_destroy(ctx); - return -1; -#else - ret = fw_unix_create(ctx); - if (ret != 0) { - flb_plg_error(ctx->ins, "could not listen on unix://%s", - ctx->unix_path); - fw_config_destroy(ctx); - return -1; - } - flb_plg_info(ctx->ins, "listening on unix://%s", ctx->unix_path); -#endif - } - else { - port = (unsigned short int) strtoul(ctx->tcp_port, NULL, 10); - - ctx->downstream = flb_downstream_create(FLB_TRANSPORT_TCP, - ctx->ins->flags, - ctx->listen, - port, - ctx->ins->tls, - config, - &ctx->ins->net_setup); - - if (ctx->downstream == NULL) { - flb_plg_error(ctx->ins, - "could not initialize downstream on unix://%s. Aborting", - ctx->listen); - - fw_config_destroy(ctx); - - return -1; - } - - if (ctx->downstream != NULL) { - flb_plg_info(ctx->ins, "listening on %s:%s", - ctx->listen, ctx->tcp_port); - } - else { - flb_plg_error(ctx->ins, "could not bind address %s:%s. Aborting", - ctx->listen, ctx->tcp_port); - - fw_config_destroy(ctx); - - return -1; - } - } - - flb_input_downstream_set(ctx->downstream, ctx->ins); - - flb_net_socket_nonblocking(ctx->downstream->server_fd); - - /* Collect upon data available on the standard input */ - ret = flb_input_set_collector_socket(ins, - in_fw_collect, - ctx->downstream->server_fd, - config); - if (ret == -1) { - flb_plg_error(ctx->ins, "could not set server socket collector"); - fw_config_destroy(ctx); - return -1; - } - - ctx->coll_fd = ret; - - return 0; -} - -static void in_fw_pause(void *data, struct flb_config *config) -{ - struct flb_in_fw_config *ctx = data; - - /* - * If the plugin is paused AND the ingestion not longer active, - * it means we are in a shutdown phase. This plugin can safetly - * close the socket server collector. - * - * This socket stop is a workaround since the server API will be - * refactored shortly. - */ - if (config->is_ingestion_active == FLB_FALSE) { - fw_conn_del_all(ctx); - } -} - -static int in_fw_exit(void *data, struct flb_config *config) -{ - (void) *config; - struct flb_in_fw_config *ctx = data; - - if (!ctx) { - return 0; - } - - fw_conn_del_all(ctx); - fw_config_destroy(ctx); - return 0; -} - -/* Configuration properties map */ -static struct flb_config_map config_map[] = { - { - FLB_CONFIG_MAP_STR, "tag_prefix", NULL, - 0, FLB_TRUE, offsetof(struct flb_in_fw_config, tag_prefix), - "Prefix incoming tag with the defined value." - }, - { - FLB_CONFIG_MAP_STR, "unix_path", NULL, - 0, FLB_TRUE, offsetof(struct flb_in_fw_config, unix_path), - "The path to unix socket to receive a Forward message." - }, - { - FLB_CONFIG_MAP_STR, "unix_perm", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_in_fw_config, unix_perm_str), - "Set the permissions for the UNIX socket" - }, - { - FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", FLB_IN_FW_CHUNK_SIZE, - 0, FLB_TRUE, offsetof(struct flb_in_fw_config, buffer_chunk_size), - "The buffer memory size used to receive a Forward message." - }, - { - FLB_CONFIG_MAP_SIZE, "buffer_max_size", FLB_IN_FW_CHUNK_MAX_SIZE, - 0, FLB_TRUE, offsetof(struct flb_in_fw_config, buffer_max_size), - "The maximum buffer memory size used to receive a Forward message." - }, - {0} -}; - -/* Plugin reference */ -struct flb_input_plugin in_forward_plugin = { - .name = "forward", - .description = "Fluentd in-forward", - .cb_init = in_fw_init, - .cb_pre_run = NULL, - .cb_collect = in_fw_collect, - .cb_flush_buf = NULL, - .cb_pause = in_fw_pause, - .cb_exit = in_fw_exit, - .config_map = config_map, - .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS -}; diff --git a/src/fluent-bit/plugins/in_forward/fw.h b/src/fluent-bit/plugins/in_forward/fw.h deleted file mode 100644 index 454f255b9..000000000 --- a/src/fluent-bit/plugins/in_forward/fw.h +++ /dev/null @@ -1,52 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_IN_FW_H -#define FLB_IN_FW_H - -#include <msgpack.h> -#include <fluent-bit/flb_input.h> -#include <fluent-bit/flb_log_event_decoder.h> -#include <fluent-bit/flb_log_event_encoder.h> - -struct flb_in_fw_config { - size_t buffer_max_size; /* Max Buffer size */ - size_t buffer_chunk_size; /* Chunk allocation size */ - - /* Network */ - char *listen; /* Listen interface */ - char *tcp_port; /* TCP Port */ - - flb_sds_t tag_prefix; /* tag prefix */ - - /* Unix Socket */ - char *unix_path; /* Unix path for socket */ - unsigned int unix_perm; /* Permission for socket */ - flb_sds_t unix_perm_str; /* Permission (config map) */ - - int coll_fd; - struct flb_downstream *downstream; /* Client manager */ - struct mk_list connections; /* List of active connections */ - struct flb_input_instance *ins; /* Input plugin instace */ - - struct flb_log_event_decoder *log_decoder; - struct flb_log_event_encoder *log_encoder; -}; - -#endif diff --git a/src/fluent-bit/plugins/in_forward/fw_config.c b/src/fluent-bit/plugins/in_forward/fw_config.c deleted file mode 100644 index 7edbab0c7..000000000 --- a/src/fluent-bit/plugins/in_forward/fw_config.c +++ /dev/null @@ -1,120 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <stdlib.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_downstream.h> -#include <fluent-bit/flb_input_plugin.h> - -#include "fw.h" -#include "fw_conn.h" -#include "fw_config.h" - -struct flb_in_fw_config *fw_config_init(struct flb_input_instance *i_ins) -{ - char tmp[16]; - int ret = -1; - const char *p; - struct flb_in_fw_config *config; - - config = flb_calloc(1, sizeof(struct flb_in_fw_config)); - if (!config) { - flb_errno(); - return NULL; - } - config->coll_fd = -1; - - config->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); - - if (config->log_encoder == NULL) { - flb_plg_error(i_ins, "could not initialize event encoder"); - fw_config_destroy(config); - - return NULL; - } - - config->log_decoder = flb_log_event_decoder_create(NULL, 0); - - if (config->log_decoder == NULL) { - flb_plg_error(i_ins, "could not initialize event decoder"); - fw_config_destroy(config); - - return NULL; - } - - ret = flb_input_config_map_set(i_ins, (void *)config); - if (ret == -1) { - flb_plg_error(i_ins, "config map set error"); - flb_free(config); - return NULL; - } - - p = flb_input_get_property("unix_path", i_ins); - if (p == NULL) { - /* Listen interface (if not set, defaults to 0.0.0.0:24224) */ - flb_input_net_default_listener("0.0.0.0", 24224, i_ins); - config->listen = i_ins->host.listen; - snprintf(tmp, sizeof(tmp) - 1, "%d", i_ins->host.port); - config->tcp_port = flb_strdup(tmp); - } - else { - /* Unix socket mode */ - if (config->unix_perm_str) { - config->unix_perm = strtol(config->unix_perm_str, NULL, 8) & 07777; - } - } - - if (!config->unix_path) { - flb_debug("[in_fw] Listen='%s' TCP_Port=%s", - config->listen, config->tcp_port); - } - return config; -} - -int fw_config_destroy(struct flb_in_fw_config *config) -{ - if (config->log_encoder != NULL) { - flb_log_event_encoder_destroy(config->log_encoder); - } - - if (config->log_decoder != NULL) { - flb_log_event_decoder_destroy(config->log_decoder); - } - - if (config->coll_fd != -1) { - flb_input_collector_delete(config->coll_fd, config->ins); - - config->coll_fd = -1; - } - - if (config->downstream != NULL) { - flb_downstream_destroy(config->downstream); - } - - if (config->unix_path) { - unlink(config->unix_path); - } - else { - flb_free(config->tcp_port); - } - - flb_free(config); - - return 0; -} diff --git a/src/fluent-bit/plugins/in_forward/fw_config.h b/src/fluent-bit/plugins/in_forward/fw_config.h deleted file mode 100644 index bbad17610..000000000 --- a/src/fluent-bit/plugins/in_forward/fw_config.h +++ /dev/null @@ -1,28 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_IN_FW_CONFIG_H -#define FLB_IN_FW_CONFIG_H - -#include "fw.h" - -struct flb_in_fw_config *fw_config_init(struct flb_input_instance *i_ins); -int fw_config_destroy(struct flb_in_fw_config *config); - -#endif diff --git a/src/fluent-bit/plugins/in_forward/fw_conn.c b/src/fluent-bit/plugins/in_forward/fw_conn.c deleted file mode 100644 index 3ccd98c24..000000000 --- a/src/fluent-bit/plugins/in_forward/fw_conn.c +++ /dev/null @@ -1,199 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_engine.h> -#include <fluent-bit/flb_network.h> -#include <fluent-bit/flb_downstream.h> - -#include "fw.h" -#include "fw_prot.h" -#include "fw_conn.h" - -/* Callback invoked every time an event is triggered for a connection */ -int fw_conn_event(void *data) -{ - int ret; - int bytes; - int available; - int size; - char *tmp; - struct fw_conn *conn; - struct mk_event *event; - struct flb_in_fw_config *ctx; - struct flb_connection *connection; - - connection = (struct flb_connection *) data; - - conn = connection->user_data; - - ctx = conn->ctx; - - event = &connection->event; - - - if (event->mask & MK_EVENT_READ) { - available = (conn->buf_size - conn->buf_len); - if (available < 1) { - if (conn->buf_size >= ctx->buffer_max_size) { - flb_plg_warn(ctx->ins, "fd=%i incoming data exceed limit (%lu bytes)", - event->fd, (ctx->buffer_max_size)); - fw_conn_del(conn); - return -1; - } - else if (conn->buf_size + ctx->buffer_chunk_size > ctx->buffer_max_size) { - /* no space to add buffer_chunk_size */ - /* set maximum size */ - size = ctx->buffer_max_size; - } - else { - size = conn->buf_size + ctx->buffer_chunk_size; - } - tmp = flb_realloc(conn->buf, size); - if (!tmp) { - flb_errno(); - return -1; - } - flb_plg_trace(ctx->ins, "fd=%i buffer realloc %i -> %i", - event->fd, conn->buf_size, size); - - conn->buf = tmp; - conn->buf_size = size; - available = (conn->buf_size - conn->buf_len); - } - - bytes = flb_io_net_read(connection, - (void *) &conn->buf[conn->buf_len], - available); - - if (bytes > 0) { - flb_plg_trace(ctx->ins, "read()=%i pre_len=%i now_len=%i", - bytes, conn->buf_len, conn->buf_len + bytes); - conn->buf_len += bytes; - - ret = fw_prot_process(ctx->ins, conn); - if (ret == -1) { - fw_conn_del(conn); - return -1; - } - return bytes; - } - else { - flb_plg_trace(ctx->ins, "fd=%i closed connection", event->fd); - fw_conn_del(conn); - return -1; - } - } - - if (event->mask & MK_EVENT_CLOSE) { - flb_plg_trace(ctx->ins, "fd=%i hangup", event->fd); - fw_conn_del(conn); - return -1; - } - return 0; -} - -/* Create a new Forward request instance */ -struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_config *ctx) -{ - struct fw_conn *conn; - int ret; - - conn = flb_malloc(sizeof(struct fw_conn)); - if (!conn) { - flb_errno(); - - return NULL; - } - - conn->connection = connection; - - /* Set data for the event-loop */ - connection->user_data = conn; - connection->event.type = FLB_ENGINE_EV_CUSTOM; - connection->event.handler = fw_conn_event; - - /* Connection info */ - conn->ctx = ctx; - conn->buf_len = 0; - conn->rest = 0; - conn->status = FW_NEW; - - /* Allocate read buffer */ - conn->buf = flb_malloc(ctx->buffer_chunk_size); - if (!conn->buf) { - flb_errno(); - flb_free(conn); - - return NULL; - } - conn->buf_size = ctx->buffer_chunk_size; - conn->in = ctx->ins; - - /* Register instance into the event loop */ - ret = mk_event_add(flb_engine_evl_get(), - connection->fd, - FLB_ENGINE_EV_CUSTOM, - MK_EVENT_READ, - &connection->event); - if (ret == -1) { - flb_plg_error(ctx->ins, "could not register new connection"); - - flb_free(conn->buf); - flb_free(conn); - - return NULL; - } - - mk_list_add(&conn->_head, &ctx->connections); - - return conn; -} - -int fw_conn_del(struct fw_conn *conn) -{ - /* The downstream unregisters the file descriptor from the event-loop - * so there's nothing to be done by the plugin - */ - flb_downstream_conn_release(conn->connection); - - /* Release resources */ - mk_list_del(&conn->_head); - - flb_free(conn->buf); - flb_free(conn); - - return 0; -} - -int fw_conn_del_all(struct flb_in_fw_config *ctx) -{ - struct mk_list *tmp; - struct mk_list *head; - struct fw_conn *conn; - - mk_list_foreach_safe(head, tmp, &ctx->connections) { - conn = mk_list_entry(head, struct fw_conn, _head); - fw_conn_del(conn); - } - - return 0; -}
\ No newline at end of file diff --git a/src/fluent-bit/plugins/in_forward/fw_conn.h b/src/fluent-bit/plugins/in_forward/fw_conn.h deleted file mode 100644 index 4c04d9400..000000000 --- a/src/fluent-bit/plugins/in_forward/fw_conn.h +++ /dev/null @@ -1,57 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_IN_FW_CONN_H -#define FLB_IN_FW_CONN_H - -#define FLB_IN_FW_CHUNK_SIZE "1024000" /* 1MB */ -#define FLB_IN_FW_CHUNK_MAX_SIZE "6144000" /* =FLB_IN_FW_CHUNK_SIZE * 6. 6MB */ - -enum { - FW_NEW = 1, /* it's a new connection */ - FW_CONNECTED = 2, /* MQTT connection per protocol spec OK */ -}; - -struct fw_conn_stream { - char *tag; - size_t tag_len; -}; - -/* Respresents a connection */ -struct fw_conn { - int status; /* Connection status */ - - /* Buffer */ - char *buf; /* Buffer data */ - int buf_len; /* Data length */ - int buf_size; /* Buffer size */ - size_t rest; /* Unpacking offset */ - - struct flb_input_instance *in; /* Parent plugin instance */ - struct flb_in_fw_config *ctx; /* Plugin configuration context */ - struct flb_connection *connection; - - struct mk_list _head; -}; - -struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_config *ctx); -int fw_conn_del(struct fw_conn *conn); -int fw_conn_del_all(struct flb_in_fw_config *ctx); - -#endif diff --git a/src/fluent-bit/plugins/in_forward/fw_prot.c b/src/fluent-bit/plugins/in_forward/fw_prot.c deleted file mode 100644 index 2a23b6254..000000000 --- a/src/fluent-bit/plugins/in_forward/fw_prot.c +++ /dev/null @@ -1,846 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_input.h> -#include <fluent-bit/flb_config.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_gzip.h> - -#include <fluent-bit/flb_input_metric.h> -#include <fluent-bit/flb_input_trace.h> - -#include <cmetrics/cmetrics.h> -#include <cmetrics/cmt_decode_msgpack.h> - -#include <ctraces/ctraces.h> -#include <ctraces/ctr_decode_msgpack.h> - -#include <msgpack.h> - -#include "fw.h" -#include "fw_prot.h" -#include "fw_conn.h" - -/* Try parsing rounds up-to 32 bytes */ -#define EACH_RECV_SIZE 32 - -static int get_chunk_event_type(struct flb_input_instance *ins, msgpack_object options) -{ - int i; - int type = FLB_EVENT_TYPE_LOGS; - msgpack_object k; - msgpack_object v; - - if (options.type != MSGPACK_OBJECT_MAP) { - flb_plg_error(ins, "invalid options field in record"); - return -1; - } - - for (i = 0; i < options.via.map.size; i++) { - k = options.via.map.ptr[i].key; - v = options.via.map.ptr[i].val; - - if (k.type != MSGPACK_OBJECT_STR) { - return -1; - } - - if (k.via.str.size != 13) { - continue; - } - - if (strncmp(k.via.str.ptr, "fluent_signal", 13) == 0) { - if (v.type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - flb_plg_error(ins, "invalid value type in options fluent_signal"); - return -1; - } - - if (v.via.i64 != FLB_EVENT_TYPE_LOGS && v.via.i64 != FLB_EVENT_TYPE_METRICS && v.via.i64 != FLB_EVENT_TYPE_TRACES) { - flb_plg_error(ins, "invalid value in options fluent_signal"); - return -1; - } - - /* cast should be fine */ - type = (int) v.via.i64; - break; - } - } - - return type; -} - -static int is_gzip_compressed(msgpack_object options) -{ - int i; - msgpack_object k; - msgpack_object v; - - if (options.type != MSGPACK_OBJECT_MAP) { - return -1; - } - - - for (i = 0; i < options.via.map.size; i++) { - k = options.via.map.ptr[i].key; - v = options.via.map.ptr[i].val; - - if (k.type != MSGPACK_OBJECT_STR) { - return -1; - } - - if (k.via.str.size != 10) { - continue; - } - - if (strncmp(k.via.str.ptr, "compressed", 10) == 0) { - if (v.type != MSGPACK_OBJECT_STR) { - return -1; - } - - if (v.via.str.size != 4) { - return -1; - } - - if (strncmp(v.via.str.ptr, "gzip", 4) == 0) { - return FLB_TRUE; - } - else if (strncmp(v.via.str.ptr, "text", 4) == 0) { - return FLB_FALSE; - } - - return -1; - } - } - - return FLB_FALSE; -} - -static int send_ack(struct flb_input_instance *in, struct fw_conn *conn, - msgpack_object chunk) -{ - int result; - size_t sent; - ssize_t bytes; - msgpack_packer mp_pck; - msgpack_sbuffer mp_sbuf; - - msgpack_sbuffer_init(&mp_sbuf); - msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); - - msgpack_pack_map(&mp_pck, 1); - msgpack_pack_str(&mp_pck, 3); - msgpack_pack_str_body(&mp_pck, "ack", 3); - msgpack_pack_object(&mp_pck, chunk); - - - bytes = flb_io_net_write(conn->connection, - (void *) mp_sbuf.data, - mp_sbuf.size, - &sent); - - msgpack_sbuffer_destroy(&mp_sbuf); - - if (bytes == -1) { - flb_plg_error(in, "cannot send ACK response: %.*s", - chunk.via.str.size, chunk.via.str.ptr); - - result = -1; - } - else { - result = 0; - } - - return result; - -} - -static size_t get_options_metadata(msgpack_object *arr, int expected, size_t *idx) -{ - size_t i; - msgpack_object *options; - msgpack_object k; - msgpack_object v; - - if (arr->type != MSGPACK_OBJECT_ARRAY) { - return -1; - } - - /* Make sure the 'expected' entry position is valid for the array size */ - if (expected >= arr->via.array.size) { - return 0; - } - - options = &arr->via.array.ptr[expected]; - if (options->type == MSGPACK_OBJECT_NIL) { - /* - * Old Docker 18.x sends a NULL options parameter, just be friendly and - * let it pass. - */ - return 0; - } - - if (options->type != MSGPACK_OBJECT_MAP) { - return -1; - } - - if (options->via.map.size <= 0) { - return 0; - } - - for (i = 0; i < options->via.map.size; i++) { - k = options->via.map.ptr[i].key; - v = options->via.map.ptr[i].val; - - if (k.type != MSGPACK_OBJECT_STR) { - continue; - } - - if (k.via.str.size != 8) { - continue; - } - - if (strncmp(k.via.str.ptr, "metadata", 8) != 0) { - continue; - } - - if (v.type != MSGPACK_OBJECT_MAP) { - return -1; - } - - *idx = i; - - return 0; - } - - return 0; -} - -static size_t get_options_chunk(msgpack_object *arr, int expected, size_t *idx) -{ - size_t i; - msgpack_object *options; - msgpack_object k; - msgpack_object v; - - if (arr->type != MSGPACK_OBJECT_ARRAY) { - return -1; - } - - /* Make sure the 'expected' entry position is valid for the array size */ - if (expected >= arr->via.array.size) { - return 0; - } - - options = &arr->via.array.ptr[expected]; - if (options->type == MSGPACK_OBJECT_NIL) { - /* - * Old Docker 18.x sends a NULL options parameter, just be friendly and - * let it pass. - */ - return 0; - } - - if (options->type != MSGPACK_OBJECT_MAP) { - return -1; - } - - if (options->via.map.size <= 0) { - return 0; - } - - for (i = 0; i < options->via.map.size; i++) { - k = options->via.map.ptr[i].key; - v = options->via.map.ptr[i].val; - - if (k.type != MSGPACK_OBJECT_STR) { - continue; - } - - if (k.via.str.size != 5) { - continue; - } - - if (strncmp(k.via.str.ptr, "chunk", 5) != 0) { - continue; - } - - if (v.type != MSGPACK_OBJECT_STR) { - return -1; - } - - *idx = i; - return 0; - } - - return 0; -} - -static int fw_process_forward_mode_entry( - struct fw_conn *conn, - const char *tag, int tag_len, - msgpack_object *entry, - int chunk_id) -{ - int result; - struct flb_log_event event; - - result = flb_event_decoder_decode_object(conn->ctx->log_decoder, - &event, entry); - - if (result == FLB_EVENT_DECODER_SUCCESS) { - result = flb_log_event_encoder_begin_record(conn->ctx->log_encoder); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_set_timestamp(conn->ctx->log_encoder, - &event.timestamp); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_set_metadata_from_msgpack_object( - conn->ctx->log_encoder, - event.metadata); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_set_body_from_msgpack_object( - conn->ctx->log_encoder, - event.body); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_commit_record(conn->ctx->log_encoder); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - flb_input_log_append(conn->ctx->ins, tag, tag_len, - conn->ctx->log_encoder->output_buffer, - conn->ctx->log_encoder->output_length); - } - - flb_log_event_encoder_reset(conn->ctx->log_encoder); - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_warn(conn->ctx->ins, "Event decoder failure : %d", result); - - return -1; - } - - return 0; -} - -static int fw_process_message_mode_entry( - struct flb_input_instance *in, - struct fw_conn *conn, - const char *tag, int tag_len, - msgpack_object *root, - msgpack_object *ts, - msgpack_object *body, - int chunk_id, int metadata_id) -{ - struct flb_time timestamp; - msgpack_object *metadata; - msgpack_object options; - int result; - msgpack_object chunk; - - metadata = NULL; - - if (chunk_id != -1 || metadata_id != -1) { - options = root->via.array.ptr[3]; - - if (metadata_id != -1) { - metadata = &options.via.map.ptr[metadata_id].val; - } - } - - result = flb_log_event_decoder_decode_timestamp(ts, ×tamp); - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_begin_record(conn->ctx->log_encoder); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_set_timestamp(conn->ctx->log_encoder, - ×tamp); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - if (metadata != NULL) { - result = flb_log_event_encoder_set_metadata_from_msgpack_object( - conn->ctx->log_encoder, - metadata); - } - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_set_body_from_msgpack_object( - conn->ctx->log_encoder, - body); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_commit_record(conn->ctx->log_encoder); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - flb_input_log_append(in, tag, tag_len, - conn->ctx->log_encoder->output_buffer, - conn->ctx->log_encoder->output_length); - } - - flb_log_event_encoder_reset(conn->ctx->log_encoder); - - if (chunk_id != -1) { - chunk = options.via.map.ptr[chunk_id].val; - send_ack(in, conn, chunk); - } - - return 0; -} - -static size_t receiver_recv(struct fw_conn *conn, char *buf, size_t try_size) { - size_t off; - size_t actual_size; - - off = conn->buf_len - conn->rest; - actual_size = try_size; - - if (actual_size > conn->rest) { - actual_size = conn->rest; - } - - memcpy(buf, conn->buf + off, actual_size); - conn->rest -= actual_size; - - return actual_size; -} - -static size_t receiver_to_unpacker(struct fw_conn *conn, size_t request_size, - msgpack_unpacker *unpacker) -{ - size_t recv_len; - - /* make sure there's enough room, or expand the unpacker accordingly */ - if (msgpack_unpacker_buffer_capacity(unpacker) < request_size) { - msgpack_unpacker_reserve_buffer(unpacker, request_size); - assert(msgpack_unpacker_buffer_capacity(unpacker) >= request_size); - } - recv_len = receiver_recv(conn, msgpack_unpacker_buffer(unpacker), - request_size); - msgpack_unpacker_buffer_consumed(unpacker, recv_len); - - return recv_len; -} - -int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) -{ - int ret; - int stag_len; - int event_type; - int contain_options = FLB_FALSE; - size_t index = 0; - size_t off = 0; - size_t chunk_id = -1; - size_t metadata_id = -1; - const char *stag; - flb_sds_t out_tag = NULL; - size_t bytes; - size_t recv_len; - size_t gz_size; - void *gz_data; - msgpack_object tag; - msgpack_object entry; - msgpack_object map; - msgpack_object root; - msgpack_object chunk; - msgpack_unpacked result; - msgpack_unpacker *unp; - size_t all_used = 0; - struct flb_in_fw_config *ctx = conn->ctx; - struct cmt *cmt; - struct ctrace *ctr; - - /* - * [tag, time, record] - * [tag, [[time,record], [time,record], ...]] - */ - - out_tag = flb_sds_create_size(1024); - if (!out_tag) { - return -1; - } - - unp = msgpack_unpacker_new(1024); - msgpack_unpacked_init(&result); - conn->rest = conn->buf_len; - - while (1) { - recv_len = receiver_to_unpacker(conn, EACH_RECV_SIZE, unp); - if (recv_len == 0) { - /* No more data */ - msgpack_unpacker_free(unp); - msgpack_unpacked_destroy(&result); - - /* Adjust buffer data */ - if (conn->buf_len >= all_used && all_used > 0) { - memmove(conn->buf, conn->buf + all_used, - conn->buf_len - all_used); - conn->buf_len -= all_used; - } - flb_sds_destroy(out_tag); - - return 0; - } - - /* Always summarize the total number of bytes requested to parse */ - ret = msgpack_unpacker_next_with_size(unp, &result, &bytes); - - /* - * Upon parsing or memory errors, break the loop, return the error - * and expect the connection to be closed. - */ - if (ret == MSGPACK_UNPACK_PARSE_ERROR || - ret == MSGPACK_UNPACK_NOMEM_ERROR) { - /* A bit redunant, print out the real error */ - if (ret == MSGPACK_UNPACK_PARSE_ERROR) { - flb_plg_debug(ctx->ins, "err=MSGPACK_UNPACK_PARSE_ERROR"); - } - else { - flb_plg_error(ctx->ins, "err=MSGPACK_UNPACK_NOMEM_ERROR"); - } - - /* Cleanup buffers */ - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - - return -1; - } - - while (ret == MSGPACK_UNPACK_SUCCESS) { - /* - * For buffering optimization we always want to know the total - * number of bytes involved on the new object returned. Despites - * buf_off always know the given bytes, it's likely we used a bit - * less. This 'all_used' field keep a reference per object so - * when returning to the caller we can adjust the source buffer - * and deprecated consumed data. - * - * The 'last_parsed' field is Fluent Bit specific and is documented - * in: - * - * lib/msgpack-c/include/msgpack/unpack.h - * - * Other references: - * - * https://github.com/msgpack/msgpack-c/issues/514 - */ - all_used += bytes; - - - /* Map the array */ - root = result.data; - - if (root.type != MSGPACK_OBJECT_ARRAY) { - flb_plg_debug(ctx->ins, - "parser: expecting an array (type=%i), skip.", - root.type); - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - - return -1; - } - - if (root.via.array.size < 2) { - flb_plg_debug(ctx->ins, - "parser: array of invalid size, skip."); - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - - return -1; - } - - if (root.via.array.size == 3) { - contain_options = FLB_TRUE; - } - - /* Get the tag */ - tag = root.via.array.ptr[0]; - if (tag.type != MSGPACK_OBJECT_STR) { - flb_plg_debug(ctx->ins, - "parser: invalid tag format, skip."); - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; - } - - /* reference the tag associated with the record */ - stag = tag.via.str.ptr; - stag_len = tag.via.str.size; - - /* clear out_tag before using */ - flb_sds_len_set(out_tag, 0); - - /* Prefix the incoming record tag with a custom prefix */ - if (ctx->tag_prefix) { - /* prefix */ - flb_sds_cat_safe(&out_tag, - ctx->tag_prefix, flb_sds_len(ctx->tag_prefix)); - /* record tag */ - flb_sds_cat_safe(&out_tag, stag, stag_len); - } - else if (ins->tag && !ins->tag_default) { - /* if the input plugin instance Tag has been manually set, use it */ - flb_sds_cat_safe(&out_tag, ins->tag, flb_sds_len(ins->tag)); - } - else { - /* use the tag from the record */ - flb_sds_cat_safe(&out_tag, stag, stag_len); - } - - entry = root.via.array.ptr[1]; - - if (entry.type == MSGPACK_OBJECT_ARRAY) { - /* - * Forward format 1 (forward mode: [tag, [[time, map], ...]] - */ - - /* Check for options */ - chunk_id = -1; - ret = get_options_chunk(&root, 2, &chunk_id); - if (ret == -1) { - flb_plg_debug(ctx->ins, "invalid options field"); - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - - return -1; - } - - /* Process array */ - ret = 0; - - for(index = 0 ; - index < entry.via.array.size && - ret == 0 ; - index++) { - ret = fw_process_forward_mode_entry( - conn, - out_tag, flb_sds_len(out_tag), - &entry.via.array.ptr[index], - chunk_id); - } - - if (chunk_id != -1) { - msgpack_object options; - msgpack_object chunk; - - options = root.via.array.ptr[2]; - chunk = options.via.map.ptr[chunk_id].val; - - send_ack(conn->in, conn, chunk); - } - } - else if (entry.type == MSGPACK_OBJECT_POSITIVE_INTEGER || - entry.type == MSGPACK_OBJECT_EXT) { - /* - * Forward format 2 (message mode) : [tag, time, map, ...] - */ - map = root.via.array.ptr[2]; - if (map.type != MSGPACK_OBJECT_MAP) { - flb_plg_warn(ctx->ins, "invalid data format, map expected"); - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; - } - - /* Check for options */ - chunk_id = -1; - ret = get_options_chunk(&root, 3, &chunk_id); - if (ret == -1) { - flb_plg_debug(ctx->ins, "invalid options field"); - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; - } - - metadata_id = -1; - ret = get_options_metadata(&root, 3, &metadata_id); - if (ret == -1) { - flb_plg_debug(ctx->ins, "invalid options field"); - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; - } - - /* Process map */ - fw_process_message_mode_entry( - conn->in, conn, - out_tag, flb_sds_len(out_tag), - &root, &entry, &map, chunk_id, - metadata_id); - } - else if (entry.type == MSGPACK_OBJECT_STR || - entry.type == MSGPACK_OBJECT_BIN) { - /* PackedForward Mode */ - const char *data = NULL; - size_t len = 0; - - /* Check for options */ - chunk_id = -1; - ret = get_options_chunk(&root, 2, &chunk_id); - if (ret == -1) { - flb_plg_debug(ctx->ins, "invalid options field"); - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; - } - - if (entry.type == MSGPACK_OBJECT_STR) { - data = entry.via.str.ptr; - len = entry.via.str.size; - } - else if (entry.type == MSGPACK_OBJECT_BIN) { - data = entry.via.bin.ptr; - len = entry.via.bin.size; - } - - if (data) { - ret = is_gzip_compressed(root.via.array.ptr[2]); - if (ret == -1) { - flb_plg_error(ctx->ins, "invalid 'compressed' option"); - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; - } - - if (ret == FLB_TRUE) { - ret = flb_gzip_uncompress((void *) data, len, - &gz_data, &gz_size); - if (ret == -1) { - flb_plg_error(ctx->ins, "gzip uncompress failure"); - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; - } - - /* Append uncompressed data */ - flb_input_log_append(conn->in, - out_tag, flb_sds_len(out_tag), - gz_data, gz_size); - flb_free(gz_data); - } - else { - event_type = FLB_EVENT_TYPE_LOGS; - if (contain_options) { - ret = get_chunk_event_type(ins, root.via.array.ptr[2]); - if (ret == -1) { - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; - } - event_type = ret; - } - - if (event_type == FLB_EVENT_TYPE_LOGS) { - flb_input_log_append(conn->in, - out_tag, flb_sds_len(out_tag), - data, len); - } - else if (event_type == FLB_EVENT_TYPE_METRICS) { - ret = cmt_decode_msgpack_create(&cmt, (char *) data, len, &off); - if (ret == -1) { - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; - } - flb_input_metrics_append(conn->in, - out_tag, flb_sds_len(out_tag), - cmt); - } - else if (event_type == FLB_EVENT_TYPE_TRACES) { - off = 0; - ret = ctr_decode_msgpack_create(&ctr, (char *) data, len, &off); - if (ret == -1) { - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; - } - - flb_input_trace_append(ins, - out_tag, flb_sds_len(out_tag), - ctr); - } - } - - /* Handle ACK response */ - if (chunk_id != -1) { - chunk = root.via.array.ptr[2].via.map.ptr[chunk_id].val; - send_ack(ctx->ins, conn, chunk); - } - } - } - else { - flb_plg_warn(ctx->ins, "invalid data format, type=%i", - entry.type); - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - return -1; - } - - ret = msgpack_unpacker_next(unp, &result); - } - } - - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - - switch (ret) { - case MSGPACK_UNPACK_EXTRA_BYTES: - flb_plg_error(ctx->ins, "MSGPACK_UNPACK_EXTRA_BYTES"); - return -1; - case MSGPACK_UNPACK_CONTINUE: - flb_plg_trace(ctx->ins, "MSGPACK_UNPACK_CONTINUE"); - return 1; - case MSGPACK_UNPACK_PARSE_ERROR: - flb_plg_debug(ctx->ins, "err=MSGPACK_UNPACK_PARSE_ERROR"); - return -1; - case MSGPACK_UNPACK_NOMEM_ERROR: - flb_plg_error(ctx->ins, "err=MSGPACK_UNPACK_NOMEM_ERROR"); - return -1; - }; - - return 0; -} diff --git a/src/fluent-bit/plugins/in_forward/fw_prot.h b/src/fluent-bit/plugins/in_forward/fw_prot.h deleted file mode 100644 index 67eae5507..000000000 --- a/src/fluent-bit/plugins/in_forward/fw_prot.h +++ /dev/null @@ -1,28 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_IN_FW_PROT_H -#define FLB_IN_FW_PROT_H - -#include "fw_conn.h" - -int fw_prot_parser(struct fw_conn *conn); -int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn); - -#endif |