Diffstat (limited to 'fluent-bit/plugins/out_s3')
-rw-r--r--  fluent-bit/plugins/out_s3/CMakeLists.txt  |    6
-rw-r--r--  fluent-bit/plugins/out_s3/s3.c            | 2500
-rw-r--r--  fluent-bit/plugins/out_s3/s3.h            |  203
-rw-r--r--  fluent-bit/plugins/out_s3/s3_multipart.c  |  707
-rw-r--r--  fluent-bit/plugins/out_s3/s3_store.c      |  543
-rw-r--r--  fluent-bit/plugins/out_s3/s3_store.h      |   68
6 files changed, 4027 insertions(+), 0 deletions(-)
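The new s3.c wires in an environment-variable-driven mock layer for unit tests: s3_plugin_under_test() checks FLB_S3_PLUGIN_UNDER_TEST, and mock_s3_call() returns a canned 400 response when a per-API error variable such as TEST_PUT_OBJECT_ERROR is set. A minimal sketch of how a test harness might drive those hooks follows; the harness and the helper name are hypothetical and not part of this diff.

```c
/* Hypothetical test-harness helper -- not part of this diff.
 * The plugin reads these variables via s3_plugin_under_test() and
 * mock_error_response(); POSIX setenv() is assumed to be available. */
#include <stdlib.h>

static void enable_s3_mock_errors(void)
{
    /* Route S3 requests through mock_s3_call() instead of real HTTP */
    setenv("FLB_S3_PLUGIN_UNDER_TEST", "true", 1);

    /* Make the mocked PutObject call fail with a 400 and this payload */
    setenv("TEST_PUT_OBJECT_ERROR",
           "<Error><Code>AccessDenied</Code></Error>", 1);
}
```

Unsetting the variables (unsetenv) restores the real request path, since the plugin only checks for their presence at call time.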
diff --git a/fluent-bit/plugins/out_s3/CMakeLists.txt b/fluent-bit/plugins/out_s3/CMakeLists.txt new file mode 100644 index 00000000..94e04861 --- /dev/null +++ b/fluent-bit/plugins/out_s3/CMakeLists.txt @@ -0,0 +1,6 @@ +set(src + s3.c + s3_store.c + s3_multipart.c) + +FLB_PLUGIN(out_s3 "${src}" "") diff --git a/fluent-bit/plugins/out_s3/s3.c b/fluent-bit/plugins/out_s3/s3.c new file mode 100644 index 00000000..57e68e6e --- /dev/null +++ b/fluent-bit/plugins/out_s3/s3.c @@ -0,0 +1,2500 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_slist.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_config_map.h> +#include <fluent-bit/flb_aws_util.h> +#include <fluent-bit/aws/flb_aws_compress.h> +#include <fluent-bit/flb_hash.h> +#include <fluent-bit/flb_crypto.h> +#include <fluent-bit/flb_signv4.h> +#include <fluent-bit/flb_scheduler.h> +#include <fluent-bit/flb_gzip.h> +#include <fluent-bit/flb_base64.h> +#include <fluent-bit/flb_log_event_decoder.h> +#include <stdlib.h> +#include <sys/stat.h> + +#include <msgpack.h> + +#include "s3.h" +#include "s3_store.h" + +#define DEFAULT_S3_PORT 443 +#define DEFAULT_S3_INSECURE_PORT 80 + +static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, + struct s3_file *chunk, + char **out_buf, size_t *out_size); + +static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_log_time, + char *body, size_t body_size); + +static int put_all_chunks(struct flb_s3 *ctx); + +static void cb_s3_upload(struct flb_config *ctx, void *data); + +static struct multipart_upload *get_upload(struct flb_s3 *ctx, + const char *tag, int tag_len); + +static struct multipart_upload *create_upload(struct flb_s3 *ctx, + const char *tag, int tag_len, + time_t file_first_log_time); + +static void remove_from_queue(struct upload_queue *entry); + +static struct flb_aws_header content_encoding_header = { + .key = "Content-Encoding", + .key_len = 16, + .val = "gzip", + .val_len = 4, +}; + +static struct flb_aws_header content_type_header = { + .key = "Content-Type", + .key_len = 12, + .val = "", + .val_len = 0, +}; + +static struct flb_aws_header canned_acl_header = { + .key = "x-amz-acl", + .key_len = 9, + .val = "", + .val_len = 0, +}; + +static struct flb_aws_header content_md5_header = { + .key = "Content-MD5", + .key_len = 11, + .val = "", + .val_len = 0, +}; + +static struct flb_aws_header storage_class_header = { + .key = "x-amz-storage-class", + .key_len = 19, + .val = "", + .val_len = 0, +}; + +static char *mock_error_response(char *error_env_var) +{ + char *err_val = NULL; + char *error = NULL; + int len = 0; + + err_val = getenv(error_env_var); + if (err_val != NULL && strlen(err_val) > 0) { + error = 
flb_calloc(strlen(err_val) + 1, sizeof(char)); + if (error == NULL) { + flb_errno(); + return NULL; + } + + len = strlen(err_val); + memcpy(error, err_val, len); + error[len] = '\0'; + return error; + } + + return NULL; +} + +int s3_plugin_under_test() +{ + if (getenv("FLB_S3_PLUGIN_UNDER_TEST") != NULL) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +int create_headers(struct flb_s3 *ctx, char *body_md5, + struct flb_aws_header **headers, int *num_headers, + int multipart_upload) +{ + int n = 0; + int headers_len = 0; + struct flb_aws_header *s3_headers = NULL; + + if (ctx->content_type != NULL) { + headers_len++; + } + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + headers_len++; + } + if (ctx->canned_acl != NULL) { + headers_len++; + } + if (body_md5 != NULL && strlen(body_md5) && multipart_upload == FLB_FALSE) { + headers_len++; + } + if (ctx->storage_class != NULL) { + headers_len++; + } + if (headers_len == 0) { + *num_headers = headers_len; + *headers = s3_headers; + return 0; + } + + s3_headers = flb_calloc(headers_len, sizeof(struct flb_aws_header)); + if (s3_headers == NULL) { + flb_errno(); + return -1; + } + + if (ctx->content_type != NULL) { + s3_headers[n] = content_type_header; + s3_headers[n].val = ctx->content_type; + s3_headers[n].val_len = strlen(ctx->content_type); + n++; + } + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + s3_headers[n] = content_encoding_header; + n++; + } + if (ctx->canned_acl != NULL) { + s3_headers[n] = canned_acl_header; + s3_headers[n].val = ctx->canned_acl; + s3_headers[n].val_len = strlen(ctx->canned_acl); + n++; + } + if (body_md5 != NULL && strlen(body_md5) && multipart_upload == FLB_FALSE) { + s3_headers[n] = content_md5_header; + s3_headers[n].val = body_md5; + s3_headers[n].val_len = strlen(body_md5); + n++; + } + if (ctx->storage_class != NULL) { + s3_headers[n] = storage_class_header; + s3_headers[n].val = ctx->storage_class; + s3_headers[n].val_len = strlen(ctx->storage_class); + } + + *num_headers = headers_len; + *headers = s3_headers; + return 0; +}; + +struct flb_http_client *mock_s3_call(char *error_env_var, char *api) +{ + /* create an http client so that we can set the response */ + struct flb_http_client *c = NULL; + char *error = mock_error_response(error_env_var); + char *resp; + int len; + + c = flb_calloc(1, sizeof(struct flb_http_client)); + if (!c) { + flb_errno(); + flb_free(error); + return NULL; + } + mk_list_init(&c->headers); + + if (error != NULL) { + c->resp.status = 400; + /* resp.data is freed on destroy, payload is supposed to reference it */ + c->resp.data = error; + c->resp.payload = c->resp.data; + c->resp.payload_size = strlen(error); + } + else { + c->resp.status = 200; + c->resp.payload = ""; + c->resp.payload_size = 0; + if (strcmp(api, "CreateMultipartUpload") == 0) { + /* mocked success response */ + c->resp.payload = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" + "<InitiateMultipartUploadResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\n" + "<Bucket>example-bucket</Bucket>\n" + "<Key>example-object</Key>\n" + "<UploadId>VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA</UploadId>\n" + "</InitiateMultipartUploadResult>"; + c->resp.payload_size = strlen(c->resp.payload); + } + else if (strcmp(api, "UploadPart") == 0) { + /* mocked success response */ + resp = "Date: Mon, 1 Nov 2010 20:34:56 GMT\n" + "ETag: \"b54357faf0632cce46e942fa68356b38\"\n" + "Content-Length: 0\n" + "Connection: keep-alive\n" + "Server: AmazonS3"; + /* since etag is in the headers, this code uses 
resp.data */ + len = strlen(resp); + c->resp.data = flb_calloc(len + 1, sizeof(char)); + if (!c->resp.data) { + flb_errno(); + return NULL; + } + memcpy(c->resp.data, resp, len); + c->resp.data[len] = '\0'; + c->resp.data_size = len; + } + else { + c->resp.payload = ""; + c->resp.payload_size = 0; + } + } + + return c; +} + +static flb_sds_t concat_path(char *p1, char *p2) +{ + flb_sds_t dir; + flb_sds_t tmp; + + dir = flb_sds_create_size(64); + + tmp = flb_sds_printf(&dir, "%s/%s", p1, p2); + if (!tmp) { + flb_errno(); + flb_sds_destroy(dir); + return NULL; + } + dir = tmp; + + return dir; +} + +/* Reads in index value from metadata file and sets seq_index to value */ +static int read_seq_index(char *seq_index_file, uint64_t *seq_index) +{ + FILE *fp; + int ret; + + fp = fopen(seq_index_file, "r"); + if (fp == NULL) { + flb_errno(); + return -1; + } + + ret = fscanf(fp, "%"PRIu64, seq_index); + if (ret != 1) { + fclose(fp); + flb_errno(); + return -1; + } + + fclose(fp); + return 0; +} + +/* Writes index value to metadata file */ +static int write_seq_index(char *seq_index_file, uint64_t seq_index) +{ + FILE *fp; + int ret; + + fp = fopen(seq_index_file, "w+"); + if (fp == NULL) { + flb_errno(); + return -1; + } + + ret = fprintf(fp, "%"PRIu64, seq_index); + if (ret < 0) { + fclose(fp); + flb_errno(); + return -1; + } + + fclose(fp); + return 0; +} + +static int init_seq_index(void *context) { + int ret; + const char *tmp; + char tmp_buf[1024]; + struct flb_s3 *ctx = context; + + ctx->key_fmt_has_seq_index = FLB_TRUE; + + ctx->stream_metadata = flb_fstore_stream_create(ctx->fs, "sequence"); + if (!ctx->stream_metadata) { + flb_plg_error(ctx->ins, "could not initialize metadata stream"); + flb_fstore_destroy(ctx->fs); + ctx->fs = NULL; + return -1; + } + + /* Construct directories and file path names */ + ctx->metadata_dir = flb_sds_create(ctx->stream_metadata->path); + if (ctx->metadata_dir == NULL) { + flb_plg_error(ctx->ins, "Failed to create metadata path"); + flb_errno(); + return -1; + } + tmp = "/index_metadata"; + ret = flb_sds_cat_safe(&ctx->metadata_dir, tmp, strlen(tmp)); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to create metadata path"); + flb_errno(); + return -1; + } + + ctx->seq_index_file = flb_sds_create(ctx->metadata_dir); + if (ctx->seq_index_file == NULL) { + flb_plg_error(ctx->ins, "Failed to create sequential index file path"); + flb_errno(); + return -1; + } + tmp = "/seq_index_"; + ret = flb_sds_cat_safe(&ctx->seq_index_file, tmp, strlen(tmp)); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to create sequential index file path"); + flb_errno(); + return -1; + } + + sprintf(tmp_buf, "%d", ctx->ins->id); + ret = flb_sds_cat_safe(&ctx->seq_index_file, tmp_buf, strlen(tmp_buf)); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to create sequential index file path"); + flb_errno(); + return -1; + } + + /* Create directory path if it doesn't exist */ + ret = mkdir(ctx->metadata_dir, 0700); + if (ret < 0 && errno != EEXIST) { + flb_plg_error(ctx->ins, "Failed to create metadata directory"); + return -1; + } + + /* Check if index file doesn't exist and set index value */ + if (access(ctx->seq_index_file, F_OK) != 0) { + ctx->seq_index = 0; + ret = write_seq_index(ctx->seq_index_file, ctx->seq_index); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to write to sequential index metadata file"); + return -1; + } + } + else { + ret = read_seq_index(ctx->seq_index_file, &ctx->seq_index); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to read from 
sequential index " + "metadata file"); + return -1; + } + flb_plg_info(ctx->ins, "Successfully recovered index. " + "Continuing at index=%"PRIu64, ctx->seq_index); + } + return 0; +} + +void multipart_upload_destroy(struct multipart_upload *m_upload) +{ + int i; + flb_sds_t etag; + + if (!m_upload) { + return; + } + + if (m_upload->s3_key) { + flb_sds_destroy(m_upload->s3_key); + } + if (m_upload->tag) { + flb_sds_destroy(m_upload->tag); + } + if (m_upload->upload_id) { + flb_sds_destroy(m_upload->upload_id); + } + + for (i = 0; i < m_upload->part_number; i++) { + etag = m_upload->etags[i]; + if (etag) { + flb_sds_destroy(etag); + } + } + + flb_free(m_upload); +} + +static void s3_context_destroy(struct flb_s3 *ctx) +{ + struct mk_list *head; + struct mk_list *tmp; + struct multipart_upload *m_upload; + struct upload_queue *upload_contents; + + if (!ctx) { + return; + } + + if (ctx->base_provider) { + flb_aws_provider_destroy(ctx->base_provider); + } + + if (ctx->provider) { + flb_aws_provider_destroy(ctx->provider); + } + + if (ctx->provider_tls) { + flb_tls_destroy(ctx->provider_tls); + } + + if (ctx->sts_provider_tls) { + flb_tls_destroy(ctx->sts_provider_tls); + } + + if (ctx->s3_client) { + flb_aws_client_destroy(ctx->s3_client); + } + + if (ctx->client_tls) { + flb_tls_destroy(ctx->client_tls); + } + + if (ctx->free_endpoint == FLB_TRUE) { + flb_free(ctx->endpoint); + } + + if (ctx->buffer_dir) { + flb_sds_destroy(ctx->buffer_dir); + } + + if (ctx->metadata_dir) { + flb_sds_destroy(ctx->metadata_dir); + } + + if (ctx->seq_index_file) { + flb_sds_destroy(ctx->seq_index_file); + } + + /* Remove uploads */ + mk_list_foreach_safe(head, tmp, &ctx->uploads) { + m_upload = mk_list_entry(head, struct multipart_upload, _head); + mk_list_del(&m_upload->_head); + multipart_upload_destroy(m_upload); + } + + mk_list_foreach_safe(head, tmp, &ctx->upload_queue) { + upload_contents = mk_list_entry(head, struct upload_queue, _head); + s3_store_file_delete(ctx, upload_contents->upload_file); + multipart_upload_destroy(upload_contents->m_upload_file); + remove_from_queue(upload_contents); + } + + flb_free(ctx); +} + +static int cb_s3_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) +{ + int ret; + flb_sds_t tmp_sds; + char *role_arn = NULL; + char *session_name; + const char *tmp; + struct flb_s3 *ctx = NULL; + struct flb_aws_client_generator *generator; + (void) config; + (void) data; + char *ep; + struct flb_split_entry *tok; + struct mk_list *split; + int list_size; + + ctx = flb_calloc(1, sizeof(struct flb_s3)); + if (!ctx) { + flb_errno(); + return -1; + } + ctx->ins = ins; + mk_list_init(&ctx->uploads); + mk_list_init(&ctx->upload_queue); + + ctx->retry_time = 0; + ctx->upload_queue_success = FLB_FALSE; + + /* Export context */ + flb_output_set_context(ins, ctx); + + /* initialize config map */ + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + return -1; + } + + /* the check against -1 is works here because size_t is unsigned + * and (int) -1 == unsigned max value + * Fluent Bit uses -1 (which becomes max value) to indicate undefined + */ + if (ctx->ins->total_limit_size != -1) { + flb_plg_warn(ctx->ins, "Please use 'store_dir_limit_size' with s3 output instead of 'storage.total_limit_size'. 
" + "S3 has its own buffer files located in the store_dir."); + } + + /* Date key */ + ctx->date_key = ctx->json_date_key; + tmp = flb_output_get_property("json_date_key", ins); + if (tmp) { + /* Just check if we have to disable it */ + if (flb_utils_bool(tmp) == FLB_FALSE) { + ctx->date_key = NULL; + } + } + + /* Date format for JSON output */ + ctx->json_date_format = FLB_PACK_JSON_DATE_ISO8601; + tmp = flb_output_get_property("json_date_format", ins); + if (tmp) { + ret = flb_pack_to_json_date_type(tmp); + if (ret == -1) { + flb_plg_error(ctx->ins, "invalid json_date_format '%s'. ", tmp); + return -1; + } + else { + ctx->json_date_format = ret; + } + } + + tmp = flb_output_get_property("bucket", ins); + if (!tmp) { + flb_plg_error(ctx->ins, "'bucket' is a required parameter"); + return -1; + } + + /* + * store_dir is the user input, buffer_dir is what the code uses + * We append the bucket name to the dir, to support multiple instances + * of this plugin using the same buffer dir + */ + tmp_sds = concat_path(ctx->store_dir, ctx->bucket); + if (!tmp_sds) { + flb_plg_error(ctx->ins, "Could not construct buffer path"); + return -1; + } + ctx->buffer_dir = tmp_sds; + + /* Initialize local storage */ + ret = s3_store_init(ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to initialize S3 storage: %s", + ctx->store_dir); + return -1; + } + + tmp = flb_output_get_property("s3_key_format", ins); + if (tmp) { + if (tmp[0] != '/') { + flb_plg_error(ctx->ins, "'s3_key_format' must start with a '/'"); + return -1; + } + if (strstr((char *) tmp, "$INDEX")) { + ret = init_seq_index(ctx); + if (ret < 0) { + return -1; + } + } + if (strstr((char *) tmp, "$UUID")) { + ctx->key_fmt_has_uuid = FLB_TRUE; + } + } + + /* validate 'total_file_size' */ + if (ctx->file_size <= 0) { + flb_plg_error(ctx->ins, "Failed to parse total_file_size %s", tmp); + return -1; + } + if (ctx->file_size < 1000000) { + flb_plg_error(ctx->ins, "total_file_size must be at least 1MB"); + return -1; + } + if (ctx->file_size > MAX_FILE_SIZE) { + flb_plg_error(ctx->ins, "Max total_file_size is %s bytes", MAX_FILE_SIZE_STR); + return -1; + } + flb_plg_info(ctx->ins, "Using upload size %lu bytes", ctx->file_size); + + if (ctx->use_put_object == FLB_FALSE && ctx->file_size < 2 * MIN_CHUNKED_UPLOAD_SIZE) { + flb_plg_info(ctx->ins, + "total_file_size is less than 10 MB, will use PutObject API"); + ctx->use_put_object = FLB_TRUE; + } + + tmp = flb_output_get_property("compression", ins); + if (tmp) { + ret = flb_aws_compression_get_type(tmp); + if (ret == -1) { + flb_plg_error(ctx->ins, "unknown compression: %s", tmp); + return -1; + } + if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_ARROW) { + flb_plg_error(ctx->ins, + "use_put_object must be enabled when Apache Arrow is enabled"); + return -1; + } + ctx->compression = ret; + } + + tmp = flb_output_get_property("content_type", ins); + if (tmp) { + ctx->content_type = (char *) tmp; + } + if (ctx->use_put_object == FLB_FALSE) { + /* upload_chunk_size */ + if (ctx->upload_chunk_size <= 0) { + flb_plg_error(ctx->ins, "Failed to parse upload_chunk_size %s", tmp); + return -1; + } + if (ctx->upload_chunk_size > ctx->file_size) { + flb_plg_error(ctx->ins, + "upload_chunk_size can not be larger than total_file_size"); + return -1; + } + if (ctx->upload_chunk_size < MIN_CHUNKED_UPLOAD_SIZE) { + flb_plg_error(ctx->ins, "upload_chunk_size must be at least 5,242,880 bytes"); + return -1; + } + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + 
if(ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_COMPRESS_SIZE) { + flb_plg_error(ctx->ins, "upload_chunk_size in compressed multipart upload cannot exceed 5GB"); + return -1; + } + } else { + if (ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_SIZE) { + flb_plg_error(ctx->ins, "Max upload_chunk_size is 50MB"); + return -1; + } + } + } + + if (ctx->upload_chunk_size != MIN_CHUNKED_UPLOAD_SIZE && + (ctx->upload_chunk_size * 2) > ctx->file_size) { + flb_plg_error(ctx->ins, "total_file_size is less than 2x upload_chunk_size"); + return -1; + } + + if (ctx->use_put_object == FLB_TRUE) { + /* + * code internally uses 'upload_chunk_size' as the unit for each Put, + * regardless of which API is used to send data + */ + ctx->upload_chunk_size = ctx->file_size; + if (ctx->file_size > MAX_FILE_SIZE_PUT_OBJECT) { + flb_plg_error(ctx->ins, "Max total_file_size is 50M when use_put_object is enabled"); + return -1; + } + } + + tmp = flb_output_get_property("endpoint", ins); + if (tmp) { + ctx->insecure = strncmp(tmp, "http://", 7) == 0 ? FLB_TRUE : FLB_FALSE; + if (ctx->insecure == FLB_TRUE) { + ep = removeProtocol((char *) tmp, "http://"); + } + else { + ep = removeProtocol((char *) tmp, "https://"); + } + + split = flb_utils_split((const char *)ep, ':', 1); + if (!split) { + flb_errno(); + return -1; + } + list_size = mk_list_size(split); + if (list_size > 2) { + flb_plg_error(ctx->ins, "Failed to split endpoint"); + flb_utils_split_free(split); + return -1; + } + + tok = mk_list_entry_first(split, struct flb_split_entry, _head); + ctx->endpoint = flb_strndup(tok->value, tok->len); + if (!ctx->endpoint) { + flb_errno(); + flb_utils_split_free(split); + return -1; + } + ctx->free_endpoint = FLB_TRUE; + if (list_size == 2) { + tok = mk_list_entry_next(&tok->_head, struct flb_split_entry, _head, split); + ctx->port = atoi(tok->value); + } + else { + ctx->port = ctx->insecure == FLB_TRUE ? 
DEFAULT_S3_INSECURE_PORT : DEFAULT_S3_PORT; + } + flb_utils_split_free(split); + } + else { + /* default endpoint for the given region */ + ctx->endpoint = flb_aws_endpoint("s3", ctx->region); + ctx->insecure = FLB_FALSE; + ctx->port = DEFAULT_S3_PORT; + ctx->free_endpoint = FLB_TRUE; + if (!ctx->endpoint) { + flb_plg_error(ctx->ins, "Could not construct S3 endpoint"); + return -1; + } + } + + tmp = flb_output_get_property("sts_endpoint", ins); + if (tmp) { + ctx->sts_endpoint = (char *) tmp; + } + + tmp = flb_output_get_property("canned_acl", ins); + if (tmp) { + ctx->canned_acl = (char *) tmp; + } + + tmp = flb_output_get_property("storage_class", ins); + if (tmp) { + ctx->storage_class = (char *) tmp; + } + + if (ctx->insecure == FLB_FALSE) { + ctx->client_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + ins->tls_verify, + ins->tls_debug, + ins->tls_vhost, + ins->tls_ca_path, + ins->tls_ca_file, + ins->tls_crt_file, + ins->tls_key_file, + ins->tls_key_passwd); + if (!ctx->client_tls) { + flb_plg_error(ctx->ins, "Failed to create tls context"); + return -1; + } + } + + /* AWS provider needs a separate TLS instance */ + ctx->provider_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + ins->tls_debug, + ins->tls_vhost, + ins->tls_ca_path, + ins->tls_ca_file, + ins->tls_crt_file, + ins->tls_key_file, + ins->tls_key_passwd); + if (!ctx->provider_tls) { + flb_errno(); + return -1; + } + + ctx->provider = flb_standard_chain_provider_create(config, + ctx->provider_tls, + ctx->region, + ctx->sts_endpoint, + NULL, + flb_aws_client_generator(), + ctx->profile); + + if (!ctx->provider) { + flb_plg_error(ctx->ins, "Failed to create AWS Credential Provider"); + return -1; + } + + tmp = flb_output_get_property("role_arn", ins); + if (tmp) { + /* Use the STS Provider */ + ctx->base_provider = ctx->provider; + role_arn = (char *) tmp; + + /* STS provider needs yet another separate TLS instance */ + ctx->sts_provider_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + ins->tls_debug, + ins->tls_vhost, + ins->tls_ca_path, + ins->tls_ca_file, + ins->tls_crt_file, + ins->tls_key_file, + ins->tls_key_passwd); + + if (!ctx->sts_provider_tls) { + flb_errno(); + return -1; + } + + session_name = flb_sts_session_name(); + if (!session_name) { + flb_plg_error(ctx->ins, "Failed to create aws iam role " + "session name"); + flb_errno(); + return -1; + } + + ctx->provider = flb_sts_provider_create(config, + ctx->sts_provider_tls, + ctx->base_provider, + ctx->external_id, + role_arn, + session_name, + ctx->region, + ctx->sts_endpoint, + NULL, + flb_aws_client_generator()); + flb_free(session_name); + if (!ctx->provider) { + flb_plg_error(ctx->ins, "Failed to create AWS STS Credential " + "Provider"); + return -1; + } + } + + /* read any remaining buffers from previous (failed) executions */ + ctx->has_old_buffers = s3_store_has_data(ctx); + ctx->has_old_uploads = s3_store_has_uploads(ctx); + + /* Multipart */ + multipart_read_uploads_from_fs(ctx); + + if (mk_list_size(&ctx->uploads) > 0) { + /* note that these should be sent */ + ctx->has_old_uploads = FLB_TRUE; + } + + /* create S3 client */ + generator = flb_aws_client_generator(); + ctx->s3_client = generator->create(); + if (!ctx->s3_client) { + return -1; + } + ctx->s3_client->name = "s3_client"; + ctx->s3_client->has_auth = FLB_TRUE; + ctx->s3_client->provider = ctx->provider; + ctx->s3_client->region = ctx->region; + ctx->s3_client->service = "s3"; + ctx->s3_client->port = ctx->port; + ctx->s3_client->flags = 0; + ctx->s3_client->proxy = NULL; + 
ctx->s3_client->s3_mode = S3_MODE_SIGNED_PAYLOAD; + ctx->s3_client->retry_requests = ctx->retry_requests; + + if (ctx->insecure == FLB_TRUE) { + ctx->s3_client->upstream = flb_upstream_create(config, ctx->endpoint, ctx->port, + FLB_IO_TCP, NULL); + } else { + ctx->s3_client->upstream = flb_upstream_create(config, ctx->endpoint, ctx->port, + FLB_IO_TLS, ctx->client_tls); + } + if (!ctx->s3_client->upstream) { + flb_plg_error(ctx->ins, "Connection initialization error"); + return -1; + } + + flb_output_upstream_set(ctx->s3_client->upstream, ctx->ins); + + ctx->s3_client->host = ctx->endpoint; + + /* set to sync mode and initialize credentials */ + ctx->provider->provider_vtable->sync(ctx->provider); + ctx->provider->provider_vtable->init(ctx->provider); + + ctx->timer_created = FLB_FALSE; + ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000; + if (ctx->timer_ms > UPLOAD_TIMER_MAX_WAIT) { + ctx->timer_ms = UPLOAD_TIMER_MAX_WAIT; + } + else if (ctx->timer_ms < UPLOAD_TIMER_MIN_WAIT) { + ctx->timer_ms = UPLOAD_TIMER_MIN_WAIT; + } + + /* + * S3 must ALWAYS use sync mode + * In the timer thread we do a mk_list_foreach_safe on the queue of uplaods and chunks + * Iterating over those lists is not concurrent safe. If a flush call ran at the same time + * And deleted an item from the list, this could cause a crash/corruption. + */ + flb_stream_disable_async_mode(&ctx->s3_client->upstream->base); + + /* clean up any old buffers found on startup */ + if (ctx->has_old_buffers == FLB_TRUE) { + flb_plg_info(ctx->ins, + "Sending locally buffered data from previous " + "executions to S3; buffer=%s", + ctx->fs->root_path); + ctx->has_old_buffers = FLB_FALSE; + ret = put_all_chunks(ctx); + if (ret < 0) { + ctx->has_old_buffers = FLB_TRUE; + flb_plg_error(ctx->ins, + "Failed to send locally buffered data left over " + "from previous executions; will retry. Buffer=%s", + ctx->fs->root_path); + } + } + + /* clean up any old uploads found on start up */ + if (ctx->has_old_uploads == FLB_TRUE) { + flb_plg_info(ctx->ins, + "Completing multipart uploads from previous " + "executions to S3; buffer=%s", + ctx->stream_upload->path); + ctx->has_old_uploads = FLB_FALSE; + + /* + * we don't need to worry if this fails; it will retry each + * time the upload callback is called + */ + cb_s3_upload(config, ctx); + } + + /* this is done last since in the previous block we make calls to AWS */ + ctx->provider->provider_vtable->upstream_set(ctx->provider, ctx->ins); + + return 0; +} + +/* + * return value is one of FLB_OK, FLB_RETRY, FLB_ERROR + * + * Chunk is allowed to be NULL + */ +static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, + struct multipart_upload *m_upload, + char *body, size_t body_size, + const char *tag, int tag_len) +{ + int init_upload = FLB_FALSE; + int complete_upload = FLB_FALSE; + int size_check = FLB_FALSE; + int part_num_check = FLB_FALSE; + int timeout_check = FLB_FALSE; + int ret; + void *payload_buf = NULL; + size_t payload_size = 0; + size_t preCompress_size = 0; + time_t file_first_log_time = time(NULL); + + /* + * When chunk does not exist, file_first_log_time will be the current time. + * This is only for unit tests and prevents unit tests from segfaulting when chunk is + * NULL because if so chunk->first_log_time will be NULl either and will cause + * segfault during the process of put_object upload or mutipart upload. 
+ */ + if (chunk != NULL) { + file_first_log_time = chunk->first_log_time; + } + + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + /* Map payload */ + ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to compress data"); + return FLB_RETRY; + } else { + preCompress_size = body_size; + body = (void *) payload_buf; + body_size = payload_size; + } + } + + if (ctx->use_put_object == FLB_TRUE) { + goto put_object; + } + + if (s3_plugin_under_test() == FLB_TRUE) { + init_upload = FLB_TRUE; + complete_upload = FLB_TRUE; + if (ctx->use_put_object == FLB_TRUE) { + goto put_object; + } + else { + goto multipart; + } + } + + if (m_upload == NULL) { + if (chunk != NULL && time(NULL) > + (chunk->create_time + ctx->upload_timeout + ctx->retry_time)) { + /* timeout already reached, just PutObject */ + goto put_object; + } + else if (body_size >= ctx->file_size) { + /* already big enough, just use PutObject API */ + goto put_object; + } + else if(body_size > MIN_CHUNKED_UPLOAD_SIZE) { + init_upload = FLB_TRUE; + goto multipart; + } + else { + if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_GZIP) { + flb_plg_info(ctx->ins, "Pre-compression upload_chunk_size= %zu, After compression, chunk is only %zu bytes, " + "the chunk was too small, using PutObject to upload", preCompress_size, body_size); + } + goto put_object; + } + } + else { + /* existing upload */ + if (body_size < MIN_CHUNKED_UPLOAD_SIZE) { + complete_upload = FLB_TRUE; + } + + goto multipart; + } + +put_object: + + /* + * remove chunk from buffer list + */ + ret = s3_put_object(ctx, tag, file_first_log_time, body, body_size); + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + flb_free(payload_buf); + } + if (ret < 0) { + /* re-add chunk to list */ + if (chunk) { + s3_store_file_unlock(chunk); + chunk->failures += 1; + } + return FLB_RETRY; + } + + /* data was sent successfully- delete the local buffer */ + if (chunk) { + s3_store_file_delete(ctx, chunk); + } + return FLB_OK; + +multipart: + + if (init_upload == FLB_TRUE) { + m_upload = create_upload(ctx, tag, tag_len, file_first_log_time); + if (!m_upload) { + flb_plg_error(ctx->ins, "Could not find or create upload for tag %s", tag); + if (chunk) { + s3_store_file_unlock(chunk); + } + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + flb_free(payload_buf); + } + return FLB_RETRY; + } + } + + if (m_upload->upload_state == MULTIPART_UPLOAD_STATE_NOT_CREATED) { + ret = create_multipart_upload(ctx, m_upload); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not initiate multipart upload"); + if (chunk) { + s3_store_file_unlock(chunk); + } + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + flb_free(payload_buf); + } + return FLB_RETRY; + } + m_upload->upload_state = MULTIPART_UPLOAD_STATE_CREATED; + } + + ret = upload_part(ctx, m_upload, body, body_size); + if (ret < 0) { + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + flb_free(payload_buf); + } + m_upload->upload_errors += 1; + /* re-add chunk to list */ + if (chunk) { + s3_store_file_unlock(chunk); + chunk->failures += 1; + } + return FLB_RETRY; + } + m_upload->part_number += 1; + /* data was sent successfully- delete the local buffer */ + if (chunk) { + s3_store_file_delete(ctx, chunk); + chunk = NULL; + } + if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + flb_free(payload_buf); + } + if (m_upload->bytes >= ctx->file_size) { + size_check = FLB_TRUE; + flb_plg_info(ctx->ins, "Will complete upload for %s 
because uploaded data is greater" + " than size set by total_file_size", m_upload->s3_key); + } + if (m_upload->part_number >= 10000) { + part_num_check = FLB_TRUE; + flb_plg_info(ctx->ins, "Will complete upload for %s because 10,000 chunks " + "(the API limit) have been uploaded", m_upload->s3_key); + } + if (time(NULL) > + (m_upload->init_time + ctx->upload_timeout + ctx->retry_time)) { + timeout_check = FLB_TRUE; + flb_plg_info(ctx->ins, "Will complete upload for %s because upload_timeout" + " has elapsed", m_upload->s3_key); + } + if (size_check || part_num_check || timeout_check) { + complete_upload = FLB_TRUE; + } + + if (complete_upload == FLB_TRUE) { + /* mark for completion- the upload timer will handle actual completion */ + m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS; + } + + return FLB_OK; +} + + +/* + * Attempts to send all chunks to S3 using PutObject + * Used on shut down to try to send all buffered data + * Used on start up to try to send any leftover buffers from previous executions + */ +static int put_all_chunks(struct flb_s3 *ctx) +{ + struct s3_file *chunk; + struct mk_list *tmp; + struct mk_list *head; + struct mk_list *f_head; + struct flb_fstore_file *fsf; + struct flb_fstore_stream *fs_stream; + void *payload_buf = NULL; + size_t payload_size = 0; + char *buffer = NULL; + size_t buffer_size; + int ret; + + mk_list_foreach(head, &ctx->fs->streams) { + /* skip multi upload stream */ + fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); + if (fs_stream == ctx->stream_upload) { + continue; + } + /* skip metadata stream */ + if (fs_stream == ctx->stream_metadata) { + continue; + } + + mk_list_foreach_safe(f_head, tmp, &fs_stream->files) { + fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); + chunk = fsf->data; + + /* Locked chunks are being processed, skip */ + if (chunk->locked == FLB_TRUE) { + continue; + } + + if (chunk->failures >= MAX_UPLOAD_ERRORS) { + flb_plg_warn(ctx->ins, + "Chunk for tag %s failed to send %i times, " + "will not retry", + (char *) fsf->meta_buf, MAX_UPLOAD_ERRORS); + flb_fstore_file_inactive(ctx->fs, fsf); + continue; + } + + ret = construct_request_buffer(ctx, NULL, chunk, + &buffer, &buffer_size); + if (ret < 0) { + flb_plg_error(ctx->ins, + "Could not construct request buffer for %s", + chunk->file_path); + return -1; + } + + if (ctx->compression != FLB_AWS_COMPRESS_NONE) { + /* Map payload */ + ret = flb_aws_compression_compress(ctx->compression, buffer, buffer_size, &payload_buf, &payload_size); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to compress data, uploading uncompressed data instead to prevent data loss"); + } else { + flb_plg_info(ctx->ins, "Pre-compression chunk size is %zu, After compression, chunk is %zu bytes", buffer_size, payload_size); + buffer = (void *) payload_buf; + buffer_size = payload_size; + } + } + + ret = s3_put_object(ctx, (const char *) + fsf->meta_buf, + chunk->create_time, buffer, buffer_size); + flb_free(buffer); + if (ret < 0) { + s3_store_file_unlock(chunk); + chunk->failures += 1; + return -1; + } + + /* data was sent successfully- delete the local buffer */ + s3_store_file_delete(ctx, chunk); + } + } + + return 0; +} + +/* + * Either new_data or chunk can be NULL, but not both + */ +static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, + struct s3_file *chunk, + char **out_buf, size_t *out_size) +{ + char *body; + char *tmp; + size_t body_size = 0; + char *buffered_data = NULL; + size_t buffer_size = 0; + int ret; + + if 
(new_data == NULL && chunk == NULL) { + flb_plg_error(ctx->ins, "[construct_request_buffer] Something went wrong" + " both chunk and new_data are NULL"); + return -1; + } + + if (chunk) { + ret = s3_store_file_read(ctx, chunk, &buffered_data, &buffer_size); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not read locally buffered data %s", + chunk->file_path); + return -1; + } + + /* + * lock the chunk from buffer list + */ + s3_store_file_lock(chunk); + body = buffered_data; + body_size = buffer_size; + } + + /* + * If new data is arriving, increase the original 'buffered_data' size + * to append the new one. + */ + if (new_data) { + body_size += flb_sds_len(new_data); + + tmp = flb_realloc(buffered_data, body_size + 1); + if (!tmp) { + flb_errno(); + flb_free(buffered_data); + if (chunk) { + s3_store_file_unlock(chunk); + } + return -1; + } + body = buffered_data = tmp; + memcpy(body + buffer_size, new_data, flb_sds_len(new_data)); + body[body_size] = '\0'; + } + + *out_buf = body; + *out_size = body_size; + + return 0; +} + +static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t file_first_log_time, + char *body, size_t body_size) +{ + flb_sds_t s3_key = NULL; + struct flb_http_client *c = NULL; + struct flb_aws_client *s3_client; + struct flb_aws_header *headers = NULL; + char *random_alphanumeric; + int append_random = FLB_FALSE; + int len; + int ret; + int num_headers = 0; + char *final_key; + flb_sds_t uri; + flb_sds_t tmp; + char final_body_md5[25]; + + s3_key = flb_get_s3_key(ctx->s3_key_format, file_first_log_time, tag, + ctx->tag_delimiters, ctx->seq_index); + if (!s3_key) { + flb_plg_error(ctx->ins, "Failed to construct S3 Object Key for %s", tag); + return -1; + } + + len = strlen(s3_key); + if ((len + 16) <= 1024 && !ctx->key_fmt_has_uuid && !ctx->static_file_path && + !ctx->key_fmt_has_seq_index) { + append_random = FLB_TRUE; + len += 16; + } + len += strlen(ctx->bucket + 1); + + uri = flb_sds_create_size(len); + + if (append_random == FLB_TRUE) { + random_alphanumeric = flb_sts_session_name(); + if (!random_alphanumeric) { + flb_sds_destroy(s3_key); + flb_sds_destroy(uri); + flb_plg_error(ctx->ins, "Failed to create randomness for S3 key %s", tag); + return -1; + } + /* only use 8 chars of the random string */ + random_alphanumeric[8] = '\0'; + + tmp = flb_sds_printf(&uri, "/%s%s-object%s", ctx->bucket, s3_key, + random_alphanumeric); + flb_free(random_alphanumeric); + } + else { + tmp = flb_sds_printf(&uri, "/%s%s", ctx->bucket, s3_key); + } + + if (!tmp) { + flb_sds_destroy(s3_key); + flb_plg_error(ctx->ins, "Failed to create PutObject URI"); + return -1; + } + flb_sds_destroy(s3_key); + uri = tmp; + + memset(final_body_md5, 0, sizeof(final_body_md5)); + if (ctx->send_content_md5 == FLB_TRUE) { + ret = get_md5_base64(body, body_size, + final_body_md5, sizeof(final_body_md5)); + if (ret != 0) { + flb_plg_error(ctx->ins, "Failed to create Content-MD5 header"); + flb_sds_destroy(uri); + return -1; + } + } + + /* Update file and increment index value right before request */ + if (ctx->key_fmt_has_seq_index) { + ctx->seq_index++; + + ret = write_seq_index(ctx->seq_index_file, ctx->seq_index); + if (ret < 0 && access(ctx->seq_index_file, F_OK) == 0) { + ctx->seq_index--; + flb_sds_destroy(s3_key); + flb_plg_error(ctx->ins, "Failed to update sequential index metadata file"); + return -1; + } + } + + s3_client = ctx->s3_client; + if (s3_plugin_under_test() == FLB_TRUE) { + c = mock_s3_call("TEST_PUT_OBJECT_ERROR", "PutObject"); + } + else { + ret = 
create_headers(ctx, final_body_md5, &headers, &num_headers, FLB_FALSE); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to create headers"); + flb_sds_destroy(uri); + goto decrement_index; + } + c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT, + uri, body, body_size, + headers, num_headers); + flb_free(headers); + } + if (c) { + flb_plg_debug(ctx->ins, "PutObject http status=%d", c->resp.status); + if (c->resp.status == 200) { + /* + * URI contains bucket name, so we must advance over it + * to print the object key + */ + final_key = uri + strlen(ctx->bucket) + 1; + flb_plg_info(ctx->ins, "Successfully uploaded object %s", final_key); + flb_sds_destroy(uri); + flb_http_client_destroy(c); + + return 0; + } + flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size, + "PutObject", ctx->ins); + if (c->resp.data != NULL) { + flb_plg_error(ctx->ins, "Raw PutObject response: %s", c->resp.data); + } + flb_http_client_destroy(c); + } + + flb_plg_error(ctx->ins, "PutObject request failed"); + flb_sds_destroy(uri); + goto decrement_index; + +decrement_index: + if (ctx->key_fmt_has_seq_index) { + ctx->seq_index--; + + ret = write_seq_index(ctx->seq_index_file, ctx->seq_index); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to decrement index after request error"); + return -1; + } + } + return -1; +} + +int get_md5_base64(char *buf, size_t buf_size, char *md5_str, size_t md5_str_size) +{ + unsigned char md5_bin[16]; + size_t olen; + int ret; + + ret = flb_hash_simple(FLB_HASH_MD5, + (unsigned char *) buf, buf_size, + md5_bin, sizeof(md5_bin)); + + if (ret != FLB_CRYPTO_SUCCESS) { + return -1; + } + + ret = flb_base64_encode((unsigned char*) md5_str, md5_str_size, + &olen, md5_bin, sizeof(md5_bin)); + if (ret != 0) { + return ret; + } + + return 0; +} + +static struct multipart_upload *get_upload(struct flb_s3 *ctx, + const char *tag, int tag_len) +{ + struct multipart_upload *m_upload = NULL; + struct multipart_upload *tmp_upload = NULL; + struct mk_list *tmp; + struct mk_list *head; + + mk_list_foreach_safe(head, tmp, &ctx->uploads) { + tmp_upload = mk_list_entry(head, struct multipart_upload, _head); + + if (tmp_upload->upload_state == MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS) { + continue; + } + if (tmp_upload->upload_errors >= MAX_UPLOAD_ERRORS) { + tmp_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS; + flb_plg_error(ctx->ins, "Upload for %s has reached max upload errors", + tmp_upload->s3_key); + continue; + } + if (strcmp(tmp_upload->tag, tag) == 0) { + m_upload = tmp_upload; + break; + } + } + + return m_upload; +} + +static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *tag, + int tag_len, time_t file_first_log_time) +{ + int ret; + struct multipart_upload *m_upload = NULL; + flb_sds_t s3_key = NULL; + flb_sds_t tmp_sds = NULL; + + /* create new upload for this key */ + m_upload = flb_calloc(1, sizeof(struct multipart_upload)); + if (!m_upload) { + flb_errno(); + return NULL; + } + s3_key = flb_get_s3_key(ctx->s3_key_format, file_first_log_time, tag, + ctx->tag_delimiters, ctx->seq_index); + if (!s3_key) { + flb_plg_error(ctx->ins, "Failed to construct S3 Object Key for %s", tag); + flb_free(m_upload); + return NULL; + } + m_upload->s3_key = s3_key; + tmp_sds = flb_sds_create_len(tag, tag_len); + if (!tmp_sds) { + flb_errno(); + flb_free(m_upload); + return NULL; + } + m_upload->tag = tmp_sds; + m_upload->upload_state = MULTIPART_UPLOAD_STATE_NOT_CREATED; + m_upload->part_number = 1; + m_upload->init_time = time(NULL); 
+ mk_list_add(&m_upload->_head, &ctx->uploads); + + /* Update file and increment index value right before request */ + if (ctx->key_fmt_has_seq_index) { + ctx->seq_index++; + + ret = write_seq_index(ctx->seq_index_file, ctx->seq_index); + if (ret < 0) { + ctx->seq_index--; + flb_sds_destroy(s3_key); + flb_plg_error(ctx->ins, "Failed to write to sequential index metadata file"); + return NULL; + } + } + + return m_upload; +} + +/* Adds an entry to upload queue */ +static int add_to_queue(struct flb_s3 *ctx, struct s3_file *upload_file, + struct multipart_upload *m_upload_file, const char *tag, int tag_len) +{ + struct upload_queue *upload_contents; + flb_sds_t tag_cpy; + + /* Create upload contents object and add to upload queue */ + upload_contents = flb_calloc(1, sizeof(struct upload_queue)); + if (upload_contents == NULL) { + flb_plg_error(ctx->ins, "Error allocating memory for upload_queue entry"); + flb_errno(); + return -1; + } + upload_contents->upload_file = upload_file; + upload_contents->m_upload_file = m_upload_file; + upload_contents->tag_len = tag_len; + upload_contents->retry_counter = 0; + upload_contents->upload_time = -1; + + /* Necessary to create separate string for tag to prevent corruption */ + tag_cpy = flb_sds_create_len(tag, tag_len); + if (!tag_cpy) { + flb_errno(); + flb_free(upload_contents); + return -1; + } + upload_contents->tag = tag_cpy; + + + /* Add entry to upload queue */ + mk_list_add(&upload_contents->_head, &ctx->upload_queue); + return 0; +} + +/* Removes an entry from upload_queue */ +void remove_from_queue(struct upload_queue *entry) +{ + mk_list_del(&entry->_head); + flb_sds_destroy(entry->tag); + flb_free(entry); + return; +} + +/* Validity check for upload queue object */ +static int upload_queue_valid(struct upload_queue *upload_contents, time_t now, + void *out_context) +{ + struct flb_s3 *ctx = out_context; + + if (upload_contents == NULL) { + flb_plg_error(ctx->ins, "Error getting entry from upload_queue"); + return -1; + } + if (upload_contents->_head.next == NULL || upload_contents->_head.prev == NULL) { + flb_plg_debug(ctx->ins, "Encountered previously deleted entry in " + "upload_queue. Deleting invalid entry"); + mk_list_del(&upload_contents->_head); + return -1; + } + if (upload_contents->upload_file->locked == FLB_FALSE) { + flb_plg_debug(ctx->ins, "Encountered unlocked file in upload_queue. " + "Exiting"); + return -1; + } + if (upload_contents->upload_file->size <= 0) { + flb_plg_debug(ctx->ins, "Encountered empty chunk file in upload_queue. 
" + "Deleting empty chunk file"); + remove_from_queue(upload_contents); + return -1; + } + if (now < upload_contents->upload_time) { + flb_plg_debug(ctx->ins, "Found valid chunk file but not ready to upload"); + return -1; + } + return 0; +} + +static int send_upload_request(void *out_context, flb_sds_t chunk, + struct s3_file *upload_file, + struct multipart_upload *m_upload_file, + const char *tag, int tag_len) +{ + int ret; + char *buffer; + size_t buffer_size; + struct flb_s3 *ctx = out_context; + + /* Create buffer to upload to S3 */ + ret = construct_request_buffer(ctx, chunk, upload_file, &buffer, &buffer_size); + flb_sds_destroy(chunk); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not construct request buffer for %s", + upload_file->file_path); + return -1; + } + + /* Upload to S3 */ + ret = upload_data(ctx, upload_file, m_upload_file, buffer, buffer_size, tag, tag_len); + flb_free(buffer); + + return ret; +} + +static int buffer_chunk(void *out_context, struct s3_file *upload_file, + flb_sds_t chunk, int chunk_size, + const char *tag, int tag_len, + time_t file_first_log_time) +{ + int ret; + struct flb_s3 *ctx = out_context; + + ret = s3_store_buffer_put(ctx, upload_file, tag, + tag_len, chunk, (size_t) chunk_size, file_first_log_time); + flb_sds_destroy(chunk); + if (ret < 0) { + flb_plg_warn(ctx->ins, "Could not buffer chunk. Data order preservation " + "will be compromised"); + return -1; + } + return 0; +} + +/* Uploads all chunk files in queue synchronously */ +static void s3_upload_queue(struct flb_config *config, void *out_context) +{ + int ret; + time_t now; + struct upload_queue *upload_contents; + struct flb_s3 *ctx = out_context; + struct mk_list *tmp; + struct mk_list *head; + + flb_plg_debug(ctx->ins, "Running upload timer callback (upload_queue).."); + + /* No chunks in upload queue. Scan for timed out chunks. */ + if (mk_list_size(&ctx->upload_queue) == 0) { + flb_plg_debug(ctx->ins, "No files found in upload_queue. Scanning for timed " + "out chunks"); + cb_s3_upload(config, out_context); + } + + /* Iterate through each file in upload queue */ + mk_list_foreach_safe(head, tmp, &ctx->upload_queue) { + upload_contents = mk_list_entry(head, struct upload_queue, _head); + + now = time(NULL); + + /* Checks if upload_contents is valid */ + ret = upload_queue_valid(upload_contents, now, ctx); + if (ret < 0) { + goto exit; + } + + /* Try to upload file. Return value can be -1, FLB_OK, FLB_ERROR, FLB_RETRY. 
*/ + ret = send_upload_request(ctx, NULL, upload_contents->upload_file, + upload_contents->m_upload_file, + upload_contents->tag, upload_contents->tag_len); + if (ret < 0) { + goto exit; + } + else if (ret == FLB_OK) { + remove_from_queue(upload_contents); + ctx->retry_time = 0; + ctx->upload_queue_success = FLB_TRUE; + } + else { + s3_store_file_lock(upload_contents->upload_file); + ctx->upload_queue_success = FLB_FALSE; + + /* If retry limit was reached, discard file and remove file from queue */ + upload_contents->retry_counter++; + if (upload_contents->retry_counter >= MAX_UPLOAD_ERRORS) { + flb_plg_warn(ctx->ins, "Chunk file failed to send %d times, will not " + "retry", upload_contents->retry_counter); + s3_store_file_inactive(ctx, upload_contents->upload_file); + multipart_upload_destroy(upload_contents->m_upload_file); + remove_from_queue(upload_contents); + continue; + } + + /* Retry in N seconds */ + upload_contents->upload_time = now + 2 * upload_contents->retry_counter; + ctx->retry_time += 2 * upload_contents->retry_counter; + flb_plg_debug(ctx->ins, "Failed to upload file in upload_queue. Will not " + "retry for %d seconds", 2 * upload_contents->retry_counter); + break; + } + } + +exit: + return; +} + +static void cb_s3_upload(struct flb_config *config, void *data) +{ + struct flb_s3 *ctx = data; + struct s3_file *chunk = NULL; + struct multipart_upload *m_upload = NULL; + struct flb_fstore_file *fsf; + char *buffer = NULL; + size_t buffer_size = 0; + struct mk_list *tmp; + struct mk_list *head; + int complete; + int ret; + time_t now; + + flb_plg_debug(ctx->ins, "Running upload timer callback (cb_s3_upload).."); + + now = time(NULL); + + /* Check all chunks and see if any have timed out */ + mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) { + fsf = mk_list_entry(head, struct flb_fstore_file, _head); + chunk = fsf->data; + + if (now < (chunk->create_time + ctx->upload_timeout + ctx->retry_time)) { + continue; /* Only send chunks which have timed out */ + } + + /* Locked chunks are being processed, skip */ + if (chunk->locked == FLB_TRUE) { + continue; + } + + m_upload = get_upload(ctx, (const char *) fsf->meta_buf, fsf->meta_size); + + ret = construct_request_buffer(ctx, NULL, chunk, &buffer, &buffer_size); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not construct request buffer for %s", + chunk->file_path); + continue; + } + + /* FYI: if construct_request_buffer() succeedeed, the s3_file is locked */ + ret = upload_data(ctx, chunk, m_upload, buffer, buffer_size, + (const char *) fsf->meta_buf, fsf->meta_size); + flb_free(buffer); + if (ret != FLB_OK) { + flb_plg_error(ctx->ins, "Could not send chunk with tag %s", + (char *) fsf->meta_buf); + } + } + + /* Check all uploads and see if any need completion */ + mk_list_foreach_safe(head, tmp, &ctx->uploads) { + m_upload = mk_list_entry(head, struct multipart_upload, _head); + complete = FLB_FALSE; + + if (m_upload->complete_errors >= MAX_UPLOAD_ERRORS) { + flb_plg_error(ctx->ins, + "Upload for %s has reached max completion errors, " + "plugin will give up", m_upload->s3_key); + mk_list_del(&m_upload->_head); + continue; + } + + if (m_upload->upload_state == MULTIPART_UPLOAD_STATE_NOT_CREATED) { + continue; + } + + if (m_upload->upload_state == MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS) { + complete = FLB_TRUE; + } + if (time(NULL) > (m_upload->init_time + ctx->upload_timeout + ctx->retry_time)) { + flb_plg_info(ctx->ins, "Completing upload for %s because upload_timeout" + " has passed", m_upload->s3_key); + 
complete = FLB_TRUE; + } + if (complete == FLB_TRUE) { + m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS; + mk_list_del(&m_upload->_head); + ret = complete_multipart_upload(ctx, m_upload); + if (ret == 0) { + multipart_upload_destroy(m_upload); + } + else { + mk_list_add(&m_upload->_head, &ctx->uploads); + /* data was persisted, this can be retried */ + m_upload->complete_errors += 1; + flb_plg_error(ctx->ins, "Could not complete upload %s, will retry..", + m_upload->s3_key); + } + } + } + +} + +static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char *data, + uint64_t bytes) +{ + int i; + int records = 0; + int map_size; + int check = FLB_FALSE; + int found = FLB_FALSE; + int log_key_missing = 0; + int ret; + int alloc_error = 0; + struct flb_s3 *ctx = out_context; + char *val_buf; + char *key_str = NULL; + size_t key_str_size = 0; + size_t msgpack_size = bytes + bytes / 4; + size_t val_offset = 0; + flb_sds_t out_buf; + msgpack_object map; + msgpack_object key; + msgpack_object val; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + + /* Iterate the original buffer and perform adjustments */ + records = flb_mp_count(data, bytes); + if (records <= 0) { + return NULL; + } + + /* Allocate buffer to store log_key contents */ + val_buf = flb_calloc(1, msgpack_size); + if (val_buf == NULL) { + flb_plg_error(ctx->ins, "Could not allocate enough " + "memory to read record"); + flb_errno(); + return NULL; + } + + ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); + + flb_free(val_buf); + + return NULL; + } + + + while (!alloc_error && + (ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + + /* Get the record/map */ + map = *log_event.body; + + if (map.type != MSGPACK_OBJECT_MAP) { + continue; + } + + map_size = map.via.map.size; + + /* Reset variables for found log_key and correct type */ + found = FLB_FALSE; + check = FLB_FALSE; + + /* Extract log_key from record and append to output buffer */ + for (i = 0; i < map_size; i++) { + key = map.via.map.ptr[i].key; + val = map.via.map.ptr[i].val; + + if (key.type == MSGPACK_OBJECT_BIN) { + key_str = (char *) key.via.bin.ptr; + key_str_size = key.via.bin.size; + check = FLB_TRUE; + } + if (key.type == MSGPACK_OBJECT_STR) { + key_str = (char *) key.via.str.ptr; + key_str_size = key.via.str.size; + check = FLB_TRUE; + } + + if (check == FLB_TRUE) { + if (strncmp(ctx->log_key, key_str, key_str_size) == 0) { + found = FLB_TRUE; + + /* + * Copy contents of value into buffer. Necessary to copy + * strings because flb_msgpack_to_json does not handle nested + * JSON gracefully and double escapes them. 
+ */ + if (val.type == MSGPACK_OBJECT_BIN) { + memcpy(val_buf + val_offset, val.via.bin.ptr, val.via.bin.size); + val_offset += val.via.bin.size; + val_buf[val_offset] = '\n'; + val_offset++; + } + else if (val.type == MSGPACK_OBJECT_STR) { + memcpy(val_buf + val_offset, val.via.str.ptr, val.via.str.size); + val_offset += val.via.str.size; + val_buf[val_offset] = '\n'; + val_offset++; + } + else { + ret = flb_msgpack_to_json(val_buf + val_offset, + msgpack_size - val_offset, &val); + if (ret < 0) { + break; + } + val_offset += ret; + val_buf[val_offset] = '\n'; + val_offset++; + } + /* Exit early once log_key has been found for current record */ + break; + } + } + } + + /* If log_key was not found in the current record, mark log key as missing */ + if (found == FLB_FALSE) { + log_key_missing++; + } + } + + /* Throw error once per chunk if at least one log key was not found */ + if (log_key_missing > 0) { + flb_plg_error(ctx->ins, "Could not find log_key '%s' in %d records", + ctx->log_key, log_key_missing); + } + + flb_log_event_decoder_destroy(&log_decoder); + + /* If nothing was read, destroy buffer */ + if (val_offset == 0) { + flb_free(val_buf); + return NULL; + } + val_buf[val_offset] = '\0'; + + /* Create output buffer to store contents */ + out_buf = flb_sds_create(val_buf); + if (out_buf == NULL) { + flb_plg_error(ctx->ins, "Error creating buffer to store log_key contents."); + flb_errno(); + } + flb_free(val_buf); + + return out_buf; +} + +static void unit_test_flush(void *out_context, struct s3_file *upload_file, + const char *tag, int tag_len, flb_sds_t chunk, + int chunk_size, struct multipart_upload *m_upload_file, + time_t file_first_log_time) +{ + int ret; + char *buffer; + size_t buffer_size; + struct flb_s3 *ctx = out_context; + + s3_store_buffer_put(ctx, upload_file, tag, tag_len, + chunk, (size_t) chunk_size, file_first_log_time); + ret = construct_request_buffer(ctx, chunk, upload_file, &buffer, &buffer_size); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not construct request buffer for %s", + upload_file->file_path); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + ret = upload_data(ctx, upload_file, m_upload_file, buffer, buffer_size, tag, tag_len); + flb_free(buffer); + + FLB_OUTPUT_RETURN(ret); +} + +static void flush_init(void *out_context) +{ + int ret; + struct flb_s3 *ctx = out_context; + struct flb_sched *sched; + + /* clean up any old buffers found on startup */ + if (ctx->has_old_buffers == FLB_TRUE) { + flb_plg_info(ctx->ins, + "Sending locally buffered data from previous " + "executions to S3; buffer=%s", + ctx->fs->root_path); + ctx->has_old_buffers = FLB_FALSE; + ret = put_all_chunks(ctx); + if (ret < 0) { + ctx->has_old_buffers = FLB_TRUE; + flb_plg_error(ctx->ins, + "Failed to send locally buffered data left over " + "from previous executions; will retry. 
Buffer=%s", + ctx->fs->root_path); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + } + + /* + * create a timer that will run periodically and check if uploads + * are ready for completion + * this is created once on the first flush + */ + if (ctx->timer_created == FLB_FALSE) { + flb_plg_debug(ctx->ins, + "Creating upload timer with frequency %ds", + ctx->timer_ms / 1000); + + sched = flb_sched_ctx_get(); + + if (ctx->preserve_data_ordering) { + ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, + ctx->timer_ms, s3_upload_queue, ctx, NULL); + } + else { + ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, + ctx->timer_ms, cb_s3_upload, ctx, NULL); + } + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to create upload timer"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + ctx->timer_created = FLB_TRUE; + } +} + +static void cb_s3_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + int ret; + int chunk_size; + int upload_timeout_check = FLB_FALSE; + int total_file_size_check = FLB_FALSE; + flb_sds_t chunk = NULL; + struct s3_file *upload_file = NULL; + struct flb_s3 *ctx = out_context; + struct multipart_upload *m_upload_file = NULL; + time_t file_first_log_time = 0; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + + /* Cleanup old buffers and initialize upload timer */ + flush_init(ctx); + + /* Process chunk */ + if (ctx->log_key) { + chunk = flb_pack_msgpack_extract_log_key(ctx, + event_chunk->data, + event_chunk->size); + } + else { + chunk = flb_pack_msgpack_to_json_format(event_chunk->data, + event_chunk->size, + FLB_PACK_JSON_FORMAT_LINES, + ctx->json_date_format, + ctx->date_key); + } + if (chunk == NULL) { + flb_plg_error(ctx->ins, "Could not marshal msgpack to output string"); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + chunk_size = flb_sds_len(chunk); + + /* Get a file candidate matching the given 'tag' */ + upload_file = s3_store_file_get(ctx, + event_chunk->tag, + flb_sds_len(event_chunk->tag)); + + if (upload_file == NULL) { + ret = flb_log_event_decoder_init(&log_decoder, + (char *) event_chunk->data, + event_chunk->size); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); + + flb_sds_destroy(chunk); + + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + if (log_event.timestamp.tm.tv_sec != 0) { + file_first_log_time = log_event.timestamp.tm.tv_sec; + break; + } + } + + flb_log_event_decoder_destroy(&log_decoder); + } + else { + /* Get file_first_log_time from upload_file */ + file_first_log_time = upload_file->first_log_time; + } + + if (file_first_log_time == 0) { + file_first_log_time = time(NULL); + } + + /* Specific to unit tests, will not get called normally */ + if (s3_plugin_under_test() == FLB_TRUE) { + unit_test_flush(ctx, upload_file, + event_chunk->tag, flb_sds_len(event_chunk->tag), + chunk, chunk_size, + m_upload_file, file_first_log_time); + } + + /* Discard upload_file if it has failed to upload MAX_UPLOAD_ERRORS times */ + if (upload_file != NULL && upload_file->failures >= MAX_UPLOAD_ERRORS) { + flb_plg_warn(ctx->ins, "File with tag %s failed to send %d times, will not " + "retry", event_chunk->tag, MAX_UPLOAD_ERRORS); + s3_store_file_inactive(ctx, upload_file); + upload_file = NULL; + } + + /* If upload_timeout has elapsed, upload file */ + 
if (upload_file != NULL && time(NULL) > + (upload_file->create_time + ctx->upload_timeout)) { + upload_timeout_check = FLB_TRUE; + flb_plg_info(ctx->ins, "upload_timeout reached for %s", + event_chunk->tag); + } + + m_upload_file = get_upload(ctx, + event_chunk->tag, flb_sds_len(event_chunk->tag)); + + if (m_upload_file != NULL && time(NULL) > + (m_upload_file->init_time + ctx->upload_timeout)) { + upload_timeout_check = FLB_TRUE; + flb_plg_info(ctx->ins, "upload_timeout reached for %s", event_chunk->tag); + } + + /* If total_file_size has been reached, upload file */ + if ((upload_file && upload_file->size + chunk_size > ctx->upload_chunk_size) || + (m_upload_file && m_upload_file->bytes + chunk_size > ctx->file_size)) { + total_file_size_check = FLB_TRUE; + } + + /* File is ready for upload, upload_file != NULL prevents from segfaulting. */ + if ((upload_file != NULL) && (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) { + if (ctx->preserve_data_ordering == FLB_TRUE) { + /* Buffer last chunk in file and lock file to prevent further changes */ + ret = buffer_chunk(ctx, upload_file, chunk, chunk_size, + event_chunk->tag, flb_sds_len(event_chunk->tag), + file_first_log_time); + + if (ret < 0) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + s3_store_file_lock(upload_file); + + /* Add chunk file to upload queue */ + ret = add_to_queue(ctx, upload_file, m_upload_file, + event_chunk->tag, flb_sds_len(event_chunk->tag)); + if (ret < 0) { + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + /* Go through upload queue and return error if something went wrong */ + s3_upload_queue(config, ctx); + if (ctx->upload_queue_success == FLB_FALSE) { + ctx->upload_queue_success = FLB_TRUE; + FLB_OUTPUT_RETURN(FLB_ERROR); + } + FLB_OUTPUT_RETURN(FLB_OK); + } + else { + /* Send upload directly without upload queue */ + ret = send_upload_request(ctx, chunk, upload_file, m_upload_file, + event_chunk->tag, + flb_sds_len(event_chunk->tag)); + if (ret < 0) { + FLB_OUTPUT_RETURN(FLB_ERROR); + } + FLB_OUTPUT_RETURN(ret); + } + } + + /* Buffer current chunk in filesystem and wait for next chunk from engine */ + ret = buffer_chunk(ctx, upload_file, chunk, chunk_size, + event_chunk->tag, flb_sds_len(event_chunk->tag), + file_first_log_time); + + if (ret < 0) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + FLB_OUTPUT_RETURN(FLB_OK); +} + +static int cb_s3_exit(void *data, struct flb_config *config) +{ + int ret; + struct flb_s3 *ctx = data; + struct multipart_upload *m_upload = NULL; + struct mk_list *tmp; + struct mk_list *head; + + if (!ctx) { + return 0; + } + + if (s3_store_has_data(ctx) == FLB_TRUE) { + flb_plg_info(ctx->ins, "Sending all locally buffered data to S3"); + ret = put_all_chunks(ctx); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not send all chunks on exit"); + } + } + + if (s3_store_has_uploads(ctx) == FLB_TRUE) { + mk_list_foreach_safe(head, tmp, &ctx->uploads) { + m_upload = mk_list_entry(head, struct multipart_upload, _head); + + if (m_upload->upload_state == MULTIPART_UPLOAD_STATE_NOT_CREATED) { + continue; + } + + if (m_upload->bytes > 0) { + m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS; + mk_list_del(&m_upload->_head); + ret = complete_multipart_upload(ctx, m_upload); + if (ret == 0) { + multipart_upload_destroy(m_upload); + } + else { + mk_list_add(&m_upload->_head, &ctx->uploads); + flb_plg_error(ctx->ins, "Could not complete upload %s", + m_upload->s3_key); + } + } + } + } + + s3_store_exit(ctx); + s3_context_destroy(ctx); + + return 0; +} + +/* Configuration properties 
map */
+static struct flb_config_map config_map[] = {
+    {
+     FLB_CONFIG_MAP_STR, "json_date_format", NULL,
+     0, FLB_FALSE, 0,
+     FBL_PACK_JSON_DATE_FORMAT_DESCRIPTION
+    },
+    {
+     FLB_CONFIG_MAP_STR, "json_date_key", "date",
+     0, FLB_TRUE, offsetof(struct flb_s3, json_date_key),
+     "Specifies the name of the date field in output."
+    },
+    {
+     FLB_CONFIG_MAP_SIZE, "total_file_size", "100000000",
+     0, FLB_TRUE, offsetof(struct flb_s3, file_size),
+     "Specifies the size of files in S3. Maximum size is 50GB, minimum is 1MB"
+    },
+    {
+     FLB_CONFIG_MAP_SIZE, "upload_chunk_size", "5242880",
+     0, FLB_TRUE, offsetof(struct flb_s3, upload_chunk_size),
+     "This plugin uses the S3 Multipart Upload API to stream data to S3, "
+     "ensuring your data gets off the box as quickly as possible. "
+     "This parameter configures the size of each “part” in the upload. "
+     "The total_file_size option configures the size of the file you will see "
+     "in S3; this option determines the size of chunks uploaded until that "
+     "size is reached. These chunks are temporarily stored in chunk_buffer_path "
+     "until their size reaches upload_chunk_size, at which point the chunk is "
+     "uploaded to S3. Default: 5M, Max: 50M, Min: 5M."
+    },
+
+    {
+     FLB_CONFIG_MAP_TIME, "upload_timeout", "10m",
+     0, FLB_TRUE, offsetof(struct flb_s3, upload_timeout),
+     "Optionally specify a timeout for uploads. "
+     "Whenever this amount of time has elapsed, Fluent Bit will complete an "
+     "upload and create a new file in S3. For example, set this value to 60m "
+     "and you will get a new file in S3 every hour. Default is 10m."
+    },
+    {
+     FLB_CONFIG_MAP_STR, "bucket", NULL,
+     0, FLB_TRUE, offsetof(struct flb_s3, bucket),
+     "S3 bucket name."
+    },
+    {
+     FLB_CONFIG_MAP_STR, "region", "us-east-1",
+     0, FLB_TRUE, offsetof(struct flb_s3, region),
+     "AWS region."
+    },
+    {
+     FLB_CONFIG_MAP_STR, "role_arn", NULL,
+     0, FLB_FALSE, 0,
+     "ARN of an IAM role to assume (ex. for cross account access)."
+    },
+    {
+     FLB_CONFIG_MAP_STR, "endpoint", NULL,
+     0, FLB_TRUE, offsetof(struct flb_s3, endpoint),
+     "Custom endpoint for the S3 API."
+    },
+    {
+     FLB_CONFIG_MAP_STR, "sts_endpoint", NULL,
+     0, FLB_TRUE, offsetof(struct flb_s3, sts_endpoint),
+     "Custom endpoint for the STS API."
+    },
+    {
+     FLB_CONFIG_MAP_STR, "canned_acl", NULL,
+     0, FLB_FALSE, 0,
+     "Predefined Canned ACL policy for S3 objects."
+    },
+    {
+     FLB_CONFIG_MAP_STR, "compression", NULL,
+     0, FLB_FALSE, 0,
+     "Compression type for S3 objects. 'gzip' and 'arrow' are the supported values. "
+     "'arrow' is only available if Apache Arrow was enabled at compile time. "
+     "Defaults to no compression. "
+     "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'."
+    },
+    {
+     FLB_CONFIG_MAP_STR, "content_type", NULL,
+     0, FLB_FALSE, 0,
+     "A standard MIME type for the S3 object; this will be set "
+     "as the Content-Type HTTP header."
+    },
+
+    {
+     FLB_CONFIG_MAP_STR, "store_dir", "/tmp/fluent-bit/s3",
+     0, FLB_TRUE, offsetof(struct flb_s3, store_dir),
+     "Directory to locally buffer data before sending. Plugin uses the S3 Multipart "
+     "upload API to send data in chunks of 5 MB at a time; only a small amount of "
+     "data will be locally buffered at any given point in time."
+    },
+
+    {
+     FLB_CONFIG_MAP_SIZE, "store_dir_limit_size", (char *) NULL,
+     0, FLB_TRUE, offsetof(struct flb_s3, store_dir_limit_size),
+     "S3 plugin has its own buffering system with files in the `store_dir`. "
+     "Use the `store_dir_limit_size` to limit the amount of data S3 buffers in "
+     "the `store_dir` to limit disk usage. 
If the limit is reached, " + "data will be discarded. Default is 0 which means unlimited." + }, + + { + FLB_CONFIG_MAP_STR, "s3_key_format", "/fluent-bit-logs/$TAG/%Y/%m/%d/%H/%M/%S", + 0, FLB_TRUE, offsetof(struct flb_s3, s3_key_format), + "Format string for keys in S3. This option supports strftime time formatters " + "and a syntax for selecting parts of the Fluent log tag using a syntax inspired " + "by the rewrite_tag filter. Add $TAG in the format string to insert the full " + "log tag; add $TAG[0] to insert the first part of the tag in the s3 key. " + "The tag is split into “parts” using the characters specified with the " + "s3_key_format_tag_delimiters option. Add $INDEX to enable sequential indexing " + "for file names. Adding $INDEX will prevent random string being added to end of key" + "when $UUID is not provided. See the in depth examples and tutorial in the " + "documentation." + }, + + { + FLB_CONFIG_MAP_STR, "s3_key_format_tag_delimiters", ".", + 0, FLB_TRUE, offsetof(struct flb_s3, tag_delimiters), + "A series of characters which will be used to split the tag into “parts” for " + "use with the s3_key_format option. See the in depth examples and tutorial in " + "the documentation." + }, + + { + FLB_CONFIG_MAP_BOOL, "auto_retry_requests", "true", + 0, FLB_TRUE, offsetof(struct flb_s3, retry_requests), + "Immediately retry failed requests to AWS services once. This option " + "does not affect the normal Fluent Bit retry mechanism with backoff. " + "Instead, it enables an immediate retry with no delay for networking " + "errors, which may help improve throughput when there are transient/random " + "networking issues." + }, + + { + FLB_CONFIG_MAP_BOOL, "use_put_object", "false", + 0, FLB_TRUE, offsetof(struct flb_s3, use_put_object), + "Use the S3 PutObject API, instead of the multipart upload API" + }, + + { + FLB_CONFIG_MAP_BOOL, "send_content_md5", "false", + 0, FLB_TRUE, offsetof(struct flb_s3, send_content_md5), + "Send the Content-MD5 header with object uploads, as is required when Object Lock is enabled" + }, + + { + FLB_CONFIG_MAP_BOOL, "preserve_data_ordering", "true", + 0, FLB_TRUE, offsetof(struct flb_s3, preserve_data_ordering), + "Normally, when an upload request fails, there is a high chance for the last " + "received chunk to be swapped with a later chunk, resulting in data shuffling. " + "This feature prevents this shuffling by using a queue logic for uploads." + }, + + { + FLB_CONFIG_MAP_STR, "log_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_s3, log_key), + "By default, the whole log record will be sent to S3. " + "If you specify a key name with this option, then only the value of " + "that key will be sent to S3." + }, + + { + FLB_CONFIG_MAP_STR, "external_id", NULL, + 0, FLB_TRUE, offsetof(struct flb_s3, external_id), + "Specify an external ID for the STS API, can be used with the role_arn parameter if your role " + "requires an external ID." + }, + + { + FLB_CONFIG_MAP_BOOL, "static_file_path", "false", + 0, FLB_TRUE, offsetof(struct flb_s3, static_file_path), + "Disables behavior where UUID string is automatically appended to end of S3 key name when " + "$UUID is not provided in s3_key_format. $UUID, time formatters, $TAG, and other dynamic " + "key formatters all work as expected while this feature is set to true." + }, + + { + FLB_CONFIG_MAP_STR, "storage_class", NULL, + 0, FLB_FALSE, 0, + "Specify the storage class for S3 objects. If this option is not specified, objects " + "will be stored with the default 'STANDARD' storage class." 
+ }, + + { + FLB_CONFIG_MAP_STR, "profile", NULL, + 0, FLB_TRUE, offsetof(struct flb_s3, profile), + "AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in " + "$HOME/.aws/ directory." + }, + + /* EOF */ + {0} +}; + +/* Plugin registration */ +struct flb_output_plugin out_s3_plugin = { + .name = "s3", + .description = "Send to S3", + .cb_init = cb_s3_init, + .cb_flush = cb_s3_flush, + .cb_exit = cb_s3_exit, + .workers = 1, + .flags = FLB_OUTPUT_NET | FLB_IO_TLS, + .config_map = config_map +}; diff --git a/fluent-bit/plugins/out_s3/s3.h b/fluent-bit/plugins/out_s3/s3.h new file mode 100644 index 00000000..e145b1ad --- /dev/null +++ b/fluent-bit/plugins/out_s3/s3.h @@ -0,0 +1,203 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_S3 +#define FLB_OUT_S3 + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_aws_credentials.h> +#include <fluent-bit/flb_aws_util.h> + +/* Upload data to S3 in 5MB chunks */ +#define MIN_CHUNKED_UPLOAD_SIZE 5242880 +#define MAX_CHUNKED_UPLOAD_SIZE 50000000 +#define MAX_CHUNKED_UPLOAD_COMPRESS_SIZE 5000000000 + +#define UPLOAD_TIMER_MAX_WAIT 60000 +#define UPLOAD_TIMER_MIN_WAIT 6000 + +#define MULTIPART_UPLOAD_STATE_NOT_CREATED 0 +#define MULTIPART_UPLOAD_STATE_CREATED 1 +#define MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS 2 + +#define DEFAULT_FILE_SIZE 100000000 +#define MAX_FILE_SIZE 50000000000 +#define MAX_FILE_SIZE_STR "50,000,000,000" + +/* Allowed max file size 1 GB for publishing to S3 */ +#define MAX_FILE_SIZE_PUT_OBJECT 1000000000 + +#define DEFAULT_UPLOAD_TIMEOUT 3600 + +/* + * If we see repeated errors on an upload/chunk, we will discard it + * This saves us from scenarios where something goes wrong and an upload can + * not proceed (may be some other process completed it or deleted the upload) + * instead of erroring out forever, we eventually discard the upload. + * + * The same is done for chunks, just to be safe, even though realistically + * I can't think of a reason why a chunk could become unsendable. 
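+ *
+ * The same limit applies to s3_file->failures for buffered chunks and to
+ * the upload_errors / complete_errors counters kept on each
+ * struct multipart_upload below.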
+ */ +#define MAX_UPLOAD_ERRORS 5 + +struct upload_queue { + struct s3_file *upload_file; + struct multipart_upload *m_upload_file; + flb_sds_t tag; + int tag_len; + + int retry_counter; + time_t upload_time; + + struct mk_list _head; +}; + +struct multipart_upload { + flb_sds_t s3_key; + flb_sds_t tag; + flb_sds_t upload_id; + int upload_state; + time_t init_time; + + /* + * maximum of 10,000 parts in an upload, for each we need to store mapping + * of Part Number to ETag + */ + flb_sds_t etags[10000]; + int part_number; + + /* + * we use async http, so we need to check that all part requests have + * completed before we complete the upload + */ + int parts_uploaded; + + /* ongoing tracker of how much data has been sent for this upload */ + size_t bytes; + + struct mk_list _head; + + /* see note for MAX_UPLOAD_ERRORS */ + int upload_errors; + int complete_errors; +}; + +struct flb_s3 { + char *bucket; + char *region; + char *s3_key_format; + char *tag_delimiters; + char *endpoint; + char *sts_endpoint; + char *canned_acl; + char *content_type; + char *storage_class; + char *log_key; + char *external_id; + char *profile; + int free_endpoint; + int retry_requests; + int use_put_object; + int send_content_md5; + int static_file_path; + int compression; + int port; + int insecure; + size_t store_dir_limit_size; + + /* track the total amount of buffered data */ + size_t current_buffer_size; + + struct flb_aws_provider *provider; + struct flb_aws_provider *base_provider; + /* tls instances can't be re-used; aws provider requires a separate one */ + struct flb_tls *provider_tls; + /* one for the standard chain provider, one for sts assume role */ + struct flb_tls *sts_provider_tls; + struct flb_tls *client_tls; + + struct flb_aws_client *s3_client; + int json_date_format; + flb_sds_t json_date_key; + flb_sds_t date_key; + + flb_sds_t buffer_dir; + + char *store_dir; + struct flb_fstore *fs; + struct flb_fstore_stream *stream_active; /* default active stream */ + struct flb_fstore_stream *stream_upload; /* multipart upload stream */ + struct flb_fstore_stream *stream_metadata; /* s3 metadata stream */ + + /* + * used to track that unset buffers were found on startup that have not + * been sent + */ + int has_old_buffers; + /* old multipart uploads read on start up */ + int has_old_uploads; + + struct mk_list uploads; + + int preserve_data_ordering; + int upload_queue_success; + struct mk_list upload_queue; + + size_t file_size; + size_t upload_chunk_size; + time_t upload_timeout; + time_t retry_time; + + int timer_created; + int timer_ms; + int key_fmt_has_uuid; + + uint64_t seq_index; + int key_fmt_has_seq_index; + flb_sds_t metadata_dir; + flb_sds_t seq_index_file; + + struct flb_output_instance *ins; +}; + +int upload_part(struct flb_s3 *ctx, struct multipart_upload *m_upload, + char *body, size_t body_size); + +int create_multipart_upload(struct flb_s3 *ctx, + struct multipart_upload *m_upload); + +int complete_multipart_upload(struct flb_s3 *ctx, + struct multipart_upload *m_upload); + +void multipart_read_uploads_from_fs(struct flb_s3 *ctx); + +void multipart_upload_destroy(struct multipart_upload *m_upload); + +struct flb_http_client *mock_s3_call(char *error_env_var, char *api); +int s3_plugin_under_test(); + +int get_md5_base64(char *buf, size_t buf_size, char *md5_str, size_t md5_str_size); + +int create_headers(struct flb_s3 *ctx, char *body_md5, + struct flb_aws_header **headers, int *num_headers, + int multipart_upload); + +#endif diff --git a/fluent-bit/plugins/out_s3/s3_multipart.c 
b/fluent-bit/plugins/out_s3/s3_multipart.c new file mode 100644 index 00000000..1eb2a106 --- /dev/null +++ b/fluent-bit/plugins/out_s3/s3_multipart.c @@ -0,0 +1,707 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_slist.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_config_map.h> +#include <fluent-bit/flb_aws_util.h> +#include <fluent-bit/flb_signv4.h> +#include <fluent-bit/flb_fstore.h> +#include <ctype.h> + +#include "s3.h" +#include "s3_store.h" + +#define COMPLETE_MULTIPART_UPLOAD_BASE_LEN 100 +#define COMPLETE_MULTIPART_UPLOAD_PART_LEN 124 + +flb_sds_t get_etag(char *response, size_t size); + +static inline int try_to_write(char *buf, int *off, size_t left, + const char *str, size_t str_len) +{ + if (str_len <= 0){ + str_len = strlen(str); + } + if (left <= *off+str_len) { + return FLB_FALSE; + } + memcpy(buf+*off, str, str_len); + *off += str_len; + return FLB_TRUE; +} + + +/* the 'tag' or key in the upload_dir is s3_key + \n + upload_id */ +static flb_sds_t upload_key(struct multipart_upload *m_upload) +{ + flb_sds_t key; + flb_sds_t tmp; + + key = flb_sds_create_size(64); + + tmp = flb_sds_printf(&key, "%s\n%s", m_upload->s3_key, m_upload->upload_id); + if (!tmp) { + flb_errno(); + flb_sds_destroy(key); + return NULL; + } + key = tmp; + + return key; +} + +/* the 'tag' or key in the upload_dir is s3_key + \n + upload_id */ +static int upload_data_from_key(struct multipart_upload *m_upload, char *key) +{ + flb_sds_t tmp_sds; + int len = 0; + int original_len; + char *tmp; + + original_len = strlen(key); + + tmp = strchr(key, '\n'); + if (!tmp) { + return -1; + } + + len = tmp - key; + tmp_sds = flb_sds_create_len(key, len); + if (!tmp_sds) { + flb_errno(); + return -1; + } + m_upload->s3_key = tmp_sds; + + tmp++; + original_len -= (len + 1); + + tmp_sds = flb_sds_create_len(tmp, original_len); + if (!tmp_sds) { + flb_errno(); + return -1; + } + m_upload->upload_id = tmp_sds; + + return 0; +} + +/* parse etags from file data */ +static void parse_etags(struct multipart_upload *m_upload, char *data) +{ + char *line = data; + char *start; + char *end; + flb_sds_t etag; + int part_num; + int len; + + if (!data) { + return; + } + + line = strtok(data, "\n"); + + if (!line) { + return; + } + + do { + start = strstr(line, "part_number="); + if (!start) { + return; + } + start += 12; + end = strchr(start, '\t'); + if (!end) { + flb_debug("[s3 restart parser] Did not find tab separator in line %s", start); + return; + } + *end = '\0'; + part_num = atoi(start); + if (part_num <= 0) { + flb_debug("[s3 restart parser] Could not parse part_number from %s", start); + return; + } + m_upload->part_number = part_num; + *end = '\t'; + + start = strstr(line, "tag="); + if 
(!start) {
+            flb_debug("[s3 restart parser] Could not find 'etag=' %s", line);
+            return;
+        }
+
+        start += 4;
+        len = strlen(start);
+
+        if (len <= 0) {
+            flb_debug("[s3 restart parser] Could not find etag %s", line);
+            return;
+        }
+
+        etag = flb_sds_create_len(start, len);
+        if (!etag) {
+            flb_debug("[s3 restart parser] Could not create etag");
+            return;
+        }
+        flb_debug("[s3 restart parser] found part number %d=%s", part_num, etag);
+        m_upload->etags[part_num - 1] = etag;
+
+        line = strtok(NULL, "\n");
+    } while (line != NULL);
+}
+
+static struct multipart_upload *upload_from_file(struct flb_s3 *ctx,
+                                                 struct flb_fstore_file *fsf)
+{
+    struct multipart_upload *m_upload = NULL;
+    char *buffered_data = NULL;
+    size_t buffer_size = 0;
+    int ret;
+
+    ret = s3_store_file_upload_read(ctx, fsf, &buffered_data, &buffer_size);
+    if (ret < 0) {
+        flb_plg_error(ctx->ins, "Could not read locally buffered data %s",
+                      fsf->name);
+        return NULL;
+    }
+
+    /* always make sure we have a fresh copy of metadata */
+    ret = s3_store_file_meta_get(ctx, fsf);
+    if (ret == -1) {
+        flb_plg_error(ctx->ins, "Could not read file metadata: %s",
+                      fsf->name);
+        return NULL;
+    }
+
+    m_upload = flb_calloc(1, sizeof(struct multipart_upload));
+    if (!m_upload) {
+        flb_errno();
+        flb_free(buffered_data);
+        return NULL;
+    }
+    m_upload->init_time = time(NULL);
+    m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS;
+
+    ret = upload_data_from_key(m_upload, fsf->meta_buf);
+    if (ret < 0) {
+        flb_plg_error(ctx->ins, "Could not extract upload data from: %s",
+                      fsf->name);
+        flb_free(buffered_data);
+        multipart_upload_destroy(m_upload);
+        return NULL;
+    }
+
+    parse_etags(m_upload, buffered_data);
+    flb_free(buffered_data);
+    if (m_upload->part_number == 0) {
+        flb_plg_error(ctx->ins, "Could not extract upload data from %s",
+                      fsf->name);
+        multipart_upload_destroy(m_upload);
+        return NULL;
+    }
+
+    /* code expects it to be 1 more than the last part read */
+    m_upload->part_number++;
+
+    return m_upload;
+}
+
+void multipart_read_uploads_from_fs(struct flb_s3 *ctx)
+{
+    struct mk_list *tmp;
+    struct mk_list *head;
+    struct multipart_upload *m_upload = NULL;
+    struct flb_fstore_file *fsf;
+
+    mk_list_foreach_safe(head, tmp, &ctx->stream_upload->files) {
+        fsf = mk_list_entry(head, struct flb_fstore_file, _head);
+        m_upload = upload_from_file(ctx, fsf);
+        if (!m_upload) {
+            flb_plg_error(ctx->ins,
+                          "Could not process multipart upload data in %s",
+                          fsf->name);
+            continue;
+        }
+        mk_list_add(&m_upload->_head, &ctx->uploads);
+        flb_plg_info(ctx->ins,
+                     "Successfully read existing upload from file system, s3_key=%s",
+                     m_upload->s3_key);
+    }
+}
+
+/* store list of part number and etag */
+static flb_sds_t upload_data(flb_sds_t etag, int part_num)
+{
+    flb_sds_t data;
+    flb_sds_t tmp;
+
+    data = flb_sds_create_size(64);
+
+    tmp = flb_sds_printf(&data, "part_number=%d\tetag=%s\n", part_num, etag);
+    if (!tmp) {
+        flb_errno();
+        flb_sds_destroy(data);
+        return NULL;
+    }
+    data = tmp;
+
+    return data;
+}
+
+/* persists upload data to the file system */
+static int save_upload(struct flb_s3 *ctx, struct multipart_upload *m_upload,
+                       flb_sds_t etag)
+{
+    int ret;
+    flb_sds_t key;
+    flb_sds_t data;
+    struct flb_fstore_file *fsf;
+
+    key = upload_key(m_upload);
+    if (!key) {
+        flb_plg_debug(ctx->ins, "Could not construct upload key for buffer dir");
+        return -1;
+    }
+
+    data = upload_data(etag, m_upload->part_number);
+    if (!data) {
+        flb_plg_debug(ctx->ins, "Could not construct upload data for buffer dir");
+        flb_sds_destroy(key);
+        return -1;
+    }
+
+    fsf = 
s3_store_file_upload_get(ctx, key, flb_sds_len(key)); + + /* Write the key to the file */ + ret = s3_store_file_upload_put(ctx, fsf, key, data); + + flb_sds_destroy(key); + flb_sds_destroy(data); + + return ret; +} + +static int remove_upload_from_fs(struct flb_s3 *ctx, struct multipart_upload *m_upload) +{ + flb_sds_t key; + struct flb_fstore_file *fsf; + + key = upload_key(m_upload); + if (!key) { + flb_plg_debug(ctx->ins, "Could not construct upload key"); + return -1; + } + + fsf = s3_store_file_upload_get(ctx, key, flb_sds_len(key)); + if (fsf) { + s3_store_file_upload_delete(ctx, fsf); + } + flb_sds_destroy(key); + return 0; +} + +/* + * https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html + */ +static int complete_multipart_upload_payload(struct flb_s3 *ctx, + struct multipart_upload *m_upload, + char **out_buf, size_t *out_size) +{ + char *buf; + int i; + int offset = 0; + flb_sds_t etag; + size_t size = COMPLETE_MULTIPART_UPLOAD_BASE_LEN; + char part_num[7]; + + size = size + (COMPLETE_MULTIPART_UPLOAD_PART_LEN * m_upload->part_number); + + buf = flb_malloc(size + 1); + if (!buf) { + flb_errno(); + return -1; + } + + if (!try_to_write(buf, &offset, size, + "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">", 73)) { + goto error; + } + + for (i = 0; i < m_upload->part_number; i++) { + etag = m_upload->etags[i]; + if (etag == NULL) { + continue; + } + if (!try_to_write(buf, &offset, size, + "<Part><ETag>", 12)) { + goto error; + } + + if (!try_to_write(buf, &offset, size, + etag, 0)) { + goto error; + } + + if (!try_to_write(buf, &offset, size, + "</ETag><PartNumber>", 19)) { + goto error; + } + + if (!sprintf(part_num, "%d", i + 1)) { + goto error; + } + + if (!try_to_write(buf, &offset, size, + part_num, 0)) { + goto error; + } + + if (!try_to_write(buf, &offset, size, + "</PartNumber></Part>", 20)) { + goto error; + } + } + + if (!try_to_write(buf, &offset, size, + "</CompleteMultipartUpload>", 26)) { + goto error; + } + + buf[offset] = '\0'; + + *out_buf = buf; + *out_size = offset; + return 0; + +error: + flb_free(buf); + flb_plg_error(ctx->ins, "Failed to construct CompleteMultipartUpload " + "request body"); + return -1; +} + +int complete_multipart_upload(struct flb_s3 *ctx, + struct multipart_upload *m_upload) +{ + char *body; + size_t size; + flb_sds_t uri = NULL; + flb_sds_t tmp; + int ret; + struct flb_http_client *c = NULL; + struct flb_aws_client *s3_client; + + if (!m_upload->upload_id) { + flb_plg_error(ctx->ins, "Cannot complete multipart upload for key %s: " + "upload ID is unset ", m_upload->s3_key); + return -1; + } + + uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 11 + + flb_sds_len(m_upload->upload_id)); + if (!uri) { + flb_errno(); + return -1; + } + + tmp = flb_sds_printf(&uri, "/%s%s?uploadId=%s", ctx->bucket, + m_upload->s3_key, m_upload->upload_id); + if (!tmp) { + flb_sds_destroy(uri); + return -1; + } + uri = tmp; + + ret = complete_multipart_upload_payload(ctx, m_upload, &body, &size); + if (ret < 0) { + flb_sds_destroy(uri); + return -1; + } + + s3_client = ctx->s3_client; + if (s3_plugin_under_test() == FLB_TRUE) { + c = mock_s3_call("TEST_COMPLETE_MULTIPART_UPLOAD_ERROR", "CompleteMultipartUpload"); + } + else { + c = s3_client->client_vtable->request(s3_client, FLB_HTTP_POST, + uri, body, size, + NULL, 0); + } + flb_sds_destroy(uri); + flb_free(body); + if (c) { + flb_plg_debug(ctx->ins, "CompleteMultipartUpload http status=%d", + c->resp.status); + if (c->resp.status == 200) { + 
flb_plg_info(ctx->ins, "Successfully completed multipart upload " + "for %s, UploadId=%s", m_upload->s3_key, + m_upload->upload_id); + flb_http_client_destroy(c); + /* remove this upload from the file system */ + remove_upload_from_fs(ctx, m_upload); + return 0; + } + flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size, + "CompleteMultipartUpload", ctx->ins); + if (c->resp.payload != NULL) { + flb_plg_debug(ctx->ins, "Raw CompleteMultipartUpload response: %s", + c->resp.payload); + } + flb_http_client_destroy(c); + } + + flb_plg_error(ctx->ins, "CompleteMultipartUpload request failed"); + return -1; +} + + +int create_multipart_upload(struct flb_s3 *ctx, + struct multipart_upload *m_upload) +{ + flb_sds_t uri = NULL; + flb_sds_t tmp; + struct flb_http_client *c = NULL; + struct flb_aws_client *s3_client; + struct flb_aws_header *headers = NULL; + int num_headers = 0; + int ret; + + uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 8); + if (!uri) { + flb_errno(); + return -1; + } + + tmp = flb_sds_printf(&uri, "/%s%s?uploads=", ctx->bucket, m_upload->s3_key); + if (!tmp) { + flb_sds_destroy(uri); + return -1; + } + uri = tmp; + + s3_client = ctx->s3_client; + if (s3_plugin_under_test() == FLB_TRUE) { + c = mock_s3_call("TEST_CREATE_MULTIPART_UPLOAD_ERROR", "CreateMultipartUpload"); + } + else { + ret = create_headers(ctx, NULL, &headers, &num_headers, FLB_TRUE); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to create headers"); + flb_sds_destroy(uri); + return -1; + } + c = s3_client->client_vtable->request(s3_client, FLB_HTTP_POST, + uri, NULL, 0, headers, num_headers); + if (headers) { + flb_free(headers); + } + } + flb_sds_destroy(uri); + if (c) { + flb_plg_debug(ctx->ins, "CreateMultipartUpload http status=%d", + c->resp.status); + if (c->resp.status == 200) { + tmp = flb_aws_xml_get_val(c->resp.payload, c->resp.payload_size, + "<UploadId>", "</UploadId>"); + if (!tmp) { + flb_plg_error(ctx->ins, "Could not find upload ID in " + "CreateMultipartUpload response"); + flb_plg_debug(ctx->ins, "Raw CreateMultipartUpload response: %s", + c->resp.payload); + flb_http_client_destroy(c); + return -1; + } + m_upload->upload_id = tmp; + flb_plg_info(ctx->ins, "Successfully initiated multipart upload " + "for %s, UploadId=%s", m_upload->s3_key, + m_upload->upload_id); + flb_http_client_destroy(c); + return 0; + } + flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size, + "CreateMultipartUpload", ctx->ins); + if (c->resp.payload != NULL) { + flb_plg_debug(ctx->ins, "Raw CreateMultipartUpload response: %s", + c->resp.payload); + } + flb_http_client_destroy(c); + } + + flb_plg_error(ctx->ins, "CreateMultipartUpload request failed"); + return -1; +} + +/* gets the ETag value from response headers */ +flb_sds_t get_etag(char *response, size_t size) +{ + char *tmp; + int start; + int end; + int len; + int i = 0; + flb_sds_t etag; + + if (response == NULL) { + return NULL; + } + + tmp = strstr(response, "ETag:"); + if (!tmp) { + return NULL; + } + i = tmp - response; + + /* advance to end of ETag key */ + i += 5; + + /* advance across any whitespace and the opening quote */ + while (i < size && (response[i] == '\"' || isspace(response[i]) != 0)) { + i++; + } + start = i; + /* advance until we hit whitespace or the end quote */ + while (i < size && (response[i] != '\"' && isspace(response[i]) == 0)) { + i++; + } + end = i; + len = end - start; + + etag = flb_sds_create_len(response + start, len); + if (!etag) { + flb_errno(); + return NULL; + } + + return etag; +} + +int 
upload_part(struct flb_s3 *ctx, struct multipart_upload *m_upload, + char *body, size_t body_size) +{ + flb_sds_t uri = NULL; + flb_sds_t tmp; + int ret; + struct flb_http_client *c = NULL; + struct flb_aws_client *s3_client; + struct flb_aws_header *headers = NULL; + int num_headers = 0; + char body_md5[25]; + + uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 8); + if (!uri) { + flb_errno(); + return -1; + } + + tmp = flb_sds_printf(&uri, "/%s%s?partNumber=%d&uploadId=%s", + ctx->bucket, m_upload->s3_key, m_upload->part_number, + m_upload->upload_id); + if (!tmp) { + flb_errno(); + flb_sds_destroy(uri); + return -1; + } + uri = tmp; + + memset(body_md5, 0, sizeof(body_md5)); + if (ctx->send_content_md5 == FLB_TRUE) { + ret = get_md5_base64(body, body_size, body_md5, sizeof(body_md5)); + if (ret != 0) { + flb_plg_error(ctx->ins, "Failed to create Content-MD5 header"); + flb_sds_destroy(uri); + return -1; + } + + num_headers = 1; + headers = flb_malloc(sizeof(struct flb_aws_header) * num_headers); + if (headers == NULL) { + flb_errno(); + flb_sds_destroy(uri); + return -1; + } + + headers[0].key = "Content-MD5"; + headers[0].key_len = 11; + headers[0].val = body_md5; + headers[0].val_len = strlen(body_md5); + } + + s3_client = ctx->s3_client; + if (s3_plugin_under_test() == FLB_TRUE) { + c = mock_s3_call("TEST_UPLOAD_PART_ERROR", "UploadPart"); + } + else { + c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT, + uri, body, body_size, + headers, num_headers); + } + flb_free(headers); + flb_sds_destroy(uri); + if (c) { + flb_plg_info(ctx->ins, "UploadPart http status=%d", + c->resp.status); + if (c->resp.status == 200) { + tmp = get_etag(c->resp.data, c->resp.data_size); + if (!tmp) { + flb_plg_error(ctx->ins, "Could not find ETag in " + "UploadPart response"); + flb_plg_debug(ctx->ins, "Raw UploadPart response: %s", + c->resp.payload); + flb_http_client_destroy(c); + return -1; + } + m_upload->etags[m_upload->part_number - 1] = tmp; + flb_plg_info(ctx->ins, "Successfully uploaded part #%d " + "for %s, UploadId=%s, ETag=%s", m_upload->part_number, + m_upload->s3_key, m_upload->upload_id, tmp); + flb_http_client_destroy(c); + /* track how many bytes are have gone toward this upload */ + m_upload->bytes += body_size; + + /* finally, attempt to persist the data for this upload */ + ret = save_upload(ctx, m_upload, tmp); + if (ret == 0) { + flb_plg_debug(ctx->ins, "Successfully persisted upload data, UploadId=%s", + m_upload->upload_id); + } + else { + flb_plg_warn(ctx->ins, "Was not able to persisted upload data to disk; " + "if fluent bit dies without completing this upload the part " + "could be lost, UploadId=%s, ETag=%s", + m_upload->upload_id, tmp); + } + return 0; + } + flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size, + "UploadPart", ctx->ins); + if (c->resp.payload != NULL) { + flb_plg_debug(ctx->ins, "Raw UploadPart response: %s", + c->resp.payload); + } + flb_http_client_destroy(c); + } + + flb_plg_error(ctx->ins, "UploadPart request failed"); + return -1; +} diff --git a/fluent-bit/plugins/out_s3/s3_store.c b/fluent-bit/plugins/out_s3/s3_store.c new file mode 100644 index 00000000..8a964063 --- /dev/null +++ b/fluent-bit/plugins/out_s3/s3_store.c @@ -0,0 +1,543 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with 
the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_fstore.h> +#include <fluent-bit/flb_time.h> + +#include "s3.h" +#include "s3_store.h" + +static int s3_store_under_travis_ci() +{ + + if (getenv("CI") != NULL && getenv("TRAVIS") != NULL) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +/* + * Simple and fast hashing algorithm to create keys in the local buffer + */ +static flb_sds_t gen_store_filename(const char *tag) +{ + int c; + unsigned long hash = 5381; + unsigned long hash2 = 5381; + flb_sds_t hash_str; + flb_sds_t tmp; + struct flb_time tm; + + /* get current time */ + flb_time_get(&tm); + + /* compose hash */ + while ((c = *tag++)) { + hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ + } + hash2 = (unsigned long) hash2 * tm.tm.tv_sec * tm.tm.tv_nsec; + + /* flb_sds_printf allocs if the incoming sds is not at least 64 bytes */ + hash_str = flb_sds_create_size(64); + if (!hash_str) { + flb_errno(); + return NULL; + } + tmp = flb_sds_printf(&hash_str, "%lu-%lu", hash, hash2); + if (!tmp) { + flb_errno(); + flb_sds_destroy(hash_str); + return NULL; + } + hash_str = tmp; + + return hash_str; +} + +/* Retrieve a candidate s3 local file using the tag */ +struct s3_file *s3_store_file_get(struct flb_s3 *ctx, const char *tag, + int tag_len) +{ + struct mk_list *head; + struct mk_list *tmp; + struct flb_fstore_file *fsf = NULL; + struct s3_file *s3_file; + + /* + * Based in the current ctx->stream_name, locate a candidate file to + * store the incoming data using as a lookup pattern the content Tag. 
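+     *
+     * A candidate matches only when its stored metadata is the same length
+     * as the tag, the metadata bytes compare equal to the tag, and the
+     * chunk is not locked (a locked chunk is busy and cannot be written to).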
+     */
+    mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) {
+        fsf = mk_list_entry(head, struct flb_fstore_file, _head);
+
+        /* skip and warn on partially initialized chunks */
+        if (fsf->data == NULL) {
+            flb_plg_warn(ctx->ins, "BAD: found flb_fstore_file with NULL data reference, tag=%s, file=%s, will try to delete", tag, fsf->name);
+            flb_fstore_file_delete(ctx->fs, fsf);
+            /* the file entry is gone, do not touch it again */
+            continue;
+        }
+
+        if (fsf->meta_size != tag_len) {
+            fsf = NULL;
+            continue;
+        }
+
+        /* skip locked chunks */
+        s3_file = fsf->data;
+        if (s3_file->locked == FLB_TRUE) {
+            fsf = NULL;
+            continue;
+        }
+
+        /* compare meta and tag */
+        if (strncmp((char *) fsf->meta_buf, tag, tag_len) == 0) {
+            break;
+        }
+
+        /* not found, invalidate the reference */
+        fsf = NULL;
+    }
+
+    if (!fsf) {
+        return NULL;
+    }
+
+    return fsf->data;
+}
+
+/* Append data to a new or existing fstore file */
+int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file,
+                        const char *tag, int tag_len,
+                        char *data, size_t bytes,
+                        time_t file_first_log_time)
+{
+    int ret;
+    flb_sds_t name;
+    struct flb_fstore_file *fsf;
+    size_t space_remaining;
+
+    if (ctx->store_dir_limit_size > 0 && ctx->current_buffer_size + bytes >= ctx->store_dir_limit_size) {
+        flb_plg_error(ctx->ins, "Buffer is full: current_buffer_size=%zu, new_data=%zu, store_dir_limit_size=%zu bytes",
+                      ctx->current_buffer_size, bytes, ctx->store_dir_limit_size);
+        return -1;
+    }
+
+    /* If no target file was found, create a new one */
+    if (!s3_file) {
+        name = gen_store_filename(tag);
+        if (!name) {
+            flb_plg_error(ctx->ins, "could not generate chunk file name");
+            return -1;
+        }
+
+        /* Create the file */
+        fsf = flb_fstore_file_create(ctx->fs, ctx->stream_active, name, bytes);
+        if (!fsf) {
+            flb_plg_error(ctx->ins, "could not create the file '%s' in the store",
+                          name);
+            flb_sds_destroy(name);
+            return -1;
+        }
+        flb_sds_destroy(name);
+
+        /* Write tag as metadata */
+        ret = flb_fstore_file_meta_set(ctx->fs, fsf, (char *) tag, tag_len);
+        if (ret == -1) {
+            flb_plg_error(ctx->ins, "error writing tag metadata");
+            flb_plg_warn(ctx->ins, "Deleting buffer file because metadata could not be written");
+            flb_fstore_file_delete(ctx->fs, fsf);
+            return -1;
+        }
+
+        /* Allocate local context */
+        s3_file = flb_calloc(1, sizeof(struct s3_file));
+        if (!s3_file) {
+            flb_errno();
+            flb_plg_error(ctx->ins, "cannot allocate s3 file context");
+            flb_plg_warn(ctx->ins, "Deleting buffer file because S3 context creation failed");
+            flb_fstore_file_delete(ctx->fs, fsf);
+            return -1;
+        }
+        s3_file->fsf = fsf;
+        s3_file->first_log_time = file_first_log_time;
+        s3_file->create_time = time(NULL);
+
+        /* Use fstore opaque 'data' reference to keep our context */
+        fsf->data = s3_file;
+    }
+    else {
+        fsf = s3_file->fsf;
+    }
+
+    /* Append data to the target file */
+    ret = flb_fstore_file_append(fsf, data, bytes);
+    if (ret != 0) {
+        flb_plg_error(ctx->ins, "error writing data to local s3 file");
+        return -1;
+    }
+    s3_file->size += bytes;
+    ctx->current_buffer_size += bytes;
+
+    /* if buffer is 95% full, warn user */
+    if (ctx->store_dir_limit_size > 0) {
+        space_remaining = ctx->store_dir_limit_size - ctx->current_buffer_size;
+        if ((space_remaining * 20) < ctx->store_dir_limit_size) {
+            flb_plg_warn(ctx->ins, "Buffer is almost full: current_buffer_size=%zu, store_dir_limit_size=%zu bytes",
+                         ctx->current_buffer_size, ctx->store_dir_limit_size);
+            return -1;
+        }
+    }
+
+    return 0;
+}
+
+static int set_files_context(struct flb_s3 *ctx)
+{
+    struct mk_list *head;
+    struct mk_list *f_head;
+    struct 
flb_fstore_stream *fs_stream; + struct flb_fstore_file *fsf; + struct s3_file *s3_file; + + mk_list_foreach(head, &ctx->fs->streams) { + fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); + + /* skip current stream since it's new */ + if (fs_stream == ctx->stream_active) { + continue; + } + + /* skip multi-upload */ + if (fs_stream == ctx->stream_upload) { + continue; + } + + mk_list_foreach(f_head, &fs_stream->files) { + fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); + if (fsf->data) { + continue; + } + + /* Allocate local context */ + s3_file = flb_calloc(1, sizeof(struct s3_file)); + if (!s3_file) { + flb_errno(); + flb_plg_error(ctx->ins, "cannot allocate s3 file context"); + continue; + } + s3_file->fsf = fsf; + s3_file->first_log_time = time(NULL); + s3_file->create_time = time(NULL); + + /* Use fstore opaque 'data' reference to keep our context */ + fsf->data = s3_file; + } + } + + return 0; +} + +/* Initialize filesystem storage for S3 plugin */ +int s3_store_init(struct flb_s3 *ctx) +{ + int type; + time_t now; + char tmp[64]; + struct tm *tm; + struct flb_fstore *fs; + struct flb_fstore_stream *fs_stream; + + if (s3_store_under_travis_ci() == FLB_TRUE) { + type = FLB_FSTORE_MEM; + flb_plg_warn(ctx->ins, "Travis CI test, using s3 store memory backend"); + } + else { + type = FLB_FSTORE_FS; + } + + /* Initialize the storage context */ + fs = flb_fstore_create(ctx->buffer_dir, type); + if (!fs) { + return -1; + } + ctx->fs = fs; + + /* + * On every start we create a new stream, this stream in the file system + * is directory with the name using the date like '2020-10-03T13:00:02'. So + * all the 'new' data that is generated on this process is stored there. + * + * Note that previous data in similar directories from previous runs is + * considered backlog data, in the S3 plugin we need to differenciate the + * new v/s the older buffered data. + * + * Compose a stream name... 
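+     *
+     * The stream name is simply the current local time formatted with
+     * strftime(); on Windows the ':' separators are replaced with '-',
+     * since ':' is not allowed in directory names.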
+ */ + now = time(NULL); + tm = localtime(&now); + +#ifdef FLB_SYSTEM_WINDOWS + /* Windows does not allow ':' in directory names */ + strftime(tmp, sizeof(tmp) - 1, "%Y-%m-%dT%H-%M-%S", tm); +#else + strftime(tmp, sizeof(tmp) - 1, "%Y-%m-%dT%H:%M:%S", tm); +#endif + + /* Create the stream */ + fs_stream = flb_fstore_stream_create(ctx->fs, tmp); + if (!fs_stream) { + /* Upon exception abort */ + flb_plg_error(ctx->ins, "could not initialize active stream: %s", tmp); + flb_fstore_destroy(fs); + ctx->fs = NULL; + return -1; + } + ctx->stream_active = fs_stream; + + /* Multipart upload stream */ + fs_stream = flb_fstore_stream_create(ctx->fs, "multipart_upload_metadata"); + if (!fs_stream) { + flb_plg_error(ctx->ins, "could not initialize multipart_upload stream"); + flb_fstore_destroy(fs); + ctx->fs = NULL; + return -1; + } + ctx->stream_upload = fs_stream; + + set_files_context(ctx); + return 0; +} + +int s3_store_exit(struct flb_s3 *ctx) +{ + struct mk_list *head; + struct mk_list *f_head; + struct flb_fstore_stream *fs_stream; + struct flb_fstore_file *fsf; + struct s3_file *s3_file; + + if (!ctx->fs) { + return 0; + } + + /* release local context on non-multi upload files */ + mk_list_foreach(head, &ctx->fs->streams) { + fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); + if (fs_stream == ctx->stream_upload) { + continue; + } + + mk_list_foreach(f_head, &fs_stream->files) { + fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); + if (fsf->data != NULL) { + s3_file = fsf->data; + flb_sds_destroy(s3_file->file_path); + flb_free(s3_file); + } + } + } + + if (ctx->fs) { + flb_fstore_destroy(ctx->fs); + } + return 0; +} + +/* + * Check if the store has data. This function is only used on plugin + * initialization + */ +int s3_store_has_data(struct flb_s3 *ctx) +{ + struct mk_list *head; + struct flb_fstore_stream *fs_stream; + + if (!ctx->fs) { + return FLB_FALSE; + } + + mk_list_foreach(head, &ctx->fs->streams) { + /* skip multi upload stream */ + fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); + if (fs_stream == ctx->stream_upload) { + continue; + } + + if (mk_list_size(&fs_stream->files) > 0) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +int s3_store_has_uploads(struct flb_s3 *ctx) +{ + if (!ctx || !ctx->stream_upload) { + return FLB_FALSE; + } + + if (mk_list_size(&ctx->stream_upload->files) > 0) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +int s3_store_file_inactive(struct flb_s3 *ctx, struct s3_file *s3_file) +{ + int ret; + struct flb_fstore_file *fsf; + + fsf = s3_file->fsf; + flb_free(s3_file); + ret = flb_fstore_file_inactive(ctx->fs, fsf); + + return ret; +} + +int s3_store_file_delete(struct flb_s3 *ctx, struct s3_file *s3_file) +{ + struct flb_fstore_file *fsf; + + fsf = s3_file->fsf; + ctx->current_buffer_size -= s3_file->size; + + /* permanent deletion */ + flb_fstore_file_delete(ctx->fs, fsf); + flb_free(s3_file); + + return 0; +} + +int s3_store_file_read(struct flb_s3 *ctx, struct s3_file *s3_file, + char **out_buf, size_t *out_size) +{ + int ret; + + ret = flb_fstore_file_content_copy(ctx->fs, s3_file->fsf, + (void **) out_buf, out_size); + return ret; +} + +int s3_store_file_upload_read(struct flb_s3 *ctx, struct flb_fstore_file *fsf, + char **out_buf, size_t *out_size) +{ + int ret; + + ret = flb_fstore_file_content_copy(ctx->fs, fsf, + (void **) out_buf, out_size); + return ret; +} + +struct flb_fstore_file *s3_store_file_upload_get(struct flb_s3 *ctx, + char *key, int key_len) +{ + struct mk_list *head; + struct 
flb_fstore_file *fsf = NULL; + + mk_list_foreach(head, &ctx->stream_upload->files) { + fsf = mk_list_entry(head, struct flb_fstore_file, _head); + if (fsf->meta_buf == NULL) { + continue; + } + + if (fsf->meta_size != key_len ){ + continue; + } + + if (strncmp(fsf->meta_buf, key, key_len) == 0) { + break; + } + fsf = NULL; + } + + return fsf; +} + +/* param fsf can NULL if the file has not yet been created */ +int s3_store_file_upload_put(struct flb_s3 *ctx, + struct flb_fstore_file *fsf, flb_sds_t key, + flb_sds_t data) +{ + int ret; + flb_sds_t name; + + /* If no target file was found, create a new one */ + if (!fsf) { + name = gen_store_filename(key); + if (!name) { + flb_plg_error(ctx->ins, "could not generate chunk file name"); + return -1; + } + + /* Create the file */ + fsf = flb_fstore_file_create(ctx->fs, ctx->stream_upload, name, flb_sds_len(data)); + if (!fsf) { + flb_plg_error(ctx->ins, "could not create the file '%s' in the upload store", + name); + flb_sds_destroy(name); + return -1; + } + flb_sds_destroy(name); + + /* Write key as metadata */ + ret = flb_fstore_file_meta_set(ctx->fs, fsf, + key, flb_sds_len(key)); + if (ret == -1) { + flb_plg_error(ctx->ins, "error writing upload metadata"); + flb_plg_warn(ctx->ins, "Deleting s3 upload cache file because metadata could not be written"); + flb_fstore_file_delete(ctx->fs, fsf); + return -1; + } + } + + /* Append data to the target file */ + ret = flb_fstore_file_append(fsf, data, flb_sds_len(data)); + if (ret != 0) { + flb_plg_error(ctx->ins, "error writing data to local s3 file"); + return -1; + } + + return 0; +} + +int s3_store_file_upload_delete(struct flb_s3 *ctx, struct flb_fstore_file *fsf) +{ + /* permanent deletion */ + flb_fstore_file_delete(ctx->fs, fsf); + return 0; +} + +/* Always set an updated copy of metadata into the fs_store_file entry */ +int s3_store_file_meta_get(struct flb_s3 *ctx, struct flb_fstore_file *fsf) +{ + return flb_fstore_file_meta_get(ctx->fs, fsf); +} + +void s3_store_file_lock(struct s3_file *s3_file) +{ + s3_file->locked = FLB_TRUE; +} + +void s3_store_file_unlock(struct s3_file *s3_file) +{ + s3_file->locked = FLB_FALSE; +} diff --git a/fluent-bit/plugins/out_s3/s3_store.h b/fluent-bit/plugins/out_s3/s3_store.h new file mode 100644 index 00000000..9caa7bdf --- /dev/null +++ b/fluent-bit/plugins/out_s3/s3_store.h @@ -0,0 +1,68 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#ifndef FLB_S3_STORE_H
+#define FLB_S3_STORE_H
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_fstore.h>
+
+struct s3_file {
+    int locked;                      /* locked chunk is busy, cannot write to it */
+    int failures;                    /* delivery failures */
+    size_t size;                     /* file size */
+    time_t create_time;              /* creation time */
+    time_t first_log_time;           /* first log time */
+    flb_sds_t file_path;             /* file path */
+    struct flb_fstore_file *fsf;     /* reference to parent flb_fstore_file */
+};
+
+int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file,
+                        const char *tag, int tag_len,
+                        char *data, size_t bytes,
+                        time_t file_first_log_time);
+
+int s3_store_init(struct flb_s3 *ctx);
+int s3_store_exit(struct flb_s3 *ctx);
+
+int s3_store_has_data(struct flb_s3 *ctx);
+int s3_store_has_uploads(struct flb_s3 *ctx);
+
+int s3_store_file_inactive(struct flb_s3 *ctx, struct s3_file *s3_file);
+struct s3_file *s3_store_file_get(struct flb_s3 *ctx, const char *tag,
+                                  int tag_len);
+int s3_store_file_delete(struct flb_s3 *ctx, struct s3_file *s3_file);
+int s3_store_file_read(struct flb_s3 *ctx, struct s3_file *s3_file,
+                       char **out_buf, size_t *out_size);
+int s3_store_file_upload_read(struct flb_s3 *ctx, struct flb_fstore_file *fsf,
+                              char **out_buf, size_t *out_size);
+struct flb_fstore_file *s3_store_file_upload_get(struct flb_s3 *ctx,
+                                                 char *key, int key_len);
+
+int s3_store_file_upload_put(struct flb_s3 *ctx,
+                             struct flb_fstore_file *fsf, flb_sds_t key,
+                             flb_sds_t data);
+int s3_store_file_upload_delete(struct flb_s3 *ctx, struct flb_fstore_file *fsf);
+
+int s3_store_file_meta_get(struct flb_s3 *ctx, struct flb_fstore_file *fsf);
+
+void s3_store_file_lock(struct s3_file *s3_file);
+void s3_store_file_unlock(struct s3_file *s3_file);
+
+#endif
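For orientation, here is a minimal sketch of how s3.c drives the buffering API declared above during a flush. It is illustrative only and not part of the patch: it assumes an initialized struct flb_s3 *ctx (s3_store_init() has already run), a serialized chunk in data/bytes, and the plugin's usual headers (time(), flb_free()); the helper name buffer_and_read_back is invented for this example.

/* Hypothetical helper, for illustration only */
static int buffer_and_read_back(struct flb_s3 *ctx,
                                const char *tag, int tag_len,
                                char *data, size_t bytes)
{
    int ret;
    char *buf = NULL;
    size_t size = 0;
    struct s3_file *chunk;

    /* look for an existing, unlocked chunk file for this tag (may be NULL) */
    chunk = s3_store_file_get(ctx, tag, tag_len);

    /* append the data; passing NULL lets the store create a new chunk file */
    ret = s3_store_buffer_put(ctx, chunk, tag, tag_len,
                              data, bytes, time(NULL));
    if (ret < 0) {
        return -1;
    }

    /* re-resolve the chunk in case it was just created */
    chunk = s3_store_file_get(ctx, tag, tag_len);
    if (!chunk) {
        return -1;
    }

    /* copy the buffered contents back out, e.g. to build an upload body */
    ret = s3_store_file_read(ctx, chunk, &buf, &size);
    if (ret < 0) {
        return -1;
    }

    /* ... deliver buf/size to S3 here ... */
    flb_free(buf);

    /* once delivered, remove the chunk from the local store */
    return s3_store_file_delete(ctx, chunk);
}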