summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/in_syslog
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/plugins/in_syslog')
-rw-r--r--fluent-bit/plugins/in_syslog/CMakeLists.txt8
-rw-r--r--fluent-bit/plugins/in_syslog/syslog.c263
-rw-r--r--fluent-bit/plugins/in_syslog/syslog.h82
-rw-r--r--fluent-bit/plugins/in_syslog/syslog_conf.c193
-rw-r--r--fluent-bit/plugins/in_syslog/syslog_conf.h32
-rw-r--r--fluent-bit/plugins/in_syslog/syslog_conn.c247
-rw-r--r--fluent-bit/plugins/in_syslog/syslog_conn.h53
-rw-r--r--fluent-bit/plugins/in_syslog/syslog_prot.c324
-rw-r--r--fluent-bit/plugins/in_syslog/syslog_prot.h35
-rw-r--r--fluent-bit/plugins/in_syslog/syslog_server.c235
-rw-r--r--fluent-bit/plugins/in_syslog/syslog_server.h31
11 files changed, 1503 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_syslog/CMakeLists.txt b/fluent-bit/plugins/in_syslog/CMakeLists.txt
new file mode 100644
index 00000000..88f698b1
--- /dev/null
+++ b/fluent-bit/plugins/in_syslog/CMakeLists.txt
@@ -0,0 +1,8 @@
+set(src
+ syslog_conf.c
+ syslog_server.c
+ syslog_conn.c
+ syslog_prot.c
+ syslog.c)
+
+FLB_PLUGIN(in_syslog "${src}" "")
diff --git a/fluent-bit/plugins/in_syslog/syslog.c b/fluent-bit/plugins/in_syslog/syslog.c
new file mode 100644
index 00000000..d478dfc3
--- /dev/null
+++ b/fluent-bit/plugins/in_syslog/syslog.c
@@ -0,0 +1,263 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <msgpack.h>
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_downstream.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_error.h>
+#include <fluent-bit/flb_utils.h>
+
+#include "syslog.h"
+#include "syslog_conf.h"
+#include "syslog_server.h"
+#include "syslog_conn.h"
+#include "syslog_prot.h"
+
+/* cb_collect callback */
+static int in_syslog_collect_tcp(struct flb_input_instance *i_ins,
+ struct flb_config *config, void *in_context)
+{
+ struct flb_connection *connection;
+ struct syslog_conn *conn;
+ struct flb_syslog *ctx;
+
+ (void) i_ins;
+
+ ctx = in_context;
+
+ connection = flb_downstream_conn_get(ctx->downstream);
+
+ if (connection == NULL) {
+ flb_plg_error(ctx->ins, "could not accept new connection");
+
+ return -1;
+ }
+
+ if (ctx->dgram_mode_flag) {
+ return syslog_dgram_conn_event(connection);
+ }
+ else {
+ flb_plg_trace(ctx->ins, "new Unix connection arrived FD=%i", connection->fd);
+
+ conn = syslog_conn_add(connection, ctx);
+
+ if (conn == NULL) {
+ flb_plg_error(ctx->ins, "could not accept new connection");
+
+ flb_downstream_conn_release(connection);
+
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * Collect a datagram, per Syslog specification a datagram contains only
+ * one syslog message and it should not exceed 1KB.
+ */
+static int in_syslog_collect_udp(struct flb_input_instance *i_ins,
+ struct flb_config *config,
+ void *in_context)
+{
+ struct flb_syslog *ctx;
+
+ (void) i_ins;
+
+ ctx = in_context;
+
+ return syslog_dgram_conn_event(ctx->dummy_conn->connection);
+}
+
+/* Initialize plugin */
+static int in_syslog_init(struct flb_input_instance *in,
+ struct flb_config *config, void *data)
+{
+ int ret;
+ struct flb_syslog *ctx;
+ struct flb_connection *connection;
+
+ /* Allocate space for the configuration */
+ ctx = syslog_conf_create(in, config);
+ if (!ctx) {
+ flb_plg_error(in, "could not initialize plugin");
+ return -1;
+ }
+ ctx->collector_id = -1;
+
+ if ((ctx->mode == FLB_SYSLOG_UNIX_TCP || ctx->mode == FLB_SYSLOG_UNIX_UDP)
+ && !ctx->unix_path) {
+ flb_plg_error(ctx->ins, "Unix path not defined");
+ syslog_conf_destroy(ctx);
+ return -1;
+ }
+
+ /* Create Unix Socket */
+ ret = syslog_server_create(ctx);
+ if (ret == -1) {
+ syslog_conf_destroy(ctx);
+ return -1;
+ }
+
+ flb_input_downstream_set(ctx->downstream, ctx->ins);
+
+ if (ctx->dgram_mode_flag) {
+ connection = flb_downstream_conn_get(ctx->downstream);
+
+ if (connection == NULL) {
+ flb_plg_error(ctx->ins, "could not get DGRAM server dummy "
+ "connection");
+
+ syslog_conf_destroy(ctx);
+
+ return -1;
+ }
+
+ ctx->dummy_conn = syslog_conn_add(connection, ctx);
+
+ if (ctx->dummy_conn == NULL) {
+ flb_plg_error(ctx->ins, "could not track DGRAM server dummy "
+ "connection");
+
+ syslog_conf_destroy(ctx);
+
+ return -1;
+ }
+ }
+
+ /* Set context */
+ flb_input_set_context(in, ctx);
+
+ /* Collect events for every opened connection to our socket */
+ if (ctx->mode == FLB_SYSLOG_UNIX_TCP ||
+ ctx->mode == FLB_SYSLOG_TCP) {
+ ret = flb_input_set_collector_socket(in,
+ in_syslog_collect_tcp,
+ ctx->downstream->server_fd,
+ config);
+ }
+ else {
+ ret = flb_input_set_collector_socket(in,
+ in_syslog_collect_udp,
+ ctx->downstream->server_fd,
+ config);
+ }
+
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "Could not set collector");
+ syslog_conf_destroy(ctx);
+
+ return -1;
+ }
+
+ ctx->collector_id = ret;
+ ctx->collector_event = flb_input_collector_get_event(ret, in);
+
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "Could not get collector event");
+ syslog_conf_destroy(ctx);
+
+ return -1;
+ }
+
+ return 0;
+}
+
+static int in_syslog_exit(void *data, struct flb_config *config)
+{
+ struct flb_syslog *ctx = data;
+ (void) config;
+
+ syslog_conn_exit(ctx);
+ syslog_conf_destroy(ctx);
+
+ return 0;
+}
+
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "mode", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_syslog, mode_str),
+ "Set the socket mode: unix_tcp, unix_udp, tcp or udp"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "path", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_syslog, unix_path),
+ "Set the path for the UNIX socket"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "unix_perm", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_syslog, unix_perm_str),
+ "Set the permissions for the UNIX socket"
+ },
+ {
+ FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", FLB_SYSLOG_CHUNK,
+ 0, FLB_TRUE, offsetof(struct flb_syslog, buffer_chunk_size),
+ "Set the buffer chunk size"
+ },
+ {
+ FLB_CONFIG_MAP_SIZE, "buffer_max_size", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_syslog, buffer_max_size),
+ "Set the buffer chunk size"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "parser", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_syslog, parser_name),
+ "Set the parser"
+ },
+ {
+ FLB_CONFIG_MAP_SIZE, "receive_buffer_size", (char *)NULL,
+ 0, FLB_TRUE, offsetof(struct flb_syslog, receive_buffer_size),
+ "Set the socket receiving buffer size"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "raw_message_key", (char *) NULL,
+ 0, FLB_TRUE, offsetof(struct flb_syslog, raw_message_key),
+ "Key where the raw message will be preserved"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "source_address_key", (char *) NULL,
+ 0, FLB_TRUE, offsetof(struct flb_syslog, source_address_key),
+ "Key where the source address will be injected"
+ },
+
+
+ /* EOF */
+ {0}
+};
+
+struct flb_input_plugin in_syslog_plugin = {
+ .name = "syslog",
+ .description = "Syslog",
+ .cb_init = in_syslog_init,
+ .cb_pre_run = NULL,
+ .cb_collect = NULL,
+ .cb_flush_buf = NULL,
+ .cb_exit = in_syslog_exit,
+ .config_map = config_map,
+ .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS
+};
diff --git a/fluent-bit/plugins/in_syslog/syslog.h b/fluent-bit/plugins/in_syslog/syslog.h
new file mode 100644
index 00000000..6da2fbd8
--- /dev/null
+++ b/fluent-bit/plugins/in_syslog/syslog.h
@@ -0,0 +1,82 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_SYSLOG_H
+#define FLB_IN_SYSLOG_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+
+/* Syslog modes */
+#define FLB_SYSLOG_UNIX_TCP 1
+#define FLB_SYSLOG_UNIX_UDP 2
+#define FLB_SYSLOG_TCP 3
+#define FLB_SYSLOG_UDP 4
+
+/* 32KB chunk size */
+#define FLB_SYSLOG_CHUNK "32768"
+
+struct syslog_conn;
+
+/* Context / Config*/
+struct flb_syslog {
+ /* Listening mode: unix udp, unix tcp or normal tcp */
+ flb_sds_t mode_str;
+ int mode;
+
+ /* Network mode */
+ char *listen;
+ char *port;
+
+ /* Unix socket (UDP/TCP)*/
+ int server_fd;
+ flb_sds_t unix_path;
+ flb_sds_t unix_perm_str;
+ unsigned int unix_perm;
+ size_t receive_buffer_size;
+
+ /* UDP buffer, data length and buffer size */
+ // char *buffer_data;
+ // size_t buffer_len;
+ // size_t buffer_size;
+
+ /* Buffers setup */
+ size_t buffer_max_size;
+ size_t buffer_chunk_size;
+
+ /* Configuration */
+ flb_sds_t parser_name;
+ struct flb_parser *parser;
+ flb_sds_t raw_message_key;
+ flb_sds_t source_address_key;
+
+ int dgram_mode_flag;
+ int collector_id;
+ struct mk_event *collector_event;
+ struct flb_downstream *downstream;
+ struct syslog_conn *dummy_conn;
+
+ /* List for connections and event loop */
+ struct mk_list connections;
+ struct flb_input_instance *ins;
+ struct flb_log_event_encoder *log_encoder;
+};
+
+#endif
diff --git a/fluent-bit/plugins/in_syslog/syslog_conf.c b/fluent-bit/plugins/in_syslog/syslog_conf.c
new file mode 100644
index 00000000..4db3f162
--- /dev/null
+++ b/fluent-bit/plugins/in_syslog/syslog_conf.c
@@ -0,0 +1,193 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_str.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_parser.h>
+#include <fluent-bit/flb_utils.h>
+
+#include "syslog.h"
+#include "syslog_server.h"
+#include "syslog_conf.h"
+
+struct flb_syslog *syslog_conf_create(struct flb_input_instance *ins,
+ struct flb_config *config)
+{
+ int ret;
+ char port[16];
+ struct flb_syslog *ctx;
+
+ ctx = flb_calloc(1, sizeof(struct flb_syslog));
+
+ if (ctx == NULL) {
+ flb_errno();
+
+ return NULL;
+ }
+
+ ctx->ins = ins;
+
+ ctx->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (ctx->log_encoder == NULL) {
+ flb_plg_error(ins, "could not initialize event encoder");
+ syslog_conf_destroy(ctx);
+
+ return NULL;
+ }
+
+ mk_list_init(&ctx->connections);
+
+ ret = flb_input_config_map_set(ins, (void *)ctx);
+ if (ret == -1) {
+ flb_log_event_encoder_destroy(ctx->log_encoder);
+
+ flb_plg_error(ins, "unable to load configuration");
+ flb_free(ctx);
+
+ return NULL;
+ }
+
+ /* Syslog mode: unix_udp, unix_tcp, tcp or udp */
+ if (ctx->mode_str) {
+#ifdef FLB_SYSTEM_WINDOWS
+ if (strcasestr(ctx->mode_str, "unix") != NULL) {
+ flb_log_event_encoder_destroy(ctx->log_encoder);
+
+ flb_plg_error(ins, "unix sockets are note available in windows");
+ flb_free(ctx);
+
+ return NULL;
+ }
+
+#undef FLB_SYSLOG_UNIX_UDP
+#define FLB_SYSLOG_UNIX_UDP FLB_SYSLOG_UDP
+#endif
+ if (strcasecmp(ctx->mode_str, "unix_tcp") == 0) {
+ ctx->mode = FLB_SYSLOG_UNIX_TCP;
+ }
+ else if (strcasecmp(ctx->mode_str, "unix_udp") == 0) {
+ ctx->mode = FLB_SYSLOG_UNIX_UDP;
+ }
+ else if (strcasecmp(ctx->mode_str, "tcp") == 0) {
+ ctx->mode = FLB_SYSLOG_TCP;
+ }
+ else if (strcasecmp(ctx->mode_str, "udp") == 0) {
+ ctx->mode = FLB_SYSLOG_UDP;
+ }
+ else {
+ flb_log_event_encoder_destroy(ctx->log_encoder);
+
+ flb_error("[in_syslog] Unknown syslog mode %s", ctx->mode_str);
+ flb_free(ctx);
+ return NULL;
+ }
+ }
+ else {
+ ctx->mode = FLB_SYSLOG_UNIX_UDP;
+ }
+
+ /* Check if TCP mode was requested */
+ if (ctx->mode == FLB_SYSLOG_TCP || ctx->mode == FLB_SYSLOG_UDP) {
+ /* Listen interface (if not set, defaults to 0.0.0.0:5140) */
+ flb_input_net_default_listener("0.0.0.0", 5140, ins);
+ ctx->listen = ins->host.listen;
+ snprintf(port, sizeof(port) - 1, "%d", ins->host.port);
+ ctx->port = flb_strdup(port);
+ }
+
+ /* Unix socket path and permission */
+ if (ctx->mode == FLB_SYSLOG_UNIX_UDP || ctx->mode == FLB_SYSLOG_UNIX_TCP) {
+ if (ctx->unix_perm_str) {
+ ctx->unix_perm = strtol(ctx->unix_perm_str, NULL, 8) & 07777;
+ } else {
+ ctx->unix_perm = 0644;
+ }
+ }
+
+ /* Buffer Chunk Size */
+ if (ctx->buffer_chunk_size == -1) {
+ flb_log_event_encoder_destroy(ctx->log_encoder);
+
+ flb_plg_error(ins, "invalid buffer_chunk_size");
+ flb_free(ctx);
+ return NULL;
+ }
+
+ /* Buffer Max Size */
+ if (ctx->buffer_max_size == -1) {
+ flb_log_event_encoder_destroy(ctx->log_encoder);
+
+ flb_plg_error(ins, "invalid buffer_max_size");
+ flb_free(ctx);
+ return NULL;
+ }
+ else if (ctx->buffer_max_size == 0) {
+ ctx->buffer_max_size = ctx->buffer_chunk_size;
+ }
+
+ /* Socket rcv buffer size */
+ if (ctx->receive_buffer_size == -1 || ctx->receive_buffer_size>INT_MAX) {
+ flb_log_event_encoder_destroy(ctx->log_encoder);
+
+ flb_plg_error(ins, "invalid receive_buffer_size");
+ flb_free(ctx);
+ return NULL;
+ }
+
+ /* Parser */
+ if (ctx->parser_name) {
+ ctx->parser = flb_parser_get(ctx->parser_name, config);
+ }
+ else {
+ if (ctx->mode == FLB_SYSLOG_TCP || ctx->mode == FLB_SYSLOG_UDP) {
+ ctx->parser = flb_parser_get("syslog-rfc5424", config);
+ }
+ else {
+ ctx->parser = flb_parser_get("syslog-rfc3164-local", config);
+ }
+ }
+
+ if (!ctx->parser) {
+ flb_log_event_encoder_destroy(ctx->log_encoder);
+
+ flb_error("[in_syslog] parser not set");
+ syslog_conf_destroy(ctx);
+ return NULL;
+ }
+
+ return ctx;
+}
+
+int syslog_conf_destroy(struct flb_syslog *ctx)
+{
+ if (ctx->log_encoder != NULL) {
+ flb_log_event_encoder_destroy(ctx->log_encoder);
+ }
+
+ syslog_server_destroy(ctx);
+
+ flb_free(ctx);
+
+ return 0;
+}
diff --git a/fluent-bit/plugins/in_syslog/syslog_conf.h b/fluent-bit/plugins/in_syslog/syslog_conf.h
new file mode 100644
index 00000000..ac230403
--- /dev/null
+++ b/fluent-bit/plugins/in_syslog/syslog_conf.h
@@ -0,0 +1,32 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_SYSLOG_CONF_H
+#define FLB_IN_SYSLOG_CONF_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_input.h>
+
+#include "syslog.h"
+
+struct flb_syslog *syslog_conf_create(struct flb_input_instance *i_ins,
+ struct flb_config *config);
+int syslog_conf_destroy(struct flb_syslog *ctx);
+
+#endif
diff --git a/fluent-bit/plugins/in_syslog/syslog_conn.c b/fluent-bit/plugins/in_syslog/syslog_conn.c
new file mode 100644
index 00000000..8785c1e8
--- /dev/null
+++ b/fluent-bit/plugins/in_syslog/syslog_conn.c
@@ -0,0 +1,247 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_engine.h>
+#include <fluent-bit/flb_network.h>
+#include <fluent-bit/flb_downstream.h>
+
+#include "syslog.h"
+#include "syslog_conf.h"
+#include "syslog_conn.h"
+#include "syslog_prot.h"
+
+/* Callback invoked every time an event is triggered for a connection */
+int syslog_conn_event(void *data)
+{
+ struct flb_connection *connection;
+ struct syslog_conn *conn;
+ struct flb_syslog *ctx;
+
+ connection = (struct flb_connection *) data;
+
+ conn = connection->user_data;
+
+ ctx = conn->ctx;
+
+ if (ctx->dgram_mode_flag) {
+ return syslog_dgram_conn_event(data);
+ }
+
+ return syslog_stream_conn_event(data);
+}
+
+int syslog_stream_conn_event(void *data)
+{
+ int ret;
+ int bytes;
+ int available;
+ size_t size;
+ char *tmp;
+ struct mk_event *event;
+ struct syslog_conn *conn;
+ struct flb_syslog *ctx;
+ struct flb_connection *connection;
+
+ connection = (struct flb_connection *) data;
+
+ conn = connection->user_data;
+
+ ctx = conn->ctx;
+
+ event = &connection->event;
+
+ if (event->mask & MK_EVENT_READ) {
+ available = (conn->buf_size - conn->buf_len) - 1;
+ if (available < 1) {
+ if (conn->buf_size + ctx->buffer_chunk_size > ctx->buffer_max_size) {
+ flb_plg_debug(ctx->ins,
+ "fd=%i incoming data exceed limit (%zd bytes)",
+ event->fd, (ctx->buffer_max_size));
+ syslog_conn_del(conn);
+ return -1;
+ }
+
+ size = conn->buf_size + ctx->buffer_chunk_size;
+ tmp = flb_realloc(conn->buf_data, size);
+ if (!tmp) {
+ flb_errno();
+ return -1;
+ }
+ flb_plg_trace(ctx->ins, "fd=%i buffer realloc %zd -> %zd",
+ event->fd, conn->buf_size, size);
+
+ conn->buf_data = tmp;
+ conn->buf_size = size;
+ available = (conn->buf_size - conn->buf_len) - 1;
+ }
+
+ bytes = flb_io_net_read(connection,
+ (void *) &conn->buf_data[conn->buf_len],
+ available);
+
+ if (bytes > 0) {
+ flb_plg_trace(ctx->ins, "read()=%i pre_len=%zu now_len=%zu",
+ bytes, conn->buf_len, conn->buf_len + bytes);
+ conn->buf_len += bytes;
+ conn->buf_data[conn->buf_len] = '\0';
+ ret = syslog_prot_process(conn);
+ if (ret == -1) {
+ return -1;
+ }
+ return bytes;
+ }
+ else {
+ flb_plg_trace(ctx->ins, "fd=%i closed connection", event->fd);
+ syslog_conn_del(conn);
+ return -1;
+ }
+ }
+
+ if (event->mask & MK_EVENT_CLOSE) {
+ flb_plg_trace(ctx->ins, "fd=%i hangup", event->fd);
+ syslog_conn_del(conn);
+ return -1;
+ }
+ return 0;
+}
+
+int syslog_dgram_conn_event(void *data)
+{
+ struct flb_connection *connection;
+ int bytes;
+ struct syslog_conn *conn;
+
+ connection = (struct flb_connection *) data;
+
+ conn = connection->user_data;
+
+ bytes = flb_io_net_read(connection,
+ (void *) &conn->buf_data[conn->buf_len],
+ conn->buf_size - 1);
+
+ if (bytes > 0) {
+ conn->buf_data[bytes] = '\0';
+ conn->buf_len = bytes;
+
+ syslog_prot_process_udp(conn);
+ }
+ else {
+ flb_errno();
+ }
+
+ conn->buf_len = 0;
+
+ return 0;
+}
+
+/* Create a new mqtt request instance */
+struct syslog_conn *syslog_conn_add(struct flb_connection *connection,
+ struct flb_syslog *ctx)
+{
+ int ret;
+ struct syslog_conn *conn;
+
+ conn = flb_malloc(sizeof(struct syslog_conn));
+ if (!conn) {
+ return NULL;
+ }
+
+ conn->connection = connection;
+
+ /* Set data for the event-loop */
+ MK_EVENT_NEW(&connection->event);
+
+ connection->user_data = conn;
+ connection->event.type = FLB_ENGINE_EV_CUSTOM;
+ connection->event.handler = syslog_conn_event;
+
+ /* Connection info */
+ conn->ctx = ctx;
+ conn->ins = ctx->ins;
+ conn->buf_len = 0;
+ conn->buf_parsed = 0;
+
+ /* Allocate read buffer */
+ conn->buf_data = flb_malloc(ctx->buffer_chunk_size);
+ if (!conn->buf_data) {
+ flb_errno();
+
+ flb_free(conn);
+
+ return NULL;
+ }
+ conn->buf_size = ctx->buffer_chunk_size;
+
+ /* Register instance into the event loop if we're in
+ * stream mode (UDP events are received through the collector)
+ */
+ if (!ctx->dgram_mode_flag) {
+ ret = mk_event_add(flb_engine_evl_get(),
+ connection->fd,
+ FLB_ENGINE_EV_CUSTOM,
+ MK_EVENT_READ,
+ &connection->event);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not register new connection");
+
+ flb_free(conn->buf_data);
+ flb_free(conn);
+
+ return NULL;
+ }
+ }
+
+ mk_list_add(&conn->_head, &ctx->connections);
+
+ return conn;
+}
+
+int syslog_conn_del(struct syslog_conn *conn)
+{
+ /* The downstream unregisters the file descriptor from the event-loop
+ * so there's nothing to be done by the plugin
+ */
+ if (!conn->ctx->dgram_mode_flag) {
+ flb_downstream_conn_release(conn->connection);
+ }
+
+ /* Release resources */
+ mk_list_del(&conn->_head);
+
+ flb_free(conn->buf_data);
+ flb_free(conn);
+
+ return 0;
+}
+
+int syslog_conn_exit(struct flb_syslog *ctx)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct syslog_conn *conn;
+
+ mk_list_foreach_safe(head, tmp, &ctx->connections) {
+ conn = mk_list_entry(head, struct syslog_conn, _head);
+ syslog_conn_del(conn);
+ }
+
+ return 0;
+}
diff --git a/fluent-bit/plugins/in_syslog/syslog_conn.h b/fluent-bit/plugins/in_syslog/syslog_conn.h
new file mode 100644
index 00000000..684d3f9a
--- /dev/null
+++ b/fluent-bit/plugins/in_syslog/syslog_conn.h
@@ -0,0 +1,53 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_SYSLOG_CONN_H
+#define FLB_IN_SYSLOG_CONN_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_connection.h>
+
+#include "syslog.h"
+
+/* Respresents a connection */
+struct syslog_conn {
+ int status; /* Connection status */
+
+ /* Buffer */
+ char *buf_data; /* Buffer data */
+ size_t buf_size; /* Buffer size */
+ size_t buf_len; /* Buffer length */
+ size_t buf_parsed; /* Parsed buffer (offset) */
+ struct flb_input_instance *ins; /* Parent plugin instance */
+ struct flb_syslog *ctx; /* Plugin configuration context */
+ struct flb_connection *connection;
+
+ struct mk_list _head;
+};
+
+int syslog_conn_event(void *data);
+int syslog_stream_conn_event(void *data);
+int syslog_dgram_conn_event(void *data);
+struct syslog_conn *syslog_conn_add(struct flb_connection *connection,
+ struct flb_syslog *ctx);
+int syslog_conn_del(struct syslog_conn *conn);
+int syslog_conn_exit(struct flb_syslog *ctx);
+
+#endif
diff --git a/fluent-bit/plugins/in_syslog/syslog_prot.c b/fluent-bit/plugins/in_syslog/syslog_prot.c
new file mode 100644
index 00000000..1ec2c97c
--- /dev/null
+++ b/fluent-bit/plugins/in_syslog/syslog_prot.c
@@ -0,0 +1,324 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_parser.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_pack.h>
+
+#include "syslog.h"
+#include "syslog_conn.h"
+#include "syslog_prot.h"
+
+#include <string.h>
+
+static inline void consume_bytes(char *buf, int bytes, int length)
+{
+ memmove(buf, buf + bytes, length - bytes);
+}
+
+static int append_message_to_record_data(char **result_buffer,
+ size_t *result_size,
+ flb_sds_t message_key_name,
+ char *base_object_buffer,
+ size_t base_object_size,
+ char *message_buffer,
+ size_t message_size,
+ int message_type)
+{
+ int result = FLB_MAP_NOT_MODIFIED;
+ char *modified_data_buffer;
+ int modified_data_size;
+ msgpack_object_kv *new_map_entries[1];
+ msgpack_object_kv message_entry;
+ *result_buffer = NULL;
+ *result_size = 0;
+ modified_data_buffer = NULL;
+
+ if (message_key_name != NULL) {
+ new_map_entries[0] = &message_entry;
+
+ message_entry.key.type = MSGPACK_OBJECT_STR;
+ message_entry.key.via.str.size = flb_sds_len(message_key_name);
+ message_entry.key.via.str.ptr = message_key_name;
+
+ if (message_type == MSGPACK_OBJECT_BIN) {
+ message_entry.val.type = MSGPACK_OBJECT_BIN;
+ message_entry.val.via.bin.size = message_size;
+ message_entry.val.via.bin.ptr = message_buffer;
+ }
+ else if (message_type == MSGPACK_OBJECT_STR) {
+ message_entry.val.type = MSGPACK_OBJECT_STR;
+ message_entry.val.via.str.size = message_size;
+ message_entry.val.via.str.ptr = message_buffer;
+ }
+ else {
+ result = FLB_MAP_EXPANSION_INVALID_VALUE_TYPE;
+ }
+
+ if (result == FLB_MAP_NOT_MODIFIED) {
+ result = flb_msgpack_expand_map(base_object_buffer,
+ base_object_size,
+ new_map_entries, 1,
+ &modified_data_buffer,
+ &modified_data_size);
+ if (result == 0) {
+ result = FLB_MAP_EXPAND_SUCCESS;
+ }
+ else {
+ result = FLB_MAP_EXPANSION_ERROR;
+ }
+ }
+ }
+
+ if (result == FLB_MAP_EXPAND_SUCCESS) {
+ *result_buffer = modified_data_buffer;
+ *result_size = modified_data_size;
+ }
+
+ return result;
+}
+
+static inline int pack_line(struct flb_syslog *ctx,
+ struct flb_time *time,
+ struct flb_connection *connection,
+ char *data, size_t data_size,
+ char *raw_data, size_t raw_data_size)
+{
+ char *modified_data_buffer;
+ size_t modified_data_size;
+ char *appended_address_buffer;
+ size_t appended_address_size;
+ int result;
+ char *source_address;
+
+ source_address = NULL;
+ modified_data_buffer = NULL;
+ appended_address_buffer = NULL;
+
+ if (ctx->raw_message_key != NULL) {
+ result = append_message_to_record_data(&modified_data_buffer,
+ &modified_data_size,
+ ctx->raw_message_key,
+ data,
+ data_size,
+ raw_data,
+ raw_data_size,
+ MSGPACK_OBJECT_BIN);
+
+ if (result == FLB_MAP_EXPANSION_ERROR) {
+ flb_plg_debug(ctx->ins, "error expanding raw message : %d", result);
+ }
+ }
+
+ if (ctx->source_address_key != NULL) {
+ source_address = flb_connection_get_remote_address(connection);
+ if (source_address != NULL) {
+ if (modified_data_buffer != NULL) {
+ result = append_message_to_record_data(&appended_address_buffer,
+ &appended_address_size,
+ ctx->source_address_key,
+ modified_data_buffer,
+ modified_data_size,
+ source_address,
+ strlen(source_address),
+ MSGPACK_OBJECT_STR);
+ }
+ else {
+ result = append_message_to_record_data(&appended_address_buffer,
+ &appended_address_size,
+ ctx->source_address_key,
+ data,
+ data_size,
+ source_address,
+ strlen(source_address),
+ MSGPACK_OBJECT_STR);
+ }
+
+ if (result == FLB_MAP_EXPANSION_ERROR) {
+ flb_plg_debug(ctx->ins, "error expanding source_address : %d", result);
+ }
+ }
+ }
+
+ result = flb_log_event_encoder_begin_record(ctx->log_encoder);
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_set_timestamp(ctx->log_encoder, time);
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ if (appended_address_buffer != NULL) {
+ result = flb_log_event_encoder_set_body_from_raw_msgpack(
+ ctx->log_encoder, appended_address_buffer, appended_address_size);
+ }
+ else if (modified_data_buffer != NULL) {
+ result = flb_log_event_encoder_set_body_from_raw_msgpack(
+ ctx->log_encoder, modified_data_buffer, modified_data_size);
+ }
+ else {
+ result = flb_log_event_encoder_set_body_from_raw_msgpack(
+ ctx->log_encoder, data, data_size);
+ }
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_commit_record(ctx->log_encoder);
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ flb_input_log_append(ctx->ins, NULL, 0,
+ ctx->log_encoder->output_buffer,
+ ctx->log_encoder->output_length);
+ result = 0;
+ }
+ else {
+ flb_plg_error(ctx->ins, "log event encoding error : %d", result);
+
+ result = -1;
+ }
+
+ flb_log_event_encoder_reset(ctx->log_encoder);
+
+ if (modified_data_buffer != NULL) {
+ flb_free(modified_data_buffer);
+ }
+ if (appended_address_buffer != NULL) {
+ flb_free(appended_address_buffer);
+ }
+
+ return result;
+}
+
+int syslog_prot_process(struct syslog_conn *conn)
+{
+ int len;
+ int ret;
+ char *p;
+ char *eof;
+ char *end;
+ void *out_buf;
+ size_t out_size;
+ struct flb_time out_time;
+ struct flb_syslog *ctx = conn->ctx;
+
+ eof = conn->buf_data;
+ end = conn->buf_data + conn->buf_len;
+
+ /* Always parse while some remaining bytes exists */
+ while (eof < end) {
+ /* Lookup the ending byte */
+ eof = p = conn->buf_data + conn->buf_parsed;
+ while (*eof != '\n' && *eof != '\0' && eof < end) {
+ eof++;
+ }
+
+ /* Incomplete message */
+ if (eof == end || (*eof != '\n' && *eof != '\0')) {
+ break;
+ }
+
+ /* No data ? */
+ len = (eof - p);
+ if (len == 0) {
+ consume_bytes(conn->buf_data, 1, conn->buf_len);
+ conn->buf_len--;
+ conn->buf_parsed = 0;
+ conn->buf_data[conn->buf_len] = '\0';
+ end = conn->buf_data + conn->buf_len;
+
+ if (conn->buf_len == 0) {
+ break;
+ }
+
+ continue;
+ }
+
+ /* Process the string */
+ ret = flb_parser_do(ctx->parser, p, len,
+ &out_buf, &out_size, &out_time);
+ if (ret >= 0) {
+ if (flb_time_to_nanosec(&out_time) == 0L) {
+ flb_time_get(&out_time);
+ }
+ pack_line(ctx, &out_time,
+ conn->connection,
+ out_buf, out_size,
+ p, len);
+ flb_free(out_buf);
+ }
+ else {
+ flb_plg_warn(ctx->ins, "error parsing log message with parser '%s'",
+ ctx->parser->name);
+ flb_plg_debug(ctx->ins, "unparsed log message: %.*s", len, p);
+ }
+
+ conn->buf_parsed += len + 1;
+ end = conn->buf_data + conn->buf_len;
+ eof = conn->buf_data + conn->buf_parsed;
+ }
+
+ if (conn->buf_parsed > 0) {
+ consume_bytes(conn->buf_data, conn->buf_parsed, conn->buf_len);
+ conn->buf_len -= conn->buf_parsed;
+ conn->buf_parsed = 0;
+ conn->buf_data[conn->buf_len] = '\0';
+ }
+
+ return 0;
+}
+
+int syslog_prot_process_udp(struct syslog_conn *conn)
+{
+ int ret;
+ void *out_buf;
+ size_t out_size;
+ struct flb_time out_time = {0};
+ char *buf;
+ size_t size;
+ struct flb_syslog *ctx;
+ struct flb_connection *connection;
+
+ buf = conn->buf_data;
+ size = conn->buf_len;
+ ctx = conn->ctx;
+ connection = conn->connection;
+
+ ret = flb_parser_do(ctx->parser, buf, size,
+ &out_buf, &out_size, &out_time);
+ if (ret >= 0) {
+ if (flb_time_to_double(&out_time) == 0) {
+ flb_time_get(&out_time);
+ }
+ pack_line(ctx, &out_time,
+ connection,
+ out_buf, out_size,
+ buf, size);
+ flb_free(out_buf);
+ }
+ else {
+ flb_plg_warn(ctx->ins, "error parsing log message with parser '%s'",
+ ctx->parser->name);
+ flb_plg_debug(ctx->ins, "unparsed log message: %.*s",
+ (int) size, buf);
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/fluent-bit/plugins/in_syslog/syslog_prot.h b/fluent-bit/plugins/in_syslog/syslog_prot.h
new file mode 100644
index 00000000..cb5976b7
--- /dev/null
+++ b/fluent-bit/plugins/in_syslog/syslog_prot.h
@@ -0,0 +1,35 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_SYSLOG_PROT_H
+#define FLB_IN_SYSLOG_PROT_H
+
+#include <fluent-bit/flb_info.h>
+
+#include "syslog.h"
+
+#define FLB_MAP_EXPAND_SUCCESS 0
+#define FLB_MAP_NOT_MODIFIED -1
+#define FLB_MAP_EXPANSION_ERROR -2
+#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3
+
+int syslog_prot_process(struct syslog_conn *conn);
+int syslog_prot_process_udp(struct syslog_conn *conn);
+
+#endif
diff --git a/fluent-bit/plugins/in_syslog/syslog_server.c b/fluent-bit/plugins/in_syslog/syslog_server.c
new file mode 100644
index 00000000..5317851d
--- /dev/null
+++ b/fluent-bit/plugins/in_syslog/syslog_server.c
@@ -0,0 +1,235 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_macros.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_socket.h>
+#include <fluent-bit/flb_network.h>
+#include <fluent-bit/tls/flb_tls.h>
+#include <fluent-bit/flb_downstream.h>
+#include <fluent-bit/flb_input_plugin.h>
+
+#if !defined(FLB_SYSTEM_WINDOWS)
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#endif
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#include "syslog.h"
+
+static int remove_existing_socket_file(char *socket_path)
+{
+ struct stat file_data;
+ int result;
+
+ result = stat(socket_path, &file_data);
+
+ if (result == -1) {
+ if (errno == ENOENT) {
+ return 0;
+ }
+
+ flb_errno();
+
+ return -1;
+ }
+
+ if (S_ISSOCK(file_data.st_mode) == 0) {
+ return -2;
+ }
+
+ result = unlink(socket_path);
+
+ if (result != 0) {
+ return -3;
+ }
+
+ return 0;
+}
+
+#if !defined(FLB_SYSTEM_WINDOWS)
+static int syslog_server_unix_create(struct flb_syslog *ctx)
+{
+ int result;
+ int mode;
+ struct flb_tls *tls;
+
+ if (ctx->mode == FLB_SYSLOG_UNIX_TCP) {
+ mode = FLB_TRANSPORT_UNIX_STREAM;
+ tls = ctx->ins->tls;
+ }
+ else if (ctx->mode == FLB_SYSLOG_UNIX_UDP) {
+ ctx->dgram_mode_flag = FLB_TRUE;
+
+ mode = FLB_TRANSPORT_UNIX_DGRAM;
+ tls = NULL;
+ }
+ else {
+ return -1;
+ }
+
+ result = remove_existing_socket_file(ctx->unix_path);
+
+ if (result != 0) {
+ if (result == -2) {
+ flb_plg_error(ctx->ins,
+ "%s exists and it is not a unix socket. Aborting",
+ ctx->unix_path);
+ }
+ else {
+ flb_plg_error(ctx->ins,
+ "could not remove existing unix socket %s. Aborting",
+ ctx->unix_path);
+ }
+
+ return -1;
+ }
+
+ ctx->downstream = flb_downstream_create(mode,
+ ctx->ins->flags,
+ ctx->unix_path,
+ 0,
+ tls,
+ ctx->ins->config,
+ &ctx->ins->net_setup);
+
+ if (ctx->downstream == NULL) {
+ return -1;
+ }
+
+ if (chmod(ctx->unix_path, ctx->unix_perm)) {
+ flb_errno();
+ flb_error("[in_syslog] cannot set permission on '%s' to %04o",
+ ctx->unix_path, ctx->unix_perm);
+
+ return -1;
+ }
+
+ return 0;
+}
+#else
+static int syslog_server_unix_create(struct flb_syslog *ctx)
+{
+ return -1;
+}
+#endif
+
+static int syslog_server_net_create(struct flb_syslog *ctx)
+{
+ unsigned short int port;
+ int mode;
+ struct flb_tls *tls;
+
+ port = (unsigned short int) strtoul(ctx->port, NULL, 10);
+
+ if (ctx->mode == FLB_SYSLOG_TCP) {
+ mode = FLB_TRANSPORT_TCP;
+ tls = ctx->ins->tls;
+ }
+ else if (ctx->mode == FLB_SYSLOG_UDP) {
+ ctx->dgram_mode_flag = FLB_TRUE;
+
+ mode = FLB_TRANSPORT_UDP;
+ tls = NULL;
+ }
+ else {
+ return -1;
+ }
+
+ ctx->downstream = flb_downstream_create(mode,
+ ctx->ins->flags,
+ ctx->listen,
+ port,
+ tls,
+ ctx->ins->config,
+ &ctx->ins->net_setup);
+
+ if (ctx->downstream != NULL) {
+ flb_info("[in_syslog] %s server binding %s:%s",
+ ((ctx->mode == FLB_SYSLOG_TCP) ? "TCP" : "UDP"),
+ ctx->listen, ctx->port);
+ }
+ else {
+ flb_error("[in_syslog] could not bind address %s:%s. Aborting",
+ ctx->listen, ctx->port);
+
+ return -1;
+ }
+
+ if (ctx->receive_buffer_size) {
+ if (flb_net_socket_rcv_buffer(ctx->downstream->server_fd,
+ ctx->receive_buffer_size)) {
+ flb_error("[in_syslog] could not set rcv buffer to %ld. Aborting",
+ ctx->receive_buffer_size);
+ return -1;
+ }
+ }
+
+ flb_net_socket_nonblocking(ctx->downstream->server_fd);
+
+ return 0;
+}
+
+int syslog_server_create(struct flb_syslog *ctx)
+{
+ int ret;
+
+ if (ctx->mode == FLB_SYSLOG_TCP || ctx->mode == FLB_SYSLOG_UDP) {
+ ret = syslog_server_net_create(ctx);
+ }
+ else {
+ /* Create unix socket end-point */
+ ret = syslog_server_unix_create(ctx);
+ }
+
+ if (ret != 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+int syslog_server_destroy(struct flb_syslog *ctx)
+{
+ if (ctx->collector_id != -1) {
+ flb_input_collector_delete(ctx->collector_id, ctx->ins);
+
+ ctx->collector_id = -1;
+ }
+
+ if (ctx->downstream != NULL) {
+ flb_downstream_destroy(ctx->downstream);
+
+ ctx->downstream = NULL;
+ }
+
+ if (ctx->mode == FLB_SYSLOG_UNIX_TCP || ctx->mode == FLB_SYSLOG_UNIX_UDP) {
+ if (ctx->unix_path) {
+ unlink(ctx->unix_path);
+ }
+ }
+ else {
+ flb_free(ctx->port);
+ }
+
+ return 0;
+}
diff --git a/fluent-bit/plugins/in_syslog/syslog_server.h b/fluent-bit/plugins/in_syslog/syslog_server.h
new file mode 100644
index 00000000..d14feba7
--- /dev/null
+++ b/fluent-bit/plugins/in_syslog/syslog_server.h
@@ -0,0 +1,31 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_SYSLOG_SERVER_H
+#define FLB_IN_SYSLOG_SERVER_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_config.h>
+
+#include "syslog.h"
+
+int syslog_server_create(struct flb_syslog *ctx);
+int syslog_server_destroy(struct flb_syslog *ctx);
+
+#endif