From 5da14042f70711ea5cf66e034699730335462f66 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 5 May 2024 14:08:03 +0200 Subject: Merging upstream version 1.45.3+dfsg. Signed-off-by: Daniel Baumann --- .../out_azure_logs_ingestion/CMakeLists.txt | 6 + .../azure_logs_ingestion.c | 445 +++++++++++++++++++++ .../azure_logs_ingestion.h | 74 ++++ .../azure_logs_ingestion_conf.c | 172 ++++++++ .../azure_logs_ingestion_conf.h | 29 ++ 5 files changed, 726 insertions(+) create mode 100644 src/fluent-bit/plugins/out_azure_logs_ingestion/CMakeLists.txt create mode 100644 src/fluent-bit/plugins/out_azure_logs_ingestion/azure_logs_ingestion.c create mode 100644 src/fluent-bit/plugins/out_azure_logs_ingestion/azure_logs_ingestion.h create mode 100644 src/fluent-bit/plugins/out_azure_logs_ingestion/azure_logs_ingestion_conf.c create mode 100644 src/fluent-bit/plugins/out_azure_logs_ingestion/azure_logs_ingestion_conf.h (limited to 'src/fluent-bit/plugins/out_azure_logs_ingestion') diff --git a/src/fluent-bit/plugins/out_azure_logs_ingestion/CMakeLists.txt b/src/fluent-bit/plugins/out_azure_logs_ingestion/CMakeLists.txt new file mode 100644 index 000000000..b51308c70 --- /dev/null +++ b/src/fluent-bit/plugins/out_azure_logs_ingestion/CMakeLists.txt @@ -0,0 +1,6 @@ +set(src + azure_logs_ingestion.c + azure_logs_ingestion_conf.c + ) + +FLB_PLUGIN(out_azure_logs_ingestion "${src}" "") diff --git a/src/fluent-bit/plugins/out_azure_logs_ingestion/azure_logs_ingestion.c b/src/fluent-bit/plugins/out_azure_logs_ingestion/azure_logs_ingestion.c new file mode 100644 index 000000000..9b839ef7e --- /dev/null +++ b/src/fluent-bit/plugins/out_azure_logs_ingestion/azure_logs_ingestion.c @@ -0,0 +1,445 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "azure_logs_ingestion.h" +#include "azure_logs_ingestion_conf.h" + +static int cb_azure_logs_ingestion_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) +{ + struct flb_az_li *ctx; + (void) config; + (void) ins; + (void) data; + + /* Allocate and initialize a context from configuration */ + ctx = flb_az_li_ctx_create(ins, config); + if (!ctx) { + flb_plg_error(ins, "configuration failed"); + return -1; + } + + return 0; +} + +/* A duplicate function copied from the azure log analytics plugin. + allocates sds string */ +static int az_li_format(const void *in_buf, size_t in_bytes, + char **out_buf, size_t *out_size, + struct flb_az_li *ctx) +{ + int i; + int array_size = 0; + int map_size; + size_t off = 0; + double t; + struct flb_time tm; + msgpack_unpacked result; + msgpack_object root; + msgpack_object *obj; + msgpack_object map; + msgpack_object k; + msgpack_object v; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + msgpack_sbuffer tmp_sbuf; + msgpack_packer tmp_pck; + flb_sds_t record; + char time_formatted[32]; + size_t s; + struct tm tms; + int len; + + /* Count number of items */ + array_size = flb_mp_count(in_buf, in_bytes); + msgpack_unpacked_init(&result); + + /* Create temporary msgpack buffer */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + msgpack_pack_array(&mp_pck, array_size); + + off = 0; + while (msgpack_unpack_next(&result, in_buf, in_bytes, &off) == MSGPACK_UNPACK_SUCCESS) { + root = result.data; + + /* Get timestamp */ + flb_time_pop_from_msgpack(&tm, &result, &obj); + + /* Create temporary msgpack buffer */ + msgpack_sbuffer_init(&tmp_sbuf); + msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); + + map = root.via.array.ptr[1]; + map_size = map.via.map.size; + + msgpack_pack_map(&mp_pck, map_size + 1); + + /* Append the time key */ + msgpack_pack_str(&mp_pck, flb_sds_len(ctx->time_key)); + msgpack_pack_str_body(&mp_pck, + ctx->time_key, + flb_sds_len(ctx->time_key)); + + if (ctx->time_generated == FLB_TRUE) { + /* Append the time value as ISO 8601 */ + gmtime_r(&tm.tm.tv_sec, &tms); + s = strftime(time_formatted, sizeof(time_formatted) - 1, + FLB_PACK_JSON_DATE_ISO8601_FMT, &tms); + + len = snprintf(time_formatted + s, + sizeof(time_formatted) - 1 - s, + ".%03" PRIu64 "Z", + (uint64_t) tm.tm.tv_nsec / 1000000); + s += len; + msgpack_pack_str(&mp_pck, s); + msgpack_pack_str_body(&mp_pck, time_formatted, s); + } + else { + /* Append the time value as millis.nanos */ + t = flb_time_to_double(&tm); + msgpack_pack_double(&mp_pck, t); + } + + /* Append original map k/v */ + for (i = 0; i < map_size; i++) { + k = map.via.map.ptr[i].key; + v = map.via.map.ptr[i].val; + + msgpack_pack_object(&tmp_pck, k); + msgpack_pack_object(&tmp_pck, v); + } + msgpack_sbuffer_write(&mp_sbuf, tmp_sbuf.data, tmp_sbuf.size); + msgpack_sbuffer_destroy(&tmp_sbuf); + } + + record = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + if (!record) { + flb_errno(); + msgpack_sbuffer_destroy(&mp_sbuf); + msgpack_unpacked_destroy(&result); + return -1; + } + + msgpack_sbuffer_destroy(&mp_sbuf); + msgpack_unpacked_destroy(&result); + + *out_buf = record; + *out_size = flb_sds_len(record); + + return 0; +} + +/* Gets OAuth token; (allocates sds string everytime, must deallocate) */ +flb_sds_t get_az_li_token(struct flb_az_li *ctx) +{ + int ret = 0; + char* token; + size_t token_len; + flb_sds_t token_return = NULL; + + if (pthread_mutex_lock(&ctx->token_mutex)) { + flb_plg_error(ctx->ins, "error locking mutex"); + return NULL; + } + /* Retrieve access token only if expired */ + if (flb_oauth2_token_expired(ctx->u_auth) == FLB_TRUE) { + flb_plg_debug(ctx->ins, "token expired. getting new token"); + /* Clear any previous oauth2 payload content */ + flb_oauth2_payload_clear(ctx->u_auth); + + ret = flb_oauth2_payload_append(ctx->u_auth, "grant_type", 10, + "client_credentials", 18); + if (ret == -1) { + flb_plg_error(ctx->ins, "error appending oauth2 params"); + goto token_cleanup; + } + + ret = flb_oauth2_payload_append(ctx->u_auth, "scope", 5, FLB_AZ_LI_AUTH_SCOPE, + sizeof(FLB_AZ_LI_AUTH_SCOPE) - 1); + if (ret == -1) { + flb_plg_error(ctx->ins, "error appending oauth2 params"); + goto token_cleanup; + } + + ret = flb_oauth2_payload_append(ctx->u_auth, "client_id", 9, + ctx->client_id, -1); + if (ret == -1) { + flb_plg_error(ctx->ins, "error appending oauth2 params"); + goto token_cleanup; + } + + ret = flb_oauth2_payload_append(ctx->u_auth, "client_secret", 13, + ctx->client_secret, -1); + if (ret == -1) { + flb_plg_error(ctx->ins, "error appending oauth2 params"); + goto token_cleanup; + } + + token = flb_oauth2_token_get(ctx->u_auth); + + /* Copy string to prevent race conditions */ + if (!token) { + flb_plg_error(ctx->ins, "error retrieving oauth2 access token"); + goto token_cleanup; + } + flb_plg_debug(ctx->ins, "got azure token"); + } + + /* Reached this code-block means, got new token or token not expired */ + /* Either way we copy the token to a new string */ + token_len = flb_sds_len(ctx->u_auth->token_type) + 2 + + flb_sds_len(ctx->u_auth->access_token); + flb_plg_debug(ctx->ins, "create token header string"); + /* Now create */ + token_return = flb_sds_create_size(token_len); + if (!token_return) { + flb_plg_error(ctx->ins, "error creating token buffer"); + goto token_cleanup; + } + flb_sds_snprintf(&token_return, flb_sds_alloc(token_return), "%s %s", + ctx->u_auth->token_type, ctx->u_auth->access_token); + +token_cleanup: + if (pthread_mutex_unlock(&ctx->token_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return NULL; + } + + return token_return; +} + +static void cb_azure_logs_ingestion_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + int ret; + int flush_status; + size_t b_sent; + size_t json_payload_size; + void* final_payload; + size_t final_payload_size; + flb_sds_t token; + struct flb_connection *u_conn; + struct flb_http_client *c = NULL; + int is_compressed = FLB_FALSE; + flb_sds_t json_payload = NULL; + struct flb_az_li *ctx = out_context; + (void) i_ins; + (void) config; + + /* Get upstream connection */ + u_conn = flb_upstream_conn_get(ctx->u_dce); + if (!u_conn) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Convert binary logs into a JSON payload */ + ret = az_li_format(event_chunk->data, event_chunk->size, + &json_payload, &json_payload_size, ctx); + if (ret == -1) { + flb_upstream_conn_release(u_conn); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + /* Get OAuth2 token */ + token = get_az_li_token(ctx); + if (!token) { + flush_status = FLB_RETRY; + goto cleanup; + } + + /* Map buffer */ + final_payload = json_payload; + final_payload_size = json_payload_size; + if (ctx->compress_enabled == FLB_TRUE) { + ret = flb_gzip_compress((void *) json_payload, json_payload_size, + &final_payload, &final_payload_size); + if (ret == -1) { + flb_plg_error(ctx->ins, + "cannot gzip payload, disabling compression"); + } + else { + is_compressed = FLB_TRUE; + flb_plg_debug(ctx->ins, "enabled payload gzip compression"); + /* JSON buffer will be cleared at cleanup: */ + } + } + + /* Compose HTTP Client request */ + c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->dce_u_url, + final_payload, final_payload_size, NULL, 0, NULL, 0); + + if (!c) { + flb_plg_warn(ctx->ins, "retrying payload bytes=%lu", final_payload_size); + flush_status = FLB_RETRY; + goto cleanup; + } + + /* Append headers */ + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + flb_http_add_header(c, "Content-Type", 12, "application/json", 16); + if (is_compressed) { + flb_http_add_header(c, "Content-Encoding", 16, "gzip", 4); + } + flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); + flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX); + + /* Execute rest call */ + ret = flb_http_do(c, &b_sent); + if (ret != 0) { + flb_plg_warn(ctx->ins, "http_do=%i", ret); + flush_status = FLB_RETRY; + goto cleanup; + } + else { + if (c->resp.status >= 200 && c->resp.status <= 299) { + flb_plg_info(ctx->ins, "http_status=%i, dcr_id=%s, table=%s", + c->resp.status, ctx->dcr_id, ctx->table_name); + flush_status = FLB_OK; + goto cleanup; + } + else { + if (c->resp.payload_size > 0) { + flb_plg_warn(ctx->ins, "http_status=%i:\n%s", + c->resp.status, c->resp.payload); + } + else { + flb_plg_warn(ctx->ins, "http_status=%i", c->resp.status); + } + flb_plg_debug(ctx->ins, "retrying payload bytes=%lu", final_payload_size); + flush_status = FLB_RETRY; + goto cleanup; + } + } + +cleanup: + /* cleanup */ + if (json_payload) { + flb_sds_destroy(json_payload); + } + + /* release compressed payload */ + if (is_compressed == FLB_TRUE) { + flb_free(final_payload); + } + + if (c) { + flb_http_client_destroy(c); + } + if (u_conn) { + flb_upstream_conn_release(u_conn); + } + + /* destory token at last after HTTP call has finished */ + if (token) { + flb_sds_destroy(token); + } + FLB_OUTPUT_RETURN(flush_status); +} + +static int cb_azure_logs_ingestion_exit(void *data, struct flb_config *config) +{ + struct flb_az_li *ctx = data; + flb_plg_debug(ctx->ins, "exiting logs ingestion plugin"); + flb_az_li_ctx_destroy(ctx); + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "tenant_id", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_az_li, tenant_id), + "Set the tenant ID of the AAD application" + }, + { + FLB_CONFIG_MAP_STR, "client_id", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_az_li, client_id), + "Set the client/app ID of the AAD application" + }, + { + FLB_CONFIG_MAP_STR, "client_secret", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_az_li, client_secret), + "Set the client secret of the AAD application" + }, + { + FLB_CONFIG_MAP_STR, "dce_url", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_az_li, dce_url), + "Data Collection Endpoint(DCE) URI (e.g. " + "https://la-endpoint-q12a.eastus-1.ingest.monitor.azure.com)" + }, + { + FLB_CONFIG_MAP_STR, "dcr_id", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_az_li, dcr_id), + "Data Collection Rule (DCR) immutable ID" + }, + { + FLB_CONFIG_MAP_STR, "table_name", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_az_li, table_name), + "The name of the custom log table, including '_CL' suffix" + }, + /* optional params */ + { + FLB_CONFIG_MAP_STR, "time_key", FLB_AZ_LI_TIME_KEY, + 0, FLB_TRUE, offsetof(struct flb_az_li, time_key), + "[Optional] Specify the key name where the timestamp will be stored." + }, + { + FLB_CONFIG_MAP_BOOL, "time_generated", "false", + 0, FLB_TRUE, offsetof(struct flb_az_li, time_generated), + "If enabled, will generate a timestamp and append it to JSON. " + "The key name is set by the 'time_key' parameter" + }, + { + FLB_CONFIG_MAP_BOOL, "compress", "false", + 0, FLB_TRUE, offsetof(struct flb_az_li, compress_enabled), + "Enable HTTP payload compression (gzip)." + }, + /* EOF */ + {0} +}; + +struct flb_output_plugin out_azure_logs_ingestion_plugin = { + .name = "azure_logs_ingestion", + .description = "Send logs to Log Analytics with Log Ingestion API", + .cb_init = cb_azure_logs_ingestion_init, + .cb_flush = cb_azure_logs_ingestion_flush, + .cb_exit = cb_azure_logs_ingestion_exit, + + /* Configuration */ + .config_map = config_map, + + /* Plugin flags */ + .flags = FLB_OUTPUT_NET | FLB_IO_TLS, +}; diff --git a/src/fluent-bit/plugins/out_azure_logs_ingestion/azure_logs_ingestion.h b/src/fluent-bit/plugins/out_azure_logs_ingestion/azure_logs_ingestion.h new file mode 100644 index 000000000..15b2420b8 --- /dev/null +++ b/src/fluent-bit/plugins/out_azure_logs_ingestion/azure_logs_ingestion.h @@ -0,0 +1,74 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_AZURE_LOGS_INGESTION +#define FLB_OUT_AZURE_LOGS_INGESTION + +#define FLB_AZ_LI_API_VERSION "api-version=2021-11-01-preview" +#define FLB_AZ_LI_TIME_KEY "@timestamp" +#define FLB_AZ_LI_AUTH_SCOPE "https://monitor.azure.com/.default" +/* auth url needs tenant_id */ +#define FLB_AZ_LI_AUTH_URL_TMPLT "https://login.microsoftonline.com/"\ + "%s/oauth2/v2.0/token" +/* DCE Full URL needs: dce_url, dcr_id, Log Analytics custom table name */ +#define FLB_AZ_LI_DCE_URL_TMPLT "%s/dataCollectionRules/%s/streams/"\ + "Custom-%s?"FLB_AZ_LI_API_VERSION +/* TLS Modes for upstream connection = FLB_IO_TLS or FLB_IO_OPT_TLS*/ +#define FLB_AZ_LI_TLS_MODE FLB_IO_TLS +/* refresh token every 60 minutes */ +#define FLB_AZ_LI_TOKEN_TIMEOUT 3600 + +#include +#include +#include + +/* Context structure for Azure Logs Ingestion API */ +struct flb_az_li { + /* log ingestion account setup */ + flb_sds_t tenant_id; + flb_sds_t client_id; + flb_sds_t client_secret; + flb_sds_t dce_url; + flb_sds_t dcr_id; + flb_sds_t table_name; + + /* time_generated: on/off */ + int time_generated; + /* time key name */ + flb_sds_t time_key; + + /* compress payload */ + int compress_enabled; + + /* mangement auth */ + flb_sds_t auth_url; + struct flb_oauth2 *u_auth; + /* mutex for acquiring tokens */ + pthread_mutex_t token_mutex; + + /* upstream connection to the data collection endpoint */ + struct flb_upstream *u_dce; + flb_sds_t dce_u_url; + + /* plugin output and config instance reference */ + struct flb_output_instance *ins; + struct flb_config *config; +}; + +#endif diff --git a/src/fluent-bit/plugins/out_azure_logs_ingestion/azure_logs_ingestion_conf.c b/src/fluent-bit/plugins/out_azure_logs_ingestion/azure_logs_ingestion_conf.c new file mode 100644 index 000000000..344a7f5ff --- /dev/null +++ b/src/fluent-bit/plugins/out_azure_logs_ingestion/azure_logs_ingestion_conf.c @@ -0,0 +1,172 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "azure_logs_ingestion.h" +#include "azure_logs_ingestion_conf.h" + +struct flb_az_li* flb_az_li_ctx_create(struct flb_output_instance *ins, + struct flb_config *config) +{ + int ret; + struct flb_az_li *ctx; + (void) ins; + (void) config; + + /* Allocate a new context object for this output instance */ + ctx = flb_calloc(1, sizeof(struct flb_az_li)); + if (!ctx) { + flb_errno(); + return NULL; + } + + /* Set the conext in output_instance so that we can retrieve it later */ + ctx->ins = ins; + ctx->config = config; + /* Set context */ + flb_output_set_context(ins, ctx); + + /* Load config map */ + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_plg_error(ins, "unable to load configuration"); + return NULL; + } + + /* config: 'client_id' */ + if (!ctx->client_id) { + flb_plg_error(ins, "property 'client_id' is not defined"); + flb_az_li_ctx_destroy(ctx); + return NULL; + } + /* config: 'tenant_id' */ + if (!ctx->tenant_id) { + flb_plg_error(ins, "property 'tenant_id' is not defined"); + flb_az_li_ctx_destroy(ctx); + return NULL; + } + /* config: 'client_secret' */ + if (!ctx->client_secret) { + flb_plg_error(ins, "property 'client_secret' is not defined"); + flb_az_li_ctx_destroy(ctx); + return NULL; + } + /* config: 'dce_url' */ + if (!ctx->dce_url) { + flb_plg_error(ins, "property 'dce_url' is not defined"); + flb_az_li_ctx_destroy(ctx); + return NULL; + } + /* config: 'dcr_id' */ + if (!ctx->dcr_id) { + flb_plg_error(ins, "property 'dcr_id' is not defined"); + flb_az_li_ctx_destroy(ctx); + return NULL; + } + /* config: 'table_name' */ + if (!ctx->table_name) { + flb_plg_error(ins, "property 'table_name' is not defined"); + flb_az_li_ctx_destroy(ctx); + return NULL; + } + + /* Allocate and set auth url */ + ctx->auth_url = flb_sds_create_size(sizeof(FLB_AZ_LI_AUTH_URL_TMPLT) - 1 + + flb_sds_len(ctx->tenant_id)); + if (!ctx->auth_url) { + flb_errno(); + flb_az_li_ctx_destroy(ctx); + return NULL; + } + flb_sds_snprintf(&ctx->auth_url, flb_sds_alloc(ctx->auth_url), + FLB_AZ_LI_AUTH_URL_TMPLT, ctx->tenant_id); + + /* Allocate and set dce full url */ + ctx->dce_u_url = flb_sds_create_size(sizeof(FLB_AZ_LI_DCE_URL_TMPLT) - 1 + + flb_sds_len(ctx->dce_url) + + flb_sds_len(ctx->dcr_id) + + flb_sds_len(ctx->table_name)); + if (!ctx->dce_u_url) { + flb_errno(); + flb_az_li_ctx_destroy(ctx); + return NULL; + } + flb_sds_snprintf(&ctx->dce_u_url, flb_sds_alloc(ctx->dce_u_url), + FLB_AZ_LI_DCE_URL_TMPLT, ctx->dce_url, + ctx->dcr_id, ctx->table_name); + + /* Initialize the auth mutex */ + pthread_mutex_init(&ctx->token_mutex, NULL); + + /* Create oauth2 context */ + ctx->u_auth = flb_oauth2_create(config, ctx->auth_url, + FLB_AZ_LI_TOKEN_TIMEOUT); + if (!ctx->u_auth) { + flb_plg_error(ins, "cannot create oauth2 context"); + flb_az_li_ctx_destroy(ctx); + return NULL; + } + + /* Create upstream context for Log Ingsetion endpoint */ + ctx->u_dce = flb_upstream_create_url(config, ctx->dce_url, + FLB_AZ_LI_TLS_MODE, ins->tls); + if (!ctx->u_dce) { + flb_plg_error(ins, "upstream creation failed"); + flb_az_li_ctx_destroy(ctx); + return NULL; + } + flb_output_upstream_set(ctx->u_dce, ins); + + flb_plg_info(ins, "dce_url='%s', dcr='%s', table='%s', stream='Custom-%s'", + ctx->dce_url, ctx->dcr_id, ctx->table_name, ctx->table_name); + + return ctx; +} + +/* Free the context and created memory */ +int flb_az_li_ctx_destroy(struct flb_az_li *ctx) +{ + if (!ctx) { + return -1; + } + + if (ctx->auth_url) { + flb_sds_destroy(ctx->auth_url); + } + + if (ctx->dce_u_url) { + flb_sds_destroy(ctx->dce_u_url); + } + + if (ctx->u_auth) { + flb_oauth2_destroy(ctx->u_auth); + } + + if (ctx->u_dce) { + flb_upstream_destroy(ctx->u_dce); + } + flb_free(ctx); + + return 0; +} diff --git a/src/fluent-bit/plugins/out_azure_logs_ingestion/azure_logs_ingestion_conf.h b/src/fluent-bit/plugins/out_azure_logs_ingestion/azure_logs_ingestion_conf.h new file mode 100644 index 000000000..3886f75bc --- /dev/null +++ b/src/fluent-bit/plugins/out_azure_logs_ingestion/azure_logs_ingestion_conf.h @@ -0,0 +1,29 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_AZURE_LOGS_INGESTION_CONF_H +#define FLB_OUT_AZURE_LOGS_INGESTION_CONF_H + +#include "azure_logs_ingestion.h" + +struct flb_az_li* flb_az_li_ctx_create(struct flb_output_instance *ins, + struct flb_config *config); +int flb_az_li_ctx_destroy(struct flb_az_li *ctx); + +#endif -- cgit v1.2.3