summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/out_azure_kusto/azure_kusto.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/out_azure_kusto/azure_kusto.c')
-rw-r--r--src/fluent-bit/plugins/out_azure_kusto/azure_kusto.c477
1 files changed, 477 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/out_azure_kusto/azure_kusto.c b/src/fluent-bit/plugins/out_azure_kusto/azure_kusto.c
new file mode 100644
index 000000000..4b8ad9b82
--- /dev/null
+++ b/src/fluent-bit/plugins/out_azure_kusto/azure_kusto.c
@@ -0,0 +1,477 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_oauth2.h>
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_signv4.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+
+#include "azure_kusto.h"
+#include "azure_kusto_conf.h"
+#include "azure_kusto_ingest.h"
+
+/* Create a new oauth2 context and get a oauth2 token */
+static int azure_kusto_get_oauth2_token(struct flb_azure_kusto *ctx)
+{
+ int ret;
+ char *token;
+
+ /* Clear any previous oauth2 payload content */
+ flb_oauth2_payload_clear(ctx->o);
+
+ ret = flb_oauth2_payload_append(ctx->o, "grant_type", 10, "client_credentials", 18);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error appending oauth2 params");
+ return -1;
+ }
+
+ ret = flb_oauth2_payload_append(ctx->o, "scope", 5, FLB_AZURE_KUSTO_SCOPE, 39);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error appending oauth2 params");
+ return -1;
+ }
+
+ ret = flb_oauth2_payload_append(ctx->o, "client_id", 9, ctx->client_id, -1);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error appending oauth2 params");
+ return -1;
+ }
+
+ ret = flb_oauth2_payload_append(ctx->o, "client_secret", 13, ctx->client_secret, -1);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error appending oauth2 params");
+ return -1;
+ }
+
+ /* Retrieve access token */
+ token = flb_oauth2_token_get(ctx->o);
+ if (!token) {
+ flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
+ return -1;
+ }
+
+ return 0;
+}
+
+flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx)
+{
+ int ret = 0;
+ flb_sds_t output = NULL;
+
+ if (pthread_mutex_lock(&ctx->token_mutex)) {
+ flb_plg_error(ctx->ins, "error locking mutex");
+ return NULL;
+ }
+
+ if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) {
+ ret = azure_kusto_get_oauth2_token(ctx);
+ }
+
+ /* Copy string to prevent race conditions (get_oauth2 can free the string) */
+ if (ret == 0) {
+ output = flb_sds_create_size(flb_sds_len(ctx->o->token_type) +
+ flb_sds_len(ctx->o->access_token) + 2);
+ if (!output) {
+ flb_plg_error(ctx->ins, "error creating token buffer");
+ return NULL;
+ }
+ flb_sds_snprintf(&output, flb_sds_alloc(output), "%s %s", ctx->o->token_type,
+ ctx->o->access_token);
+ }
+
+ if (pthread_mutex_unlock(&ctx->token_mutex)) {
+ flb_plg_error(ctx->ins, "error unlocking mutex");
+ if (output) {
+ flb_sds_destroy(output);
+ }
+ return NULL;
+ }
+
+ return output;
+}
+
+/**
+ * Executes a control command against kusto's endpoint
+ *
+ * @param ctx Plugin's context
+ * @param csl Kusto's control command
+ * @return flb_sds_t Returns the response or NULL on error.
+ */
+flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl)
+{
+ flb_sds_t token;
+ flb_sds_t body;
+ size_t b_sent;
+ int ret;
+ struct flb_connection *u_conn;
+ struct flb_http_client *c;
+ flb_sds_t resp = NULL;
+
+ /* Get upstream connection */
+ u_conn = flb_upstream_conn_get(ctx->u);
+
+ if (u_conn) {
+ token = get_azure_kusto_token(ctx);
+
+ if (token) {
+ /* Compose request body */
+ body = flb_sds_create_size(sizeof(FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE) - 1 +
+ strlen(csl));
+
+ if (body) {
+ flb_sds_snprintf(&body, flb_sds_alloc(body),
+ FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE, csl);
+
+ /* Compose HTTP Client request */
+ c = flb_http_client(u_conn, FLB_HTTP_POST, FLB_AZURE_KUSTO_MGMT_URI_PATH,
+ body, flb_sds_len(body), NULL, 0, NULL, 0);
+
+ if (c) {
+ /* Add headers */
+ flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
+ flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
+ flb_http_add_header(c, "Accept", 6, "application/json", 16);
+ flb_http_add_header(c, "Authorization", 13, token,
+ flb_sds_len(token));
+ flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10);
+
+ /* Send HTTP request */
+ ret = flb_http_do(c, &b_sent);
+ flb_plg_debug(
+ ctx->ins,
+ "Kusto ingestion command request http_do=%i, HTTP Status: %i",
+ ret, c->resp.status);
+
+ if (ret == 0) {
+ if (c->resp.status == 200) {
+ /* Copy payload response to the response param */
+ resp =
+ flb_sds_create_len(c->resp.payload, c->resp.payload_size);
+ }
+ else if (c->resp.payload_size > 0) {
+ flb_plg_debug(ctx->ins, "Request failed and returned: \n%s",
+ c->resp.payload);
+ }
+ else {
+ flb_plg_debug(ctx->ins, "Request failed");
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot send HTTP request");
+ }
+
+ flb_http_client_destroy(c);
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot create HTTP client context");
+ }
+
+ flb_sds_destroy(body);
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot construct request body");
+ }
+
+ flb_sds_destroy(token);
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot retrieve oauth2 token");
+ }
+
+ flb_upstream_conn_release(u_conn);
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot create upstream connection");
+ }
+
+ return resp;
+}
+
+static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_config *config,
+ void *data)
+{
+ int io_flags = FLB_IO_TLS;
+ struct flb_azure_kusto *ctx;
+
+ /* Create config context */
+ ctx = flb_azure_kusto_conf_create(ins, config);
+ if (!ctx) {
+ flb_plg_error(ins, "configuration failed");
+ return -1;
+ }
+
+ flb_output_set_context(ins, ctx);
+
+ /* Network mode IPv6 */
+ if (ins->host.ipv6 == FLB_TRUE) {
+ io_flags |= FLB_IO_IPV6;
+ }
+
+ /* Create mutex for acquiring oauth tokens and getting ingestion resources (they
+ * are shared across flush coroutines)
+ */
+ pthread_mutex_init(&ctx->token_mutex, NULL);
+ pthread_mutex_init(&ctx->resources_mutex, NULL);
+
+ /*
+ * Create upstream context for Kusto Ingestion endpoint
+ */
+ ctx->u = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
+ if (!ctx->u) {
+ flb_plg_error(ctx->ins, "upstream creation failed");
+ return -1;
+ }
+
+ /* Create oauth2 context */
+ ctx->o =
+ flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH);
+ if (!ctx->o) {
+ flb_plg_error(ctx->ins, "cannot create oauth2 context");
+ return -1;
+ }
+ flb_output_upstream_set(ctx->u, ins);
+
+ return 0;
+}
+
+static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int tag_len,
+ const void *data, size_t bytes, void **out_data,
+ size_t *out_size)
+{
+ int records = 0;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ /* for sub msgpack objs */
+ int map_size;
+ struct tm tms;
+ char time_formatted[32];
+ size_t s;
+ int len;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+ int ret;
+ /* output buffer */
+ flb_sds_t out_buf;
+
+ /* Create array for all records */
+ records = flb_mp_count(data, bytes);
+ if (records <= 0) {
+ flb_plg_error(ctx->ins, "error counting msgpack entries");
+ return -1;
+ }
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return -1;
+ }
+
+ /* Create temporary msgpack buffer */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ msgpack_pack_array(&mp_pck, records);
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ map_size = 1;
+ if (ctx->include_time_key == FLB_TRUE) {
+ map_size++;
+ }
+
+ if (ctx->include_tag_key == FLB_TRUE) {
+ map_size++;
+ }
+
+ msgpack_pack_map(&mp_pck, map_size);
+
+ /* include_time_key */
+ if (ctx->include_time_key == FLB_TRUE) {
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->time_key));
+ msgpack_pack_str_body(&mp_pck, ctx->time_key, flb_sds_len(ctx->time_key));
+
+ /* Append the time value as ISO 8601 */
+ gmtime_r(&log_event.timestamp.tm.tv_sec, &tms);
+ s = strftime(time_formatted, sizeof(time_formatted) - 1,
+ FLB_PACK_JSON_DATE_ISO8601_FMT, &tms);
+
+ len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
+ ".%03" PRIu64 "Z",
+ (uint64_t)log_event.timestamp.tm.tv_nsec / 1000000);
+ s += len;
+ msgpack_pack_str(&mp_pck, s);
+ msgpack_pack_str_body(&mp_pck, time_formatted, s);
+ }
+
+ /* include_tag_key */
+ if (ctx->include_tag_key == FLB_TRUE) {
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->tag_key));
+ msgpack_pack_str_body(&mp_pck, ctx->tag_key, flb_sds_len(ctx->tag_key));
+ msgpack_pack_str(&mp_pck, tag_len);
+ msgpack_pack_str_body(&mp_pck, tag, tag_len);
+ }
+
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->log_key));
+ msgpack_pack_str_body(&mp_pck, ctx->log_key, flb_sds_len(ctx->log_key));
+ msgpack_pack_object(&mp_pck, *log_event.body);
+ }
+
+ /* Convert from msgpack to JSON */
+ out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+
+ /* Cleanup */
+ flb_log_event_decoder_destroy(&log_decoder);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+
+ if (!out_buf) {
+ flb_plg_error(ctx->ins, "error formatting JSON payload");
+ return -1;
+ }
+
+ *out_data = out_buf;
+ *out_size = flb_sds_len(out_buf);
+
+ return 0;
+}
+
+static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
+ struct flb_output_flush *out_flush,
+ struct flb_input_instance *i_ins, void *out_context,
+ struct flb_config *config)
+{
+ int ret;
+ flb_sds_t json;
+ size_t json_size;
+ size_t tag_len;
+ struct flb_azure_kusto *ctx = out_context;
+
+ (void)i_ins;
+ (void)config;
+
+ flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size);
+
+ tag_len = flb_sds_len(event_chunk->tag);
+
+ /* Load or refresh ingestion resources */
+ ret = azure_kusto_load_ingestion_resources(ctx, config);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "cannot load ingestion resources");
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ /* Reformat msgpack to JSON payload */
+ ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data,
+ event_chunk->size, (void **)&json, &json_size);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "cannot reformat data into json");
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "cannot perform queued ingestion");
+ flb_sds_destroy(json);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ /* Cleanup */
+ flb_sds_destroy(json);
+
+ /* Done */
+ FLB_OUTPUT_RETURN(FLB_OK);
+}
+
+static int cb_azure_kusto_exit(void *data, struct flb_config *config)
+{
+ struct flb_azure_kusto *ctx = data;
+
+ if (!ctx) {
+ return -1;
+ }
+
+ if (ctx->u) {
+ flb_upstream_destroy(ctx->u);
+ ctx->u = NULL;
+ }
+
+ flb_azure_kusto_conf_destroy(ctx);
+
+ return 0;
+}
+
+static struct flb_config_map config_map[] = {
+ {FLB_CONFIG_MAP_STR, "tenant_id", (char *)NULL, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, tenant_id),
+ "Set the tenant ID of the AAD application used for authentication"},
+ {FLB_CONFIG_MAP_STR, "client_id", (char *)NULL, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, client_id),
+ "Set the client ID (Application ID) of the AAD application used for authentication"},
+ {FLB_CONFIG_MAP_STR, "client_secret", (char *)NULL, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, client_secret),
+ "Set the client secret (Application Password) of the AAD application used for "
+ "authentication"},
+ {FLB_CONFIG_MAP_STR, "ingestion_endpoint", (char *)NULL, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, ingestion_endpoint),
+ "Set the Kusto cluster's ingestion endpoint URL (e.g. "
+ "https://ingest-mycluster.eastus.kusto.windows.net)"},
+ {FLB_CONFIG_MAP_STR, "database_name", (char *)NULL, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, database_name), "Set the database name"},
+ {FLB_CONFIG_MAP_STR, "table_name", (char *)NULL, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, table_name), "Set the table name"},
+ {FLB_CONFIG_MAP_STR, "ingestion_mapping_reference", (char *)NULL, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, ingestion_mapping_reference),
+ "Set the ingestion mapping reference"},
+ {FLB_CONFIG_MAP_STR, "log_key", FLB_AZURE_KUSTO_DEFAULT_LOG_KEY, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, log_key), "The key name of event payload"},
+ {FLB_CONFIG_MAP_BOOL, "include_tag_key", "true", 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, include_tag_key),
+ "If enabled, tag is appended to output. "
+ "The key name is used 'tag_key' property."},
+ {FLB_CONFIG_MAP_STR, "tag_key", FLB_AZURE_KUSTO_DEFAULT_TAG_KEY, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, tag_key),
+ "The key name of tag. If 'include_tag_key' is false, "
+ "This property is ignored"},
+ {FLB_CONFIG_MAP_BOOL, "include_time_key", "true", 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, include_time_key),
+ "If enabled, time is appended to output. "
+ "The key name is used 'time_key' property."},
+ {FLB_CONFIG_MAP_STR, "time_key", FLB_AZURE_KUSTO_DEFAULT_TIME_KEY, 0, FLB_TRUE,
+ offsetof(struct flb_azure_kusto, time_key),
+ "The key name of the time. If 'include_time_key' is false, "
+ "This property is ignored"},
+ /* EOF */
+ {0}};
+
+struct flb_output_plugin out_azure_kusto_plugin = {
+ .name = "azure_kusto",
+ .description = "Send events to Kusto (Azure Data Explorer)",
+ .cb_init = cb_azure_kusto_init,
+ .cb_flush = cb_azure_kusto_flush,
+ .cb_exit = cb_azure_kusto_exit,
+ .config_map = config_map,
+ /* Plugin flags */
+ .flags = FLB_OUTPUT_NET | FLB_IO_TLS,
+};