summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/in_elasticsearch
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/in_elasticsearch')
-rw-r--r--src/fluent-bit/plugins/in_elasticsearch/CMakeLists.txt12
-rw-r--r--src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch.c245
-rw-r--r--src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch.h59
-rw-r--r--src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c307
-rw-r--r--src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.h55
-rw-r--r--src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c922
-rw-r--r--src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.h40
-rw-r--r--src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_config.c105
-rw-r--r--src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_config.h29
9 files changed, 1774 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/in_elasticsearch/CMakeLists.txt b/src/fluent-bit/plugins/in_elasticsearch/CMakeLists.txt
new file mode 100644
index 000000000..50a472f6a
--- /dev/null
+++ b/src/fluent-bit/plugins/in_elasticsearch/CMakeLists.txt
@@ -0,0 +1,12 @@
+if(NOT FLB_METRICS)
+ message(FATAL_ERROR "Elasticsearch input plugin requires FLB_HTTP_SERVER=On.")
+endif()
+
+set(src
+ in_elasticsearch.c
+ in_elasticsearch_config.c
+ in_elasticsearch_bulk_conn.c
+ in_elasticsearch_bulk_prot.c
+ )
+
+FLB_PLUGIN(in_elasticsearch "${src}" "")
diff --git a/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch.c b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch.c
new file mode 100644
index 000000000..af1a594c6
--- /dev/null
+++ b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch.c
@@ -0,0 +1,245 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_network.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_random.h>
+
+#include "in_elasticsearch.h"
+#include "in_elasticsearch_config.h"
+#include "in_elasticsearch_bulk_conn.h"
+
+/*
+ * For a server event, the collection event means a new client have arrived, we
+ * accept the connection and create a new TCP instance which will wait for
+ * JSON map messages.
+ */
+static int in_elasticsearch_bulk_collect(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ struct flb_connection *connection;
+ struct in_elasticsearch_bulk_conn *conn;
+ struct flb_in_elasticsearch *ctx;
+
+ ctx = in_context;
+
+ connection = flb_downstream_conn_get(ctx->downstream);
+
+ if (connection == NULL) {
+ flb_plg_error(ctx->ins, "could not accept new connection");
+
+ return -1;
+ }
+
+ flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i",
+ connection->fd);
+
+ conn = in_elasticsearch_bulk_conn_add(connection, ctx);
+
+ if (conn == NULL) {
+ flb_downstream_conn_release(connection);
+
+ return -1;
+ }
+
+ return 0;
+}
+
+static void bytes_to_groupname(unsigned char *data, char *buf, size_t len) {
+ int index;
+ char charset[] = "0123456789"
+ "abcdefghijklmnopqrstuvwxyz"
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+ while (len-- > 0) {
+ index = (int) data[len];
+ index = index % (sizeof(charset) - 1);
+ buf[len] = charset[index];
+ }
+}
+
+static void bytes_to_nodename(unsigned char *data, char *buf, size_t len) {
+ int index;
+ char charset[] = "0123456789"
+ "abcdefghijklmnopqrstuvwxyz";
+
+ while (len-- > 0) {
+ index = (int) data[len];
+ index = index % (sizeof(charset) - 1);
+ buf[len] = charset[index];
+ }
+}
+
+static int in_elasticsearch_bulk_init(struct flb_input_instance *ins,
+ struct flb_config *config, void *data)
+{
+ unsigned short int port;
+ int ret;
+ struct flb_in_elasticsearch *ctx;
+ unsigned char rand[16];
+
+ (void) data;
+
+ /* Create context and basic conf */
+ ctx = in_elasticsearch_config_create(ins);
+ if (!ctx) {
+ return -1;
+ }
+
+ ctx->collector_id = -1;
+
+ /* Populate context with config map defaults and incoming properties */
+ ret = flb_input_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "configuration error");
+ in_elasticsearch_config_destroy(ctx);
+ return -1;
+ }
+
+ /* Set the context */
+ flb_input_set_context(ins, ctx);
+
+ port = (unsigned short int) strtoul(ctx->tcp_port, NULL, 10);
+
+ if (flb_random_bytes(rand, 16)) {
+ flb_plg_error(ctx->ins, "cannot generate cluster name");
+ in_elasticsearch_config_destroy(ctx);
+ return -1;
+ }
+
+ bytes_to_groupname(rand, ctx->cluster_name, 16);
+
+ if (flb_random_bytes(rand, 12)) {
+ flb_plg_error(ctx->ins, "cannot generate node name");
+ in_elasticsearch_config_destroy(ctx);
+ return -1;
+ }
+
+ bytes_to_nodename(rand, ctx->node_name, 12);
+
+ ctx->downstream = flb_downstream_create(FLB_TRANSPORT_TCP,
+ ins->flags,
+ ctx->listen,
+ port,
+ ins->tls,
+ config,
+ &ins->net_setup);
+
+ if (ctx->downstream == NULL) {
+ flb_plg_error(ctx->ins,
+ "could not initialize downstream on %s:%s. Aborting",
+ ctx->listen, ctx->tcp_port);
+
+ in_elasticsearch_config_destroy(ctx);
+
+ return -1;
+ }
+
+ flb_input_downstream_set(ctx->downstream, ctx->ins);
+
+ /* Collect upon data available on the standard input */
+ ret = flb_input_set_collector_socket(ins,
+ in_elasticsearch_bulk_collect,
+ ctx->downstream->server_fd,
+ config);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "Could not set collector for IN_ELASTICSEARCH input plugin");
+ in_elasticsearch_config_destroy(ctx);
+
+ return -1;
+ }
+
+ ctx->collector_id = ret;
+
+ return 0;
+}
+
+static int in_elasticsearch_bulk_exit(void *data, struct flb_config *config)
+{
+ struct flb_in_elasticsearch *ctx;
+
+ (void) config;
+
+ ctx = data;
+
+ if (ctx != NULL) {
+ in_elasticsearch_config_destroy(ctx);
+ }
+
+ return 0;
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE,
+ 0, FLB_TRUE, offsetof(struct flb_in_elasticsearch, buffer_max_size),
+ "Set the maximum size of buffer"
+ },
+
+ {
+ FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", HTTP_BUFFER_CHUNK_SIZE,
+ 0, FLB_TRUE, offsetof(struct flb_in_elasticsearch, buffer_chunk_size),
+ "Set the buffer chunk size"
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "tag_key", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_in_elasticsearch, tag_key),
+ "Specify a key name for extracting as a tag"
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "meta_key", "@meta",
+ 0, FLB_TRUE, offsetof(struct flb_in_elasticsearch, meta_key),
+ "Specify a key name for meta information"
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "hostname", "localhost",
+ 0, FLB_TRUE, offsetof(struct flb_in_elasticsearch, hostname),
+ "Specify hostname or FQDN. This parameter is effective for sniffering node information."
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "version", "8.0.0",
+ 0, FLB_TRUE, offsetof(struct flb_in_elasticsearch, es_version),
+ "Specify returning Elasticsearch server version."
+ },
+
+ /* EOF */
+ {0}
+};
+
+/* Plugin reference */
+struct flb_input_plugin in_elasticsearch_plugin = {
+ .name = "elasticsearch",
+ .description = "HTTP Endpoints for Elasticsearch (Bulk API)",
+ .cb_init = in_elasticsearch_bulk_init,
+ .cb_pre_run = NULL,
+ .cb_collect = in_elasticsearch_bulk_collect,
+ .cb_flush_buf = NULL,
+ .cb_pause = NULL,
+ .cb_resume = NULL,
+ .cb_exit = in_elasticsearch_bulk_exit,
+ .config_map = config_map,
+ .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS
+};
diff --git a/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch.h b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch.h
new file mode 100644
index 000000000..159dff88c
--- /dev/null
+++ b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch.h
@@ -0,0 +1,59 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_ELASTICSEARCH_H
+#define FLB_IN_ELASTICSEARCH_H
+
+#include <fluent-bit/flb_downstream.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+
+#include <monkey/monkey.h>
+
+#define HTTP_BUFFER_MAX_SIZE "4M"
+#define HTTP_BUFFER_CHUNK_SIZE "512K"
+
+struct flb_in_elasticsearch {
+ flb_sds_t listen;
+ flb_sds_t tcp_port;
+ const char *tag_key;
+ const char *meta_key;
+ flb_sds_t hostname;
+ flb_sds_t es_version;
+ char cluster_name[16];
+ char node_name[12];
+
+ int collector_id;
+
+ size_t buffer_max_size; /* Maximum buffer size */
+ size_t buffer_chunk_size; /* Chunk allocation size */
+
+ struct flb_downstream *downstream; /* Client manager */
+ struct mk_list connections; /* linked list of connections */
+
+ struct flb_log_event_encoder log_encoder;
+
+ struct mk_server *server;
+ struct flb_input_instance *ins;
+};
+
+
+#endif
diff --git a/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c
new file mode 100644
index 000000000..f835af26a
--- /dev/null
+++ b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c
@@ -0,0 +1,307 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_engine.h>
+
+#include "in_elasticsearch.h"
+#include "in_elasticsearch_bulk_conn.h"
+#include "in_elasticsearch_bulk_prot.h"
+
+static void in_elasticsearch_bulk_conn_request_init(struct mk_http_session *session,
+ struct mk_http_request *request);
+
+static int in_elasticsearch_bulk_conn_event(void *data)
+{
+ int status;
+ size_t size;
+ ssize_t available;
+ ssize_t bytes;
+ char *tmp;
+ char *request_end;
+ size_t request_len;
+ struct flb_connection *connection;
+ struct in_elasticsearch_bulk_conn *conn;
+ struct mk_event *event;
+ struct flb_in_elasticsearch *ctx;
+
+ connection = (struct flb_connection *) data;
+
+ conn = connection->user_data;
+
+ ctx = conn->ctx;
+
+ event = &connection->event;
+
+ if (event->mask & MK_EVENT_READ) {
+ available = (conn->buf_size - conn->buf_len) - 1;
+ if (available < 1) {
+ if (conn->buf_size + ctx->buffer_chunk_size > ctx->buffer_max_size) {
+ flb_plg_trace(ctx->ins,
+ "fd=%i incoming data exceed limit (%zu KB)",
+ event->fd, (ctx->buffer_max_size / 1024));
+ in_elasticsearch_bulk_conn_del(conn);
+ return -1;
+ }
+
+ size = conn->buf_size + ctx->buffer_chunk_size;
+ tmp = flb_realloc(conn->buf_data, size);
+ if (!tmp) {
+ flb_errno();
+ in_elasticsearch_bulk_conn_del(conn);
+ return -1;
+ }
+ flb_plg_trace(ctx->ins, "fd=%i buffer realloc %i -> %zu",
+ event->fd, conn->buf_size, size);
+
+ conn->buf_data = tmp;
+ conn->buf_size = size;
+ available = (conn->buf_size - conn->buf_len) - 1;
+ }
+
+ /* Read data */
+ bytes = flb_io_net_read(connection,
+ (void *) &conn->buf_data[conn->buf_len],
+ available);
+
+ if (bytes <= 0) {
+ flb_plg_trace(ctx->ins, "fd=%i closed connection", event->fd);
+ in_elasticsearch_bulk_conn_del(conn);
+ return -1;
+ }
+
+ flb_plg_trace(ctx->ins, "read()=%zi pre_len=%i now_len=%zi",
+ bytes, conn->buf_len, conn->buf_len + bytes);
+ conn->buf_len += bytes;
+ conn->buf_data[conn->buf_len] = '\0';
+
+ status = mk_http_parser(&conn->request, &conn->session.parser,
+ conn->buf_data, conn->buf_len, conn->session.server);
+
+ if (status == MK_HTTP_PARSER_OK) {
+ /* Do more logic parsing and checks for this request */
+ in_elasticsearch_bulk_prot_handle(ctx, conn, &conn->session, &conn->request);
+
+ /* Evict the processed request from the connection buffer and reinitialize
+ * the HTTP parser.
+ */
+
+ request_end = NULL;
+
+ if (NULL != conn->request.data.data) {
+ request_end = &conn->request.data.data[conn->request.data.len];
+ }
+ else {
+ request_end = strstr(conn->buf_data, "\r\n\r\n");
+
+ if(NULL != request_end) {
+ request_end = &request_end[4];
+ }
+ }
+
+ if (NULL != request_end) {
+ request_len = (size_t)(request_end - conn->buf_data);
+
+ if (0 < (conn->buf_len - request_len)) {
+ memmove(conn->buf_data, &conn->buf_data[request_len],
+ conn->buf_len - request_len);
+
+ conn->buf_data[conn->buf_len - request_len] = '\0';
+ conn->buf_len -= request_len;
+ }
+ else {
+ memset(conn->buf_data, 0, request_len);
+
+ conn->buf_len = 0;
+ }
+
+ /* Reinitialize the parser so the next request is properly
+ * handled, the additional memset intends to wipe any left over data
+ * from the headers parsed in the previous request.
+ */
+ memset(&conn->session.parser, 0, sizeof(struct mk_http_parser));
+ mk_http_parser_init(&conn->session.parser);
+ in_elasticsearch_bulk_conn_request_init(&conn->session, &conn->request);
+ }
+ }
+ else if (status == MK_HTTP_PARSER_ERROR) {
+ in_elasticsearch_bulk_prot_handle_error(ctx, conn, &conn->session, &conn->request);
+
+ /* Reinitialize the parser so the next request is properly
+ * handled, the additional memset intends to wipe any left over data
+ * from the headers parsed in the previous request.
+ */
+ memset(&conn->session.parser, 0, sizeof(struct mk_http_parser));
+ mk_http_parser_init(&conn->session.parser);
+ in_elasticsearch_bulk_conn_request_init(&conn->session, &conn->request);
+ }
+
+ /* FIXME: add Protocol handler here */
+ return bytes;
+ }
+
+ if (event->mask & MK_EVENT_CLOSE) {
+ flb_plg_trace(ctx->ins, "fd=%i hangup", event->fd);
+ in_elasticsearch_bulk_conn_del(conn);
+ return -1;
+ }
+
+ return 0;
+
+}
+
+static void in_elasticsearch_bulk_conn_session_init(struct mk_http_session *session,
+ struct mk_server *server,
+ int client_fd)
+{
+ /* Alloc memory for node */
+ session->_sched_init = MK_TRUE;
+ session->pipelined = MK_FALSE;
+ session->counter_connections = 0;
+ session->close_now = MK_FALSE;
+ session->status = MK_REQUEST_STATUS_INCOMPLETE;
+ session->server = server;
+ session->socket = client_fd;
+
+ /* creation time in unix time */
+ session->init_time = time(NULL);
+
+ session->channel = mk_channel_new(MK_CHANNEL_SOCKET, session->socket);
+ session->channel->io = session->server->network;
+
+ /* Init session request list */
+ mk_list_init(&session->request_list);
+
+ /* Initialize the parser */
+ mk_http_parser_init(&session->parser);
+}
+
+static void in_elasticsearch_bulk_conn_request_init(struct mk_http_session *session,
+ struct mk_http_request *request)
+{
+ memset(request, 0, sizeof(struct mk_http_request));
+
+ mk_http_request_init(session, request, session->server);
+
+ request->in_headers.type = MK_STREAM_IOV;
+ request->in_headers.dynamic = MK_FALSE;
+ request->in_headers.cb_consumed = NULL;
+ request->in_headers.cb_finished = NULL;
+ request->in_headers.stream = &request->stream;
+
+ mk_list_add(&request->in_headers._head, &request->stream.inputs);
+
+ request->session = session;
+}
+
+struct in_elasticsearch_bulk_conn *in_elasticsearch_bulk_conn_add(struct flb_connection *connection,
+ struct flb_in_elasticsearch *ctx)
+{
+ struct in_elasticsearch_bulk_conn *conn;
+ int ret;
+
+ conn = flb_calloc(1, sizeof(struct in_elasticsearch_bulk_conn));
+ if (!conn) {
+ flb_errno();
+ return NULL;
+ }
+
+ conn->connection = connection;
+
+ /* Set data for the event-loop */
+ MK_EVENT_NEW(&connection->event);
+
+ connection->user_data = conn;
+ connection->event.type = FLB_ENGINE_EV_CUSTOM;
+ connection->event.handler = in_elasticsearch_bulk_conn_event;
+
+ /* Connection info */
+ conn->ctx = ctx;
+ conn->buf_len = 0;
+
+ conn->buf_data = flb_malloc(ctx->buffer_chunk_size);
+ if (!conn->buf_data) {
+ flb_errno();
+
+ flb_plg_error(ctx->ins, "could not allocate new connection");
+ flb_free(conn);
+
+ return NULL;
+ }
+ conn->buf_size = ctx->buffer_chunk_size;
+
+ /* Register instance into the event loop */
+ ret = mk_event_add(flb_engine_evl_get(),
+ connection->fd,
+ FLB_ENGINE_EV_CUSTOM,
+ MK_EVENT_READ,
+ &connection->event);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not register new connection");
+
+ flb_free(conn->buf_data);
+ flb_free(conn);
+
+ return NULL;
+ }
+
+ /* Initialize HTTP Session: this is a custom context for Monkey HTTP */
+ in_elasticsearch_bulk_conn_session_init(&conn->session, ctx->server, conn->connection->fd);
+
+ /* Initialize HTTP Request: this is the initial request and it will be reinitialized
+ * automatically after the request is handled so it can be used for the next one.
+ */
+ in_elasticsearch_bulk_conn_request_init(&conn->session, &conn->request);
+
+ /* Link connection node to parent context list */
+ mk_list_add(&conn->_head, &ctx->connections);
+
+ return conn;
+}
+
+int in_elasticsearch_bulk_conn_del(struct in_elasticsearch_bulk_conn *conn)
+{
+ if (conn->session.channel != NULL) {
+ mk_channel_release(conn->session.channel);
+ }
+
+ /* The downstream unregisters the file descriptor from the event-loop
+ * so there's nothing to be done by the plugin
+ */
+ flb_downstream_conn_release(conn->connection);
+
+ mk_list_del(&conn->_head);
+
+ flb_free(conn->buf_data);
+ flb_free(conn);
+
+ return 0;
+}
+
+void in_elasticsearch_bulk_conn_release_all(struct flb_in_elasticsearch *ctx)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct in_elasticsearch_bulk_conn *conn;
+
+ mk_list_foreach_safe(head, tmp, &ctx->connections) {
+ conn = mk_list_entry(head, struct in_elasticsearch_bulk_conn, _head);
+ in_elasticsearch_bulk_conn_del(conn);
+ }
+}
diff --git a/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.h b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.h
new file mode 100644
index 000000000..a5a7593ac
--- /dev/null
+++ b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.h
@@ -0,0 +1,55 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_ELASTICSEARCH_BULK_CONN
+#define FLB_IN_ELASTICSEARCH_BULK_CONN
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_connection.h>
+
+#include <monkey/mk_http.h>
+#include <monkey/mk_http_parser.h>
+#include <monkey/mk_utils.h>
+
+struct in_elasticsearch_bulk_conn {
+ /* Buffer */
+ char *buf_data; /* Buffer data */
+ int buf_len; /* Data length */
+ int buf_size; /* Buffer size */
+
+ /*
+ * Parser context: we only held one parser per connection
+ * which is re-used everytime we have a new request.
+ */
+ struct mk_http_parser parser;
+ struct mk_http_request request;
+ struct mk_http_session session;
+ struct flb_connection *connection;
+
+ void *ctx; /* Plugin parent context */
+ struct mk_list _head; /* link to flb_es_bulk->connections */
+};
+
+struct in_elasticsearch_bulk_conn *in_elasticsearch_bulk_conn_add(struct flb_connection *connection,
+ struct flb_in_elasticsearch *ctx);
+int in_elasticsearch_bulk_conn_del(struct in_elasticsearch_bulk_conn *conn);
+void in_elasticsearch_bulk_conn_release_all(struct flb_in_elasticsearch *ctx);
+
+
+#endif
diff --git a/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c
new file mode 100644
index 000000000..c7acfd671
--- /dev/null
+++ b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c
@@ -0,0 +1,922 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_version.h>
+#include <fluent-bit/flb_error.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_gzip.h>
+
+#include <monkey/monkey.h>
+#include <monkey/mk_core.h>
+
+#include "in_elasticsearch.h"
+#include "in_elasticsearch_bulk_conn.h"
+#include "in_elasticsearch_bulk_prot.h"
+
+#define HTTP_CONTENT_JSON 0
+#define HTTP_CONTENT_NDJSON 1
+
+static int send_empty_response(struct in_elasticsearch_bulk_conn *conn, int http_status)
+{
+ size_t sent;
+ flb_sds_t out;
+
+ out = flb_sds_create_size(256);
+ if (!out) {
+ return -1;
+ }
+
+ if (http_status == 200) {
+ flb_sds_printf(&out,
+ "HTTP/1.1 200 OK\r\n"
+ "Content-Type: application/json\r\n\r\n");
+ }
+
+ /* We should check this operations result */
+ flb_io_net_write(conn->connection,
+ (void *) out,
+ flb_sds_len(out),
+ &sent);
+
+ flb_sds_destroy(out);
+
+ return 0;
+}
+
+static int send_json_message_response(struct in_elasticsearch_bulk_conn *conn, int http_status, char *message)
+{
+ size_t sent;
+ int len;
+ flb_sds_t out;
+
+ out = flb_sds_create_size(256);
+ if (!out) {
+ return -1;
+ }
+
+ if (message) {
+ len = strlen(message);
+ }
+ else {
+ len = 0;
+ }
+
+ if (http_status == 200) {
+ flb_sds_printf(&out,
+ "HTTP/1.1 200 OK\r\n"
+ "Content-Type: application/json\r\n"
+ "Content-Length: %i\r\n\r\n%s",
+ len, message);
+ }
+
+ /* We should check this operations result */
+ flb_io_net_write(conn->connection,
+ (void *) out,
+ flb_sds_len(out),
+ &sent);
+
+ flb_sds_destroy(out);
+
+ return 0;
+}
+
+static int send_version_message_response(struct flb_in_elasticsearch *ctx,
+ struct in_elasticsearch_bulk_conn *conn, int http_status)
+{
+ size_t sent;
+ int len;
+ flb_sds_t out;
+ flb_sds_t resp;
+
+ out = flb_sds_create_size(256);
+ if (!out) {
+ return -1;
+ }
+ resp = flb_sds_create_size(384);
+ if (!resp) {
+ flb_sds_destroy(out);
+ return -1;
+ }
+
+ flb_sds_printf(&resp,
+ ES_VERSION_RESPONSE_TEMPLATE,
+ ctx->es_version);
+
+ len = flb_sds_len(resp);
+
+ if (http_status == 200) {
+ flb_sds_printf(&out,
+ "HTTP/1.1 200 OK\r\n"
+ "Content-Type: application/json\r\n"
+ "Content-Length: %i\r\n\r\n%s",
+ len, resp);
+ }
+
+ /* We should check this operations result */
+ flb_io_net_write(conn->connection,
+ (void *) out,
+ flb_sds_len(out),
+ &sent);
+
+ flb_sds_destroy(resp);
+ flb_sds_destroy(out);
+
+ return 0;
+}
+
+static int send_dummy_sniffer_response(struct in_elasticsearch_bulk_conn *conn, int http_status,
+ struct flb_in_elasticsearch *ctx)
+{
+ size_t sent;
+ int len;
+ flb_sds_t out;
+ flb_sds_t resp;
+ flb_sds_t hostname;
+
+ if (ctx->hostname != NULL) {
+ hostname = ctx->hostname;
+ }
+ else {
+ hostname = "localhost";
+ }
+
+ out = flb_sds_create_size(384);
+ if (!out) {
+ return -1;
+ }
+
+ resp = flb_sds_create_size(384);
+ if (!resp) {
+ flb_sds_destroy(out);
+ return -1;
+ }
+
+ flb_sds_printf(&resp,
+ ES_NODES_TEMPLATE,
+ ctx->cluster_name, ctx->node_name,
+ hostname, ctx->tcp_port, ctx->buffer_max_size);
+
+ len = flb_sds_len(resp) ;
+
+ if (http_status == 200) {
+ flb_sds_printf(&out,
+ "HTTP/1.1 200 OK\r\n"
+ "Content-Type: application/json\r\n"
+ "Content-Length: %i\r\n\r\n%s",
+ len, resp);
+ }
+
+ /* We should check this operations result */
+ flb_io_net_write(conn->connection,
+ (void *) out,
+ flb_sds_len(out),
+ &sent);
+
+ flb_sds_destroy(resp);
+ flb_sds_destroy(out);
+
+ return 0;
+}
+
+static int send_response(struct in_elasticsearch_bulk_conn *conn, int http_status, char *message)
+{
+ size_t sent;
+ int len;
+ flb_sds_t out;
+
+ out = flb_sds_create_size(256);
+ if (!out) {
+ return -1;
+ }
+
+ if (message) {
+ len = strlen(message);
+ }
+ else {
+ len = 0;
+ }
+
+ if (http_status == 200) {
+ flb_sds_printf(&out,
+ "HTTP/1.1 200 OK\r\n"
+ "Server: Fluent Bit v%s\r\n"
+ "Content-Type: application/json\r\n"
+ "Content-Length: %i\r\n\r\n%s",
+ FLB_VERSION_STR,
+ len, message);
+ }
+ else if (http_status == 400) {
+ flb_sds_printf(&out,
+ "HTTP/1.1 400 Forbidden\r\n"
+ "Server: Fluent Bit v%s\r\n"
+ "Content-Length: %i\r\n\r\n%s",
+ FLB_VERSION_STR,
+ len, message);
+ }
+
+ /* We should check this operations result */
+ flb_io_net_write(conn->connection,
+ (void *) out,
+ flb_sds_len(out),
+ &sent);
+
+ flb_sds_destroy(out);
+
+ return 0;
+}
+
+/* implements functionality to get tag from key in record */
+static flb_sds_t tag_key(struct flb_in_elasticsearch *ctx, msgpack_object *map)
+{
+ size_t map_size = map->via.map.size;
+ msgpack_object_kv *kv;
+ msgpack_object key;
+ msgpack_object val;
+ char *key_str = NULL;
+ char *val_str = NULL;
+ size_t key_str_size = 0;
+ size_t val_str_size = 0;
+ int j;
+ int check = FLB_FALSE;
+ int found = FLB_FALSE;
+ flb_sds_t tag;
+
+ kv = map->via.map.ptr;
+
+ for(j=0; j < map_size; j++) {
+ check = FLB_FALSE;
+ found = FLB_FALSE;
+ key = (kv+j)->key;
+ if (key.type == MSGPACK_OBJECT_BIN) {
+ key_str = (char *) key.via.bin.ptr;
+ key_str_size = key.via.bin.size;
+ check = FLB_TRUE;
+ }
+ if (key.type == MSGPACK_OBJECT_STR) {
+ key_str = (char *) key.via.str.ptr;
+ key_str_size = key.via.str.size;
+ check = FLB_TRUE;
+ }
+
+ if (check == FLB_TRUE) {
+ if (strncmp(ctx->tag_key, key_str, key_str_size) == 0) {
+ val = (kv+j)->val;
+ if (val.type == MSGPACK_OBJECT_BIN) {
+ val_str = (char *) val.via.bin.ptr;
+ val_str_size = val.via.str.size;
+ found = FLB_TRUE;
+ break;
+ }
+ if (val.type == MSGPACK_OBJECT_STR) {
+ val_str = (char *) val.via.str.ptr;
+ val_str_size = val.via.str.size;
+ found = FLB_TRUE;
+ break;
+ }
+ }
+ }
+ }
+
+ if (found == FLB_TRUE) {
+ tag = flb_sds_create_len(val_str, val_str_size);
+ if (!tag) {
+ flb_errno();
+ return NULL;
+ }
+ return tag;
+ }
+
+
+ flb_plg_error(ctx->ins, "Could not find tag_key %s in record", ctx->tag_key);
+ return NULL;
+}
+
+static int get_write_op(struct flb_in_elasticsearch *ctx, msgpack_object *map, flb_sds_t *out_write_op, size_t *out_key_size)
+{
+ char *op_str = NULL;
+ size_t op_str_size = 0;
+ msgpack_object_kv *kv;
+ msgpack_object key;
+ int check = FLB_FALSE;
+
+ kv = map->via.map.ptr;
+ key = kv[0].key;
+ if (key.type == MSGPACK_OBJECT_BIN) {
+ op_str = (char *) key.via.bin.ptr;
+ op_str_size = key.via.bin.size;
+ check = FLB_TRUE;
+ }
+ if (key.type == MSGPACK_OBJECT_STR) {
+ op_str = (char *) key.via.str.ptr;
+ op_str_size = key.via.str.size;
+ check = FLB_TRUE;
+ }
+
+ if (check == FLB_TRUE) {
+ *out_write_op = flb_sds_create_len(op_str, op_str_size);
+ *out_key_size = op_str_size;
+ }
+
+ return check;
+}
+
+static int status_buffer_avail(struct flb_in_elasticsearch *ctx, flb_sds_t bulk_statuses, size_t threshold)
+{
+ if (flb_sds_avail(bulk_statuses) < threshold) {
+ flb_plg_warn(ctx->ins, "left buffer for bulk status(es) is too small");
+
+ return FLB_FALSE;
+ }
+
+ return FLB_TRUE;
+}
+
+static int process_ndpack(struct flb_in_elasticsearch *ctx, flb_sds_t tag, char *buf, size_t size, flb_sds_t bulk_statuses)
+{
+ int ret;
+ size_t off = 0;
+ size_t map_copy_index;
+ msgpack_object_kv *map_copy_entry;
+ msgpack_unpacked result;
+ struct flb_time tm;
+ msgpack_object *obj;
+ flb_sds_t tag_from_record = NULL;
+ int idx = 0;
+ flb_sds_t write_op;
+ size_t op_str_size = 0;
+ int op_ret = FLB_FALSE;
+ int error_op = FLB_FALSE;
+
+ flb_time_get(&tm);
+
+ msgpack_unpacked_init(&result);
+ while (msgpack_unpack_next(&result, buf, size, &off) == MSGPACK_UNPACK_SUCCESS) {
+ if (result.data.type == MSGPACK_OBJECT_MAP) {
+ if (idx > 0 && idx % 2 == 0) {
+ flb_sds_cat(bulk_statuses, ",", 1);
+ }
+ if (status_buffer_avail(ctx, bulk_statuses, 50) == FLB_FALSE) {
+ break;
+ }
+ if (idx % 2 == 0) {
+ op_ret = get_write_op(ctx, &result.data, &write_op, &op_str_size);
+
+ if (op_ret) {
+ if (flb_sds_cmp(write_op, "index", op_str_size) == 0) {
+ flb_sds_cat(bulk_statuses, "{\"index\":", 9);
+ error_op = FLB_FALSE;
+ }
+ else if (flb_sds_cmp(write_op, "create", op_str_size) == 0) {
+ flb_sds_cat(bulk_statuses, "{\"create\":", 10);
+ error_op = FLB_FALSE;
+ }
+ else if (flb_sds_cmp(write_op, "update", op_str_size) == 0) {
+ flb_sds_cat(bulk_statuses, "{\"update\":", 10);
+ error_op = FLB_TRUE;
+ }
+ else if (flb_sds_cmp(write_op, "delete", op_str_size) == 0) {
+ flb_sds_cat(bulk_statuses, "{\"delete\":{\"status\":404,\"result\":\"not_found\"}}", 46);
+ error_op = FLB_TRUE;
+ idx += 1; /* Prepare to adjust to multiple of two
+ * in the end of the loop.
+ * Due to delete actions include only one line. */
+ flb_sds_destroy(write_op);
+
+ goto proceed;
+ }
+ else {
+ flb_sds_cat(bulk_statuses, "{\"unknown\":{\"status\":400,\"result\":\"bad_request\"}}", 49);
+ error_op = FLB_TRUE;
+
+ flb_sds_destroy(write_op);
+
+ break;
+ }
+ } else {
+ flb_sds_destroy(write_op);
+ flb_plg_error(ctx->ins, "meta information line is missing");
+ error_op = FLB_TRUE;
+
+ break;
+ }
+
+ if (error_op == FLB_FALSE) {
+ flb_log_event_encoder_reset(&ctx->log_encoder);
+
+ ret = flb_log_event_encoder_begin_record(&ctx->log_encoder);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_sds_destroy(write_op);
+ flb_plg_error(ctx->ins, "event encoder error : %d", ret);
+ error_op = FLB_TRUE;
+
+ break;
+ }
+
+ ret = flb_log_event_encoder_set_timestamp(
+ &ctx->log_encoder,
+ &tm);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_sds_destroy(write_op);
+ flb_plg_error(ctx->ins, "event encoder error : %d", ret);
+ error_op = FLB_TRUE;
+
+ break;
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_values(
+ &ctx->log_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE((char *) ctx->meta_key),
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&result.data));
+ }
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_sds_destroy(write_op);
+ flb_plg_error(ctx->ins, "event encoder error : %d", ret);
+ error_op = FLB_TRUE;
+
+ break;
+ }
+ }
+ }
+ else if (idx % 2 == 1) {
+ if (error_op == FLB_FALSE) {
+ /* Pack body */
+
+ for (map_copy_index = 0 ;
+ map_copy_index < result.data.via.map.size &&
+ ret == FLB_EVENT_ENCODER_SUCCESS ;
+ map_copy_index++) {
+ map_copy_entry = &result.data.via.map.ptr[map_copy_index];
+
+ ret = flb_log_event_encoder_append_body_values(
+ &ctx->log_encoder,
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&map_copy_entry->key),
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&map_copy_entry->val));
+ }
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(ctx->ins, "event encoder error : %d", ret);
+ error_op = FLB_TRUE;
+
+ break;
+ }
+
+ ret = flb_log_event_encoder_commit_record(&ctx->log_encoder);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(ctx->ins, "event encoder error : %d", ret);
+ error_op = FLB_TRUE;
+
+ break;
+ }
+
+ tag_from_record = NULL;
+
+ if (ctx->tag_key) {
+ obj = &result.data;
+ tag_from_record = tag_key(ctx, obj);
+ }
+
+ if (tag_from_record) {
+ flb_input_log_append(ctx->ins,
+ tag_from_record,
+ flb_sds_len(tag_from_record),
+ ctx->log_encoder.output_buffer,
+ ctx->log_encoder.output_length);
+
+ flb_sds_destroy(tag_from_record);
+ }
+ else if (tag) {
+ flb_input_log_append(ctx->ins,
+ tag,
+ flb_sds_len(tag),
+ ctx->log_encoder.output_buffer,
+ ctx->log_encoder.output_length);
+ }
+ else {
+ /* use default plugin Tag (it internal name, e.g: http.0 */
+ flb_input_log_append(ctx->ins, NULL, 0,
+ ctx->log_encoder.output_buffer,
+ ctx->log_encoder.output_length);
+ }
+
+ flb_log_event_encoder_reset(&ctx->log_encoder);
+ }
+ if (op_ret) {
+ if (flb_sds_cmp(write_op, "index", op_str_size) == 0) {
+ flb_sds_cat(bulk_statuses, "{\"status\":201,\"result\":\"created\"}}", 34);
+ }
+ else if (flb_sds_cmp(write_op, "create", op_str_size) == 0) {
+ flb_sds_cat(bulk_statuses, "{\"status\":201,\"result\":\"created\"}}", 34);
+ }
+ else if (flb_sds_cmp(write_op, "update", op_str_size) == 0) {
+ flb_sds_cat(bulk_statuses, "{\"status\":403,\"result\":\"forbidden\"}}", 36);
+ }
+ if (status_buffer_avail(ctx, bulk_statuses, 50) == FLB_FALSE) {
+ flb_sds_destroy(write_op);
+
+ break;
+ }
+ }
+ flb_sds_destroy(write_op);
+ }
+
+ proceed:
+ idx++;
+ }
+ else {
+ flb_plg_error(ctx->ins, "skip record from invalid type: %i",
+ result.data.type);
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+ }
+
+ if (idx % 2 != 0) {
+ flb_plg_warn(ctx->ins, "decode payload of Bulk API is failed");
+ msgpack_unpacked_destroy(&result);
+ if (error_op == FLB_FALSE) {
+ /* On lacking of body case in non-error case, there is no
+ * releasing memory code paths. We should proceed to do
+ * it here. */
+ flb_sds_destroy(write_op);
+ }
+
+ return -1;
+ }
+
+ msgpack_unpacked_destroy(&result);
+
+ return 0;
+}
+
+static ssize_t parse_payload_ndjson(struct flb_in_elasticsearch *ctx, flb_sds_t tag,
+ char *payload, size_t size, flb_sds_t bulk_statuses)
+{
+ int ret;
+ int out_size;
+ char *pack;
+ struct flb_pack_state pack_state;
+
+ /* Initialize packer */
+ flb_pack_state_init(&pack_state);
+
+ /* Pack JSON as msgpack */
+ ret = flb_pack_json_state(payload, size,
+ &pack, &out_size, &pack_state);
+ flb_pack_state_reset(&pack_state);
+
+ /* Handle exceptions */
+ if (ret == FLB_ERR_JSON_PART) {
+ flb_plg_warn(ctx->ins, "JSON data is incomplete, skipping");
+ return -1;
+ }
+ else if (ret == FLB_ERR_JSON_INVAL) {
+ flb_plg_warn(ctx->ins, "invalid JSON message, skipping");
+ return -1;
+ }
+ else if (ret == -1) {
+ return -1;
+ }
+
+ /* Process the packaged JSON and return the last byte used */
+ process_ndpack(ctx, tag, pack, out_size, bulk_statuses);
+ flb_free(pack);
+
+ return 0;
+}
+
+static int process_payload(struct flb_in_elasticsearch *ctx, struct in_elasticsearch_bulk_conn *conn,
+ flb_sds_t tag,
+ struct mk_http_session *session,
+ struct mk_http_request *request,
+ flb_sds_t bulk_statuses)
+{
+ int type = -1;
+ int i = 0;
+ int ret = 0;
+ struct mk_http_header *header;
+ int extra_size = -1;
+ struct mk_http_header *headers_extra;
+ int gzip_compressed = FLB_FALSE;
+ void *gz_data = NULL;
+ size_t gz_size = -1;
+
+ header = &session->parser.headers[MK_HEADER_CONTENT_TYPE];
+ if (header->key.data == NULL) {
+ send_response(conn, 400, "error: header 'Content-Type' is not set\n");
+ return -1;
+ }
+
+ if (header->val.len >= 20 &&
+ strncasecmp(header->val.data, "application/x-ndjson", 20) == 0) {
+ type = HTTP_CONTENT_NDJSON;
+ }
+
+ if (header->val.len >= 16 &&
+ strncasecmp(header->val.data, "application/json", 16) == 0) {
+ type = HTTP_CONTENT_JSON;
+ }
+
+ if (type == -1) {
+ send_response(conn, 400, "error: invalid 'Content-Type'\n");
+ return -1;
+ }
+
+ if (request->data.len <= 0) {
+ send_response(conn, 400, "error: no payload found\n");
+ return -1;
+ }
+
+ extra_size = session->parser.headers_extra_count;
+ if (extra_size > 0) {
+ for (i = 0; i < extra_size; i++) {
+ headers_extra = &session->parser.headers_extra[i];
+ if (headers_extra->key.len == 16 &&
+ strncasecmp(headers_extra->key.data, "Content-Encoding", 16) == 0) {
+ if (headers_extra->val.len == 4 &&
+ strncasecmp(headers_extra->val.data, "gzip", 4) == 0) {
+ flb_debug("[elasticsearch_bulk_prot] body is gzipped");
+ gzip_compressed = FLB_TRUE;
+ }
+ }
+ }
+ }
+
+ if (type == HTTP_CONTENT_NDJSON || type == HTTP_CONTENT_JSON) {
+ if (gzip_compressed == FLB_TRUE) {
+ ret = flb_gzip_uncompress((void *) request->data.data, request->data.len,
+ &gz_data, &gz_size);
+ if (ret == -1) {
+ flb_error("[elasticsearch_bulk_prot] gzip uncompress is failed");
+ return -1;
+ }
+ parse_payload_ndjson(ctx, tag, gz_data, gz_size, bulk_statuses);
+ flb_free(gz_data);
+ }
+ else {
+ parse_payload_ndjson(ctx, tag, request->data.data, request->data.len, bulk_statuses);
+ }
+ }
+
+ return 0;
+}
+
+static inline int mk_http_point_header(mk_ptr_t *h,
+ struct mk_http_parser *parser, int key)
+{
+ struct mk_http_header *header;
+
+ header = &parser->headers[key];
+ if (header->type == key) {
+ h->data = header->val.data;
+ h->len = header->val.len;
+ return 0;
+ }
+ else {
+ h->data = NULL;
+ h->len = -1;
+ }
+
+ return -1;
+}
+
+/*
+ * Handle an incoming request. It perform extra checks over the request, if
+ * everything is OK, it enqueue the incoming payload.
+ */
+int in_elasticsearch_bulk_prot_handle(struct flb_in_elasticsearch *ctx,
+ struct in_elasticsearch_bulk_conn *conn,
+ struct mk_http_session *session,
+ struct mk_http_request *request)
+{
+ int i;
+ int ret;
+ int len;
+ char *uri;
+ char *qs;
+ off_t diff;
+ flb_sds_t tag;
+ struct mk_http_header *header;
+ flb_sds_t bulk_statuses = NULL;
+ flb_sds_t bulk_response = NULL;
+ char *error_str = NULL;
+
+ if (request->uri.data[0] != '/') {
+ send_response(conn, 400, "error: invalid request\n");
+ return -1;
+ }
+
+ /* Decode URI */
+ uri = mk_utils_url_decode(request->uri);
+ if (!uri) {
+ uri = mk_mem_alloc_z(request->uri.len + 1);
+ if (!uri) {
+ return -1;
+ }
+ memcpy(uri, request->uri.data, request->uri.len);
+ uri[request->uri.len] = '\0';
+ }
+
+ /* Try to match a query string so we can remove it */
+ qs = strchr(uri, '?');
+ if (qs) {
+ /* remove the query string part */
+ diff = qs - uri;
+ uri[diff] = '\0';
+ }
+
+ /* Refer the tag at first*/
+ if (ctx->ins->tag && !ctx->ins->tag_default) {
+ tag = flb_sds_create(ctx->ins->tag);
+ if (tag == NULL) {
+ return -1;
+ }
+ }
+ else {
+ /* Compose the query string using the URI */
+ len = strlen(uri);
+
+ if (len == 1) {
+ tag = NULL; /* use default tag */
+ }
+ else {
+ /* New tag skipping the URI '/' */
+ tag = flb_sds_create_len(&uri[1], len - 1);
+ if (!tag) {
+ mk_mem_free(uri);
+ return -1;
+ }
+
+ /* Sanitize, only allow alphanum chars */
+ for (i = 0; i < flb_sds_len(tag); i++) {
+ if (!isalnum(tag[i]) && tag[i] != '_' && tag[i] != '.') {
+ tag[i] = '_';
+ }
+ }
+ }
+ }
+
+ /* Check if we have a Host header: Hostname ; port */
+ mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST);
+
+ /* Header: Connection */
+ mk_http_point_header(&request->connection, &session->parser,
+ MK_HEADER_CONNECTION);
+
+ /* HTTP/1.1 needs Host header */
+ if (!request->host.data && request->protocol == MK_HTTP_PROTOCOL_11) {
+ flb_sds_destroy(tag);
+ mk_mem_free(uri);
+ return -1;
+ }
+
+ /* Should we close the session after this request ? */
+ mk_http_keepalive_check(session, request, ctx->server);
+
+ /* Content Length */
+ header = &session->parser.headers[MK_HEADER_CONTENT_LENGTH];
+ if (header->type == MK_HEADER_CONTENT_LENGTH) {
+ request->_content_length.data = header->val.data;
+ request->_content_length.len = header->val.len;
+ }
+ else {
+ request->_content_length.data = NULL;
+ }
+
+ if (request->method == MK_METHOD_HEAD) {
+ send_empty_response(conn, 200);
+
+ flb_sds_destroy(tag);
+ mk_mem_free(uri);
+
+ return 0;
+ }
+
+ if (request->method == MK_METHOD_PUT) {
+ send_json_message_response(conn, 200, "{}");
+
+ flb_sds_destroy(tag);
+ mk_mem_free(uri);
+
+ return 0;
+ }
+
+ if (request->method == MK_METHOD_GET) {
+ if (strncmp(uri, "/_nodes/http", 12) == 0) {
+ send_dummy_sniffer_response(conn, 200, ctx);
+ }
+ else if (strlen(uri) == 1 && strncmp(uri, "/", 1) == 0) {
+ send_version_message_response(ctx, conn, 200);
+ }
+ else {
+ send_json_message_response(conn, 200, "{}");
+ }
+
+ flb_sds_destroy(tag);
+ mk_mem_free(uri);
+
+ return 0;
+ }
+
+ if (request->method == MK_METHOD_POST) {
+ if (strncmp(uri, "/_bulk", 6) == 0) {
+ bulk_statuses = flb_sds_create_size(ctx->buffer_max_size);
+ if (!bulk_statuses) {
+ flb_sds_destroy(tag);
+ mk_mem_free(uri);
+ return -1;
+ }
+
+ bulk_response = flb_sds_create_size(ctx->buffer_max_size);
+ if (!bulk_response) {
+ flb_sds_destroy(bulk_statuses);
+ flb_sds_destroy(tag);
+ mk_mem_free(uri);
+ return -1;
+ }
+ } else {
+ flb_sds_destroy(tag);
+ mk_mem_free(uri);
+
+ send_response(conn, 400, "error: invaild HTTP endpoint\n");
+
+ return -1;
+ }
+ }
+
+ if (request->method != MK_METHOD_POST &&
+ request->method != MK_METHOD_GET &&
+ request->method != MK_METHOD_HEAD &&
+ request->method != MK_METHOD_PUT) {
+
+ if (bulk_statuses) {
+ flb_sds_destroy(bulk_statuses);
+ }
+ if (bulk_response) {
+ flb_sds_destroy(bulk_response);
+ }
+
+ flb_sds_destroy(tag);
+ mk_mem_free(uri);
+
+ send_response(conn, 400, "error: invalid HTTP method\n");
+ return -1;
+ }
+
+ ret = process_payload(ctx, conn, tag, session, request, bulk_statuses);
+ flb_sds_destroy(tag);
+
+ len = flb_sds_len(bulk_statuses);
+ if (flb_sds_alloc(bulk_response) < len + 27) {
+ bulk_response = flb_sds_increase(bulk_response, len + 27 - flb_sds_alloc(bulk_response));
+ }
+ error_str = strstr(bulk_statuses, "\"status\":40");
+ if (error_str){
+ flb_sds_cat(bulk_response, "{\"errors\":true,\"items\":[", 24);
+ }
+ else {
+ flb_sds_cat(bulk_response, "{\"errors\":false,\"items\":[", 25);
+ }
+ flb_sds_cat(bulk_response, bulk_statuses, flb_sds_len(bulk_statuses));
+ flb_sds_cat(bulk_response, "]}", 2);
+ send_response(conn, 200, bulk_response);
+
+ mk_mem_free(uri);
+ flb_sds_destroy(bulk_statuses);
+ flb_sds_destroy(bulk_response);
+
+ return ret;
+}
+
+/*
+ * Handle an incoming request which has resulted in an http parser error.
+ */
+int in_elasticsearch_bulk_prot_handle_error(struct flb_in_elasticsearch *ctx,
+ struct in_elasticsearch_bulk_conn *conn,
+ struct mk_http_session *session,
+ struct mk_http_request *request)
+{
+ send_response(conn, 400, "error: invalid request\n");
+ return -1;
+}
diff --git a/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.h b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.h
new file mode 100644
index 000000000..be1aeceea
--- /dev/null
+++ b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.h
@@ -0,0 +1,40 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_ELASTICSEARCH_BULK_PROT
+#define FLB_IN_ELASTICSEARCH_BULK_PROT
+
+#define ES_VERSION_RESPONSE_TEMPLATE \
+ "{\"version\":{\"number\":\"%s\",\"build_flavor\":\"Fluent Bit OSS\"},\"tagline\":\"Fluent Bit's Bulk API compatible endpoint\"}"
+
+#define ES_NODES_TEMPLATE "{\"_nodes\":{\"total\":1,\"successful\":1,\"failed\":0}," \
+ "\"nodes\":{\"%s\":{\"name\":\"%s\",\"version\":\"8.0.0\"," \
+ "\"http\":{\"publish_address\":\"%s:%s\",\"max_content_length_in_bytes\":%ld}}}}"
+
+int in_elasticsearch_bulk_prot_handle(struct flb_in_elasticsearch *ctx,
+ struct in_elasticsearch_bulk_conn *conn,
+ struct mk_http_session *session,
+ struct mk_http_request *request);
+
+int in_elasticsearch_bulk_prot_handle_error(struct flb_in_elasticsearch *ctx,
+ struct in_elasticsearch_bulk_conn *conn,
+ struct mk_http_session *session,
+ struct mk_http_request *request);
+
+#endif
diff --git a/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_config.c b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_config.c
new file mode 100644
index 000000000..4beb96320
--- /dev/null
+++ b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_config.c
@@ -0,0 +1,105 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_input_plugin.h>
+
+#include "in_elasticsearch.h"
+#include "in_elasticsearch_config.h"
+#include "in_elasticsearch_bulk_conn.h"
+
+struct flb_in_elasticsearch *in_elasticsearch_config_create(struct flb_input_instance *ins)
+{
+ int ret;
+ char port[8];
+ struct flb_in_elasticsearch *ctx;
+
+ ctx = flb_calloc(1, sizeof(struct flb_in_elasticsearch));
+ if (!ctx) {
+ flb_errno();
+ return NULL;
+ }
+ ctx->ins = ins;
+ mk_list_init(&ctx->connections);
+
+ /* Load the config map */
+ ret = flb_input_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ flb_free(ctx);
+ return NULL;
+ }
+
+ /* Listen interface (if not set, defaults to 0.0.0.0:9200) */
+ flb_input_net_default_listener("0.0.0.0", 9200, ins);
+
+ ctx->listen = flb_sds_create(ins->host.listen);
+ snprintf(port, sizeof(port) - 1, "%d", ins->host.port);
+ ctx->tcp_port = flb_sds_create(port);
+
+ /* HTTP Server specifics */
+ ctx->server = flb_calloc(1, sizeof(struct mk_server));
+ ctx->server->keep_alive = MK_TRUE;
+
+ /* monkey detects server->workers == 0 as the server not being initialized at the
+ * moment so we want to make sure that it stays that way!
+ */
+
+ ret = flb_log_event_encoder_init(&ctx->log_encoder,
+ FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(ctx->ins, "error initializing event encoder : %d", ret);
+
+ in_elasticsearch_config_destroy(ctx);
+
+ return ctx = NULL;
+ }
+
+
+ return ctx;
+}
+
+int in_elasticsearch_config_destroy(struct flb_in_elasticsearch *ctx)
+{
+ flb_log_event_encoder_destroy(&ctx->log_encoder);
+
+ /* release all connections */
+ in_elasticsearch_bulk_conn_release_all(ctx);
+
+
+ if (ctx->collector_id != -1) {
+ flb_input_collector_delete(ctx->collector_id, ctx->ins);
+
+ ctx->collector_id = -1;
+ }
+
+ if (ctx->downstream != NULL) {
+ flb_downstream_destroy(ctx->downstream);
+ }
+
+ if (ctx->server) {
+ flb_free(ctx->server);
+ }
+
+ flb_sds_destroy(ctx->listen);
+ flb_sds_destroy(ctx->tcp_port);
+
+ flb_free(ctx);
+
+ return 0;
+}
diff --git a/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_config.h b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_config.h
new file mode 100644
index 000000000..28108723d
--- /dev/null
+++ b/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_config.h
@@ -0,0 +1,29 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_ELASTICSEARCH_CONFIG_H
+#define FLB_IN_ELASTICSEARCH_CONFIG_H
+
+#include <fluent-bit/flb_input_plugin.h>
+#include "in_elasticsearch.h"
+
+struct flb_in_elasticsearch *in_elasticsearch_config_create(struct flb_input_instance *ins);
+int in_elasticsearch_config_destroy(struct flb_in_elasticsearch *ctx);
+
+#endif