summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/out_azure_blob/azure_blob_blockblob.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/out_azure_blob/azure_blob_blockblob.c')
-rw-r--r--src/fluent-bit/plugins/out_azure_blob/azure_blob_blockblob.c238
1 files changed, 238 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/out_azure_blob/azure_blob_blockblob.c b/src/fluent-bit/plugins/out_azure_blob/azure_blob_blockblob.c
new file mode 100644
index 000000000..a9b0e4a28
--- /dev/null
+++ b/src/fluent-bit/plugins/out_azure_blob/azure_blob_blockblob.c
@@ -0,0 +1,238 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_base64.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_sds.h>
+
+#include <math.h>
+
+#include "azure_blob.h"
+#include "azure_blob_conf.h"
+#include "azure_blob_uri.h"
+#include "azure_blob_http.h"
+
+flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *tag,
+ char *blockid, uint64_t ms)
+{
+ int len;
+ flb_sds_t uri;
+ char *ext;
+ char *encoded_blockid;
+
+ len = strlen(blockid);
+ encoded_blockid = azb_uri_encode(blockid, len);
+ if (!encoded_blockid) {
+ return NULL;
+ }
+
+ uri = azb_uri_container(ctx);
+ if (!uri) {
+ flb_sds_destroy(encoded_blockid);
+ return NULL;
+ }
+
+ if (ctx->compress_blob == FLB_TRUE) {
+ ext = ".gz";
+ }
+ else {
+ ext = "";
+ }
+
+ if (ctx->path) {
+ flb_sds_printf(&uri, "/%s/%s.%" PRIu64 "%s?blockid=%s&comp=block",
+ ctx->path, tag, ms, ext, encoded_blockid);
+ }
+ else {
+ flb_sds_printf(&uri, "/%s.%" PRIu64 "%s?blockid=%s&comp=block",
+ tag, ms, ext, encoded_blockid);
+ }
+
+ flb_sds_destroy(encoded_blockid);
+ return uri;
+}
+
+flb_sds_t azb_block_blob_uri_commit(struct flb_azure_blob *ctx,
+ char *tag, uint64_t ms)
+{
+ char *ext;
+ flb_sds_t uri;
+
+ uri = azb_uri_container(ctx);
+ if (!uri) {
+ return NULL;
+ }
+
+ if (ctx->compress_blob == FLB_TRUE) {
+ ext = ".gz";
+ }
+ else {
+ ext = "";
+ }
+
+ if (ctx->path) {
+ flb_sds_printf(&uri, "/%s/%s.%" PRIu64 "%s?comp=blocklist", ctx->path, tag,
+ ms, ext);
+ }
+ else {
+ flb_sds_printf(&uri, "/%s.%" PRIu64 "%s?comp=blocklist", tag, ms, ext);
+ }
+
+ return uri;
+}
+
+/* Generate a block id */
+char *azb_block_blob_id(uint64_t *ms)
+{
+ int len;
+ int ret;
+ double now;
+ char tmp[32];
+ size_t size;
+ size_t o_len;
+ char *b64;
+ struct flb_time tm;
+
+ /* Get current time */
+ flb_time_get(&tm);
+
+ /*
+ * Set outgoing time in milliseconds: this is used as a suffix for the
+ * block name
+ */
+ *ms = ((tm.tm.tv_sec * 1000) + (tm.tm.tv_nsec / 1000000));
+
+ /* Convert time to double to format the block id */
+ now = flb_time_to_double(&tm);
+ len = snprintf(tmp, sizeof(tmp), "flb-%.4f.id", now);
+
+ /* Allocate space for the outgoing base64 buffer */
+ size = (4 * ceil(((double) len / 3) + 1));
+ b64 = flb_malloc(size);
+ if (!b64) {
+ return NULL;
+ }
+
+ /* base64 encode block id */
+ ret = flb_base64_encode((unsigned char *) b64, size, &o_len,
+ (unsigned char *) tmp, len);
+ if (ret != 0) {
+ flb_free(b64);
+ return NULL;
+ }
+ return b64;
+}
+
+int azb_block_blob_commit(struct flb_azure_blob *ctx, char *blockid, char *tag,
+ uint64_t ms)
+{
+ int ret;
+ size_t b_sent;
+ flb_sds_t uri = NULL;
+ flb_sds_t payload;
+ struct flb_http_client *c;
+ struct flb_connection *u_conn;
+
+ /* Get upstream connection */
+ u_conn = flb_upstream_conn_get(ctx->u);
+ if (!u_conn) {
+ flb_plg_error(ctx->ins,
+ "cannot create upstream connection for blockblob commit");
+ return FLB_RETRY;
+ }
+
+ /* Compose commit URI */
+ uri = azb_block_blob_uri_commit(ctx, tag, ms);
+ if (!uri) {
+ flb_upstream_conn_release(u_conn);
+ return FLB_ERROR;
+ }
+
+ payload = flb_sds_create_size(256);
+ if (!payload) {
+ flb_sds_destroy(uri);
+ flb_upstream_conn_release(u_conn);
+ return FLB_ERROR;
+ }
+
+ flb_sds_printf(&payload,
+ "<?xml version=\"1.0\" encoding=\"utf-8\"?>"
+ "<BlockList>"
+ " <Latest>%s</Latest>"
+ "</BlockList>",
+ blockid);
+
+ /* Create HTTP client context */
+ c = flb_http_client(u_conn, FLB_HTTP_PUT,
+ uri,
+ payload, flb_sds_len(payload), NULL, 0, NULL, 0);
+ if (!c) {
+ flb_plg_error(ctx->ins, "cannot create HTTP client context");
+ flb_sds_destroy(payload);
+ flb_sds_destroy(uri);
+ flb_upstream_conn_release(u_conn);
+ return FLB_RETRY;
+ }
+
+ /* Prepare headers and authentication */
+ azb_http_client_setup(ctx, c, flb_sds_len(payload),
+ FLB_FALSE,
+ AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
+
+ /* Send HTTP request */
+ ret = flb_http_do(c, &b_sent);
+ flb_sds_destroy(uri);
+ flb_sds_destroy(payload);
+
+ /* Validate HTTP status */
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error sending append_blob");
+ return FLB_RETRY;
+ }
+
+ if (c->resp.status == 201) {
+ flb_plg_info(ctx->ins, "blob id %s committed successfully", blockid);
+ flb_http_client_destroy(c);
+ flb_upstream_conn_release(u_conn);
+ return FLB_OK;
+ }
+ else if (c->resp.status == 404) {
+ flb_plg_info(ctx->ins, "blob not found: %s", c->uri);
+ flb_http_client_destroy(c);
+ flb_upstream_conn_release(u_conn);
+ return FLB_RETRY;
+ }
+ else if (c->resp.payload_size > 0) {
+ flb_plg_error(ctx->ins, "cannot commit blob id %s\n%s",
+ blockid, c->resp.payload);
+ if (strstr(c->resp.payload, "must be 0 for Create Append")) {
+ flb_http_client_destroy(c);
+ flb_upstream_conn_release(u_conn);
+ return FLB_RETRY;
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "cannot append content to blob");
+ }
+ flb_http_client_destroy(c);
+ flb_upstream_conn_release(u_conn);
+
+ return FLB_OK;
+}