diff options
Diffstat (limited to 'src/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.c')
-rw-r--r-- | src/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.c | 665 |
1 files changed, 665 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.c b/src/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.c new file mode 100644 index 000000000..5303fef67 --- /dev/null +++ b/src/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.c @@ -0,0 +1,665 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_jsmn.h> +#include <fluent-bit/flb_oauth2.h> +#include <fluent-bit/flb_output.h> +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_unescape.h> +#include <fluent-bit/flb_upstream_ha.h> +#include <fluent-bit/flb_utils.h> + +#include "azure_kusto.h" +#include "azure_kusto_conf.h" + +static struct flb_upstream_node *flb_upstream_node_create_url(struct flb_azure_kusto *ctx, + struct flb_config *config, + const char *url) +{ + int ret; + char *prot = NULL; + char *host = NULL; + char *port = NULL; + char *uri = NULL; + flb_sds_t sds_host = NULL; + flb_sds_t sds_port = NULL; + char *tmp; + struct flb_hash_table *kv = NULL; + struct flb_upstream_node *node = NULL; + int uri_length; + int sas_length; + + /* Parse and split URL */ + ret = flb_utils_url_split(url, &prot, &host, &port, &uri); + if (ret == -1) { + flb_plg_error(ctx->ins, "invalid URL: %s", url); + return NULL; + } + + /* find sas token in query */ + tmp = strchr(uri, '?'); + + if (tmp) { + uri_length = tmp - uri; + sas_length = strnlen(tmp + 1, 256); + + /* kv that will hold base uri, and sas token */ + kv = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 2, 2); + + if (kv) { + ret = flb_hash_table_add(kv, AZURE_KUSTO_RESOURCE_UPSTREAM_URI, 3, uri, uri_length); + + if (ret != -1) { + ret = flb_hash_table_add(kv, AZURE_KUSTO_RESOURCE_UPSTREAM_SAS, 3, tmp + 1, + sas_length); + + if (ret != -1) { + /* if any/all of these creations would fail the node creation will fail and cleanup */ + sds_host = flb_sds_create(host); + sds_port = flb_sds_create(port); + + node = flb_upstream_node_create( + NULL, sds_host, sds_port, FLB_TRUE, ctx->ins->tls->verify, + ctx->ins->tls->debug, ctx->ins->tls->vhost, NULL, NULL, NULL, + NULL, NULL, kv, config); + + if (!node) { + flb_plg_error(ctx->ins, "error creating resource upstream node"); + } + } + else { + flb_plg_error(ctx->ins, "error storing resource sas token"); + } + } + else { + flb_plg_error(ctx->ins, "error storing resource uri"); + } + + /* avoid destorying if function is successful */ + if (!node) { + flb_hash_table_destroy(kv); + } + } + else { + flb_plg_error(ctx->ins, "error creating upstream node hash table"); + } + } + else { + flb_plg_error(ctx->ins, "uri has no sas token query: %s", uri); + } + + flb_free(prot); + flb_free(host); + flb_free(port); + flb_free(uri); + + return node; +} + +static int flb_azure_kusto_resources_clear(struct flb_azure_kusto_resources *resources) +{ + if (!resources) { + return -1; + } + + if (resources->blob_ha) { + flb_upstream_ha_destroy(resources->blob_ha); + resources->blob_ha = NULL; + } + + if (resources->queue_ha) { + flb_upstream_ha_destroy(resources->queue_ha); + resources->queue_ha = NULL; + } + + if (resources->identity_token) { + flb_sds_destroy(resources->identity_token); + resources->identity_token = NULL; + } + + resources->load_time = 0; + + return 0; +} + +/** + * Parses ".get ingestion resources" response into HA upstreams of the queue & blob + * resources in the response. + * + * @param ctx Pointer to the plugin's context + * @param config Pointer to the config + * @param response sds string containing the response body + * @param blob_ha Pointer to an HA upstream for the blob resources, that would be + * allocated here. + * @param queue_ha Pointer to an HA upstream for the queue resources, that would be + * allocated here. + * @return int 0 on success, -1 on failure + */ +static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_config *config, + flb_sds_t response, struct flb_upstream_ha *blob_ha, + struct flb_upstream_ha *queue_ha) +{ + jsmn_parser parser; + jsmntok_t *t; + jsmntok_t *tokens; + int tok_size = 100; + int ret = -1; + int i; + int blob_count = 0; + int queue_count = 0; + char *token_str; + int token_str_len; + int resource_type; + struct flb_upstream_node *node; + struct flb_upstream_ha *ha; + flb_sds_t resource_uri; + + /* Response is a json in the form of + * { + * "Tables": [ + * { + * "TableName": "Table_0", + * "Columns": [...], + * "Rows": [ + * [ + * ("TempStorage" | "SecuredReadyForAggregationQueue" | + * "SuccessfulIngestionsQueue" | "FailedIngestionsQueue" | "IngestionsStatusTable"), + * <URI with SAS> + * ], + * ... + * ] + * } + * ] + * } + */ + + resource_uri = flb_sds_create(NULL); + if (!resource_uri) { + flb_plg_error(ctx->ins, "error allocating resource uri buffer"); + return -1; + } + + jsmn_init(&parser); + tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size); + + if (tokens) { + ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, tok_size); + + if (ret > 0) { + /* skip all tokens until we reach "Rows" */ + for (i = 0; i < ret - 1; i++) { + t = &tokens[i]; + + if (t->type != JSMN_STRING) { + continue; + } + + token_str = response + t->start; + token_str_len = (t->end - t->start); + + /** + * if we found the Rows key, skipping this token and the next one (key and + * wrapping array value) + */ + if (token_str_len == 4 && strncmp(token_str, "Rows", 4) == 0) { + i += 2; + break; + } + } + + /* iterating rows, each row will have 3 tokens: the array holding the column + * values, the first value containing the resource type, and the second value + * containing the resource uri */ + for (; i < ret; i++) { + t = &tokens[i]; + + /** + * each token should be an array with 2 strings: + * First will be the resource type (TempStorage, + * SecuredReadyForAggregationQueue, etc...) Second will be the SAS URI + */ + if (t->type != JSMN_ARRAY) { + break; + } + + /* move to the next token, first item in the array - resource type */ + i++; + t = &tokens[i]; + if (t->type != JSMN_STRING) { + break; + } + + token_str = response + t->start; + token_str_len = (t->end - t->start); + + flb_plg_debug(ctx->ins, "found resource of type: %.*s ", + t->end - t->start, response + t->start); + + if (token_str_len == 11 && strncmp(token_str, "TempStorage", 11) == 0) { + resource_type = AZURE_KUSTO_RESOURCE_STORAGE; + } + else if (token_str_len == 31 && + strncmp(token_str, "SecuredReadyForAggregationQueue", 31) == 0) { + resource_type = AZURE_KUSTO_RESOURCE_QUEUE; + } + /* we don't care about other resources so we just skip the next token and + move on to the next pair */ + else { + i++; + continue; + } + + /* move to the next token, second item in the array - resource URI */ + i++; + t = &tokens[i]; + + if (t->type != JSMN_STRING) { + break; + } + + token_str = response + t->start; + token_str_len = (t->end - t->start); + + resource_uri = flb_sds_copy(resource_uri, token_str, token_str_len); + if (resource_type == AZURE_KUSTO_RESOURCE_QUEUE) { + ha = queue_ha; + queue_count++; + } + else { + ha = blob_ha; + blob_count++; + } + + if (!ha) { + flb_plg_error(ctx->ins, "error creating HA upstream"); + ret = -1; + break; + } + + node = flb_upstream_node_create_url(ctx, config, resource_uri); + + if (!node) { + flb_plg_error(ctx->ins, "error creating HA upstream node"); + ret = -1; + break; + } + + flb_upstream_ha_node_add(ha, node); + } + + if (ret != -1) { + if (queue_count > 0 && blob_count > 0) { + flb_plg_debug(ctx->ins, + "parsed %d blob resources and %d queue resources", + blob_count, queue_count); + ret = 0; + } + else { + flb_plg_error(ctx->ins, "error parsing resources: missing resources"); + ret = -1; + } + } + } + else { + flb_plg_error(ctx->ins, "error parsing JSON response: %s", response); + ret = -1; + } + } + else { + flb_plg_error(ctx->ins, "error allocating tokens"); + ret = -1; + } + + flb_sds_destroy(resource_uri); + flb_free(tokens); + + return ret; +} + +/** + * Parses ".get kusto identity token" response and returns the token as an sds string + * + * @param ctx Pointer to the plugin's context + * @param response sds string containing the response body + * @return flb_sds_t The parsed token + */ +static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx, + flb_sds_t response) +{ + flb_sds_t identity_token = NULL; + int tok_size = 19; + jsmn_parser parser; + jsmntok_t *t; + jsmntok_t *tokens; + int ret; + char *token_str; + int token_str_len; + + /** + * Response is a json in the form of + * { + * "Tables": [ + * { + * "TableName": "Table_0", + * "Columns": [{ + * "ColumnName": "AuthorizationContext", + * "DataType": "String", + * "ColumnType": "string" + * }], + * "Rows": [ + * [ + * <value>, + * ] + * ] + * } + * ] + * } + * i.e. only one row and one column is expected (exactly 13 tokens) and the value + * should be the last + */ + + jsmn_init(&parser); + tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size); + if (!tokens) { + flb_plg_error(ctx->ins, "error allocating tokens"); + return NULL; + } + + ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, tok_size); + if (ret > 0) { + t = &tokens[tok_size - 1]; + + if (t->type == JSMN_STRING) { + t = &tokens[tok_size - 1]; + token_str = response + t->start; + token_str_len = (t->end - t->start); + + identity_token = flb_sds_create_len(token_str, token_str_len); + + if (identity_token) { + flb_plg_debug(ctx->ins, "parsed kusto identity token: '%s'", + identity_token); + } + else { + flb_plg_error(ctx->ins, "error parsing kusto identity token"); + } + } + else { + flb_plg_error(ctx->ins, "unexpected JSON response: %s", response); + } + } + else { + flb_plg_error(ctx->ins, "error parsing JSON response: %s", response); + } + + flb_free(tokens); + + return identity_token; +} + +int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, + struct flb_config *config) +{ + int ret = -1; + flb_sds_t response = NULL; + flb_sds_t identity_token = NULL; + struct flb_upstream_ha *blob_ha = NULL; + struct flb_upstream_ha *queue_ha = NULL; + time_t now; + + if (pthread_mutex_lock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error locking mutex"); + return -1; + } + + now = time(NULL); + + /* check if we have all resources and they are not stale */ + if (ctx->resources->blob_ha && ctx->resources->queue_ha && + ctx->resources->identity_token && + now - ctx->resources->load_time < FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC) { + flb_plg_debug(ctx->ins, "resources are already loaded and are not stale"); + ret = 0; + } + else { + flb_plg_info(ctx->ins, "loading kusto ingestion resourcs"); + response = execute_ingest_csl_command(ctx, ".get ingestion resources"); + + if (response) { + queue_ha = flb_upstream_ha_create("azure_kusto_queue_ha"); + + if (queue_ha) { + blob_ha = flb_upstream_ha_create("azure_kusto_blob_ha"); + + if (blob_ha) { + ret = + parse_storage_resources(ctx, config, response, blob_ha, queue_ha); + + if (ret == 0) { + flb_sds_destroy(response); + response = NULL; + + response = + execute_ingest_csl_command(ctx, ".get kusto identity token"); + + if (response) { + identity_token = + parse_ingestion_identity_token(ctx, response); + + if (identity_token) { + ret = flb_azure_kusto_resources_clear(ctx->resources); + + if (ret != -1) { + ctx->resources->blob_ha = blob_ha; + ctx->resources->queue_ha = queue_ha; + ctx->resources->identity_token = identity_token; + ctx->resources->load_time = now; + + ret = 0; + } + else { + flb_plg_error( + ctx->ins, + "error destroying previous ingestion resources"); + } + } + else { + flb_plg_error(ctx->ins, + "error parsing ingestion identity token"); + ret = -1; + } + } + else { + flb_plg_error(ctx->ins, "error getting kusto identity token"); + ret = -1; + } + } + else { + flb_plg_error(ctx->ins, + "error parsing ingestion storage resources"); + ret = -1; + } + + if (ret == -1) { + flb_upstream_ha_destroy(blob_ha); + } + } + else { + flb_plg_error(ctx->ins, "error creating storage resources upstreams"); + ret = -1; + } + + if (ret == -1) { + flb_upstream_ha_destroy(queue_ha); + } + } + else { + flb_plg_error(ctx->ins, "error creating storage resources upstreams"); + } + + if (response) { + flb_sds_destroy(response); + } + } + if (!response) { + flb_plg_error(ctx->ins, "error getting ingestion storage resources"); + } + } + + if (pthread_mutex_unlock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } + + return ret; +} + +static int flb_azure_kusto_resources_destroy(struct flb_azure_kusto_resources *resources) +{ + int ret; + + if (!resources) { + return -1; + } + + ret = flb_azure_kusto_resources_clear(resources); + if (ret != 0) { + return -1; + } + + flb_free(resources); + + return 0; +} + +struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *ins, + struct flb_config *config) +{ + int ret; + struct flb_azure_kusto *ctx; + + /* Allocate config context */ + ctx = flb_calloc(1, sizeof(struct flb_azure_kusto)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + ctx->config = config; + + ret = flb_output_config_map_set(ins, (void *)ctx); + if (ret == -1) { + flb_plg_error(ins, "unable to load configuration"); + flb_free(ctx); + return NULL; + } + + /* config: 'tenant_id' */ + if (ctx->tenant_id == NULL) { + flb_plg_error(ctx->ins, "property 'tenant_id' is not defined."); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + + /* config: 'client_id' */ + if (ctx->client_id == NULL) { + flb_plg_error(ctx->ins, "property 'client_id' is not defined"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + + /* config: 'client_secret' */ + if (ctx->client_secret == NULL) { + flb_plg_error(ctx->ins, "property 'client_secret' is not defined"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + + /* config: 'ingestion_endpoint' */ + if (ctx->ingestion_endpoint == NULL) { + flb_plg_error(ctx->ins, "property 'ingestion_endpoint' is not defined"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + + /* config: 'database_name' */ + if (ctx->database_name == NULL) { + flb_plg_error(ctx->ins, "property 'database_name' is not defined"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + + /* config: 'table_name' */ + if (ctx->table_name == NULL) { + flb_plg_error(ctx->ins, "property 'table_name' is not defined"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + + /* Create the auth URL */ + ctx->oauth_url = flb_sds_create_size(sizeof(FLB_MSAL_AUTH_URL_TEMPLATE) - 1 + + flb_sds_len(ctx->tenant_id)); + if (!ctx->oauth_url) { + flb_errno(); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url), + FLB_MSAL_AUTH_URL_TEMPLATE, ctx->tenant_id); + + ctx->resources = flb_calloc(1, sizeof(struct flb_azure_kusto_resources)); + if (!ctx->resources) { + flb_errno(); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + + flb_plg_info(ctx->ins, "endpoint='%s', database='%s', table='%s'", + ctx->ingestion_endpoint, ctx->database_name, ctx->table_name); + + return ctx; +} + +int flb_azure_kusto_conf_destroy(struct flb_azure_kusto *ctx) +{ + if (!ctx) { + return -1; + } + + if (ctx->oauth_url) { + flb_sds_destroy(ctx->oauth_url); + ctx->oauth_url = NULL; + } + + if (ctx->o) { + flb_oauth2_destroy(ctx->o); + ctx->o = NULL; + } + + if (ctx->resources) { + flb_azure_kusto_resources_destroy(ctx->resources); + ctx->resources = NULL; + } + + flb_free(ctx); + return 0; +} |