diff options
Diffstat (limited to 'fluent-bit/plugins/in_opentelemetry')
-rw-r--r-- | fluent-bit/plugins/in_opentelemetry/CMakeLists.txt | 12 | ||||
-rw-r--r-- | fluent-bit/plugins/in_opentelemetry/http_conn.c | 301 | ||||
-rw-r--r-- | fluent-bit/plugins/in_opentelemetry/http_conn.h | 57 | ||||
-rw-r--r-- | fluent-bit/plugins/in_opentelemetry/opentelemetry.c | 200 | ||||
-rw-r--r-- | fluent-bit/plugins/in_opentelemetry/opentelemetry.h | 51 | ||||
-rw-r--r-- | fluent-bit/plugins/in_opentelemetry/opentelemetry_config.c | 92 | ||||
-rw-r--r-- | fluent-bit/plugins/in_opentelemetry/opentelemetry_config.h | 29 | ||||
-rw-r--r-- | fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.c | 1674 | ||||
-rw-r--r-- | fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.h | 31 |
9 files changed, 2447 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_opentelemetry/CMakeLists.txt b/fluent-bit/plugins/in_opentelemetry/CMakeLists.txt new file mode 100644 index 000000000..4c3d6db32 --- /dev/null +++ b/fluent-bit/plugins/in_opentelemetry/CMakeLists.txt @@ -0,0 +1,12 @@ +if(NOT FLB_METRICS) + message(FATAL_ERROR "OpenTelemetry input plugin requires FLB_HTTP_SERVER=On.") +endif() + +set(src + http_conn.c + opentelemetry.c + opentelemetry_prot.c + opentelemetry_config.c + ) + +FLB_PLUGIN(in_opentelemetry "${src}" "monkey-core-static") diff --git a/fluent-bit/plugins/in_opentelemetry/http_conn.c b/fluent-bit/plugins/in_opentelemetry/http_conn.c new file mode 100644 index 000000000..a402295b1 --- /dev/null +++ b/fluent-bit/plugins/in_opentelemetry/http_conn.c @@ -0,0 +1,301 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_engine.h> +#include <fluent-bit/flb_downstream.h> + +#include "opentelemetry.h" +#include "http_conn.h" +#include "opentelemetry_prot.h" + +static void opentelemetry_conn_request_init(struct mk_http_session *session, + struct mk_http_request *request); + +static int opentelemetry_conn_event(void *data) +{ + int status; + size_t size; + ssize_t available; + ssize_t bytes; + char *tmp; + char *request_end; + size_t request_len; + struct http_conn *conn; + struct mk_event *event; + struct flb_opentelemetry *ctx; + struct flb_connection *connection; + + connection = (struct flb_connection *) data; + + conn = connection->user_data; + + ctx = conn->ctx; + + event = &connection->event; + + if (event->mask & MK_EVENT_READ) { + available = (conn->buf_size - conn->buf_len) - 1; + if (available < 1) { + if (conn->buf_size + ctx->buffer_chunk_size > ctx->buffer_max_size) { + flb_plg_trace(ctx->ins, + "fd=%i incoming data exceed limit (%zu KB)", + event->fd, (ctx->buffer_max_size / 1024)); + opentelemetry_conn_del(conn); + return -1; + } + + size = conn->buf_size + ctx->buffer_chunk_size; + tmp = flb_realloc(conn->buf_data, size); + if (!tmp) { + flb_errno(); + return -1; + } + flb_plg_trace(ctx->ins, "fd=%i buffer realloc %i -> %zu", + event->fd, conn->buf_size, size); + + conn->buf_data = tmp; + conn->buf_size = size; + available = (conn->buf_size - conn->buf_len) - 1; + } + + /* Read data */ + bytes = flb_io_net_read(connection, + (void *) &conn->buf_data[conn->buf_len], + available); + + if (bytes <= 0) { + flb_plg_trace(ctx->ins, "fd=%i closed connection", event->fd); + opentelemetry_conn_del(conn); + return -1; + } + + flb_plg_trace(ctx->ins, "read()=%zi pre_len=%i now_len=%zi", + bytes, conn->buf_len, conn->buf_len + bytes); + conn->buf_len += bytes; + conn->buf_data[conn->buf_len] = '\0'; + + status = mk_http_parser(&conn->request, &conn->session.parser, + conn->buf_data, conn->buf_len, conn->session.server); + + if (status == MK_HTTP_PARSER_OK) { + /* Do more logic parsing and checks for this request */ + opentelemetry_prot_handle(ctx, conn, &conn->session, &conn->request); + + /* Evict the processed request from the connection buffer and reinitialize + * the HTTP parser. + */ + + request_end = NULL; + + if (NULL != conn->request.data.data) { + request_end = &conn->request.data.data[conn->request.data.len]; + } + else { + request_end = strstr(conn->buf_data, "\r\n\r\n"); + + if(NULL != request_end) { + request_end = &request_end[4]; + } + } + + if (NULL != request_end) { + request_len = (size_t)(request_end - conn->buf_data); + + if (0 < (conn->buf_len - request_len)) { + memmove(conn->buf_data, &conn->buf_data[request_len], + conn->buf_len - request_len); + + conn->buf_data[conn->buf_len - request_len] = '\0'; + conn->buf_len -= request_len; + } + else { + memset(conn->buf_data, 0, request_len); + + conn->buf_len = 0; + } + + /* Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. + */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + opentelemetry_conn_request_init(&conn->session, &conn->request); + } + } + else if (status == MK_HTTP_PARSER_ERROR) { + opentelemetry_prot_handle_error(ctx, conn, &conn->session, &conn->request); + + /* Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. + */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + opentelemetry_conn_request_init(&conn->session, &conn->request); + } + + /* FIXME: add Protocol handler here */ + return bytes; + } + + if (event->mask & MK_EVENT_CLOSE) { + flb_plg_trace(ctx->ins, "fd=%i hangup", event->fd); + opentelemetry_conn_del(conn); + return -1; + } + + return 0; + +} + +static void opentelemetry_conn_session_init(struct mk_http_session *session, + struct mk_server *server, + int client_fd) +{ + /* Alloc memory for node */ + session->_sched_init = MK_TRUE; + session->pipelined = MK_FALSE; + session->counter_connections = 0; + session->close_now = MK_FALSE; + session->status = MK_REQUEST_STATUS_INCOMPLETE; + session->server = server; + session->socket = client_fd; + + /* creation time in unix time */ + session->init_time = time(NULL); + + session->channel = mk_channel_new(MK_CHANNEL_SOCKET, session->socket); + session->channel->io = session->server->network; + + /* Init session request list */ + mk_list_init(&session->request_list); + + /* Initialize the parser */ + mk_http_parser_init(&session->parser); +} + +static void opentelemetry_conn_request_init(struct mk_http_session *session, + struct mk_http_request *request) +{ + memset(request, 0, sizeof(struct mk_http_request)); + + mk_http_request_init(session, request, session->server); + + request->in_headers.type = MK_STREAM_IOV; + request->in_headers.dynamic = MK_FALSE; + request->in_headers.cb_consumed = NULL; + request->in_headers.cb_finished = NULL; + request->in_headers.stream = &request->stream; + + mk_list_add(&request->in_headers._head, &request->stream.inputs); + + request->session = session; +} + +struct http_conn *opentelemetry_conn_add(struct flb_connection *connection, + struct flb_opentelemetry *ctx) +{ + struct http_conn *conn; + int ret; + + conn = flb_calloc(1, sizeof(struct http_conn)); + if (!conn) { + flb_errno(); + return NULL; + } + conn->connection = connection; + + /* Set data for the event-loop */ + MK_EVENT_NEW(&connection->event); + + connection->user_data = conn; + connection->event.type = FLB_ENGINE_EV_CUSTOM; + connection->event.handler = opentelemetry_conn_event; + + /* Connection info */ + conn->ctx = ctx; + conn->buf_len = 0; + + conn->buf_data = flb_malloc(ctx->buffer_chunk_size); + if (!conn->buf_data) { + flb_errno(); + flb_plg_error(ctx->ins, "could not allocate new connection"); + flb_free(conn); + return NULL; + } + conn->buf_size = ctx->buffer_chunk_size; + + /* Register instance into the event loop */ + ret = mk_event_add(flb_engine_evl_get(), + connection->fd, + FLB_ENGINE_EV_CUSTOM, + MK_EVENT_READ, + &connection->event); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not register new connection"); + flb_free(conn->buf_data); + flb_free(conn); + return NULL; + } + + /* Initialize HTTP Session: this is a custom context for Monkey HTTP */ + opentelemetry_conn_session_init(&conn->session, ctx->server, connection->fd); + + /* Initialize HTTP Request: this is the initial request and it will be reinitialized + * automatically after the request is handled so it can be used for the next one. + */ + opentelemetry_conn_request_init(&conn->session, &conn->request); + + /* Link connection node to parent context list */ + mk_list_add(&conn->_head, &ctx->connections); + return conn; +} + +int opentelemetry_conn_del(struct http_conn *conn) +{ + if (conn->session.channel != NULL) { + mk_channel_release(conn->session.channel); + } + + /* The downstream unregisters the file descriptor from the event-loop + * so there's nothing to be done by the plugin + */ + flb_downstream_conn_release(conn->connection); + + mk_list_del(&conn->_head); + + flb_free(conn->buf_data); + flb_free(conn); + + return 0; +} + +void opentelemetry_conn_release_all(struct flb_opentelemetry *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct http_conn *conn; + + mk_list_foreach_safe(head, tmp, &ctx->connections) { + conn = mk_list_entry(head, struct http_conn, _head); + opentelemetry_conn_del(conn); + } +} diff --git a/fluent-bit/plugins/in_opentelemetry/http_conn.h b/fluent-bit/plugins/in_opentelemetry/http_conn.h new file mode 100644 index 000000000..60627d860 --- /dev/null +++ b/fluent-bit/plugins/in_opentelemetry/http_conn.h @@ -0,0 +1,57 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_HTTP_CONN +#define FLB_IN_HTTP_CONN + +#include <fluent-bit/flb_input_plugin.h> +#include <monkey/mk_http.h> +#include <monkey/mk_http_parser.h> +#include <monkey/mk_utils.h> + +#include "opentelemetry.h" + +struct http_conn { + struct mk_event event; /* Built-in event data for mk_events */ + + /* Buffer */ + char *buf_data; /* Buffer data */ + int buf_len; /* Data length */ + int buf_size; /* Buffer size */ + + /* + * Parser context: we only held one parser per connection + * which is re-used everytime we have a new request. + */ + struct mk_http_parser parser; + struct mk_http_request request; + struct mk_http_session session; + struct flb_connection *connection; + + void *ctx; /* Plugin parent context */ + struct mk_list _head; /* link to flb_opentelemetry->connections */ +}; + +struct http_conn *opentelemetry_conn_add(struct flb_connection *connection, + struct flb_opentelemetry *ctx); +int opentelemetry_conn_del(struct http_conn *conn); +void opentelemetry_conn_release_all(struct flb_opentelemetry *ctx); + + +#endif diff --git a/fluent-bit/plugins/in_opentelemetry/opentelemetry.c b/fluent-bit/plugins/in_opentelemetry/opentelemetry.c new file mode 100644 index 000000000..5cd26f8e6 --- /dev/null +++ b/fluent-bit/plugins/in_opentelemetry/opentelemetry.c @@ -0,0 +1,200 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_downstream.h> +#include <fluent-bit/flb_network.h> +#include <fluent-bit/flb_config.h> + +#include "http_conn.h" +#include "opentelemetry.h" +#include "opentelemetry_config.h" + +/* + * For a server event, the collection event means a new client have arrived, we + * accept the connection and create a new TCP instance which will wait for + * JSON map messages. + */ +static int in_opentelemetry_collect(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + struct flb_connection *connection; + struct http_conn *conn; + struct flb_opentelemetry *ctx; + + ctx = in_context; + + connection = flb_downstream_conn_get(ctx->downstream); + + if (connection == NULL) { + flb_plg_error(ctx->ins, "could not accept new connection"); + + return -1; + } + + flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", connection->fd); + + conn = opentelemetry_conn_add(connection, ctx); + + if (conn == NULL) { + return -1; + } + + return 0; +} + +static int in_opentelemetry_init(struct flb_input_instance *ins, + struct flb_config *config, void *data) +{ + unsigned short int port; + int ret; + struct flb_opentelemetry *ctx; + + (void) data; + + /* Create context and basic conf */ + ctx = opentelemetry_config_create(ins); + if (!ctx) { + return -1; + } + ctx->collector_id = -1; + + /* Populate context with config map defaults and incoming properties */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "configuration error"); + opentelemetry_config_destroy(ctx); + return -1; + } + + /* Set the context */ + flb_input_set_context(ins, ctx); + + port = (unsigned short int) strtoul(ctx->tcp_port, NULL, 10); + + ctx->downstream = flb_downstream_create(FLB_TRANSPORT_TCP, + ins->flags, + ctx->listen, + port, + ins->tls, + config, + &ins->net_setup); + + if (ctx->downstream == NULL) { + flb_plg_error(ctx->ins, + "could not initialize downstream on %s:%s. Aborting", + ctx->listen, ctx->tcp_port); + + opentelemetry_config_destroy(ctx); + + return -1; + } + + flb_input_downstream_set(ctx->downstream, ctx->ins); + + flb_plg_info(ctx->ins, "listening on %s:%s", ctx->listen, ctx->tcp_port); + + if (ctx->successful_response_code != 200 && + ctx->successful_response_code != 201 && + ctx->successful_response_code != 204) { + flb_plg_error(ctx->ins, "%d is not supported response code. Use default 201", + ctx->successful_response_code); + ctx->successful_response_code = 201; + } + + /* Collect upon data available on the standard input */ + ret = flb_input_set_collector_socket(ins, + in_opentelemetry_collect, + ctx->downstream->server_fd, + config); + if (ret == -1) { + flb_plg_error(ctx->ins, "Could not set collector for IN_TCP input plugin"); + opentelemetry_config_destroy(ctx); + return -1; + } + + ctx->collector_id = ret; + + return 0; +} + +static int in_opentelemetry_exit(void *data, struct flb_config *config) +{ + struct flb_opentelemetry *ctx; + + (void) config; + + ctx = data; + + if (ctx != NULL) { + opentelemetry_config_destroy(ctx); + } + + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE, + 0, FLB_TRUE, offsetof(struct flb_opentelemetry, buffer_max_size), + "" + }, + + { + FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", HTTP_BUFFER_CHUNK_SIZE, + 0, FLB_TRUE, offsetof(struct flb_opentelemetry, buffer_chunk_size), + "" + }, + + { + FLB_CONFIG_MAP_STR, "tag_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_opentelemetry, tag_key), + "" + }, + { + FLB_CONFIG_MAP_INT, "successful_response_code", "201", + 0, FLB_TRUE, offsetof(struct flb_opentelemetry, successful_response_code), + "Set successful response code. 200, 201 and 204 are supported." + }, + { + FLB_CONFIG_MAP_BOOL, "raw_traces", "false", + 0, FLB_TRUE, offsetof(struct flb_opentelemetry, raw_traces), + "Forward traces without processing" + }, + + /* EOF */ + {0} +}; + +/* Plugin reference */ +struct flb_input_plugin in_opentelemetry_plugin = { + .name = "opentelemetry", + .description = "OpenTelemetry", + .cb_init = in_opentelemetry_init, + .cb_pre_run = NULL, + .cb_collect = in_opentelemetry_collect, + .cb_flush_buf = NULL, + .cb_pause = NULL, + .cb_resume = NULL, + .cb_exit = in_opentelemetry_exit, + .config_map = config_map, + .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS +}; diff --git a/fluent-bit/plugins/in_opentelemetry/opentelemetry.h b/fluent-bit/plugins/in_opentelemetry/opentelemetry.h new file mode 100644 index 000000000..512f2ab6f --- /dev/null +++ b/fluent-bit/plugins/in_opentelemetry/opentelemetry.h @@ -0,0 +1,51 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_OPENTELEMETRY_H +#define FLB_IN_OPENTELEMETRY_H + +#include <fluent-bit/flb_config.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_utils.h> + +#include <monkey/monkey.h> + +#define HTTP_BUFFER_MAX_SIZE "4M" +#define HTTP_BUFFER_CHUNK_SIZE "512K" + +struct flb_opentelemetry { + int successful_response_code; + flb_sds_t listen; + flb_sds_t tcp_port; + const char *tag_key; + bool raw_traces; + + size_t buffer_max_size; /* Maximum buffer size */ + size_t buffer_chunk_size; /* Chunk allocation size */ + + int collector_id; /* Listener collector id */ + struct flb_downstream *downstream; /* Client manager */ + struct mk_list connections; /* linked list of connections */ + + struct mk_server *server; + struct flb_input_instance *ins; +}; + + +#endif diff --git a/fluent-bit/plugins/in_opentelemetry/opentelemetry_config.c b/fluent-bit/plugins/in_opentelemetry/opentelemetry_config.c new file mode 100644 index 000000000..b57596f94 --- /dev/null +++ b/fluent-bit/plugins/in_opentelemetry/opentelemetry_config.c @@ -0,0 +1,92 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_downstream.h> + +#include "opentelemetry.h" +#include "http_conn.h" + +/* default HTTP port for OTLP/HTTP is 4318 */ +#define OTLP_HTTP_PORT 4318 + +struct flb_opentelemetry *opentelemetry_config_create(struct flb_input_instance *ins) +{ + int ret; + char port[8]; + struct flb_opentelemetry *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_opentelemetry)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + mk_list_init(&ctx->connections); + + /* Load the config map */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + /* Listen interface (if not set, defaults to 0.0.0.0:4318) */ + flb_input_net_default_listener("0.0.0.0", OTLP_HTTP_PORT, ins); + + ctx->listen = flb_strdup(ins->host.listen); + snprintf(port, sizeof(port) - 1, "%d", ins->host.port); + ctx->tcp_port = flb_strdup(port); + + /* HTTP Server specifics */ + ctx->server = flb_calloc(1, sizeof(struct mk_server)); + ctx->server->keep_alive = MK_TRUE; + + /* monkey detects server->workers == 0 as the server not being initialized at the + * moment so we want to make sure that it stays that way! + */ + + return ctx; +} + +int opentelemetry_config_destroy(struct flb_opentelemetry *ctx) +{ + /* release all connections */ + opentelemetry_conn_release_all(ctx); + + if (ctx->collector_id != -1) { + flb_input_collector_delete(ctx->collector_id, ctx->ins); + + ctx->collector_id = -1; + } + + if (ctx->downstream != NULL) { + flb_downstream_destroy(ctx->downstream); + } + + if (ctx->server) { + flb_free(ctx->server); + } + + flb_free(ctx->listen); + flb_free(ctx->tcp_port); + flb_free(ctx); + + return 0; +} diff --git a/fluent-bit/plugins/in_opentelemetry/opentelemetry_config.h b/fluent-bit/plugins/in_opentelemetry/opentelemetry_config.h new file mode 100644 index 000000000..0d980c7aa --- /dev/null +++ b/fluent-bit/plugins/in_opentelemetry/opentelemetry_config.h @@ -0,0 +1,29 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_OPENTELEMETRY_CONFIG_H +#define FLB_IN_OPENTELEMETRY_CONFIG_H + +#include <fluent-bit/flb_input_plugin.h> +#include "opentelemetry.h" + +struct flb_opentelemetry *opentelemetry_config_create(struct flb_input_instance *ins); +int opentelemetry_config_destroy(struct flb_opentelemetry *ctx); + +#endif diff --git a/fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.c b/fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.c new file mode 100644 index 000000000..c9ccba7f9 --- /dev/null +++ b/fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.c @@ -0,0 +1,1674 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_version.h> +#include <fluent-bit/flb_error.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_gzip.h> +#include <fluent-bit/flb_snappy.h> +#include <fluent-bit/flb_log_event_encoder.h> + +#include <monkey/monkey.h> +#include <monkey/mk_core.h> +#include <cmetrics/cmt_decode_opentelemetry.h> + +#include <fluent-otel-proto/fluent-otel.h> +#include "opentelemetry.h" +#include "http_conn.h" + +#define HTTP_CONTENT_JSON 0 + +static int json_payload_append_converted_value( + struct flb_log_event_encoder *encoder, + int target_field, + msgpack_object *object); + +static int json_payload_append_converted_array( + struct flb_log_event_encoder *encoder, + int target_field, + msgpack_object *object); + +static int json_payload_append_converted_kvlist( + struct flb_log_event_encoder *encoder, + int target_field, + msgpack_object *object); + +static int json_payload_to_msgpack(struct flb_opentelemetry *ctx, + struct flb_log_event_encoder *encoder, + const char *body, + size_t len); + +static int otlp_pack_any_value(msgpack_packer *mp_pck, + Opentelemetry__Proto__Common__V1__AnyValue *body); + +static int send_response(struct http_conn *conn, int http_status, char *message) +{ + int len; + flb_sds_t out; + size_t sent; + + out = flb_sds_create_size(256); + if (!out) { + return -1; + } + + if (message) { + len = strlen(message); + } + else { + len = 0; + } + + if (http_status == 201) { + flb_sds_printf(&out, + "HTTP/1.1 201 Created \r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: 0\r\n\r\n", + FLB_VERSION_STR); + } + else if (http_status == 200) { + flb_sds_printf(&out, + "HTTP/1.1 200 OK\r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: 0\r\n\r\n", + FLB_VERSION_STR); + } + else if (http_status == 204) { + flb_sds_printf(&out, + "HTTP/1.1 204 No Content\r\n" + "Server: Fluent Bit v%s\r\n" + "\r\n", + FLB_VERSION_STR); + } + else if (http_status == 400) { + flb_sds_printf(&out, + "HTTP/1.1 400 Forbidden\r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: %i\r\n\r\n%s", + FLB_VERSION_STR, + len, message); + } + + /* We should check the outcome of this operation */ + flb_io_net_write(conn->connection, + (void *) out, + flb_sds_len(out), + &sent); + + flb_sds_destroy(out); + + return 0; +} + +static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_conn *conn, + flb_sds_t tag, + struct mk_http_session *session, + struct mk_http_request *request) +{ + struct cfl_list decoded_contexts; + struct cfl_list *iterator; + struct cmt *context; + size_t offset; + int result; + + offset = 0; + + result = cmt_decode_opentelemetry_create(&decoded_contexts, + request->data.data, + request->data.len, + &offset); + + if (result == CMT_DECODE_OPENTELEMETRY_SUCCESS) { + cfl_list_foreach(iterator, &decoded_contexts) { + context = cfl_list_entry(iterator, struct cmt, _head); + + result = flb_input_metrics_append(ctx->ins, NULL, 0, context); + + if (result != 0) { + flb_plg_debug(ctx->ins, "could not ingest metrics context : %d", result); + } + } + + cmt_decode_opentelemetry_destroy(&decoded_contexts); + } + + return 0; +} + +static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct http_conn *conn, + flb_sds_t tag, + struct mk_http_session *session, + struct mk_http_request *request) +{ + struct ctrace *decoded_context; + size_t offset; + int result; + + offset = 0; + result = ctr_decode_opentelemetry_create(&decoded_context, + request->data.data, + request->data.len, + &offset); + if (result == 0) { + result = flb_input_trace_append(ctx->ins, NULL, 0, decoded_context); + ctr_decode_opentelemetry_destroy(decoded_context); + } + + return result; +} + +static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http_conn *conn, + flb_sds_t tag, + struct mk_http_session *session, + struct mk_http_request *request) +{ + int ret; + int root_type; + char *out_buf = NULL; + size_t out_size; + + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + msgpack_pack_array(&mp_pck, 2); + flb_pack_time_now(&mp_pck); + + /* Check if the incoming payload is a valid JSON message and convert it to msgpack */ + ret = flb_pack_json(request->data.data, request->data.len, + &out_buf, &out_size, &root_type, NULL); + + if (ret == 0 && root_type == JSMN_OBJECT) { + /* JSON found, pack it msgpack representation */ + msgpack_sbuffer_write(&mp_sbuf, out_buf, out_size); + } + else { + /* the content might be a binary payload or invalid JSON */ + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str_with_body(&mp_pck, "trace", 5); + msgpack_pack_str_with_body(&mp_pck, request->data.data, request->data.len); + } + + /* release 'out_buf' if it was allocated */ + if (out_buf) { + flb_free(out_buf); + } + + flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size); + msgpack_sbuffer_destroy(&mp_sbuf); + + return 0; +} + +static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_conn *conn, + flb_sds_t tag, + struct mk_http_session *session, + struct mk_http_request *request) +{ + int result; + + if (ctx->raw_traces) { + result = process_payload_raw_traces(ctx, conn, tag, session, request); + } + else { + result = process_payload_traces_proto(ctx, conn, tag, session, request); + } + + return result; +} + +static int otel_pack_string(msgpack_packer *mp_pck, char *str) +{ + return msgpack_pack_str_with_body(mp_pck, str, strlen(str)); +} + +static int otel_pack_bool(msgpack_packer *mp_pck, bool val) +{ + if (val) { + return msgpack_pack_true(mp_pck); + } + else { + return msgpack_pack_false(mp_pck); + } +} + +static int otel_pack_int(msgpack_packer *mp_pck, int val) +{ + return msgpack_pack_int64(mp_pck, val); +} + +static int otel_pack_double(msgpack_packer *mp_pck, double val) +{ + return msgpack_pack_double(mp_pck, val); +} + +static int otel_pack_kvarray(msgpack_packer *mp_pck, + Opentelemetry__Proto__Common__V1__KeyValue **kv_array, + size_t kv_count) +{ + int result; + int index; + + result = msgpack_pack_map(mp_pck, kv_count); + + if (result != 0) { + return result; + } + + for (index = 0; index < kv_count && result == 0; index++) { + result = otel_pack_string(mp_pck, kv_array[index]->key); + + if(result == 0) { + result = otlp_pack_any_value(mp_pck, kv_array[index]->value); + } + } + + return result; +} + +static int otel_pack_kvlist(msgpack_packer *mp_pck, + Opentelemetry__Proto__Common__V1__KeyValueList *kv_list) +{ + int kv_index; + int ret; + char *key; + Opentelemetry__Proto__Common__V1__AnyValue *value; + + ret = msgpack_pack_map(mp_pck, kv_list->n_values); + if (ret != 0) { + return ret; + } + + for (kv_index = 0; kv_index < kv_list->n_values && ret == 0; kv_index++) { + key = kv_list->values[kv_index]->key; + value = kv_list->values[kv_index]->value; + + ret = otel_pack_string(mp_pck, key); + + if(ret == 0) { + ret = otlp_pack_any_value(mp_pck, value); + } + } + + return ret; +} + +static int otel_pack_array(msgpack_packer *mp_pck, + Opentelemetry__Proto__Common__V1__ArrayValue *array) +{ + int ret; + int array_index; + + ret = msgpack_pack_array(mp_pck, array->n_values); + + if (ret != 0) { + return ret; + } + + for (array_index = 0; array_index < array->n_values && ret == 0; array_index++) { + ret = otlp_pack_any_value(mp_pck, array->values[array_index]); + } + + return ret; +} + +static int otel_pack_bytes(msgpack_packer *mp_pck, + ProtobufCBinaryData bytes) +{ + return msgpack_pack_bin_with_body(mp_pck, bytes.data, bytes.len); +} + +static int otlp_pack_any_value(msgpack_packer *mp_pck, + Opentelemetry__Proto__Common__V1__AnyValue *body) +{ + int result; + + result = -2; + + switch(body->value_case){ + case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE: + result = otel_pack_string(mp_pck, body->string_value); + break; + + case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BOOL_VALUE: + result = otel_pack_bool(mp_pck, body->bool_value); + break; + + case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_INT_VALUE: + result = otel_pack_int(mp_pck, body->int_value); + break; + + case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_DOUBLE_VALUE: + result = otel_pack_double(mp_pck, body->double_value); + break; + + case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_ARRAY_VALUE: + result = otel_pack_array(mp_pck, body->array_value); + break; + + case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE: + result = otel_pack_kvlist(mp_pck, body->kvlist_value); + break; + + case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BYTES_VALUE: + result = otel_pack_bytes(mp_pck, body->bytes_value); + break; + + default: + break; + } + + if (result == -2) { + flb_error("[otel]: invalid value type in pack_any_value"); + result = -1; + } + + return result; +} + +static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder, + uint8_t *in_buf, + size_t in_size) +{ + int ret; + msgpack_packer packer; + msgpack_sbuffer buffer; + int resource_logs_index; + int scope_log_index; + int log_record_index; + + Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *input_logs; + Opentelemetry__Proto__Logs__V1__ScopeLogs **scope_logs; + Opentelemetry__Proto__Logs__V1__ScopeLogs *scope_log; + Opentelemetry__Proto__Logs__V1__ResourceLogs **resource_logs; + Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_log; + Opentelemetry__Proto__Logs__V1__LogRecord **log_records; + + msgpack_sbuffer_init(&buffer); + msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write); + + input_logs = opentelemetry__proto__collector__logs__v1__export_logs_service_request__unpack(NULL, in_size, in_buf); + if (input_logs == NULL) { + flb_error("[otel] Failed to unpack input logs"); + return -1; + } + + resource_logs = input_logs->resource_logs; + if (resource_logs == NULL) { + flb_error("[otel] No resource logs found"); + return -1; + } + + for (resource_logs_index = 0; resource_logs_index < input_logs->n_resource_logs; resource_logs_index++) { + resource_log = resource_logs[resource_logs_index]; + scope_logs = resource_log->scope_logs; + + if (resource_log->n_scope_logs > 0 && scope_logs == NULL) { + flb_error("[otel] No scope logs found"); + return -1; + } + + for (scope_log_index = 0; scope_log_index < resource_log->n_scope_logs; scope_log_index++) { + scope_log = scope_logs[scope_log_index]; + log_records = scope_log->log_records; + + if (log_records == NULL) { + flb_error("[otel] No log records found"); + return -1; + } + + for (log_record_index=0; log_record_index < scope_log->n_log_records; log_record_index++) { + ret = flb_log_event_encoder_begin_record(encoder); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_current_timestamp(encoder); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = otel_pack_kvarray( + &packer, + log_records[log_record_index]->attributes, + log_records[log_record_index]->n_attributes); + + if (ret != 0) { + flb_error("[otel] Failed to convert log record attributes"); + + ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE; + } + else { + ret = flb_log_event_encoder_set_metadata_from_raw_msgpack( + encoder, + buffer.data, + buffer.size); + } + + msgpack_sbuffer_clear(&buffer); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = otlp_pack_any_value( + &packer, + log_records[log_record_index]->body); + + if (ret != 0) { + flb_error("[otel] Failed to convert log record body"); + + ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE; + } + else { + if (log_records[log_record_index]->body->value_case == + OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE) { + ret = flb_log_event_encoder_set_body_from_raw_msgpack( + encoder, + buffer.data, + buffer.size); + } + else { + ret = flb_log_event_encoder_append_body_values( + encoder, + FLB_LOG_EVENT_CSTRING_VALUE("message"), + FLB_LOG_EVENT_MSGPACK_RAW_VALUE(buffer.data, buffer.size)); + } + } + + msgpack_sbuffer_clear(&buffer); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(encoder); + } + else { + flb_error("[otel] marshalling error"); + + msgpack_sbuffer_destroy(&buffer); + + return -1; + } + } + } + } + + msgpack_sbuffer_destroy(&buffer); + + return 0; +} + +static int find_map_entry_by_key(msgpack_object_map *map, + char *key, + size_t match_index, + int case_insensitive) +{ + size_t match_count; + int result; + int index; + + match_count = 0; + + for (index = 0 ; index < (int) map->size ; index++) { + if (map->ptr[index].key.type == MSGPACK_OBJECT_STR) { + if (case_insensitive) { + result = strncasecmp(map->ptr[index].key.via.str.ptr, + key, + map->ptr[index].key.via.str.size); + } + else { + result = strncmp(map->ptr[index].key.via.str.ptr, + key, + map->ptr[index].key.via.str.size); + } + + if (result == 0) { + if (match_count == match_index) { + return index; + } + + match_count++; + } + } + } + + return -1; +} + +static int json_payload_get_wrapped_value(msgpack_object *wrapper, + msgpack_object **value, + int *type) +{ + int internal_type; + msgpack_object *kv_value; + msgpack_object_str *kv_key; + msgpack_object_map *map; + + if (wrapper->type != MSGPACK_OBJECT_MAP) { + return -1; + } + + map = &wrapper->via.map; + kv_value = NULL; + internal_type = -1; + + if (map->size == 1) { + if (map->ptr[0].key.type == MSGPACK_OBJECT_STR) { + kv_value = &map->ptr[0].val; + kv_key = &map->ptr[0].key.via.str; + + if (strncasecmp(kv_key->ptr, "stringValue", kv_key->size) == 0 || + strncasecmp(kv_key->ptr, "string_value", kv_key->size) == 0) { + internal_type = MSGPACK_OBJECT_STR; + } + else if (strncasecmp(kv_key->ptr, "boolValue", kv_key->size) == 0 || + strncasecmp(kv_key->ptr, "bool_value", kv_key->size) == 0) { + internal_type = MSGPACK_OBJECT_BOOLEAN; + } + else if (strncasecmp(kv_key->ptr, "intValue", kv_key->size) == 0 || + strncasecmp(kv_key->ptr, "int_value", kv_key->size) == 0) { + internal_type = MSGPACK_OBJECT_POSITIVE_INTEGER; + } + else if (strncasecmp(kv_key->ptr, "doubleValue", kv_key->size) == 0 || + strncasecmp(kv_key->ptr, "double_value", kv_key->size) == 0) { + internal_type = MSGPACK_OBJECT_FLOAT; + } + else if (strncasecmp(kv_key->ptr, "bytesValue", kv_key->size) == 0 || + strncasecmp(kv_key->ptr, "bytes_value", kv_key->size) == 0) { + internal_type = MSGPACK_OBJECT_BIN; + } + else if (strncasecmp(kv_key->ptr, "arrayValue", kv_key->size) == 0 || + strncasecmp(kv_key->ptr, "array_value", kv_key->size) == 0) { + internal_type = MSGPACK_OBJECT_ARRAY; + } + else if (strncasecmp(kv_key->ptr, "kvlistValue", kv_key->size) == 0 || + strncasecmp(kv_key->ptr, "kvlist_value", kv_key->size) == 0) { + internal_type = MSGPACK_OBJECT_MAP; + } + } + } + + if (internal_type != -1) { + if (type != NULL) { + *type = internal_type; + } + + if (value != NULL) { + *value = kv_value; + } + + if (kv_value->type == MSGPACK_OBJECT_MAP) { + map = &kv_value->via.map; + + if (map->size == 1) { + kv_value = &map->ptr[0].val; + kv_key = &map->ptr[0].key.via.str; + + if (strncasecmp(kv_key->ptr, "values", kv_key->size) == 0) { + if (value != NULL) { + *value = kv_value; + } + } + else { + return -3; + } + } + } + } + else { + return -2; + } + + return 0; +} + +static int json_payload_append_unwrapped_value( + struct flb_log_event_encoder *encoder, + int target_field, + msgpack_object *object, + int *encoder_result) +{ + char temporary_buffer[33]; + int unwrap_value; + int result; + msgpack_object *value; + int type; + + result = json_payload_get_wrapped_value(object, + &value, + &type); + + if (result == 0) { + unwrap_value = FLB_FALSE; + + if (type == MSGPACK_OBJECT_STR) { + unwrap_value = FLB_TRUE; + } + else if (type == MSGPACK_OBJECT_BOOLEAN) { + unwrap_value = FLB_TRUE; + } + else if (type == MSGPACK_OBJECT_POSITIVE_INTEGER) { + if (value->type == MSGPACK_OBJECT_STR) { + memset(temporary_buffer, 0, sizeof(temporary_buffer)); + + if (value->via.str.size < sizeof(temporary_buffer)) { + strncpy(temporary_buffer, + value->via.str.ptr, + value->via.str.size); + } + else { + strncpy(temporary_buffer, + value->via.str.ptr, + sizeof(temporary_buffer) - 1); + } + + result = flb_log_event_encoder_append_int64( + encoder, + target_field, + strtoll(temporary_buffer, NULL, 10)); + } + else { + unwrap_value = FLB_TRUE; + } + } + else if (type == MSGPACK_OBJECT_FLOAT) { + unwrap_value = FLB_TRUE; + } + else if (type == MSGPACK_OBJECT_BIN) { + unwrap_value = FLB_TRUE; + } + else if (type == MSGPACK_OBJECT_ARRAY) { + result = json_payload_append_converted_array(encoder, + target_field, + value); + } + else if (type == MSGPACK_OBJECT_MAP) { + result = json_payload_append_converted_kvlist(encoder, + target_field, + value); + } + else { + return -2; + } + + if (unwrap_value) { + result = json_payload_append_converted_value(encoder, + target_field, + value); + } + + *encoder_result = result; + + return 0; + } + else { + return -1; + } + + return -1; +} + + +static int json_payload_append_converted_map( + struct flb_log_event_encoder *encoder, + int target_field, + msgpack_object *object) +{ + int encoder_result; + int result; + size_t index; + msgpack_object_map *map; + + map = &object->via.map; + + result = json_payload_append_unwrapped_value( + encoder, + target_field, + object, + &encoder_result); + + if (result == 0 && encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + return result; + } + + result = flb_log_event_encoder_begin_map(encoder, target_field); + + for (index = 0 ; + index < map->size && + result == FLB_EVENT_ENCODER_SUCCESS; + index++) { + result = json_payload_append_converted_value( + encoder, + target_field, + &map->ptr[index].key); + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = json_payload_append_converted_value( + encoder, + target_field, + &map->ptr[index].val); + } + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_commit_map(encoder, target_field); + } + else { + flb_log_event_encoder_rollback_map(encoder, target_field); + } + + return result; +} + +static int json_payload_append_converted_array( + struct flb_log_event_encoder *encoder, + int target_field, + msgpack_object *object) +{ + int result; + size_t index; + msgpack_object_array *array; + + array = &object->via.array; + + result = flb_log_event_encoder_begin_array(encoder, target_field); + + for (index = 0 ; + index < array->size && + result == FLB_EVENT_ENCODER_SUCCESS; + index++) { + result = json_payload_append_converted_value( + encoder, + target_field, + &array->ptr[index]); + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_commit_array(encoder, target_field); + } + else { + flb_log_event_encoder_rollback_array(encoder, target_field); + } + + return result; +} + +static int json_payload_append_converted_kvlist( + struct flb_log_event_encoder *encoder, + int target_field, + msgpack_object *object) +{ + int value_index; + int key_index; + int result; + size_t index; + msgpack_object_array *array; + msgpack_object_map *entry; + + array = &object->via.array; + + result = flb_log_event_encoder_begin_map(encoder, target_field); + + for (index = 0 ; + index < array->size && + result == FLB_EVENT_ENCODER_SUCCESS; + index++) { + + if (array->ptr[index].type != MSGPACK_OBJECT_MAP) { + result = FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT; + } + else { + entry = &array->ptr[index].via.map; + + key_index = find_map_entry_by_key(entry, "key", 0, FLB_TRUE); + + if (key_index == -1) { + result = FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT; + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + value_index = find_map_entry_by_key(entry, "value", 0, FLB_TRUE); + } + + if (value_index == -1) { + result = FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT; + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = json_payload_append_converted_value( + encoder, + target_field, + &entry->ptr[key_index].val); + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = json_payload_append_converted_value( + encoder, + target_field, + &entry->ptr[value_index].val); + } + } + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_commit_map(encoder, target_field); + } + else { + flb_log_event_encoder_rollback_map(encoder, target_field); + } + + return result; +} + +static int json_payload_append_converted_value( + struct flb_log_event_encoder *encoder, + int target_field, + msgpack_object *object) +{ + int result; + + result = FLB_EVENT_ENCODER_SUCCESS; + + switch (object->type) { + case MSGPACK_OBJECT_BOOLEAN: + result = flb_log_event_encoder_append_boolean( + encoder, + target_field, + object->via.boolean); + break; + + case MSGPACK_OBJECT_POSITIVE_INTEGER: + result = flb_log_event_encoder_append_uint64( + encoder, + target_field, + object->via.u64); + break; + case MSGPACK_OBJECT_NEGATIVE_INTEGER: + result = flb_log_event_encoder_append_int64( + encoder, + target_field, + object->via.i64); + break; + + case MSGPACK_OBJECT_FLOAT32: + case MSGPACK_OBJECT_FLOAT64: + result = flb_log_event_encoder_append_double( + encoder, + target_field, + object->via.f64); + break; + + case MSGPACK_OBJECT_STR: + result = flb_log_event_encoder_append_string( + encoder, + target_field, + (char *) object->via.str.ptr, + object->via.str.size); + + break; + + case MSGPACK_OBJECT_BIN: + result = flb_log_event_encoder_append_binary( + encoder, + target_field, + (char *) object->via.bin.ptr, + object->via.bin.size); + break; + + case MSGPACK_OBJECT_ARRAY: + result = json_payload_append_converted_array( + encoder, + target_field, + object); + break; + + case MSGPACK_OBJECT_MAP: + result = json_payload_append_converted_map( + encoder, + target_field, + object); + + break; + + default: + break; + } + + return result; +} + +static int process_json_payload_log_records_entry( + struct flb_opentelemetry *ctx, + struct flb_log_event_encoder *encoder, + msgpack_object *log_records_object) +{ + msgpack_object_map *log_records_entry; + char timestamp_str[32]; + msgpack_object *timestamp_object; + uint64_t timestamp_uint64; + msgpack_object *metadata_object; + msgpack_object *body_object; + int body_type; + struct flb_time timestamp; + int result; + + if (log_records_object->type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "unexpected logRecords entry type"); + + return -4; + } + + log_records_entry = &log_records_object->via.map; + + result = find_map_entry_by_key(log_records_entry, "timeUnixNano", 0, FLB_TRUE); + + if (result == -1) { + result = find_map_entry_by_key(log_records_entry, "time_unix_nano", 0, FLB_TRUE); + } + + if (result == -1) { + result = find_map_entry_by_key(log_records_entry, "observedTimeUnixNano", 0, FLB_TRUE); + } + + if (result == -1) { + result = find_map_entry_by_key(log_records_entry, "observed_time_unix_nano", 0, FLB_TRUE); + } + + if (result == -1) { + flb_plg_info(ctx->ins, "neither timeUnixNano nor observedTimeUnixNano found"); + + flb_time_get(×tamp); + } + else { + timestamp_object = &log_records_entry->ptr[result].val; + + if (timestamp_object->type == MSGPACK_OBJECT_POSITIVE_INTEGER) { + timestamp_uint64 = timestamp_object->via.u64; + } + else if (timestamp_object->type == MSGPACK_OBJECT_STR) { + memset(timestamp_str, 0, sizeof(timestamp_str)); + + if (timestamp_object->via.str.size < sizeof(timestamp_str)) { + strncpy(timestamp_str, + timestamp_object->via.str.ptr, + timestamp_object->via.str.size); + } + else { + strncpy(timestamp_str, + timestamp_object->via.str.ptr, + sizeof(timestamp_str) - 1); + } + + timestamp_uint64 = strtoul(timestamp_str, NULL, 10); + } + else { + flb_plg_error(ctx->ins, "unexpected timeUnixNano type"); + + return -4; + } + + flb_time_from_uint64(×tamp, timestamp_uint64); + } + + + result = find_map_entry_by_key(log_records_entry, "attributes", 0, FLB_TRUE); + + if (result == -1) { + flb_plg_debug(ctx->ins, "attributes missing"); + + metadata_object = NULL; + } + else { + if (log_records_entry->ptr[result].val.type != MSGPACK_OBJECT_ARRAY) { + flb_plg_error(ctx->ins, "unexpected attributes type"); + + return -4; + } + + metadata_object = &log_records_entry->ptr[result].val; + } + + result = find_map_entry_by_key(log_records_entry, "body", 0, FLB_TRUE); + + if (result == -1) { + flb_plg_info(ctx->ins, "body missing"); + + body_object = NULL; + } + else { + if (log_records_entry->ptr[result].val.type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "unexpected body type"); + + return -4; + } + + body_object = &log_records_entry->ptr[result].val; + } + + result = flb_log_event_encoder_begin_record(encoder); + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_set_timestamp(encoder, ×tamp); + } + + if (result == FLB_EVENT_ENCODER_SUCCESS && + metadata_object != NULL) { + flb_log_event_encoder_dynamic_field_reset(&encoder->metadata); + + result = json_payload_append_converted_kvlist( + encoder, + FLB_LOG_EVENT_METADATA, + metadata_object); + } + + if (result == FLB_EVENT_ENCODER_SUCCESS && + body_object != NULL) { + result = json_payload_get_wrapped_value(body_object, NULL, &body_type); + + if (result != 0 || body_type == MSGPACK_OBJECT_MAP) { + flb_log_event_encoder_dynamic_field_reset(&encoder->body); + } + else { + flb_log_event_encoder_append_cstring( + encoder, + FLB_LOG_EVENT_BODY, + "log"); + } + + result = json_payload_append_converted_value( + encoder, + FLB_LOG_EVENT_BODY, + body_object); + } + + result = flb_log_event_encoder_dynamic_field_flush(&encoder->body); + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_commit_record(encoder); + } + else { + flb_plg_error(ctx->ins, "log event encoder failure : %d", result); + + flb_log_event_encoder_rollback_record(encoder); + + result = -4; + } + + return result; +} + +static int process_json_payload_scope_logs_entry( + struct flb_opentelemetry *ctx, + struct flb_log_event_encoder *encoder, + msgpack_object *scope_logs_object) +{ + msgpack_object_map *scope_logs_entry; + msgpack_object_array *log_records; + int result; + size_t index; + + if (scope_logs_object->type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "unexpected scopeLogs entry type"); + + return -3; + } + + scope_logs_entry = &scope_logs_object->via.map; + + result = find_map_entry_by_key(scope_logs_entry, "logRecords", 0, FLB_TRUE); + + if (result == -1) { + result = find_map_entry_by_key(scope_logs_entry, "logRecords", 0, FLB_TRUE); + + if (result == -1) { + flb_plg_error(ctx->ins, "scopeLogs missing"); + + return -3; + } + } + + if (scope_logs_entry->ptr[result].val.type != MSGPACK_OBJECT_ARRAY) { + flb_plg_error(ctx->ins, "unexpected logRecords type"); + + return -3; + } + + log_records = &scope_logs_entry->ptr[result].val.via.array; + + result = 0; + + for (index = 0 ; index < log_records->size ; index++) { + result = process_json_payload_log_records_entry( + ctx, + encoder, + &log_records->ptr[index]); + } + + return result; +} + + +static int process_json_payload_resource_logs_entry( + struct flb_opentelemetry *ctx, + struct flb_log_event_encoder *encoder, + msgpack_object *resource_logs_object) +{ + msgpack_object_map *resource_logs_entry; + msgpack_object_array *scope_logs; + int result; + size_t index; + + + if (resource_logs_object->type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "unexpected resourceLogs entry type"); + + return -2; + } + + resource_logs_entry = &resource_logs_object->via.map; + + result = find_map_entry_by_key(resource_logs_entry, "scopeLogs", 0, FLB_TRUE); + + if (result == -1) { + result = find_map_entry_by_key(resource_logs_entry, "scope_logs", 0, FLB_TRUE); + + if (result == -1) { + flb_plg_error(ctx->ins, "scopeLogs missing"); + + return -2; + } + } + + if (resource_logs_entry->ptr[result].val.type != MSGPACK_OBJECT_ARRAY) { + flb_plg_error(ctx->ins, "unexpected scopeLogs type"); + + return -2; + } + + scope_logs = &resource_logs_entry->ptr[result].val.via.array; + + result = 0; + + for (index = 0 ; index < scope_logs->size ; index++) { + result = process_json_payload_scope_logs_entry( + ctx, + encoder, + &scope_logs->ptr[index]); + } + + return result; +} + +static int process_json_payload_root(struct flb_opentelemetry *ctx, + struct flb_log_event_encoder *encoder, + msgpack_object *root_object) +{ + msgpack_object_array *resource_logs; + int result; + size_t index; + msgpack_object_map *root; + + if (root_object->type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "unexpected root object type"); + + return -1; + } + + root = &root_object->via.map; + + result = find_map_entry_by_key(root, "resourceLogs", 0, FLB_TRUE); + + if (result == -1) { + result = find_map_entry_by_key(root, "resource_logs", 0, FLB_TRUE); + + if (result == -1) { + flb_plg_error(ctx->ins, "resourceLogs missing"); + + return -1; + } + } + + if (root->ptr[result].val.type != MSGPACK_OBJECT_ARRAY) { + flb_plg_error(ctx->ins, "unexpected resourceLogs type"); + + return -1; + } + + resource_logs = &root->ptr[result].val.via.array; + + result = 0; + + for (index = 0 ; index < resource_logs->size ; index++) { + result = process_json_payload_resource_logs_entry( + ctx, + encoder, + &resource_logs->ptr[index]); + } + + return result; +} + +/* This code is definitely not complete and beyond fishy, it needs to be + * refactored. + */ +static int json_payload_to_msgpack(struct flb_opentelemetry *ctx, + struct flb_log_event_encoder *encoder, + const char *body, + size_t len) +{ + size_t msgpack_body_length; + msgpack_unpacked unpacked_root; + char *msgpack_body; + int root_type; + size_t offset; + int result; + + result = flb_pack_json(body, len, &msgpack_body, &msgpack_body_length, + &root_type, NULL); + + if (result != 0) { + flb_plg_error(ctx->ins, "json to msgpack conversion error"); + } + else { + msgpack_unpacked_init(&unpacked_root); + + offset = 0; + result = msgpack_unpack_next(&unpacked_root, + msgpack_body, + msgpack_body_length, + &offset); + + if (result == MSGPACK_UNPACK_SUCCESS) { + result = process_json_payload_root(ctx, + encoder, + &unpacked_root.data); + } + else { + result = -1; + } + + msgpack_unpacked_destroy(&unpacked_root); + + flb_free(msgpack_body); + } + + return result; +} + +static int process_payload_logs(struct flb_opentelemetry *ctx, struct http_conn *conn, + flb_sds_t tag, + struct mk_http_session *session, + struct mk_http_request *request) +{ + struct flb_log_event_encoder *encoder; + int ret; + + encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2); + + if (encoder == NULL) { + return -1; + } + + /* Check if the incoming payload is a valid JSON message and convert it to msgpack */ + if (strncasecmp(request->content_type.data, + "application/json", + request->content_type.len) == 0) { + ret = json_payload_to_msgpack(ctx, + encoder, + request->data.data, + request->data.len); + } + else if (strncasecmp(request->content_type.data, + "application/x-protobuf", + request->content_type.len) == 0) { + ret = binary_payload_to_msgpack(encoder, (uint8_t *) request->data.data, request->data.len); + } + else { + flb_error("[otel] Unsupported content type %.*s", (int)request->content_type.len, request->content_type.data); + + ret = -1; + } + + if (ret == 0) { + ret = flb_input_log_append(ctx->ins, + tag, + flb_sds_len(tag), + encoder->output_buffer, + encoder->output_length); + } + + flb_log_event_encoder_destroy(encoder); + + return ret; +} + +static inline int mk_http_point_header(mk_ptr_t *h, + struct mk_http_parser *parser, int key) +{ + struct mk_http_header *header; + + header = &parser->headers[key]; + if (header->type == key) { + h->data = header->val.data; + h->len = header->val.len; + return 0; + } + else { + h->data = NULL; + h->len = -1; + } + + return -1; +} + +static \ +int uncompress_zlib(char **output_buffer, + size_t *output_size, + char *input_buffer, + size_t input_size) +{ + flb_error("[opentelemetry] unsupported compression format"); + + return -1; +} + +static \ +int uncompress_zstd(char **output_buffer, + size_t *output_size, + char *input_buffer, + size_t input_size) +{ + flb_error("[opentelemetry] unsupported compression format"); + + return -1; +} + +static \ +int uncompress_deflate(char **output_buffer, + size_t *output_size, + char *input_buffer, + size_t input_size) +{ + flb_error("[opentelemetry] unsupported compression format"); + + return -1; +} + +static \ +int uncompress_snappy(char **output_buffer, + size_t *output_size, + char *input_buffer, + size_t input_size) +{ + int ret; + + ret = flb_snappy_uncompress_framed_data(input_buffer, + input_size, + output_buffer, + output_size); + + if (ret != 0) { + flb_error("[opentelemetry] snappy decompression failed"); + + return -1; + } + + return 1; +} + +static \ +int uncompress_gzip(char **output_buffer, + size_t *output_size, + char *input_buffer, + size_t input_size) +{ + int ret; + + ret = flb_gzip_uncompress(input_buffer, + input_size, + (void *) output_buffer, + output_size); + + if (ret == -1) { + flb_error("[opentelemetry] gzip decompression failed"); + + return -1; + } + + return 1; +} + +int opentelemetry_prot_uncompress(struct mk_http_session *session, + struct mk_http_request *request, + char **output_buffer, + size_t *output_size) +{ + struct mk_http_header *header; + size_t index; + + *output_buffer = NULL; + *output_size = 0; + + for (index = 0; + index < session->parser.headers_extra_count; + index++) { + header = &session->parser.headers_extra[index]; + + if (strncasecmp(header->key.data, "Content-Encoding", 16) == 0) { + if (strncasecmp(header->val.data, "gzip", 4) == 0) { + return uncompress_gzip(output_buffer, + output_size, + request->data.data, + request->data.len); + } + else if (strncasecmp(header->val.data, "zlib", 4) == 0) { + return uncompress_zlib(output_buffer, + output_size, + request->data.data, + request->data.len); + } + else if (strncasecmp(header->val.data, "zstd", 4) == 0) { + return uncompress_zstd(output_buffer, + output_size, + request->data.data, + request->data.len); + } + else if (strncasecmp(header->val.data, "snappy", 6) == 0) { + return uncompress_snappy(output_buffer, + output_size, + request->data.data, + request->data.len); + } + else if (strncasecmp(header->val.data, "deflate", 4) == 0) { + return uncompress_deflate(output_buffer, + output_size, + request->data.data, + request->data.len); + } + else { + return -2; + } + } + } + + return 0; +} + + +/* + * Handle an incoming request. It perform extra checks over the request, if + * everything is OK, it enqueue the incoming payload. + */ +int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request) +{ + int i; + int ret = -1; + int len; + char *uri; + char *qs; + off_t diff; + flb_sds_t tag; + struct mk_http_header *header; + char *original_data; + size_t original_data_size; + char *uncompressed_data; + size_t uncompressed_data_size; + + if (request->uri.data[0] != '/') { + send_response(conn, 400, "error: invalid request\n"); + return -1; + } + + /* Decode URI */ + uri = mk_utils_url_decode(request->uri); + if (!uri) { + uri = mk_mem_alloc_z(request->uri.len + 1); + if (!uri) { + return -1; + } + memcpy(uri, request->uri.data, request->uri.len); + uri[request->uri.len] = '\0'; + } + + if (strcmp(uri, "/v1/metrics") != 0 && + strcmp(uri, "/v1/traces") != 0 && + strcmp(uri, "/v1/logs") != 0) { + + send_response(conn, 400, "error: invalid endpoint\n"); + mk_mem_free(uri); + + return -1; + } + + /* Try to match a query string so we can remove it */ + qs = strchr(uri, '?'); + if (qs) { + /* remove the query string part */ + diff = qs - uri; + uri[diff] = '\0'; + } + + /* Compose the query string using the URI */ + len = strlen(uri); + + if (len == 1) { + tag = NULL; /* use default tag */ + } + else { + tag = flb_sds_create_size(len); + if (!tag) { + mk_mem_free(uri); + return -1; + } + + /* New tag skipping the URI '/' */ + flb_sds_cat(tag, uri + 1, len - 1); + + /* Sanitize, only allow alphanum chars */ + for (i = 0; i < flb_sds_len(tag); i++) { + if (!isalnum(tag[i]) && tag[i] != '_' && tag[i] != '.') { + tag[i] = '_'; + } + } + } + + /* Check if we have a Host header: Hostname ; port */ + mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST); + + /* Header: Connection */ + mk_http_point_header(&request->connection, &session->parser, + MK_HEADER_CONNECTION); + + /* HTTP/1.1 needs Host header */ + if (!request->host.data && request->protocol == MK_HTTP_PROTOCOL_11) { + flb_sds_destroy(tag); + mk_mem_free(uri); + return -1; + } + + /* Should we close the session after this request ? */ + mk_http_keepalive_check(session, request, ctx->server); + + /* Content Length */ + header = &session->parser.headers[MK_HEADER_CONTENT_LENGTH]; + if (header->type == MK_HEADER_CONTENT_LENGTH) { + request->_content_length.data = header->val.data; + request->_content_length.len = header->val.len; + } + else { + request->_content_length.data = NULL; + } + + mk_http_point_header(&request->content_type, &session->parser, MK_HEADER_CONTENT_TYPE); + + if (request->method != MK_METHOD_POST) { + flb_sds_destroy(tag); + mk_mem_free(uri); + send_response(conn, 400, "error: invalid HTTP method\n"); + return -1; + } + + original_data = request->data.data; + original_data_size = request->data.len; + + ret = opentelemetry_prot_uncompress(session, request, + &uncompressed_data, + &uncompressed_data_size); + + if (ret > 0) { + request->data.data = uncompressed_data; + request->data.len = uncompressed_data_size; + } + + if (strcmp(uri, "/v1/metrics") == 0) { + ret = process_payload_metrics(ctx, conn, tag, session, request); + } + else if (strcmp(uri, "/v1/traces") == 0) { + ret = process_payload_traces(ctx, conn, tag, session, request); + } + else if (strcmp(uri, "/v1/logs") == 0) { + ret = process_payload_logs(ctx, conn, tag, session, request); + } + + if (uncompressed_data != NULL) { + flb_free(uncompressed_data); + } + + request->data.data = original_data; + request->data.len = original_data_size; + + mk_mem_free(uri); + flb_sds_destroy(tag); + + send_response(conn, ctx->successful_response_code, NULL); + + return ret; +} + +/* + * Handle an incoming request which has resulted in an http parser error. + */ +int opentelemetry_prot_handle_error(struct flb_opentelemetry *ctx, struct http_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request) +{ + send_response(conn, 400, "error: invalid request\n"); + return -1; +} diff --git a/fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.h b/fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.h new file mode 100644 index 000000000..bbfd8332f --- /dev/null +++ b/fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.h @@ -0,0 +1,31 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_OPENTELEMETRY_PROT +#define FLB_IN_OPENTELEMETRY_PROT + +int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request); + +int opentelemetry_prot_handle_error(struct flb_opentelemetry *ctx, struct http_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request); + +#endif |