diff options
Diffstat (limited to 'fluent-bit/plugins/out_azure_blob')
-rw-r--r-- | fluent-bit/plugins/out_azure_blob/CMakeLists.txt | 10 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_blob/azure_blob.c | 594 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_blob/azure_blob.h | 74 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_blob/azure_blob_appendblob.c | 44 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_blob/azure_blob_appendblob.h | 28 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_blob/azure_blob_blockblob.c | 238 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_blob/azure_blob_blockblob.h | 32 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_blob/azure_blob_conf.c | 245 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_blob/azure_blob_conf.h | 29 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_blob/azure_blob_http.c | 361 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_blob/azure_blob_http.h | 36 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_blob/azure_blob_uri.c | 150 | ||||
-rw-r--r-- | fluent-bit/plugins/out_azure_blob/azure_blob_uri.h | 34 |
13 files changed, 1875 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_azure_blob/CMakeLists.txt b/fluent-bit/plugins/out_azure_blob/CMakeLists.txt new file mode 100644 index 00000000..3624480e --- /dev/null +++ b/fluent-bit/plugins/out_azure_blob/CMakeLists.txt @@ -0,0 +1,10 @@ +set(src + azure_blob.c + azure_blob_uri.c + azure_blob_conf.c + azure_blob_http.c + azure_blob_appendblob.c + azure_blob_blockblob.c + ) + +FLB_PLUGIN(out_azure_blob "${src}" "") diff --git a/fluent-bit/plugins/out_azure_blob/azure_blob.c b/fluent-bit/plugins/out_azure_blob/azure_blob.c new file mode 100644 index 00000000..3f539826 --- /dev/null +++ b/fluent-bit/plugins/out_azure_blob/azure_blob.c @@ -0,0 +1,594 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_kv.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_config_map.h> +#include <fluent-bit/flb_gzip.h> +#include <fluent-bit/flb_base64.h> + +#include <msgpack.h> + +#include "azure_blob.h" +#include "azure_blob_uri.h" +#include "azure_blob_conf.h" +#include "azure_blob_appendblob.h" +#include "azure_blob_blockblob.h" +#include "azure_blob_http.h" + +#define CREATE_BLOB 1337 + +static int azure_blob_format(struct flb_config *config, + struct flb_input_instance *ins, + void *plugin_context, + void *flush_ctx, + int event_type, + const char *tag, int tag_len, + const void *data, size_t bytes, + void **out_data, size_t *out_size) +{ + flb_sds_t out_buf; + struct flb_azure_blob *ctx = plugin_context; + + out_buf = flb_pack_msgpack_to_json_format(data, bytes, + FLB_PACK_JSON_FORMAT_LINES, + FLB_PACK_JSON_DATE_ISO8601, + ctx->date_key); + if (!out_buf) { + return -1; + } + + *out_data = out_buf; + *out_size = flb_sds_len(out_buf); + return 0; +} + +static int send_blob(struct flb_config *config, + struct flb_input_instance *i_ins, + struct flb_azure_blob *ctx, char *name, + char *tag, int tag_len, void *data, size_t bytes) +{ + int ret; + int compressed = FLB_FALSE; + int content_encoding = FLB_FALSE; + int content_type = FLB_FALSE; + uint64_t ms = 0; + size_t b_sent; + void *out_buf; + size_t out_size; + flb_sds_t uri = NULL; + flb_sds_t blockid = NULL; + void *payload_buf; + size_t payload_size; + struct flb_http_client *c; + struct flb_connection *u_conn; + + if (ctx->btype == AZURE_BLOB_APPENDBLOB) { + uri = azb_append_blob_uri(ctx, tag); + } + else if (ctx->btype == AZURE_BLOB_BLOCKBLOB) { + blockid = azb_block_blob_id(&ms); + if (!blockid) { + flb_plg_error(ctx->ins, "could not generate block id"); + return FLB_RETRY; + } + uri = azb_block_blob_uri(ctx, tag, blockid, ms); + } + + if (!uri) { + flb_free(blockid); + return FLB_RETRY; + } + + /* Get upstream connection */ + u_conn = flb_upstream_conn_get(ctx->u); + if (!u_conn) { + flb_plg_error(ctx->ins, + "cannot create upstream connection for append_blob"); + flb_sds_destroy(uri); + flb_free(blockid); + return FLB_RETRY; + } + + /* Format the data */ + ret = azure_blob_format(config, i_ins, + ctx, NULL, + FLB_EVENT_TYPE_LOGS, + tag, tag_len, + data, bytes, + &out_buf, &out_size); + if (ret != 0) { + flb_upstream_conn_release(u_conn); + flb_sds_destroy(uri); + flb_free(blockid); + return FLB_RETRY; + } + + /* Map buffer */ + payload_buf = out_buf; + payload_size = out_size; + + if (ctx->compress_gzip == FLB_TRUE || ctx->compress_blob == FLB_TRUE) { + ret = flb_gzip_compress((void *) out_buf, out_size, + &payload_buf, &payload_size); + if (ret == -1) { + flb_plg_error(ctx->ins, + "cannot gzip payload, disabling compression"); + } + else { + compressed = FLB_TRUE; + /* JSON buffer is not longer needed */ + flb_sds_destroy(out_buf); + } + } + + if (ctx->compress_blob == FLB_TRUE) { + content_encoding = AZURE_BLOB_CE_NONE; + content_type = AZURE_BLOB_CT_GZIP; + } + else if (compressed == FLB_TRUE) { + content_encoding = AZURE_BLOB_CE_GZIP; + content_type = AZURE_BLOB_CT_JSON; + } + + /* Create HTTP client context */ + c = flb_http_client(u_conn, FLB_HTTP_PUT, + uri, + payload_buf, payload_size, NULL, 0, NULL, 0); + if (!c) { + flb_plg_error(ctx->ins, "cannot create HTTP client context"); + flb_sds_destroy(out_buf); + flb_upstream_conn_release(u_conn); + flb_free(blockid); + return FLB_RETRY; + } + + /* Prepare headers and authentication */ + azb_http_client_setup(ctx, c, (ssize_t) payload_size, FLB_FALSE, + content_type, content_encoding); + + /* Send HTTP request */ + ret = flb_http_do(c, &b_sent); + flb_sds_destroy(uri); + + /* Release */ + if (compressed == FLB_FALSE) { + flb_sds_destroy(out_buf); + } + else { + flb_free(payload_buf); + } + + flb_upstream_conn_release(u_conn); + + /* Validate HTTP status */ + if (ret == -1) { + flb_plg_error(ctx->ins, "error sending append_blob"); + flb_free(blockid); + return FLB_RETRY; + } + + if (c->resp.status == 201) { + flb_plg_info(ctx->ins, "content appended to blob successfully"); + flb_http_client_destroy(c); + + if (ctx->btype == AZURE_BLOB_BLOCKBLOB) { + ret = azb_block_blob_commit(ctx, blockid, tag, ms); + flb_free(blockid); + return ret; + } + flb_free(blockid); + return FLB_OK; + } + else if (c->resp.status == 404) { + flb_plg_info(ctx->ins, "blob not found: %s", c->uri); + flb_http_client_destroy(c); + return CREATE_BLOB; + } + else if (c->resp.payload_size > 0) { + flb_plg_error(ctx->ins, "cannot append content to blob\n%s", + c->resp.payload); + if (strstr(c->resp.payload, "must be 0 for Create Append")) { + flb_http_client_destroy(c); + return CREATE_BLOB; + } + } + else { + flb_plg_error(ctx->ins, "cannot append content to blob"); + } + flb_http_client_destroy(c); + + return FLB_RETRY; +} + +static int create_blob(struct flb_azure_blob *ctx, char *name) +{ + int ret; + size_t b_sent; + flb_sds_t uri = NULL; + struct flb_http_client *c; + struct flb_connection *u_conn; + + uri = azb_uri_create_blob(ctx, name); + if (!uri) { + return FLB_RETRY; + } + + /* Get upstream connection */ + u_conn = flb_upstream_conn_get(ctx->u); + if (!u_conn) { + flb_plg_error(ctx->ins, + "cannot create upstream connection for create_append_blob"); + flb_sds_destroy(uri); + return FLB_RETRY; + } + + /* Create HTTP client context */ + c = flb_http_client(u_conn, FLB_HTTP_PUT, + uri, + NULL, 0, NULL, 0, NULL, 0); + if (!c) { + flb_plg_error(ctx->ins, "cannot create HTTP client context"); + flb_upstream_conn_release(u_conn); + flb_sds_destroy(uri); + return FLB_RETRY; + } + + /* Prepare headers and authentication */ + azb_http_client_setup(ctx, c, -1, FLB_TRUE, + AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE); + + /* Send HTTP request */ + ret = flb_http_do(c, &b_sent); + flb_sds_destroy(uri); + + if (ret == -1) { + flb_plg_error(ctx->ins, "error sending append_blob"); + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + return FLB_RETRY; + } + + if (c->resp.status == 201) { + flb_plg_info(ctx->ins, "blob created successfully: %s", c->uri); + } + else { + if (c->resp.payload_size > 0) { + flb_plg_error(ctx->ins, "http_status=%i cannot create append blob\n%s", + c->resp.status, c->resp.payload); + } + else { + flb_plg_error(ctx->ins, "http_status=%i cannot create append blob", + c->resp.status); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + return FLB_RETRY; + } + + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + return FLB_OK; +} + +static int create_container(struct flb_azure_blob *ctx, char *name) +{ + int ret; + size_t b_sent; + flb_sds_t uri; + struct flb_http_client *c; + struct flb_connection *u_conn; + + /* Get upstream connection */ + u_conn = flb_upstream_conn_get(ctx->u); + if (!u_conn) { + flb_plg_error(ctx->ins, + "cannot create upstream connection for container creation"); + return FLB_FALSE; + } + + /* URI */ + uri = azb_uri_ensure_or_create_container(ctx); + if (!uri) { + flb_upstream_conn_release(u_conn); + return FLB_FALSE; + } + + /* Create HTTP client context */ + c = flb_http_client(u_conn, FLB_HTTP_PUT, + uri, + NULL, 0, NULL, 0, NULL, 0); + if (!c) { + flb_plg_error(ctx->ins, "cannot create HTTP client context"); + flb_upstream_conn_release(u_conn); + return FLB_FALSE; + } + + /* Prepare headers and authentication */ + azb_http_client_setup(ctx, c, -1, FLB_FALSE, + AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE); + + /* Send HTTP request */ + ret = flb_http_do(c, &b_sent); + + /* Release URI */ + flb_sds_destroy(uri); + + /* Validate http response */ + if (ret == -1) { + flb_plg_error(ctx->ins, "error requesting container creation"); + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + return FLB_FALSE; + } + + if (c->resp.status == 201) { + flb_plg_info(ctx->ins, "container '%s' created sucessfully", name); + } + else { + if (c->resp.payload_size > 0) { + flb_plg_error(ctx->ins, "cannot create container '%s'\n%s", + name, c->resp.payload); + } + else { + flb_plg_error(ctx->ins, "cannot create container '%s'\n%s", + name, c->resp.payload); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + return FLB_FALSE; + } + + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + return FLB_TRUE; +} + +/* + * Check that the container exists, if it doesn't and the configuration property + * auto_create_container is enabled, it will send a request to create it. If it + * could not be created or auto_create_container is disabled, it returns FLB_FALSE. + */ +static int ensure_container(struct flb_azure_blob *ctx) +{ + int ret; + int status; + size_t b_sent; + flb_sds_t uri; + struct flb_http_client *c; + struct flb_connection *u_conn; + + uri = azb_uri_ensure_or_create_container(ctx); + if (!uri) { + return FLB_FALSE; + } + + /* Get upstream connection */ + u_conn = flb_upstream_conn_get(ctx->u); + if (!u_conn) { + flb_plg_error(ctx->ins, + "cannot create upstream connection for container check"); + flb_sds_destroy(uri); + return FLB_FALSE; + } + + /* Create HTTP client context */ + c = flb_http_client(u_conn, FLB_HTTP_GET, + uri, + NULL, 0, NULL, 0, NULL, 0); + if (!c) { + flb_plg_error(ctx->ins, "cannot create HTTP client context"); + flb_upstream_conn_release(u_conn); + return FLB_FALSE; + } + flb_http_strip_port_from_host(c); + + /* Prepare headers and authentication */ + azb_http_client_setup(ctx, c, -1, FLB_FALSE, + AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE); + + /* Send HTTP request */ + ret = flb_http_do(c, &b_sent); + flb_sds_destroy(uri); + + if (ret == -1) { + flb_plg_error(ctx->ins, "error requesting container properties"); + flb_upstream_conn_release(u_conn); + return FLB_FALSE; + } + + status = c->resp.status; + flb_http_client_destroy(c); + + /* Release connection */ + flb_upstream_conn_release(u_conn); + + /* Request was successful, validate HTTP status code */ + if (status == 404) { + /* The container was not found, try to create it */ + flb_plg_info(ctx->ins, "container '%s' not found, trying to create it", + ctx->container_name); + ret = create_container(ctx, ctx->container_name); + return ret; + } + else if (status == 200) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static int cb_azure_blob_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) +{ + struct flb_azure_blob *ctx = NULL; + (void) ins; + (void) config; + (void) data; + + ctx = flb_azure_blob_conf_create(ins, config); + if (!ctx) { + return -1; + } + + flb_output_set_http_debug_callbacks(ins); + return 0; +} + +static void cb_azure_blob_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + int ret; + struct flb_azure_blob *ctx = out_context; + (void) i_ins; + (void) config; + + /* Validate the container exists, otherwise just create it */ + ret = ensure_container(ctx); + if (ret == FLB_FALSE) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + ret = send_blob(config, i_ins, ctx, + (char *) event_chunk->tag, /* use tag as 'name' */ + (char *) event_chunk->tag, flb_sds_len(event_chunk->tag), + (char *) event_chunk->data, event_chunk->size); + + if (ret == CREATE_BLOB) { + ret = create_blob(ctx, event_chunk->tag); + if (ret == FLB_OK) { + ret = send_blob(config, i_ins, ctx, + (char *) event_chunk->tag, /* use tag as 'name' */ + (char *) event_chunk->tag, + flb_sds_len(event_chunk->tag), + (char *) event_chunk->data, event_chunk->size); + } + } + + /* FLB_RETRY, FLB_OK, FLB_ERROR */ + FLB_OUTPUT_RETURN(ret); +} + +static int cb_azure_blob_exit(void *data, struct flb_config *config) +{ + struct flb_azure_blob *ctx = data; + + if (!ctx) { + return 0; + } + + flb_azure_blob_conf_destroy(ctx); + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "account_name", NULL, + 0, FLB_TRUE, offsetof(struct flb_azure_blob, account_name), + "Azure account name (mandatory)" + }, + + { + FLB_CONFIG_MAP_STR, "container_name", NULL, + 0, FLB_TRUE, offsetof(struct flb_azure_blob, container_name), + "Container name (mandatory)" + }, + + { + FLB_CONFIG_MAP_BOOL, "auto_create_container", "true", + 0, FLB_TRUE, offsetof(struct flb_azure_blob, auto_create_container), + "Auto create container if it don't exists" + }, + + { + FLB_CONFIG_MAP_STR, "blob_type", "appendblob", + 0, FLB_TRUE, offsetof(struct flb_azure_blob, blob_type), + "Set the block type: appendblob or blockblob" + }, + + { + FLB_CONFIG_MAP_STR, "compress", NULL, + 0, FLB_FALSE, 0, + "Set payload compression in network transfer. Option available is 'gzip'" + }, + + { + FLB_CONFIG_MAP_BOOL, "compress_blob", "false", + 0, FLB_TRUE, offsetof(struct flb_azure_blob, compress_blob), + "Enable block blob GZIP compression in the final blob file. This option is " + "not compatible with 'appendblob' block type" + }, + + { + FLB_CONFIG_MAP_BOOL, "emulator_mode", "false", + 0, FLB_TRUE, offsetof(struct flb_azure_blob, emulator_mode), + "Use emulator mode, enable it if you want to use Azurite" + }, + + { + FLB_CONFIG_MAP_STR, "shared_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_azure_blob, shared_key), + "Azure shared key" + }, + + { + FLB_CONFIG_MAP_STR, "endpoint", NULL, + 0, FLB_TRUE, offsetof(struct flb_azure_blob, endpoint), + "Custom full URL endpoint to use an emulator" + }, + + { + FLB_CONFIG_MAP_STR, "path", NULL, + 0, FLB_TRUE, offsetof(struct flb_azure_blob, path), + "Set a path for your blob" + }, + + { + FLB_CONFIG_MAP_STR, "date_key", "@timestamp", + 0, FLB_TRUE, offsetof(struct flb_azure_blob, date_key), + "Name of the key that will have the record timestamp" + }, + + /* EOF */ + {0} +}; + +/* Plugin registration */ +struct flb_output_plugin out_azure_blob_plugin = { + .name = "azure_blob", + .description = "Azure Blob Storage", + .cb_init = cb_azure_blob_init, + .cb_flush = cb_azure_blob_flush, + .cb_exit = cb_azure_blob_exit, + + /* Test */ + .test_formatter.callback = azure_blob_format, + + .config_map = config_map, + + /* Plugin flags */ + .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, +}; diff --git a/fluent-bit/plugins/out_azure_blob/azure_blob.h b/fluent-bit/plugins/out_azure_blob/azure_blob.h new file mode 100644 index 00000000..5cf8a292 --- /dev/null +++ b/fluent-bit/plugins/out_azure_blob/azure_blob.h @@ -0,0 +1,74 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_AZURE_BLOB_H +#define FLB_OUT_AZURE_BLOB_H + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_upstream.h> +#include <fluent-bit/flb_sds.h> + +/* Content-Type */ +#define AZURE_BLOB_CT "Content-Type" +#define AZURE_BLOB_CT_NONE 0 +#define AZURE_BLOB_CT_JSON 1 /* application/json */ +#define AZURE_BLOB_CT_GZIP 2 /* application/gzip */ + +/* Content-Encoding */ +#define AZURE_BLOB_CE "Content-Encoding" +#define AZURE_BLOB_CE_NONE 0 +#define AZURE_BLOB_CE_GZIP 1 /* gzip */ + +/* service endpoint */ +#define AZURE_ENDPOINT_PREFIX ".blob.core.windows.net" + +#define AZURE_BLOB_APPENDBLOB 0 +#define AZURE_BLOB_BLOCKBLOB 1 + +struct flb_azure_blob { + int auto_create_container; + int emulator_mode; + int compress_gzip; + int compress_blob; + flb_sds_t account_name; + flb_sds_t container_name; + flb_sds_t blob_type; + flb_sds_t shared_key; + flb_sds_t endpoint; + flb_sds_t path; + flb_sds_t date_key; + + /* + * Internal use + */ + int btype; /* blob type */ + flb_sds_t real_endpoint; + flb_sds_t base_uri; + flb_sds_t shared_key_prefix; + + /* Shared key */ + unsigned char *decoded_sk; /* decoded shared key */ + size_t decoded_sk_size; /* size of decoded shared key */ + + /* Upstream connection */ + struct flb_upstream *u; + struct flb_output_instance *ins; +}; + +#endif diff --git a/fluent-bit/plugins/out_azure_blob/azure_blob_appendblob.c b/fluent-bit/plugins/out_azure_blob/azure_blob_appendblob.c new file mode 100644 index 00000000..2d9a8217 --- /dev/null +++ b/fluent-bit/plugins/out_azure_blob/azure_blob_appendblob.c @@ -0,0 +1,44 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_sds.h> + +#include "azure_blob.h" +#include "azure_blob_conf.h" +#include "azure_blob_uri.h" + +flb_sds_t azb_append_blob_uri(struct flb_azure_blob *ctx, char *tag) +{ + flb_sds_t uri; + + uri = azb_uri_container(ctx); + if (!uri) { + return NULL; + } + + if (ctx->path) { + flb_sds_printf(&uri, "/%s/%s?comp=appendblock", ctx->path, tag); + } + else { + flb_sds_printf(&uri, "/%s?comp=appendblock", tag); + } + + return uri; +} diff --git a/fluent-bit/plugins/out_azure_blob/azure_blob_appendblob.h b/fluent-bit/plugins/out_azure_blob/azure_blob_appendblob.h new file mode 100644 index 00000000..9ab103b0 --- /dev/null +++ b/fluent-bit/plugins/out_azure_blob/azure_blob_appendblob.h @@ -0,0 +1,28 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef AZURE_BLOB_APPENDBLOB_H +#define AZURE_BLOB_APPENDBLOB_H + +#include <fluent-bit/flb_output_plugin.h> +#include "azure_blob.h" + +flb_sds_t azb_append_blob_uri(struct flb_azure_blob *ctx, char *tag); + +#endif diff --git a/fluent-bit/plugins/out_azure_blob/azure_blob_blockblob.c b/fluent-bit/plugins/out_azure_blob/azure_blob_blockblob.c new file mode 100644 index 00000000..a9b0e4a2 --- /dev/null +++ b/fluent-bit/plugins/out_azure_blob/azure_blob_blockblob.c @@ -0,0 +1,238 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_base64.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_sds.h> + +#include <math.h> + +#include "azure_blob.h" +#include "azure_blob_conf.h" +#include "azure_blob_uri.h" +#include "azure_blob_http.h" + +flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *tag, + char *blockid, uint64_t ms) +{ + int len; + flb_sds_t uri; + char *ext; + char *encoded_blockid; + + len = strlen(blockid); + encoded_blockid = azb_uri_encode(blockid, len); + if (!encoded_blockid) { + return NULL; + } + + uri = azb_uri_container(ctx); + if (!uri) { + flb_sds_destroy(encoded_blockid); + return NULL; + } + + if (ctx->compress_blob == FLB_TRUE) { + ext = ".gz"; + } + else { + ext = ""; + } + + if (ctx->path) { + flb_sds_printf(&uri, "/%s/%s.%" PRIu64 "%s?blockid=%s&comp=block", + ctx->path, tag, ms, ext, encoded_blockid); + } + else { + flb_sds_printf(&uri, "/%s.%" PRIu64 "%s?blockid=%s&comp=block", + tag, ms, ext, encoded_blockid); + } + + flb_sds_destroy(encoded_blockid); + return uri; +} + +flb_sds_t azb_block_blob_uri_commit(struct flb_azure_blob *ctx, + char *tag, uint64_t ms) +{ + char *ext; + flb_sds_t uri; + + uri = azb_uri_container(ctx); + if (!uri) { + return NULL; + } + + if (ctx->compress_blob == FLB_TRUE) { + ext = ".gz"; + } + else { + ext = ""; + } + + if (ctx->path) { + flb_sds_printf(&uri, "/%s/%s.%" PRIu64 "%s?comp=blocklist", ctx->path, tag, + ms, ext); + } + else { + flb_sds_printf(&uri, "/%s.%" PRIu64 "%s?comp=blocklist", tag, ms, ext); + } + + return uri; +} + +/* Generate a block id */ +char *azb_block_blob_id(uint64_t *ms) +{ + int len; + int ret; + double now; + char tmp[32]; + size_t size; + size_t o_len; + char *b64; + struct flb_time tm; + + /* Get current time */ + flb_time_get(&tm); + + /* + * Set outgoing time in milliseconds: this is used as a suffix for the + * block name + */ + *ms = ((tm.tm.tv_sec * 1000) + (tm.tm.tv_nsec / 1000000)); + + /* Convert time to double to format the block id */ + now = flb_time_to_double(&tm); + len = snprintf(tmp, sizeof(tmp), "flb-%.4f.id", now); + + /* Allocate space for the outgoing base64 buffer */ + size = (4 * ceil(((double) len / 3) + 1)); + b64 = flb_malloc(size); + if (!b64) { + return NULL; + } + + /* base64 encode block id */ + ret = flb_base64_encode((unsigned char *) b64, size, &o_len, + (unsigned char *) tmp, len); + if (ret != 0) { + flb_free(b64); + return NULL; + } + return b64; +} + +int azb_block_blob_commit(struct flb_azure_blob *ctx, char *blockid, char *tag, + uint64_t ms) +{ + int ret; + size_t b_sent; + flb_sds_t uri = NULL; + flb_sds_t payload; + struct flb_http_client *c; + struct flb_connection *u_conn; + + /* Get upstream connection */ + u_conn = flb_upstream_conn_get(ctx->u); + if (!u_conn) { + flb_plg_error(ctx->ins, + "cannot create upstream connection for blockblob commit"); + return FLB_RETRY; + } + + /* Compose commit URI */ + uri = azb_block_blob_uri_commit(ctx, tag, ms); + if (!uri) { + flb_upstream_conn_release(u_conn); + return FLB_ERROR; + } + + payload = flb_sds_create_size(256); + if (!payload) { + flb_sds_destroy(uri); + flb_upstream_conn_release(u_conn); + return FLB_ERROR; + } + + flb_sds_printf(&payload, + "<?xml version=\"1.0\" encoding=\"utf-8\"?>" + "<BlockList>" + " <Latest>%s</Latest>" + "</BlockList>", + blockid); + + /* Create HTTP client context */ + c = flb_http_client(u_conn, FLB_HTTP_PUT, + uri, + payload, flb_sds_len(payload), NULL, 0, NULL, 0); + if (!c) { + flb_plg_error(ctx->ins, "cannot create HTTP client context"); + flb_sds_destroy(payload); + flb_sds_destroy(uri); + flb_upstream_conn_release(u_conn); + return FLB_RETRY; + } + + /* Prepare headers and authentication */ + azb_http_client_setup(ctx, c, flb_sds_len(payload), + FLB_FALSE, + AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE); + + /* Send HTTP request */ + ret = flb_http_do(c, &b_sent); + flb_sds_destroy(uri); + flb_sds_destroy(payload); + + /* Validate HTTP status */ + if (ret == -1) { + flb_plg_error(ctx->ins, "error sending append_blob"); + return FLB_RETRY; + } + + if (c->resp.status == 201) { + flb_plg_info(ctx->ins, "blob id %s committed successfully", blockid); + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + return FLB_OK; + } + else if (c->resp.status == 404) { + flb_plg_info(ctx->ins, "blob not found: %s", c->uri); + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + return FLB_RETRY; + } + else if (c->resp.payload_size > 0) { + flb_plg_error(ctx->ins, "cannot commit blob id %s\n%s", + blockid, c->resp.payload); + if (strstr(c->resp.payload, "must be 0 for Create Append")) { + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + return FLB_RETRY; + } + } + else { + flb_plg_error(ctx->ins, "cannot append content to blob"); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + + return FLB_OK; +} diff --git a/fluent-bit/plugins/out_azure_blob/azure_blob_blockblob.h b/fluent-bit/plugins/out_azure_blob/azure_blob_blockblob.h new file mode 100644 index 00000000..ee210d13 --- /dev/null +++ b/fluent-bit/plugins/out_azure_blob/azure_blob_blockblob.h @@ -0,0 +1,32 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef AZURE_BLOB_BLOCKBLOB_H +#define AZURE_BLOB_BLOCKBLOB_H + +#include <fluent-bit/flb_output_plugin.h> +#include "azure_blob.h" + +flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *tag, char *blockid, + uint64_t ms); +char *azb_block_blob_id(uint64_t *ms); +int azb_block_blob_commit(struct flb_azure_blob *ctx, char *blockid, char *tag, + uint64_t ms); + +#endif diff --git a/fluent-bit/plugins/out_azure_blob/azure_blob_conf.c b/fluent-bit/plugins/out_azure_blob/azure_blob_conf.c new file mode 100644 index 00000000..4437a6d2 --- /dev/null +++ b/fluent-bit/plugins/out_azure_blob/azure_blob_conf.c @@ -0,0 +1,245 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_base64.h> + +#include "azure_blob.h" +#include "azure_blob_conf.h" + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +static int set_shared_key(struct flb_azure_blob *ctx) +{ + int s; + int ret; + size_t o_len = 0; + + s = flb_sds_len(ctx->shared_key); + + /* buffer for final hex key */ + ctx->decoded_sk = flb_malloc(s * 2); + if (!ctx->decoded_sk) { + return -1; + } + + /* decode base64 */ + ret = flb_base64_decode(ctx->decoded_sk, s * 2, + &o_len, + (unsigned char *)ctx->shared_key, + flb_sds_len(ctx->shared_key)); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot decode shared_key"); + return -1; + } + + ctx->decoded_sk_size = o_len; + return 0; +} + +struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *ins, + struct flb_config *config) +{ + int ret; + int port; + int io_flags = 0; + flb_sds_t tmp; + struct flb_azure_blob *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_azure_blob)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + + /* Set context */ + flb_output_set_context(ins, ctx); + + /* Load config map */ + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + return NULL; + } + + if (!ctx->container_name) { + flb_plg_error(ctx->ins, "'container_name' has not been set"); + return NULL; + } + + /* If the shared key is set decode it */ + if (ctx->shared_key) { + ret = set_shared_key(ctx); + if (ret == -1) { + return NULL; + } + } + + /* Set Blob type */ + tmp = (char *) flb_output_get_property("blob_type", ins); + if (!tmp) { + ctx->btype = AZURE_BLOB_APPENDBLOB; + } + else { + if (strcasecmp(tmp, "appendblob") == 0) { + ctx->btype = AZURE_BLOB_APPENDBLOB; + } + else if (strcasecmp(tmp, "blockblob") == 0) { + ctx->btype = AZURE_BLOB_BLOCKBLOB; + } + else { + flb_plg_error(ctx->ins, "invalid blob_type value '%s'", tmp); + return NULL; + } + } + + /* Compress (gzip) */ + tmp = (char *) flb_output_get_property("compress", ins); + ctx->compress_gzip = FLB_FALSE; + if (tmp) { + if (strcasecmp(tmp, "gzip") == 0) { + ctx->compress_gzip = FLB_TRUE; + } + } + + /* Compress Blob: only availabel for blockblob type */ + if (ctx->compress_blob == FLB_TRUE && ctx->btype != AZURE_BLOB_BLOCKBLOB) { + flb_plg_error(ctx->ins, + "the option 'compress_blob' is not compatible with 'appendblob' " + "blob_type"); + return NULL; + } + + /* + * Setting up the real endpoint: + * + * If the user provided a custom endpoint, just parse it. Here we need to + * discover if a TLS connection is required, just use the protocol prefix. + */ + if (ctx->endpoint) { + if (strncmp(ctx->endpoint, "https", 5) == 0) { + io_flags |= FLB_IO_TLS; + } + else { + io_flags |= FLB_IO_TCP; + } + + ctx->u = flb_upstream_create_url(config, ctx->endpoint, + io_flags, ins->tls); + if (!ctx->u) { + flb_plg_error(ctx->ins, "invalid endpoint '%s'", ctx->endpoint); + return NULL; + } + ctx->real_endpoint = flb_sds_create(ctx->endpoint); + } + else { + ctx->real_endpoint = flb_sds_create_size(256); + if (!ctx->real_endpoint) { + flb_plg_error(ctx->ins, "cannot create endpoint"); + return NULL; + } + flb_sds_printf(&ctx->real_endpoint, "%s%s", + ctx->account_name, + AZURE_ENDPOINT_PREFIX); + + /* use TLS ? */ + if (ins->use_tls == FLB_TRUE) { + port = 443; + io_flags = FLB_IO_TLS; + } + else { + port = 80; + io_flags = FLB_IO_TCP; + } + + ctx->u = flb_upstream_create(config, ctx->real_endpoint, port, io_flags, + ins->tls); + if (!ctx->u) { + flb_plg_error(ctx->ins, "cannot create upstream for endpoint '%s'", + ctx->real_endpoint); + return NULL; + } + } + flb_output_upstream_set(ctx->u, ins); + + /* Compose base uri */ + ctx->base_uri = flb_sds_create_size(256); + if (!ctx->base_uri) { + flb_plg_error(ctx->ins, "cannot create base_uri for endpoint '%s'", + ctx->real_endpoint); + return NULL; + } + + if (ctx->emulator_mode == FLB_TRUE) { + flb_sds_printf(&ctx->base_uri, "/%s/", ctx->account_name); + } + else { + flb_sds_printf(&ctx->base_uri, "/"); + } + + /* Prepare shared key buffer */ + ctx->shared_key_prefix = flb_sds_create_size(256); + if (!ctx->shared_key_prefix) { + flb_plg_error(ctx->ins, "cannot create shared key prefix"); + return NULL; + } + flb_sds_printf(&ctx->shared_key_prefix, "SharedKey %s:", ctx->account_name); + + /* Sanitize path: remove any ending slash */ + if (ctx->path) { + if (ctx->path[flb_sds_len(ctx->path) - 1] == '/') { + ctx->path[flb_sds_len(ctx->path) - 1] = '\0'; + } + } + + flb_plg_info(ctx->ins, + "account_name=%s, container_name=%s, blob_type=%s, emulator_mode=%s, endpoint=%s", + ctx->account_name, ctx->container_name, + ctx->btype == AZURE_BLOB_APPENDBLOB ? "appendblob": "blockblob", + ctx->emulator_mode ? "yes": "no", + ctx->real_endpoint ? ctx->real_endpoint: "no"); + return ctx; +} + +void flb_azure_blob_conf_destroy(struct flb_azure_blob *ctx) +{ + if (ctx->decoded_sk) { + flb_free(ctx->decoded_sk); + } + + if (ctx->base_uri) { + flb_sds_destroy(ctx->base_uri); + } + + if (ctx->real_endpoint) { + flb_sds_destroy(ctx->real_endpoint); + } + + if (ctx->shared_key_prefix) { + flb_sds_destroy(ctx->shared_key_prefix); + } + + if (ctx->u) { + flb_upstream_destroy(ctx->u); + } + + flb_free(ctx); +} diff --git a/fluent-bit/plugins/out_azure_blob/azure_blob_conf.h b/fluent-bit/plugins/out_azure_blob/azure_blob_conf.h new file mode 100644 index 00000000..32a85c67 --- /dev/null +++ b/fluent-bit/plugins/out_azure_blob/azure_blob_conf.h @@ -0,0 +1,29 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_AZURE_BLOB_CONF_H +#define FLB_OUT_AZURE_BLOB_CONF_H + +#include <fluent-bit/flb_output_plugin.h> + +struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *ins, + struct flb_config *config); +void flb_azure_blob_conf_destroy(struct flb_azure_blob *ctx); + +#endif diff --git a/fluent-bit/plugins/out_azure_blob/azure_blob_http.c b/fluent-bit/plugins/out_azure_blob/azure_blob_http.c new file mode 100644 index 00000000..5ac81a9a --- /dev/null +++ b/fluent-bit/plugins/out_azure_blob/azure_blob_http.c @@ -0,0 +1,361 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_base64.h> +#include <fluent-bit/flb_crypto.h> +#include <fluent-bit/flb_hmac.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_kv.h> + +#include "azure_blob.h" +#include "azure_blob_uri.h" + +static int hmac_sha256_sign(unsigned char out[32], + unsigned char *key, size_t key_len, + unsigned char *msg, size_t msg_len) +{ + return flb_hmac_simple(FLB_HASH_SHA256, + key, key_len, + msg, msg_len, + out, 32); +} + +static flb_sds_t canonical_headers(struct flb_http_client *c) +{ + flb_sds_t ch; + flb_sds_t tmp; + struct flb_kv *kv; + struct mk_list *head; + + ch = flb_sds_create_size(mk_list_size(&c->headers) * 64); + if (!ch) { + return NULL; + } + + mk_list_foreach(head, &c->headers) { + kv = mk_list_entry(head, struct flb_kv, _head); + if (strncmp(kv->key, "x-ms-", 5) != 0) { + continue; + } + + /* key */ + tmp = flb_sds_cat(ch, kv->key, flb_sds_len(kv->key)); + if (!tmp) { + flb_sds_destroy(ch); + return NULL; + } + ch = tmp; + + /* sep */ + tmp = flb_sds_cat(ch, ":", 1); + if (!tmp) { + flb_sds_destroy(ch); + return NULL; + } + ch = tmp; + + /* value */ + tmp = flb_sds_cat(ch, kv->val, flb_sds_len(kv->val)); + if (!tmp) { + flb_sds_destroy(ch); + return NULL; + } + ch = tmp; + + tmp = flb_sds_cat(ch, "\n", 1); + if (!tmp) { + flb_sds_destroy(ch); + return NULL; + } + ch = tmp; + } + + return ch; +} + +static flb_sds_t canonical_resource(struct flb_azure_blob *ctx, + struct flb_http_client *c) +{ + int pos; + int len; + int kv_start; + char *p; + size_t size; + flb_sds_t cr; + flb_sds_t dec_uri; + flb_sds_t tmp; + + len = strlen(c->uri); + size = flb_sds_len(ctx->account_name) + len + 64; + + cr = flb_sds_create_size(size); + if (!cr) { + return NULL; + } + + dec_uri = azb_uri_decode(c->uri, len); + tmp = flb_sds_printf(&cr, "/%s%s", ctx->account_name, dec_uri); + if (!tmp) { + flb_sds_destroy(dec_uri); + flb_sds_destroy(cr); + return NULL; + } + flb_sds_destroy(dec_uri); + + pos = 1 + flb_sds_len(ctx->account_name); + + p = strchr(cr + pos, '?'); + if (p) { + kv_start = FLB_TRUE; + while (*p) { + if (*p == '?') { + *p = '\n'; + } + else if (*p == '=' && kv_start == FLB_TRUE) { + *p = ':'; + kv_start = FLB_FALSE; + } + else if (*p == '&') { + *p = '\n'; + kv_start = FLB_TRUE; + } + p++; + } + } + + return cr; +} + +flb_sds_t azb_http_canonical_request(struct flb_azure_blob *ctx, + struct flb_http_client *c, + ssize_t content_length, + int content_type, + int content_encoding) +{ + int ret; + size_t size; + size_t o_len = 0; + flb_sds_t can_req; + flb_sds_t can_res; + flb_sds_t can_headers; + flb_sds_t tmp = NULL; + char *b64 = NULL; + char *encoding; + char *ctype = ""; + unsigned char signature[32]; + + size = strlen(c->uri) + (mk_list_size(&c->headers) * 64) + 256; + can_req = flb_sds_create_size(size); + if (!can_req) { + flb_plg_error(ctx->ins, "cannot allocate buffer for canonical request"); + return NULL; + } + + switch (c->method) { + case FLB_HTTP_GET: + tmp = flb_sds_cat(can_req, "GET\n", 4); + break; + case FLB_HTTP_POST: + tmp = flb_sds_cat(can_req, "POST\n", 5); + break; + case FLB_HTTP_PUT: + tmp = flb_sds_cat(can_req, "PUT\n", 4); + break; + }; + + if (!tmp) { + flb_plg_error(ctx->ins, "invalid processing HTTP method"); + flb_sds_destroy(can_req); + return NULL; + } + + if (content_encoding == AZURE_BLOB_CE_GZIP) { + encoding = "gzip"; + } + else { + encoding = ""; + } + + flb_sds_printf(&can_req, + "%s\n" /* Content-Encoding */ + "\n", /* Content-Language */ + encoding + ); + + if (content_length >= 0) { + flb_sds_printf(&can_req, + "%zi\n" /* Content-Length */, + content_length); + } + else { + flb_sds_printf(&can_req, + "\n" /* Content-Length */ + ); + } + + if (content_type == AZURE_BLOB_CT_NONE) { + ctype = ""; + } + else if (content_type == AZURE_BLOB_CT_JSON) { + ctype = "application/json"; + } + else if (content_type == AZURE_BLOB_CT_GZIP) { + ctype = "application/gzip"; + } + + flb_sds_printf(&can_req, + "\n" /* Content-MD5 */ + "%s\n" /* Content-Type */ + "\n" /* Date */ + "\n" /* If-Modified-Since */ + "\n" /* If-Match */ + "\n" /* If-None-Match */ + "\n" /* If-Unmodified-Since */ + "\n" /* Range */, + ctype); + + /* Append canonicalized headers */ + can_headers = canonical_headers(c); + if (!can_headers) { + flb_sds_destroy(can_req); + return NULL; + } + tmp = flb_sds_cat(can_req, can_headers, flb_sds_len(can_headers)); + if (!tmp) { + flb_sds_destroy(can_req); + flb_sds_destroy(can_headers); + return NULL; + } + can_req = tmp; + flb_sds_destroy(can_headers); + + /* Append canonical resource */ + can_res = canonical_resource(ctx, c); + if (!can_res) { + flb_sds_destroy(can_req); + return NULL; + } + tmp = flb_sds_cat(can_req, can_res, flb_sds_len(can_res)); + if (!tmp) { + flb_sds_destroy(can_res); + flb_sds_destroy(can_req); + return NULL; + } + can_req = tmp; + flb_sds_destroy(can_res); + + flb_plg_trace(ctx->ins, "string to sign\n%s", can_req); + + /* Signature */ + hmac_sha256_sign(signature, ctx->decoded_sk, ctx->decoded_sk_size, + (unsigned char *) can_req, flb_sds_len(can_req)); + flb_sds_destroy(can_req); + + /* base64 decoded size */ + size = ((4 * ((sizeof(signature) + 1)) / 3) + 1); + b64 = flb_sds_create_size(size); + if (!b64) { + return NULL; + } + + ret = flb_base64_encode((unsigned char *) b64, size, &o_len, + signature, sizeof(signature)); + if (ret != 0) { + flb_sds_destroy(b64); + return NULL; + } + flb_sds_len_set(b64, o_len); + + return b64; +} + +int azb_http_client_setup(struct flb_azure_blob *ctx, struct flb_http_client *c, + ssize_t content_length, int blob_type, + int content_type, int content_encoding) +{ + int len; + time_t now; + struct tm tm; + char tmp[64]; + flb_sds_t can_req; + flb_sds_t auth; + + /* Header: User Agent */ + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + + /* Header: Content-Type */ + if (content_type == AZURE_BLOB_CT_JSON) { + flb_http_add_header(c, + AZURE_BLOB_CT, sizeof(AZURE_BLOB_CT) - 1, + "application/json", 16); + } + else if (content_type == AZURE_BLOB_CT_GZIP) { + flb_http_add_header(c, + AZURE_BLOB_CT, sizeof(AZURE_BLOB_CT) - 1, + "application/gzip", 16); + } + + if (content_encoding == AZURE_BLOB_CE_GZIP) { + flb_http_add_header(c, + AZURE_BLOB_CE, sizeof(AZURE_BLOB_CE) - 1, + "gzip", 4); + } + + /* Azure header: x-ms-blob-type */ + if (blob_type == FLB_TRUE) { + if (ctx->btype == AZURE_BLOB_APPENDBLOB) { + flb_http_add_header(c, "x-ms-blob-type", 14, "AppendBlob", 10); + } + else if (ctx->btype == AZURE_BLOB_BLOCKBLOB) { + flb_http_add_header(c, "x-ms-blob-type", 14, "BlockBlob", 9); + } + } + + /* Azure header: x-ms-date */ + now = time(NULL); + gmtime_r(&now, &tm); + len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm); + + flb_http_add_header(c, "x-ms-date", 9, tmp, len); + + /* Azure header: x-ms-version */ + flb_http_add_header(c, "x-ms-version", 12, "2019-12-12", 10); + + can_req = azb_http_canonical_request(ctx, c, content_length, content_type, + content_encoding); + + auth = flb_sds_create_size(64 + flb_sds_len(can_req)); + + flb_sds_cat(auth, ctx->shared_key_prefix, flb_sds_len(ctx->shared_key_prefix)); + flb_sds_cat(auth, can_req, flb_sds_len(can_req)); + + /* Azure header: authorization */ + flb_http_add_header(c, "Authorization", 13, auth, flb_sds_len(auth)); + + /* Release buffers */ + flb_sds_destroy(can_req); + flb_sds_destroy(auth); + + /* Set callback context to the HTTP client context */ + flb_http_set_callback_context(c, ctx->ins->callback); + + return 0; +} diff --git a/fluent-bit/plugins/out_azure_blob/azure_blob_http.h b/fluent-bit/plugins/out_azure_blob/azure_blob_http.h new file mode 100644 index 00000000..04f7cfd9 --- /dev/null +++ b/fluent-bit/plugins/out_azure_blob/azure_blob_http.h @@ -0,0 +1,36 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef AZURE_BLOB_HTTP_H +#define AZURE_BLOB_HTTP_H + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_http_client.h> +#include "azure_blob.h" + +int azb_http_client_setup(struct flb_azure_blob *ctx, struct flb_http_client *c, + ssize_t content_length, int blob_type, + int content_type, int content_encoding); + +flb_sds_t azb_http_canonical_request(struct flb_azure_blob *ctx, + struct flb_http_client *c, + ssize_t content_length, + int content_type); + +#endif diff --git a/fluent-bit/plugins/out_azure_blob/azure_blob_uri.c b/fluent-bit/plugins/out_azure_blob/azure_blob_uri.c new file mode 100644 index 00000000..c7a05e28 --- /dev/null +++ b/fluent-bit/plugins/out_azure_blob/azure_blob_uri.c @@ -0,0 +1,150 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_sds.h> + +#include "azure_blob.h" + +static inline int to_encode(char c) +{ + if ((c >= 48 && c <= 57) || /* 0-9 */ + (c >= 65 && c <= 90) || /* A-Z */ + (c >= 97 && c <= 122) || /* a-z */ + (c == '?' || c == '&' || c == '-' || c == '_' || c == '.' || + c == '~' || c == '/')) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +flb_sds_t azb_uri_encode(const char *uri, size_t len) +{ + int i; + flb_sds_t buf = NULL; + flb_sds_t tmp = NULL; + + buf = flb_sds_create_size(len * 2); + if (!buf) { + flb_error("[uri] cannot allocate buffer for URI encoding"); + return NULL; + } + + for (i = 0; i < len; i++) { + if (to_encode(uri[i]) == FLB_TRUE) { + tmp = flb_sds_printf(&buf, "%%%02X", (unsigned char) *(uri + i)); + if (!tmp) { + flb_sds_destroy(buf); + return NULL; + } + continue; + } + + /* Direct assignment, just copy the character */ + if (buf) { + tmp = flb_sds_cat(buf, uri + i, 1); + if (!tmp) { + flb_sds_destroy(buf); + return NULL; + } + buf = tmp; + } + } + + return buf; +} + +flb_sds_t azb_uri_decode(const char *uri, size_t len) +{ + int i; + int hex_result; + int c = 0; + char hex[3]; + flb_sds_t out; + + out = flb_sds_create_size(len); + if (!out) { + return NULL; + } + + for (i = 0; i < len; i++) { + if (uri[i] == '%') { + hex[0] = uri[i + 1]; + hex[1] = uri[i + 2]; + hex[2] = '\0'; + + hex_result = flb_utils_hex2int(hex, 2); + out[c++] = hex_result; + i += 2; + } + else { + out[c++] = uri[i]; + } + } + out[c++] = '\0'; + + return out; +} + +flb_sds_t azb_uri_container(struct flb_azure_blob *ctx) +{ + flb_sds_t uri; + + uri = flb_sds_create_size(256); + if (!uri) { + return NULL; + } + + flb_sds_printf(&uri, "%s%s", ctx->base_uri, ctx->container_name); + return uri; +} + +flb_sds_t azb_uri_ensure_or_create_container(struct flb_azure_blob *ctx) +{ + flb_sds_t uri; + + uri = azb_uri_container(ctx); + if (!uri) { + return NULL; + } + + flb_sds_printf(&uri, "?restype=container"); + return uri; +} + +flb_sds_t azb_uri_create_blob(struct flb_azure_blob *ctx, char *tag) +{ + flb_sds_t uri; + + uri = azb_uri_container(ctx); + if (!uri) { + return NULL; + } + + if (ctx->path) { + flb_sds_printf(&uri, "/%s/%s", ctx->path, tag); + } + else { + flb_sds_printf(&uri, "/%s", tag); + } + + return uri; +} diff --git a/fluent-bit/plugins/out_azure_blob/azure_blob_uri.h b/fluent-bit/plugins/out_azure_blob/azure_blob_uri.h new file mode 100644 index 00000000..ffeed763 --- /dev/null +++ b/fluent-bit/plugins/out_azure_blob/azure_blob_uri.h @@ -0,0 +1,34 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_AZURE_BLOB_URI +#define FLB_AZURE_BLOB_URI + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_sds.h> + +#include "azure_blob.h" + +flb_sds_t azb_uri_container(struct flb_azure_blob *ctx); +flb_sds_t azb_uri_ensure_or_create_container(struct flb_azure_blob *ctx); +flb_sds_t azb_uri_create_blob(struct flb_azure_blob *ctx, char *tag); +flb_sds_t azb_uri_encode(const char *uri, size_t len); +flb_sds_t azb_uri_decode(const char *uri, size_t len); + +#endif |