summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/out_azure_kusto
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:03 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:18 +0000
commit5da14042f70711ea5cf66e034699730335462f66 (patch)
tree0f6354ccac934ed87a2d555f45be4c831cf92f4a /src/fluent-bit/plugins/out_azure_kusto
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz
netdata-5da14042f70711ea5cf66e034699730335462f66.zip
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/plugins/out_azure_kusto')
-rw-r--r--src/fluent-bit/plugins/out_azure_kusto/CMakeLists.txt7
-rw-r--r--src/fluent-bit/plugins/out_azure_kusto/azure_kusto.c477
-rw-r--r--src/fluent-bit/plugins/out_azure_kusto/azure_kusto.h110
-rw-r--r--src/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.c665
-rw-r--r--src/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.h31
-rw-r--r--src/fluent-bit/plugins/out_azure_kusto/azure_kusto_ingest.c496
-rw-r--r--src/fluent-bit/plugins/out_azure_kusto/azure_kusto_ingest.h28
7 files changed, 1814 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/out_azure_kusto/CMakeLists.txt b/src/fluent-bit/plugins/out_azure_kusto/CMakeLists.txt
new file mode 100644
index 000000000..6803bee09
--- /dev/null
+++ b/src/fluent-bit/plugins/out_azure_kusto/CMakeLists.txt
@@ -0,0 +1,7 @@
+set(src
+ azure_kusto.c
+ azure_kusto_conf.c
+ azure_kusto_ingest.c
+ )
+
+FLB_PLUGIN(out_azure_kusto "${src}" "")
diff --git a/src/fluent-bit/plugins/out_azure_kusto/azure_kusto.c b/src/fluent-bit/plugins/out_azure_kusto/azure_kusto.c
new file mode 100644
index 000000000..4b8ad9b82
--- /dev/null
+++ b/src/fluent-bit/plugins/out_azure_kusto/azure_kusto.c
@@ -0,0 +1,477 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_oauth2.h>
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_signv4.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+
+#include "azure_kusto.h"
+#include "azure_kusto_conf.h"
+#include "azure_kusto_ingest.h"
+
+/* Create a new oauth2 context and get a oauth2 token */
+static int azure_kusto_get_oauth2_token(struct flb_azure_kusto *ctx)
+{
+ int ret;
+ char *token;
+
+ /* Clear any previous oauth2 payload content */
+ flb_oauth2_payload_clear(ctx->o);
+
+ ret = flb_oauth2_payload_append(ctx->o, "grant_type", 10, "client_credentials", 18);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error appending oauth2 params");
+ return -1;
+ }
+
+ ret = flb_oauth2_payload_append(ctx->o, "scope", 5, FLB_AZURE_KUSTO_SCOPE, 39);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error appending oauth2 params");
+ return -1;
+ }
+
+ ret = flb_oauth2_payload_append(ctx->o, "client_id", 9, ctx->client_id, -1);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error appending oauth2 params");
+ return -1;
+ }
+
+ ret = flb_oauth2_payload_append(ctx->o, "client_secret", 13, ctx->client_secret, -1);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error appending oauth2 params");
+ return -1;
+ }
+
+ /* Retrieve access token */
+ token = flb_oauth2_token_get(ctx->o);
+ if (!token) {
+ flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
+ return -1;
+ }
+
+ return 0;
+}
+
+flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx)
+{
+ int ret = 0;
+ flb_sds_t output = NULL;
+
+ if (pthread_mutex_lock(&ctx->token_mutex)) {
+ flb_plg_error(ctx->ins, "error locking mutex");
+ return NULL;
+ }
+
+ if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) {
+ ret = azure_kusto_get_oauth2_token(ctx);
+ }
+
+ /* Copy string to prevent race conditions (get_oauth2 can free the string) */
+ if (ret == 0) {
+ output = flb_sds_create_size(flb_sds_len(ctx->o->token_type) +
+ flb_sds_len(ctx->o->access_token) + 2);
+ if (!output) {
+ flb_plg_error(ctx->ins, "error creating token buffer");
+ return NULL;
+ }
+ flb_sds_snprintf(&output, flb_sds_alloc(output), "%s %s", ctx->o->token_type,
+ ctx->o->access_token);
+ }
+
+ if (pthread_mutex_unlock(&ctx->token_mutex)) {
+ flb_plg_error(ctx->ins, "error unlocking mutex");
+ if (output) {
+ flb_sds_destroy(output);
+ }
+ return NULL;
+ }
+
+ return output;
+}
+
+/**
+ * Executes a control command against kusto's endpoint
+ *
+ * @param ctx Plugin's context
+ * @param csl Kusto's control command
+ * @return flb_sds_t Returns the response or NULL on error.
+ */
+flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl)
+{
+ flb_sds_t token;
+ flb_sds_t body;
+ size_t b_sent;
+ int ret;
+ struct flb_connection *u_conn;
+ struct flb_http_client *c;
+ flb_sds_t resp = NULL;
+
+ /* Get upstream connection */
+ u_conn = flb_upstream_conn_get(ctx->u);
+
+ if (u_conn) {
+ token = get_azure_kusto_token(ctx);
+
+ if (token) {
+ /* Compose request body */
+ body = flb_sds_create_size(sizeof(FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE) - 1 +
+ strlen(csl));
+
+ if (body) {
+ flb_sds_snprintf(&body, flb_sds_alloc(body),
+ FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE, csl);
+
+ /* Compose HTTP Client request */
+ c = flb_http_client(u_conn, FLB_HTTP_POST, FLB_AZURE_KUSTO_MGMT_URI_PATH,
+ body, flb_sds_len(body), NULL, 0, NULL, 0);
+
+ if (c) {
+ /* Add headers */
+ flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
+ flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
+ flb_http_add_header(c, "Accept", 6, "application/json", 16);
+ flb_http_add_header(c, "Authorization", 13, token,
+ flb_sds_len(token));
+ flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10);
+
+ /* Send HTTP request */
+ ret = flb_http_do(c, &b_sent);
+ flb_plg_debug(
+ ctx->ins,
+ "Kusto ingestion command request http_do=%i, HTTP Status: %i",
+ ret, c->resp.status);
+
+ if (ret == 0) {
+ if (c->resp.status == 200) {
+ /* Copy payload response to the response param */
+ resp =
+ flb_sds_create_len(c->resp.payload, c->resp.payload_size);
+ }
+ else if (c->resp.payload_size > 0) {
+ flb_plg_debug(ctx->ins, "Request failed and returned: \n%s",
+ c->resp.payload);
+ }
+ else {
+ flb_plg_debug(ctx->ins, "Request failed");
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot send HTTP request");
+ }
+
+ flb_http_client_destroy(c);
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot create HTTP client context");
+ }
+
+ flb_sds_destroy(body);
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot construct request body");
+ }
+
+ flb_sds_destroy(token);
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot retrieve oauth2 token");
+ }
+
+ flb_upstream_conn_release(u_conn);
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot create upstream connection");
+ }
+
+ return resp;
+}
+
+static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_config *config,
+ void *data)
+{
+ int io_flags = FLB_IO_TLS;
+ struct flb_azure_kusto *ctx;
+
+ /* Create config context */
+ ctx = flb_azure_kusto_conf_create(ins, config);
+ if (!ctx) {
+ flb_plg_error(ins, "configuration failed");
+ return -1;
+ }
+
+ flb_output_set_context(ins, ctx);
+
+ /* Network mode IPv6 */
+ if (ins->host.ipv6 == FLB_TRUE) {
+ io_flags |= FLB_IO_IPV6;
+ }
+
+ /* Create mutex for acquiring oauth tokens and getting ingestion resources (they
+ * are shared across flush coroutines)
+ */
+ pthread_mutex_init(&ctx->token_mutex, NULL);
+ pthread_mutex_init(&ctx->resources_mutex, NULL);
+
+ /*
+ * Create upstream context for Kusto Ingestion endpoint
+ */
+ ctx->u = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
+ if (!ctx->u) {
+ flb_plg_error(ctx->ins, "upstream creation failed");
+ return -1;
+ }
+
+ /* Create oauth2 context */
+ ctx->o =
+ flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH);
+ if (!ctx->o) {
+ flb_plg_error(ctx->ins, "cannot create oauth2 context");
+ return -1;
+ }
+ flb_output_upstream_set(ctx->u, ins);
+
+ return 0;
+}
+
+static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int tag_len,
+ const void *data, size_t bytes, void **out_data,
+ size_t *out_size)
+{
+ int records = 0;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ /* for sub msgpack objs */
+ int map_size;
+ struct tm tms;
+ char time_formatted[32];
+ size_t s;
+ int len;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+ int ret;
+ /* output buffer */
+ flb_sds_t out_buf;
+
+ /* Create array for all records */
+ records = flb_mp_count(data, bytes);
+ if (records <= 0) {
+ flb_plg_error(ctx->ins, "error counting msgpack entries");
+ return -1;
+ }
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return -1;
+ }
+
+ /* Create temporary msgpack buffer */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ msgpack_pack_array(&mp_pck, records);
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ map_size = 1;
+ if (ctx->include_time_key == FLB_TRUE) {
+ map_size++;
+ }
+
+ if (ctx->include_tag_key == FLB_TRUE) {
+ map_size++;
+ }
+
+ msgpack_pack_map(&mp_pck, map_size);
+
+ /* include_time_key */
+ if (ctx->include_time_key == FLB_TRUE) {
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->time_key));
+ msgpack_pack_str_body(&mp_pck, ctx->time_key, flb_sds_len(ctx->time_key));
+
+ /* Append the time value as ISO 8601 */
+ gmtime_r(&log_event.timestamp.tm.tv_sec, &tms);
+ s = strftime(time_formatted, sizeof(time_formatted) - 1,
+ FLB_PACK_JSON_DATE_ISO8601_FMT, &tms);
+
+ len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
+ ".%03" PRIu64 "Z",
+ (uint64_t)log_event.timestamp.tm.tv_nsec / 1000000);
+ s += len;
+ msgpack_pack_str(&mp_pck, s);
+ msgpack_pack_str_body(&mp_pck, time_formatted, s);
+ }
+
+ /* include_tag_key */
+ if (ctx->include_tag_key == FLB_TRUE) {
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->tag_key));
+ msgpack_pack_str_body(&mp_pck, ctx->tag_key, flb_sds_len(ctx->tag_key));
+ msgpack_pack_str(&mp_pck, tag_len);
+ msgpack_pack_str_body(&mp_pck, tag, tag_len);
+ }
+
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->log_key));
+ msgpack_pack_str_body(&mp_pck, ctx->log_key, flb_sds_len(ctx->log_key));
+ msgpack_pack_object(&mp_pck, *log_event.body);
+ }
+
+ /* Convert from msgpack to JSON */
+ out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+
+ /* Cleanup */
+ flb_log_event_decoder_destroy(&log_decoder);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+
+ if (!out_buf) {
+ flb_plg_error(ctx->ins, "error formatting JSON payload");
+ return -1;
+ }
+
+ *out_data = out_buf;
+ *out_size = flb_sds_len(out_buf);
+
+ return 0;
+}
+
+static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
+ struct flb_output_flush *out_flush,
+ struct flb_input_instance *i_ins, void *out_context,
+ struct flb_config *config)
+{
+ int ret;
+ flb_sds_t json;
+ size_t json_size;
+ size_t tag_len;
+ struct flb_azure_kusto *ctx = out_context;
+
+ (void)i_ins;
+ (void)config;
+
+ flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size);
+
+ tag_len = flb_sds_len(event_chunk->tag);
+
+ /* Load or refresh ingestion resources */
+ ret = azure_kusto_load_ingestion_resources(ctx, config);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "cannot load ingestion resources");
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ /* Reformat msgpack to JSON payload */
+ ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data,
+ event_chunk->size, (void **)&json, &json_size);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "cannot reformat data into json");
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "cannot perform queued ingestion");
+ flb_sds_destroy(json);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ /* Cleanup */
+ flb_sds_destroy(json);
+
+ /* Done */
+ FLB_OUTPUT_RETURN(FLB_OK);
+}
+
+static int cb_azure_kusto_exit(void *data, struct flb_config *config)
+{
+ struct flb_azure_kusto *ctx = data;
+
+ if (!ctx) {
+ return -1;
+ }
+
+ if (ctx->u) {
+ flb_upstream_destroy(ctx->u);
+ ctx->u = NULL;
+ }
+
+ flb_azure_kusto_conf_destroy(ctx);
+
+ return 0;
+}
+
+static struct flb_config_map config_map[] = {
+ {FLB_CONFIG_MAP_STR, "tenant_id", (char *)NULL, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, tenant_id),
+ "Set the tenant ID of the AAD application used for authentication"},
+ {FLB_CONFIG_MAP_STR, "client_id", (char *)NULL, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, client_id),
+ "Set the client ID (Application ID) of the AAD application used for authentication"},
+ {FLB_CONFIG_MAP_STR, "client_secret", (char *)NULL, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, client_secret),
+ "Set the client secret (Application Password) of the AAD application used for "
+ "authentication"},
+ {FLB_CONFIG_MAP_STR, "ingestion_endpoint", (char *)NULL, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, ingestion_endpoint),
+ "Set the Kusto cluster's ingestion endpoint URL (e.g. "
+ "https://ingest-mycluster.eastus.kusto.windows.net)"},
+ {FLB_CONFIG_MAP_STR, "database_name", (char *)NULL, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, database_name), "Set the database name"},
+ {FLB_CONFIG_MAP_STR, "table_name", (char *)NULL, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, table_name), "Set the table name"},
+ {FLB_CONFIG_MAP_STR, "ingestion_mapping_reference", (char *)NULL, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, ingestion_mapping_reference),
+ "Set the ingestion mapping reference"},
+ {FLB_CONFIG_MAP_STR, "log_key", FLB_AZURE_KUSTO_DEFAULT_LOG_KEY, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, log_key), "The key name of event payload"},
+ {FLB_CONFIG_MAP_BOOL, "include_tag_key", "true", 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, include_tag_key),
+ "If enabled, tag is appended to output. "
+ "The key name is used 'tag_key' property."},
+ {FLB_CONFIG_MAP_STR, "tag_key", FLB_AZURE_KUSTO_DEFAULT_TAG_KEY, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, tag_key),
+ "The key name of tag. If 'include_tag_key' is false, "
+ "This property is ignored"},
+ {FLB_CONFIG_MAP_BOOL, "include_time_key", "true", 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, include_time_key),
+ "If enabled, time is appended to output. "
+ "The key name is used 'time_key' property."},
+ {FLB_CONFIG_MAP_STR, "time_key", FLB_AZURE_KUSTO_DEFAULT_TIME_KEY, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, time_key),
+ "The key name of the time. If 'include_time_key' is false, "
+ "This property is ignored"},
+ /* EOF */
+ {0}};
+
+struct flb_output_plugin out_azure_kusto_plugin = {
+ .name = "azure_kusto",
+ .description = "Send events to Kusto (Azure Data Explorer)",
+ .cb_init = cb_azure_kusto_init,
+ .cb_flush = cb_azure_kusto_flush,
+ .cb_exit = cb_azure_kusto_exit,
+ .config_map = config_map,
+ /* Plugin flags */
+ .flags = FLB_OUTPUT_NET | FLB_IO_TLS,
+};
diff --git a/src/fluent-bit/plugins/out_azure_kusto/azure_kusto.h b/src/fluent-bit/plugins/out_azure_kusto/azure_kusto.h
new file mode 100644
index 000000000..ac4eedfd0
--- /dev/null
+++ b/src/fluent-bit/plugins/out_azure_kusto/azure_kusto.h
@@ -0,0 +1,110 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_OUT_AZURE_KUSTO
+#define FLB_OUT_AZURE_KUSTO
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_oauth2.h>
+#include <fluent-bit/flb_output.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_upstream_ha.h>
+
+/* refresh token every 50 minutes */
+#define FLB_AZURE_KUSTO_TOKEN_REFRESH 3000
+
+/* Kusto streaming inserts oauth scope */
+#define FLB_AZURE_KUSTO_SCOPE "https://help.kusto.windows.net/.default"
+
+/* MSAL authorization URL */
+#define FLB_MSAL_AUTH_URL_TEMPLATE \
+ "https://login.microsoftonline.com/%s/oauth2/v2.0/token"
+
+#define FLB_AZURE_KUSTO_MGMT_URI_PATH "/v1/rest/mgmt"
+#define FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE "{\"csl\":\"%s\", \"db\": \"NetDefaultDB\"}"
+
+#define FLB_AZURE_KUSTO_DEFAULT_TIME_KEY "timestamp"
+#define FLB_AZURE_KUSTO_DEFAULT_TAG_KEY "tag"
+#define FLB_AZURE_KUSTO_DEFAULT_LOG_KEY "log"
+
+#define AZURE_KUSTO_RESOURCE_STORAGE 0
+#define AZURE_KUSTO_RESOURCE_QUEUE 1
+
+#define AZURE_KUSTO_RESOURCE_UPSTREAM_URI "uri"
+#define AZURE_KUSTO_RESOURCE_UPSTREAM_SAS "sas"
+
+#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC 3600
+
+struct flb_azure_kusto_resources {
+ struct flb_upstream_ha *blob_ha;
+ struct flb_upstream_ha *queue_ha;
+ flb_sds_t identity_token;
+
+ /* used to reload resouces after some time */
+ time_t load_time;
+};
+
+struct flb_azure_kusto {
+ /* azure_kusto configuration */
+ flb_sds_t tenant_id;
+ flb_sds_t client_id;
+ flb_sds_t client_secret;
+ flb_sds_t ingestion_endpoint;
+ flb_sds_t database_name;
+ flb_sds_t table_name;
+ flb_sds_t ingestion_mapping_reference;
+
+ /* records configuration */
+ flb_sds_t log_key;
+ int include_tag_key;
+ flb_sds_t tag_key;
+ int include_time_key;
+ flb_sds_t time_key;
+
+ /* --- internal data --- */
+
+ flb_sds_t ingestion_mgmt_endpoint;
+
+ /* oauth2 context */
+ flb_sds_t oauth_url;
+ struct flb_oauth2 *o;
+
+ /* mutex for acquiring oauth tokens */
+ pthread_mutex_t token_mutex;
+
+ /* ingestion resources */
+ struct flb_azure_kusto_resources *resources;
+
+ /* mutex for loading reosurces */
+ pthread_mutex_t resources_mutex;
+
+ /* Upstream connection to the backend server */
+ struct flb_upstream *u;
+
+ /* Fluent Bit context */
+ struct flb_config *config;
+
+ /* Plugin output instance reference */
+ struct flb_output_instance *ins;
+};
+
+flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx);
+flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl);
+
+#endif
diff --git a/src/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.c b/src/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.c
new file mode 100644
index 000000000..5303fef67
--- /dev/null
+++ b/src/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.c
@@ -0,0 +1,665 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_jsmn.h>
+#include <fluent-bit/flb_oauth2.h>
+#include <fluent-bit/flb_output.h>
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_unescape.h>
+#include <fluent-bit/flb_upstream_ha.h>
+#include <fluent-bit/flb_utils.h>
+
+#include "azure_kusto.h"
+#include "azure_kusto_conf.h"
+
+static struct flb_upstream_node *flb_upstream_node_create_url(struct flb_azure_kusto *ctx,
+ struct flb_config *config,
+ const char *url)
+{
+ int ret;
+ char *prot = NULL;
+ char *host = NULL;
+ char *port = NULL;
+ char *uri = NULL;
+ flb_sds_t sds_host = NULL;
+ flb_sds_t sds_port = NULL;
+ char *tmp;
+ struct flb_hash_table *kv = NULL;
+ struct flb_upstream_node *node = NULL;
+ int uri_length;
+ int sas_length;
+
+ /* Parse and split URL */
+ ret = flb_utils_url_split(url, &prot, &host, &port, &uri);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "invalid URL: %s", url);
+ return NULL;
+ }
+
+ /* find sas token in query */
+ tmp = strchr(uri, '?');
+
+ if (tmp) {
+ uri_length = tmp - uri;
+ sas_length = strnlen(tmp + 1, 256);
+
+ /* kv that will hold base uri, and sas token */
+ kv = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 2, 2);
+
+ if (kv) {
+ ret = flb_hash_table_add(kv, AZURE_KUSTO_RESOURCE_UPSTREAM_URI, 3, uri, uri_length);
+
+ if (ret != -1) {
+ ret = flb_hash_table_add(kv, AZURE_KUSTO_RESOURCE_UPSTREAM_SAS, 3, tmp + 1,
+ sas_length);
+
+ if (ret != -1) {
+ /* if any/all of these creations would fail the node creation will fail and cleanup */
+ sds_host = flb_sds_create(host);
+ sds_port = flb_sds_create(port);
+
+ node = flb_upstream_node_create(
+ NULL, sds_host, sds_port, FLB_TRUE, ctx->ins->tls->verify,
+ ctx->ins->tls->debug, ctx->ins->tls->vhost, NULL, NULL, NULL,
+ NULL, NULL, kv, config);
+
+ if (!node) {
+ flb_plg_error(ctx->ins, "error creating resource upstream node");
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "error storing resource sas token");
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "error storing resource uri");
+ }
+
+ /* avoid destorying if function is successful */
+ if (!node) {
+ flb_hash_table_destroy(kv);
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "error creating upstream node hash table");
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "uri has no sas token query: %s", uri);
+ }
+
+ flb_free(prot);
+ flb_free(host);
+ flb_free(port);
+ flb_free(uri);
+
+ return node;
+}
+
+static int flb_azure_kusto_resources_clear(struct flb_azure_kusto_resources *resources)
+{
+ if (!resources) {
+ return -1;
+ }
+
+ if (resources->blob_ha) {
+ flb_upstream_ha_destroy(resources->blob_ha);
+ resources->blob_ha = NULL;
+ }
+
+ if (resources->queue_ha) {
+ flb_upstream_ha_destroy(resources->queue_ha);
+ resources->queue_ha = NULL;
+ }
+
+ if (resources->identity_token) {
+ flb_sds_destroy(resources->identity_token);
+ resources->identity_token = NULL;
+ }
+
+ resources->load_time = 0;
+
+ return 0;
+}
+
+/**
+ * Parses ".get ingestion resources" response into HA upstreams of the queue & blob
+ * resources in the response.
+ *
+ * @param ctx Pointer to the plugin's context
+ * @param config Pointer to the config
+ * @param response sds string containing the response body
+ * @param blob_ha Pointer to an HA upstream for the blob resources, that would be
+ * allocated here.
+ * @param queue_ha Pointer to an HA upstream for the queue resources, that would be
+ * allocated here.
+ * @return int 0 on success, -1 on failure
+ */
+static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_config *config,
+ flb_sds_t response, struct flb_upstream_ha *blob_ha,
+ struct flb_upstream_ha *queue_ha)
+{
+ jsmn_parser parser;
+ jsmntok_t *t;
+ jsmntok_t *tokens;
+ int tok_size = 100;
+ int ret = -1;
+ int i;
+ int blob_count = 0;
+ int queue_count = 0;
+ char *token_str;
+ int token_str_len;
+ int resource_type;
+ struct flb_upstream_node *node;
+ struct flb_upstream_ha *ha;
+ flb_sds_t resource_uri;
+
+ /* Response is a json in the form of
+ * {
+ * "Tables": [
+ * {
+ * "TableName": "Table_0",
+ * "Columns": [...],
+ * "Rows": [
+ * [
+ * ("TempStorage" | "SecuredReadyForAggregationQueue" |
+ * "SuccessfulIngestionsQueue" | "FailedIngestionsQueue" | "IngestionsStatusTable"),
+ * <URI with SAS>
+ * ],
+ * ...
+ * ]
+ * }
+ * ]
+ * }
+ */
+
+ resource_uri = flb_sds_create(NULL);
+ if (!resource_uri) {
+ flb_plg_error(ctx->ins, "error allocating resource uri buffer");
+ return -1;
+ }
+
+ jsmn_init(&parser);
+ tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size);
+
+ if (tokens) {
+ ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, tok_size);
+
+ if (ret > 0) {
+ /* skip all tokens until we reach "Rows" */
+ for (i = 0; i < ret - 1; i++) {
+ t = &tokens[i];
+
+ if (t->type != JSMN_STRING) {
+ continue;
+ }
+
+ token_str = response + t->start;
+ token_str_len = (t->end - t->start);
+
+ /**
+ * if we found the Rows key, skipping this token and the next one (key and
+ * wrapping array value)
+ */
+ if (token_str_len == 4 && strncmp(token_str, "Rows", 4) == 0) {
+ i += 2;
+ break;
+ }
+ }
+
+ /* iterating rows, each row will have 3 tokens: the array holding the column
+ * values, the first value containing the resource type, and the second value
+ * containing the resource uri */
+ for (; i < ret; i++) {
+ t = &tokens[i];
+
+ /**
+ * each token should be an array with 2 strings:
+ * First will be the resource type (TempStorage,
+ * SecuredReadyForAggregationQueue, etc...) Second will be the SAS URI
+ */
+ if (t->type != JSMN_ARRAY) {
+ break;
+ }
+
+ /* move to the next token, first item in the array - resource type */
+ i++;
+ t = &tokens[i];
+ if (t->type != JSMN_STRING) {
+ break;
+ }
+
+ token_str = response + t->start;
+ token_str_len = (t->end - t->start);
+
+ flb_plg_debug(ctx->ins, "found resource of type: %.*s ",
+ t->end - t->start, response + t->start);
+
+ if (token_str_len == 11 && strncmp(token_str, "TempStorage", 11) == 0) {
+ resource_type = AZURE_KUSTO_RESOURCE_STORAGE;
+ }
+ else if (token_str_len == 31 &&
+ strncmp(token_str, "SecuredReadyForAggregationQueue", 31) == 0) {
+ resource_type = AZURE_KUSTO_RESOURCE_QUEUE;
+ }
+ /* we don't care about other resources so we just skip the next token and
+ move on to the next pair */
+ else {
+ i++;
+ continue;
+ }
+
+ /* move to the next token, second item in the array - resource URI */
+ i++;
+ t = &tokens[i];
+
+ if (t->type != JSMN_STRING) {
+ break;
+ }
+
+ token_str = response + t->start;
+ token_str_len = (t->end - t->start);
+
+ resource_uri = flb_sds_copy(resource_uri, token_str, token_str_len);
+ if (resource_type == AZURE_KUSTO_RESOURCE_QUEUE) {
+ ha = queue_ha;
+ queue_count++;
+ }
+ else {
+ ha = blob_ha;
+ blob_count++;
+ }
+
+ if (!ha) {
+ flb_plg_error(ctx->ins, "error creating HA upstream");
+ ret = -1;
+ break;
+ }
+
+ node = flb_upstream_node_create_url(ctx, config, resource_uri);
+
+ if (!node) {
+ flb_plg_error(ctx->ins, "error creating HA upstream node");
+ ret = -1;
+ break;
+ }
+
+ flb_upstream_ha_node_add(ha, node);
+ }
+
+ if (ret != -1) {
+ if (queue_count > 0 && blob_count > 0) {
+ flb_plg_debug(ctx->ins,
+ "parsed %d blob resources and %d queue resources",
+ blob_count, queue_count);
+ ret = 0;
+ }
+ else {
+ flb_plg_error(ctx->ins, "error parsing resources: missing resources");
+ ret = -1;
+ }
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "error parsing JSON response: %s", response);
+ ret = -1;
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "error allocating tokens");
+ ret = -1;
+ }
+
+ flb_sds_destroy(resource_uri);
+ flb_free(tokens);
+
+ return ret;
+}
+
+/**
+ * Parses ".get kusto identity token" response and returns the token as an sds string
+ *
+ * @param ctx Pointer to the plugin's context
+ * @param response sds string containing the response body
+ * @return flb_sds_t The parsed token
+ */
+static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx,
+ flb_sds_t response)
+{
+ flb_sds_t identity_token = NULL;
+ int tok_size = 19;
+ jsmn_parser parser;
+ jsmntok_t *t;
+ jsmntok_t *tokens;
+ int ret;
+ char *token_str;
+ int token_str_len;
+
+ /**
+ * Response is a json in the form of
+ * {
+ * "Tables": [
+ * {
+ * "TableName": "Table_0",
+ * "Columns": [{
+ * "ColumnName": "AuthorizationContext",
+ * "DataType": "String",
+ * "ColumnType": "string"
+ * }],
+ * "Rows": [
+ * [
+ * <value>,
+ * ]
+ * ]
+ * }
+ * ]
+ * }
+ * i.e. only one row and one column is expected (exactly 13 tokens) and the value
+ * should be the last
+ */
+
+ jsmn_init(&parser);
+ tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size);
+ if (!tokens) {
+ flb_plg_error(ctx->ins, "error allocating tokens");
+ return NULL;
+ }
+
+ ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, tok_size);
+ if (ret > 0) {
+ t = &tokens[tok_size - 1];
+
+ if (t->type == JSMN_STRING) {
+ t = &tokens[tok_size - 1];
+ token_str = response + t->start;
+ token_str_len = (t->end - t->start);
+
+ identity_token = flb_sds_create_len(token_str, token_str_len);
+
+ if (identity_token) {
+ flb_plg_debug(ctx->ins, "parsed kusto identity token: '%s'",
+ identity_token);
+ }
+ else {
+ flb_plg_error(ctx->ins, "error parsing kusto identity token");
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "unexpected JSON response: %s", response);
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "error parsing JSON response: %s", response);
+ }
+
+ flb_free(tokens);
+
+ return identity_token;
+}
+
+int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
+ struct flb_config *config)
+{
+ int ret = -1;
+ flb_sds_t response = NULL;
+ flb_sds_t identity_token = NULL;
+ struct flb_upstream_ha *blob_ha = NULL;
+ struct flb_upstream_ha *queue_ha = NULL;
+ time_t now;
+
+ if (pthread_mutex_lock(&ctx->resources_mutex)) {
+ flb_plg_error(ctx->ins, "error locking mutex");
+ return -1;
+ }
+
+ now = time(NULL);
+
+ /* check if we have all resources and they are not stale */
+ if (ctx->resources->blob_ha && ctx->resources->queue_ha &&
+ ctx->resources->identity_token &&
+ now - ctx->resources->load_time < FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC) {
+ flb_plg_debug(ctx->ins, "resources are already loaded and are not stale");
+ ret = 0;
+ }
+ else {
+ flb_plg_info(ctx->ins, "loading kusto ingestion resourcs");
+ response = execute_ingest_csl_command(ctx, ".get ingestion resources");
+
+ if (response) {
+ queue_ha = flb_upstream_ha_create("azure_kusto_queue_ha");
+
+ if (queue_ha) {
+ blob_ha = flb_upstream_ha_create("azure_kusto_blob_ha");
+
+ if (blob_ha) {
+ ret =
+ parse_storage_resources(ctx, config, response, blob_ha, queue_ha);
+
+ if (ret == 0) {
+ flb_sds_destroy(response);
+ response = NULL;
+
+ response =
+ execute_ingest_csl_command(ctx, ".get kusto identity token");
+
+ if (response) {
+ identity_token =
+ parse_ingestion_identity_token(ctx, response);
+
+ if (identity_token) {
+ ret = flb_azure_kusto_resources_clear(ctx->resources);
+
+ if (ret != -1) {
+ ctx->resources->blob_ha = blob_ha;
+ ctx->resources->queue_ha = queue_ha;
+ ctx->resources->identity_token = identity_token;
+ ctx->resources->load_time = now;
+
+ ret = 0;
+ }
+ else {
+ flb_plg_error(
+ ctx->ins,
+ "error destroying previous ingestion resources");
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins,
+ "error parsing ingestion identity token");
+ ret = -1;
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "error getting kusto identity token");
+ ret = -1;
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins,
+ "error parsing ingestion storage resources");
+ ret = -1;
+ }
+
+ if (ret == -1) {
+ flb_upstream_ha_destroy(blob_ha);
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "error creating storage resources upstreams");
+ ret = -1;
+ }
+
+ if (ret == -1) {
+ flb_upstream_ha_destroy(queue_ha);
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "error creating storage resources upstreams");
+ }
+
+ if (response) {
+ flb_sds_destroy(response);
+ }
+ }
+ if (!response) {
+ flb_plg_error(ctx->ins, "error getting ingestion storage resources");
+ }
+ }
+
+ if (pthread_mutex_unlock(&ctx->resources_mutex)) {
+ flb_plg_error(ctx->ins, "error unlocking mutex");
+ return -1;
+ }
+
+ return ret;
+}
+
+static int flb_azure_kusto_resources_destroy(struct flb_azure_kusto_resources *resources)
+{
+ int ret;
+
+ if (!resources) {
+ return -1;
+ }
+
+ ret = flb_azure_kusto_resources_clear(resources);
+ if (ret != 0) {
+ return -1;
+ }
+
+ flb_free(resources);
+
+ return 0;
+}
+
+struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *ins,
+ struct flb_config *config)
+{
+ int ret;
+ struct flb_azure_kusto *ctx;
+
+ /* Allocate config context */
+ ctx = flb_calloc(1, sizeof(struct flb_azure_kusto));
+ if (!ctx) {
+ flb_errno();
+ return NULL;
+ }
+ ctx->ins = ins;
+ ctx->config = config;
+
+ ret = flb_output_config_map_set(ins, (void *)ctx);
+ if (ret == -1) {
+ flb_plg_error(ins, "unable to load configuration");
+ flb_free(ctx);
+ return NULL;
+ }
+
+ /* config: 'tenant_id' */
+ if (ctx->tenant_id == NULL) {
+ flb_plg_error(ctx->ins, "property 'tenant_id' is not defined.");
+ flb_azure_kusto_conf_destroy(ctx);
+ return NULL;
+ }
+
+ /* config: 'client_id' */
+ if (ctx->client_id == NULL) {
+ flb_plg_error(ctx->ins, "property 'client_id' is not defined");
+ flb_azure_kusto_conf_destroy(ctx);
+ return NULL;
+ }
+
+ /* config: 'client_secret' */
+ if (ctx->client_secret == NULL) {
+ flb_plg_error(ctx->ins, "property 'client_secret' is not defined");
+ flb_azure_kusto_conf_destroy(ctx);
+ return NULL;
+ }
+
+ /* config: 'ingestion_endpoint' */
+ if (ctx->ingestion_endpoint == NULL) {
+ flb_plg_error(ctx->ins, "property 'ingestion_endpoint' is not defined");
+ flb_azure_kusto_conf_destroy(ctx);
+ return NULL;
+ }
+
+ /* config: 'database_name' */
+ if (ctx->database_name == NULL) {
+ flb_plg_error(ctx->ins, "property 'database_name' is not defined");
+ flb_azure_kusto_conf_destroy(ctx);
+ return NULL;
+ }
+
+ /* config: 'table_name' */
+ if (ctx->table_name == NULL) {
+ flb_plg_error(ctx->ins, "property 'table_name' is not defined");
+ flb_azure_kusto_conf_destroy(ctx);
+ return NULL;
+ }
+
+ /* Create the auth URL */
+ ctx->oauth_url = flb_sds_create_size(sizeof(FLB_MSAL_AUTH_URL_TEMPLATE) - 1 +
+ flb_sds_len(ctx->tenant_id));
+ if (!ctx->oauth_url) {
+ flb_errno();
+ flb_azure_kusto_conf_destroy(ctx);
+ return NULL;
+ }
+ flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
+ FLB_MSAL_AUTH_URL_TEMPLATE, ctx->tenant_id);
+
+ ctx->resources = flb_calloc(1, sizeof(struct flb_azure_kusto_resources));
+ if (!ctx->resources) {
+ flb_errno();
+ flb_azure_kusto_conf_destroy(ctx);
+ return NULL;
+ }
+
+ flb_plg_info(ctx->ins, "endpoint='%s', database='%s', table='%s'",
+ ctx->ingestion_endpoint, ctx->database_name, ctx->table_name);
+
+ return ctx;
+}
+
+int flb_azure_kusto_conf_destroy(struct flb_azure_kusto *ctx)
+{
+ if (!ctx) {
+ return -1;
+ }
+
+ if (ctx->oauth_url) {
+ flb_sds_destroy(ctx->oauth_url);
+ ctx->oauth_url = NULL;
+ }
+
+ if (ctx->o) {
+ flb_oauth2_destroy(ctx->o);
+ ctx->o = NULL;
+ }
+
+ if (ctx->resources) {
+ flb_azure_kusto_resources_destroy(ctx->resources);
+ ctx->resources = NULL;
+ }
+
+ flb_free(ctx);
+ return 0;
+}
diff --git a/src/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.h b/src/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.h
new file mode 100644
index 000000000..b4b2e3a39
--- /dev/null
+++ b/src/fluent-bit/plugins/out_azure_kusto/azure_kusto_conf.h
@@ -0,0 +1,31 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_OUT_AZURE_KUSTO_CONF_H
+#define FLB_OUT_AZURE_KUSTO_CONF_H
+
+#include "azure_kusto.h"
+
+int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
+ struct flb_config *config);
+struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *ins,
+ struct flb_config *config);
+int flb_azure_kusto_conf_destroy(struct flb_azure_kusto *ctx);
+
+#endif
diff --git a/src/fluent-bit/plugins/out_azure_kusto/azure_kusto_ingest.c b/src/fluent-bit/plugins/out_azure_kusto/azure_kusto_ingest.c
new file mode 100644
index 000000000..d38d92e7f
--- /dev/null
+++ b/src/fluent-bit/plugins/out_azure_kusto/azure_kusto_ingest.c
@@ -0,0 +1,496 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_base64.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_random.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_utils.h>
+
+#include <math.h>
+#include <msgpack.h>
+
+#include "azure_kusto_ingest.h"
+
+/* not really uuid but a random string in the form 00000000-0000-0000-0000-000000000000 */
+static char *generate_uuid()
+{
+ char *chars = "0123456789abcdef";
+ char *uuid;
+ int i;
+ uint64_t rand;
+
+ uuid = flb_malloc(37);
+ if (!uuid) {
+ flb_errno();
+ return NULL;
+ }
+
+ for (i = 0; i < 36; i++) {
+ if (i == 8 || i == 13 || i == 18 || i == 23) {
+ uuid[i] = '-';
+ continue;
+ }
+
+ if (flb_random_bytes((unsigned char *)&rand, sizeof(uint64_t))) {
+ rand = time(NULL);
+ }
+ uuid[i] = chars[rand % 16];
+ }
+ uuid[36] = '\0';
+
+ return uuid;
+}
+
+static char *base64_encode(flb_sds_t s, size_t len, size_t *out_len)
+{
+ char *b64;
+ int ret;
+ size_t buffer_len = 4 * ceil(((double)len / 3) + 1);
+
+ b64 = flb_malloc(buffer_len);
+ if (!b64) {
+ flb_errno();
+ return NULL;
+ }
+
+ ret = flb_base64_encode((unsigned char *)b64, buffer_len, out_len, (unsigned char *)s,
+ len);
+ if (ret != 0) {
+ flb_error("cannot encode string %s into base64", s);
+ flb_free(b64);
+ return NULL;
+ }
+
+ return b64;
+}
+
+static flb_sds_t azure_kusto_create_blob_uri(struct flb_azure_kusto *ctx,
+ struct flb_upstream_node *u_node,
+ flb_sds_t blob_id)
+{
+ int ret;
+ flb_sds_t uri = NULL;
+ char *blob_uri;
+ size_t blob_uri_size;
+ char *blob_sas;
+ size_t blob_sas_size;
+
+ ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_URI, 3,
+ (void **)&blob_uri, &blob_uri_size);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error getting blob uri");
+ return NULL;
+ }
+
+ ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_SAS, 3,
+ (void **)&blob_sas, &blob_sas_size);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error getting blob sas token");
+ return NULL;
+ }
+
+ /* uri will be https://<blob_host>/<container_uri>/<blob_id>.multijson?<sas_token> */
+ uri = flb_sds_create_size(flb_sds_len(u_node->host) + blob_uri_size + blob_sas_size +
+ flb_sds_len(blob_id) + 21);
+
+ if (uri) {
+ flb_sds_snprintf(&uri, flb_sds_alloc(uri), "https://%s%s/%s.multijson?%s",
+ u_node->host, blob_uri, blob_id, blob_sas);
+ flb_plg_debug(ctx->ins, "created blob uri %s", uri);
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot create blob uri buffer");
+ }
+
+ return uri;
+}
+
+static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t blob_id,
+ flb_sds_t payload, size_t payload_size)
+{
+ int ret = -1;
+ flb_sds_t uri = NULL;
+ struct flb_upstream_node *u_node;
+ struct flb_connection *u_conn;
+ struct flb_http_client *c;
+ size_t resp_size;
+ time_t now;
+ struct tm tm;
+ char tmp[64];
+ int len;
+
+ now = time(NULL);
+ gmtime_r(&now, &tm);
+ len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm);
+
+ u_node = flb_upstream_ha_node_get(ctx->resources->blob_ha);
+ if (!u_node) {
+ flb_plg_error(ctx->ins, "error getting blob upstream");
+ return NULL;
+ }
+
+ u_conn = flb_upstream_conn_get(u_node->u);
+
+ if (u_conn) {
+ uri = azure_kusto_create_blob_uri(ctx, u_node, blob_id);
+
+ if (uri) {
+ flb_plg_debug(ctx->ins, "uploading payload to blob uri: %s", uri);
+ c = flb_http_client(u_conn, FLB_HTTP_PUT, uri, payload, payload_size, NULL, 0,
+ NULL, 0);
+
+ if (c) {
+ flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
+ flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
+ flb_http_add_header(c, "x-ms-blob-type", 14, "BlockBlob", 9);
+ flb_http_add_header(c, "x-ms-date", 9, tmp, len);
+ flb_http_add_header(c, "x-ms-version", 12, "2019-12-12", 10);
+
+ ret = flb_http_do(c, &resp_size);
+ flb_plg_debug(ctx->ins,
+ "kusto blob upload request http_do=%i, HTTP Status: %i",
+ ret, c->resp.status);
+
+ if (ret == 0) {
+ /* Validate return status and HTTP status if set */
+ if (c->resp.status != 201) {
+ ret = -1;
+
+ if (c->resp.payload_size > 0) {
+ flb_plg_debug(ctx->ins, "Request failed and returned: \n%s",
+ c->resp.payload);
+ }
+ else {
+ flb_plg_debug(ctx->ins, "Request failed");
+ }
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot send HTTP request");
+ }
+
+ flb_http_client_destroy(c);
+ }
+ else {
+ flb_plg_error(ctx->ins,
+ "cannot create HTTP client context for blob container");
+ }
+
+ if (ret != 0) {
+ flb_sds_destroy(uri);
+ uri = NULL;
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "error creating blob container uri buffer");
+ }
+
+ flb_upstream_conn_release(u_conn);
+ }
+ else {
+ flb_plg_error(ctx->ins, "error getting blob container upstream connection");
+ }
+
+ return uri;
+}
+
+static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t blob_uri,
+ size_t payload_size)
+{
+ flb_sds_t message = NULL;
+ int ret = 0;
+ char *uuid;
+ char *message_b64;
+ size_t b64_len;
+ size_t message_len;
+
+ uuid = generate_uuid();
+ if (uuid) {
+ message = flb_sds_create(NULL);
+
+ if (message) {
+ message_len =
+ flb_sds_snprintf(&message, 0,
+ "{\"Id\": \"%s\", \"BlobPath\": \"%s\", "
+ "\"RawDataSize\": %lu, \"DatabaseName\": "
+ "\"%s\", \"TableName\": \"%s\","
+ "\"AdditionalProperties\": { \"format\": \"multijson\", "
+ "\"authorizationContext\": "
+ "\"%s\", \"jsonMappingReference\": \"%s\" }}%c",
+ uuid, blob_uri, payload_size, ctx->database_name,
+ ctx->table_name, ctx->resources->identity_token,
+ ctx->ingestion_mapping_reference == NULL
+ ? ""
+ : ctx->ingestion_mapping_reference, 0);
+
+ if (message_len != -1) {
+ flb_plg_debug(ctx->ins, "created ingestion message:\n%s", message);
+ message_b64 = base64_encode(message, message_len, &b64_len);
+
+ if (message_b64) {
+ ret = flb_sds_snprintf(
+ &message, flb_sds_alloc(message),
+ "<QueueMessage><MessageText>%s</MessageText></QueueMessage>%c",
+ message_b64, 0);
+
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error creating ingestion queue message");
+ }
+
+ flb_free(message_b64);
+ }
+ else {
+ flb_plg_error(ctx->ins, "error encoding ingestion message to base64");
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "error creating ingestion message");
+ ret = -1;
+ }
+
+ if (ret == -1) {
+ flb_sds_destroy(message);
+ message = NULL;
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "error creating ingestion message buffer");
+ }
+
+ flb_free(uuid);
+ }
+ else {
+ flb_plg_error(ctx->ins, "error generating unique ingestion UUID");
+ }
+
+ return message;
+}
+
+static flb_sds_t azure_kusto_create_queue_uri(struct flb_azure_kusto *ctx,
+ struct flb_upstream_node *u_node)
+{
+ int ret;
+ flb_sds_t uri = NULL;
+ char *queue_uri;
+ size_t queue_uri_size;
+ char *queue_sas;
+ size_t queue_sas_size;
+
+ ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_URI, 3,
+ (void **)&queue_uri, &queue_uri_size);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error getting queue uri");
+ return NULL;
+ }
+
+ ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_SAS, 3,
+ (void **)&queue_sas, &queue_sas_size);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error getting queue sas token");
+ return NULL;
+ }
+
+ /* uri will be <container_uri>/messages?<sas_token> */
+ uri = flb_sds_create_size(queue_uri_size + queue_sas_size + 11);
+
+ if (uri) {
+ flb_sds_snprintf(&uri, flb_sds_alloc(uri), "%s/messages?%s", queue_uri,
+ queue_sas);
+ flb_plg_debug(ctx->ins, "created queue uri %s", uri);
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot create queue uri buffer");
+ }
+
+ return uri;
+}
+
+static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t blob_uri,
+ size_t payload_size)
+{
+ int ret = -1;
+ struct flb_upstream_node *u_node;
+ struct flb_connection *u_conn;
+ struct flb_http_client *c;
+ flb_sds_t uri;
+ flb_sds_t payload;
+ size_t resp_size;
+ time_t now;
+ struct tm tm;
+ char tmp[64];
+ int len;
+
+ now = time(NULL);
+ gmtime_r(&now, &tm);
+ len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm);
+
+ u_node = flb_upstream_ha_node_get(ctx->resources->queue_ha);
+ if (!u_node) {
+ flb_plg_error(ctx->ins, "error getting queue upstream");
+ return -1;
+ }
+
+ u_conn = flb_upstream_conn_get(u_node->u);
+
+ if (u_conn) {
+ uri = azure_kusto_create_queue_uri(ctx, u_node);
+
+ if (uri) {
+ payload = create_ingestion_message(ctx, blob_uri, payload_size);
+
+ if (payload) {
+ c = flb_http_client(u_conn, FLB_HTTP_POST, uri, payload,
+ flb_sds_len(payload), NULL, 0, NULL, 0);
+
+ if (c) {
+ flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
+ flb_http_add_header(c, "Content-Type", 12, "application/atom+xml",
+ 20);
+ flb_http_add_header(c, "x-ms-date", 9, tmp, len);
+ flb_http_add_header(c, "x-ms-version", 12, "2019-12-12", 10);
+
+ ret = flb_http_do(c, &resp_size);
+ flb_plg_debug(ctx->ins,
+ "kusto queue request http_do=%i, HTTP Status: %i", ret,
+ c->resp.status);
+
+ if (ret == 0) {
+ /* Validate return status and HTTP status if set */
+ if (c->resp.status != 201) {
+ ret = -1;
+
+ if (c->resp.payload_size > 0) {
+ flb_plg_debug(ctx->ins,
+ "Request failed and returned: \n%s",
+ c->resp.payload);
+ }
+ else {
+ flb_plg_debug(ctx->ins, "Request failed");
+ }
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot send HTTP request");
+ }
+
+ flb_http_client_destroy(c);
+ }
+ else {
+ flb_plg_error(ctx->ins,
+ "cannot create HTTP client context for queue");
+ }
+
+ flb_sds_destroy(payload);
+ }
+ else {
+ flb_plg_error(ctx->ins, "error creating payload buffer");
+ }
+
+ flb_sds_destroy(uri);
+ }
+ else {
+ flb_plg_error(ctx->ins, "error creating queue uri buffer");
+ }
+
+ flb_upstream_conn_release(u_conn);
+ }
+ else {
+ flb_plg_error(ctx->ins, "error getting queue upstream connection");
+ }
+
+ return ret;
+}
+
+static flb_sds_t azure_kusto_create_blob_id(struct flb_azure_kusto *ctx, flb_sds_t tag,
+ size_t tag_len)
+{
+ flb_sds_t blob_id = NULL;
+ struct flb_time tm;
+ uint64_t ms;
+ char *b64tag;
+ size_t b64_len;
+
+ flb_time_get(&tm);
+ ms = ((tm.tm.tv_sec * 1000) + (tm.tm.tv_nsec / 1000000));
+
+ b64tag = base64_encode(tag, tag_len, &b64_len);
+
+ if (b64tag) {
+ /* remove trailing '=' */
+ while (b64_len && b64tag[b64_len - 1] == '=') {
+ b64tag[b64_len - 1] = '\0';
+ b64_len--;
+ }
+
+ blob_id = flb_sds_create_size(flb_sds_len(ctx->database_name) +
+ flb_sds_len(ctx->table_name) + b64_len + 24);
+ if (blob_id) {
+ flb_sds_snprintf(&blob_id, flb_sds_alloc(blob_id), "flb__%s__%s__%s__%lu",
+ ctx->database_name, ctx->table_name, b64tag, ms);
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot create blob id buffer");
+ }
+
+ flb_free(b64tag);
+ }
+ else {
+ flb_plg_error(ctx->ins, "error encoding tag '%s' to base64", tag);
+ }
+
+ return blob_id;
+}
+
+int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
+ size_t tag_len, flb_sds_t payload, size_t payload_size)
+{
+ int ret = -1;
+ flb_sds_t blob_id;
+ flb_sds_t blob_uri;
+
+ /* flb__<db>__<table>__<b64tag>__<timestamp> */
+ blob_id = azure_kusto_create_blob_id(ctx, tag, tag_len);
+
+ if (blob_id) {
+ blob_uri = azure_kusto_create_blob(ctx, blob_id, payload, payload_size);
+
+ if (blob_uri) {
+ ret = azure_kusto_enqueue_ingestion(ctx, blob_uri, payload_size);
+
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "failed to enqueue ingestion blob to queue");
+ ret = -1;
+ }
+
+ flb_sds_destroy(blob_uri);
+ }
+ else {
+ flb_plg_error(ctx->ins, "failed to create payload blob uri");
+ }
+
+ flb_sds_destroy(blob_id);
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot create blob id");
+ }
+
+ return ret;
+}
diff --git a/src/fluent-bit/plugins/out_azure_kusto/azure_kusto_ingest.h b/src/fluent-bit/plugins/out_azure_kusto/azure_kusto_ingest.h
new file mode 100644
index 000000000..60613919a
--- /dev/null
+++ b/src/fluent-bit/plugins/out_azure_kusto/azure_kusto_ingest.h
@@ -0,0 +1,28 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_OUT_AZURE_KUSTO_INGEST_H
+#define FLB_OUT_AZURE_KUSTO_INGEST_H
+
+#include "azure_kusto.h"
+
+int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
+ size_t tag_len, flb_sds_t payload, size_t payload_size);
+
+#endif \ No newline at end of file