summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/out_azure/azure.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/plugins/out_azure/azure.c')
-rw-r--r--fluent-bit/plugins/out_azure/azure.c452
1 files changed, 452 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_azure/azure.c b/fluent-bit/plugins/out_azure/azure.c
new file mode 100644
index 000000000..d4322fb65
--- /dev/null
+++ b/fluent-bit/plugins/out_azure/azure.c
@@ -0,0 +1,452 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_base64.h>
+#include <fluent-bit/flb_crypto.h>
+#include <fluent-bit/flb_hmac.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <fluent-bit/flb_record_accessor.h>
+#include <fluent-bit/flb_ra_key.h>
+#include <msgpack.h>
+
+#include "azure.h"
+#include "azure_conf.h"
+
+static int cb_azure_init(struct flb_output_instance *ins,
+ struct flb_config *config, void *data)
+{
+ struct flb_azure *ctx;
+
+ ctx = flb_azure_conf_create(ins, config);
+ if (!ctx) {
+ flb_plg_error(ins, "configuration failed");
+ return -1;
+ }
+
+ flb_output_set_context(ins, ctx);
+ return 0;
+}
+
+static int azure_format(const void *in_buf, size_t in_bytes,
+ flb_sds_t tag, flb_sds_t *tag_val_out,
+ char **out_buf, size_t *out_size,
+ struct flb_azure *ctx)
+{
+ int i;
+ int array_size = 0;
+ int map_size;
+ double t;
+ msgpack_object map;
+ msgpack_object k;
+ msgpack_object v;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ msgpack_sbuffer tmp_sbuf;
+ msgpack_packer tmp_pck;
+ flb_sds_t record;
+ char time_formatted[32];
+ size_t s;
+ struct tm tms;
+ int len;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+ flb_sds_t tmp = NULL;
+ int ret;
+
+ /* Count number of items */
+ array_size = flb_mp_count(in_buf, in_bytes);
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) in_buf, in_bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return -1;
+ }
+
+ /* Create temporary msgpack buffer */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+ msgpack_pack_array(&mp_pck, array_size);
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ /* Create temporary msgpack buffer */
+ msgpack_sbuffer_init(&tmp_sbuf);
+ msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
+
+ map = *log_event.body;
+ map_size = map.via.map.size;
+
+ if (ctx->log_type_key) {
+ tmp = flb_ra_translate(ctx->ra_prefix_key,
+ tag, flb_sds_len(tag),
+ map, NULL);
+ if (!tmp) {
+ flb_plg_error(ctx->ins, "Tagged record translation failed!");
+ }
+ else if (flb_sds_is_empty(tmp)) {
+ flb_plg_warn(ctx->ins, "Record accessor key not matched");
+ flb_sds_destroy(tmp);
+ }
+ else {
+ /* tag_val_out must be destroyed by the caller */
+ *tag_val_out = tmp;
+ }
+ }
+
+ msgpack_pack_map(&mp_pck, map_size + 1);
+
+ /* Append the time key */
+ msgpack_pack_str(&mp_pck, flb_sds_len(ctx->time_key));
+ msgpack_pack_str_body(&mp_pck,
+ ctx->time_key,
+ flb_sds_len(ctx->time_key));
+
+ if (ctx->time_generated == FLB_TRUE) {
+ /* Append the time value as ISO 8601 */
+ gmtime_r(&log_event.timestamp.tm.tv_sec, &tms);
+
+ s = strftime(time_formatted, sizeof(time_formatted) - 1,
+ FLB_PACK_JSON_DATE_ISO8601_FMT, &tms);
+
+ len = snprintf(time_formatted + s,
+ sizeof(time_formatted) - 1 - s,
+ ".%03" PRIu64 "Z",
+ (uint64_t) log_event.timestamp.tm.tv_nsec / 1000000);
+
+ s += len;
+ msgpack_pack_str(&mp_pck, s);
+ msgpack_pack_str_body(&mp_pck, time_formatted, s);
+ } else {
+ /* Append the time value as millis.nanos */
+ t = flb_time_to_double(&log_event.timestamp);
+
+ msgpack_pack_double(&mp_pck, t);
+ }
+
+ /* Append original map k/v */
+ for (i = 0; i < map_size; i++) {
+ k = map.via.map.ptr[i].key;
+ v = map.via.map.ptr[i].val;
+
+ msgpack_pack_object(&tmp_pck, k);
+ msgpack_pack_object(&tmp_pck, v);
+ }
+ msgpack_sbuffer_write(&mp_sbuf, tmp_sbuf.data, tmp_sbuf.size);
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ }
+
+ record = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
+ if (!record) {
+ flb_errno();
+
+ flb_log_event_decoder_destroy(&log_decoder);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+
+ return -1;
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ msgpack_sbuffer_destroy(&mp_sbuf);
+
+ *out_buf = record;
+ *out_size = flb_sds_len(record);
+
+ return 0;
+}
+
+static int build_headers(struct flb_http_client *c,
+ flb_sds_t log_type,
+ size_t content_length,
+ struct flb_azure *ctx)
+{
+ int len;
+ char *auth;
+ char tmp[256];
+ time_t t;
+ size_t size;
+ size_t olen;
+ flb_sds_t rfc1123date;
+ flb_sds_t str_hash;
+ struct tm tm = {0};
+ unsigned char hmac_hash[32] = {0};
+ int result;
+
+ /* Format Date */
+ rfc1123date = flb_sds_create_size(32);
+ if (!rfc1123date) {
+ flb_errno();
+ return -1;
+ }
+
+ t = time(NULL);
+ if (!gmtime_r(&t, &tm)) {
+ flb_errno();
+ flb_sds_destroy(rfc1123date);
+ return -1;
+ }
+ size = strftime(rfc1123date,
+ flb_sds_alloc(rfc1123date) - 1,
+ "%a, %d %b %Y %H:%M:%S GMT", &tm);
+ if (size <= 0) {
+ flb_errno();
+ flb_sds_destroy(rfc1123date);
+ return -1;
+ }
+ flb_sds_len_set(rfc1123date, size);
+
+ /* Compose source string for the hash */
+ str_hash = flb_sds_create_size(256);
+ if (!str_hash) {
+ flb_errno();
+ flb_sds_destroy(rfc1123date);
+ return -1;
+ }
+
+ len = snprintf(tmp, sizeof(tmp) - 1, "%zu\n", content_length);
+ flb_sds_cat(str_hash, "POST\n", 5);
+ flb_sds_cat(str_hash, tmp, len);
+ flb_sds_cat(str_hash, "application/json\n", 17);
+ flb_sds_cat(str_hash, "x-ms-date:", 10);
+ flb_sds_cat(str_hash, rfc1123date, flb_sds_len(rfc1123date));
+ flb_sds_cat(str_hash, "\n", 1);
+ flb_sds_cat(str_hash, FLB_AZURE_RESOURCE, sizeof(FLB_AZURE_RESOURCE) - 1);
+
+ /* Authorization signature */
+ result = flb_hmac_simple(FLB_HASH_SHA256,
+ (unsigned char *) ctx->dec_shared_key,
+ flb_sds_len(ctx->dec_shared_key),
+ (unsigned char *) str_hash,
+ flb_sds_len(str_hash),
+ hmac_hash,
+ sizeof(hmac_hash));
+
+ if (result != FLB_CRYPTO_SUCCESS) {
+ flb_sds_destroy(rfc1123date);
+ flb_sds_destroy(str_hash);
+ return -1;
+ }
+
+ /* Encoded hash */
+ result = flb_base64_encode((unsigned char *) &tmp, sizeof(tmp) - 1, &olen,
+ hmac_hash, sizeof(hmac_hash));
+ tmp[olen] = '\0';
+
+ /* Append headers */
+ flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
+ flb_http_add_header(c, "Log-Type", 8,
+ log_type, flb_sds_len(log_type));
+ flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
+ flb_http_add_header(c, "x-ms-date", 9, rfc1123date,
+ flb_sds_len(rfc1123date));
+ if (ctx->time_generated == FLB_TRUE) {
+ /* Use time value as time-generated within azure */
+ flb_http_add_header(c, "time-generated-field", 20, ctx->time_key, flb_sds_len(ctx->time_key));
+ }
+
+ size = 32 + flb_sds_len(ctx->customer_id) + olen;
+ auth = flb_malloc(size);
+ if (!auth) {
+ flb_errno();
+ flb_sds_destroy(rfc1123date);
+ flb_sds_destroy(str_hash);
+ return -1;
+ }
+
+
+ len = snprintf(auth, size, "SharedKey %s:%s",
+ ctx->customer_id, tmp);
+ flb_http_add_header(c, "Authorization", 13, auth, len);
+
+ /* release resources */
+ flb_sds_destroy(rfc1123date);
+ flb_sds_destroy(str_hash);
+ flb_free(auth);
+
+ return 0;
+}
+
+static void cb_azure_flush(struct flb_event_chunk *event_chunk,
+ struct flb_output_flush *out_flush,
+ struct flb_input_instance *i_ins,
+ void *out_context,
+ struct flb_config *config)
+{
+ int ret;
+ size_t b_sent;
+ char *buf_data;
+ size_t buf_size;
+ struct flb_azure *ctx = out_context;
+ struct flb_connection *u_conn;
+ struct flb_http_client *c;
+ flb_sds_t payload;
+ flb_sds_t final_log_type = NULL;
+ (void) i_ins;
+ (void) config;
+
+ /* Get upstream connection */
+ u_conn = flb_upstream_conn_get(ctx->u);
+ if (!u_conn) {
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ /* Convert binary logs into a JSON payload */
+ ret = azure_format(event_chunk->data, event_chunk->size,
+ event_chunk->tag, &final_log_type, &buf_data, &buf_size, ctx);
+ /* If cannot get matching record using log_type_prefix, use log_type directly */
+ if (!final_log_type) {
+ final_log_type = ctx->log_type;
+ }
+
+ if (ret == -1) {
+ flb_upstream_conn_release(u_conn);
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+ payload = (flb_sds_t) buf_data;
+
+ /* Compose HTTP Client request */
+ c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri,
+ buf_data, buf_size, NULL, 0, NULL, 0);
+ flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX);
+
+ /* Append headers and Azure signature */
+ ret = build_headers(c, final_log_type, flb_sds_len(payload), ctx);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error composing signature");
+ flb_sds_destroy(payload);
+ flb_http_client_destroy(c);
+ flb_upstream_conn_release(u_conn);
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+
+ ret = flb_http_do(c, &b_sent);
+ if (ret != 0) {
+ flb_plg_warn(ctx->ins, "http_do=%i", ret);
+ goto retry;
+ }
+ else {
+ if (c->resp.status >= 200 && c->resp.status <= 299) {
+ flb_plg_info(ctx->ins, "customer_id=%s, HTTP status=%i",
+ ctx->customer_id, c->resp.status);
+ }
+ else {
+ if (c->resp.payload_size > 0) {
+ flb_plg_warn(ctx->ins, "http_status=%i:\n%s",
+ c->resp.status, c->resp.payload);
+ }
+ else {
+ flb_plg_warn(ctx->ins, "http_status=%i", c->resp.status);
+ }
+ goto retry;
+ }
+ }
+
+ /* Cleanup */
+ if (final_log_type != ctx->log_type) {
+ flb_sds_destroy(final_log_type);
+ }
+ flb_http_client_destroy(c);
+ flb_sds_destroy(payload);
+ flb_upstream_conn_release(u_conn);
+ FLB_OUTPUT_RETURN(FLB_OK);
+
+ /* Issue a retry */
+ retry:
+ flb_http_client_destroy(c);
+ flb_sds_destroy(payload);
+ flb_upstream_conn_release(u_conn);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+}
+
+static int cb_azure_exit(void *data, struct flb_config *config)
+{
+ struct flb_azure *ctx = data;
+
+ flb_azure_conf_destroy(ctx);
+ return 0;
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "customer_id", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_azure, customer_id),
+ "Customer ID or WorkspaceID string."
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "shared_key", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_azure, shared_key),
+ "The primary or the secondary Connected Sources client authentication key."
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "log_type", FLB_AZURE_LOG_TYPE,
+ 0, FLB_TRUE, offsetof(struct flb_azure, log_type),
+ "The name of the event type."
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "log_type_key", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_azure, log_type_key),
+ "If included, the value for this key will be looked upon in the record "
+ "and if present, will over-write the `log_type`. If the key/value "
+ "is not found in the record then the `log_type` option will be used. "
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "time_key", FLB_AZURE_TIME_KEY,
+ 0, FLB_TRUE, offsetof(struct flb_azure, time_key),
+ "Optional parameter to specify the key name where the timestamp will be stored."
+ },
+
+ {
+ FLB_CONFIG_MAP_BOOL, "time_generated", "false",
+ 0, FLB_TRUE, offsetof(struct flb_azure, time_generated),
+ "If enabled, the HTTP request header 'time-generated-field' will be included "
+ "so Azure can override the timestamp with the key specified by 'time_key' "
+ "option."
+ },
+
+ /* EOF */
+ {0}
+};
+
+struct flb_output_plugin out_azure_plugin = {
+ .name = "azure",
+ .description = "Send events to Azure HTTP Event Collector",
+ .cb_init = cb_azure_init,
+ .cb_flush = cb_azure_flush,
+ .cb_exit = cb_azure_exit,
+
+ /* Configuration */
+ .config_map = config_map,
+
+ /* Plugin flags */
+ .flags = FLB_OUTPUT_NET | FLB_IO_TLS,
+};