author     Daniel Baumann <daniel.baumann@progress-linux.org>   2024-05-05 12:08:03 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>   2024-05-05 12:08:18 +0000
commit     5da14042f70711ea5cf66e034699730335462f66 (patch)
tree       0f6354ccac934ed87a2d555f45be4c831cf92f4a /src/fluent-bit/plugins/out_s3
parent     Releasing debian version 1.44.3-2. (diff)
download   netdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz
           netdata-5da14042f70711ea5cf66e034699730335462f66.zip
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/plugins/out_s3')
-rw-r--r--  src/fluent-bit/plugins/out_s3/CMakeLists.txt     6
-rw-r--r--  src/fluent-bit/plugins/out_s3/s3.c            2500
-rw-r--r--  src/fluent-bit/plugins/out_s3/s3.h             203
-rw-r--r--  src/fluent-bit/plugins/out_s3/s3_multipart.c   707
-rw-r--r--  src/fluent-bit/plugins/out_s3/s3_store.c       543
-rw-r--r--  src/fluent-bit/plugins/out_s3/s3_store.h        68
6 files changed, 4027 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/out_s3/CMakeLists.txt b/src/fluent-bit/plugins/out_s3/CMakeLists.txt
new file mode 100644
index 000000000..94e048617
--- /dev/null
+++ b/src/fluent-bit/plugins/out_s3/CMakeLists.txt
@@ -0,0 +1,6 @@
+set(src
+ s3.c
+ s3_store.c
+ s3_multipart.c)
+
+FLB_PLUGIN(out_s3 "${src}" "")
diff --git a/src/fluent-bit/plugins/out_s3/s3.c b/src/fluent-bit/plugins/out_s3/s3.c
new file mode 100644
index 000000000..57e68e6ef
--- /dev/null
+++ b/src/fluent-bit/plugins/out_s3/s3.c
@@ -0,0 +1,2500 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_slist.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_config_map.h>
+#include <fluent-bit/flb_aws_util.h>
+#include <fluent-bit/aws/flb_aws_compress.h>
+#include <fluent-bit/flb_hash.h>
+#include <fluent-bit/flb_crypto.h>
+#include <fluent-bit/flb_signv4.h>
+#include <fluent-bit/flb_scheduler.h>
+#include <fluent-bit/flb_gzip.h>
+#include <fluent-bit/flb_base64.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <stdlib.h>
+#include <sys/stat.h>
+
+#include <msgpack.h>
+
+#include "s3.h"
+#include "s3_store.h"
+
+#define DEFAULT_S3_PORT 443
+#define DEFAULT_S3_INSECURE_PORT 80
+
+static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data,
+ struct s3_file *chunk,
+ char **out_buf, size_t *out_size);
+
+static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_log_time,
+ char *body, size_t body_size);
+
+static int put_all_chunks(struct flb_s3 *ctx);
+
+static void cb_s3_upload(struct flb_config *ctx, void *data);
+
+static struct multipart_upload *get_upload(struct flb_s3 *ctx,
+ const char *tag, int tag_len);
+
+static struct multipart_upload *create_upload(struct flb_s3 *ctx,
+ const char *tag, int tag_len,
+ time_t file_first_log_time);
+
+static void remove_from_queue(struct upload_queue *entry);
+
+static struct flb_aws_header content_encoding_header = {
+ .key = "Content-Encoding",
+ .key_len = 16,
+ .val = "gzip",
+ .val_len = 4,
+};
+
+static struct flb_aws_header content_type_header = {
+ .key = "Content-Type",
+ .key_len = 12,
+ .val = "",
+ .val_len = 0,
+};
+
+static struct flb_aws_header canned_acl_header = {
+ .key = "x-amz-acl",
+ .key_len = 9,
+ .val = "",
+ .val_len = 0,
+};
+
+static struct flb_aws_header content_md5_header = {
+ .key = "Content-MD5",
+ .key_len = 11,
+ .val = "",
+ .val_len = 0,
+};
+
+static struct flb_aws_header storage_class_header = {
+ .key = "x-amz-storage-class",
+ .key_len = 19,
+ .val = "",
+ .val_len = 0,
+};
+
+static char *mock_error_response(char *error_env_var)
+{
+ char *err_val = NULL;
+ char *error = NULL;
+ int len = 0;
+
+ err_val = getenv(error_env_var);
+ if (err_val != NULL && strlen(err_val) > 0) {
+ error = flb_calloc(strlen(err_val) + 1, sizeof(char));
+ if (error == NULL) {
+ flb_errno();
+ return NULL;
+ }
+
+ len = strlen(err_val);
+ memcpy(error, err_val, len);
+ error[len] = '\0';
+ return error;
+ }
+
+ return NULL;
+}
+
+int s3_plugin_under_test()
+{
+ if (getenv("FLB_S3_PLUGIN_UNDER_TEST") != NULL) {
+ return FLB_TRUE;
+ }
+
+ return FLB_FALSE;
+}
+
+int create_headers(struct flb_s3 *ctx, char *body_md5,
+ struct flb_aws_header **headers, int *num_headers,
+ int multipart_upload)
+{
+ int n = 0;
+ int headers_len = 0;
+ struct flb_aws_header *s3_headers = NULL;
+
+ if (ctx->content_type != NULL) {
+ headers_len++;
+ }
+ if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+ headers_len++;
+ }
+ if (ctx->canned_acl != NULL) {
+ headers_len++;
+ }
+ if (body_md5 != NULL && strlen(body_md5) && multipart_upload == FLB_FALSE) {
+ headers_len++;
+ }
+ if (ctx->storage_class != NULL) {
+ headers_len++;
+ }
+ if (headers_len == 0) {
+ *num_headers = headers_len;
+ *headers = s3_headers;
+ return 0;
+ }
+
+ s3_headers = flb_calloc(headers_len, sizeof(struct flb_aws_header));
+ if (s3_headers == NULL) {
+ flb_errno();
+ return -1;
+ }
+
+ if (ctx->content_type != NULL) {
+ s3_headers[n] = content_type_header;
+ s3_headers[n].val = ctx->content_type;
+ s3_headers[n].val_len = strlen(ctx->content_type);
+ n++;
+ }
+ if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+ s3_headers[n] = content_encoding_header;
+ n++;
+ }
+ if (ctx->canned_acl != NULL) {
+ s3_headers[n] = canned_acl_header;
+ s3_headers[n].val = ctx->canned_acl;
+ s3_headers[n].val_len = strlen(ctx->canned_acl);
+ n++;
+ }
+ if (body_md5 != NULL && strlen(body_md5) && multipart_upload == FLB_FALSE) {
+ s3_headers[n] = content_md5_header;
+ s3_headers[n].val = body_md5;
+ s3_headers[n].val_len = strlen(body_md5);
+ n++;
+ }
+ if (ctx->storage_class != NULL) {
+ s3_headers[n] = storage_class_header;
+ s3_headers[n].val = ctx->storage_class;
+ s3_headers[n].val_len = strlen(ctx->storage_class);
+ }
+
+ *num_headers = headers_len;
+ *headers = s3_headers;
+ return 0;
+}
+
+struct flb_http_client *mock_s3_call(char *error_env_var, char *api)
+{
+ /* create an http client so that we can set the response */
+ struct flb_http_client *c = NULL;
+ char *error = mock_error_response(error_env_var);
+ char *resp;
+ int len;
+
+ c = flb_calloc(1, sizeof(struct flb_http_client));
+ if (!c) {
+ flb_errno();
+ flb_free(error);
+ return NULL;
+ }
+ mk_list_init(&c->headers);
+
+ if (error != NULL) {
+ c->resp.status = 400;
+ /* resp.data is freed on destroy, payload is supposed to reference it */
+ c->resp.data = error;
+ c->resp.payload = c->resp.data;
+ c->resp.payload_size = strlen(error);
+ }
+ else {
+ c->resp.status = 200;
+ c->resp.payload = "";
+ c->resp.payload_size = 0;
+ if (strcmp(api, "CreateMultipartUpload") == 0) {
+ /* mocked success response */
+ c->resp.payload = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ "<InitiateMultipartUploadResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\n"
+ "<Bucket>example-bucket</Bucket>\n"
+ "<Key>example-object</Key>\n"
+ "<UploadId>VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA</UploadId>\n"
+ "</InitiateMultipartUploadResult>";
+ c->resp.payload_size = strlen(c->resp.payload);
+ }
+ else if (strcmp(api, "UploadPart") == 0) {
+ /* mocked success response */
+ resp = "Date: Mon, 1 Nov 2010 20:34:56 GMT\n"
+ "ETag: \"b54357faf0632cce46e942fa68356b38\"\n"
+ "Content-Length: 0\n"
+ "Connection: keep-alive\n"
+ "Server: AmazonS3";
+ /* since etag is in the headers, this code uses resp.data */
+ len = strlen(resp);
+ c->resp.data = flb_calloc(len + 1, sizeof(char));
+ if (!c->resp.data) {
+ flb_errno();
+ return NULL;
+ }
+ memcpy(c->resp.data, resp, len);
+ c->resp.data[len] = '\0';
+ c->resp.data_size = len;
+ }
+ else {
+ c->resp.payload = "";
+ c->resp.payload_size = 0;
+ }
+ }
+
+ return c;
+}
+
+static flb_sds_t concat_path(char *p1, char *p2)
+{
+ flb_sds_t dir;
+ flb_sds_t tmp;
+
+ dir = flb_sds_create_size(64);
+
+ tmp = flb_sds_printf(&dir, "%s/%s", p1, p2);
+ if (!tmp) {
+ flb_errno();
+ flb_sds_destroy(dir);
+ return NULL;
+ }
+ dir = tmp;
+
+ return dir;
+}
+
+/* Reads in index value from metadata file and sets seq_index to value */
+static int read_seq_index(char *seq_index_file, uint64_t *seq_index)
+{
+ FILE *fp;
+ int ret;
+
+ fp = fopen(seq_index_file, "r");
+ if (fp == NULL) {
+ flb_errno();
+ return -1;
+ }
+
+ ret = fscanf(fp, "%"PRIu64, seq_index);
+ if (ret != 1) {
+ fclose(fp);
+ flb_errno();
+ return -1;
+ }
+
+ fclose(fp);
+ return 0;
+}
+
+/* Writes index value to metadata file */
+static int write_seq_index(char *seq_index_file, uint64_t seq_index)
+{
+ FILE *fp;
+ int ret;
+
+ fp = fopen(seq_index_file, "w+");
+ if (fp == NULL) {
+ flb_errno();
+ return -1;
+ }
+
+ ret = fprintf(fp, "%"PRIu64, seq_index);
+ if (ret < 0) {
+ fclose(fp);
+ flb_errno();
+ return -1;
+ }
+
+ fclose(fp);
+ return 0;
+}
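
The two helpers above persist the sequential index with plain stdio calls. As a minimal, standalone sketch of the same persist/recover round trip (the file path below is only an example, not the one the plugin uses):

    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>

    int main(void)
    {
        const char *path = "/tmp/seq_index_example";   /* example path only */
        uint64_t idx = 41;
        FILE *fp;

        /* persist the index, as write_seq_index() does */
        fp = fopen(path, "w+");
        if (fp == NULL || fprintf(fp, "%" PRIu64, idx) < 0) {
            return 1;
        }
        fclose(fp);

        /* recover it on the next start, as read_seq_index() does */
        idx = 0;
        fp = fopen(path, "r");
        if (fp == NULL || fscanf(fp, "%" PRIu64, &idx) != 1) {
            return 1;
        }
        fclose(fp);

        printf("recovered index=%" PRIu64 "\n", idx);
        return 0;
    }
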
+
+static int init_seq_index(void *context) {
+ int ret;
+ const char *tmp;
+ char tmp_buf[1024];
+ struct flb_s3 *ctx = context;
+
+ ctx->key_fmt_has_seq_index = FLB_TRUE;
+
+ ctx->stream_metadata = flb_fstore_stream_create(ctx->fs, "sequence");
+ if (!ctx->stream_metadata) {
+ flb_plg_error(ctx->ins, "could not initialize metadata stream");
+ flb_fstore_destroy(ctx->fs);
+ ctx->fs = NULL;
+ return -1;
+ }
+
+ /* Construct directories and file path names */
+ ctx->metadata_dir = flb_sds_create(ctx->stream_metadata->path);
+ if (ctx->metadata_dir == NULL) {
+ flb_plg_error(ctx->ins, "Failed to create metadata path");
+ flb_errno();
+ return -1;
+ }
+ tmp = "/index_metadata";
+ ret = flb_sds_cat_safe(&ctx->metadata_dir, tmp, strlen(tmp));
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Failed to create metadata path");
+ flb_errno();
+ return -1;
+ }
+
+ ctx->seq_index_file = flb_sds_create(ctx->metadata_dir);
+ if (ctx->seq_index_file == NULL) {
+ flb_plg_error(ctx->ins, "Failed to create sequential index file path");
+ flb_errno();
+ return -1;
+ }
+ tmp = "/seq_index_";
+ ret = flb_sds_cat_safe(&ctx->seq_index_file, tmp, strlen(tmp));
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Failed to create sequential index file path");
+ flb_errno();
+ return -1;
+ }
+
+ sprintf(tmp_buf, "%d", ctx->ins->id);
+ ret = flb_sds_cat_safe(&ctx->seq_index_file, tmp_buf, strlen(tmp_buf));
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Failed to create sequential index file path");
+ flb_errno();
+ return -1;
+ }
+
+ /* Create directory path if it doesn't exist */
+ ret = mkdir(ctx->metadata_dir, 0700);
+ if (ret < 0 && errno != EEXIST) {
+ flb_plg_error(ctx->ins, "Failed to create metadata directory");
+ return -1;
+ }
+
+ /* Check if index file doesn't exist and set index value */
+ if (access(ctx->seq_index_file, F_OK) != 0) {
+ ctx->seq_index = 0;
+ ret = write_seq_index(ctx->seq_index_file, ctx->seq_index);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Failed to write to sequential index metadata file");
+ return -1;
+ }
+ }
+ else {
+ ret = read_seq_index(ctx->seq_index_file, &ctx->seq_index);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Failed to read from sequential index "
+ "metadata file");
+ return -1;
+ }
+ flb_plg_info(ctx->ins, "Successfully recovered index. "
+ "Continuing at index=%"PRIu64, ctx->seq_index);
+ }
+ return 0;
+}
+
+void multipart_upload_destroy(struct multipart_upload *m_upload)
+{
+ int i;
+ flb_sds_t etag;
+
+ if (!m_upload) {
+ return;
+ }
+
+ if (m_upload->s3_key) {
+ flb_sds_destroy(m_upload->s3_key);
+ }
+ if (m_upload->tag) {
+ flb_sds_destroy(m_upload->tag);
+ }
+ if (m_upload->upload_id) {
+ flb_sds_destroy(m_upload->upload_id);
+ }
+
+ for (i = 0; i < m_upload->part_number; i++) {
+ etag = m_upload->etags[i];
+ if (etag) {
+ flb_sds_destroy(etag);
+ }
+ }
+
+ flb_free(m_upload);
+}
+
+static void s3_context_destroy(struct flb_s3 *ctx)
+{
+ struct mk_list *head;
+ struct mk_list *tmp;
+ struct multipart_upload *m_upload;
+ struct upload_queue *upload_contents;
+
+ if (!ctx) {
+ return;
+ }
+
+ if (ctx->base_provider) {
+ flb_aws_provider_destroy(ctx->base_provider);
+ }
+
+ if (ctx->provider) {
+ flb_aws_provider_destroy(ctx->provider);
+ }
+
+ if (ctx->provider_tls) {
+ flb_tls_destroy(ctx->provider_tls);
+ }
+
+ if (ctx->sts_provider_tls) {
+ flb_tls_destroy(ctx->sts_provider_tls);
+ }
+
+ if (ctx->s3_client) {
+ flb_aws_client_destroy(ctx->s3_client);
+ }
+
+ if (ctx->client_tls) {
+ flb_tls_destroy(ctx->client_tls);
+ }
+
+ if (ctx->free_endpoint == FLB_TRUE) {
+ flb_free(ctx->endpoint);
+ }
+
+ if (ctx->buffer_dir) {
+ flb_sds_destroy(ctx->buffer_dir);
+ }
+
+ if (ctx->metadata_dir) {
+ flb_sds_destroy(ctx->metadata_dir);
+ }
+
+ if (ctx->seq_index_file) {
+ flb_sds_destroy(ctx->seq_index_file);
+ }
+
+ /* Remove uploads */
+ mk_list_foreach_safe(head, tmp, &ctx->uploads) {
+ m_upload = mk_list_entry(head, struct multipart_upload, _head);
+ mk_list_del(&m_upload->_head);
+ multipart_upload_destroy(m_upload);
+ }
+
+ mk_list_foreach_safe(head, tmp, &ctx->upload_queue) {
+ upload_contents = mk_list_entry(head, struct upload_queue, _head);
+ s3_store_file_delete(ctx, upload_contents->upload_file);
+ multipart_upload_destroy(upload_contents->m_upload_file);
+ remove_from_queue(upload_contents);
+ }
+
+ flb_free(ctx);
+}
+
+static int cb_s3_init(struct flb_output_instance *ins,
+ struct flb_config *config, void *data)
+{
+ int ret;
+ flb_sds_t tmp_sds;
+ char *role_arn = NULL;
+ char *session_name;
+ const char *tmp;
+ struct flb_s3 *ctx = NULL;
+ struct flb_aws_client_generator *generator;
+ (void) config;
+ (void) data;
+ char *ep;
+ struct flb_split_entry *tok;
+ struct mk_list *split;
+ int list_size;
+
+ ctx = flb_calloc(1, sizeof(struct flb_s3));
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+ ctx->ins = ins;
+ mk_list_init(&ctx->uploads);
+ mk_list_init(&ctx->upload_queue);
+
+ ctx->retry_time = 0;
+ ctx->upload_queue_success = FLB_FALSE;
+
+ /* Export context */
+ flb_output_set_context(ins, ctx);
+
+ /* initialize config map */
+ ret = flb_output_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ return -1;
+ }
+
+    /* the check against -1 works here because size_t is unsigned:
+     * (int) -1 converts to the unsigned maximum value.
+     * Fluent Bit uses -1 (which becomes the max value) to indicate 'undefined'.
+     */
+ if (ctx->ins->total_limit_size != -1) {
+ flb_plg_warn(ctx->ins, "Please use 'store_dir_limit_size' with s3 output instead of 'storage.total_limit_size'. "
+ "S3 has its own buffer files located in the store_dir.");
+ }
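
The comparison above relies on the usual C conversion rules: when -1 is compared against an unsigned type of at least int's rank, the -1 is converted to that type's maximum value first. A self-contained illustration, independent of the Fluent Bit types involved:

    #include <stdint.h>
    #include <stdio.h>

    int main(void)
    {
        size_t limit = (size_t) -1;   /* the "undefined" sentinel described above */

        /* -1 converts to SIZE_MAX before the comparison, so the if-branch is not taken */
        if (limit != -1) {
            printf("limit is configured: %zu\n", limit);
        }
        else {
            printf("limit is undefined (sentinel %zu)\n", limit);
        }
        return 0;
    }
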
+
+ /* Date key */
+ ctx->date_key = ctx->json_date_key;
+ tmp = flb_output_get_property("json_date_key", ins);
+ if (tmp) {
+ /* Just check if we have to disable it */
+ if (flb_utils_bool(tmp) == FLB_FALSE) {
+ ctx->date_key = NULL;
+ }
+ }
+
+ /* Date format for JSON output */
+ ctx->json_date_format = FLB_PACK_JSON_DATE_ISO8601;
+ tmp = flb_output_get_property("json_date_format", ins);
+ if (tmp) {
+ ret = flb_pack_to_json_date_type(tmp);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "invalid json_date_format '%s'. ", tmp);
+ return -1;
+ }
+ else {
+ ctx->json_date_format = ret;
+ }
+ }
+
+ tmp = flb_output_get_property("bucket", ins);
+ if (!tmp) {
+ flb_plg_error(ctx->ins, "'bucket' is a required parameter");
+ return -1;
+ }
+
+    /*
+     * store_dir is the user input; buffer_dir is what the code uses.
+     * We append the bucket name to the directory so that multiple instances
+     * of this plugin can share the same buffer dir.
+     */
+ tmp_sds = concat_path(ctx->store_dir, ctx->bucket);
+ if (!tmp_sds) {
+ flb_plg_error(ctx->ins, "Could not construct buffer path");
+ return -1;
+ }
+ ctx->buffer_dir = tmp_sds;
+
+ /* Initialize local storage */
+ ret = s3_store_init(ctx);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "Failed to initialize S3 storage: %s",
+ ctx->store_dir);
+ return -1;
+ }
+
+ tmp = flb_output_get_property("s3_key_format", ins);
+ if (tmp) {
+ if (tmp[0] != '/') {
+ flb_plg_error(ctx->ins, "'s3_key_format' must start with a '/'");
+ return -1;
+ }
+ if (strstr((char *) tmp, "$INDEX")) {
+ ret = init_seq_index(ctx);
+ if (ret < 0) {
+ return -1;
+ }
+ }
+ if (strstr((char *) tmp, "$UUID")) {
+ ctx->key_fmt_has_uuid = FLB_TRUE;
+ }
+ }
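
For context, s3_key_format mixes literal path text with tag/time specifiers and the $UUID / $INDEX placeholders probed above. An illustrative value (an example, not the plugin default) that would enable both the sequence-index and the UUID code paths:

    /* illustrative only */
    const char *example_key_format = "/my-app/$TAG/%Y/%m/%d/%H_%M_%S-$UUID-$INDEX";
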
+
+ /* validate 'total_file_size' */
+ if (ctx->file_size <= 0) {
+ flb_plg_error(ctx->ins, "Failed to parse total_file_size %s", tmp);
+ return -1;
+ }
+ if (ctx->file_size < 1000000) {
+ flb_plg_error(ctx->ins, "total_file_size must be at least 1MB");
+ return -1;
+ }
+ if (ctx->file_size > MAX_FILE_SIZE) {
+ flb_plg_error(ctx->ins, "Max total_file_size is %s bytes", MAX_FILE_SIZE_STR);
+ return -1;
+ }
+ flb_plg_info(ctx->ins, "Using upload size %lu bytes", ctx->file_size);
+
+ if (ctx->use_put_object == FLB_FALSE && ctx->file_size < 2 * MIN_CHUNKED_UPLOAD_SIZE) {
+ flb_plg_info(ctx->ins,
+ "total_file_size is less than 10 MB, will use PutObject API");
+ ctx->use_put_object = FLB_TRUE;
+ }
+
+ tmp = flb_output_get_property("compression", ins);
+ if (tmp) {
+ ret = flb_aws_compression_get_type(tmp);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
+ return -1;
+ }
+ if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_ARROW) {
+ flb_plg_error(ctx->ins,
+ "use_put_object must be enabled when Apache Arrow is enabled");
+ return -1;
+ }
+ ctx->compression = ret;
+ }
+
+ tmp = flb_output_get_property("content_type", ins);
+ if (tmp) {
+ ctx->content_type = (char *) tmp;
+ }
+ if (ctx->use_put_object == FLB_FALSE) {
+ /* upload_chunk_size */
+ if (ctx->upload_chunk_size <= 0) {
+ flb_plg_error(ctx->ins, "Failed to parse upload_chunk_size %s", tmp);
+ return -1;
+ }
+ if (ctx->upload_chunk_size > ctx->file_size) {
+ flb_plg_error(ctx->ins,
+ "upload_chunk_size can not be larger than total_file_size");
+ return -1;
+ }
+ if (ctx->upload_chunk_size < MIN_CHUNKED_UPLOAD_SIZE) {
+ flb_plg_error(ctx->ins, "upload_chunk_size must be at least 5,242,880 bytes");
+ return -1;
+ }
+ if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+ if(ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_COMPRESS_SIZE) {
+ flb_plg_error(ctx->ins, "upload_chunk_size in compressed multipart upload cannot exceed 5GB");
+ return -1;
+ }
+ } else {
+ if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_SIZE) {
+ flb_plg_error(ctx->ins, "Max upload_chunk_size is 50MB");
+ return -1;
+ }
+ }
+ }
+
+ if (ctx->upload_chunk_size != MIN_CHUNKED_UPLOAD_SIZE &&
+ (ctx->upload_chunk_size * 2) > ctx->file_size) {
+ flb_plg_error(ctx->ins, "total_file_size is less than 2x upload_chunk_size");
+ return -1;
+ }
+
+ if (ctx->use_put_object == FLB_TRUE) {
+ /*
+ * code internally uses 'upload_chunk_size' as the unit for each Put,
+ * regardless of which API is used to send data
+ */
+ ctx->upload_chunk_size = ctx->file_size;
+ if (ctx->file_size > MAX_FILE_SIZE_PUT_OBJECT) {
+ flb_plg_error(ctx->ins, "Max total_file_size is 50M when use_put_object is enabled");
+ return -1;
+ }
+ }
+
+ tmp = flb_output_get_property("endpoint", ins);
+ if (tmp) {
+ ctx->insecure = strncmp(tmp, "http://", 7) == 0 ? FLB_TRUE : FLB_FALSE;
+ if (ctx->insecure == FLB_TRUE) {
+ ep = removeProtocol((char *) tmp, "http://");
+ }
+ else {
+ ep = removeProtocol((char *) tmp, "https://");
+ }
+
+ split = flb_utils_split((const char *)ep, ':', 1);
+ if (!split) {
+ flb_errno();
+ return -1;
+ }
+ list_size = mk_list_size(split);
+ if (list_size > 2) {
+ flb_plg_error(ctx->ins, "Failed to split endpoint");
+ flb_utils_split_free(split);
+ return -1;
+ }
+
+ tok = mk_list_entry_first(split, struct flb_split_entry, _head);
+ ctx->endpoint = flb_strndup(tok->value, tok->len);
+ if (!ctx->endpoint) {
+ flb_errno();
+ flb_utils_split_free(split);
+ return -1;
+ }
+ ctx->free_endpoint = FLB_TRUE;
+ if (list_size == 2) {
+ tok = mk_list_entry_next(&tok->_head, struct flb_split_entry, _head, split);
+ ctx->port = atoi(tok->value);
+ }
+ else {
+ ctx->port = ctx->insecure == FLB_TRUE ? DEFAULT_S3_INSECURE_PORT : DEFAULT_S3_PORT;
+ }
+ flb_utils_split_free(split);
+ }
+ else {
+ /* default endpoint for the given region */
+ ctx->endpoint = flb_aws_endpoint("s3", ctx->region);
+ ctx->insecure = FLB_FALSE;
+ ctx->port = DEFAULT_S3_PORT;
+ ctx->free_endpoint = FLB_TRUE;
+ if (!ctx->endpoint) {
+ flb_plg_error(ctx->ins, "Could not construct S3 endpoint");
+ return -1;
+ }
+ }
+
+ tmp = flb_output_get_property("sts_endpoint", ins);
+ if (tmp) {
+ ctx->sts_endpoint = (char *) tmp;
+ }
+
+ tmp = flb_output_get_property("canned_acl", ins);
+ if (tmp) {
+ ctx->canned_acl = (char *) tmp;
+ }
+
+ tmp = flb_output_get_property("storage_class", ins);
+ if (tmp) {
+ ctx->storage_class = (char *) tmp;
+ }
+
+ if (ctx->insecure == FLB_FALSE) {
+ ctx->client_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
+ ins->tls_verify,
+ ins->tls_debug,
+ ins->tls_vhost,
+ ins->tls_ca_path,
+ ins->tls_ca_file,
+ ins->tls_crt_file,
+ ins->tls_key_file,
+ ins->tls_key_passwd);
+ if (!ctx->client_tls) {
+ flb_plg_error(ctx->ins, "Failed to create tls context");
+ return -1;
+ }
+ }
+
+ /* AWS provider needs a separate TLS instance */
+ ctx->provider_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
+ FLB_TRUE,
+ ins->tls_debug,
+ ins->tls_vhost,
+ ins->tls_ca_path,
+ ins->tls_ca_file,
+ ins->tls_crt_file,
+ ins->tls_key_file,
+ ins->tls_key_passwd);
+ if (!ctx->provider_tls) {
+ flb_errno();
+ return -1;
+ }
+
+ ctx->provider = flb_standard_chain_provider_create(config,
+ ctx->provider_tls,
+ ctx->region,
+ ctx->sts_endpoint,
+ NULL,
+ flb_aws_client_generator(),
+ ctx->profile);
+
+ if (!ctx->provider) {
+ flb_plg_error(ctx->ins, "Failed to create AWS Credential Provider");
+ return -1;
+ }
+
+ tmp = flb_output_get_property("role_arn", ins);
+ if (tmp) {
+ /* Use the STS Provider */
+ ctx->base_provider = ctx->provider;
+ role_arn = (char *) tmp;
+
+ /* STS provider needs yet another separate TLS instance */
+ ctx->sts_provider_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
+ FLB_TRUE,
+ ins->tls_debug,
+ ins->tls_vhost,
+ ins->tls_ca_path,
+ ins->tls_ca_file,
+ ins->tls_crt_file,
+ ins->tls_key_file,
+ ins->tls_key_passwd);
+
+ if (!ctx->sts_provider_tls) {
+ flb_errno();
+ return -1;
+ }
+
+ session_name = flb_sts_session_name();
+ if (!session_name) {
+ flb_plg_error(ctx->ins, "Failed to create aws iam role "
+ "session name");
+ flb_errno();
+ return -1;
+ }
+
+ ctx->provider = flb_sts_provider_create(config,
+ ctx->sts_provider_tls,
+ ctx->base_provider,
+ ctx->external_id,
+ role_arn,
+ session_name,
+ ctx->region,
+ ctx->sts_endpoint,
+ NULL,
+ flb_aws_client_generator());
+ flb_free(session_name);
+ if (!ctx->provider) {
+ flb_plg_error(ctx->ins, "Failed to create AWS STS Credential "
+ "Provider");
+ return -1;
+ }
+ }
+
+ /* read any remaining buffers from previous (failed) executions */
+ ctx->has_old_buffers = s3_store_has_data(ctx);
+ ctx->has_old_uploads = s3_store_has_uploads(ctx);
+
+ /* Multipart */
+ multipart_read_uploads_from_fs(ctx);
+
+ if (mk_list_size(&ctx->uploads) > 0) {
+ /* note that these should be sent */
+ ctx->has_old_uploads = FLB_TRUE;
+ }
+
+ /* create S3 client */
+ generator = flb_aws_client_generator();
+ ctx->s3_client = generator->create();
+ if (!ctx->s3_client) {
+ return -1;
+ }
+ ctx->s3_client->name = "s3_client";
+ ctx->s3_client->has_auth = FLB_TRUE;
+ ctx->s3_client->provider = ctx->provider;
+ ctx->s3_client->region = ctx->region;
+ ctx->s3_client->service = "s3";
+ ctx->s3_client->port = ctx->port;
+ ctx->s3_client->flags = 0;
+ ctx->s3_client->proxy = NULL;
+ ctx->s3_client->s3_mode = S3_MODE_SIGNED_PAYLOAD;
+ ctx->s3_client->retry_requests = ctx->retry_requests;
+
+ if (ctx->insecure == FLB_TRUE) {
+ ctx->s3_client->upstream = flb_upstream_create(config, ctx->endpoint, ctx->port,
+ FLB_IO_TCP, NULL);
+ } else {
+ ctx->s3_client->upstream = flb_upstream_create(config, ctx->endpoint, ctx->port,
+ FLB_IO_TLS, ctx->client_tls);
+ }
+ if (!ctx->s3_client->upstream) {
+ flb_plg_error(ctx->ins, "Connection initialization error");
+ return -1;
+ }
+
+ flb_output_upstream_set(ctx->s3_client->upstream, ctx->ins);
+
+ ctx->s3_client->host = ctx->endpoint;
+
+ /* set to sync mode and initialize credentials */
+ ctx->provider->provider_vtable->sync(ctx->provider);
+ ctx->provider->provider_vtable->init(ctx->provider);
+
+ ctx->timer_created = FLB_FALSE;
+ ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000;
+ if (ctx->timer_ms > UPLOAD_TIMER_MAX_WAIT) {
+ ctx->timer_ms = UPLOAD_TIMER_MAX_WAIT;
+ }
+ else if (ctx->timer_ms < UPLOAD_TIMER_MIN_WAIT) {
+ ctx->timer_ms = UPLOAD_TIMER_MIN_WAIT;
+ }
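
As a worked example of the arithmetic above (the timeout value is illustrative, not necessarily the default): with upload_timeout set to 10 minutes, timer_ms = (600 / 6) * 1000 = 100000 ms, and the result is then clamped into the [UPLOAD_TIMER_MIN_WAIT, UPLOAD_TIMER_MAX_WAIT] range.
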
+
+    /*
+     * S3 must ALWAYS use sync mode.
+     * In the timer thread we do a mk_list_foreach_safe on the queue of uploads and chunks.
+     * Iterating over those lists is not concurrency safe; if a flush call ran at the
+     * same time and deleted an item from a list, it could cause a crash/corruption.
+     */
+ flb_stream_disable_async_mode(&ctx->s3_client->upstream->base);
+
+ /* clean up any old buffers found on startup */
+ if (ctx->has_old_buffers == FLB_TRUE) {
+ flb_plg_info(ctx->ins,
+ "Sending locally buffered data from previous "
+ "executions to S3; buffer=%s",
+ ctx->fs->root_path);
+ ctx->has_old_buffers = FLB_FALSE;
+ ret = put_all_chunks(ctx);
+ if (ret < 0) {
+ ctx->has_old_buffers = FLB_TRUE;
+ flb_plg_error(ctx->ins,
+ "Failed to send locally buffered data left over "
+ "from previous executions; will retry. Buffer=%s",
+ ctx->fs->root_path);
+ }
+ }
+
+    /* clean up any old uploads found on startup */
+ if (ctx->has_old_uploads == FLB_TRUE) {
+ flb_plg_info(ctx->ins,
+ "Completing multipart uploads from previous "
+ "executions to S3; buffer=%s",
+ ctx->stream_upload->path);
+ ctx->has_old_uploads = FLB_FALSE;
+
+ /*
+ * we don't need to worry if this fails; it will retry each
+ * time the upload callback is called
+ */
+ cb_s3_upload(config, ctx);
+ }
+
+ /* this is done last since in the previous block we make calls to AWS */
+ ctx->provider->provider_vtable->upstream_set(ctx->provider, ctx->ins);
+
+ return 0;
+}
+
+/*
+ * return value is one of FLB_OK, FLB_RETRY, FLB_ERROR
+ *
+ * Chunk is allowed to be NULL
+ */
+static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
+ struct multipart_upload *m_upload,
+ char *body, size_t body_size,
+ const char *tag, int tag_len)
+{
+ int init_upload = FLB_FALSE;
+ int complete_upload = FLB_FALSE;
+ int size_check = FLB_FALSE;
+ int part_num_check = FLB_FALSE;
+ int timeout_check = FLB_FALSE;
+ int ret;
+ void *payload_buf = NULL;
+ size_t payload_size = 0;
+ size_t preCompress_size = 0;
+ time_t file_first_log_time = time(NULL);
+
+    /*
+     * When chunk is NULL, file_first_log_time is set to the current time.
+     * This only happens in unit tests; it prevents a segfault when chunk is NULL,
+     * because chunk->first_log_time would otherwise be dereferenced during the
+     * PutObject or multipart upload.
+     */
+ if (chunk != NULL) {
+ file_first_log_time = chunk->first_log_time;
+ }
+
+ if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+ /* Map payload */
+ ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "Failed to compress data");
+ return FLB_RETRY;
+ } else {
+ preCompress_size = body_size;
+ body = (void *) payload_buf;
+ body_size = payload_size;
+ }
+ }
+
+ if (ctx->use_put_object == FLB_TRUE) {
+ goto put_object;
+ }
+
+ if (s3_plugin_under_test() == FLB_TRUE) {
+ init_upload = FLB_TRUE;
+ complete_upload = FLB_TRUE;
+ if (ctx->use_put_object == FLB_TRUE) {
+ goto put_object;
+ }
+ else {
+ goto multipart;
+ }
+ }
+
+ if (m_upload == NULL) {
+ if (chunk != NULL && time(NULL) >
+ (chunk->create_time + ctx->upload_timeout + ctx->retry_time)) {
+ /* timeout already reached, just PutObject */
+ goto put_object;
+ }
+ else if (body_size >= ctx->file_size) {
+ /* already big enough, just use PutObject API */
+ goto put_object;
+ }
+ else if(body_size > MIN_CHUNKED_UPLOAD_SIZE) {
+ init_upload = FLB_TRUE;
+ goto multipart;
+ }
+ else {
+ if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+ flb_plg_info(ctx->ins, "Pre-compression upload_chunk_size= %zu, After compression, chunk is only %zu bytes, "
+ "the chunk was too small, using PutObject to upload", preCompress_size, body_size);
+ }
+ goto put_object;
+ }
+ }
+ else {
+ /* existing upload */
+ if (body_size < MIN_CHUNKED_UPLOAD_SIZE) {
+ complete_upload = FLB_TRUE;
+ }
+
+ goto multipart;
+ }
+
+put_object:
+
+    /*
+     * send the data with PutObject; on success the chunk is
+     * removed from the buffer list below
+     */
+ ret = s3_put_object(ctx, tag, file_first_log_time, body, body_size);
+ if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+ flb_free(payload_buf);
+ }
+ if (ret < 0) {
+ /* re-add chunk to list */
+ if (chunk) {
+ s3_store_file_unlock(chunk);
+ chunk->failures += 1;
+ }
+ return FLB_RETRY;
+ }
+
+    /* data was sent successfully; delete the local buffer */
+ if (chunk) {
+ s3_store_file_delete(ctx, chunk);
+ }
+ return FLB_OK;
+
+multipart:
+
+ if (init_upload == FLB_TRUE) {
+ m_upload = create_upload(ctx, tag, tag_len, file_first_log_time);
+ if (!m_upload) {
+ flb_plg_error(ctx->ins, "Could not find or create upload for tag %s", tag);
+ if (chunk) {
+ s3_store_file_unlock(chunk);
+ }
+ if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+ flb_free(payload_buf);
+ }
+ return FLB_RETRY;
+ }
+ }
+
+ if (m_upload->upload_state == MULTIPART_UPLOAD_STATE_NOT_CREATED) {
+ ret = create_multipart_upload(ctx, m_upload);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Could not initiate multipart upload");
+ if (chunk) {
+ s3_store_file_unlock(chunk);
+ }
+ if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+ flb_free(payload_buf);
+ }
+ return FLB_RETRY;
+ }
+ m_upload->upload_state = MULTIPART_UPLOAD_STATE_CREATED;
+ }
+
+ ret = upload_part(ctx, m_upload, body, body_size);
+ if (ret < 0) {
+ if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+ flb_free(payload_buf);
+ }
+ m_upload->upload_errors += 1;
+ /* re-add chunk to list */
+ if (chunk) {
+ s3_store_file_unlock(chunk);
+ chunk->failures += 1;
+ }
+ return FLB_RETRY;
+ }
+ m_upload->part_number += 1;
+    /* data was sent successfully; delete the local buffer */
+ if (chunk) {
+ s3_store_file_delete(ctx, chunk);
+ chunk = NULL;
+ }
+ if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+ flb_free(payload_buf);
+ }
+ if (m_upload->bytes >= ctx->file_size) {
+ size_check = FLB_TRUE;
+ flb_plg_info(ctx->ins, "Will complete upload for %s because uploaded data is greater"
+ " than size set by total_file_size", m_upload->s3_key);
+ }
+ if (m_upload->part_number >= 10000) {
+ part_num_check = FLB_TRUE;
+ flb_plg_info(ctx->ins, "Will complete upload for %s because 10,000 chunks "
+ "(the API limit) have been uploaded", m_upload->s3_key);
+ }
+ if (time(NULL) >
+ (m_upload->init_time + ctx->upload_timeout + ctx->retry_time)) {
+ timeout_check = FLB_TRUE;
+ flb_plg_info(ctx->ins, "Will complete upload for %s because upload_timeout"
+ " has elapsed", m_upload->s3_key);
+ }
+ if (size_check || part_num_check || timeout_check) {
+ complete_upload = FLB_TRUE;
+ }
+
+ if (complete_upload == FLB_TRUE) {
+        /* mark for completion; the upload timer will handle actual completion */
+ m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS;
+ }
+
+ return FLB_OK;
+}
+
+
+/*
+ * Attempts to send all chunks to S3 using PutObject.
+ * Used on shutdown to try to send all buffered data, and on startup to send
+ * any leftover buffers from previous executions.
+ */
+static int put_all_chunks(struct flb_s3 *ctx)
+{
+ struct s3_file *chunk;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct mk_list *f_head;
+ struct flb_fstore_file *fsf;
+ struct flb_fstore_stream *fs_stream;
+ void *payload_buf = NULL;
+ size_t payload_size = 0;
+ char *buffer = NULL;
+ size_t buffer_size;
+ int ret;
+
+ mk_list_foreach(head, &ctx->fs->streams) {
+ /* skip multi upload stream */
+ fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
+ if (fs_stream == ctx->stream_upload) {
+ continue;
+ }
+ /* skip metadata stream */
+ if (fs_stream == ctx->stream_metadata) {
+ continue;
+ }
+
+ mk_list_foreach_safe(f_head, tmp, &fs_stream->files) {
+ fsf = mk_list_entry(f_head, struct flb_fstore_file, _head);
+ chunk = fsf->data;
+
+ /* Locked chunks are being processed, skip */
+ if (chunk->locked == FLB_TRUE) {
+ continue;
+ }
+
+ if (chunk->failures >= MAX_UPLOAD_ERRORS) {
+ flb_plg_warn(ctx->ins,
+ "Chunk for tag %s failed to send %i times, "
+ "will not retry",
+ (char *) fsf->meta_buf, MAX_UPLOAD_ERRORS);
+ flb_fstore_file_inactive(ctx->fs, fsf);
+ continue;
+ }
+
+ ret = construct_request_buffer(ctx, NULL, chunk,
+ &buffer, &buffer_size);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins,
+ "Could not construct request buffer for %s",
+ chunk->file_path);
+ return -1;
+ }
+
+ if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
+ /* Map payload */
+ ret = flb_aws_compression_compress(ctx->compression, buffer, buffer_size, &payload_buf, &payload_size);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "Failed to compress data, uploading uncompressed data instead to prevent data loss");
+ } else {
+ flb_plg_info(ctx->ins, "Pre-compression chunk size is %zu, After compression, chunk is %zu bytes", buffer_size, payload_size);
+ buffer = (void *) payload_buf;
+ buffer_size = payload_size;
+ }
+ }
+
+ ret = s3_put_object(ctx, (const char *)
+ fsf->meta_buf,
+ chunk->create_time, buffer, buffer_size);
+ flb_free(buffer);
+ if (ret < 0) {
+ s3_store_file_unlock(chunk);
+ chunk->failures += 1;
+ return -1;
+ }
+
+            /* data was sent successfully; delete the local buffer */
+ s3_store_file_delete(ctx, chunk);
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * Either new_data or chunk can be NULL, but not both
+ */
+static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data,
+ struct s3_file *chunk,
+ char **out_buf, size_t *out_size)
+{
+ char *body;
+ char *tmp;
+ size_t body_size = 0;
+ char *buffered_data = NULL;
+ size_t buffer_size = 0;
+ int ret;
+
+ if (new_data == NULL && chunk == NULL) {
+ flb_plg_error(ctx->ins, "[construct_request_buffer] Something went wrong"
+ " both chunk and new_data are NULL");
+ return -1;
+ }
+
+ if (chunk) {
+ ret = s3_store_file_read(ctx, chunk, &buffered_data, &buffer_size);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Could not read locally buffered data %s",
+ chunk->file_path);
+ return -1;
+ }
+
+ /*
+ * lock the chunk from buffer list
+ */
+ s3_store_file_lock(chunk);
+ body = buffered_data;
+ body_size = buffer_size;
+ }
+
+ /*
+ * If new data is arriving, increase the original 'buffered_data' size
+ * to append the new one.
+ */
+ if (new_data) {
+ body_size += flb_sds_len(new_data);
+
+ tmp = flb_realloc(buffered_data, body_size + 1);
+ if (!tmp) {
+ flb_errno();
+ flb_free(buffered_data);
+ if (chunk) {
+ s3_store_file_unlock(chunk);
+ }
+ return -1;
+ }
+ body = buffered_data = tmp;
+ memcpy(body + buffer_size, new_data, flb_sds_len(new_data));
+ body[body_size] = '\0';
+ }
+
+ *out_buf = body;
+ *out_size = body_size;
+
+ return 0;
+}
+
+static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_log_time,
+ char *body, size_t body_size)
+{
+ flb_sds_t s3_key = NULL;
+ struct flb_http_client *c = NULL;
+ struct flb_aws_client *s3_client;
+ struct flb_aws_header *headers = NULL;
+ char *random_alphanumeric;
+ int append_random = FLB_FALSE;
+ int len;
+ int ret;
+ int num_headers = 0;
+ char *final_key;
+ flb_sds_t uri;
+ flb_sds_t tmp;
+ char final_body_md5[25];
+
+ s3_key = flb_get_s3_key(ctx->s3_key_format, file_first_log_time, tag,
+ ctx->tag_delimiters, ctx->seq_index);
+ if (!s3_key) {
+ flb_plg_error(ctx->ins, "Failed to construct S3 Object Key for %s", tag);
+ return -1;
+ }
+
+ len = strlen(s3_key);
+ if ((len + 16) <= 1024 && !ctx->key_fmt_has_uuid && !ctx->static_file_path &&
+ !ctx->key_fmt_has_seq_index) {
+ append_random = FLB_TRUE;
+ len += 16;
+ }
+    len += strlen(ctx->bucket) + 1;   /* +1 for the leading '/' in the URI */
+
+ uri = flb_sds_create_size(len);
+
+ if (append_random == FLB_TRUE) {
+ random_alphanumeric = flb_sts_session_name();
+ if (!random_alphanumeric) {
+ flb_sds_destroy(s3_key);
+ flb_sds_destroy(uri);
+ flb_plg_error(ctx->ins, "Failed to create randomness for S3 key %s", tag);
+ return -1;
+ }
+ /* only use 8 chars of the random string */
+ random_alphanumeric[8] = '\0';
+
+ tmp = flb_sds_printf(&uri, "/%s%s-object%s", ctx->bucket, s3_key,
+ random_alphanumeric);
+ flb_free(random_alphanumeric);
+ }
+ else {
+ tmp = flb_sds_printf(&uri, "/%s%s", ctx->bucket, s3_key);
+ }
+
+ if (!tmp) {
+ flb_sds_destroy(s3_key);
+ flb_plg_error(ctx->ins, "Failed to create PutObject URI");
+ return -1;
+ }
+ flb_sds_destroy(s3_key);
+ uri = tmp;
+
+ memset(final_body_md5, 0, sizeof(final_body_md5));
+ if (ctx->send_content_md5 == FLB_TRUE) {
+ ret = get_md5_base64(body, body_size,
+ final_body_md5, sizeof(final_body_md5));
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "Failed to create Content-MD5 header");
+ flb_sds_destroy(uri);
+ return -1;
+ }
+ }
+
+ /* Update file and increment index value right before request */
+ if (ctx->key_fmt_has_seq_index) {
+ ctx->seq_index++;
+
+ ret = write_seq_index(ctx->seq_index_file, ctx->seq_index);
+ if (ret < 0 && access(ctx->seq_index_file, F_OK) == 0) {
+ ctx->seq_index--;
+            /* s3_key was already destroyed above; release the URI instead */
+            flb_sds_destroy(uri);
+ flb_plg_error(ctx->ins, "Failed to update sequential index metadata file");
+ return -1;
+ }
+ }
+
+ s3_client = ctx->s3_client;
+ if (s3_plugin_under_test() == FLB_TRUE) {
+ c = mock_s3_call("TEST_PUT_OBJECT_ERROR", "PutObject");
+ }
+ else {
+ ret = create_headers(ctx, final_body_md5, &headers, &num_headers, FLB_FALSE);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "Failed to create headers");
+ flb_sds_destroy(uri);
+ goto decrement_index;
+ }
+ c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT,
+ uri, body, body_size,
+ headers, num_headers);
+ flb_free(headers);
+ }
+ if (c) {
+ flb_plg_debug(ctx->ins, "PutObject http status=%d", c->resp.status);
+ if (c->resp.status == 200) {
+ /*
+ * URI contains bucket name, so we must advance over it
+ * to print the object key
+ */
+ final_key = uri + strlen(ctx->bucket) + 1;
+ flb_plg_info(ctx->ins, "Successfully uploaded object %s", final_key);
+ flb_sds_destroy(uri);
+ flb_http_client_destroy(c);
+
+ return 0;
+ }
+ flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size,
+ "PutObject", ctx->ins);
+ if (c->resp.data != NULL) {
+ flb_plg_error(ctx->ins, "Raw PutObject response: %s", c->resp.data);
+ }
+ flb_http_client_destroy(c);
+ }
+
+ flb_plg_error(ctx->ins, "PutObject request failed");
+ flb_sds_destroy(uri);
+ goto decrement_index;
+
+decrement_index:
+ if (ctx->key_fmt_has_seq_index) {
+ ctx->seq_index--;
+
+ ret = write_seq_index(ctx->seq_index_file, ctx->seq_index);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Failed to decrement index after request error");
+ return -1;
+ }
+ }
+ return -1;
+}
+
+int get_md5_base64(char *buf, size_t buf_size, char *md5_str, size_t md5_str_size)
+{
+ unsigned char md5_bin[16];
+ size_t olen;
+ int ret;
+
+ ret = flb_hash_simple(FLB_HASH_MD5,
+ (unsigned char *) buf, buf_size,
+ md5_bin, sizeof(md5_bin));
+
+ if (ret != FLB_CRYPTO_SUCCESS) {
+ return -1;
+ }
+
+ ret = flb_base64_encode((unsigned char*) md5_str, md5_str_size,
+ &olen, md5_bin, sizeof(md5_bin));
+ if (ret != 0) {
+ return ret;
+ }
+
+ return 0;
+}
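
A minimal usage sketch of the helper above, relying only on the signature defined in this file (the payload literal and surrounding context are illustrative, not taken from the plugin):

    char body_md5[25];   /* 24 base64 characters for a 16-byte MD5 digest, plus the NUL */
    const char *body = "{\"log\":\"example\"}\n";

    memset(body_md5, 0, sizeof(body_md5));
    if (get_md5_base64((char *) body, strlen(body),
                       body_md5, sizeof(body_md5)) == 0) {
        /* body_md5 can now be passed as the body_md5 argument to create_headers(),
         * which is how s3_put_object() builds the Content-MD5 header */
    }
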
+
+static struct multipart_upload *get_upload(struct flb_s3 *ctx,
+ const char *tag, int tag_len)
+{
+ struct multipart_upload *m_upload = NULL;
+ struct multipart_upload *tmp_upload = NULL;
+ struct mk_list *tmp;
+ struct mk_list *head;
+
+ mk_list_foreach_safe(head, tmp, &ctx->uploads) {
+ tmp_upload = mk_list_entry(head, struct multipart_upload, _head);
+
+ if (tmp_upload->upload_state == MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS) {
+ continue;
+ }
+ if (tmp_upload->upload_errors >= MAX_UPLOAD_ERRORS) {
+ tmp_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS;
+ flb_plg_error(ctx->ins, "Upload for %s has reached max upload errors",
+ tmp_upload->s3_key);
+ continue;
+ }
+ if (strcmp(tmp_upload->tag, tag) == 0) {
+ m_upload = tmp_upload;
+ break;
+ }
+ }
+
+ return m_upload;
+}
+
+static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *tag,
+ int tag_len, time_t file_first_log_time)
+{
+ int ret;
+ struct multipart_upload *m_upload = NULL;
+ flb_sds_t s3_key = NULL;
+ flb_sds_t tmp_sds = NULL;
+
+ /* create new upload for this key */
+ m_upload = flb_calloc(1, sizeof(struct multipart_upload));
+ if (!m_upload) {
+ flb_errno();
+ return NULL;
+ }
+ s3_key = flb_get_s3_key(ctx->s3_key_format, file_first_log_time, tag,
+ ctx->tag_delimiters, ctx->seq_index);
+ if (!s3_key) {
+ flb_plg_error(ctx->ins, "Failed to construct S3 Object Key for %s", tag);
+ flb_free(m_upload);
+ return NULL;
+ }
+ m_upload->s3_key = s3_key;
+ tmp_sds = flb_sds_create_len(tag, tag_len);
+ if (!tmp_sds) {
+ flb_errno();
+ flb_free(m_upload);
+ return NULL;
+ }
+ m_upload->tag = tmp_sds;
+ m_upload->upload_state = MULTIPART_UPLOAD_STATE_NOT_CREATED;
+ m_upload->part_number = 1;
+ m_upload->init_time = time(NULL);
+ mk_list_add(&m_upload->_head, &ctx->uploads);
+
+ /* Update file and increment index value right before request */
+ if (ctx->key_fmt_has_seq_index) {
+ ctx->seq_index++;
+
+ ret = write_seq_index(ctx->seq_index_file, ctx->seq_index);
+ if (ret < 0) {
+            ctx->seq_index--;
+            flb_plg_error(ctx->ins, "Failed to write to sequential index metadata file");
+            /* m_upload is already linked into ctx->uploads; unlink and destroy it
+             * (which also frees s3_key and the tag) rather than leaving a dangling entry */
+            mk_list_del(&m_upload->_head);
+            multipart_upload_destroy(m_upload);
+            return NULL;
+ }
+ }
+
+ return m_upload;
+}
+
+/* Adds an entry to upload queue */
+static int add_to_queue(struct flb_s3 *ctx, struct s3_file *upload_file,
+ struct multipart_upload *m_upload_file, const char *tag, int tag_len)
+{
+ struct upload_queue *upload_contents;
+ flb_sds_t tag_cpy;
+
+ /* Create upload contents object and add to upload queue */
+ upload_contents = flb_calloc(1, sizeof(struct upload_queue));
+ if (upload_contents == NULL) {
+ flb_plg_error(ctx->ins, "Error allocating memory for upload_queue entry");
+ flb_errno();
+ return -1;
+ }
+ upload_contents->upload_file = upload_file;
+ upload_contents->m_upload_file = m_upload_file;
+ upload_contents->tag_len = tag_len;
+ upload_contents->retry_counter = 0;
+ upload_contents->upload_time = -1;
+
+ /* Necessary to create separate string for tag to prevent corruption */
+ tag_cpy = flb_sds_create_len(tag, tag_len);
+ if (!tag_cpy) {
+ flb_errno();
+ flb_free(upload_contents);
+ return -1;
+ }
+ upload_contents->tag = tag_cpy;
+
+
+ /* Add entry to upload queue */
+ mk_list_add(&upload_contents->_head, &ctx->upload_queue);
+ return 0;
+}
+
+/* Removes an entry from upload_queue */
+void remove_from_queue(struct upload_queue *entry)
+{
+ mk_list_del(&entry->_head);
+ flb_sds_destroy(entry->tag);
+ flb_free(entry);
+ return;
+}
+
+/* Validity check for upload queue object */
+static int upload_queue_valid(struct upload_queue *upload_contents, time_t now,
+ void *out_context)
+{
+ struct flb_s3 *ctx = out_context;
+
+ if (upload_contents == NULL) {
+ flb_plg_error(ctx->ins, "Error getting entry from upload_queue");
+ return -1;
+ }
+ if (upload_contents->_head.next == NULL || upload_contents->_head.prev == NULL) {
+ flb_plg_debug(ctx->ins, "Encountered previously deleted entry in "
+ "upload_queue. Deleting invalid entry");
+ mk_list_del(&upload_contents->_head);
+ return -1;
+ }
+ if (upload_contents->upload_file->locked == FLB_FALSE) {
+ flb_plg_debug(ctx->ins, "Encountered unlocked file in upload_queue. "
+ "Exiting");
+ return -1;
+ }
+ if (upload_contents->upload_file->size <= 0) {
+ flb_plg_debug(ctx->ins, "Encountered empty chunk file in upload_queue. "
+ "Deleting empty chunk file");
+ remove_from_queue(upload_contents);
+ return -1;
+ }
+ if (now < upload_contents->upload_time) {
+ flb_plg_debug(ctx->ins, "Found valid chunk file but not ready to upload");
+ return -1;
+ }
+ return 0;
+}
+
+static int send_upload_request(void *out_context, flb_sds_t chunk,
+ struct s3_file *upload_file,
+ struct multipart_upload *m_upload_file,
+ const char *tag, int tag_len)
+{
+ int ret;
+ char *buffer;
+ size_t buffer_size;
+ struct flb_s3 *ctx = out_context;
+
+ /* Create buffer to upload to S3 */
+ ret = construct_request_buffer(ctx, chunk, upload_file, &buffer, &buffer_size);
+ flb_sds_destroy(chunk);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Could not construct request buffer for %s",
+ upload_file->file_path);
+ return -1;
+ }
+
+ /* Upload to S3 */
+ ret = upload_data(ctx, upload_file, m_upload_file, buffer, buffer_size, tag, tag_len);
+ flb_free(buffer);
+
+ return ret;
+}
+
+static int buffer_chunk(void *out_context, struct s3_file *upload_file,
+ flb_sds_t chunk, int chunk_size,
+ const char *tag, int tag_len,
+ time_t file_first_log_time)
+{
+ int ret;
+ struct flb_s3 *ctx = out_context;
+
+ ret = s3_store_buffer_put(ctx, upload_file, tag,
+ tag_len, chunk, (size_t) chunk_size, file_first_log_time);
+ flb_sds_destroy(chunk);
+ if (ret < 0) {
+ flb_plg_warn(ctx->ins, "Could not buffer chunk. Data order preservation "
+ "will be compromised");
+ return -1;
+ }
+ return 0;
+}
+
+/* Uploads all chunk files in queue synchronously */
+static void s3_upload_queue(struct flb_config *config, void *out_context)
+{
+ int ret;
+ time_t now;
+ struct upload_queue *upload_contents;
+ struct flb_s3 *ctx = out_context;
+ struct mk_list *tmp;
+ struct mk_list *head;
+
+ flb_plg_debug(ctx->ins, "Running upload timer callback (upload_queue)..");
+
+ /* No chunks in upload queue. Scan for timed out chunks. */
+ if (mk_list_size(&ctx->upload_queue) == 0) {
+ flb_plg_debug(ctx->ins, "No files found in upload_queue. Scanning for timed "
+ "out chunks");
+ cb_s3_upload(config, out_context);
+ }
+
+ /* Iterate through each file in upload queue */
+ mk_list_foreach_safe(head, tmp, &ctx->upload_queue) {
+ upload_contents = mk_list_entry(head, struct upload_queue, _head);
+
+ now = time(NULL);
+
+ /* Checks if upload_contents is valid */
+ ret = upload_queue_valid(upload_contents, now, ctx);
+ if (ret < 0) {
+ goto exit;
+ }
+
+ /* Try to upload file. Return value can be -1, FLB_OK, FLB_ERROR, FLB_RETRY. */
+ ret = send_upload_request(ctx, NULL, upload_contents->upload_file,
+ upload_contents->m_upload_file,
+ upload_contents->tag, upload_contents->tag_len);
+ if (ret < 0) {
+ goto exit;
+ }
+ else if (ret == FLB_OK) {
+ remove_from_queue(upload_contents);
+ ctx->retry_time = 0;
+ ctx->upload_queue_success = FLB_TRUE;
+ }
+ else {
+ s3_store_file_lock(upload_contents->upload_file);
+ ctx->upload_queue_success = FLB_FALSE;
+
+ /* If retry limit was reached, discard file and remove file from queue */
+ upload_contents->retry_counter++;
+ if (upload_contents->retry_counter >= MAX_UPLOAD_ERRORS) {
+ flb_plg_warn(ctx->ins, "Chunk file failed to send %d times, will not "
+ "retry", upload_contents->retry_counter);
+ s3_store_file_inactive(ctx, upload_contents->upload_file);
+ multipart_upload_destroy(upload_contents->m_upload_file);
+ remove_from_queue(upload_contents);
+ continue;
+ }
+
+ /* Retry in N seconds */
+ upload_contents->upload_time = now + 2 * upload_contents->retry_counter;
+ ctx->retry_time += 2 * upload_contents->retry_counter;
+ flb_plg_debug(ctx->ins, "Failed to upload file in upload_queue. Will not "
+ "retry for %d seconds", 2 * upload_contents->retry_counter);
+ break;
+ }
+ }
+
+exit:
+ return;
+}
+
+static void cb_s3_upload(struct flb_config *config, void *data)
+{
+ struct flb_s3 *ctx = data;
+ struct s3_file *chunk = NULL;
+ struct multipart_upload *m_upload = NULL;
+ struct flb_fstore_file *fsf;
+ char *buffer = NULL;
+ size_t buffer_size = 0;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ int complete;
+ int ret;
+ time_t now;
+
+ flb_plg_debug(ctx->ins, "Running upload timer callback (cb_s3_upload)..");
+
+ now = time(NULL);
+
+ /* Check all chunks and see if any have timed out */
+ mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) {
+ fsf = mk_list_entry(head, struct flb_fstore_file, _head);
+ chunk = fsf->data;
+
+ if (now < (chunk->create_time + ctx->upload_timeout + ctx->retry_time)) {
+ continue; /* Only send chunks which have timed out */
+ }
+
+ /* Locked chunks are being processed, skip */
+ if (chunk->locked == FLB_TRUE) {
+ continue;
+ }
+
+ m_upload = get_upload(ctx, (const char *) fsf->meta_buf, fsf->meta_size);
+
+ ret = construct_request_buffer(ctx, NULL, chunk, &buffer, &buffer_size);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Could not construct request buffer for %s",
+ chunk->file_path);
+ continue;
+ }
+
+        /* FYI: if construct_request_buffer() succeeded, the s3_file is locked */
+ ret = upload_data(ctx, chunk, m_upload, buffer, buffer_size,
+ (const char *) fsf->meta_buf, fsf->meta_size);
+ flb_free(buffer);
+ if (ret != FLB_OK) {
+ flb_plg_error(ctx->ins, "Could not send chunk with tag %s",
+ (char *) fsf->meta_buf);
+ }
+ }
+
+ /* Check all uploads and see if any need completion */
+ mk_list_foreach_safe(head, tmp, &ctx->uploads) {
+ m_upload = mk_list_entry(head, struct multipart_upload, _head);
+ complete = FLB_FALSE;
+
+ if (m_upload->complete_errors >= MAX_UPLOAD_ERRORS) {
+ flb_plg_error(ctx->ins,
+ "Upload for %s has reached max completion errors, "
+ "plugin will give up", m_upload->s3_key);
+ mk_list_del(&m_upload->_head);
+ continue;
+ }
+
+ if (m_upload->upload_state == MULTIPART_UPLOAD_STATE_NOT_CREATED) {
+ continue;
+ }
+
+ if (m_upload->upload_state == MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS) {
+ complete = FLB_TRUE;
+ }
+ if (time(NULL) > (m_upload->init_time + ctx->upload_timeout + ctx->retry_time)) {
+ flb_plg_info(ctx->ins, "Completing upload for %s because upload_timeout"
+ " has passed", m_upload->s3_key);
+ complete = FLB_TRUE;
+ }
+ if (complete == FLB_TRUE) {
+ m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS;
+ mk_list_del(&m_upload->_head);
+ ret = complete_multipart_upload(ctx, m_upload);
+ if (ret == 0) {
+ multipart_upload_destroy(m_upload);
+ }
+ else {
+ mk_list_add(&m_upload->_head, &ctx->uploads);
+ /* data was persisted, this can be retried */
+ m_upload->complete_errors += 1;
+ flb_plg_error(ctx->ins, "Could not complete upload %s, will retry..",
+ m_upload->s3_key);
+ }
+ }
+ }
+
+}
+
+static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char *data,
+ uint64_t bytes)
+{
+ int i;
+ int records = 0;
+ int map_size;
+ int check = FLB_FALSE;
+ int found = FLB_FALSE;
+ int log_key_missing = 0;
+ int ret;
+ int alloc_error = 0;
+ struct flb_s3 *ctx = out_context;
+ char *val_buf;
+ char *key_str = NULL;
+ size_t key_str_size = 0;
+ size_t msgpack_size = bytes + bytes / 4;
+ size_t val_offset = 0;
+ flb_sds_t out_buf;
+ msgpack_object map;
+ msgpack_object key;
+ msgpack_object val;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+
+ /* Iterate the original buffer and perform adjustments */
+ records = flb_mp_count(data, bytes);
+ if (records <= 0) {
+ return NULL;
+ }
+
+ /* Allocate buffer to store log_key contents */
+ val_buf = flb_calloc(1, msgpack_size);
+ if (val_buf == NULL) {
+ flb_plg_error(ctx->ins, "Could not allocate enough "
+ "memory to read record");
+ flb_errno();
+ return NULL;
+ }
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ flb_free(val_buf);
+
+ return NULL;
+ }
+
+
+ while (!alloc_error &&
+ (ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+
+ /* Get the record/map */
+ map = *log_event.body;
+
+ if (map.type != MSGPACK_OBJECT_MAP) {
+ continue;
+ }
+
+ map_size = map.via.map.size;
+
+ /* Reset variables for found log_key and correct type */
+ found = FLB_FALSE;
+ check = FLB_FALSE;
+
+ /* Extract log_key from record and append to output buffer */
+ for (i = 0; i < map_size; i++) {
+ key = map.via.map.ptr[i].key;
+ val = map.via.map.ptr[i].val;
+
+ if (key.type == MSGPACK_OBJECT_BIN) {
+ key_str = (char *) key.via.bin.ptr;
+ key_str_size = key.via.bin.size;
+ check = FLB_TRUE;
+ }
+ if (key.type == MSGPACK_OBJECT_STR) {
+ key_str = (char *) key.via.str.ptr;
+ key_str_size = key.via.str.size;
+ check = FLB_TRUE;
+ }
+
+ if (check == FLB_TRUE) {
+ if (strncmp(ctx->log_key, key_str, key_str_size) == 0) {
+ found = FLB_TRUE;
+
+ /*
+ * Copy contents of value into buffer. Necessary to copy
+ * strings because flb_msgpack_to_json does not handle nested
+ * JSON gracefully and double escapes them.
+ */
+ if (val.type == MSGPACK_OBJECT_BIN) {
+ memcpy(val_buf + val_offset, val.via.bin.ptr, val.via.bin.size);
+ val_offset += val.via.bin.size;
+ val_buf[val_offset] = '\n';
+ val_offset++;
+ }
+ else if (val.type == MSGPACK_OBJECT_STR) {
+ memcpy(val_buf + val_offset, val.via.str.ptr, val.via.str.size);
+ val_offset += val.via.str.size;
+ val_buf[val_offset] = '\n';
+ val_offset++;
+ }
+ else {
+ ret = flb_msgpack_to_json(val_buf + val_offset,
+ msgpack_size - val_offset, &val);
+ if (ret < 0) {
+ break;
+ }
+ val_offset += ret;
+ val_buf[val_offset] = '\n';
+ val_offset++;
+ }
+ /* Exit early once log_key has been found for current record */
+ break;
+ }
+ }
+ }
+
+ /* If log_key was not found in the current record, mark log key as missing */
+ if (found == FLB_FALSE) {
+ log_key_missing++;
+ }
+ }
+
+ /* Throw error once per chunk if at least one log key was not found */
+ if (log_key_missing > 0) {
+ flb_plg_error(ctx->ins, "Could not find log_key '%s' in %d records",
+ ctx->log_key, log_key_missing);
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ /* If nothing was read, destroy buffer */
+ if (val_offset == 0) {
+ flb_free(val_buf);
+ return NULL;
+ }
+ val_buf[val_offset] = '\0';
+
+ /* Create output buffer to store contents */
+ out_buf = flb_sds_create(val_buf);
+ if (out_buf == NULL) {
+ flb_plg_error(ctx->ins, "Error creating buffer to store log_key contents.");
+ flb_errno();
+ }
+ flb_free(val_buf);
+
+ return out_buf;
+}
+
+static void unit_test_flush(void *out_context, struct s3_file *upload_file,
+ const char *tag, int tag_len, flb_sds_t chunk,
+ int chunk_size, struct multipart_upload *m_upload_file,
+ time_t file_first_log_time)
+{
+ int ret;
+ char *buffer;
+ size_t buffer_size;
+ struct flb_s3 *ctx = out_context;
+
+ s3_store_buffer_put(ctx, upload_file, tag, tag_len,
+ chunk, (size_t) chunk_size, file_first_log_time);
+ ret = construct_request_buffer(ctx, chunk, upload_file, &buffer, &buffer_size);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Could not construct request buffer for %s",
+ upload_file->file_path);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ ret = upload_data(ctx, upload_file, m_upload_file, buffer, buffer_size, tag, tag_len);
+ flb_free(buffer);
+
+ FLB_OUTPUT_RETURN(ret);
+}
+
+static void flush_init(void *out_context)
+{
+ int ret;
+ struct flb_s3 *ctx = out_context;
+ struct flb_sched *sched;
+
+ /* clean up any old buffers found on startup */
+ if (ctx->has_old_buffers == FLB_TRUE) {
+ flb_plg_info(ctx->ins,
+ "Sending locally buffered data from previous "
+ "executions to S3; buffer=%s",
+ ctx->fs->root_path);
+ ctx->has_old_buffers = FLB_FALSE;
+ ret = put_all_chunks(ctx);
+ if (ret < 0) {
+ ctx->has_old_buffers = FLB_TRUE;
+ flb_plg_error(ctx->ins,
+ "Failed to send locally buffered data left over "
+ "from previous executions; will retry. Buffer=%s",
+ ctx->fs->root_path);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+ }
+
+    /*
+     * Create a timer that runs periodically and checks whether uploads
+     * are ready for completion; it is created once, on the first flush.
+     */
+ if (ctx->timer_created == FLB_FALSE) {
+ flb_plg_debug(ctx->ins,
+ "Creating upload timer with frequency %ds",
+ ctx->timer_ms / 1000);
+
+ sched = flb_sched_ctx_get();
+
+ if (ctx->preserve_data_ordering) {
+ ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
+ ctx->timer_ms, s3_upload_queue, ctx, NULL);
+ }
+ else {
+ ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
+ ctx->timer_ms, cb_s3_upload, ctx, NULL);
+ }
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "Failed to create upload timer");
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+ ctx->timer_created = FLB_TRUE;
+ }
+}
+
+static void cb_s3_flush(struct flb_event_chunk *event_chunk,
+ struct flb_output_flush *out_flush,
+ struct flb_input_instance *i_ins,
+ void *out_context,
+ struct flb_config *config)
+{
+ int ret;
+ int chunk_size;
+ int upload_timeout_check = FLB_FALSE;
+ int total_file_size_check = FLB_FALSE;
+ flb_sds_t chunk = NULL;
+ struct s3_file *upload_file = NULL;
+ struct flb_s3 *ctx = out_context;
+ struct multipart_upload *m_upload_file = NULL;
+ time_t file_first_log_time = 0;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+
+ /* Cleanup old buffers and initialize upload timer */
+ flush_init(ctx);
+
+ /* Process chunk */
+ if (ctx->log_key) {
+ chunk = flb_pack_msgpack_extract_log_key(ctx,
+ event_chunk->data,
+ event_chunk->size);
+ }
+ else {
+ chunk = flb_pack_msgpack_to_json_format(event_chunk->data,
+ event_chunk->size,
+ FLB_PACK_JSON_FORMAT_LINES,
+ ctx->json_date_format,
+ ctx->date_key);
+ }
+ if (chunk == NULL) {
+ flb_plg_error(ctx->ins, "Could not marshal msgpack to output string");
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+ chunk_size = flb_sds_len(chunk);
+
+ /* Get a file candidate matching the given 'tag' */
+ upload_file = s3_store_file_get(ctx,
+ event_chunk->tag,
+ flb_sds_len(event_chunk->tag));
+
+ if (upload_file == NULL) {
+ ret = flb_log_event_decoder_init(&log_decoder,
+ (char *) event_chunk->data,
+ event_chunk->size);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ flb_sds_destroy(chunk);
+
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ if (log_event.timestamp.tm.tv_sec != 0) {
+ file_first_log_time = log_event.timestamp.tm.tv_sec;
+ break;
+ }
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+ }
+ else {
+ /* Get file_first_log_time from upload_file */
+ file_first_log_time = upload_file->first_log_time;
+ }
+
+ if (file_first_log_time == 0) {
+ file_first_log_time = time(NULL);
+ }
+
+ /* Specific to unit tests, will not get called normally */
+ if (s3_plugin_under_test() == FLB_TRUE) {
+ unit_test_flush(ctx, upload_file,
+ event_chunk->tag, flb_sds_len(event_chunk->tag),
+ chunk, chunk_size,
+ m_upload_file, file_first_log_time);
+ }
+
+ /* Discard upload_file if it has failed to upload MAX_UPLOAD_ERRORS times */
+ if (upload_file != NULL && upload_file->failures >= MAX_UPLOAD_ERRORS) {
+ flb_plg_warn(ctx->ins, "File with tag %s failed to send %d times, will not "
+ "retry", event_chunk->tag, MAX_UPLOAD_ERRORS);
+ s3_store_file_inactive(ctx, upload_file);
+ upload_file = NULL;
+ }
+
+ /* If upload_timeout has elapsed, upload file */
+ if (upload_file != NULL && time(NULL) >
+ (upload_file->create_time + ctx->upload_timeout)) {
+ upload_timeout_check = FLB_TRUE;
+ flb_plg_info(ctx->ins, "upload_timeout reached for %s",
+ event_chunk->tag);
+ }
+
+ m_upload_file = get_upload(ctx,
+ event_chunk->tag, flb_sds_len(event_chunk->tag));
+
+ if (m_upload_file != NULL && time(NULL) >
+ (m_upload_file->init_time + ctx->upload_timeout)) {
+ upload_timeout_check = FLB_TRUE;
+ flb_plg_info(ctx->ins, "upload_timeout reached for %s", event_chunk->tag);
+ }
+
+ /* If total_file_size has been reached, upload file */
+ if ((upload_file && upload_file->size + chunk_size > ctx->upload_chunk_size) ||
+ (m_upload_file && m_upload_file->bytes + chunk_size > ctx->file_size)) {
+ total_file_size_check = FLB_TRUE;
+ }
+
+ /* File is ready for upload; the upload_file != NULL check prevents a segfault. */
+ if ((upload_file != NULL) && (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) {
+ if (ctx->preserve_data_ordering == FLB_TRUE) {
+ /* Buffer last chunk in file and lock file to prevent further changes */
+ ret = buffer_chunk(ctx, upload_file, chunk, chunk_size,
+ event_chunk->tag, flb_sds_len(event_chunk->tag),
+ file_first_log_time);
+
+ if (ret < 0) {
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+ s3_store_file_lock(upload_file);
+
+ /* Add chunk file to upload queue */
+ ret = add_to_queue(ctx, upload_file, m_upload_file,
+ event_chunk->tag, flb_sds_len(event_chunk->tag));
+ if (ret < 0) {
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+
+ /* Go through upload queue and return error if something went wrong */
+ s3_upload_queue(config, ctx);
+ if (ctx->upload_queue_success == FLB_FALSE) {
+ ctx->upload_queue_success = FLB_TRUE;
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+ FLB_OUTPUT_RETURN(FLB_OK);
+ }
+ else {
+ /* Send upload directly without upload queue */
+ ret = send_upload_request(ctx, chunk, upload_file, m_upload_file,
+ event_chunk->tag,
+ flb_sds_len(event_chunk->tag));
+ if (ret < 0) {
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+ FLB_OUTPUT_RETURN(ret);
+ }
+ }
+
+ /* Buffer current chunk in filesystem and wait for next chunk from engine */
+ ret = buffer_chunk(ctx, upload_file, chunk, chunk_size,
+ event_chunk->tag, flb_sds_len(event_chunk->tag),
+ file_first_log_time);
+
+ if (ret < 0) {
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+ FLB_OUTPUT_RETURN(FLB_OK);
+}
+
+static int cb_s3_exit(void *data, struct flb_config *config)
+{
+ int ret;
+ struct flb_s3 *ctx = data;
+ struct multipart_upload *m_upload = NULL;
+ struct mk_list *tmp;
+ struct mk_list *head;
+
+ if (!ctx) {
+ return 0;
+ }
+
+ if (s3_store_has_data(ctx) == FLB_TRUE) {
+ flb_plg_info(ctx->ins, "Sending all locally buffered data to S3");
+ ret = put_all_chunks(ctx);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Could not send all chunks on exit");
+ }
+ }
+
+ if (s3_store_has_uploads(ctx) == FLB_TRUE) {
+ mk_list_foreach_safe(head, tmp, &ctx->uploads) {
+ m_upload = mk_list_entry(head, struct multipart_upload, _head);
+
+ if (m_upload->upload_state == MULTIPART_UPLOAD_STATE_NOT_CREATED) {
+ continue;
+ }
+
+ if (m_upload->bytes > 0) {
+ m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS;
+ mk_list_del(&m_upload->_head);
+ ret = complete_multipart_upload(ctx, m_upload);
+ if (ret == 0) {
+ multipart_upload_destroy(m_upload);
+ }
+ else {
+ mk_list_add(&m_upload->_head, &ctx->uploads);
+ flb_plg_error(ctx->ins, "Could not complete upload %s",
+ m_upload->s3_key);
+ }
+ }
+ }
+ }
+
+ s3_store_exit(ctx);
+ s3_context_destroy(ctx);
+
+ return 0;
+}
+
+/* Configuration properties map */
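+/*
+ * Illustrative classic-mode configuration exercising a few of these
+ * properties (bucket/region values are examples, not defaults):
+ *
+ *   [OUTPUT]
+ *       Name              s3
+ *       Match             *
+ *       bucket            my-bucket
+ *       region            us-west-2
+ *       total_file_size   50M
+ *       upload_timeout    10m
+ */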
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "json_date_format", NULL,
+ 0, FLB_FALSE, 0,
+ FBL_PACK_JSON_DATE_FORMAT_DESCRIPTION
+ },
+ {
+ FLB_CONFIG_MAP_STR, "json_date_key", "date",
+ 0, FLB_TRUE, offsetof(struct flb_s3, json_date_key),
+ "Specifies the name of the date field in output."
+ },
+ {
+ FLB_CONFIG_MAP_SIZE, "total_file_size", "100000000",
+ 0, FLB_TRUE, offsetof(struct flb_s3, file_size),
+ "Specifies the size of files in S3. Maximum size is 50GB, minimum is 1MB"
+ },
+ {
+ FLB_CONFIG_MAP_SIZE, "upload_chunk_size", "5242880",
+ 0, FLB_TRUE, offsetof(struct flb_s3, upload_chunk_size),
+ "This plugin uses the S3 Multipart Upload API to stream data to S3, "
+ "ensuring your data gets-off-the-box as quickly as possible. "
+ "This parameter configures the size of each “part” in the upload. "
+ "The total_file_size option configures the size of the file you will see "
+ "in S3; this option determines the size of chunks uploaded until that "
+ "size is reached. These chunks are temporarily stored in chunk_buffer_path "
+ "until their size reaches upload_chunk_size, which point the chunk is "
+ "uploaded to S3. Default: 5M, Max: 50M, Min: 5M."
+ },
+
+ {
+ FLB_CONFIG_MAP_TIME, "upload_timeout", "10m",
+ 0, FLB_TRUE, offsetof(struct flb_s3, upload_timeout),
+ "Optionally specify a timeout for uploads. "
+ "Whenever this amount of time has elapsed, Fluent Bit will complete an "
+ "upload and create a new file in S3. For example, set this value to 60m "
+ "and you will get a new file in S3 every hour. Default is 10m."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "bucket", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_s3, bucket),
+ "S3 bucket name."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "region", "us-east-1",
+ 0, FLB_TRUE, offsetof(struct flb_s3, region),
+ "AWS region."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "role_arn", NULL,
+ 0, FLB_FALSE, 0,
+ "ARN of an IAM role to assume (ex. for cross account access)."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "endpoint", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_s3, endpoint),
+ "Custom endpoint for the S3 API."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "sts_endpoint", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_s3, sts_endpoint),
+ "Custom endpoint for the STS API."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "canned_acl", NULL,
+ 0, FLB_FALSE, 0,
+ "Predefined Canned ACL policy for S3 objects."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "compression", NULL,
+ 0, FLB_FALSE, 0,
+ "Compression type for S3 objects. 'gzip' and 'arrow' are the supported values. "
+ "'arrow' is only an available if Apache Arrow was enabled at compile time. "
+ "Defaults to no compression. "
+ "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "content_type", NULL,
+ 0, FLB_FALSE, 0,
+ "A standard MIME type for the S3 object; this will be set "
+ "as the Content-Type HTTP header."
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "store_dir", "/tmp/fluent-bit/s3",
+ 0, FLB_TRUE, offsetof(struct flb_s3, store_dir),
+ "Directory to locally buffer data before sending. Plugin uses the S3 Multipart "
+ "upload API to send data in chunks of 5 MB at a time- only a small amount of"
+ " data will be locally buffered at any given point in time."
+ },
+
+ {
+ FLB_CONFIG_MAP_SIZE, "store_dir_limit_size", (char *) NULL,
+ 0, FLB_TRUE, offsetof(struct flb_s3, store_dir_limit_size),
+ "S3 plugin has its own buffering system with files in the `store_dir`. "
+ "Use the `store_dir_limit_size` to limit the amount of data S3 buffers in "
+ "the `store_dir` to limit disk usage. If the limit is reached, "
+ "data will be discarded. Default is 0 which means unlimited."
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "s3_key_format", "/fluent-bit-logs/$TAG/%Y/%m/%d/%H/%M/%S",
+ 0, FLB_TRUE, offsetof(struct flb_s3, s3_key_format),
+ "Format string for keys in S3. This option supports strftime time formatters "
+ "and a syntax for selecting parts of the Fluent log tag using a syntax inspired "
+ "by the rewrite_tag filter. Add $TAG in the format string to insert the full "
+ "log tag; add $TAG[0] to insert the first part of the tag in the s3 key. "
+ "The tag is split into “parts” using the characters specified with the "
+ "s3_key_format_tag_delimiters option. Add $INDEX to enable sequential indexing "
+ "for file names. Adding $INDEX will prevent random string being added to end of key"
+ "when $UUID is not provided. See the in depth examples and tutorial in the "
+ "documentation."
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "s3_key_format_tag_delimiters", ".",
+ 0, FLB_TRUE, offsetof(struct flb_s3, tag_delimiters),
+ "A series of characters which will be used to split the tag into “parts” for "
+ "use with the s3_key_format option. See the in depth examples and tutorial in "
+ "the documentation."
+ },
+
+ {
+ FLB_CONFIG_MAP_BOOL, "auto_retry_requests", "true",
+ 0, FLB_TRUE, offsetof(struct flb_s3, retry_requests),
+ "Immediately retry failed requests to AWS services once. This option "
+ "does not affect the normal Fluent Bit retry mechanism with backoff. "
+ "Instead, it enables an immediate retry with no delay for networking "
+ "errors, which may help improve throughput when there are transient/random "
+ "networking issues."
+ },
+
+ {
+ FLB_CONFIG_MAP_BOOL, "use_put_object", "false",
+ 0, FLB_TRUE, offsetof(struct flb_s3, use_put_object),
+ "Use the S3 PutObject API, instead of the multipart upload API"
+ },
+
+ {
+ FLB_CONFIG_MAP_BOOL, "send_content_md5", "false",
+ 0, FLB_TRUE, offsetof(struct flb_s3, send_content_md5),
+ "Send the Content-MD5 header with object uploads, as is required when Object Lock is enabled"
+ },
+
+ {
+ FLB_CONFIG_MAP_BOOL, "preserve_data_ordering", "true",
+ 0, FLB_TRUE, offsetof(struct flb_s3, preserve_data_ordering),
+ "Normally, when an upload request fails, there is a high chance for the last "
+ "received chunk to be swapped with a later chunk, resulting in data shuffling. "
+ "This feature prevents this shuffling by using a queue logic for uploads."
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "log_key", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_s3, log_key),
+ "By default, the whole log record will be sent to S3. "
+ "If you specify a key name with this option, then only the value of "
+ "that key will be sent to S3."
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "external_id", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_s3, external_id),
+ "Specify an external ID for the STS API, can be used with the role_arn parameter if your role "
+ "requires an external ID."
+ },
+
+ {
+ FLB_CONFIG_MAP_BOOL, "static_file_path", "false",
+ 0, FLB_TRUE, offsetof(struct flb_s3, static_file_path),
+ "Disables behavior where UUID string is automatically appended to end of S3 key name when "
+ "$UUID is not provided in s3_key_format. $UUID, time formatters, $TAG, and other dynamic "
+ "key formatters all work as expected while this feature is set to true."
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "storage_class", NULL,
+ 0, FLB_FALSE, 0,
+ "Specify the storage class for S3 objects. If this option is not specified, objects "
+ "will be stored with the default 'STANDARD' storage class."
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "profile", NULL,
+ 0, FLB_TRUE, offsetof(struct flb_s3, profile),
+ "AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in "
+ "$HOME/.aws/ directory."
+ },
+
+ /* EOF */
+ {0}
+};
+
+/* Plugin registration */
+struct flb_output_plugin out_s3_plugin = {
+ .name = "s3",
+ .description = "Send to S3",
+ .cb_init = cb_s3_init,
+ .cb_flush = cb_s3_flush,
+ .cb_exit = cb_s3_exit,
+ .workers = 1,
+ .flags = FLB_OUTPUT_NET | FLB_IO_TLS,
+ .config_map = config_map
+};
diff --git a/src/fluent-bit/plugins/out_s3/s3.h b/src/fluent-bit/plugins/out_s3/s3.h
new file mode 100644
index 000000000..e145b1ad6
--- /dev/null
+++ b/src/fluent-bit/plugins/out_s3/s3.h
@@ -0,0 +1,203 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_OUT_S3
+#define FLB_OUT_S3
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_aws_credentials.h>
+#include <fluent-bit/flb_aws_util.h>
+
+/* Upload data to S3 in 5MB chunks */
+#define MIN_CHUNKED_UPLOAD_SIZE 5242880
+#define MAX_CHUNKED_UPLOAD_SIZE 50000000
+#define MAX_CHUNKED_UPLOAD_COMPRESS_SIZE 5000000000
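+/* 5242880 bytes = 5 MiB, the S3 multipart minimum part size; 50000000 bytes ~= 50 MB */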
+
+#define UPLOAD_TIMER_MAX_WAIT 60000
+#define UPLOAD_TIMER_MIN_WAIT 6000
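+/* upload timer wait bounds, in milliseconds (60s max, 6s min) */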
+
+#define MULTIPART_UPLOAD_STATE_NOT_CREATED 0
+#define MULTIPART_UPLOAD_STATE_CREATED 1
+#define MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS 2
+
+#define DEFAULT_FILE_SIZE 100000000
+#define MAX_FILE_SIZE 50000000000
+#define MAX_FILE_SIZE_STR "50,000,000,000"
+
+/* Allowed max file size 1 GB for publishing to S3 */
+#define MAX_FILE_SIZE_PUT_OBJECT 1000000000
+
+#define DEFAULT_UPLOAD_TIMEOUT 3600
+
+/*
+ * If we see repeated errors on an upload/chunk, we will discard it.
+ * This saves us from scenarios where something goes wrong and an upload
+ * cannot proceed (maybe some other process completed or deleted the upload);
+ * instead of erroring out forever, we eventually discard the upload.
+ *
+ * The same is done for chunks, just to be safe, even though realistically
+ * I can't think of a reason why a chunk could become unsendable.
+ */
+#define MAX_UPLOAD_ERRORS 5
+
+struct upload_queue {
+ struct s3_file *upload_file;
+ struct multipart_upload *m_upload_file;
+ flb_sds_t tag;
+ int tag_len;
+
+ int retry_counter;
+ time_t upload_time;
+
+ struct mk_list _head;
+};
+
+struct multipart_upload {
+ flb_sds_t s3_key;
+ flb_sds_t tag;
+ flb_sds_t upload_id;
+ int upload_state;
+ time_t init_time;
+
+ /*
+ * a maximum of 10,000 parts is allowed per upload; for each part we need to
+ * store the mapping of Part Number to ETag
+ */
+ flb_sds_t etags[10000];
+ int part_number;
+
+ /*
+ * we use async http, so we need to check that all part requests have
+ * completed before we complete the upload
+ */
+ int parts_uploaded;
+
+ /* ongoing tracker of how much data has been sent for this upload */
+ size_t bytes;
+
+ struct mk_list _head;
+
+ /* see note for MAX_UPLOAD_ERRORS */
+ int upload_errors;
+ int complete_errors;
+};
+
+struct flb_s3 {
+ char *bucket;
+ char *region;
+ char *s3_key_format;
+ char *tag_delimiters;
+ char *endpoint;
+ char *sts_endpoint;
+ char *canned_acl;
+ char *content_type;
+ char *storage_class;
+ char *log_key;
+ char *external_id;
+ char *profile;
+ int free_endpoint;
+ int retry_requests;
+ int use_put_object;
+ int send_content_md5;
+ int static_file_path;
+ int compression;
+ int port;
+ int insecure;
+ size_t store_dir_limit_size;
+
+ /* track the total amount of buffered data */
+ size_t current_buffer_size;
+
+ struct flb_aws_provider *provider;
+ struct flb_aws_provider *base_provider;
+ /* tls instances can't be re-used; aws provider requires a separate one */
+ struct flb_tls *provider_tls;
+ /* one for the standard chain provider, one for sts assume role */
+ struct flb_tls *sts_provider_tls;
+ struct flb_tls *client_tls;
+
+ struct flb_aws_client *s3_client;
+ int json_date_format;
+ flb_sds_t json_date_key;
+ flb_sds_t date_key;
+
+ flb_sds_t buffer_dir;
+
+ char *store_dir;
+ struct flb_fstore *fs;
+ struct flb_fstore_stream *stream_active; /* default active stream */
+ struct flb_fstore_stream *stream_upload; /* multipart upload stream */
+ struct flb_fstore_stream *stream_metadata; /* s3 metadata stream */
+
+ /*
+ * used to track whether unsent buffers were found on startup that have
+ * not been sent yet
+ */
+ int has_old_buffers;
+ /* old multipart uploads read on start up */
+ int has_old_uploads;
+
+ struct mk_list uploads;
+
+ int preserve_data_ordering;
+ int upload_queue_success;
+ struct mk_list upload_queue;
+
+ size_t file_size;
+ size_t upload_chunk_size;
+ time_t upload_timeout;
+ time_t retry_time;
+
+ int timer_created;
+ int timer_ms;
+ int key_fmt_has_uuid;
+
+ uint64_t seq_index;
+ int key_fmt_has_seq_index;
+ flb_sds_t metadata_dir;
+ flb_sds_t seq_index_file;
+
+ struct flb_output_instance *ins;
+};
+
+int upload_part(struct flb_s3 *ctx, struct multipart_upload *m_upload,
+ char *body, size_t body_size);
+
+int create_multipart_upload(struct flb_s3 *ctx,
+ struct multipart_upload *m_upload);
+
+int complete_multipart_upload(struct flb_s3 *ctx,
+ struct multipart_upload *m_upload);
+
+void multipart_read_uploads_from_fs(struct flb_s3 *ctx);
+
+void multipart_upload_destroy(struct multipart_upload *m_upload);
+
+struct flb_http_client *mock_s3_call(char *error_env_var, char *api);
+int s3_plugin_under_test();
+
+int get_md5_base64(char *buf, size_t buf_size, char *md5_str, size_t md5_str_size);
+
+int create_headers(struct flb_s3 *ctx, char *body_md5,
+ struct flb_aws_header **headers, int *num_headers,
+ int multipart_upload);
+
+#endif
diff --git a/src/fluent-bit/plugins/out_s3/s3_multipart.c b/src/fluent-bit/plugins/out_s3/s3_multipart.c
new file mode 100644
index 000000000..1eb2a1061
--- /dev/null
+++ b/src/fluent-bit/plugins/out_s3/s3_multipart.c
@@ -0,0 +1,707 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_slist.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_config_map.h>
+#include <fluent-bit/flb_aws_util.h>
+#include <fluent-bit/flb_signv4.h>
+#include <fluent-bit/flb_fstore.h>
+#include <ctype.h>
+
+#include "s3.h"
+#include "s3_store.h"
+
+#define COMPLETE_MULTIPART_UPLOAD_BASE_LEN 100
+#define COMPLETE_MULTIPART_UPLOAD_PART_LEN 124
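+/*
+ * Rough sizing assumptions for the CompleteMultipartUpload XML payload built
+ * below: BASE_LEN covers the opening/closing document tags, PART_LEN covers
+ * one <Part>...</Part> entry (fixed tags plus a typical quoted 32-character
+ * ETag and the part number).
+ */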
+
+flb_sds_t get_etag(char *response, size_t size);
+
+static inline int try_to_write(char *buf, int *off, size_t left,
+ const char *str, size_t str_len)
+{
+ if (str_len <= 0) {
+ str_len = strlen(str);
+ }
+ if (left <= *off+str_len) {
+ return FLB_FALSE;
+ }
+ memcpy(buf+*off, str, str_len);
+ *off += str_len;
+ return FLB_TRUE;
+}
+
+
+/* the 'tag' or key in the upload_dir is s3_key + \n + upload_id */
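+/*
+ * Hypothetical example: with the default s3_key_format the stored key could
+ * look like "/fluent-bit-logs/app/2024/01/01/00/00/00" + '\n' + the UploadId
+ * returned by CreateMultipartUpload.
+ */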
+static flb_sds_t upload_key(struct multipart_upload *m_upload)
+{
+ flb_sds_t key;
+ flb_sds_t tmp;
+
+ key = flb_sds_create_size(64);
+
+ tmp = flb_sds_printf(&key, "%s\n%s", m_upload->s3_key, m_upload->upload_id);
+ if (!tmp) {
+ flb_errno();
+ flb_sds_destroy(key);
+ return NULL;
+ }
+ key = tmp;
+
+ return key;
+}
+
+/* the 'tag' or key in the upload_dir is s3_key + \n + upload_id */
+static int upload_data_from_key(struct multipart_upload *m_upload, char *key)
+{
+ flb_sds_t tmp_sds;
+ int len = 0;
+ int original_len;
+ char *tmp;
+
+ original_len = strlen(key);
+
+ tmp = strchr(key, '\n');
+ if (!tmp) {
+ return -1;
+ }
+
+ len = tmp - key;
+ tmp_sds = flb_sds_create_len(key, len);
+ if (!tmp_sds) {
+ flb_errno();
+ return -1;
+ }
+ m_upload->s3_key = tmp_sds;
+
+ tmp++;
+ original_len -= (len + 1);
+
+ tmp_sds = flb_sds_create_len(tmp, original_len);
+ if (!tmp_sds) {
+ flb_errno();
+ return -1;
+ }
+ m_upload->upload_id = tmp_sds;
+
+ return 0;
+}
+
+/* parse etags from file data */
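+/*
+ * Each line persisted by save_upload() has the form (illustrative values):
+ *   part_number=3\tetag=0123456789abcdef
+ * This parser walks those lines and restores the part number/ETag pairs.
+ */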
+static void parse_etags(struct multipart_upload *m_upload, char *data)
+{
+ char *line = data;
+ char *start;
+ char *end;
+ flb_sds_t etag;
+ int part_num;
+ int len;
+
+ if (!data) {
+ return;
+ }
+
+ line = strtok(data, "\n");
+
+ if (!line) {
+ return;
+ }
+
+ do {
+ start = strstr(line, "part_number=");
+ if (!start) {
+ return;
+ }
+ start += 12;
+ end = strchr(start, '\t');
+ if (!end) {
+ flb_debug("[s3 restart parser] Did not find tab separator in line %s", start);
+ return;
+ }
+ *end = '\0';
+ part_num = atoi(start);
+ if (part_num <= 0) {
+ flb_debug("[s3 restart parser] Could not parse part_number from %s", start);
+ return;
+ }
+ m_upload->part_number = part_num;
+ *end = '\t';
+
+ start = strstr(line, "tag=");
+ if (!start) {
+ flb_debug("[s3 restart parser] Could not find 'etag=' %s", line);
+ return;
+ }
+
+ start += 4;
+ len = strlen(start);
+
+ if (len <= 0) {
+ flb_debug("[s3 restart parser] Could not find etag %s", line);
+ return;
+ }
+
+ etag = flb_sds_create_len(start, len);
+ if (!etag) {
+ flb_debug("[s3 restart parser] Could create etag");
+ return;
+ }
+ flb_debug("[s3 restart parser] found part number %d=%s", part_num, etag);
+ m_upload->etags[part_num - 1] = etag;
+
+ line = strtok(NULL, "\n");
+ } while (line != NULL);
+}
+
+static struct multipart_upload *upload_from_file(struct flb_s3 *ctx,
+ struct flb_fstore_file *fsf)
+{
+ struct multipart_upload *m_upload = NULL;
+ char *buffered_data = NULL;
+ size_t buffer_size = 0;
+ int ret;
+
+ ret = s3_store_file_upload_read(ctx, fsf, &buffered_data, &buffer_size);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Could not read locally buffered data %s",
+ fsf->name);
+ return NULL;
+ }
+
+ /* always make sure we have a fresh copy of metadata */
+ ret = s3_store_file_meta_get(ctx, fsf);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "Could not read file metadata: %s",
+ fsf->name);
+ return NULL;
+ }
+
+ m_upload = flb_calloc(1, sizeof(struct multipart_upload));
+ if (!m_upload) {
+ flb_errno();
+ flb_free(buffered_data);
+ return NULL;
+ }
+ m_upload->init_time = time(NULL);
+ m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS;
+
+ ret = upload_data_from_key(m_upload, fsf->meta_buf);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Could not extract upload data from: %s",
+ fsf->name);
+ flb_free(buffered_data);
+ multipart_upload_destroy(m_upload);
+ return NULL;
+ }
+
+ parse_etags(m_upload, buffered_data);
+ flb_free(buffered_data);
+ if (m_upload->part_number == 0) {
+ flb_plg_error(ctx->ins, "Could not extract upload data from %s",
+ fsf->name);
+ multipart_upload_destroy(m_upload);
+ return NULL;
+ }
+
+ /* code expects it to be 1 more than the last part read */
+ m_upload->part_number++;
+
+ return m_upload;
+}
+
+void multipart_read_uploads_from_fs(struct flb_s3 *ctx)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct multipart_upload *m_upload = NULL;
+ struct flb_fstore_file *fsf;
+
+ mk_list_foreach_safe(head, tmp, &ctx->stream_upload->files) {
+ fsf = mk_list_entry(head, struct flb_fstore_file, _head);
+ m_upload = upload_from_file(ctx, fsf);
+ if (!m_upload) {
+ flb_plg_error(ctx->ins,
+ "Could not process multipart upload data in %s",
+ fsf->name);
+ continue;
+ }
+ mk_list_add(&m_upload->_head, &ctx->uploads);
+ flb_plg_info(ctx->ins,
+ "Successfully read existing upload from file system, s3_key=%s",
+ m_upload->s3_key);
+ }
+}
+
+/* store list of part number and etag */
+static flb_sds_t upload_data(flb_sds_t etag, int part_num)
+{
+ flb_sds_t data;
+ flb_sds_t tmp;
+
+ data = flb_sds_create_size(64);
+
+ tmp = flb_sds_printf(&data, "part_number=%d\tetag=%s\n", part_num, etag);
+ if (!tmp) {
+ flb_errno();
+ flb_sds_destroy(data);
+ return NULL;
+ }
+ data = tmp;
+
+ return data;
+}
+
+/* persists upload data to the file system */
+static int save_upload(struct flb_s3 *ctx, struct multipart_upload *m_upload,
+ flb_sds_t etag)
+{
+ int ret;
+ flb_sds_t key;
+ flb_sds_t data;
+ struct flb_fstore_file *fsf;
+
+ key = upload_key(m_upload);
+ if (!key) {
+ flb_plg_debug(ctx->ins, "Could not constuct upload key for buffer dir");
+ return -1;
+ }
+
+ data = upload_data(etag, m_upload->part_number);
+ if (!data) {
+ flb_plg_debug(ctx->ins, "Could not constuct upload key for buffer dir");
+ return -1;
+ }
+
+ fsf = s3_store_file_upload_get(ctx, key, flb_sds_len(key));
+
+ /* Write the key to the file */
+ ret = s3_store_file_upload_put(ctx, fsf, key, data);
+
+ flb_sds_destroy(key);
+ flb_sds_destroy(data);
+
+ return ret;
+}
+
+static int remove_upload_from_fs(struct flb_s3 *ctx, struct multipart_upload *m_upload)
+{
+ flb_sds_t key;
+ struct flb_fstore_file *fsf;
+
+ key = upload_key(m_upload);
+ if (!key) {
+ flb_plg_debug(ctx->ins, "Could not construct upload key");
+ return -1;
+ }
+
+ fsf = s3_store_file_upload_get(ctx, key, flb_sds_len(key));
+ if (fsf) {
+ s3_store_file_upload_delete(ctx, fsf);
+ }
+ flb_sds_destroy(key);
+ return 0;
+}
+
+/*
+ * https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
+ */
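+/*
+ * The request body built below has this shape (illustrative ETags, whitespace
+ * added for readability; the real payload is a single line):
+ *   <CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ *     <Part><ETag>etag-1</ETag><PartNumber>1</PartNumber></Part>
+ *     <Part><ETag>etag-2</ETag><PartNumber>2</PartNumber></Part>
+ *   </CompleteMultipartUpload>
+ */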
+static int complete_multipart_upload_payload(struct flb_s3 *ctx,
+ struct multipart_upload *m_upload,
+ char **out_buf, size_t *out_size)
+{
+ char *buf;
+ int i;
+ int offset = 0;
+ flb_sds_t etag;
+ size_t size = COMPLETE_MULTIPART_UPLOAD_BASE_LEN;
+ char part_num[7];
+
+ size = size + (COMPLETE_MULTIPART_UPLOAD_PART_LEN * m_upload->part_number);
+
+ buf = flb_malloc(size + 1);
+ if (!buf) {
+ flb_errno();
+ return -1;
+ }
+
+ if (!try_to_write(buf, &offset, size,
+ "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">", 73)) {
+ goto error;
+ }
+
+ for (i = 0; i < m_upload->part_number; i++) {
+ etag = m_upload->etags[i];
+ if (etag == NULL) {
+ continue;
+ }
+ if (!try_to_write(buf, &offset, size,
+ "<Part><ETag>", 12)) {
+ goto error;
+ }
+
+ if (!try_to_write(buf, &offset, size,
+ etag, 0)) {
+ goto error;
+ }
+
+ if (!try_to_write(buf, &offset, size,
+ "</ETag><PartNumber>", 19)) {
+ goto error;
+ }
+
+ if (!sprintf(part_num, "%d", i + 1)) {
+ goto error;
+ }
+
+ if (!try_to_write(buf, &offset, size,
+ part_num, 0)) {
+ goto error;
+ }
+
+ if (!try_to_write(buf, &offset, size,
+ "</PartNumber></Part>", 20)) {
+ goto error;
+ }
+ }
+
+ if (!try_to_write(buf, &offset, size,
+ "</CompleteMultipartUpload>", 26)) {
+ goto error;
+ }
+
+ buf[offset] = '\0';
+
+ *out_buf = buf;
+ *out_size = offset;
+ return 0;
+
+error:
+ flb_free(buf);
+ flb_plg_error(ctx->ins, "Failed to construct CompleteMultipartUpload "
+ "request body");
+ return -1;
+}
+
+int complete_multipart_upload(struct flb_s3 *ctx,
+ struct multipart_upload *m_upload)
+{
+ char *body;
+ size_t size;
+ flb_sds_t uri = NULL;
+ flb_sds_t tmp;
+ int ret;
+ struct flb_http_client *c = NULL;
+ struct flb_aws_client *s3_client;
+
+ if (!m_upload->upload_id) {
+ flb_plg_error(ctx->ins, "Cannot complete multipart upload for key %s: "
+ "upload ID is unset ", m_upload->s3_key);
+ return -1;
+ }
+
+ uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 11 +
+ flb_sds_len(m_upload->upload_id));
+ if (!uri) {
+ flb_errno();
+ return -1;
+ }
+
+ tmp = flb_sds_printf(&uri, "/%s%s?uploadId=%s", ctx->bucket,
+ m_upload->s3_key, m_upload->upload_id);
+ if (!tmp) {
+ flb_sds_destroy(uri);
+ return -1;
+ }
+ uri = tmp;
+
+ ret = complete_multipart_upload_payload(ctx, m_upload, &body, &size);
+ if (ret < 0) {
+ flb_sds_destroy(uri);
+ return -1;
+ }
+
+ s3_client = ctx->s3_client;
+ if (s3_plugin_under_test() == FLB_TRUE) {
+ c = mock_s3_call("TEST_COMPLETE_MULTIPART_UPLOAD_ERROR", "CompleteMultipartUpload");
+ }
+ else {
+ c = s3_client->client_vtable->request(s3_client, FLB_HTTP_POST,
+ uri, body, size,
+ NULL, 0);
+ }
+ flb_sds_destroy(uri);
+ flb_free(body);
+ if (c) {
+ flb_plg_debug(ctx->ins, "CompleteMultipartUpload http status=%d",
+ c->resp.status);
+ if (c->resp.status == 200) {
+ flb_plg_info(ctx->ins, "Successfully completed multipart upload "
+ "for %s, UploadId=%s", m_upload->s3_key,
+ m_upload->upload_id);
+ flb_http_client_destroy(c);
+ /* remove this upload from the file system */
+ remove_upload_from_fs(ctx, m_upload);
+ return 0;
+ }
+ flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size,
+ "CompleteMultipartUpload", ctx->ins);
+ if (c->resp.payload != NULL) {
+ flb_plg_debug(ctx->ins, "Raw CompleteMultipartUpload response: %s",
+ c->resp.payload);
+ }
+ flb_http_client_destroy(c);
+ }
+
+ flb_plg_error(ctx->ins, "CompleteMultipartUpload request failed");
+ return -1;
+}
+
+
+int create_multipart_upload(struct flb_s3 *ctx,
+ struct multipart_upload *m_upload)
+{
+ flb_sds_t uri = NULL;
+ flb_sds_t tmp;
+ struct flb_http_client *c = NULL;
+ struct flb_aws_client *s3_client;
+ struct flb_aws_header *headers = NULL;
+ int num_headers = 0;
+ int ret;
+
+ uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 8);
+ if (!uri) {
+ flb_errno();
+ return -1;
+ }
+
+ tmp = flb_sds_printf(&uri, "/%s%s?uploads=", ctx->bucket, m_upload->s3_key);
+ if (!tmp) {
+ flb_sds_destroy(uri);
+ return -1;
+ }
+ uri = tmp;
+
+ s3_client = ctx->s3_client;
+ if (s3_plugin_under_test() == FLB_TRUE) {
+ c = mock_s3_call("TEST_CREATE_MULTIPART_UPLOAD_ERROR", "CreateMultipartUpload");
+ }
+ else {
+ ret = create_headers(ctx, NULL, &headers, &num_headers, FLB_TRUE);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "Failed to create headers");
+ flb_sds_destroy(uri);
+ return -1;
+ }
+ c = s3_client->client_vtable->request(s3_client, FLB_HTTP_POST,
+ uri, NULL, 0, headers, num_headers);
+ if (headers) {
+ flb_free(headers);
+ }
+ }
+ flb_sds_destroy(uri);
+ if (c) {
+ flb_plg_debug(ctx->ins, "CreateMultipartUpload http status=%d",
+ c->resp.status);
+ if (c->resp.status == 200) {
+ tmp = flb_aws_xml_get_val(c->resp.payload, c->resp.payload_size,
+ "<UploadId>", "</UploadId>");
+ if (!tmp) {
+ flb_plg_error(ctx->ins, "Could not find upload ID in "
+ "CreateMultipartUpload response");
+ flb_plg_debug(ctx->ins, "Raw CreateMultipartUpload response: %s",
+ c->resp.payload);
+ flb_http_client_destroy(c);
+ return -1;
+ }
+ m_upload->upload_id = tmp;
+ flb_plg_info(ctx->ins, "Successfully initiated multipart upload "
+ "for %s, UploadId=%s", m_upload->s3_key,
+ m_upload->upload_id);
+ flb_http_client_destroy(c);
+ return 0;
+ }
+ flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size,
+ "CreateMultipartUpload", ctx->ins);
+ if (c->resp.payload != NULL) {
+ flb_plg_debug(ctx->ins, "Raw CreateMultipartUpload response: %s",
+ c->resp.payload);
+ }
+ flb_http_client_destroy(c);
+ }
+
+ flb_plg_error(ctx->ins, "CreateMultipartUpload request failed");
+ return -1;
+}
+
+/* gets the ETag value from response headers */
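+/*
+ * For example, given a response containing the header line (illustrative):
+ *   ETag: "d41d8cd98f00b204e9800998ecf8427e"
+ * this returns an sds string holding the value with the quotes stripped.
+ */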
+flb_sds_t get_etag(char *response, size_t size)
+{
+ char *tmp;
+ int start;
+ int end;
+ int len;
+ int i = 0;
+ flb_sds_t etag;
+
+ if (response == NULL) {
+ return NULL;
+ }
+
+ tmp = strstr(response, "ETag:");
+ if (!tmp) {
+ return NULL;
+ }
+ i = tmp - response;
+
+ /* advance to end of ETag key */
+ i += 5;
+
+ /* advance across any whitespace and the opening quote */
+ while (i < size && (response[i] == '\"' || isspace(response[i]) != 0)) {
+ i++;
+ }
+ start = i;
+ /* advance until we hit whitespace or the end quote */
+ while (i < size && (response[i] != '\"' && isspace(response[i]) == 0)) {
+ i++;
+ }
+ end = i;
+ len = end - start;
+
+ etag = flb_sds_create_len(response + start, len);
+ if (!etag) {
+ flb_errno();
+ return NULL;
+ }
+
+ return etag;
+}
+
+int upload_part(struct flb_s3 *ctx, struct multipart_upload *m_upload,
+ char *body, size_t body_size)
+{
+ flb_sds_t uri = NULL;
+ flb_sds_t tmp;
+ int ret;
+ struct flb_http_client *c = NULL;
+ struct flb_aws_client *s3_client;
+ struct flb_aws_header *headers = NULL;
+ int num_headers = 0;
+ char body_md5[25];
+
+ uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 8);
+ if (!uri) {
+ flb_errno();
+ return -1;
+ }
+
+ tmp = flb_sds_printf(&uri, "/%s%s?partNumber=%d&uploadId=%s",
+ ctx->bucket, m_upload->s3_key, m_upload->part_number,
+ m_upload->upload_id);
+ if (!tmp) {
+ flb_errno();
+ flb_sds_destroy(uri);
+ return -1;
+ }
+ uri = tmp;
+
+ memset(body_md5, 0, sizeof(body_md5));
+ if (ctx->send_content_md5 == FLB_TRUE) {
+ ret = get_md5_base64(body, body_size, body_md5, sizeof(body_md5));
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "Failed to create Content-MD5 header");
+ flb_sds_destroy(uri);
+ return -1;
+ }
+
+ num_headers = 1;
+ headers = flb_malloc(sizeof(struct flb_aws_header) * num_headers);
+ if (headers == NULL) {
+ flb_errno();
+ flb_sds_destroy(uri);
+ return -1;
+ }
+
+ headers[0].key = "Content-MD5";
+ headers[0].key_len = 11;
+ headers[0].val = body_md5;
+ headers[0].val_len = strlen(body_md5);
+ }
+
+ s3_client = ctx->s3_client;
+ if (s3_plugin_under_test() == FLB_TRUE) {
+ c = mock_s3_call("TEST_UPLOAD_PART_ERROR", "UploadPart");
+ }
+ else {
+ c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT,
+ uri, body, body_size,
+ headers, num_headers);
+ }
+ flb_free(headers);
+ flb_sds_destroy(uri);
+ if (c) {
+ flb_plg_info(ctx->ins, "UploadPart http status=%d",
+ c->resp.status);
+ if (c->resp.status == 200) {
+ tmp = get_etag(c->resp.data, c->resp.data_size);
+ if (!tmp) {
+ flb_plg_error(ctx->ins, "Could not find ETag in "
+ "UploadPart response");
+ flb_plg_debug(ctx->ins, "Raw UploadPart response: %s",
+ c->resp.payload);
+ flb_http_client_destroy(c);
+ return -1;
+ }
+ m_upload->etags[m_upload->part_number - 1] = tmp;
+ flb_plg_info(ctx->ins, "Successfully uploaded part #%d "
+ "for %s, UploadId=%s, ETag=%s", m_upload->part_number,
+ m_upload->s3_key, m_upload->upload_id, tmp);
+ flb_http_client_destroy(c);
+ /* track how many bytes have gone toward this upload */
+ m_upload->bytes += body_size;
+
+ /* finally, attempt to persist the data for this upload */
+ ret = save_upload(ctx, m_upload, tmp);
+ if (ret == 0) {
+ flb_plg_debug(ctx->ins, "Successfully persisted upload data, UploadId=%s",
+ m_upload->upload_id);
+ }
+ else {
+ flb_plg_warn(ctx->ins, "Was not able to persisted upload data to disk; "
+ "if fluent bit dies without completing this upload the part "
+ "could be lost, UploadId=%s, ETag=%s",
+ m_upload->upload_id, tmp);
+ }
+ return 0;
+ }
+ flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size,
+ "UploadPart", ctx->ins);
+ if (c->resp.payload != NULL) {
+ flb_plg_debug(ctx->ins, "Raw UploadPart response: %s",
+ c->resp.payload);
+ }
+ flb_http_client_destroy(c);
+ }
+
+ flb_plg_error(ctx->ins, "UploadPart request failed");
+ return -1;
+}
diff --git a/src/fluent-bit/plugins/out_s3/s3_store.c b/src/fluent-bit/plugins/out_s3/s3_store.c
new file mode 100644
index 000000000..8a9640633
--- /dev/null
+++ b/src/fluent-bit/plugins/out_s3/s3_store.c
@@ -0,0 +1,543 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_fstore.h>
+#include <fluent-bit/flb_time.h>
+
+#include "s3.h"
+#include "s3_store.h"
+
+static int s3_store_under_travis_ci()
+{
+
+ if (getenv("CI") != NULL && getenv("TRAVIS") != NULL) {
+ return FLB_TRUE;
+ }
+
+ return FLB_FALSE;
+}
+
+/*
+ * Simple and fast hashing algorithm to create keys in the local buffer
+ */
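+/*
+ * The generated file name has the form "<hash>-<hash2>", where hash is a
+ * djb2-style hash of the tag and hash2 mixes in the current timestamp.
+ */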
+static flb_sds_t gen_store_filename(const char *tag)
+{
+ int c;
+ unsigned long hash = 5381;
+ unsigned long hash2 = 5381;
+ flb_sds_t hash_str;
+ flb_sds_t tmp;
+ struct flb_time tm;
+
+ /* get current time */
+ flb_time_get(&tm);
+
+ /* compose hash */
+ while ((c = *tag++)) {
+ hash = ((hash << 5) + hash) + c; /* hash * 33 + c */
+ }
+ hash2 = (unsigned long) hash2 * tm.tm.tv_sec * tm.tm.tv_nsec;
+
+ /* flb_sds_printf allocs if the incoming sds is not at least 64 bytes */
+ hash_str = flb_sds_create_size(64);
+ if (!hash_str) {
+ flb_errno();
+ return NULL;
+ }
+ tmp = flb_sds_printf(&hash_str, "%lu-%lu", hash, hash2);
+ if (!tmp) {
+ flb_errno();
+ flb_sds_destroy(hash_str);
+ return NULL;
+ }
+ hash_str = tmp;
+
+ return hash_str;
+}
+
+/* Retrieve a candidate s3 local file using the tag */
+struct s3_file *s3_store_file_get(struct flb_s3 *ctx, const char *tag,
+ int tag_len)
+{
+ struct mk_list *head;
+ struct mk_list *tmp;
+ struct flb_fstore_file *fsf = NULL;
+ struct s3_file *s3_file;
+
+ /*
+ * Based on the current ctx->stream_name, locate a candidate file to
+ * store the incoming data, using the content Tag as the lookup pattern.
+ */
+ mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) {
+ fsf = mk_list_entry(head, struct flb_fstore_file, _head);
+
+ /* skip and warn on partially initialized chunks */
+ if (fsf->data == NULL) {
+ flb_plg_warn(ctx->ins, "BAD: found flb_fstore_file with NULL data reference, tag=%s, file=%s, will try to delete", tag, fsf->name);
+ flb_fstore_file_delete(ctx->fs, fsf);
+ /* the entry was deleted, do not touch fsf again; move to the next file */
+ continue;
+ }
+
+ if (fsf->meta_size != tag_len) {
+ fsf = NULL;
+ continue;
+ }
+
+ /* skip locked chunks */
+ s3_file = fsf->data;
+ if (s3_file->locked == FLB_TRUE) {
+ fsf = NULL;
+ continue;
+ }
+
+ /* compare meta and tag */
+ if (strncmp((char *) fsf->meta_buf, tag, tag_len) == 0) {
+ break;
+ }
+
+ /* not found, invalidate the reference */
+ fsf = NULL;
+ }
+
+ if (!fsf) {
+ return NULL;
+ }
+
+ return fsf->data;
+}
+
+/* Append data to a new or existing fstore file */
+int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file,
+ const char *tag, int tag_len,
+ char *data, size_t bytes,
+ time_t file_first_log_time)
+{
+ int ret;
+ flb_sds_t name;
+ struct flb_fstore_file *fsf;
+ size_t space_remaining;
+
+ if (ctx->store_dir_limit_size > 0 && ctx->current_buffer_size + bytes >= ctx->store_dir_limit_size) {
+ flb_plg_error(ctx->ins, "Buffer is full: current_buffer_size=%zu, new_data=%zu, store_dir_limit_size=%zu bytes",
+ ctx->current_buffer_size, bytes, ctx->store_dir_limit_size);
+ return -1;
+ }
+
+ /* If no target file was found, create a new one */
+ if (!s3_file) {
+ name = gen_store_filename(tag);
+ if (!name) {
+ flb_plg_error(ctx->ins, "could not generate chunk file name");
+ return -1;
+ }
+
+ /* Create the file */
+ fsf = flb_fstore_file_create(ctx->fs, ctx->stream_active, name, bytes);
+ if (!fsf) {
+ flb_plg_error(ctx->ins, "could not create the file '%s' in the store",
+ name);
+ flb_sds_destroy(name);
+ return -1;
+ }
+ flb_sds_destroy(name);
+
+ /* Write tag as metadata */
+ ret = flb_fstore_file_meta_set(ctx->fs, fsf, (char *) tag, tag_len);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error writing tag metadata");
+ flb_plg_warn(ctx->ins, "Deleting buffer file because metadata could not be written");
+ flb_fstore_file_delete(ctx->fs, fsf);
+ return -1;
+ }
+
+ /* Allocate local context */
+ s3_file = flb_calloc(1, sizeof(struct s3_file));
+ if (!s3_file) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "cannot allocate s3 file context");
+ flb_plg_warn(ctx->ins, "Deleting buffer file because S3 context creation failed");
+ flb_fstore_file_delete(ctx->fs, fsf);
+ return -1;
+ }
+ s3_file->fsf = fsf;
+ s3_file->first_log_time = file_first_log_time;
+ s3_file->create_time = time(NULL);
+
+ /* Use fstore opaque 'data' reference to keep our context */
+ fsf->data = s3_file;
+ }
+ else {
+ fsf = s3_file->fsf;
+ }
+
+ /* Append data to the target file */
+ ret = flb_fstore_file_append(fsf, data, bytes);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "error writing data to local s3 file");
+ return -1;
+ }
+ s3_file->size += bytes;
+ ctx->current_buffer_size += bytes;
+
+ /* if buffer is 95% full, warn user */
+ if (ctx->store_dir_limit_size > 0) {
+ space_remaining = ctx->store_dir_limit_size - ctx->current_buffer_size;
+ if ((space_remaining * 20) < ctx->store_dir_limit_size) {
+ flb_plg_warn(ctx->ins, "Buffer is almost full: current_buffer_size=%zu, store_dir_limit_size=%zu bytes",
+ ctx->current_buffer_size, ctx->store_dir_limit_size);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static int set_files_context(struct flb_s3 *ctx)
+{
+ struct mk_list *head;
+ struct mk_list *f_head;
+ struct flb_fstore_stream *fs_stream;
+ struct flb_fstore_file *fsf;
+ struct s3_file *s3_file;
+
+ mk_list_foreach(head, &ctx->fs->streams) {
+ fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
+
+ /* skip current stream since it's new */
+ if (fs_stream == ctx->stream_active) {
+ continue;
+ }
+
+ /* skip multi-upload */
+ if (fs_stream == ctx->stream_upload) {
+ continue;
+ }
+
+ mk_list_foreach(f_head, &fs_stream->files) {
+ fsf = mk_list_entry(f_head, struct flb_fstore_file, _head);
+ if (fsf->data) {
+ continue;
+ }
+
+ /* Allocate local context */
+ s3_file = flb_calloc(1, sizeof(struct s3_file));
+ if (!s3_file) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "cannot allocate s3 file context");
+ continue;
+ }
+ s3_file->fsf = fsf;
+ s3_file->first_log_time = time(NULL);
+ s3_file->create_time = time(NULL);
+
+ /* Use fstore opaque 'data' reference to keep our context */
+ fsf->data = s3_file;
+ }
+ }
+
+ return 0;
+}
+
+/* Initialize filesystem storage for S3 plugin */
+int s3_store_init(struct flb_s3 *ctx)
+{
+ int type;
+ time_t now;
+ char tmp[64];
+ struct tm *tm;
+ struct flb_fstore *fs;
+ struct flb_fstore_stream *fs_stream;
+
+ if (s3_store_under_travis_ci() == FLB_TRUE) {
+ type = FLB_FSTORE_MEM;
+ flb_plg_warn(ctx->ins, "Travis CI test, using s3 store memory backend");
+ }
+ else {
+ type = FLB_FSTORE_FS;
+ }
+
+ /* Initialize the storage context */
+ fs = flb_fstore_create(ctx->buffer_dir, type);
+ if (!fs) {
+ return -1;
+ }
+ ctx->fs = fs;
+
+ /*
+ * On every start we create a new stream, this stream in the file system
+ * is directory with the name using the date like '2020-10-03T13:00:02'. So
+ * all the 'new' data that is generated on this process is stored there.
+ *
+ * Note that previous data in similar directories from previous runs is
+ * considered backlog data; in the S3 plugin we need to differentiate the
+ * new data from the older buffered data.
+ *
+ * Compose a stream name...
+ */
+ now = time(NULL);
+ tm = localtime(&now);
+
+#ifdef FLB_SYSTEM_WINDOWS
+ /* Windows does not allow ':' in directory names */
+ strftime(tmp, sizeof(tmp) - 1, "%Y-%m-%dT%H-%M-%S", tm);
+#else
+ strftime(tmp, sizeof(tmp) - 1, "%Y-%m-%dT%H:%M:%S", tm);
+#endif
+
+ /* Create the stream */
+ fs_stream = flb_fstore_stream_create(ctx->fs, tmp);
+ if (!fs_stream) {
+ /* Upon exception abort */
+ flb_plg_error(ctx->ins, "could not initialize active stream: %s", tmp);
+ flb_fstore_destroy(fs);
+ ctx->fs = NULL;
+ return -1;
+ }
+ ctx->stream_active = fs_stream;
+
+ /* Multipart upload stream */
+ fs_stream = flb_fstore_stream_create(ctx->fs, "multipart_upload_metadata");
+ if (!fs_stream) {
+ flb_plg_error(ctx->ins, "could not initialize multipart_upload stream");
+ flb_fstore_destroy(fs);
+ ctx->fs = NULL;
+ return -1;
+ }
+ ctx->stream_upload = fs_stream;
+
+ set_files_context(ctx);
+ return 0;
+}
+
+int s3_store_exit(struct flb_s3 *ctx)
+{
+ struct mk_list *head;
+ struct mk_list *f_head;
+ struct flb_fstore_stream *fs_stream;
+ struct flb_fstore_file *fsf;
+ struct s3_file *s3_file;
+
+ if (!ctx->fs) {
+ return 0;
+ }
+
+ /* release local context on non-multi upload files */
+ mk_list_foreach(head, &ctx->fs->streams) {
+ fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
+ if (fs_stream == ctx->stream_upload) {
+ continue;
+ }
+
+ mk_list_foreach(f_head, &fs_stream->files) {
+ fsf = mk_list_entry(f_head, struct flb_fstore_file, _head);
+ if (fsf->data != NULL) {
+ s3_file = fsf->data;
+ flb_sds_destroy(s3_file->file_path);
+ flb_free(s3_file);
+ }
+ }
+ }
+
+ if (ctx->fs) {
+ flb_fstore_destroy(ctx->fs);
+ }
+ return 0;
+}
+
+/*
+ * Check if the store has data. This function is only used on plugin
+ * initialization
+ */
+int s3_store_has_data(struct flb_s3 *ctx)
+{
+ struct mk_list *head;
+ struct flb_fstore_stream *fs_stream;
+
+ if (!ctx->fs) {
+ return FLB_FALSE;
+ }
+
+ mk_list_foreach(head, &ctx->fs->streams) {
+ /* skip multi upload stream */
+ fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
+ if (fs_stream == ctx->stream_upload) {
+ continue;
+ }
+
+ if (mk_list_size(&fs_stream->files) > 0) {
+ return FLB_TRUE;
+ }
+ }
+
+ return FLB_FALSE;
+}
+
+int s3_store_has_uploads(struct flb_s3 *ctx)
+{
+ if (!ctx || !ctx->stream_upload) {
+ return FLB_FALSE;
+ }
+
+ if (mk_list_size(&ctx->stream_upload->files) > 0) {
+ return FLB_TRUE;
+ }
+
+ return FLB_FALSE;
+}
+
+int s3_store_file_inactive(struct flb_s3 *ctx, struct s3_file *s3_file)
+{
+ int ret;
+ struct flb_fstore_file *fsf;
+
+ fsf = s3_file->fsf;
+ flb_free(s3_file);
+ ret = flb_fstore_file_inactive(ctx->fs, fsf);
+
+ return ret;
+}
+
+int s3_store_file_delete(struct flb_s3 *ctx, struct s3_file *s3_file)
+{
+ struct flb_fstore_file *fsf;
+
+ fsf = s3_file->fsf;
+ ctx->current_buffer_size -= s3_file->size;
+
+ /* permanent deletion */
+ flb_fstore_file_delete(ctx->fs, fsf);
+ flb_free(s3_file);
+
+ return 0;
+}
+
+int s3_store_file_read(struct flb_s3 *ctx, struct s3_file *s3_file,
+ char **out_buf, size_t *out_size)
+{
+ int ret;
+
+ ret = flb_fstore_file_content_copy(ctx->fs, s3_file->fsf,
+ (void **) out_buf, out_size);
+ return ret;
+}
+
+int s3_store_file_upload_read(struct flb_s3 *ctx, struct flb_fstore_file *fsf,
+ char **out_buf, size_t *out_size)
+{
+ int ret;
+
+ ret = flb_fstore_file_content_copy(ctx->fs, fsf,
+ (void **) out_buf, out_size);
+ return ret;
+}
+
+struct flb_fstore_file *s3_store_file_upload_get(struct flb_s3 *ctx,
+ char *key, int key_len)
+{
+ struct mk_list *head;
+ struct flb_fstore_file *fsf = NULL;
+
+ mk_list_foreach(head, &ctx->stream_upload->files) {
+ fsf = mk_list_entry(head, struct flb_fstore_file, _head);
+ if (fsf->meta_buf == NULL) {
+ continue;
+ }
+
+ if (fsf->meta_size != key_len) {
+ continue;
+ }
+
+ if (strncmp(fsf->meta_buf, key, key_len) == 0) {
+ break;
+ }
+ fsf = NULL;
+ }
+
+ return fsf;
+}
+
+/* param fsf can be NULL if the file has not yet been created */
+int s3_store_file_upload_put(struct flb_s3 *ctx,
+ struct flb_fstore_file *fsf, flb_sds_t key,
+ flb_sds_t data)
+{
+ int ret;
+ flb_sds_t name;
+
+ /* If no target file was found, create a new one */
+ if (!fsf) {
+ name = gen_store_filename(key);
+ if (!name) {
+ flb_plg_error(ctx->ins, "could not generate chunk file name");
+ return -1;
+ }
+
+ /* Create the file */
+ fsf = flb_fstore_file_create(ctx->fs, ctx->stream_upload, name, flb_sds_len(data));
+ if (!fsf) {
+ flb_plg_error(ctx->ins, "could not create the file '%s' in the upload store",
+ name);
+ flb_sds_destroy(name);
+ return -1;
+ }
+ flb_sds_destroy(name);
+
+ /* Write key as metadata */
+ ret = flb_fstore_file_meta_set(ctx->fs, fsf,
+ key, flb_sds_len(key));
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "error writing upload metadata");
+ flb_plg_warn(ctx->ins, "Deleting s3 upload cache file because metadata could not be written");
+ flb_fstore_file_delete(ctx->fs, fsf);
+ return -1;
+ }
+ }
+
+ /* Append data to the target file */
+ ret = flb_fstore_file_append(fsf, data, flb_sds_len(data));
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "error writing data to local s3 file");
+ return -1;
+ }
+
+ return 0;
+}
+
+int s3_store_file_upload_delete(struct flb_s3 *ctx, struct flb_fstore_file *fsf)
+{
+ /* permanent deletion */
+ flb_fstore_file_delete(ctx->fs, fsf);
+ return 0;
+}
+
+/* Always set an updated copy of metadata into the fs_store_file entry */
+int s3_store_file_meta_get(struct flb_s3 *ctx, struct flb_fstore_file *fsf)
+{
+ return flb_fstore_file_meta_get(ctx->fs, fsf);
+}
+
+void s3_store_file_lock(struct s3_file *s3_file)
+{
+ s3_file->locked = FLB_TRUE;
+}
+
+void s3_store_file_unlock(struct s3_file *s3_file)
+{
+ s3_file->locked = FLB_FALSE;
+}
diff --git a/src/fluent-bit/plugins/out_s3/s3_store.h b/src/fluent-bit/plugins/out_s3/s3_store.h
new file mode 100644
index 000000000..9caa7bdf4
--- /dev/null
+++ b/src/fluent-bit/plugins/out_s3/s3_store.h
@@ -0,0 +1,68 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_S3_STORE_H
+#define FLB_S3_STORE_H
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_fstore.h>
+
+struct s3_file {
+ int locked; /* locked chunk is busy, cannot write to it */
+ int failures; /* delivery failures */
+ size_t size; /* file size */
+ time_t create_time; /* creation time */
+ time_t first_log_time; /* first log time */
+ flb_sds_t file_path; /* file path */
+ struct flb_fstore_file *fsf; /* reference to parent flb_fstore_file */
+};
+
+int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file,
+ const char *tag, int tag_len,
+ char *data, size_t bytes,
+ time_t file_first_log_time);
+
+int s3_store_init(struct flb_s3 *ctx);
+int s3_store_exit(struct flb_s3 *ctx);
+
+int s3_store_has_data(struct flb_s3 *ctx);
+int s3_store_has_uploads(struct flb_s3 *ctx);
+
+int s3_store_file_inactive(struct flb_s3 *ctx, struct s3_file *s3_file);
+struct s3_file *s3_store_file_get(struct flb_s3 *ctx, const char *tag,
+ int tag_len);
+int s3_store_file_delete(struct flb_s3 *ctx, struct s3_file *s3_file);
+int s3_store_file_read(struct flb_s3 *ctx, struct s3_file *s3_file,
+ char **out_buf, size_t *out_size);
+int s3_store_file_upload_read(struct flb_s3 *ctx, struct flb_fstore_file *fsf,
+ char **out_buf, size_t *out_size);
+struct flb_fstore_file *s3_store_file_upload_get(struct flb_s3 *ctx,
+ char *key, int key_len);
+
+int s3_store_file_upload_put(struct flb_s3 *ctx,
+ struct flb_fstore_file *fsf, flb_sds_t key,
+ flb_sds_t data);
+int s3_store_file_upload_delete(struct flb_s3 *ctx, struct flb_fstore_file *fsf);
+
+int s3_store_file_meta_get(struct flb_s3 *ctx, struct flb_fstore_file *fsf);
+
+void s3_store_file_lock(struct s3_file *s3_file);
+void s3_store_file_unlock(struct s3_file *s3_file);
+
+#endif