summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/out_azure/azure.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/out_azure/azure.c')
-rw-r--r--src/fluent-bit/plugins/out_azure/azure.c452
1 files changed, 0 insertions, 452 deletions
diff --git a/src/fluent-bit/plugins/out_azure/azure.c b/src/fluent-bit/plugins/out_azure/azure.c
deleted file mode 100644
index d4322fb65..000000000
--- a/src/fluent-bit/plugins/out_azure/azure.c
+++ /dev/null
@@ -1,452 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_output_plugin.h>
-#include <fluent-bit/flb_http_client.h>
-#include <fluent-bit/flb_base64.h>
-#include <fluent-bit/flb_crypto.h>
-#include <fluent-bit/flb_hmac.h>
-#include <fluent-bit/flb_pack.h>
-#include <fluent-bit/flb_utils.h>
-#include <fluent-bit/flb_time.h>
-#include <fluent-bit/flb_log_event_decoder.h>
-#include <fluent-bit/flb_record_accessor.h>
-#include <fluent-bit/flb_ra_key.h>
-#include <msgpack.h>
-
-#include "azure.h"
-#include "azure_conf.h"
-
-static int cb_azure_init(struct flb_output_instance *ins,
- struct flb_config *config, void *data)
-{
- struct flb_azure *ctx;
-
- ctx = flb_azure_conf_create(ins, config);
- if (!ctx) {
- flb_plg_error(ins, "configuration failed");
- return -1;
- }
-
- flb_output_set_context(ins, ctx);
- return 0;
-}
-
-static int azure_format(const void *in_buf, size_t in_bytes,
- flb_sds_t tag, flb_sds_t *tag_val_out,
- char **out_buf, size_t *out_size,
- struct flb_azure *ctx)
-{
- int i;
- int array_size = 0;
- int map_size;
- double t;
- msgpack_object map;
- msgpack_object k;
- msgpack_object v;
- msgpack_sbuffer mp_sbuf;
- msgpack_packer mp_pck;
- msgpack_sbuffer tmp_sbuf;
- msgpack_packer tmp_pck;
- flb_sds_t record;
- char time_formatted[32];
- size_t s;
- struct tm tms;
- int len;
- struct flb_log_event_decoder log_decoder;
- struct flb_log_event log_event;
- flb_sds_t tmp = NULL;
- int ret;
-
- /* Count number of items */
- array_size = flb_mp_count(in_buf, in_bytes);
-
- ret = flb_log_event_decoder_init(&log_decoder, (char *) in_buf, in_bytes);
-
- if (ret != FLB_EVENT_DECODER_SUCCESS) {
- flb_plg_error(ctx->ins,
- "Log event decoder initialization error : %d", ret);
-
- return -1;
- }
-
- /* Create temporary msgpack buffer */
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
- msgpack_pack_array(&mp_pck, array_size);
-
- while ((ret = flb_log_event_decoder_next(
- &log_decoder,
- &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
- /* Create temporary msgpack buffer */
- msgpack_sbuffer_init(&tmp_sbuf);
- msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
-
- map = *log_event.body;
- map_size = map.via.map.size;
-
- if (ctx->log_type_key) {
- tmp = flb_ra_translate(ctx->ra_prefix_key,
- tag, flb_sds_len(tag),
- map, NULL);
- if (!tmp) {
- flb_plg_error(ctx->ins, "Tagged record translation failed!");
- }
- else if (flb_sds_is_empty(tmp)) {
- flb_plg_warn(ctx->ins, "Record accessor key not matched");
- flb_sds_destroy(tmp);
- }
- else {
- /* tag_val_out must be destroyed by the caller */
- *tag_val_out = tmp;
- }
- }
-
- msgpack_pack_map(&mp_pck, map_size + 1);
-
- /* Append the time key */
- msgpack_pack_str(&mp_pck, flb_sds_len(ctx->time_key));
- msgpack_pack_str_body(&mp_pck,
- ctx->time_key,
- flb_sds_len(ctx->time_key));
-
- if (ctx->time_generated == FLB_TRUE) {
- /* Append the time value as ISO 8601 */
- gmtime_r(&log_event.timestamp.tm.tv_sec, &tms);
-
- s = strftime(time_formatted, sizeof(time_formatted) - 1,
- FLB_PACK_JSON_DATE_ISO8601_FMT, &tms);
-
- len = snprintf(time_formatted + s,
- sizeof(time_formatted) - 1 - s,
- ".%03" PRIu64 "Z",
- (uint64_t) log_event.timestamp.tm.tv_nsec / 1000000);
-
- s += len;
- msgpack_pack_str(&mp_pck, s);
- msgpack_pack_str_body(&mp_pck, time_formatted, s);
- } else {
- /* Append the time value as millis.nanos */
- t = flb_time_to_double(&log_event.timestamp);
-
- msgpack_pack_double(&mp_pck, t);
- }
-
- /* Append original map k/v */
- for (i = 0; i < map_size; i++) {
- k = map.via.map.ptr[i].key;
- v = map.via.map.ptr[i].val;
-
- msgpack_pack_object(&tmp_pck, k);
- msgpack_pack_object(&tmp_pck, v);
- }
- msgpack_sbuffer_write(&mp_sbuf, tmp_sbuf.data, tmp_sbuf.size);
- msgpack_sbuffer_destroy(&tmp_sbuf);
- }
-
- record = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
- if (!record) {
- flb_errno();
-
- flb_log_event_decoder_destroy(&log_decoder);
- msgpack_sbuffer_destroy(&mp_sbuf);
-
- return -1;
- }
-
- flb_log_event_decoder_destroy(&log_decoder);
-
- msgpack_sbuffer_destroy(&mp_sbuf);
-
- *out_buf = record;
- *out_size = flb_sds_len(record);
-
- return 0;
-}
-
-static int build_headers(struct flb_http_client *c,
- flb_sds_t log_type,
- size_t content_length,
- struct flb_azure *ctx)
-{
- int len;
- char *auth;
- char tmp[256];
- time_t t;
- size_t size;
- size_t olen;
- flb_sds_t rfc1123date;
- flb_sds_t str_hash;
- struct tm tm = {0};
- unsigned char hmac_hash[32] = {0};
- int result;
-
- /* Format Date */
- rfc1123date = flb_sds_create_size(32);
- if (!rfc1123date) {
- flb_errno();
- return -1;
- }
-
- t = time(NULL);
- if (!gmtime_r(&t, &tm)) {
- flb_errno();
- flb_sds_destroy(rfc1123date);
- return -1;
- }
- size = strftime(rfc1123date,
- flb_sds_alloc(rfc1123date) - 1,
- "%a, %d %b %Y %H:%M:%S GMT", &tm);
- if (size <= 0) {
- flb_errno();
- flb_sds_destroy(rfc1123date);
- return -1;
- }
- flb_sds_len_set(rfc1123date, size);
-
- /* Compose source string for the hash */
- str_hash = flb_sds_create_size(256);
- if (!str_hash) {
- flb_errno();
- flb_sds_destroy(rfc1123date);
- return -1;
- }
-
- len = snprintf(tmp, sizeof(tmp) - 1, "%zu\n", content_length);
- flb_sds_cat(str_hash, "POST\n", 5);
- flb_sds_cat(str_hash, tmp, len);
- flb_sds_cat(str_hash, "application/json\n", 17);
- flb_sds_cat(str_hash, "x-ms-date:", 10);
- flb_sds_cat(str_hash, rfc1123date, flb_sds_len(rfc1123date));
- flb_sds_cat(str_hash, "\n", 1);
- flb_sds_cat(str_hash, FLB_AZURE_RESOURCE, sizeof(FLB_AZURE_RESOURCE) - 1);
-
- /* Authorization signature */
- result = flb_hmac_simple(FLB_HASH_SHA256,
- (unsigned char *) ctx->dec_shared_key,
- flb_sds_len(ctx->dec_shared_key),
- (unsigned char *) str_hash,
- flb_sds_len(str_hash),
- hmac_hash,
- sizeof(hmac_hash));
-
- if (result != FLB_CRYPTO_SUCCESS) {
- flb_sds_destroy(rfc1123date);
- flb_sds_destroy(str_hash);
- return -1;
- }
-
- /* Encoded hash */
- result = flb_base64_encode((unsigned char *) &tmp, sizeof(tmp) - 1, &olen,
- hmac_hash, sizeof(hmac_hash));
- tmp[olen] = '\0';
-
- /* Append headers */
- flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
- flb_http_add_header(c, "Log-Type", 8,
- log_type, flb_sds_len(log_type));
- flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
- flb_http_add_header(c, "x-ms-date", 9, rfc1123date,
- flb_sds_len(rfc1123date));
- if (ctx->time_generated == FLB_TRUE) {
- /* Use time value as time-generated within azure */
- flb_http_add_header(c, "time-generated-field", 20, ctx->time_key, flb_sds_len(ctx->time_key));
- }
-
- size = 32 + flb_sds_len(ctx->customer_id) + olen;
- auth = flb_malloc(size);
- if (!auth) {
- flb_errno();
- flb_sds_destroy(rfc1123date);
- flb_sds_destroy(str_hash);
- return -1;
- }
-
-
- len = snprintf(auth, size, "SharedKey %s:%s",
- ctx->customer_id, tmp);
- flb_http_add_header(c, "Authorization", 13, auth, len);
-
- /* release resources */
- flb_sds_destroy(rfc1123date);
- flb_sds_destroy(str_hash);
- flb_free(auth);
-
- return 0;
-}
-
-static void cb_azure_flush(struct flb_event_chunk *event_chunk,
- struct flb_output_flush *out_flush,
- struct flb_input_instance *i_ins,
- void *out_context,
- struct flb_config *config)
-{
- int ret;
- size_t b_sent;
- char *buf_data;
- size_t buf_size;
- struct flb_azure *ctx = out_context;
- struct flb_connection *u_conn;
- struct flb_http_client *c;
- flb_sds_t payload;
- flb_sds_t final_log_type = NULL;
- (void) i_ins;
- (void) config;
-
- /* Get upstream connection */
- u_conn = flb_upstream_conn_get(ctx->u);
- if (!u_conn) {
- FLB_OUTPUT_RETURN(FLB_RETRY);
- }
-
- /* Convert binary logs into a JSON payload */
- ret = azure_format(event_chunk->data, event_chunk->size,
- event_chunk->tag, &final_log_type, &buf_data, &buf_size, ctx);
- /* If cannot get matching record using log_type_prefix, use log_type directly */
- if (!final_log_type) {
- final_log_type = ctx->log_type;
- }
-
- if (ret == -1) {
- flb_upstream_conn_release(u_conn);
- FLB_OUTPUT_RETURN(FLB_ERROR);
- }
- payload = (flb_sds_t) buf_data;
-
- /* Compose HTTP Client request */
- c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri,
- buf_data, buf_size, NULL, 0, NULL, 0);
- flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX);
-
- /* Append headers and Azure signature */
- ret = build_headers(c, final_log_type, flb_sds_len(payload), ctx);
- if (ret == -1) {
- flb_plg_error(ctx->ins, "error composing signature");
- flb_sds_destroy(payload);
- flb_http_client_destroy(c);
- flb_upstream_conn_release(u_conn);
- FLB_OUTPUT_RETURN(FLB_ERROR);
- }
-
- ret = flb_http_do(c, &b_sent);
- if (ret != 0) {
- flb_plg_warn(ctx->ins, "http_do=%i", ret);
- goto retry;
- }
- else {
- if (c->resp.status >= 200 && c->resp.status <= 299) {
- flb_plg_info(ctx->ins, "customer_id=%s, HTTP status=%i",
- ctx->customer_id, c->resp.status);
- }
- else {
- if (c->resp.payload_size > 0) {
- flb_plg_warn(ctx->ins, "http_status=%i:\n%s",
- c->resp.status, c->resp.payload);
- }
- else {
- flb_plg_warn(ctx->ins, "http_status=%i", c->resp.status);
- }
- goto retry;
- }
- }
-
- /* Cleanup */
- if (final_log_type != ctx->log_type) {
- flb_sds_destroy(final_log_type);
- }
- flb_http_client_destroy(c);
- flb_sds_destroy(payload);
- flb_upstream_conn_release(u_conn);
- FLB_OUTPUT_RETURN(FLB_OK);
-
- /* Issue a retry */
- retry:
- flb_http_client_destroy(c);
- flb_sds_destroy(payload);
- flb_upstream_conn_release(u_conn);
- FLB_OUTPUT_RETURN(FLB_RETRY);
-}
-
-static int cb_azure_exit(void *data, struct flb_config *config)
-{
- struct flb_azure *ctx = data;
-
- flb_azure_conf_destroy(ctx);
- return 0;
-}
-
-/* Configuration properties map */
-static struct flb_config_map config_map[] = {
- {
- FLB_CONFIG_MAP_STR, "customer_id", NULL,
- 0, FLB_TRUE, offsetof(struct flb_azure, customer_id),
- "Customer ID or WorkspaceID string."
- },
-
- {
- FLB_CONFIG_MAP_STR, "shared_key", NULL,
- 0, FLB_TRUE, offsetof(struct flb_azure, shared_key),
- "The primary or the secondary Connected Sources client authentication key."
- },
-
- {
- FLB_CONFIG_MAP_STR, "log_type", FLB_AZURE_LOG_TYPE,
- 0, FLB_TRUE, offsetof(struct flb_azure, log_type),
- "The name of the event type."
- },
-
- {
- FLB_CONFIG_MAP_STR, "log_type_key", NULL,
- 0, FLB_TRUE, offsetof(struct flb_azure, log_type_key),
- "If included, the value for this key will be looked upon in the record "
- "and if present, will over-write the `log_type`. If the key/value "
- "is not found in the record then the `log_type` option will be used. "
- },
-
- {
- FLB_CONFIG_MAP_STR, "time_key", FLB_AZURE_TIME_KEY,
- 0, FLB_TRUE, offsetof(struct flb_azure, time_key),
- "Optional parameter to specify the key name where the timestamp will be stored."
- },
-
- {
- FLB_CONFIG_MAP_BOOL, "time_generated", "false",
- 0, FLB_TRUE, offsetof(struct flb_azure, time_generated),
- "If enabled, the HTTP request header 'time-generated-field' will be included "
- "so Azure can override the timestamp with the key specified by 'time_key' "
- "option."
- },
-
- /* EOF */
- {0}
-};
-
-struct flb_output_plugin out_azure_plugin = {
- .name = "azure",
- .description = "Send events to Azure HTTP Event Collector",
- .cb_init = cb_azure_init,
- .cb_flush = cb_azure_flush,
- .cb_exit = cb_azure_exit,
-
- /* Configuration */
- .config_map = config_map,
-
- /* Plugin flags */
- .flags = FLB_OUTPUT_NET | FLB_IO_TLS,
-};