diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/plugins/out_azure_kusto | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/plugins/out_azure_kusto')
-rw-r--r-- | fluent-bit/plugins/out_azure_kusto/CMakeLists.txt | 7 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_kusto/azure_kusto.c | 477 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_kusto/azure_kusto.h | 110 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.c | 665 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.h | 31 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_kusto/azure_kusto_ingest.c | 496 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_kusto/azure_kusto_ingest.h | 28 |
7 files changed, 0 insertions, 1814 deletions
diff --git a/fluent-bit/plugins/out_azure_kusto/CMakeLists.txt b/fluent-bit/plugins/out_azure_kusto/CMakeLists.txt deleted file mode 100644 index 6803bee0..00000000 --- a/fluent-bit/plugins/out_azure_kusto/CMakeLists.txt +++ /dev/null @@ -1,7 +0,0 @@ -set(src - azure_kusto.c - azure_kusto_conf.c - azure_kusto_ingest.c - ) - -FLB_PLUGIN(out_azure_kusto "${src}" "") diff --git a/fluent-bit/plugins/out_azure_kusto/azure_kusto.c b/fluent-bit/plugins/out_azure_kusto/azure_kusto.c deleted file mode 100644 index 4b8ad9b8..00000000 --- a/fluent-bit/plugins/out_azure_kusto/azure_kusto.c +++ /dev/null @@ -1,477 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_http_client.h> -#include <fluent-bit/flb_kv.h> -#include <fluent-bit/flb_oauth2.h> -#include <fluent-bit/flb_output_plugin.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/flb_signv4.h> -#include <fluent-bit/flb_log_event_decoder.h> - -#include "azure_kusto.h" -#include "azure_kusto_conf.h" -#include "azure_kusto_ingest.h" - -/* Create a new oauth2 context and get a oauth2 token */ -static int azure_kusto_get_oauth2_token(struct flb_azure_kusto *ctx) -{ - int ret; - char *token; - - /* Clear any previous oauth2 payload content */ - flb_oauth2_payload_clear(ctx->o); - - ret = flb_oauth2_payload_append(ctx->o, "grant_type", 10, "client_credentials", 18); - if (ret == -1) { - flb_plg_error(ctx->ins, "error appending oauth2 params"); - return -1; - } - - ret = flb_oauth2_payload_append(ctx->o, "scope", 5, FLB_AZURE_KUSTO_SCOPE, 39); - if (ret == -1) { - flb_plg_error(ctx->ins, "error appending oauth2 params"); - return -1; - } - - ret = flb_oauth2_payload_append(ctx->o, "client_id", 9, ctx->client_id, -1); - if (ret == -1) { - flb_plg_error(ctx->ins, "error appending oauth2 params"); - return -1; - } - - ret = flb_oauth2_payload_append(ctx->o, "client_secret", 13, ctx->client_secret, -1); - if (ret == -1) { - flb_plg_error(ctx->ins, "error appending oauth2 params"); - return -1; - } - - /* Retrieve access token */ - token = flb_oauth2_token_get(ctx->o); - if (!token) { - flb_plg_error(ctx->ins, "error retrieving oauth2 access token"); - return -1; - } - - return 0; -} - -flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx) -{ - int ret = 0; - flb_sds_t output = NULL; - - if (pthread_mutex_lock(&ctx->token_mutex)) { - flb_plg_error(ctx->ins, "error locking mutex"); - return NULL; - } - - if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) { - ret = azure_kusto_get_oauth2_token(ctx); - } - - /* Copy string to prevent race conditions (get_oauth2 can free the string) */ - if (ret == 0) { - output = flb_sds_create_size(flb_sds_len(ctx->o->token_type) + - flb_sds_len(ctx->o->access_token) + 2); - if (!output) { - flb_plg_error(ctx->ins, "error creating token buffer"); - return NULL; - } - flb_sds_snprintf(&output, flb_sds_alloc(output), "%s %s", ctx->o->token_type, - ctx->o->access_token); - } - - if (pthread_mutex_unlock(&ctx->token_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - if (output) { - flb_sds_destroy(output); - } - return NULL; - } - - return output; -} - -/** - * Executes a control command against kusto's endpoint - * - * @param ctx Plugin's context - * @param csl Kusto's control command - * @return flb_sds_t Returns the response or NULL on error. - */ -flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl) -{ - flb_sds_t token; - flb_sds_t body; - size_t b_sent; - int ret; - struct flb_connection *u_conn; - struct flb_http_client *c; - flb_sds_t resp = NULL; - - /* Get upstream connection */ - u_conn = flb_upstream_conn_get(ctx->u); - - if (u_conn) { - token = get_azure_kusto_token(ctx); - - if (token) { - /* Compose request body */ - body = flb_sds_create_size(sizeof(FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE) - 1 + - strlen(csl)); - - if (body) { - flb_sds_snprintf(&body, flb_sds_alloc(body), - FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE, csl); - - /* Compose HTTP Client request */ - c = flb_http_client(u_conn, FLB_HTTP_POST, FLB_AZURE_KUSTO_MGMT_URI_PATH, - body, flb_sds_len(body), NULL, 0, NULL, 0); - - if (c) { - /* Add headers */ - flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); - flb_http_add_header(c, "Content-Type", 12, "application/json", 16); - flb_http_add_header(c, "Accept", 6, "application/json", 16); - flb_http_add_header(c, "Authorization", 13, token, - flb_sds_len(token)); - flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10); - - /* Send HTTP request */ - ret = flb_http_do(c, &b_sent); - flb_plg_debug( - ctx->ins, - "Kusto ingestion command request http_do=%i, HTTP Status: %i", - ret, c->resp.status); - - if (ret == 0) { - if (c->resp.status == 200) { - /* Copy payload response to the response param */ - resp = - flb_sds_create_len(c->resp.payload, c->resp.payload_size); - } - else if (c->resp.payload_size > 0) { - flb_plg_debug(ctx->ins, "Request failed and returned: \n%s", - c->resp.payload); - } - else { - flb_plg_debug(ctx->ins, "Request failed"); - } - } - else { - flb_plg_error(ctx->ins, "cannot send HTTP request"); - } - - flb_http_client_destroy(c); - } - else { - flb_plg_error(ctx->ins, "cannot create HTTP client context"); - } - - flb_sds_destroy(body); - } - else { - flb_plg_error(ctx->ins, "cannot construct request body"); - } - - flb_sds_destroy(token); - } - else { - flb_plg_error(ctx->ins, "cannot retrieve oauth2 token"); - } - - flb_upstream_conn_release(u_conn); - } - else { - flb_plg_error(ctx->ins, "cannot create upstream connection"); - } - - return resp; -} - -static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_config *config, - void *data) -{ - int io_flags = FLB_IO_TLS; - struct flb_azure_kusto *ctx; - - /* Create config context */ - ctx = flb_azure_kusto_conf_create(ins, config); - if (!ctx) { - flb_plg_error(ins, "configuration failed"); - return -1; - } - - flb_output_set_context(ins, ctx); - - /* Network mode IPv6 */ - if (ins->host.ipv6 == FLB_TRUE) { - io_flags |= FLB_IO_IPV6; - } - - /* Create mutex for acquiring oauth tokens and getting ingestion resources (they - * are shared across flush coroutines) - */ - pthread_mutex_init(&ctx->token_mutex, NULL); - pthread_mutex_init(&ctx->resources_mutex, NULL); - - /* - * Create upstream context for Kusto Ingestion endpoint - */ - ctx->u = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls); - if (!ctx->u) { - flb_plg_error(ctx->ins, "upstream creation failed"); - return -1; - } - - /* Create oauth2 context */ - ctx->o = - flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH); - if (!ctx->o) { - flb_plg_error(ctx->ins, "cannot create oauth2 context"); - return -1; - } - flb_output_upstream_set(ctx->u, ins); - - return 0; -} - -static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int tag_len, - const void *data, size_t bytes, void **out_data, - size_t *out_size) -{ - int records = 0; - msgpack_sbuffer mp_sbuf; - msgpack_packer mp_pck; - /* for sub msgpack objs */ - int map_size; - struct tm tms; - char time_formatted[32]; - size_t s; - int len; - struct flb_log_event_decoder log_decoder; - struct flb_log_event log_event; - int ret; - /* output buffer */ - flb_sds_t out_buf; - - /* Create array for all records */ - records = flb_mp_count(data, bytes); - if (records <= 0) { - flb_plg_error(ctx->ins, "error counting msgpack entries"); - return -1; - } - - ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event decoder initialization error : %d", ret); - - return -1; - } - - /* Create temporary msgpack buffer */ - msgpack_sbuffer_init(&mp_sbuf); - msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); - - msgpack_pack_array(&mp_pck, records); - - while ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - map_size = 1; - if (ctx->include_time_key == FLB_TRUE) { - map_size++; - } - - if (ctx->include_tag_key == FLB_TRUE) { - map_size++; - } - - msgpack_pack_map(&mp_pck, map_size); - - /* include_time_key */ - if (ctx->include_time_key == FLB_TRUE) { - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->time_key)); - msgpack_pack_str_body(&mp_pck, ctx->time_key, flb_sds_len(ctx->time_key)); - - /* Append the time value as ISO 8601 */ - gmtime_r(&log_event.timestamp.tm.tv_sec, &tms); - s = strftime(time_formatted, sizeof(time_formatted) - 1, - FLB_PACK_JSON_DATE_ISO8601_FMT, &tms); - - len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, - ".%03" PRIu64 "Z", - (uint64_t)log_event.timestamp.tm.tv_nsec / 1000000); - s += len; - msgpack_pack_str(&mp_pck, s); - msgpack_pack_str_body(&mp_pck, time_formatted, s); - } - - /* include_tag_key */ - if (ctx->include_tag_key == FLB_TRUE) { - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->tag_key)); - msgpack_pack_str_body(&mp_pck, ctx->tag_key, flb_sds_len(ctx->tag_key)); - msgpack_pack_str(&mp_pck, tag_len); - msgpack_pack_str_body(&mp_pck, tag, tag_len); - } - - msgpack_pack_str(&mp_pck, flb_sds_len(ctx->log_key)); - msgpack_pack_str_body(&mp_pck, ctx->log_key, flb_sds_len(ctx->log_key)); - msgpack_pack_object(&mp_pck, *log_event.body); - } - - /* Convert from msgpack to JSON */ - out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); - - /* Cleanup */ - flb_log_event_decoder_destroy(&log_decoder); - msgpack_sbuffer_destroy(&mp_sbuf); - - if (!out_buf) { - flb_plg_error(ctx->ins, "error formatting JSON payload"); - return -1; - } - - *out_data = out_buf; - *out_size = flb_sds_len(out_buf); - - return 0; -} - -static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *i_ins, void *out_context, - struct flb_config *config) -{ - int ret; - flb_sds_t json; - size_t json_size; - size_t tag_len; - struct flb_azure_kusto *ctx = out_context; - - (void)i_ins; - (void)config; - - flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size); - - tag_len = flb_sds_len(event_chunk->tag); - - /* Load or refresh ingestion resources */ - ret = azure_kusto_load_ingestion_resources(ctx, config); - if (ret != 0) { - flb_plg_error(ctx->ins, "cannot load ingestion resources"); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Reformat msgpack to JSON payload */ - ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data, - event_chunk->size, (void **)&json, &json_size); - if (ret != 0) { - flb_plg_error(ctx->ins, "cannot reformat data into json"); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size); - if (ret != 0) { - flb_plg_error(ctx->ins, "cannot perform queued ingestion"); - flb_sds_destroy(json); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* Cleanup */ - flb_sds_destroy(json); - - /* Done */ - FLB_OUTPUT_RETURN(FLB_OK); -} - -static int cb_azure_kusto_exit(void *data, struct flb_config *config) -{ - struct flb_azure_kusto *ctx = data; - - if (!ctx) { - return -1; - } - - if (ctx->u) { - flb_upstream_destroy(ctx->u); - ctx->u = NULL; - } - - flb_azure_kusto_conf_destroy(ctx); - - return 0; -} - -static struct flb_config_map config_map[] = { - {FLB_CONFIG_MAP_STR, "tenant_id", (char *)NULL, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, tenant_id), - "Set the tenant ID of the AAD application used for authentication"}, - {FLB_CONFIG_MAP_STR, "client_id", (char *)NULL, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, client_id), - "Set the client ID (Application ID) of the AAD application used for authentication"}, - {FLB_CONFIG_MAP_STR, "client_secret", (char *)NULL, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, client_secret), - "Set the client secret (Application Password) of the AAD application used for " - "authentication"}, - {FLB_CONFIG_MAP_STR, "ingestion_endpoint", (char *)NULL, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, ingestion_endpoint), - "Set the Kusto cluster's ingestion endpoint URL (e.g. " - "https://ingest-mycluster.eastus.kusto.windows.net)"}, - {FLB_CONFIG_MAP_STR, "database_name", (char *)NULL, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, database_name), "Set the database name"}, - {FLB_CONFIG_MAP_STR, "table_name", (char *)NULL, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, table_name), "Set the table name"}, - {FLB_CONFIG_MAP_STR, "ingestion_mapping_reference", (char *)NULL, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, ingestion_mapping_reference), - "Set the ingestion mapping reference"}, - {FLB_CONFIG_MAP_STR, "log_key", FLB_AZURE_KUSTO_DEFAULT_LOG_KEY, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, log_key), "The key name of event payload"}, - {FLB_CONFIG_MAP_BOOL, "include_tag_key", "true", 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, include_tag_key), - "If enabled, tag is appended to output. " - "The key name is used 'tag_key' property."}, - {FLB_CONFIG_MAP_STR, "tag_key", FLB_AZURE_KUSTO_DEFAULT_TAG_KEY, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, tag_key), - "The key name of tag. If 'include_tag_key' is false, " - "This property is ignored"}, - {FLB_CONFIG_MAP_BOOL, "include_time_key", "true", 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, include_time_key), - "If enabled, time is appended to output. " - "The key name is used 'time_key' property."}, - {FLB_CONFIG_MAP_STR, "time_key", FLB_AZURE_KUSTO_DEFAULT_TIME_KEY, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, time_key), - "The key name of the time. If 'include_time_key' is false, " - "This property is ignored"}, - /* EOF */ - {0}}; - -struct flb_output_plugin out_azure_kusto_plugin = { - .name = "azure_kusto", - .description = "Send events to Kusto (Azure Data Explorer)", - .cb_init = cb_azure_kusto_init, - .cb_flush = cb_azure_kusto_flush, - .cb_exit = cb_azure_kusto_exit, - .config_map = config_map, - /* Plugin flags */ - .flags = FLB_OUTPUT_NET | FLB_IO_TLS, -}; diff --git a/fluent-bit/plugins/out_azure_kusto/azure_kusto.h b/fluent-bit/plugins/out_azure_kusto/azure_kusto.h deleted file mode 100644 index ac4eedfd..00000000 --- a/fluent-bit/plugins/out_azure_kusto/azure_kusto.h +++ /dev/null @@ -1,110 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_OUT_AZURE_KUSTO -#define FLB_OUT_AZURE_KUSTO - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_oauth2.h> -#include <fluent-bit/flb_output.h> -#include <fluent-bit/flb_sds.h> -#include <fluent-bit/flb_upstream_ha.h> - -/* refresh token every 50 minutes */ -#define FLB_AZURE_KUSTO_TOKEN_REFRESH 3000 - -/* Kusto streaming inserts oauth scope */ -#define FLB_AZURE_KUSTO_SCOPE "https://help.kusto.windows.net/.default" - -/* MSAL authorization URL */ -#define FLB_MSAL_AUTH_URL_TEMPLATE \ - "https://login.microsoftonline.com/%s/oauth2/v2.0/token" - -#define FLB_AZURE_KUSTO_MGMT_URI_PATH "/v1/rest/mgmt" -#define FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE "{\"csl\":\"%s\", \"db\": \"NetDefaultDB\"}" - -#define FLB_AZURE_KUSTO_DEFAULT_TIME_KEY "timestamp" -#define FLB_AZURE_KUSTO_DEFAULT_TAG_KEY "tag" -#define FLB_AZURE_KUSTO_DEFAULT_LOG_KEY "log" - -#define AZURE_KUSTO_RESOURCE_STORAGE 0 -#define AZURE_KUSTO_RESOURCE_QUEUE 1 - -#define AZURE_KUSTO_RESOURCE_UPSTREAM_URI "uri" -#define AZURE_KUSTO_RESOURCE_UPSTREAM_SAS "sas" - -#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC 3600 - -struct flb_azure_kusto_resources { - struct flb_upstream_ha *blob_ha; - struct flb_upstream_ha *queue_ha; - flb_sds_t identity_token; - - /* used to reload resouces after some time */ - time_t load_time; -}; - -struct flb_azure_kusto { - /* azure_kusto configuration */ - flb_sds_t tenant_id; - flb_sds_t client_id; - flb_sds_t client_secret; - flb_sds_t ingestion_endpoint; - flb_sds_t database_name; - flb_sds_t table_name; - flb_sds_t ingestion_mapping_reference; - - /* records configuration */ - flb_sds_t log_key; - int include_tag_key; - flb_sds_t tag_key; - int include_time_key; - flb_sds_t time_key; - - /* --- internal data --- */ - - flb_sds_t ingestion_mgmt_endpoint; - - /* oauth2 context */ - flb_sds_t oauth_url; - struct flb_oauth2 *o; - - /* mutex for acquiring oauth tokens */ - pthread_mutex_t token_mutex; - - /* ingestion resources */ - struct flb_azure_kusto_resources *resources; - - /* mutex for loading reosurces */ - pthread_mutex_t resources_mutex; - - /* Upstream connection to the backend server */ - struct flb_upstream *u; - - /* Fluent Bit context */ - struct flb_config *config; - - /* Plugin output instance reference */ - struct flb_output_instance *ins; -}; - -flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx); -flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl); - -#endif diff --git a/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.c b/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.c deleted file mode 100644 index 5303fef6..00000000 --- a/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.c +++ /dev/null @@ -1,665 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_jsmn.h> -#include <fluent-bit/flb_oauth2.h> -#include <fluent-bit/flb_output.h> -#include <fluent-bit/flb_output_plugin.h> -#include <fluent-bit/flb_sds.h> -#include <fluent-bit/flb_time.h> -#include <fluent-bit/flb_unescape.h> -#include <fluent-bit/flb_upstream_ha.h> -#include <fluent-bit/flb_utils.h> - -#include "azure_kusto.h" -#include "azure_kusto_conf.h" - -static struct flb_upstream_node *flb_upstream_node_create_url(struct flb_azure_kusto *ctx, - struct flb_config *config, - const char *url) -{ - int ret; - char *prot = NULL; - char *host = NULL; - char *port = NULL; - char *uri = NULL; - flb_sds_t sds_host = NULL; - flb_sds_t sds_port = NULL; - char *tmp; - struct flb_hash_table *kv = NULL; - struct flb_upstream_node *node = NULL; - int uri_length; - int sas_length; - - /* Parse and split URL */ - ret = flb_utils_url_split(url, &prot, &host, &port, &uri); - if (ret == -1) { - flb_plg_error(ctx->ins, "invalid URL: %s", url); - return NULL; - } - - /* find sas token in query */ - tmp = strchr(uri, '?'); - - if (tmp) { - uri_length = tmp - uri; - sas_length = strnlen(tmp + 1, 256); - - /* kv that will hold base uri, and sas token */ - kv = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 2, 2); - - if (kv) { - ret = flb_hash_table_add(kv, AZURE_KUSTO_RESOURCE_UPSTREAM_URI, 3, uri, uri_length); - - if (ret != -1) { - ret = flb_hash_table_add(kv, AZURE_KUSTO_RESOURCE_UPSTREAM_SAS, 3, tmp + 1, - sas_length); - - if (ret != -1) { - /* if any/all of these creations would fail the node creation will fail and cleanup */ - sds_host = flb_sds_create(host); - sds_port = flb_sds_create(port); - - node = flb_upstream_node_create( - NULL, sds_host, sds_port, FLB_TRUE, ctx->ins->tls->verify, - ctx->ins->tls->debug, ctx->ins->tls->vhost, NULL, NULL, NULL, - NULL, NULL, kv, config); - - if (!node) { - flb_plg_error(ctx->ins, "error creating resource upstream node"); - } - } - else { - flb_plg_error(ctx->ins, "error storing resource sas token"); - } - } - else { - flb_plg_error(ctx->ins, "error storing resource uri"); - } - - /* avoid destorying if function is successful */ - if (!node) { - flb_hash_table_destroy(kv); - } - } - else { - flb_plg_error(ctx->ins, "error creating upstream node hash table"); - } - } - else { - flb_plg_error(ctx->ins, "uri has no sas token query: %s", uri); - } - - flb_free(prot); - flb_free(host); - flb_free(port); - flb_free(uri); - - return node; -} - -static int flb_azure_kusto_resources_clear(struct flb_azure_kusto_resources *resources) -{ - if (!resources) { - return -1; - } - - if (resources->blob_ha) { - flb_upstream_ha_destroy(resources->blob_ha); - resources->blob_ha = NULL; - } - - if (resources->queue_ha) { - flb_upstream_ha_destroy(resources->queue_ha); - resources->queue_ha = NULL; - } - - if (resources->identity_token) { - flb_sds_destroy(resources->identity_token); - resources->identity_token = NULL; - } - - resources->load_time = 0; - - return 0; -} - -/** - * Parses ".get ingestion resources" response into HA upstreams of the queue & blob - * resources in the response. - * - * @param ctx Pointer to the plugin's context - * @param config Pointer to the config - * @param response sds string containing the response body - * @param blob_ha Pointer to an HA upstream for the blob resources, that would be - * allocated here. - * @param queue_ha Pointer to an HA upstream for the queue resources, that would be - * allocated here. - * @return int 0 on success, -1 on failure - */ -static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_config *config, - flb_sds_t response, struct flb_upstream_ha *blob_ha, - struct flb_upstream_ha *queue_ha) -{ - jsmn_parser parser; - jsmntok_t *t; - jsmntok_t *tokens; - int tok_size = 100; - int ret = -1; - int i; - int blob_count = 0; - int queue_count = 0; - char *token_str; - int token_str_len; - int resource_type; - struct flb_upstream_node *node; - struct flb_upstream_ha *ha; - flb_sds_t resource_uri; - - /* Response is a json in the form of - * { - * "Tables": [ - * { - * "TableName": "Table_0", - * "Columns": [...], - * "Rows": [ - * [ - * ("TempStorage" | "SecuredReadyForAggregationQueue" | - * "SuccessfulIngestionsQueue" | "FailedIngestionsQueue" | "IngestionsStatusTable"), - * <URI with SAS> - * ], - * ... - * ] - * } - * ] - * } - */ - - resource_uri = flb_sds_create(NULL); - if (!resource_uri) { - flb_plg_error(ctx->ins, "error allocating resource uri buffer"); - return -1; - } - - jsmn_init(&parser); - tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size); - - if (tokens) { - ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, tok_size); - - if (ret > 0) { - /* skip all tokens until we reach "Rows" */ - for (i = 0; i < ret - 1; i++) { - t = &tokens[i]; - - if (t->type != JSMN_STRING) { - continue; - } - - token_str = response + t->start; - token_str_len = (t->end - t->start); - - /** - * if we found the Rows key, skipping this token and the next one (key and - * wrapping array value) - */ - if (token_str_len == 4 && strncmp(token_str, "Rows", 4) == 0) { - i += 2; - break; - } - } - - /* iterating rows, each row will have 3 tokens: the array holding the column - * values, the first value containing the resource type, and the second value - * containing the resource uri */ - for (; i < ret; i++) { - t = &tokens[i]; - - /** - * each token should be an array with 2 strings: - * First will be the resource type (TempStorage, - * SecuredReadyForAggregationQueue, etc...) Second will be the SAS URI - */ - if (t->type != JSMN_ARRAY) { - break; - } - - /* move to the next token, first item in the array - resource type */ - i++; - t = &tokens[i]; - if (t->type != JSMN_STRING) { - break; - } - - token_str = response + t->start; - token_str_len = (t->end - t->start); - - flb_plg_debug(ctx->ins, "found resource of type: %.*s ", - t->end - t->start, response + t->start); - - if (token_str_len == 11 && strncmp(token_str, "TempStorage", 11) == 0) { - resource_type = AZURE_KUSTO_RESOURCE_STORAGE; - } - else if (token_str_len == 31 && - strncmp(token_str, "SecuredReadyForAggregationQueue", 31) == 0) { - resource_type = AZURE_KUSTO_RESOURCE_QUEUE; - } - /* we don't care about other resources so we just skip the next token and - move on to the next pair */ - else { - i++; - continue; - } - - /* move to the next token, second item in the array - resource URI */ - i++; - t = &tokens[i]; - - if (t->type != JSMN_STRING) { - break; - } - - token_str = response + t->start; - token_str_len = (t->end - t->start); - - resource_uri = flb_sds_copy(resource_uri, token_str, token_str_len); - if (resource_type == AZURE_KUSTO_RESOURCE_QUEUE) { - ha = queue_ha; - queue_count++; - } - else { - ha = blob_ha; - blob_count++; - } - - if (!ha) { - flb_plg_error(ctx->ins, "error creating HA upstream"); - ret = -1; - break; - } - - node = flb_upstream_node_create_url(ctx, config, resource_uri); - - if (!node) { - flb_plg_error(ctx->ins, "error creating HA upstream node"); - ret = -1; - break; - } - - flb_upstream_ha_node_add(ha, node); - } - - if (ret != -1) { - if (queue_count > 0 && blob_count > 0) { - flb_plg_debug(ctx->ins, - "parsed %d blob resources and %d queue resources", - blob_count, queue_count); - ret = 0; - } - else { - flb_plg_error(ctx->ins, "error parsing resources: missing resources"); - ret = -1; - } - } - } - else { - flb_plg_error(ctx->ins, "error parsing JSON response: %s", response); - ret = -1; - } - } - else { - flb_plg_error(ctx->ins, "error allocating tokens"); - ret = -1; - } - - flb_sds_destroy(resource_uri); - flb_free(tokens); - - return ret; -} - -/** - * Parses ".get kusto identity token" response and returns the token as an sds string - * - * @param ctx Pointer to the plugin's context - * @param response sds string containing the response body - * @return flb_sds_t The parsed token - */ -static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx, - flb_sds_t response) -{ - flb_sds_t identity_token = NULL; - int tok_size = 19; - jsmn_parser parser; - jsmntok_t *t; - jsmntok_t *tokens; - int ret; - char *token_str; - int token_str_len; - - /** - * Response is a json in the form of - * { - * "Tables": [ - * { - * "TableName": "Table_0", - * "Columns": [{ - * "ColumnName": "AuthorizationContext", - * "DataType": "String", - * "ColumnType": "string" - * }], - * "Rows": [ - * [ - * <value>, - * ] - * ] - * } - * ] - * } - * i.e. only one row and one column is expected (exactly 13 tokens) and the value - * should be the last - */ - - jsmn_init(&parser); - tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size); - if (!tokens) { - flb_plg_error(ctx->ins, "error allocating tokens"); - return NULL; - } - - ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, tok_size); - if (ret > 0) { - t = &tokens[tok_size - 1]; - - if (t->type == JSMN_STRING) { - t = &tokens[tok_size - 1]; - token_str = response + t->start; - token_str_len = (t->end - t->start); - - identity_token = flb_sds_create_len(token_str, token_str_len); - - if (identity_token) { - flb_plg_debug(ctx->ins, "parsed kusto identity token: '%s'", - identity_token); - } - else { - flb_plg_error(ctx->ins, "error parsing kusto identity token"); - } - } - else { - flb_plg_error(ctx->ins, "unexpected JSON response: %s", response); - } - } - else { - flb_plg_error(ctx->ins, "error parsing JSON response: %s", response); - } - - flb_free(tokens); - - return identity_token; -} - -int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, - struct flb_config *config) -{ - int ret = -1; - flb_sds_t response = NULL; - flb_sds_t identity_token = NULL; - struct flb_upstream_ha *blob_ha = NULL; - struct flb_upstream_ha *queue_ha = NULL; - time_t now; - - if (pthread_mutex_lock(&ctx->resources_mutex)) { - flb_plg_error(ctx->ins, "error locking mutex"); - return -1; - } - - now = time(NULL); - - /* check if we have all resources and they are not stale */ - if (ctx->resources->blob_ha && ctx->resources->queue_ha && - ctx->resources->identity_token && - now - ctx->resources->load_time < FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC) { - flb_plg_debug(ctx->ins, "resources are already loaded and are not stale"); - ret = 0; - } - else { - flb_plg_info(ctx->ins, "loading kusto ingestion resourcs"); - response = execute_ingest_csl_command(ctx, ".get ingestion resources"); - - if (response) { - queue_ha = flb_upstream_ha_create("azure_kusto_queue_ha"); - - if (queue_ha) { - blob_ha = flb_upstream_ha_create("azure_kusto_blob_ha"); - - if (blob_ha) { - ret = - parse_storage_resources(ctx, config, response, blob_ha, queue_ha); - - if (ret == 0) { - flb_sds_destroy(response); - response = NULL; - - response = - execute_ingest_csl_command(ctx, ".get kusto identity token"); - - if (response) { - identity_token = - parse_ingestion_identity_token(ctx, response); - - if (identity_token) { - ret = flb_azure_kusto_resources_clear(ctx->resources); - - if (ret != -1) { - ctx->resources->blob_ha = blob_ha; - ctx->resources->queue_ha = queue_ha; - ctx->resources->identity_token = identity_token; - ctx->resources->load_time = now; - - ret = 0; - } - else { - flb_plg_error( - ctx->ins, - "error destroying previous ingestion resources"); - } - } - else { - flb_plg_error(ctx->ins, - "error parsing ingestion identity token"); - ret = -1; - } - } - else { - flb_plg_error(ctx->ins, "error getting kusto identity token"); - ret = -1; - } - } - else { - flb_plg_error(ctx->ins, - "error parsing ingestion storage resources"); - ret = -1; - } - - if (ret == -1) { - flb_upstream_ha_destroy(blob_ha); - } - } - else { - flb_plg_error(ctx->ins, "error creating storage resources upstreams"); - ret = -1; - } - - if (ret == -1) { - flb_upstream_ha_destroy(queue_ha); - } - } - else { - flb_plg_error(ctx->ins, "error creating storage resources upstreams"); - } - - if (response) { - flb_sds_destroy(response); - } - } - if (!response) { - flb_plg_error(ctx->ins, "error getting ingestion storage resources"); - } - } - - if (pthread_mutex_unlock(&ctx->resources_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; - } - - return ret; -} - -static int flb_azure_kusto_resources_destroy(struct flb_azure_kusto_resources *resources) -{ - int ret; - - if (!resources) { - return -1; - } - - ret = flb_azure_kusto_resources_clear(resources); - if (ret != 0) { - return -1; - } - - flb_free(resources); - - return 0; -} - -struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *ins, - struct flb_config *config) -{ - int ret; - struct flb_azure_kusto *ctx; - - /* Allocate config context */ - ctx = flb_calloc(1, sizeof(struct flb_azure_kusto)); - if (!ctx) { - flb_errno(); - return NULL; - } - ctx->ins = ins; - ctx->config = config; - - ret = flb_output_config_map_set(ins, (void *)ctx); - if (ret == -1) { - flb_plg_error(ins, "unable to load configuration"); - flb_free(ctx); - return NULL; - } - - /* config: 'tenant_id' */ - if (ctx->tenant_id == NULL) { - flb_plg_error(ctx->ins, "property 'tenant_id' is not defined."); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - - /* config: 'client_id' */ - if (ctx->client_id == NULL) { - flb_plg_error(ctx->ins, "property 'client_id' is not defined"); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - - /* config: 'client_secret' */ - if (ctx->client_secret == NULL) { - flb_plg_error(ctx->ins, "property 'client_secret' is not defined"); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - - /* config: 'ingestion_endpoint' */ - if (ctx->ingestion_endpoint == NULL) { - flb_plg_error(ctx->ins, "property 'ingestion_endpoint' is not defined"); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - - /* config: 'database_name' */ - if (ctx->database_name == NULL) { - flb_plg_error(ctx->ins, "property 'database_name' is not defined"); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - - /* config: 'table_name' */ - if (ctx->table_name == NULL) { - flb_plg_error(ctx->ins, "property 'table_name' is not defined"); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - - /* Create the auth URL */ - ctx->oauth_url = flb_sds_create_size(sizeof(FLB_MSAL_AUTH_URL_TEMPLATE) - 1 + - flb_sds_len(ctx->tenant_id)); - if (!ctx->oauth_url) { - flb_errno(); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url), - FLB_MSAL_AUTH_URL_TEMPLATE, ctx->tenant_id); - - ctx->resources = flb_calloc(1, sizeof(struct flb_azure_kusto_resources)); - if (!ctx->resources) { - flb_errno(); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - - flb_plg_info(ctx->ins, "endpoint='%s', database='%s', table='%s'", - ctx->ingestion_endpoint, ctx->database_name, ctx->table_name); - - return ctx; -} - -int flb_azure_kusto_conf_destroy(struct flb_azure_kusto *ctx) -{ - if (!ctx) { - return -1; - } - - if (ctx->oauth_url) { - flb_sds_destroy(ctx->oauth_url); - ctx->oauth_url = NULL; - } - - if (ctx->o) { - flb_oauth2_destroy(ctx->o); - ctx->o = NULL; - } - - if (ctx->resources) { - flb_azure_kusto_resources_destroy(ctx->resources); - ctx->resources = NULL; - } - - flb_free(ctx); - return 0; -} diff --git a/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.h b/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.h deleted file mode 100644 index b4b2e3a3..00000000 --- a/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.h +++ /dev/null @@ -1,31 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_OUT_AZURE_KUSTO_CONF_H -#define FLB_OUT_AZURE_KUSTO_CONF_H - -#include "azure_kusto.h" - -int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, - struct flb_config *config); -struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *ins, - struct flb_config *config); -int flb_azure_kusto_conf_destroy(struct flb_azure_kusto *ctx); - -#endif diff --git a/fluent-bit/plugins/out_azure_kusto/azure_kusto_ingest.c b/fluent-bit/plugins/out_azure_kusto/azure_kusto_ingest.c deleted file mode 100644 index d38d92e7..00000000 --- a/fluent-bit/plugins/out_azure_kusto/azure_kusto_ingest.c +++ /dev/null @@ -1,496 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_base64.h> -#include <fluent-bit/flb_http_client.h> -#include <fluent-bit/flb_output_plugin.h> -#include <fluent-bit/flb_random.h> -#include <fluent-bit/flb_time.h> -#include <fluent-bit/flb_utils.h> - -#include <math.h> -#include <msgpack.h> - -#include "azure_kusto_ingest.h" - -/* not really uuid but a random string in the form 00000000-0000-0000-0000-000000000000 */ -static char *generate_uuid() -{ - char *chars = "0123456789abcdef"; - char *uuid; - int i; - uint64_t rand; - - uuid = flb_malloc(37); - if (!uuid) { - flb_errno(); - return NULL; - } - - for (i = 0; i < 36; i++) { - if (i == 8 || i == 13 || i == 18 || i == 23) { - uuid[i] = '-'; - continue; - } - - if (flb_random_bytes((unsigned char *)&rand, sizeof(uint64_t))) { - rand = time(NULL); - } - uuid[i] = chars[rand % 16]; - } - uuid[36] = '\0'; - - return uuid; -} - -static char *base64_encode(flb_sds_t s, size_t len, size_t *out_len) -{ - char *b64; - int ret; - size_t buffer_len = 4 * ceil(((double)len / 3) + 1); - - b64 = flb_malloc(buffer_len); - if (!b64) { - flb_errno(); - return NULL; - } - - ret = flb_base64_encode((unsigned char *)b64, buffer_len, out_len, (unsigned char *)s, - len); - if (ret != 0) { - flb_error("cannot encode string %s into base64", s); - flb_free(b64); - return NULL; - } - - return b64; -} - -static flb_sds_t azure_kusto_create_blob_uri(struct flb_azure_kusto *ctx, - struct flb_upstream_node *u_node, - flb_sds_t blob_id) -{ - int ret; - flb_sds_t uri = NULL; - char *blob_uri; - size_t blob_uri_size; - char *blob_sas; - size_t blob_sas_size; - - ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_URI, 3, - (void **)&blob_uri, &blob_uri_size); - if (ret == -1) { - flb_plg_error(ctx->ins, "error getting blob uri"); - return NULL; - } - - ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_SAS, 3, - (void **)&blob_sas, &blob_sas_size); - if (ret == -1) { - flb_plg_error(ctx->ins, "error getting blob sas token"); - return NULL; - } - - /* uri will be https://<blob_host>/<container_uri>/<blob_id>.multijson?<sas_token> */ - uri = flb_sds_create_size(flb_sds_len(u_node->host) + blob_uri_size + blob_sas_size + - flb_sds_len(blob_id) + 21); - - if (uri) { - flb_sds_snprintf(&uri, flb_sds_alloc(uri), "https://%s%s/%s.multijson?%s", - u_node->host, blob_uri, blob_id, blob_sas); - flb_plg_debug(ctx->ins, "created blob uri %s", uri); - } - else { - flb_plg_error(ctx->ins, "cannot create blob uri buffer"); - } - - return uri; -} - -static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t blob_id, - flb_sds_t payload, size_t payload_size) -{ - int ret = -1; - flb_sds_t uri = NULL; - struct flb_upstream_node *u_node; - struct flb_connection *u_conn; - struct flb_http_client *c; - size_t resp_size; - time_t now; - struct tm tm; - char tmp[64]; - int len; - - now = time(NULL); - gmtime_r(&now, &tm); - len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm); - - u_node = flb_upstream_ha_node_get(ctx->resources->blob_ha); - if (!u_node) { - flb_plg_error(ctx->ins, "error getting blob upstream"); - return NULL; - } - - u_conn = flb_upstream_conn_get(u_node->u); - - if (u_conn) { - uri = azure_kusto_create_blob_uri(ctx, u_node, blob_id); - - if (uri) { - flb_plg_debug(ctx->ins, "uploading payload to blob uri: %s", uri); - c = flb_http_client(u_conn, FLB_HTTP_PUT, uri, payload, payload_size, NULL, 0, - NULL, 0); - - if (c) { - flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); - flb_http_add_header(c, "Content-Type", 12, "application/json", 16); - flb_http_add_header(c, "x-ms-blob-type", 14, "BlockBlob", 9); - flb_http_add_header(c, "x-ms-date", 9, tmp, len); - flb_http_add_header(c, "x-ms-version", 12, "2019-12-12", 10); - - ret = flb_http_do(c, &resp_size); - flb_plg_debug(ctx->ins, - "kusto blob upload request http_do=%i, HTTP Status: %i", - ret, c->resp.status); - - if (ret == 0) { - /* Validate return status and HTTP status if set */ - if (c->resp.status != 201) { - ret = -1; - - if (c->resp.payload_size > 0) { - flb_plg_debug(ctx->ins, "Request failed and returned: \n%s", - c->resp.payload); - } - else { - flb_plg_debug(ctx->ins, "Request failed"); - } - } - } - else { - flb_plg_error(ctx->ins, "cannot send HTTP request"); - } - - flb_http_client_destroy(c); - } - else { - flb_plg_error(ctx->ins, - "cannot create HTTP client context for blob container"); - } - - if (ret != 0) { - flb_sds_destroy(uri); - uri = NULL; - } - } - else { - flb_plg_error(ctx->ins, "error creating blob container uri buffer"); - } - - flb_upstream_conn_release(u_conn); - } - else { - flb_plg_error(ctx->ins, "error getting blob container upstream connection"); - } - - return uri; -} - -static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t blob_uri, - size_t payload_size) -{ - flb_sds_t message = NULL; - int ret = 0; - char *uuid; - char *message_b64; - size_t b64_len; - size_t message_len; - - uuid = generate_uuid(); - if (uuid) { - message = flb_sds_create(NULL); - - if (message) { - message_len = - flb_sds_snprintf(&message, 0, - "{\"Id\": \"%s\", \"BlobPath\": \"%s\", " - "\"RawDataSize\": %lu, \"DatabaseName\": " - "\"%s\", \"TableName\": \"%s\"," - "\"AdditionalProperties\": { \"format\": \"multijson\", " - "\"authorizationContext\": " - "\"%s\", \"jsonMappingReference\": \"%s\" }}%c", - uuid, blob_uri, payload_size, ctx->database_name, - ctx->table_name, ctx->resources->identity_token, - ctx->ingestion_mapping_reference == NULL - ? "" - : ctx->ingestion_mapping_reference, 0); - - if (message_len != -1) { - flb_plg_debug(ctx->ins, "created ingestion message:\n%s", message); - message_b64 = base64_encode(message, message_len, &b64_len); - - if (message_b64) { - ret = flb_sds_snprintf( - &message, flb_sds_alloc(message), - "<QueueMessage><MessageText>%s</MessageText></QueueMessage>%c", - message_b64, 0); - - if (ret == -1) { - flb_plg_error(ctx->ins, "error creating ingestion queue message"); - } - - flb_free(message_b64); - } - else { - flb_plg_error(ctx->ins, "error encoding ingestion message to base64"); - } - } - else { - flb_plg_error(ctx->ins, "error creating ingestion message"); - ret = -1; - } - - if (ret == -1) { - flb_sds_destroy(message); - message = NULL; - } - } - else { - flb_plg_error(ctx->ins, "error creating ingestion message buffer"); - } - - flb_free(uuid); - } - else { - flb_plg_error(ctx->ins, "error generating unique ingestion UUID"); - } - - return message; -} - -static flb_sds_t azure_kusto_create_queue_uri(struct flb_azure_kusto *ctx, - struct flb_upstream_node *u_node) -{ - int ret; - flb_sds_t uri = NULL; - char *queue_uri; - size_t queue_uri_size; - char *queue_sas; - size_t queue_sas_size; - - ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_URI, 3, - (void **)&queue_uri, &queue_uri_size); - if (ret == -1) { - flb_plg_error(ctx->ins, "error getting queue uri"); - return NULL; - } - - ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_SAS, 3, - (void **)&queue_sas, &queue_sas_size); - if (ret == -1) { - flb_plg_error(ctx->ins, "error getting queue sas token"); - return NULL; - } - - /* uri will be <container_uri>/messages?<sas_token> */ - uri = flb_sds_create_size(queue_uri_size + queue_sas_size + 11); - - if (uri) { - flb_sds_snprintf(&uri, flb_sds_alloc(uri), "%s/messages?%s", queue_uri, - queue_sas); - flb_plg_debug(ctx->ins, "created queue uri %s", uri); - } - else { - flb_plg_error(ctx->ins, "cannot create queue uri buffer"); - } - - return uri; -} - -static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t blob_uri, - size_t payload_size) -{ - int ret = -1; - struct flb_upstream_node *u_node; - struct flb_connection *u_conn; - struct flb_http_client *c; - flb_sds_t uri; - flb_sds_t payload; - size_t resp_size; - time_t now; - struct tm tm; - char tmp[64]; - int len; - - now = time(NULL); - gmtime_r(&now, &tm); - len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm); - - u_node = flb_upstream_ha_node_get(ctx->resources->queue_ha); - if (!u_node) { - flb_plg_error(ctx->ins, "error getting queue upstream"); - return -1; - } - - u_conn = flb_upstream_conn_get(u_node->u); - - if (u_conn) { - uri = azure_kusto_create_queue_uri(ctx, u_node); - - if (uri) { - payload = create_ingestion_message(ctx, blob_uri, payload_size); - - if (payload) { - c = flb_http_client(u_conn, FLB_HTTP_POST, uri, payload, - flb_sds_len(payload), NULL, 0, NULL, 0); - - if (c) { - flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); - flb_http_add_header(c, "Content-Type", 12, "application/atom+xml", - 20); - flb_http_add_header(c, "x-ms-date", 9, tmp, len); - flb_http_add_header(c, "x-ms-version", 12, "2019-12-12", 10); - - ret = flb_http_do(c, &resp_size); - flb_plg_debug(ctx->ins, - "kusto queue request http_do=%i, HTTP Status: %i", ret, - c->resp.status); - - if (ret == 0) { - /* Validate return status and HTTP status if set */ - if (c->resp.status != 201) { - ret = -1; - - if (c->resp.payload_size > 0) { - flb_plg_debug(ctx->ins, - "Request failed and returned: \n%s", - c->resp.payload); - } - else { - flb_plg_debug(ctx->ins, "Request failed"); - } - } - } - else { - flb_plg_error(ctx->ins, "cannot send HTTP request"); - } - - flb_http_client_destroy(c); - } - else { - flb_plg_error(ctx->ins, - "cannot create HTTP client context for queue"); - } - - flb_sds_destroy(payload); - } - else { - flb_plg_error(ctx->ins, "error creating payload buffer"); - } - - flb_sds_destroy(uri); - } - else { - flb_plg_error(ctx->ins, "error creating queue uri buffer"); - } - - flb_upstream_conn_release(u_conn); - } - else { - flb_plg_error(ctx->ins, "error getting queue upstream connection"); - } - - return ret; -} - -static flb_sds_t azure_kusto_create_blob_id(struct flb_azure_kusto *ctx, flb_sds_t tag, - size_t tag_len) -{ - flb_sds_t blob_id = NULL; - struct flb_time tm; - uint64_t ms; - char *b64tag; - size_t b64_len; - - flb_time_get(&tm); - ms = ((tm.tm.tv_sec * 1000) + (tm.tm.tv_nsec / 1000000)); - - b64tag = base64_encode(tag, tag_len, &b64_len); - - if (b64tag) { - /* remove trailing '=' */ - while (b64_len && b64tag[b64_len - 1] == '=') { - b64tag[b64_len - 1] = '\0'; - b64_len--; - } - - blob_id = flb_sds_create_size(flb_sds_len(ctx->database_name) + - flb_sds_len(ctx->table_name) + b64_len + 24); - if (blob_id) { - flb_sds_snprintf(&blob_id, flb_sds_alloc(blob_id), "flb__%s__%s__%s__%lu", - ctx->database_name, ctx->table_name, b64tag, ms); - } - else { - flb_plg_error(ctx->ins, "cannot create blob id buffer"); - } - - flb_free(b64tag); - } - else { - flb_plg_error(ctx->ins, "error encoding tag '%s' to base64", tag); - } - - return blob_id; -} - -int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, - size_t tag_len, flb_sds_t payload, size_t payload_size) -{ - int ret = -1; - flb_sds_t blob_id; - flb_sds_t blob_uri; - - /* flb__<db>__<table>__<b64tag>__<timestamp> */ - blob_id = azure_kusto_create_blob_id(ctx, tag, tag_len); - - if (blob_id) { - blob_uri = azure_kusto_create_blob(ctx, blob_id, payload, payload_size); - - if (blob_uri) { - ret = azure_kusto_enqueue_ingestion(ctx, blob_uri, payload_size); - - if (ret != 0) { - flb_plg_error(ctx->ins, "failed to enqueue ingestion blob to queue"); - ret = -1; - } - - flb_sds_destroy(blob_uri); - } - else { - flb_plg_error(ctx->ins, "failed to create payload blob uri"); - } - - flb_sds_destroy(blob_id); - } - else { - flb_plg_error(ctx->ins, "cannot create blob id"); - } - - return ret; -} diff --git a/fluent-bit/plugins/out_azure_kusto/azure_kusto_ingest.h b/fluent-bit/plugins/out_azure_kusto/azure_kusto_ingest.h deleted file mode 100644 index 60613919..00000000 --- a/fluent-bit/plugins/out_azure_kusto/azure_kusto_ingest.h +++ /dev/null @@ -1,28 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_OUT_AZURE_KUSTO_INGEST_H -#define FLB_OUT_AZURE_KUSTO_INGEST_H - -#include "azure_kusto.h" - -int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, - size_t tag_len, flb_sds_t payload, size_t payload_size); - -#endif
\ No newline at end of file |