summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/in_opentelemetry
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--fluent-bit/plugins/in_opentelemetry/CMakeLists.txt12
-rw-r--r--fluent-bit/plugins/in_opentelemetry/http_conn.c301
-rw-r--r--fluent-bit/plugins/in_opentelemetry/http_conn.h57
-rw-r--r--fluent-bit/plugins/in_opentelemetry/opentelemetry.c200
-rw-r--r--fluent-bit/plugins/in_opentelemetry/opentelemetry.h51
-rw-r--r--fluent-bit/plugins/in_opentelemetry/opentelemetry_config.c92
-rw-r--r--fluent-bit/plugins/in_opentelemetry/opentelemetry_config.h29
-rw-r--r--fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.c1674
-rw-r--r--fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.h31
9 files changed, 2447 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_opentelemetry/CMakeLists.txt b/fluent-bit/plugins/in_opentelemetry/CMakeLists.txt
new file mode 100644
index 000000000..4c3d6db32
--- /dev/null
+++ b/fluent-bit/plugins/in_opentelemetry/CMakeLists.txt
@@ -0,0 +1,12 @@
+if(NOT FLB_METRICS)
+ message(FATAL_ERROR "OpenTelemetry input plugin requires FLB_HTTP_SERVER=On.")
+endif()
+
+set(src
+ http_conn.c
+ opentelemetry.c
+ opentelemetry_prot.c
+ opentelemetry_config.c
+ )
+
+FLB_PLUGIN(in_opentelemetry "${src}" "monkey-core-static")
diff --git a/fluent-bit/plugins/in_opentelemetry/http_conn.c b/fluent-bit/plugins/in_opentelemetry/http_conn.c
new file mode 100644
index 000000000..a402295b1
--- /dev/null
+++ b/fluent-bit/plugins/in_opentelemetry/http_conn.c
@@ -0,0 +1,301 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_engine.h>
+#include <fluent-bit/flb_downstream.h>
+
+#include "opentelemetry.h"
+#include "http_conn.h"
+#include "opentelemetry_prot.h"
+
+static void opentelemetry_conn_request_init(struct mk_http_session *session,
+ struct mk_http_request *request);
+
+static int opentelemetry_conn_event(void *data)
+{
+ int status;
+ size_t size;
+ ssize_t available;
+ ssize_t bytes;
+ char *tmp;
+ char *request_end;
+ size_t request_len;
+ struct http_conn *conn;
+ struct mk_event *event;
+ struct flb_opentelemetry *ctx;
+ struct flb_connection *connection;
+
+ connection = (struct flb_connection *) data;
+
+ conn = connection->user_data;
+
+ ctx = conn->ctx;
+
+ event = &connection->event;
+
+ if (event->mask & MK_EVENT_READ) {
+ available = (conn->buf_size - conn->buf_len) - 1;
+ if (available < 1) {
+ if (conn->buf_size + ctx->buffer_chunk_size > ctx->buffer_max_size) {
+ flb_plg_trace(ctx->ins,
+ "fd=%i incoming data exceed limit (%zu KB)",
+ event->fd, (ctx->buffer_max_size / 1024));
+ opentelemetry_conn_del(conn);
+ return -1;
+ }
+
+ size = conn->buf_size + ctx->buffer_chunk_size;
+ tmp = flb_realloc(conn->buf_data, size);
+ if (!tmp) {
+ flb_errno();
+ return -1;
+ }
+ flb_plg_trace(ctx->ins, "fd=%i buffer realloc %i -> %zu",
+ event->fd, conn->buf_size, size);
+
+ conn->buf_data = tmp;
+ conn->buf_size = size;
+ available = (conn->buf_size - conn->buf_len) - 1;
+ }
+
+ /* Read data */
+ bytes = flb_io_net_read(connection,
+ (void *) &conn->buf_data[conn->buf_len],
+ available);
+
+ if (bytes <= 0) {
+ flb_plg_trace(ctx->ins, "fd=%i closed connection", event->fd);
+ opentelemetry_conn_del(conn);
+ return -1;
+ }
+
+ flb_plg_trace(ctx->ins, "read()=%zi pre_len=%i now_len=%zi",
+ bytes, conn->buf_len, conn->buf_len + bytes);
+ conn->buf_len += bytes;
+ conn->buf_data[conn->buf_len] = '\0';
+
+ status = mk_http_parser(&conn->request, &conn->session.parser,
+ conn->buf_data, conn->buf_len, conn->session.server);
+
+ if (status == MK_HTTP_PARSER_OK) {
+ /* Do more logic parsing and checks for this request */
+ opentelemetry_prot_handle(ctx, conn, &conn->session, &conn->request);
+
+ /* Evict the processed request from the connection buffer and reinitialize
+ * the HTTP parser.
+ */
+
+ request_end = NULL;
+
+ if (NULL != conn->request.data.data) {
+ request_end = &conn->request.data.data[conn->request.data.len];
+ }
+ else {
+ request_end = strstr(conn->buf_data, "\r\n\r\n");
+
+ if(NULL != request_end) {
+ request_end = &request_end[4];
+ }
+ }
+
+ if (NULL != request_end) {
+ request_len = (size_t)(request_end - conn->buf_data);
+
+ if (0 < (conn->buf_len - request_len)) {
+ memmove(conn->buf_data, &conn->buf_data[request_len],
+ conn->buf_len - request_len);
+
+ conn->buf_data[conn->buf_len - request_len] = '\0';
+ conn->buf_len -= request_len;
+ }
+ else {
+ memset(conn->buf_data, 0, request_len);
+
+ conn->buf_len = 0;
+ }
+
+ /* Reinitialize the parser so the next request is properly
+ * handled, the additional memset intends to wipe any left over data
+ * from the headers parsed in the previous request.
+ */
+ memset(&conn->session.parser, 0, sizeof(struct mk_http_parser));
+ mk_http_parser_init(&conn->session.parser);
+ opentelemetry_conn_request_init(&conn->session, &conn->request);
+ }
+ }
+ else if (status == MK_HTTP_PARSER_ERROR) {
+ opentelemetry_prot_handle_error(ctx, conn, &conn->session, &conn->request);
+
+ /* Reinitialize the parser so the next request is properly
+ * handled, the additional memset intends to wipe any left over data
+ * from the headers parsed in the previous request.
+ */
+ memset(&conn->session.parser, 0, sizeof(struct mk_http_parser));
+ mk_http_parser_init(&conn->session.parser);
+ opentelemetry_conn_request_init(&conn->session, &conn->request);
+ }
+
+ /* FIXME: add Protocol handler here */
+ return bytes;
+ }
+
+ if (event->mask & MK_EVENT_CLOSE) {
+ flb_plg_trace(ctx->ins, "fd=%i hangup", event->fd);
+ opentelemetry_conn_del(conn);
+ return -1;
+ }
+
+ return 0;
+
+}
+
+static void opentelemetry_conn_session_init(struct mk_http_session *session,
+ struct mk_server *server,
+ int client_fd)
+{
+ /* Alloc memory for node */
+ session->_sched_init = MK_TRUE;
+ session->pipelined = MK_FALSE;
+ session->counter_connections = 0;
+ session->close_now = MK_FALSE;
+ session->status = MK_REQUEST_STATUS_INCOMPLETE;
+ session->server = server;
+ session->socket = client_fd;
+
+ /* creation time in unix time */
+ session->init_time = time(NULL);
+
+ session->channel = mk_channel_new(MK_CHANNEL_SOCKET, session->socket);
+ session->channel->io = session->server->network;
+
+ /* Init session request list */
+ mk_list_init(&session->request_list);
+
+ /* Initialize the parser */
+ mk_http_parser_init(&session->parser);
+}
+
+static void opentelemetry_conn_request_init(struct mk_http_session *session,
+ struct mk_http_request *request)
+{
+ memset(request, 0, sizeof(struct mk_http_request));
+
+ mk_http_request_init(session, request, session->server);
+
+ request->in_headers.type = MK_STREAM_IOV;
+ request->in_headers.dynamic = MK_FALSE;
+ request->in_headers.cb_consumed = NULL;
+ request->in_headers.cb_finished = NULL;
+ request->in_headers.stream = &request->stream;
+
+ mk_list_add(&request->in_headers._head, &request->stream.inputs);
+
+ request->session = session;
+}
+
+struct http_conn *opentelemetry_conn_add(struct flb_connection *connection,
+ struct flb_opentelemetry *ctx)
+{
+ struct http_conn *conn;
+ int ret;
+
+ conn = flb_calloc(1, sizeof(struct http_conn));
+ if (!conn) {
+ flb_errno();
+ return NULL;
+ }
+ conn->connection = connection;
+
+ /* Set data for the event-loop */
+ MK_EVENT_NEW(&connection->event);
+
+ connection->user_data = conn;
+ connection->event.type = FLB_ENGINE_EV_CUSTOM;
+ connection->event.handler = opentelemetry_conn_event;
+
+ /* Connection info */
+ conn->ctx = ctx;
+ conn->buf_len = 0;
+
+ conn->buf_data = flb_malloc(ctx->buffer_chunk_size);
+ if (!conn->buf_data) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "could not allocate new connection");
+ flb_free(conn);
+ return NULL;
+ }
+ conn->buf_size = ctx->buffer_chunk_size;
+
+ /* Register instance into the event loop */
+ ret = mk_event_add(flb_engine_evl_get(),
+ connection->fd,
+ FLB_ENGINE_EV_CUSTOM,
+ MK_EVENT_READ,
+ &connection->event);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not register new connection");
+ flb_free(conn->buf_data);
+ flb_free(conn);
+ return NULL;
+ }
+
+ /* Initialize HTTP Session: this is a custom context for Monkey HTTP */
+ opentelemetry_conn_session_init(&conn->session, ctx->server, connection->fd);
+
+ /* Initialize HTTP Request: this is the initial request and it will be reinitialized
+ * automatically after the request is handled so it can be used for the next one.
+ */
+ opentelemetry_conn_request_init(&conn->session, &conn->request);
+
+ /* Link connection node to parent context list */
+ mk_list_add(&conn->_head, &ctx->connections);
+ return conn;
+}
+
+int opentelemetry_conn_del(struct http_conn *conn)
+{
+ if (conn->session.channel != NULL) {
+ mk_channel_release(conn->session.channel);
+ }
+
+ /* The downstream unregisters the file descriptor from the event-loop
+ * so there's nothing to be done by the plugin
+ */
+ flb_downstream_conn_release(conn->connection);
+
+ mk_list_del(&conn->_head);
+
+ flb_free(conn->buf_data);
+ flb_free(conn);
+
+ return 0;
+}
+
+void opentelemetry_conn_release_all(struct flb_opentelemetry *ctx)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct http_conn *conn;
+
+ mk_list_foreach_safe(head, tmp, &ctx->connections) {
+ conn = mk_list_entry(head, struct http_conn, _head);
+ opentelemetry_conn_del(conn);
+ }
+}
diff --git a/fluent-bit/plugins/in_opentelemetry/http_conn.h b/fluent-bit/plugins/in_opentelemetry/http_conn.h
new file mode 100644
index 000000000..60627d860
--- /dev/null
+++ b/fluent-bit/plugins/in_opentelemetry/http_conn.h
@@ -0,0 +1,57 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_HTTP_CONN
+#define FLB_IN_HTTP_CONN
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <monkey/mk_http.h>
+#include <monkey/mk_http_parser.h>
+#include <monkey/mk_utils.h>
+
+#include "opentelemetry.h"
+
+struct http_conn {
+ struct mk_event event; /* Built-in event data for mk_events */
+
+ /* Buffer */
+ char *buf_data; /* Buffer data */
+ int buf_len; /* Data length */
+ int buf_size; /* Buffer size */
+
+ /*
+ * Parser context: we only held one parser per connection
+ * which is re-used everytime we have a new request.
+ */
+ struct mk_http_parser parser;
+ struct mk_http_request request;
+ struct mk_http_session session;
+ struct flb_connection *connection;
+
+ void *ctx; /* Plugin parent context */
+ struct mk_list _head; /* link to flb_opentelemetry->connections */
+};
+
+struct http_conn *opentelemetry_conn_add(struct flb_connection *connection,
+ struct flb_opentelemetry *ctx);
+int opentelemetry_conn_del(struct http_conn *conn);
+void opentelemetry_conn_release_all(struct flb_opentelemetry *ctx);
+
+
+#endif
diff --git a/fluent-bit/plugins/in_opentelemetry/opentelemetry.c b/fluent-bit/plugins/in_opentelemetry/opentelemetry.c
new file mode 100644
index 000000000..5cd26f8e6
--- /dev/null
+++ b/fluent-bit/plugins/in_opentelemetry/opentelemetry.c
@@ -0,0 +1,200 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_downstream.h>
+#include <fluent-bit/flb_network.h>
+#include <fluent-bit/flb_config.h>
+
+#include "http_conn.h"
+#include "opentelemetry.h"
+#include "opentelemetry_config.h"
+
+/*
+ * For a server event, the collection event means a new client have arrived, we
+ * accept the connection and create a new TCP instance which will wait for
+ * JSON map messages.
+ */
+static int in_opentelemetry_collect(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ struct flb_connection *connection;
+ struct http_conn *conn;
+ struct flb_opentelemetry *ctx;
+
+ ctx = in_context;
+
+ connection = flb_downstream_conn_get(ctx->downstream);
+
+ if (connection == NULL) {
+ flb_plg_error(ctx->ins, "could not accept new connection");
+
+ return -1;
+ }
+
+ flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", connection->fd);
+
+ conn = opentelemetry_conn_add(connection, ctx);
+
+ if (conn == NULL) {
+ return -1;
+ }
+
+ return 0;
+}
+
+static int in_opentelemetry_init(struct flb_input_instance *ins,
+ struct flb_config *config, void *data)
+{
+ unsigned short int port;
+ int ret;
+ struct flb_opentelemetry *ctx;
+
+ (void) data;
+
+ /* Create context and basic conf */
+ ctx = opentelemetry_config_create(ins);
+ if (!ctx) {
+ return -1;
+ }
+ ctx->collector_id = -1;
+
+ /* Populate context with config map defaults and incoming properties */
+ ret = flb_input_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "configuration error");
+ opentelemetry_config_destroy(ctx);
+ return -1;
+ }
+
+ /* Set the context */
+ flb_input_set_context(ins, ctx);
+
+ port = (unsigned short int) strtoul(ctx->tcp_port, NULL, 10);
+
+ ctx->downstream = flb_downstream_create(FLB_TRANSPORT_TCP,
+ ins->flags,
+ ctx->listen,
+ port,
+ ins->tls,
+ config,
+ &ins->net_setup);
+
+ if (ctx->downstream == NULL) {
+ flb_plg_error(ctx->ins,
+ "could not initialize downstream on %s:%s. Aborting",
+ ctx->listen, ctx->tcp_port);
+
+ opentelemetry_config_destroy(ctx);
+
+ return -1;
+ }
+
+ flb_input_downstream_set(ctx->downstream, ctx->ins);
+
+ flb_plg_info(ctx->ins, "listening on %s:%s", ctx->listen, ctx->tcp_port);
+
+ if (ctx->successful_response_code != 200 &&
+ ctx->successful_response_code != 201 &&
+ ctx->successful_response_code != 204) {
+ flb_plg_error(ctx->ins, "%d is not supported response code. Use default 201",
+ ctx->successful_response_code);
+ ctx->successful_response_code = 201;
+ }
+
+ /* Collect upon data available on the standard input */
+ ret = flb_input_set_collector_socket(ins,
+ in_opentelemetry_collect,
+ ctx->downstream->server_fd,
+ config);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "Could not set collector for IN_TCP input plugin");
+ opentelemetry_config_destroy(ctx);
+ return -1;
+ }
+
+ ctx->collector_id = ret;
+
+ return 0;
+}
+
+static int in_opentelemetry_exit(void *data, struct flb_config *config)
+{
+ struct flb_opentelemetry *ctx;
+
+ (void) config;
+
+ ctx = data;
+
+ if (ctx != NULL) {
+ opentelemetry_config_destroy(ctx);
+ }
+
+ return 0;
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE,
+ 0, FLB_TRUE, offsetof(struct flb_opentelemetry, buffer_max_size),
+ ""
+ },
+
+ {
+ FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", HTTP_BUFFER_CHUNK_SIZE,
+ 0, FLB_TRUE, offsetof(struct flb_opentelemetry, buffer_chunk_size),
+ ""
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "tag_key", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_opentelemetry, tag_key),
+ ""
+ },
+ {
+ FLB_CONFIG_MAP_INT, "successful_response_code", "201",
+ 0, FLB_TRUE, offsetof(struct flb_opentelemetry, successful_response_code),
+ "Set successful response code. 200, 201 and 204 are supported."
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "raw_traces", "false",
+ 0, FLB_TRUE, offsetof(struct flb_opentelemetry, raw_traces),
+ "Forward traces without processing"
+ },
+
+ /* EOF */
+ {0}
+};
+
+/* Plugin reference */
+struct flb_input_plugin in_opentelemetry_plugin = {
+ .name = "opentelemetry",
+ .description = "OpenTelemetry",
+ .cb_init = in_opentelemetry_init,
+ .cb_pre_run = NULL,
+ .cb_collect = in_opentelemetry_collect,
+ .cb_flush_buf = NULL,
+ .cb_pause = NULL,
+ .cb_resume = NULL,
+ .cb_exit = in_opentelemetry_exit,
+ .config_map = config_map,
+ .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS
+};
diff --git a/fluent-bit/plugins/in_opentelemetry/opentelemetry.h b/fluent-bit/plugins/in_opentelemetry/opentelemetry.h
new file mode 100644
index 000000000..512f2ab6f
--- /dev/null
+++ b/fluent-bit/plugins/in_opentelemetry/opentelemetry.h
@@ -0,0 +1,51 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_OPENTELEMETRY_H
+#define FLB_IN_OPENTELEMETRY_H
+
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_utils.h>
+
+#include <monkey/monkey.h>
+
+#define HTTP_BUFFER_MAX_SIZE "4M"
+#define HTTP_BUFFER_CHUNK_SIZE "512K"
+
+struct flb_opentelemetry {
+ int successful_response_code;
+ flb_sds_t listen;
+ flb_sds_t tcp_port;
+ const char *tag_key;
+ bool raw_traces;
+
+ size_t buffer_max_size; /* Maximum buffer size */
+ size_t buffer_chunk_size; /* Chunk allocation size */
+
+ int collector_id; /* Listener collector id */
+ struct flb_downstream *downstream; /* Client manager */
+ struct mk_list connections; /* linked list of connections */
+
+ struct mk_server *server;
+ struct flb_input_instance *ins;
+};
+
+
+#endif
diff --git a/fluent-bit/plugins/in_opentelemetry/opentelemetry_config.c b/fluent-bit/plugins/in_opentelemetry/opentelemetry_config.c
new file mode 100644
index 000000000..b57596f94
--- /dev/null
+++ b/fluent-bit/plugins/in_opentelemetry/opentelemetry_config.c
@@ -0,0 +1,92 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_downstream.h>
+
+#include "opentelemetry.h"
+#include "http_conn.h"
+
+/* default HTTP port for OTLP/HTTP is 4318 */
+#define OTLP_HTTP_PORT 4318
+
+struct flb_opentelemetry *opentelemetry_config_create(struct flb_input_instance *ins)
+{
+ int ret;
+ char port[8];
+ struct flb_opentelemetry *ctx;
+
+ ctx = flb_calloc(1, sizeof(struct flb_opentelemetry));
+ if (!ctx) {
+ flb_errno();
+ return NULL;
+ }
+ ctx->ins = ins;
+ mk_list_init(&ctx->connections);
+
+ /* Load the config map */
+ ret = flb_input_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ flb_free(ctx);
+ return NULL;
+ }
+
+ /* Listen interface (if not set, defaults to 0.0.0.0:4318) */
+ flb_input_net_default_listener("0.0.0.0", OTLP_HTTP_PORT, ins);
+
+ ctx->listen = flb_strdup(ins->host.listen);
+ snprintf(port, sizeof(port) - 1, "%d", ins->host.port);
+ ctx->tcp_port = flb_strdup(port);
+
+ /* HTTP Server specifics */
+ ctx->server = flb_calloc(1, sizeof(struct mk_server));
+ ctx->server->keep_alive = MK_TRUE;
+
+ /* monkey detects server->workers == 0 as the server not being initialized at the
+ * moment so we want to make sure that it stays that way!
+ */
+
+ return ctx;
+}
+
+int opentelemetry_config_destroy(struct flb_opentelemetry *ctx)
+{
+ /* release all connections */
+ opentelemetry_conn_release_all(ctx);
+
+ if (ctx->collector_id != -1) {
+ flb_input_collector_delete(ctx->collector_id, ctx->ins);
+
+ ctx->collector_id = -1;
+ }
+
+ if (ctx->downstream != NULL) {
+ flb_downstream_destroy(ctx->downstream);
+ }
+
+ if (ctx->server) {
+ flb_free(ctx->server);
+ }
+
+ flb_free(ctx->listen);
+ flb_free(ctx->tcp_port);
+ flb_free(ctx);
+
+ return 0;
+}
diff --git a/fluent-bit/plugins/in_opentelemetry/opentelemetry_config.h b/fluent-bit/plugins/in_opentelemetry/opentelemetry_config.h
new file mode 100644
index 000000000..0d980c7aa
--- /dev/null
+++ b/fluent-bit/plugins/in_opentelemetry/opentelemetry_config.h
@@ -0,0 +1,29 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_OPENTELEMETRY_CONFIG_H
+#define FLB_IN_OPENTELEMETRY_CONFIG_H
+
+#include <fluent-bit/flb_input_plugin.h>
+#include "opentelemetry.h"
+
+struct flb_opentelemetry *opentelemetry_config_create(struct flb_input_instance *ins);
+int opentelemetry_config_destroy(struct flb_opentelemetry *ctx);
+
+#endif
diff --git a/fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.c b/fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.c
new file mode 100644
index 000000000..c9ccba7f9
--- /dev/null
+++ b/fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.c
@@ -0,0 +1,1674 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_version.h>
+#include <fluent-bit/flb_error.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_gzip.h>
+#include <fluent-bit/flb_snappy.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+
+#include <monkey/monkey.h>
+#include <monkey/mk_core.h>
+#include <cmetrics/cmt_decode_opentelemetry.h>
+
+#include <fluent-otel-proto/fluent-otel.h>
+#include "opentelemetry.h"
+#include "http_conn.h"
+
+#define HTTP_CONTENT_JSON 0
+
+static int json_payload_append_converted_value(
+ struct flb_log_event_encoder *encoder,
+ int target_field,
+ msgpack_object *object);
+
+static int json_payload_append_converted_array(
+ struct flb_log_event_encoder *encoder,
+ int target_field,
+ msgpack_object *object);
+
+static int json_payload_append_converted_kvlist(
+ struct flb_log_event_encoder *encoder,
+ int target_field,
+ msgpack_object *object);
+
+static int json_payload_to_msgpack(struct flb_opentelemetry *ctx,
+ struct flb_log_event_encoder *encoder,
+ const char *body,
+ size_t len);
+
+static int otlp_pack_any_value(msgpack_packer *mp_pck,
+ Opentelemetry__Proto__Common__V1__AnyValue *body);
+
+static int send_response(struct http_conn *conn, int http_status, char *message)
+{
+ int len;
+ flb_sds_t out;
+ size_t sent;
+
+ out = flb_sds_create_size(256);
+ if (!out) {
+ return -1;
+ }
+
+ if (message) {
+ len = strlen(message);
+ }
+ else {
+ len = 0;
+ }
+
+ if (http_status == 201) {
+ flb_sds_printf(&out,
+ "HTTP/1.1 201 Created \r\n"
+ "Server: Fluent Bit v%s\r\n"
+ "Content-Length: 0\r\n\r\n",
+ FLB_VERSION_STR);
+ }
+ else if (http_status == 200) {
+ flb_sds_printf(&out,
+ "HTTP/1.1 200 OK\r\n"
+ "Server: Fluent Bit v%s\r\n"
+ "Content-Length: 0\r\n\r\n",
+ FLB_VERSION_STR);
+ }
+ else if (http_status == 204) {
+ flb_sds_printf(&out,
+ "HTTP/1.1 204 No Content\r\n"
+ "Server: Fluent Bit v%s\r\n"
+ "\r\n",
+ FLB_VERSION_STR);
+ }
+ else if (http_status == 400) {
+ flb_sds_printf(&out,
+ "HTTP/1.1 400 Forbidden\r\n"
+ "Server: Fluent Bit v%s\r\n"
+ "Content-Length: %i\r\n\r\n%s",
+ FLB_VERSION_STR,
+ len, message);
+ }
+
+ /* We should check the outcome of this operation */
+ flb_io_net_write(conn->connection,
+ (void *) out,
+ flb_sds_len(out),
+ &sent);
+
+ flb_sds_destroy(out);
+
+ return 0;
+}
+
+static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_conn *conn,
+ flb_sds_t tag,
+ struct mk_http_session *session,
+ struct mk_http_request *request)
+{
+ struct cfl_list decoded_contexts;
+ struct cfl_list *iterator;
+ struct cmt *context;
+ size_t offset;
+ int result;
+
+ offset = 0;
+
+ result = cmt_decode_opentelemetry_create(&decoded_contexts,
+ request->data.data,
+ request->data.len,
+ &offset);
+
+ if (result == CMT_DECODE_OPENTELEMETRY_SUCCESS) {
+ cfl_list_foreach(iterator, &decoded_contexts) {
+ context = cfl_list_entry(iterator, struct cmt, _head);
+
+ result = flb_input_metrics_append(ctx->ins, NULL, 0, context);
+
+ if (result != 0) {
+ flb_plg_debug(ctx->ins, "could not ingest metrics context : %d", result);
+ }
+ }
+
+ cmt_decode_opentelemetry_destroy(&decoded_contexts);
+ }
+
+ return 0;
+}
+
+static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct http_conn *conn,
+ flb_sds_t tag,
+ struct mk_http_session *session,
+ struct mk_http_request *request)
+{
+ struct ctrace *decoded_context;
+ size_t offset;
+ int result;
+
+ offset = 0;
+ result = ctr_decode_opentelemetry_create(&decoded_context,
+ request->data.data,
+ request->data.len,
+ &offset);
+ if (result == 0) {
+ result = flb_input_trace_append(ctx->ins, NULL, 0, decoded_context);
+ ctr_decode_opentelemetry_destroy(decoded_context);
+ }
+
+ return result;
+}
+
+static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
+ flb_sds_t tag,
+ struct mk_http_session *session,
+ struct mk_http_request *request)
+{
+ int ret;
+ int root_type;
+ char *out_buf = NULL;
+ size_t out_size;
+
+ msgpack_packer mp_pck;
+ msgpack_sbuffer mp_sbuf;
+
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ msgpack_pack_array(&mp_pck, 2);
+ flb_pack_time_now(&mp_pck);
+
+ /* Check if the incoming payload is a valid JSON message and convert it to msgpack */
+ ret = flb_pack_json(request->data.data, request->data.len,
+ &out_buf, &out_size, &root_type, NULL);
+
+ if (ret == 0 && root_type == JSMN_OBJECT) {
+ /* JSON found, pack it msgpack representation */
+ msgpack_sbuffer_write(&mp_sbuf, out_buf, out_size);
+ }
+ else {
+ /* the content might be a binary payload or invalid JSON */
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str_with_body(&mp_pck, "trace", 5);
+ msgpack_pack_str_with_body(&mp_pck, request->data.data, request->data.len);
+ }
+
+ /* release 'out_buf' if it was allocated */
+ if (out_buf) {
+ flb_free(out_buf);
+ }
+
+ flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+
+ return 0;
+}
+
+static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
+ flb_sds_t tag,
+ struct mk_http_session *session,
+ struct mk_http_request *request)
+{
+ int result;
+
+ if (ctx->raw_traces) {
+ result = process_payload_raw_traces(ctx, conn, tag, session, request);
+ }
+ else {
+ result = process_payload_traces_proto(ctx, conn, tag, session, request);
+ }
+
+ return result;
+}
+
+static int otel_pack_string(msgpack_packer *mp_pck, char *str)
+{
+ return msgpack_pack_str_with_body(mp_pck, str, strlen(str));
+}
+
+static int otel_pack_bool(msgpack_packer *mp_pck, bool val)
+{
+ if (val) {
+ return msgpack_pack_true(mp_pck);
+ }
+ else {
+ return msgpack_pack_false(mp_pck);
+ }
+}
+
+static int otel_pack_int(msgpack_packer *mp_pck, int val)
+{
+ return msgpack_pack_int64(mp_pck, val);
+}
+
+static int otel_pack_double(msgpack_packer *mp_pck, double val)
+{
+ return msgpack_pack_double(mp_pck, val);
+}
+
+static int otel_pack_kvarray(msgpack_packer *mp_pck,
+ Opentelemetry__Proto__Common__V1__KeyValue **kv_array,
+ size_t kv_count)
+{
+ int result;
+ int index;
+
+ result = msgpack_pack_map(mp_pck, kv_count);
+
+ if (result != 0) {
+ return result;
+ }
+
+ for (index = 0; index < kv_count && result == 0; index++) {
+ result = otel_pack_string(mp_pck, kv_array[index]->key);
+
+ if(result == 0) {
+ result = otlp_pack_any_value(mp_pck, kv_array[index]->value);
+ }
+ }
+
+ return result;
+}
+
+static int otel_pack_kvlist(msgpack_packer *mp_pck,
+ Opentelemetry__Proto__Common__V1__KeyValueList *kv_list)
+{
+ int kv_index;
+ int ret;
+ char *key;
+ Opentelemetry__Proto__Common__V1__AnyValue *value;
+
+ ret = msgpack_pack_map(mp_pck, kv_list->n_values);
+ if (ret != 0) {
+ return ret;
+ }
+
+ for (kv_index = 0; kv_index < kv_list->n_values && ret == 0; kv_index++) {
+ key = kv_list->values[kv_index]->key;
+ value = kv_list->values[kv_index]->value;
+
+ ret = otel_pack_string(mp_pck, key);
+
+ if(ret == 0) {
+ ret = otlp_pack_any_value(mp_pck, value);
+ }
+ }
+
+ return ret;
+}
+
+static int otel_pack_array(msgpack_packer *mp_pck,
+ Opentelemetry__Proto__Common__V1__ArrayValue *array)
+{
+ int ret;
+ int array_index;
+
+ ret = msgpack_pack_array(mp_pck, array->n_values);
+
+ if (ret != 0) {
+ return ret;
+ }
+
+ for (array_index = 0; array_index < array->n_values && ret == 0; array_index++) {
+ ret = otlp_pack_any_value(mp_pck, array->values[array_index]);
+ }
+
+ return ret;
+}
+
+static int otel_pack_bytes(msgpack_packer *mp_pck,
+ ProtobufCBinaryData bytes)
+{
+ return msgpack_pack_bin_with_body(mp_pck, bytes.data, bytes.len);
+}
+
+static int otlp_pack_any_value(msgpack_packer *mp_pck,
+ Opentelemetry__Proto__Common__V1__AnyValue *body)
+{
+ int result;
+
+ result = -2;
+
+ switch(body->value_case){
+ case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE:
+ result = otel_pack_string(mp_pck, body->string_value);
+ break;
+
+ case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BOOL_VALUE:
+ result = otel_pack_bool(mp_pck, body->bool_value);
+ break;
+
+ case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_INT_VALUE:
+ result = otel_pack_int(mp_pck, body->int_value);
+ break;
+
+ case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_DOUBLE_VALUE:
+ result = otel_pack_double(mp_pck, body->double_value);
+ break;
+
+ case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_ARRAY_VALUE:
+ result = otel_pack_array(mp_pck, body->array_value);
+ break;
+
+ case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE:
+ result = otel_pack_kvlist(mp_pck, body->kvlist_value);
+ break;
+
+ case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BYTES_VALUE:
+ result = otel_pack_bytes(mp_pck, body->bytes_value);
+ break;
+
+ default:
+ break;
+ }
+
+ if (result == -2) {
+ flb_error("[otel]: invalid value type in pack_any_value");
+ result = -1;
+ }
+
+ return result;
+}
+
+static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,
+ uint8_t *in_buf,
+ size_t in_size)
+{
+ int ret;
+ msgpack_packer packer;
+ msgpack_sbuffer buffer;
+ int resource_logs_index;
+ int scope_log_index;
+ int log_record_index;
+
+ Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *input_logs;
+ Opentelemetry__Proto__Logs__V1__ScopeLogs **scope_logs;
+ Opentelemetry__Proto__Logs__V1__ScopeLogs *scope_log;
+ Opentelemetry__Proto__Logs__V1__ResourceLogs **resource_logs;
+ Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_log;
+ Opentelemetry__Proto__Logs__V1__LogRecord **log_records;
+
+ msgpack_sbuffer_init(&buffer);
+ msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write);
+
+ input_logs = opentelemetry__proto__collector__logs__v1__export_logs_service_request__unpack(NULL, in_size, in_buf);
+ if (input_logs == NULL) {
+ flb_error("[otel] Failed to unpack input logs");
+ return -1;
+ }
+
+ resource_logs = input_logs->resource_logs;
+ if (resource_logs == NULL) {
+ flb_error("[otel] No resource logs found");
+ return -1;
+ }
+
+ for (resource_logs_index = 0; resource_logs_index < input_logs->n_resource_logs; resource_logs_index++) {
+ resource_log = resource_logs[resource_logs_index];
+ scope_logs = resource_log->scope_logs;
+
+ if (resource_log->n_scope_logs > 0 && scope_logs == NULL) {
+ flb_error("[otel] No scope logs found");
+ return -1;
+ }
+
+ for (scope_log_index = 0; scope_log_index < resource_log->n_scope_logs; scope_log_index++) {
+ scope_log = scope_logs[scope_log_index];
+ log_records = scope_log->log_records;
+
+ if (log_records == NULL) {
+ flb_error("[otel] No log records found");
+ return -1;
+ }
+
+ for (log_record_index=0; log_record_index < scope_log->n_log_records; log_record_index++) {
+ ret = flb_log_event_encoder_begin_record(encoder);
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_set_current_timestamp(encoder);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = otel_pack_kvarray(
+ &packer,
+ log_records[log_record_index]->attributes,
+ log_records[log_record_index]->n_attributes);
+
+ if (ret != 0) {
+ flb_error("[otel] Failed to convert log record attributes");
+
+ ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
+ }
+ else {
+ ret = flb_log_event_encoder_set_metadata_from_raw_msgpack(
+ encoder,
+ buffer.data,
+ buffer.size);
+ }
+
+ msgpack_sbuffer_clear(&buffer);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = otlp_pack_any_value(
+ &packer,
+ log_records[log_record_index]->body);
+
+ if (ret != 0) {
+ flb_error("[otel] Failed to convert log record body");
+
+ ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
+ }
+ else {
+ if (log_records[log_record_index]->body->value_case ==
+ OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE) {
+ ret = flb_log_event_encoder_set_body_from_raw_msgpack(
+ encoder,
+ buffer.data,
+ buffer.size);
+ }
+ else {
+ ret = flb_log_event_encoder_append_body_values(
+ encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE("message"),
+ FLB_LOG_EVENT_MSGPACK_RAW_VALUE(buffer.data, buffer.size));
+ }
+ }
+
+ msgpack_sbuffer_clear(&buffer);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_commit_record(encoder);
+ }
+ else {
+ flb_error("[otel] marshalling error");
+
+ msgpack_sbuffer_destroy(&buffer);
+
+ return -1;
+ }
+ }
+ }
+ }
+
+ msgpack_sbuffer_destroy(&buffer);
+
+ return 0;
+}
+
+static int find_map_entry_by_key(msgpack_object_map *map,
+ char *key,
+ size_t match_index,
+ int case_insensitive)
+{
+ size_t match_count;
+ int result;
+ int index;
+
+ match_count = 0;
+
+ for (index = 0 ; index < (int) map->size ; index++) {
+ if (map->ptr[index].key.type == MSGPACK_OBJECT_STR) {
+ if (case_insensitive) {
+ result = strncasecmp(map->ptr[index].key.via.str.ptr,
+ key,
+ map->ptr[index].key.via.str.size);
+ }
+ else {
+ result = strncmp(map->ptr[index].key.via.str.ptr,
+ key,
+ map->ptr[index].key.via.str.size);
+ }
+
+ if (result == 0) {
+ if (match_count == match_index) {
+ return index;
+ }
+
+ match_count++;
+ }
+ }
+ }
+
+ return -1;
+}
+
+static int json_payload_get_wrapped_value(msgpack_object *wrapper,
+ msgpack_object **value,
+ int *type)
+{
+ int internal_type;
+ msgpack_object *kv_value;
+ msgpack_object_str *kv_key;
+ msgpack_object_map *map;
+
+ if (wrapper->type != MSGPACK_OBJECT_MAP) {
+ return -1;
+ }
+
+ map = &wrapper->via.map;
+ kv_value = NULL;
+ internal_type = -1;
+
+ if (map->size == 1) {
+ if (map->ptr[0].key.type == MSGPACK_OBJECT_STR) {
+ kv_value = &map->ptr[0].val;
+ kv_key = &map->ptr[0].key.via.str;
+
+ if (strncasecmp(kv_key->ptr, "stringValue", kv_key->size) == 0 ||
+ strncasecmp(kv_key->ptr, "string_value", kv_key->size) == 0) {
+ internal_type = MSGPACK_OBJECT_STR;
+ }
+ else if (strncasecmp(kv_key->ptr, "boolValue", kv_key->size) == 0 ||
+ strncasecmp(kv_key->ptr, "bool_value", kv_key->size) == 0) {
+ internal_type = MSGPACK_OBJECT_BOOLEAN;
+ }
+ else if (strncasecmp(kv_key->ptr, "intValue", kv_key->size) == 0 ||
+ strncasecmp(kv_key->ptr, "int_value", kv_key->size) == 0) {
+ internal_type = MSGPACK_OBJECT_POSITIVE_INTEGER;
+ }
+ else if (strncasecmp(kv_key->ptr, "doubleValue", kv_key->size) == 0 ||
+ strncasecmp(kv_key->ptr, "double_value", kv_key->size) == 0) {
+ internal_type = MSGPACK_OBJECT_FLOAT;
+ }
+ else if (strncasecmp(kv_key->ptr, "bytesValue", kv_key->size) == 0 ||
+ strncasecmp(kv_key->ptr, "bytes_value", kv_key->size) == 0) {
+ internal_type = MSGPACK_OBJECT_BIN;
+ }
+ else if (strncasecmp(kv_key->ptr, "arrayValue", kv_key->size) == 0 ||
+ strncasecmp(kv_key->ptr, "array_value", kv_key->size) == 0) {
+ internal_type = MSGPACK_OBJECT_ARRAY;
+ }
+ else if (strncasecmp(kv_key->ptr, "kvlistValue", kv_key->size) == 0 ||
+ strncasecmp(kv_key->ptr, "kvlist_value", kv_key->size) == 0) {
+ internal_type = MSGPACK_OBJECT_MAP;
+ }
+ }
+ }
+
+ if (internal_type != -1) {
+ if (type != NULL) {
+ *type = internal_type;
+ }
+
+ if (value != NULL) {
+ *value = kv_value;
+ }
+
+ if (kv_value->type == MSGPACK_OBJECT_MAP) {
+ map = &kv_value->via.map;
+
+ if (map->size == 1) {
+ kv_value = &map->ptr[0].val;
+ kv_key = &map->ptr[0].key.via.str;
+
+ if (strncasecmp(kv_key->ptr, "values", kv_key->size) == 0) {
+ if (value != NULL) {
+ *value = kv_value;
+ }
+ }
+ else {
+ return -3;
+ }
+ }
+ }
+ }
+ else {
+ return -2;
+ }
+
+ return 0;
+}
+
+static int json_payload_append_unwrapped_value(
+ struct flb_log_event_encoder *encoder,
+ int target_field,
+ msgpack_object *object,
+ int *encoder_result)
+{
+ char temporary_buffer[33];
+ int unwrap_value;
+ int result;
+ msgpack_object *value;
+ int type;
+
+ result = json_payload_get_wrapped_value(object,
+ &value,
+ &type);
+
+ if (result == 0) {
+ unwrap_value = FLB_FALSE;
+
+ if (type == MSGPACK_OBJECT_STR) {
+ unwrap_value = FLB_TRUE;
+ }
+ else if (type == MSGPACK_OBJECT_BOOLEAN) {
+ unwrap_value = FLB_TRUE;
+ }
+ else if (type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
+ if (value->type == MSGPACK_OBJECT_STR) {
+ memset(temporary_buffer, 0, sizeof(temporary_buffer));
+
+ if (value->via.str.size < sizeof(temporary_buffer)) {
+ strncpy(temporary_buffer,
+ value->via.str.ptr,
+ value->via.str.size);
+ }
+ else {
+ strncpy(temporary_buffer,
+ value->via.str.ptr,
+ sizeof(temporary_buffer) - 1);
+ }
+
+ result = flb_log_event_encoder_append_int64(
+ encoder,
+ target_field,
+ strtoll(temporary_buffer, NULL, 10));
+ }
+ else {
+ unwrap_value = FLB_TRUE;
+ }
+ }
+ else if (type == MSGPACK_OBJECT_FLOAT) {
+ unwrap_value = FLB_TRUE;
+ }
+ else if (type == MSGPACK_OBJECT_BIN) {
+ unwrap_value = FLB_TRUE;
+ }
+ else if (type == MSGPACK_OBJECT_ARRAY) {
+ result = json_payload_append_converted_array(encoder,
+ target_field,
+ value);
+ }
+ else if (type == MSGPACK_OBJECT_MAP) {
+ result = json_payload_append_converted_kvlist(encoder,
+ target_field,
+ value);
+ }
+ else {
+ return -2;
+ }
+
+ if (unwrap_value) {
+ result = json_payload_append_converted_value(encoder,
+ target_field,
+ value);
+ }
+
+ *encoder_result = result;
+
+ return 0;
+ }
+ else {
+ return -1;
+ }
+
+ return -1;
+}
+
+
+static int json_payload_append_converted_map(
+ struct flb_log_event_encoder *encoder,
+ int target_field,
+ msgpack_object *object)
+{
+ int encoder_result;
+ int result;
+ size_t index;
+ msgpack_object_map *map;
+
+ map = &object->via.map;
+
+ result = json_payload_append_unwrapped_value(
+ encoder,
+ target_field,
+ object,
+ &encoder_result);
+
+ if (result == 0 && encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
+ return result;
+ }
+
+ result = flb_log_event_encoder_begin_map(encoder, target_field);
+
+ for (index = 0 ;
+ index < map->size &&
+ result == FLB_EVENT_ENCODER_SUCCESS;
+ index++) {
+ result = json_payload_append_converted_value(
+ encoder,
+ target_field,
+ &map->ptr[index].key);
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = json_payload_append_converted_value(
+ encoder,
+ target_field,
+ &map->ptr[index].val);
+ }
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_commit_map(encoder, target_field);
+ }
+ else {
+ flb_log_event_encoder_rollback_map(encoder, target_field);
+ }
+
+ return result;
+}
+
+static int json_payload_append_converted_array(
+ struct flb_log_event_encoder *encoder,
+ int target_field,
+ msgpack_object *object)
+{
+ int result;
+ size_t index;
+ msgpack_object_array *array;
+
+ array = &object->via.array;
+
+ result = flb_log_event_encoder_begin_array(encoder, target_field);
+
+ for (index = 0 ;
+ index < array->size &&
+ result == FLB_EVENT_ENCODER_SUCCESS;
+ index++) {
+ result = json_payload_append_converted_value(
+ encoder,
+ target_field,
+ &array->ptr[index]);
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_commit_array(encoder, target_field);
+ }
+ else {
+ flb_log_event_encoder_rollback_array(encoder, target_field);
+ }
+
+ return result;
+}
+
+static int json_payload_append_converted_kvlist(
+ struct flb_log_event_encoder *encoder,
+ int target_field,
+ msgpack_object *object)
+{
+ int value_index;
+ int key_index;
+ int result;
+ size_t index;
+ msgpack_object_array *array;
+ msgpack_object_map *entry;
+
+ array = &object->via.array;
+
+ result = flb_log_event_encoder_begin_map(encoder, target_field);
+
+ for (index = 0 ;
+ index < array->size &&
+ result == FLB_EVENT_ENCODER_SUCCESS;
+ index++) {
+
+ if (array->ptr[index].type != MSGPACK_OBJECT_MAP) {
+ result = FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT;
+ }
+ else {
+ entry = &array->ptr[index].via.map;
+
+ key_index = find_map_entry_by_key(entry, "key", 0, FLB_TRUE);
+
+ if (key_index == -1) {
+ result = FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT;
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ value_index = find_map_entry_by_key(entry, "value", 0, FLB_TRUE);
+ }
+
+ if (value_index == -1) {
+ result = FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT;
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = json_payload_append_converted_value(
+ encoder,
+ target_field,
+ &entry->ptr[key_index].val);
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = json_payload_append_converted_value(
+ encoder,
+ target_field,
+ &entry->ptr[value_index].val);
+ }
+ }
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_commit_map(encoder, target_field);
+ }
+ else {
+ flb_log_event_encoder_rollback_map(encoder, target_field);
+ }
+
+ return result;
+}
+
+static int json_payload_append_converted_value(
+ struct flb_log_event_encoder *encoder,
+ int target_field,
+ msgpack_object *object)
+{
+ int result;
+
+ result = FLB_EVENT_ENCODER_SUCCESS;
+
+ switch (object->type) {
+ case MSGPACK_OBJECT_BOOLEAN:
+ result = flb_log_event_encoder_append_boolean(
+ encoder,
+ target_field,
+ object->via.boolean);
+ break;
+
+ case MSGPACK_OBJECT_POSITIVE_INTEGER:
+ result = flb_log_event_encoder_append_uint64(
+ encoder,
+ target_field,
+ object->via.u64);
+ break;
+ case MSGPACK_OBJECT_NEGATIVE_INTEGER:
+ result = flb_log_event_encoder_append_int64(
+ encoder,
+ target_field,
+ object->via.i64);
+ break;
+
+ case MSGPACK_OBJECT_FLOAT32:
+ case MSGPACK_OBJECT_FLOAT64:
+ result = flb_log_event_encoder_append_double(
+ encoder,
+ target_field,
+ object->via.f64);
+ break;
+
+ case MSGPACK_OBJECT_STR:
+ result = flb_log_event_encoder_append_string(
+ encoder,
+ target_field,
+ (char *) object->via.str.ptr,
+ object->via.str.size);
+
+ break;
+
+ case MSGPACK_OBJECT_BIN:
+ result = flb_log_event_encoder_append_binary(
+ encoder,
+ target_field,
+ (char *) object->via.bin.ptr,
+ object->via.bin.size);
+ break;
+
+ case MSGPACK_OBJECT_ARRAY:
+ result = json_payload_append_converted_array(
+ encoder,
+ target_field,
+ object);
+ break;
+
+ case MSGPACK_OBJECT_MAP:
+ result = json_payload_append_converted_map(
+ encoder,
+ target_field,
+ object);
+
+ break;
+
+ default:
+ break;
+ }
+
+ return result;
+}
+
+static int process_json_payload_log_records_entry(
+ struct flb_opentelemetry *ctx,
+ struct flb_log_event_encoder *encoder,
+ msgpack_object *log_records_object)
+{
+ msgpack_object_map *log_records_entry;
+ char timestamp_str[32];
+ msgpack_object *timestamp_object;
+ uint64_t timestamp_uint64;
+ msgpack_object *metadata_object;
+ msgpack_object *body_object;
+ int body_type;
+ struct flb_time timestamp;
+ int result;
+
+ if (log_records_object->type != MSGPACK_OBJECT_MAP) {
+ flb_plg_error(ctx->ins, "unexpected logRecords entry type");
+
+ return -4;
+ }
+
+ log_records_entry = &log_records_object->via.map;
+
+ result = find_map_entry_by_key(log_records_entry, "timeUnixNano", 0, FLB_TRUE);
+
+ if (result == -1) {
+ result = find_map_entry_by_key(log_records_entry, "time_unix_nano", 0, FLB_TRUE);
+ }
+
+ if (result == -1) {
+ result = find_map_entry_by_key(log_records_entry, "observedTimeUnixNano", 0, FLB_TRUE);
+ }
+
+ if (result == -1) {
+ result = find_map_entry_by_key(log_records_entry, "observed_time_unix_nano", 0, FLB_TRUE);
+ }
+
+ if (result == -1) {
+ flb_plg_info(ctx->ins, "neither timeUnixNano nor observedTimeUnixNano found");
+
+ flb_time_get(&timestamp);
+ }
+ else {
+ timestamp_object = &log_records_entry->ptr[result].val;
+
+ if (timestamp_object->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
+ timestamp_uint64 = timestamp_object->via.u64;
+ }
+ else if (timestamp_object->type == MSGPACK_OBJECT_STR) {
+ memset(timestamp_str, 0, sizeof(timestamp_str));
+
+ if (timestamp_object->via.str.size < sizeof(timestamp_str)) {
+ strncpy(timestamp_str,
+ timestamp_object->via.str.ptr,
+ timestamp_object->via.str.size);
+ }
+ else {
+ strncpy(timestamp_str,
+ timestamp_object->via.str.ptr,
+ sizeof(timestamp_str) - 1);
+ }
+
+ timestamp_uint64 = strtoul(timestamp_str, NULL, 10);
+ }
+ else {
+ flb_plg_error(ctx->ins, "unexpected timeUnixNano type");
+
+ return -4;
+ }
+
+ flb_time_from_uint64(&timestamp, timestamp_uint64);
+ }
+
+
+ result = find_map_entry_by_key(log_records_entry, "attributes", 0, FLB_TRUE);
+
+ if (result == -1) {
+ flb_plg_debug(ctx->ins, "attributes missing");
+
+ metadata_object = NULL;
+ }
+ else {
+ if (log_records_entry->ptr[result].val.type != MSGPACK_OBJECT_ARRAY) {
+ flb_plg_error(ctx->ins, "unexpected attributes type");
+
+ return -4;
+ }
+
+ metadata_object = &log_records_entry->ptr[result].val;
+ }
+
+ result = find_map_entry_by_key(log_records_entry, "body", 0, FLB_TRUE);
+
+ if (result == -1) {
+ flb_plg_info(ctx->ins, "body missing");
+
+ body_object = NULL;
+ }
+ else {
+ if (log_records_entry->ptr[result].val.type != MSGPACK_OBJECT_MAP) {
+ flb_plg_error(ctx->ins, "unexpected body type");
+
+ return -4;
+ }
+
+ body_object = &log_records_entry->ptr[result].val;
+ }
+
+ result = flb_log_event_encoder_begin_record(encoder);
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_set_timestamp(encoder, &timestamp);
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS &&
+ metadata_object != NULL) {
+ flb_log_event_encoder_dynamic_field_reset(&encoder->metadata);
+
+ result = json_payload_append_converted_kvlist(
+ encoder,
+ FLB_LOG_EVENT_METADATA,
+ metadata_object);
+ }
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS &&
+ body_object != NULL) {
+ result = json_payload_get_wrapped_value(body_object, NULL, &body_type);
+
+ if (result != 0 || body_type == MSGPACK_OBJECT_MAP) {
+ flb_log_event_encoder_dynamic_field_reset(&encoder->body);
+ }
+ else {
+ flb_log_event_encoder_append_cstring(
+ encoder,
+ FLB_LOG_EVENT_BODY,
+ "log");
+ }
+
+ result = json_payload_append_converted_value(
+ encoder,
+ FLB_LOG_EVENT_BODY,
+ body_object);
+ }
+
+ result = flb_log_event_encoder_dynamic_field_flush(&encoder->body);
+
+ if (result == FLB_EVENT_ENCODER_SUCCESS) {
+ result = flb_log_event_encoder_commit_record(encoder);
+ }
+ else {
+ flb_plg_error(ctx->ins, "log event encoder failure : %d", result);
+
+ flb_log_event_encoder_rollback_record(encoder);
+
+ result = -4;
+ }
+
+ return result;
+}
+
+static int process_json_payload_scope_logs_entry(
+ struct flb_opentelemetry *ctx,
+ struct flb_log_event_encoder *encoder,
+ msgpack_object *scope_logs_object)
+{
+ msgpack_object_map *scope_logs_entry;
+ msgpack_object_array *log_records;
+ int result;
+ size_t index;
+
+ if (scope_logs_object->type != MSGPACK_OBJECT_MAP) {
+ flb_plg_error(ctx->ins, "unexpected scopeLogs entry type");
+
+ return -3;
+ }
+
+ scope_logs_entry = &scope_logs_object->via.map;
+
+ result = find_map_entry_by_key(scope_logs_entry, "logRecords", 0, FLB_TRUE);
+
+ if (result == -1) {
+ result = find_map_entry_by_key(scope_logs_entry, "logRecords", 0, FLB_TRUE);
+
+ if (result == -1) {
+ flb_plg_error(ctx->ins, "scopeLogs missing");
+
+ return -3;
+ }
+ }
+
+ if (scope_logs_entry->ptr[result].val.type != MSGPACK_OBJECT_ARRAY) {
+ flb_plg_error(ctx->ins, "unexpected logRecords type");
+
+ return -3;
+ }
+
+ log_records = &scope_logs_entry->ptr[result].val.via.array;
+
+ result = 0;
+
+ for (index = 0 ; index < log_records->size ; index++) {
+ result = process_json_payload_log_records_entry(
+ ctx,
+ encoder,
+ &log_records->ptr[index]);
+ }
+
+ return result;
+}
+
+
+static int process_json_payload_resource_logs_entry(
+ struct flb_opentelemetry *ctx,
+ struct flb_log_event_encoder *encoder,
+ msgpack_object *resource_logs_object)
+{
+ msgpack_object_map *resource_logs_entry;
+ msgpack_object_array *scope_logs;
+ int result;
+ size_t index;
+
+
+ if (resource_logs_object->type != MSGPACK_OBJECT_MAP) {
+ flb_plg_error(ctx->ins, "unexpected resourceLogs entry type");
+
+ return -2;
+ }
+
+ resource_logs_entry = &resource_logs_object->via.map;
+
+ result = find_map_entry_by_key(resource_logs_entry, "scopeLogs", 0, FLB_TRUE);
+
+ if (result == -1) {
+ result = find_map_entry_by_key(resource_logs_entry, "scope_logs", 0, FLB_TRUE);
+
+ if (result == -1) {
+ flb_plg_error(ctx->ins, "scopeLogs missing");
+
+ return -2;
+ }
+ }
+
+ if (resource_logs_entry->ptr[result].val.type != MSGPACK_OBJECT_ARRAY) {
+ flb_plg_error(ctx->ins, "unexpected scopeLogs type");
+
+ return -2;
+ }
+
+ scope_logs = &resource_logs_entry->ptr[result].val.via.array;
+
+ result = 0;
+
+ for (index = 0 ; index < scope_logs->size ; index++) {
+ result = process_json_payload_scope_logs_entry(
+ ctx,
+ encoder,
+ &scope_logs->ptr[index]);
+ }
+
+ return result;
+}
+
+static int process_json_payload_root(struct flb_opentelemetry *ctx,
+ struct flb_log_event_encoder *encoder,
+ msgpack_object *root_object)
+{
+ msgpack_object_array *resource_logs;
+ int result;
+ size_t index;
+ msgpack_object_map *root;
+
+ if (root_object->type != MSGPACK_OBJECT_MAP) {
+ flb_plg_error(ctx->ins, "unexpected root object type");
+
+ return -1;
+ }
+
+ root = &root_object->via.map;
+
+ result = find_map_entry_by_key(root, "resourceLogs", 0, FLB_TRUE);
+
+ if (result == -1) {
+ result = find_map_entry_by_key(root, "resource_logs", 0, FLB_TRUE);
+
+ if (result == -1) {
+ flb_plg_error(ctx->ins, "resourceLogs missing");
+
+ return -1;
+ }
+ }
+
+ if (root->ptr[result].val.type != MSGPACK_OBJECT_ARRAY) {
+ flb_plg_error(ctx->ins, "unexpected resourceLogs type");
+
+ return -1;
+ }
+
+ resource_logs = &root->ptr[result].val.via.array;
+
+ result = 0;
+
+ for (index = 0 ; index < resource_logs->size ; index++) {
+ result = process_json_payload_resource_logs_entry(
+ ctx,
+ encoder,
+ &resource_logs->ptr[index]);
+ }
+
+ return result;
+}
+
+/* This code is definitely not complete and beyond fishy, it needs to be
+ * refactored.
+ */
+static int json_payload_to_msgpack(struct flb_opentelemetry *ctx,
+ struct flb_log_event_encoder *encoder,
+ const char *body,
+ size_t len)
+{
+ size_t msgpack_body_length;
+ msgpack_unpacked unpacked_root;
+ char *msgpack_body;
+ int root_type;
+ size_t offset;
+ int result;
+
+ result = flb_pack_json(body, len, &msgpack_body, &msgpack_body_length,
+ &root_type, NULL);
+
+ if (result != 0) {
+ flb_plg_error(ctx->ins, "json to msgpack conversion error");
+ }
+ else {
+ msgpack_unpacked_init(&unpacked_root);
+
+ offset = 0;
+ result = msgpack_unpack_next(&unpacked_root,
+ msgpack_body,
+ msgpack_body_length,
+ &offset);
+
+ if (result == MSGPACK_UNPACK_SUCCESS) {
+ result = process_json_payload_root(ctx,
+ encoder,
+ &unpacked_root.data);
+ }
+ else {
+ result = -1;
+ }
+
+ msgpack_unpacked_destroy(&unpacked_root);
+
+ flb_free(msgpack_body);
+ }
+
+ return result;
+}
+
+static int process_payload_logs(struct flb_opentelemetry *ctx, struct http_conn *conn,
+ flb_sds_t tag,
+ struct mk_http_session *session,
+ struct mk_http_request *request)
+{
+ struct flb_log_event_encoder *encoder;
+ int ret;
+
+ encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2);
+
+ if (encoder == NULL) {
+ return -1;
+ }
+
+ /* Check if the incoming payload is a valid JSON message and convert it to msgpack */
+ if (strncasecmp(request->content_type.data,
+ "application/json",
+ request->content_type.len) == 0) {
+ ret = json_payload_to_msgpack(ctx,
+ encoder,
+ request->data.data,
+ request->data.len);
+ }
+ else if (strncasecmp(request->content_type.data,
+ "application/x-protobuf",
+ request->content_type.len) == 0) {
+ ret = binary_payload_to_msgpack(encoder, (uint8_t *) request->data.data, request->data.len);
+ }
+ else {
+ flb_error("[otel] Unsupported content type %.*s", (int)request->content_type.len, request->content_type.data);
+
+ ret = -1;
+ }
+
+ if (ret == 0) {
+ ret = flb_input_log_append(ctx->ins,
+ tag,
+ flb_sds_len(tag),
+ encoder->output_buffer,
+ encoder->output_length);
+ }
+
+ flb_log_event_encoder_destroy(encoder);
+
+ return ret;
+}
+
+static inline int mk_http_point_header(mk_ptr_t *h,
+ struct mk_http_parser *parser, int key)
+{
+ struct mk_http_header *header;
+
+ header = &parser->headers[key];
+ if (header->type == key) {
+ h->data = header->val.data;
+ h->len = header->val.len;
+ return 0;
+ }
+ else {
+ h->data = NULL;
+ h->len = -1;
+ }
+
+ return -1;
+}
+
+static \
+int uncompress_zlib(char **output_buffer,
+ size_t *output_size,
+ char *input_buffer,
+ size_t input_size)
+{
+ flb_error("[opentelemetry] unsupported compression format");
+
+ return -1;
+}
+
+static \
+int uncompress_zstd(char **output_buffer,
+ size_t *output_size,
+ char *input_buffer,
+ size_t input_size)
+{
+ flb_error("[opentelemetry] unsupported compression format");
+
+ return -1;
+}
+
+static \
+int uncompress_deflate(char **output_buffer,
+ size_t *output_size,
+ char *input_buffer,
+ size_t input_size)
+{
+ flb_error("[opentelemetry] unsupported compression format");
+
+ return -1;
+}
+
+static \
+int uncompress_snappy(char **output_buffer,
+ size_t *output_size,
+ char *input_buffer,
+ size_t input_size)
+{
+ int ret;
+
+ ret = flb_snappy_uncompress_framed_data(input_buffer,
+ input_size,
+ output_buffer,
+ output_size);
+
+ if (ret != 0) {
+ flb_error("[opentelemetry] snappy decompression failed");
+
+ return -1;
+ }
+
+ return 1;
+}
+
+static \
+int uncompress_gzip(char **output_buffer,
+ size_t *output_size,
+ char *input_buffer,
+ size_t input_size)
+{
+ int ret;
+
+ ret = flb_gzip_uncompress(input_buffer,
+ input_size,
+ (void *) output_buffer,
+ output_size);
+
+ if (ret == -1) {
+ flb_error("[opentelemetry] gzip decompression failed");
+
+ return -1;
+ }
+
+ return 1;
+}
+
+int opentelemetry_prot_uncompress(struct mk_http_session *session,
+ struct mk_http_request *request,
+ char **output_buffer,
+ size_t *output_size)
+{
+ struct mk_http_header *header;
+ size_t index;
+
+ *output_buffer = NULL;
+ *output_size = 0;
+
+ for (index = 0;
+ index < session->parser.headers_extra_count;
+ index++) {
+ header = &session->parser.headers_extra[index];
+
+ if (strncasecmp(header->key.data, "Content-Encoding", 16) == 0) {
+ if (strncasecmp(header->val.data, "gzip", 4) == 0) {
+ return uncompress_gzip(output_buffer,
+ output_size,
+ request->data.data,
+ request->data.len);
+ }
+ else if (strncasecmp(header->val.data, "zlib", 4) == 0) {
+ return uncompress_zlib(output_buffer,
+ output_size,
+ request->data.data,
+ request->data.len);
+ }
+ else if (strncasecmp(header->val.data, "zstd", 4) == 0) {
+ return uncompress_zstd(output_buffer,
+ output_size,
+ request->data.data,
+ request->data.len);
+ }
+ else if (strncasecmp(header->val.data, "snappy", 6) == 0) {
+ return uncompress_snappy(output_buffer,
+ output_size,
+ request->data.data,
+ request->data.len);
+ }
+ else if (strncasecmp(header->val.data, "deflate", 4) == 0) {
+ return uncompress_deflate(output_buffer,
+ output_size,
+ request->data.data,
+ request->data.len);
+ }
+ else {
+ return -2;
+ }
+ }
+ }
+
+ return 0;
+}
+
+
+/*
+ * Handle an incoming request. It perform extra checks over the request, if
+ * everything is OK, it enqueue the incoming payload.
+ */
+int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *conn,
+ struct mk_http_session *session,
+ struct mk_http_request *request)
+{
+ int i;
+ int ret = -1;
+ int len;
+ char *uri;
+ char *qs;
+ off_t diff;
+ flb_sds_t tag;
+ struct mk_http_header *header;
+ char *original_data;
+ size_t original_data_size;
+ char *uncompressed_data;
+ size_t uncompressed_data_size;
+
+ if (request->uri.data[0] != '/') {
+ send_response(conn, 400, "error: invalid request\n");
+ return -1;
+ }
+
+ /* Decode URI */
+ uri = mk_utils_url_decode(request->uri);
+ if (!uri) {
+ uri = mk_mem_alloc_z(request->uri.len + 1);
+ if (!uri) {
+ return -1;
+ }
+ memcpy(uri, request->uri.data, request->uri.len);
+ uri[request->uri.len] = '\0';
+ }
+
+ if (strcmp(uri, "/v1/metrics") != 0 &&
+ strcmp(uri, "/v1/traces") != 0 &&
+ strcmp(uri, "/v1/logs") != 0) {
+
+ send_response(conn, 400, "error: invalid endpoint\n");
+ mk_mem_free(uri);
+
+ return -1;
+ }
+
+ /* Try to match a query string so we can remove it */
+ qs = strchr(uri, '?');
+ if (qs) {
+ /* remove the query string part */
+ diff = qs - uri;
+ uri[diff] = '\0';
+ }
+
+ /* Compose the query string using the URI */
+ len = strlen(uri);
+
+ if (len == 1) {
+ tag = NULL; /* use default tag */
+ }
+ else {
+ tag = flb_sds_create_size(len);
+ if (!tag) {
+ mk_mem_free(uri);
+ return -1;
+ }
+
+ /* New tag skipping the URI '/' */
+ flb_sds_cat(tag, uri + 1, len - 1);
+
+ /* Sanitize, only allow alphanum chars */
+ for (i = 0; i < flb_sds_len(tag); i++) {
+ if (!isalnum(tag[i]) && tag[i] != '_' && tag[i] != '.') {
+ tag[i] = '_';
+ }
+ }
+ }
+
+ /* Check if we have a Host header: Hostname ; port */
+ mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST);
+
+ /* Header: Connection */
+ mk_http_point_header(&request->connection, &session->parser,
+ MK_HEADER_CONNECTION);
+
+ /* HTTP/1.1 needs Host header */
+ if (!request->host.data && request->protocol == MK_HTTP_PROTOCOL_11) {
+ flb_sds_destroy(tag);
+ mk_mem_free(uri);
+ return -1;
+ }
+
+ /* Should we close the session after this request ? */
+ mk_http_keepalive_check(session, request, ctx->server);
+
+ /* Content Length */
+ header = &session->parser.headers[MK_HEADER_CONTENT_LENGTH];
+ if (header->type == MK_HEADER_CONTENT_LENGTH) {
+ request->_content_length.data = header->val.data;
+ request->_content_length.len = header->val.len;
+ }
+ else {
+ request->_content_length.data = NULL;
+ }
+
+ mk_http_point_header(&request->content_type, &session->parser, MK_HEADER_CONTENT_TYPE);
+
+ if (request->method != MK_METHOD_POST) {
+ flb_sds_destroy(tag);
+ mk_mem_free(uri);
+ send_response(conn, 400, "error: invalid HTTP method\n");
+ return -1;
+ }
+
+ original_data = request->data.data;
+ original_data_size = request->data.len;
+
+ ret = opentelemetry_prot_uncompress(session, request,
+ &uncompressed_data,
+ &uncompressed_data_size);
+
+ if (ret > 0) {
+ request->data.data = uncompressed_data;
+ request->data.len = uncompressed_data_size;
+ }
+
+ if (strcmp(uri, "/v1/metrics") == 0) {
+ ret = process_payload_metrics(ctx, conn, tag, session, request);
+ }
+ else if (strcmp(uri, "/v1/traces") == 0) {
+ ret = process_payload_traces(ctx, conn, tag, session, request);
+ }
+ else if (strcmp(uri, "/v1/logs") == 0) {
+ ret = process_payload_logs(ctx, conn, tag, session, request);
+ }
+
+ if (uncompressed_data != NULL) {
+ flb_free(uncompressed_data);
+ }
+
+ request->data.data = original_data;
+ request->data.len = original_data_size;
+
+ mk_mem_free(uri);
+ flb_sds_destroy(tag);
+
+ send_response(conn, ctx->successful_response_code, NULL);
+
+ return ret;
+}
+
+/*
+ * Handle an incoming request which has resulted in an http parser error.
+ */
+int opentelemetry_prot_handle_error(struct flb_opentelemetry *ctx, struct http_conn *conn,
+ struct mk_http_session *session,
+ struct mk_http_request *request)
+{
+ send_response(conn, 400, "error: invalid request\n");
+ return -1;
+}
diff --git a/fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.h b/fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.h
new file mode 100644
index 000000000..bbfd8332f
--- /dev/null
+++ b/fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.h
@@ -0,0 +1,31 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_OPENTELEMETRY_PROT
+#define FLB_IN_OPENTELEMETRY_PROT
+
+int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *conn,
+ struct mk_http_session *session,
+ struct mk_http_request *request);
+
+int opentelemetry_prot_handle_error(struct flb_opentelemetry *ctx, struct http_conn *conn,
+ struct mk_http_session *session,
+ struct mk_http_request *request);
+
+#endif