summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/out_opentelemetry
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/out_opentelemetry')
-rw-r--r--src/fluent-bit/plugins/out_opentelemetry/CMakeLists.txt6
-rw-r--r--src/fluent-bit/plugins/out_opentelemetry/opentelemetry.c1207
-rw-r--r--src/fluent-bit/plugins/out_opentelemetry/opentelemetry.h80
-rw-r--r--src/fluent-bit/plugins/out_opentelemetry/opentelemetry_conf.c262
-rw-r--r--src/fluent-bit/plugins/out_opentelemetry/opentelemetry_conf.h33
5 files changed, 0 insertions, 1588 deletions
diff --git a/src/fluent-bit/plugins/out_opentelemetry/CMakeLists.txt b/src/fluent-bit/plugins/out_opentelemetry/CMakeLists.txt
deleted file mode 100644
index 03c697ab2..000000000
--- a/src/fluent-bit/plugins/out_opentelemetry/CMakeLists.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-set(src
- opentelemetry.c
- opentelemetry_conf.c
- )
-
-FLB_PLUGIN(out_opentelemetry "${src}" "")
diff --git a/src/fluent-bit/plugins/out_opentelemetry/opentelemetry.c b/src/fluent-bit/plugins/out_opentelemetry/opentelemetry.c
deleted file mode 100644
index c981cc27c..000000000
--- a/src/fluent-bit/plugins/out_opentelemetry/opentelemetry.c
+++ /dev/null
@@ -1,1207 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_output_plugin.h>
-#include <fluent-bit/flb_snappy.h>
-#include <fluent-bit/flb_metrics.h>
-#include <fluent-bit/flb_time.h>
-#include <fluent-bit/flb_kv.h>
-#include <fluent-bit/flb_pack.h>
-#include <fluent-bit/flb_log_event_decoder.h>
-
-#include <cfl/cfl.h>
-#include <fluent-otel-proto/fluent-otel.h>
-
-#include <cmetrics/cmetrics.h>
-#include <fluent-bit/flb_gzip.h>
-#include <cmetrics/cmt_encode_opentelemetry.h>
-
-#include <ctraces/ctraces.h>
-#include <ctraces/ctr_decode_msgpack.h>
-
-extern cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt);
-extern void cmt_encode_opentelemetry_destroy(cfl_sds_t text);
-
-#include "opentelemetry.h"
-#include "opentelemetry_conf.h"
-
-static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_object_to_otlp_any_value(struct msgpack_object *o);
-
-static inline void otlp_any_value_destroy(Opentelemetry__Proto__Common__V1__AnyValue *value);
-static inline void otlp_kvarray_destroy(Opentelemetry__Proto__Common__V1__KeyValue **kvarray, size_t entry_count);
-static inline void otlp_kvpair_destroy(Opentelemetry__Proto__Common__V1__KeyValue *kvpair);
-static inline void otlp_kvlist_destroy(Opentelemetry__Proto__Common__V1__KeyValueList *kvlist);
-static inline void otlp_array_destroy(Opentelemetry__Proto__Common__V1__ArrayValue *array);
-
-static inline void otlp_kvarray_destroy(Opentelemetry__Proto__Common__V1__KeyValue **kvarray, size_t entry_count)
-{
- size_t index;
-
- if (kvarray != NULL) {
- for (index = 0 ; index < entry_count ; index++) {
- if (kvarray[index] != NULL) {
- otlp_kvpair_destroy(kvarray[index]);
- kvarray[index] = NULL;
- }
- }
-
- flb_free(kvarray);
- }
-}
-
-static inline void otlp_kvpair_destroy(Opentelemetry__Proto__Common__V1__KeyValue *kvpair)
-{
- if (kvpair != NULL) {
- if (kvpair->key != NULL) {
- flb_free(kvpair->key);
- }
-
- if (kvpair->value != NULL) {
- otlp_any_value_destroy(kvpair->value);
- }
-
- flb_free(kvpair);
- }
-}
-
-static inline void otlp_kvlist_destroy(Opentelemetry__Proto__Common__V1__KeyValueList *kvlist)
-{
- size_t index;
-
- if (kvlist != NULL) {
- if (kvlist->values != NULL) {
- for (index = 0 ; index < kvlist->n_values ; index++) {
- otlp_kvpair_destroy(kvlist->values[index]);
- }
-
- flb_free(kvlist->values);
- }
-
- flb_free(kvlist);
- }
-}
-
-static inline void otlp_array_destroy(Opentelemetry__Proto__Common__V1__ArrayValue *array)
-{
- size_t index;
-
- if (array != NULL) {
- if (array->values != NULL) {
- for (index = 0 ; index < array->n_values ; index++) {
- otlp_any_value_destroy(array->values[index]);
- }
-
- flb_free(array->values);
- }
-
- flb_free(array);
- }
-}
-
-static inline void otlp_any_value_destroy(Opentelemetry__Proto__Common__V1__AnyValue *value)
-{
- if (value != NULL) {
- if (value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE) {
- if (value->string_value != NULL) {
- flb_free(value->string_value);
- }
- }
- else if (value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_ARRAY_VALUE) {
- if (value->array_value != NULL) {
- otlp_array_destroy(value->array_value);
- }
- }
- else if (value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE) {
- if (value->kvlist_value != NULL) {
- otlp_kvlist_destroy(value->kvlist_value);
- }
- }
- else if (value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BYTES_VALUE) {
- if (value->bytes_value.data != NULL) {
- flb_free(value->bytes_value.data);
- }
- }
-
- value->string_value = NULL;
-
- flb_free(value);
- }
-}
-
-static int http_post(struct opentelemetry_context *ctx,
- const void *body, size_t body_len,
- const char *tag, int tag_len,
- const char *uri)
-{
- size_t final_body_len;
- void *final_body;
- int compressed;
- int out_ret;
- size_t b_sent;
- struct flb_connection *u_conn;
- struct mk_list *head;
- int ret;
- struct flb_slist_entry *key;
- struct flb_slist_entry *val;
- struct flb_config_map_val *mv;
- struct flb_http_client *c;
-
- compressed = FLB_FALSE;
-
- u_conn = flb_upstream_conn_get(ctx->u);
-
- if (u_conn == NULL) {
- flb_plg_error(ctx->ins,
- "no upstream connections available to %s:%i",
- ctx->u->tcp_host,
- ctx->u->tcp_port);
-
- return FLB_RETRY;
- }
-
- if (ctx->compress_gzip) {
- ret = flb_gzip_compress((void *) body, body_len,
- &final_body, &final_body_len);
-
- if (ret == 0) {
- compressed = FLB_TRUE;
- } else {
- flb_plg_error(ctx->ins, "cannot gzip payload, disabling compression");
- }
- } else {
- final_body = (void *) body;
- final_body_len = body_len;
- }
-
- /* Create HTTP client context */
- c = flb_http_client(u_conn, FLB_HTTP_POST, uri,
- final_body, final_body_len,
- ctx->host, ctx->port,
- ctx->proxy, 0);
-
- if (c == NULL) {
- flb_plg_error(ctx->ins, "error initializing http client");
-
- if (compressed) {
- flb_free(final_body);
- }
-
- flb_upstream_conn_release(u_conn);
-
- return FLB_RETRY;
- }
-
- if (c->proxy.host != NULL) {
- flb_plg_debug(ctx->ins, "[http_client] proxy host: %s port: %i",
- c->proxy.host, c->proxy.port);
- }
-
- /* Allow duplicated headers ? */
- flb_http_allow_duplicated_headers(c, FLB_FALSE);
-
- /*
- * Direct assignment of the callback context to the HTTP client context.
- * This needs to be improved through a more clean API.
- */
- c->cb_ctx = ctx->ins->callback;
-
- flb_http_add_header(c,
- FLB_OPENTELEMETRY_CONTENT_TYPE_HEADER_NAME,
- sizeof(FLB_OPENTELEMETRY_CONTENT_TYPE_HEADER_NAME) - 1,
- FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL,
- sizeof(FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL) - 1);
-
- /* Basic Auth headers */
- if (ctx->http_user != NULL &&
- ctx->http_passwd != NULL) {
- flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd);
- }
-
- flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
-
- flb_config_map_foreach(head, mv, ctx->headers) {
- key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
- val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
-
- flb_http_add_header(c,
- key->str, flb_sds_len(key->str),
- val->str, flb_sds_len(val->str));
- }
-
- if (compressed) {
- flb_http_set_content_encoding_gzip(c);
- }
-
- ret = flb_http_do(c, &b_sent);
-
- if (ret == 0) {
- /*
- * Only allow the following HTTP status:
- *
- * - 200: OK
- * - 201: Created
- * - 202: Accepted
- * - 203: no authorative resp
- * - 204: No Content
- * - 205: Reset content
- *
- */
- if (c->resp.status < 200 || c->resp.status > 205) {
- if (ctx->log_response_payload &&
- c->resp.payload != NULL &&
- c->resp.payload_size > 0) {
- flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i\n%.*s",
- ctx->host, ctx->port,
- c->resp.status,
- (int) c->resp.payload_size,
- c->resp.payload);
- }
- else {
- flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i",
- ctx->host, ctx->port, c->resp.status);
- }
-
- out_ret = FLB_RETRY;
- }
- else {
- if (ctx->log_response_payload &&
- c->resp.payload != NULL &&
- c->resp.payload_size > 0) {
- flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%.*s",
- ctx->host, ctx->port,
- c->resp.status,
- (int) c->resp.payload_size,
- c->resp.payload);
- }
- else {
- flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i",
- ctx->host, ctx->port,
- c->resp.status);
- }
-
- out_ret = FLB_OK;
- }
- }
- else {
- flb_plg_error(ctx->ins, "could not flush records to %s:%i (http_do=%i)",
- ctx->host, ctx->port, ret);
-
- out_ret = FLB_RETRY;
- }
-
- if (compressed) {
- flb_free(final_body);
- }
-
- /* Destroy HTTP client context */
- flb_http_client_destroy(c);
-
- /* Release the TCP connection */
- flb_upstream_conn_release(u_conn);
-
- return out_ret;
-}
-
-static void append_labels(struct opentelemetry_context *ctx,
- struct cmt *cmt)
-{
- struct flb_kv *kv;
- struct mk_list *head;
-
- mk_list_foreach(head, &ctx->kv_labels) {
- kv = mk_list_entry(head, struct flb_kv, _head);
- cmt_label_add(cmt, kv->key, kv->val);
- }
-}
-
-static void clear_array(Opentelemetry__Proto__Logs__V1__LogRecord **logs,
- size_t log_count)
-{
- size_t index;
-
- if (logs == NULL){
- return;
- }
-
- for (index = 0 ; index < log_count ; index++) {
- if (logs[index]->body != NULL) {
- otlp_any_value_destroy(logs[index]->body);
-
- logs[index]->body = NULL;
- }
-
- if (logs[index]->attributes != NULL) {
- otlp_kvarray_destroy(logs[index]->attributes,
- logs[index]->n_attributes);
-
- logs[index]->attributes = NULL;
- }
- }
-}
-
-static Opentelemetry__Proto__Common__V1__ArrayValue *otlp_array_value_initialize(size_t entry_count)
-{
- Opentelemetry__Proto__Common__V1__ArrayValue *value;
-
- value = flb_calloc(1, sizeof(Opentelemetry__Proto__Common__V1__ArrayValue));
-
- if (value != NULL) {
- opentelemetry__proto__common__v1__array_value__init(value);
-
- if (entry_count > 0) {
- value->values = \
- flb_calloc(entry_count,
- sizeof(Opentelemetry__Proto__Common__V1__AnyValue *));
-
- if (value->values == NULL) {
- flb_free(value);
-
- value = NULL;
- }
- else {
- value->n_values = entry_count;
- }
- }
- }
-
- return value;
-}
-
-static Opentelemetry__Proto__Common__V1__KeyValue *otlp_kvpair_value_initialize()
-{
- Opentelemetry__Proto__Common__V1__KeyValue *value;
-
- value = flb_calloc(1, sizeof(Opentelemetry__Proto__Common__V1__KeyValue));
-
- if (value != NULL) {
- opentelemetry__proto__common__v1__key_value__init(value);
- }
-
- return value;
-}
-
-static Opentelemetry__Proto__Common__V1__KeyValueList *otlp_kvlist_value_initialize(size_t entry_count)
-{
- Opentelemetry__Proto__Common__V1__KeyValueList *value;
-
- value = flb_calloc(1, sizeof(Opentelemetry__Proto__Common__V1__KeyValueList));
-
- if (value != NULL) {
- opentelemetry__proto__common__v1__key_value_list__init(value);
-
- if (entry_count > 0) {
- value->values = \
- flb_calloc(entry_count,
- sizeof(Opentelemetry__Proto__Common__V1__KeyValue *));
-
- if (value->values == NULL) {
- flb_free(value);
-
- value = NULL;
- }
- else {
- value->n_values = entry_count;
- }
- }
- }
-
- return value;
-}
-
-static Opentelemetry__Proto__Common__V1__AnyValue *otlp_any_value_initialize(int data_type, size_t entry_count)
-{
- Opentelemetry__Proto__Common__V1__AnyValue *value;
-
- value = flb_calloc(1, sizeof(Opentelemetry__Proto__Common__V1__AnyValue));
-
- if (value == NULL) {
- return NULL;
- }
-
- opentelemetry__proto__common__v1__any_value__init(value);
-
- if (data_type == MSGPACK_OBJECT_STR) {
- value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE;
- }
- else if (data_type == MSGPACK_OBJECT_NIL) {
- value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE__NOT_SET;
- }
- else if (data_type == MSGPACK_OBJECT_BOOLEAN) {
- value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BOOL_VALUE;
- }
- else if (data_type == MSGPACK_OBJECT_POSITIVE_INTEGER || data_type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
- value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_INT_VALUE;
- }
- else if (data_type == MSGPACK_OBJECT_FLOAT32 || data_type == MSGPACK_OBJECT_FLOAT64) {
- value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_DOUBLE_VALUE;
- }
- else if (data_type == MSGPACK_OBJECT_ARRAY) {
- value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_ARRAY_VALUE;
- value->array_value = otlp_array_value_initialize(entry_count);
-
- if (value->array_value == NULL) {
- flb_free(value);
-
- value = NULL;
- }
- }
- else if (data_type == MSGPACK_OBJECT_MAP) {
- value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE;
-
- value->kvlist_value = otlp_kvlist_value_initialize(entry_count);
-
- if (value->kvlist_value == NULL) {
- flb_free(value);
-
- value = NULL;
- }
- }
- else if (data_type == MSGPACK_OBJECT_BIN) {
- value->value_case = OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BYTES_VALUE;
- }
- else {
- flb_free(value);
-
- value = NULL;
- }
-
- return value;
-}
-
-static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_boolean_to_otlp_any_value(struct msgpack_object *o)
-{
- Opentelemetry__Proto__Common__V1__AnyValue *result;
-
- result = otlp_any_value_initialize(MSGPACK_OBJECT_BOOLEAN, 0);
-
- if (result != NULL) {
- result->bool_value = o->via.boolean;
- }
-
- return result;
-}
-
-static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_integer_to_otlp_any_value(struct msgpack_object *o)
-{
- Opentelemetry__Proto__Common__V1__AnyValue *result;
-
- result = otlp_any_value_initialize(o->type, 0);
-
- if (result != NULL) {
- if (o->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
- result->int_value = (int64_t) o->via.u64;
- }
- else {
- result->int_value = o->via.i64;
- }
- }
-
- return result;
-}
-
-static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_float_to_otlp_any_value(struct msgpack_object *o)
-{
- Opentelemetry__Proto__Common__V1__AnyValue *result;
-
- result = otlp_any_value_initialize(o->type, 0);
-
- if (result != NULL) {
- result->double_value = o->via.f64;
- }
-
- return result;
-}
-
-static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_string_to_otlp_any_value(struct msgpack_object *o)
-{
- Opentelemetry__Proto__Common__V1__AnyValue *result;
-
- result = otlp_any_value_initialize(MSGPACK_OBJECT_STR, 0);
-
- if (result != NULL) {
- result->string_value = flb_strndup(o->via.str.ptr, o->via.str.size);
-
- if (result->string_value == NULL) {
- otlp_any_value_destroy(result);
-
- result = NULL;
- }
- }
-
- return result;
-}
-
-static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_nil_to_otlp_any_value(struct msgpack_object *o)
-{
- Opentelemetry__Proto__Common__V1__AnyValue *result;
-
- result = otlp_any_value_initialize(MSGPACK_OBJECT_NIL, 0);
-
- if (result != NULL) {
- result->string_value = NULL;
- }
-
- return result;
-}
-
-static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_bin_to_otlp_any_value(struct msgpack_object *o)
-{
- Opentelemetry__Proto__Common__V1__AnyValue *result;
-
- result = otlp_any_value_initialize(MSGPACK_OBJECT_BIN, 0);
-
- if (result != NULL) {
- result->bytes_value.len = o->via.bin.size;
- result->bytes_value.data = flb_malloc(o->via.bin.size);
-
- if (result->bytes_value.data == NULL) {
- otlp_any_value_destroy(result);
-
- result = NULL;
- }
-
- memcpy(result->bytes_value.data, o->via.bin.ptr, o->via.bin.size);
- }
-
- return result;
-}
-
-static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_array_to_otlp_any_value(struct msgpack_object *o)
-{
- size_t entry_count;
- Opentelemetry__Proto__Common__V1__AnyValue *entry_value;
- Opentelemetry__Proto__Common__V1__AnyValue *result;
- size_t index;
- msgpack_object *p;
-
- entry_count = o->via.array.size;
- result = otlp_any_value_initialize(MSGPACK_OBJECT_ARRAY, entry_count);
-
- p = o->via.array.ptr;
-
- if (result != NULL) {
- index = 0;
-
- for (index = 0 ; index < entry_count ; index++) {
- entry_value = msgpack_object_to_otlp_any_value(&p[index]);
-
- if (entry_value == NULL) {
- otlp_any_value_destroy(result);
-
- result = NULL;
-
- break;
- }
-
- result->array_value->values[index] = entry_value;
- }
- }
-
- return result;
-}
-
-static inline Opentelemetry__Proto__Common__V1__KeyValue *msgpack_kv_to_otlp_any_value(struct msgpack_object_kv *input_pair)
-{
- Opentelemetry__Proto__Common__V1__KeyValue *kv;
-
- kv = otlp_kvpair_value_initialize();
- if (kv == NULL) {
- flb_errno();
-
- return NULL;
- }
-
- kv->key = flb_strndup(input_pair->key.via.str.ptr, input_pair->key.via.str.size);
- if (kv->key == NULL) {
- flb_errno();
- flb_free(kv);
-
- return NULL;
- }
-
- kv->value = msgpack_object_to_otlp_any_value(&input_pair->val);
- if (kv->value == NULL) {
- flb_free(kv->key);
- flb_free(kv);
-
- return NULL;
- }
-
- return kv;
-}
-
-static inline Opentelemetry__Proto__Common__V1__KeyValue **msgpack_map_to_otlp_kvarray(struct msgpack_object *o, size_t *entry_count)
-{
- Opentelemetry__Proto__Common__V1__KeyValue **result;
- size_t index;
- msgpack_object_kv *kv;
-
- *entry_count = o->via.map.size;
- result = flb_calloc(*entry_count, sizeof(Opentelemetry__Proto__Common__V1__KeyValue *));
-
- if (result != NULL) {
- for (index = 0; index < *entry_count; index++) {
- kv = &o->via.map.ptr[index];
- result[index] = msgpack_kv_to_otlp_any_value(kv);
- }
- }
- else {
- *entry_count = 0;
- }
-
- return result;
-}
-
-static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_map_to_otlp_any_value(struct msgpack_object *o)
-{
- size_t entry_count;
- Opentelemetry__Proto__Common__V1__AnyValue *result;
- Opentelemetry__Proto__Common__V1__KeyValue *keyvalue;
- size_t index;
- msgpack_object_kv *kv;
-
- entry_count = o->via.map.size;
- result = otlp_any_value_initialize(MSGPACK_OBJECT_MAP, entry_count);
-
- if (result != NULL) {
-
- for (index = 0; index < entry_count; index++) {
- kv = &o->via.map.ptr[index];
- keyvalue = msgpack_kv_to_otlp_any_value(kv);
- result->kvlist_value->values[index] = keyvalue;
- }
- }
-
- return result;
-}
-
-static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_object_to_otlp_any_value(struct msgpack_object *o)
-{
- Opentelemetry__Proto__Common__V1__AnyValue *result;
-
- result = NULL;
-
- switch (o->type) {
- case MSGPACK_OBJECT_NIL:
- result = msgpack_nil_to_otlp_any_value(o);
- break;
-
- case MSGPACK_OBJECT_BOOLEAN:
- result = msgpack_boolean_to_otlp_any_value(o);
- break;
-
- case MSGPACK_OBJECT_POSITIVE_INTEGER:
- case MSGPACK_OBJECT_NEGATIVE_INTEGER:
- result = msgpack_integer_to_otlp_any_value(o);
- break;
-
- case MSGPACK_OBJECT_FLOAT32:
- case MSGPACK_OBJECT_FLOAT64:
- result = msgpack_float_to_otlp_any_value(o);
- break;
-
- case MSGPACK_OBJECT_STR:
- result = msgpack_string_to_otlp_any_value(o);
- break;
-
- case MSGPACK_OBJECT_MAP:
- result = msgpack_map_to_otlp_any_value(o);
- break;
-
- case MSGPACK_OBJECT_BIN:
- result = msgpack_bin_to_otlp_any_value(o);
- break;
-
- case MSGPACK_OBJECT_ARRAY:
- result = msgpack_array_to_otlp_any_value(o);
- break;
-
- default:
- break;
- }
-
- /* This function will fail if it receives an object with
- * type MSGPACK_OBJECT_EXT
- */
-
- return result;
-}
-
-static int flush_to_otel(struct opentelemetry_context *ctx,
- struct flb_event_chunk *event_chunk,
- Opentelemetry__Proto__Logs__V1__LogRecord **logs,
- size_t log_count)
-{
- Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest export_logs;
- Opentelemetry__Proto__Logs__V1__ScopeLogs scope_log;
- Opentelemetry__Proto__Logs__V1__ResourceLogs resource_log;
- Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_logs[1];
- Opentelemetry__Proto__Logs__V1__ScopeLogs *scope_logs[1];
- void *body;
- unsigned len;
- int res;
-
- opentelemetry__proto__collector__logs__v1__export_logs_service_request__init(&export_logs);
- opentelemetry__proto__logs__v1__resource_logs__init(&resource_log);
- opentelemetry__proto__logs__v1__scope_logs__init(&scope_log);
-
- scope_log.log_records = logs;
- scope_log.n_log_records = log_count;
- scope_logs[0] = &scope_log;
-
- resource_log.scope_logs = scope_logs;
- resource_log.n_scope_logs = 1;
- resource_logs[0] = &resource_log;
-
- export_logs.resource_logs = resource_logs;
- export_logs.n_resource_logs = 1;
-
- len = opentelemetry__proto__collector__logs__v1__export_logs_service_request__get_packed_size(&export_logs);
- body = flb_calloc(len, sizeof(char));
- if (!body) {
- flb_errno();
- return FLB_ERROR;
- }
-
- opentelemetry__proto__collector__logs__v1__export_logs_service_request__pack(&export_logs, body);
-
- // send post request to opentelemetry with content type application/x-protobuf
- res = http_post(ctx, body, len,
- event_chunk->tag,
- flb_sds_len(event_chunk->tag),
- ctx->logs_uri);
-
- flb_free(body);
-
- return res;
-}
-
-static int process_logs(struct flb_event_chunk *event_chunk,
- struct flb_output_flush *out_flush,
- struct flb_input_instance *ins, void *out_context,
- struct flb_config *config)
-{
- size_t log_record_count;
- Opentelemetry__Proto__Logs__V1__LogRecord **log_record_list;
- Opentelemetry__Proto__Logs__V1__LogRecord *log_records;
- Opentelemetry__Proto__Common__V1__AnyValue *log_object;
- struct flb_log_event_decoder *decoder;
- struct flb_log_event event;
- size_t index;
- struct opentelemetry_context *ctx;
- int res;
-
- ctx = (struct opentelemetry_context *) out_context;
-
- log_record_list = (Opentelemetry__Proto__Logs__V1__LogRecord **) \
- flb_calloc(ctx->batch_size,
- sizeof(Opentelemetry__Proto__Logs__V1__LogRecord *));
-
- if (log_record_list == NULL) {
- flb_errno();
-
- return -1;
- }
-
- log_records = (Opentelemetry__Proto__Logs__V1__LogRecord *)
- flb_calloc(ctx->batch_size,
- sizeof(Opentelemetry__Proto__Logs__V1__LogRecord));
-
- if (log_records == NULL) {
- flb_errno();
-
- flb_free(log_record_list);
-
- return -2;
- }
-
- for(index = 0 ; index < ctx->batch_size ; index++) {
- log_record_list[index] = &log_records[index];
- }
-
- decoder = flb_log_event_decoder_create((char *) event_chunk->data,
- event_chunk->size);
-
- if (decoder == NULL) {
- flb_plg_error(ctx->ins, "could not initialize record decoder");
-
- flb_free(log_record_list);
- flb_free(log_records);
-
- return -1;
- }
-
- log_record_count = 0;
-
- res = FLB_OK;
-
- while (flb_log_event_decoder_next(decoder, &event) == 0 &&
- res == FLB_OK) {
- opentelemetry__proto__logs__v1__log_record__init(&log_records[log_record_count]);
- log_records[log_record_count].attributes = \
- msgpack_map_to_otlp_kvarray(event.metadata,
- &log_records[log_record_count].n_attributes);
-
- log_object = msgpack_object_to_otlp_any_value(event.body);
-
- if (log_object == NULL) {
- flb_plg_error(ctx->ins, "log event conversion failure");
- res = FLB_ERROR;
- continue;
- }
-
-
- log_records[log_record_count].body = log_object;
- log_records[log_record_count].time_unix_nano = flb_time_to_nanosec(&event.timestamp);
-
- log_record_count++;
-
- if (log_record_count >= ctx->batch_size) {
- res = flush_to_otel(ctx,
- event_chunk,
- log_record_list,
- log_record_count);
-
- clear_array(log_record_list, log_record_count);
-
- log_record_count = 0;
- }
- }
-
- flb_log_event_decoder_destroy(decoder);
-
- if (log_record_count > 0 &&
- res == FLB_OK) {
- res = flush_to_otel(ctx,
- event_chunk,
- log_record_list,
- log_record_count);
-
- clear_array(log_record_list, log_record_count);
- }
-
- flb_free(log_record_list);
- flb_free(log_records);
-
- return res;
-}
-
-static int process_metrics(struct flb_event_chunk *event_chunk,
- struct flb_output_flush *out_flush,
- struct flb_input_instance *ins, void *out_context,
- struct flb_config *config)
-{
- int c = 0;
- int ok;
- int ret;
- int result;
- cfl_sds_t encoded_chunk;
- flb_sds_t buf = NULL;
- size_t diff = 0;
- size_t off = 0;
- struct cmt *cmt;
- struct opentelemetry_context *ctx = out_context;
-
- /* Initialize vars */
- ctx = out_context;
- ok = CMT_DECODE_MSGPACK_SUCCESS;
- result = FLB_OK;
-
- /* Buffer to concatenate multiple metrics contexts */
- buf = flb_sds_create_size(event_chunk->size);
- if (!buf) {
- flb_plg_error(ctx->ins, "could not allocate outgoing buffer");
- return FLB_RETRY;
- }
-
- flb_plg_debug(ctx->ins, "cmetrics msgpack size: %lu",
- event_chunk->size);
-
- /* Decode and encode every CMetric context */
- diff = 0;
- while ((ret = cmt_decode_msgpack_create(&cmt,
- (char *) event_chunk->data,
- event_chunk->size, &off)) == ok) {
- /* append labels set by config */
- append_labels(ctx, cmt);
-
- /* Create a OpenTelemetry payload */
- encoded_chunk = cmt_encode_opentelemetry_create(cmt);
- if (encoded_chunk == NULL) {
- flb_plg_error(ctx->ins,
- "Error encoding context as opentelemetry");
- result = FLB_ERROR;
- cmt_destroy(cmt);
- goto exit;
- }
-
- flb_plg_debug(ctx->ins, "cmetric_id=%i decoded %lu-%lu payload_size=%lu",
- c, diff, off, flb_sds_len(encoded_chunk));
- c++;
- diff = off;
-
- /* concat buffer */
- flb_sds_cat_safe(&buf, encoded_chunk, flb_sds_len(encoded_chunk));
-
- /* release */
- cmt_encode_opentelemetry_destroy(encoded_chunk);
- cmt_destroy(cmt);
- }
-
- if (ret == CMT_DECODE_MSGPACK_INSUFFICIENT_DATA && c > 0) {
- flb_plg_debug(ctx->ins, "final payload size: %lu", flb_sds_len(buf));
- if (buf && flb_sds_len(buf) > 0) {
- /* Send HTTP request */
- result = http_post(ctx, buf, flb_sds_len(buf),
- event_chunk->tag,
- flb_sds_len(event_chunk->tag),
- ctx->metrics_uri);
-
- /* Debug http_post() result statuses */
- if (result == FLB_OK) {
- flb_plg_debug(ctx->ins, "http_post result FLB_OK");
- }
- else if (result == FLB_ERROR) {
- flb_plg_debug(ctx->ins, "http_post result FLB_ERROR");
- }
- else if (result == FLB_RETRY) {
- flb_plg_debug(ctx->ins, "http_post result FLB_RETRY");
- }
- }
- flb_sds_destroy(buf);
- buf = NULL;
- return result;
- }
- else {
- flb_plg_error(ctx->ins, "Error decoding msgpack encoded context");
- return FLB_ERROR;
- }
-
-exit:
- if (buf) {
- flb_sds_destroy(buf);
- }
- return result;
-}
-
-static int process_traces(struct flb_event_chunk *event_chunk,
- struct flb_output_flush *out_flush,
- struct flb_input_instance *ins, void *out_context,
- struct flb_config *config)
-{
- int ret;
- int result;
- cfl_sds_t encoded_chunk;
- flb_sds_t buf = NULL;
- size_t off = 0;
- struct ctrace *ctr;
- struct opentelemetry_context *ctx = out_context;
-
- /* Initialize vars */
- ctx = out_context;
- result = FLB_OK;
-
- buf = flb_sds_create_size(event_chunk->size);
- if (!buf) {
- flb_plg_error(ctx->ins, "could not allocate outgoing buffer");
- return FLB_RETRY;
- }
-
- flb_plg_debug(ctx->ins, "ctraces msgpack size: %lu",
- event_chunk->size);
-
- while (ctr_decode_msgpack_create(&ctr,
- (char *) event_chunk->data,
- event_chunk->size, &off) == 0) {
- /* Create a OpenTelemetry payload */
- encoded_chunk = ctr_encode_opentelemetry_create(ctr);
- if (encoded_chunk == NULL) {
- flb_plg_error(ctx->ins,
- "Error encoding context as opentelemetry");
- result = FLB_ERROR;
- ctr_destroy(ctr);
- goto exit;
- }
-
- /* concat buffer */
- ret = flb_sds_cat_safe(&buf, encoded_chunk, flb_sds_len(encoded_chunk));
- if (ret != 0) {
- flb_plg_error(ctx->ins, "Error appending encoded trace to buffer");
- result = FLB_ERROR;
- ctr_encode_opentelemetry_destroy(encoded_chunk);
- ctr_destroy(ctr);
- goto exit;
- }
-
- /* release */
- ctr_encode_opentelemetry_destroy(encoded_chunk);
- ctr_destroy(ctr);
- }
-
- flb_plg_debug(ctx->ins, "final payload size: %lu", flb_sds_len(buf));
- if (buf && flb_sds_len(buf) > 0) {
- /* Send HTTP request */
- result = http_post(ctx, buf, flb_sds_len(buf),
- event_chunk->tag,
- flb_sds_len(event_chunk->tag),
- ctx->traces_uri);
-
- /* Debug http_post() result statuses */
- if (result == FLB_OK) {
- flb_plg_debug(ctx->ins, "http_post result FLB_OK");
- }
- else if (result == FLB_ERROR) {
- flb_plg_debug(ctx->ins, "http_post result FLB_ERROR");
- }
- else if (result == FLB_RETRY) {
- flb_plg_debug(ctx->ins, "http_post result FLB_RETRY");
- }
- }
-
-exit:
- if (buf) {
- flb_sds_destroy(buf);
- }
- return result;
-}
-
-static int cb_opentelemetry_exit(void *data, struct flb_config *config)
-{
- struct opentelemetry_context *ctx;
-
- ctx = (struct opentelemetry_context *) data;
-
- flb_opentelemetry_context_destroy(ctx);
-
- return 0;
-}
-
-static int cb_opentelemetry_init(struct flb_output_instance *ins,
- struct flb_config *config,
- void *data)
-{
- struct opentelemetry_context *ctx;
-
- ctx = flb_opentelemetry_context_create(ins, config);
- if (!ctx) {
- return -1;
- }
-
- if (ctx->batch_size <= 0){
- ctx->batch_size = atoi(DEFAULT_LOG_RECORD_BATCH_SIZE);
- }
-
- flb_output_set_context(ins, ctx);
-
- return 0;
-}
-
-static void cb_opentelemetry_flush(struct flb_event_chunk *event_chunk,
- struct flb_output_flush *out_flush,
- struct flb_input_instance *ins, void *out_context,
- struct flb_config *config)
-{
- int result = FLB_RETRY;
-
- if (event_chunk->type == FLB_INPUT_METRICS){
- result = process_metrics(event_chunk, out_flush, ins, out_context, config);
- }
- else if (event_chunk->type == FLB_INPUT_LOGS){
- result = process_logs(event_chunk, out_flush, ins, out_context, config);
- }
- else if (event_chunk->type == FLB_INPUT_TRACES){
- result = process_traces(event_chunk, out_flush, ins, out_context, config);
- }
- FLB_OUTPUT_RETURN(result);
-}
-
-/* Configuration properties map */
-static struct flb_config_map config_map[] = {
- {
- FLB_CONFIG_MAP_SLIST_1, "add_label", NULL,
- FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct opentelemetry_context,
- add_labels),
- "Adds a custom label to the metrics use format: 'add_label name value'"
- },
-
- {
- FLB_CONFIG_MAP_STR, "proxy", NULL,
- 0, FLB_FALSE, 0,
- "Specify an HTTP Proxy. The expected format of this value is http://host:port. "
- },
- {
- FLB_CONFIG_MAP_STR, "http_user", NULL,
- 0, FLB_TRUE, offsetof(struct opentelemetry_context, http_user),
- "Set HTTP auth user"
- },
- {
- FLB_CONFIG_MAP_STR, "http_passwd", "",
- 0, FLB_TRUE, offsetof(struct opentelemetry_context, http_passwd),
- "Set HTTP auth password"
- },
- {
- FLB_CONFIG_MAP_SLIST_1, "header", NULL,
- FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct opentelemetry_context, headers),
- "Add a HTTP header key/value pair. Multiple headers can be set"
- },
- {
- FLB_CONFIG_MAP_STR, "metrics_uri", "/v1/metrics",
- 0, FLB_TRUE, offsetof(struct opentelemetry_context, metrics_uri),
- "Specify an optional HTTP URI for the target OTel endpoint."
- },
- {
- FLB_CONFIG_MAP_STR, "logs_uri", "/v1/logs",
- 0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_uri),
- "Specify an optional HTTP URI for the target OTel endpoint."
- },
- {
- FLB_CONFIG_MAP_STR, "traces_uri", "/v1/traces",
- 0, FLB_TRUE, offsetof(struct opentelemetry_context, traces_uri),
- "Specify an optional HTTP URI for the target OTel endpoint."
- },
- {
- FLB_CONFIG_MAP_BOOL, "log_response_payload", "true",
- 0, FLB_TRUE, offsetof(struct opentelemetry_context, log_response_payload),
- "Specify if the response paylod should be logged or not"
- },
- {
- FLB_CONFIG_MAP_INT, "batch_size", DEFAULT_LOG_RECORD_BATCH_SIZE,
- 0, FLB_TRUE, offsetof(struct opentelemetry_context, batch_size),
- "Set the maximum number of log records to be flushed at a time"
- },
- {
- FLB_CONFIG_MAP_STR, "compress", NULL,
- 0, FLB_FALSE, 0,
- "Set payload compression mechanism. Option available is 'gzip'"
- },
- /* EOF */
- {0}
-};
-
-/* Plugin reference */
-struct flb_output_plugin out_opentelemetry_plugin = {
- .name = "opentelemetry",
- .description = "OpenTelemetry",
- .cb_init = cb_opentelemetry_init,
- .cb_flush = cb_opentelemetry_flush,
- .cb_exit = cb_opentelemetry_exit,
- .config_map = config_map,
- .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES,
- .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
-};
diff --git a/src/fluent-bit/plugins/out_opentelemetry/opentelemetry.h b/src/fluent-bit/plugins/out_opentelemetry/opentelemetry.h
deleted file mode 100644
index 94e424ac7..000000000
--- a/src/fluent-bit/plugins/out_opentelemetry/opentelemetry.h
+++ /dev/null
@@ -1,80 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_OUT_OPENTELEMETRY_H
-#define FLB_OUT_OPENTELEMETRY_H
-
-#include <fluent-bit/flb_output_plugin.h>
-
-#define FLB_OPENTELEMETRY_CONTENT_TYPE_HEADER_NAME "Content-Type"
-#define FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL "application/x-protobuf"
-
-/*
- * This lets you send log records in batches instead of a request per log record
- * It might be removed in furthur versions since if we have a large number of
- * log records, and a later batch fails, Fluent Bit will retry ALL the batches,
- * including the ones that succeeded. This is not ideal.
- */
-#define DEFAULT_LOG_RECORD_BATCH_SIZE "1000"
-
-/* Plugin context */
-struct opentelemetry_context {
- /* HTTP Auth */
- char *http_user;
- char *http_passwd;
-
- /* Proxy */
- const char *proxy;
- char *proxy_host;
- int proxy_port;
-
- /* HTTP URI */
- char *traces_uri;
- char *metrics_uri;
- char *logs_uri;
- char *host;
- int port;
-
- /* Number of logs to flush at a time */
- int batch_size;
-
- /* Log the response paylod */
- int log_response_payload;
-
- /* config reader for 'add_label' */
- struct mk_list *add_labels;
-
- /* internal labels ready to append */
- struct mk_list kv_labels;
-
- /* Upstream connection to the backend server */
- struct flb_upstream *u;
-
- /* Arbitrary HTTP headers */
- struct mk_list *headers;
-
-
- /* instance context */
- struct flb_output_instance *ins;
-
- /* Compression mode (gzip) */
- int compress_gzip;
-};
-
-#endif
diff --git a/src/fluent-bit/plugins/out_opentelemetry/opentelemetry_conf.c b/src/fluent-bit/plugins/out_opentelemetry/opentelemetry_conf.c
deleted file mode 100644
index 5c9c8f82c..000000000
--- a/src/fluent-bit/plugins/out_opentelemetry/opentelemetry_conf.c
+++ /dev/null
@@ -1,262 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_output_plugin.h>
-#include <fluent-bit/flb_utils.h>
-#include <fluent-bit/flb_pack.h>
-#include <fluent-bit/flb_sds.h>
-#include <fluent-bit/flb_kv.h>
-
-#include "opentelemetry.h"
-#include "opentelemetry_conf.h"
-
-static int config_add_labels(struct flb_output_instance *ins,
- struct opentelemetry_context *ctx)
-{
- struct mk_list *head;
- struct flb_config_map_val *mv;
- struct flb_slist_entry *k = NULL;
- struct flb_slist_entry *v = NULL;
- struct flb_kv *kv;
-
- if (!ctx->add_labels || mk_list_size(ctx->add_labels) == 0) {
- return 0;
- }
-
- /* iterate all 'add_label' definitions */
- flb_config_map_foreach(head, mv, ctx->add_labels) {
- if (mk_list_size(mv->val.list) != 2) {
- flb_plg_error(ins, "'add_label' expects a key and a value, "
- "e.g: 'add_label version 1.8.0'");
- return -1;
- }
-
- k = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
- v = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
-
- kv = flb_kv_item_create(&ctx->kv_labels, k->str, v->str);
- if (!kv) {
- flb_plg_error(ins, "could not append label %s=%s\n", k->str, v->str);
- return -1;
- }
- }
-
- return 0;
-}
-
-/*
-* Check if a Proxy have been set, if so the Upstream manager will use
-* the Proxy end-point and then we let the HTTP client know about it, so
-* it can adjust the HTTP requests.
-*/
-
-static void check_proxy(struct flb_output_instance *ins,
- struct opentelemetry_context *ctx,
- char *host, char *port,
- char *protocol, char *uri){
-
- const char *tmp = NULL;
- int ret;
- tmp = flb_output_get_property("proxy", ins);
- if (tmp) {
- ret = flb_utils_url_split(tmp, &protocol, &host, &port, &uri);
- if (ret == -1) {
- flb_plg_error(ctx->ins, "could not parse proxy parameter: '%s'", tmp);
- flb_free(ctx);
- }
-
- ctx->proxy_host = host;
- ctx->proxy_port = atoi(port);
- ctx->proxy = tmp;
- flb_free(protocol);
- flb_free(port);
- flb_free(uri);
- uri = NULL;
- }
- else {
- flb_output_net_default("127.0.0.1", 80, ins);
- }
-}
-
-static char *sanitize_uri(char *uri){
- char *new_uri;
- int uri_len;
-
- if (uri == NULL) {
- uri = flb_strdup("/");
- }
- else if (uri[0] != '/') {
- uri_len = strlen(uri);
- new_uri = flb_calloc(uri_len + 2, sizeof(char));
-
- if (new_uri != NULL) {
- new_uri[0] = '/';
-
- strncat(new_uri, uri, uri_len + 1);
- }
-
- uri = new_uri;
- }
-
- /* This function could return NULL if flb_calloc fails */
-
- return uri;
-}
-
-struct opentelemetry_context *flb_opentelemetry_context_create(
- struct flb_output_instance *ins, struct flb_config *config)
-{
- int ret;
- int io_flags = 0;
- char *protocol = NULL;
- char *host = NULL;
- char *port = NULL;
- char *metrics_uri = NULL;
- char *traces_uri = NULL;
- char *logs_uri = NULL;
- struct flb_upstream *upstream;
- struct opentelemetry_context *ctx = NULL;
- const char *tmp = NULL;
-
- /* Allocate plugin context */
- ctx = flb_calloc(1, sizeof(struct opentelemetry_context));
- if (!ctx) {
- flb_errno();
- return NULL;
- }
- ctx->ins = ins;
- mk_list_init(&ctx->kv_labels);
-
- ret = flb_output_config_map_set(ins, (void *) ctx);
- if (ret == -1) {
- flb_free(ctx);
- return NULL;
- }
-
- /* Parse 'add_label' */
- ret = config_add_labels(ins, ctx);
- if (ret == -1) {
- return NULL;
- }
-
- check_proxy(ins, ctx, host, port, protocol, metrics_uri);
- check_proxy(ins, ctx, host, port, protocol, logs_uri);
-
- /* Check if SSL/TLS is enabled */
-#ifdef FLB_HAVE_TLS
- if (ins->use_tls == FLB_TRUE) {
- io_flags = FLB_IO_TLS;
- }
- else {
- io_flags = FLB_IO_TCP;
- }
-#else
- io_flags = FLB_IO_TCP;
-#endif
-
- if (ins->host.ipv6 == FLB_TRUE) {
- io_flags |= FLB_IO_IPV6;
- }
-
- if (ctx->proxy) {
- flb_plg_trace(ctx->ins, "Upstream Proxy=%s:%i",
- ctx->proxy_host, ctx->proxy_port);
- upstream = flb_upstream_create(config,
- ctx->proxy_host,
- ctx->proxy_port,
- io_flags, ins->tls);
- }
- else {
- upstream = flb_upstream_create(config,
- ins->host.name,
- ins->host.port,
- io_flags, ins->tls);
- }
-
- if (!upstream) {
- flb_free(ctx);
- return NULL;
- }
-
- logs_uri = sanitize_uri(ctx->logs_uri);
- traces_uri = sanitize_uri(ctx->traces_uri);
- metrics_uri = sanitize_uri(ctx->metrics_uri);
-
- ctx->u = upstream;
- ctx->host = ins->host.name;
- ctx->port = ins->host.port;
-
- if (logs_uri == NULL) {
- flb_plg_trace(ctx->ins,
- "Could not allocate memory for sanitized "
- "log endpoint uri");
- }
- else {
- ctx->logs_uri = logs_uri;
- }
-
- if (traces_uri == NULL) {
- flb_plg_trace(ctx->ins,
- "Could not allocate memory for sanitized "
- "trace endpoint uri");
- }
- else {
- ctx->traces_uri = traces_uri;
- }
-
- if (metrics_uri == NULL) {
- flb_plg_trace(ctx->ins,
- "Could not allocate memory for sanitized "
- "metric endpoint uri");
- }
- else {
- ctx->metrics_uri = metrics_uri;
- }
-
-
- /* Set instance flags into upstream */
- flb_output_upstream_set(ctx->u, ins);
-
- tmp = flb_output_get_property("compress", ins);
- ctx->compress_gzip = FLB_FALSE;
- if (tmp) {
- if (strcasecmp(tmp, "gzip") == 0) {
- ctx->compress_gzip = FLB_TRUE;
- }
- }
-
- return ctx;
-}
-
-void flb_opentelemetry_context_destroy(
- struct opentelemetry_context *ctx)
-{
- if (!ctx) {
- return;
- }
-
- flb_kv_release(&ctx->kv_labels);
-
- if (ctx->u) {
- flb_upstream_destroy(ctx->u);
- }
-
- flb_free(ctx->proxy_host);
- flb_free(ctx);
-}
diff --git a/src/fluent-bit/plugins/out_opentelemetry/opentelemetry_conf.h b/src/fluent-bit/plugins/out_opentelemetry/opentelemetry_conf.h
deleted file mode 100644
index 974f7fea5..000000000
--- a/src/fluent-bit/plugins/out_opentelemetry/opentelemetry_conf.h
+++ /dev/null
@@ -1,33 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_OUT_OPENTELEMETRY_CONF_H
-#define FLB_OUT_OPENTELEMETRY_CONF_H
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_output.h>
-
-#include "opentelemetry.h"
-
-struct opentelemetry_context *flb_opentelemetry_context_create(
- struct flb_output_instance *ins, struct flb_config *config);
-void flb_opentelemetry_context_destroy(
- struct opentelemetry_context *ctx);
-
-#endif