diff options
Diffstat (limited to 'src/fluent-bit/plugins/out_s3/s3_multipart.c')
-rw-r--r-- | src/fluent-bit/plugins/out_s3/s3_multipart.c | 707 |
1 file changed, 707 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/out_s3/s3_multipart.c b/src/fluent-bit/plugins/out_s3/s3_multipart.c new file mode 100644 index 000000000..1eb2a1061 --- /dev/null +++ b/src/fluent-bit/plugins/out_s3/s3_multipart.c @@ -0,0 +1,707 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_slist.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_config_map.h> +#include <fluent-bit/flb_aws_util.h> +#include <fluent-bit/flb_signv4.h> +#include <fluent-bit/flb_fstore.h> +#include <ctype.h> + +#include "s3.h" +#include "s3_store.h" + +#define COMPLETE_MULTIPART_UPLOAD_BASE_LEN 100 +#define COMPLETE_MULTIPART_UPLOAD_PART_LEN 124 + +flb_sds_t get_etag(char *response, size_t size); + +static inline int try_to_write(char *buf, int *off, size_t left, + const char *str, size_t str_len) +{ + if (str_len <= 0){ + str_len = strlen(str); + } + if (left <= *off+str_len) { + return FLB_FALSE; + } + memcpy(buf+*off, str, str_len); + *off += str_len; + return FLB_TRUE; +} + + +/* the 'tag' or key in the upload_dir is s3_key + \n + upload_id */ +static flb_sds_t upload_key(struct multipart_upload *m_upload) +{ + flb_sds_t key; + flb_sds_t tmp; + + key = 
flb_sds_create_size(64); + + tmp = flb_sds_printf(&key, "%s\n%s", m_upload->s3_key, m_upload->upload_id); + if (!tmp) { + flb_errno(); + flb_sds_destroy(key); + return NULL; + } + key = tmp; + + return key; +} + +/* the 'tag' or key in the upload_dir is s3_key + \n + upload_id */ +static int upload_data_from_key(struct multipart_upload *m_upload, char *key) +{ + flb_sds_t tmp_sds; + int len = 0; + int original_len; + char *tmp; + + original_len = strlen(key); + + tmp = strchr(key, '\n'); + if (!tmp) { + return -1; + } + + len = tmp - key; + tmp_sds = flb_sds_create_len(key, len); + if (!tmp_sds) { + flb_errno(); + return -1; + } + m_upload->s3_key = tmp_sds; + + tmp++; + original_len -= (len + 1); + + tmp_sds = flb_sds_create_len(tmp, original_len); + if (!tmp_sds) { + flb_errno(); + return -1; + } + m_upload->upload_id = tmp_sds; + + return 0; +} + +/* parse etags from file data */ +static void parse_etags(struct multipart_upload *m_upload, char *data) +{ + char *line = data; + char *start; + char *end; + flb_sds_t etag; + int part_num; + int len; + + if (!data) { + return; + } + + line = strtok(data, "\n"); + + if (!line) { + return; + } + + do { + start = strstr(line, "part_number="); + if (!start) { + return; + } + start += 12; + end = strchr(start, '\t'); + if (!end) { + flb_debug("[s3 restart parser] Did not find tab separator in line %s", start); + return; + } + *end = '\0'; + part_num = atoi(start); + if (part_num <= 0) { + flb_debug("[s3 restart parser] Could not parse part_number from %s", start); + return; + } + m_upload->part_number = part_num; + *end = '\t'; + + start = strstr(line, "tag="); + if (!start) { + flb_debug("[s3 restart parser] Could not find 'etag=' %s", line); + return; + } + + start += 4; + len = strlen(start); + + if (len <= 0) { + flb_debug("[s3 restart parser] Could not find etag %s", line); + return; + } + + etag = flb_sds_create_len(start, len); + if (!etag) { + flb_debug("[s3 restart parser] Could create etag"); + return; + } + 
flb_debug("[s3 restart parser] found part number %d=%s", part_num, etag); + m_upload->etags[part_num - 1] = etag; + + line = strtok(NULL, "\n"); + } while (line != NULL); +} + +static struct multipart_upload *upload_from_file(struct flb_s3 *ctx, + struct flb_fstore_file *fsf) +{ + struct multipart_upload *m_upload = NULL; + char *buffered_data = NULL; + size_t buffer_size = 0; + int ret; + + ret = s3_store_file_upload_read(ctx, fsf, &buffered_data, &buffer_size); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not read locally buffered data %s", + fsf->name); + return NULL; + } + + /* always make sure we have a fresh copy of metadata */ + ret = s3_store_file_meta_get(ctx, fsf); + if (ret == -1) { + flb_plg_error(ctx->ins, "Could not read file metadata: %s", + fsf->name); + return NULL; + } + + m_upload = flb_calloc(1, sizeof(struct multipart_upload)); + if (!m_upload) { + flb_errno(); + flb_free(buffered_data); + return NULL; + } + m_upload->init_time = time(NULL); + m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS; + + ret = upload_data_from_key(m_upload, fsf->meta_buf); + if (ret < 0) { + flb_plg_error(ctx->ins, "Could not extract upload data from: %s", + fsf->name); + flb_free(buffered_data); + multipart_upload_destroy(m_upload); + return NULL; + } + + parse_etags(m_upload, buffered_data); + flb_free(buffered_data); + if (m_upload->part_number == 0) { + flb_plg_error(ctx->ins, "Could not extract upload data from %s", + fsf->name); + multipart_upload_destroy(m_upload); + return NULL; + } + + /* code expects it to be 1 more than the last part read */ + m_upload->part_number++; + + return m_upload; +} + +void multipart_read_uploads_from_fs(struct flb_s3 *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct multipart_upload *m_upload = NULL; + struct flb_fstore_file *fsf; + + mk_list_foreach_safe(head, tmp, &ctx->stream_upload->files) { + fsf = mk_list_entry(head, struct flb_fstore_file, _head); + m_upload = upload_from_file(ctx, 
fsf); + if (!m_upload) { + flb_plg_error(ctx->ins, + "Could not process multipart upload data in %s", + fsf->name); + continue; + } + mk_list_add(&m_upload->_head, &ctx->uploads); + flb_plg_info(ctx->ins, + "Successfully read existing upload from file system, s3_key=%s", + m_upload->s3_key); + } +} + +/* store list of part number and etag */ +static flb_sds_t upload_data(flb_sds_t etag, int part_num) +{ + flb_sds_t data; + flb_sds_t tmp; + + data = flb_sds_create_size(64); + + tmp = flb_sds_printf(&data, "part_number=%d\tetag=%s\n", part_num, etag); + if (!tmp) { + flb_errno(); + flb_sds_destroy(data); + return NULL; + } + data = tmp; + + return data; +} + +/* persists upload data to the file system */ +static int save_upload(struct flb_s3 *ctx, struct multipart_upload *m_upload, + flb_sds_t etag) +{ + int ret; + flb_sds_t key; + flb_sds_t data; + struct flb_fstore_file *fsf; + + key = upload_key(m_upload); + if (!key) { + flb_plg_debug(ctx->ins, "Could not constuct upload key for buffer dir"); + return -1; + } + + data = upload_data(etag, m_upload->part_number); + if (!data) { + flb_plg_debug(ctx->ins, "Could not constuct upload key for buffer dir"); + return -1; + } + + fsf = s3_store_file_upload_get(ctx, key, flb_sds_len(key)); + + /* Write the key to the file */ + ret = s3_store_file_upload_put(ctx, fsf, key, data); + + flb_sds_destroy(key); + flb_sds_destroy(data); + + return ret; +} + +static int remove_upload_from_fs(struct flb_s3 *ctx, struct multipart_upload *m_upload) +{ + flb_sds_t key; + struct flb_fstore_file *fsf; + + key = upload_key(m_upload); + if (!key) { + flb_plg_debug(ctx->ins, "Could not construct upload key"); + return -1; + } + + fsf = s3_store_file_upload_get(ctx, key, flb_sds_len(key)); + if (fsf) { + s3_store_file_upload_delete(ctx, fsf); + } + flb_sds_destroy(key); + return 0; +} + +/* + * https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html + */ +static int complete_multipart_upload_payload(struct flb_s3 
*ctx, + struct multipart_upload *m_upload, + char **out_buf, size_t *out_size) +{ + char *buf; + int i; + int offset = 0; + flb_sds_t etag; + size_t size = COMPLETE_MULTIPART_UPLOAD_BASE_LEN; + char part_num[7]; + + size = size + (COMPLETE_MULTIPART_UPLOAD_PART_LEN * m_upload->part_number); + + buf = flb_malloc(size + 1); + if (!buf) { + flb_errno(); + return -1; + } + + if (!try_to_write(buf, &offset, size, + "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">", 73)) { + goto error; + } + + for (i = 0; i < m_upload->part_number; i++) { + etag = m_upload->etags[i]; + if (etag == NULL) { + continue; + } + if (!try_to_write(buf, &offset, size, + "<Part><ETag>", 12)) { + goto error; + } + + if (!try_to_write(buf, &offset, size, + etag, 0)) { + goto error; + } + + if (!try_to_write(buf, &offset, size, + "</ETag><PartNumber>", 19)) { + goto error; + } + + if (!sprintf(part_num, "%d", i + 1)) { + goto error; + } + + if (!try_to_write(buf, &offset, size, + part_num, 0)) { + goto error; + } + + if (!try_to_write(buf, &offset, size, + "</PartNumber></Part>", 20)) { + goto error; + } + } + + if (!try_to_write(buf, &offset, size, + "</CompleteMultipartUpload>", 26)) { + goto error; + } + + buf[offset] = '\0'; + + *out_buf = buf; + *out_size = offset; + return 0; + +error: + flb_free(buf); + flb_plg_error(ctx->ins, "Failed to construct CompleteMultipartUpload " + "request body"); + return -1; +} + +int complete_multipart_upload(struct flb_s3 *ctx, + struct multipart_upload *m_upload) +{ + char *body; + size_t size; + flb_sds_t uri = NULL; + flb_sds_t tmp; + int ret; + struct flb_http_client *c = NULL; + struct flb_aws_client *s3_client; + + if (!m_upload->upload_id) { + flb_plg_error(ctx->ins, "Cannot complete multipart upload for key %s: " + "upload ID is unset ", m_upload->s3_key); + return -1; + } + + uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 11 + + flb_sds_len(m_upload->upload_id)); + if (!uri) { + flb_errno(); + return -1; + } + + 
tmp = flb_sds_printf(&uri, "/%s%s?uploadId=%s", ctx->bucket, + m_upload->s3_key, m_upload->upload_id); + if (!tmp) { + flb_sds_destroy(uri); + return -1; + } + uri = tmp; + + ret = complete_multipart_upload_payload(ctx, m_upload, &body, &size); + if (ret < 0) { + flb_sds_destroy(uri); + return -1; + } + + s3_client = ctx->s3_client; + if (s3_plugin_under_test() == FLB_TRUE) { + c = mock_s3_call("TEST_COMPLETE_MULTIPART_UPLOAD_ERROR", "CompleteMultipartUpload"); + } + else { + c = s3_client->client_vtable->request(s3_client, FLB_HTTP_POST, + uri, body, size, + NULL, 0); + } + flb_sds_destroy(uri); + flb_free(body); + if (c) { + flb_plg_debug(ctx->ins, "CompleteMultipartUpload http status=%d", + c->resp.status); + if (c->resp.status == 200) { + flb_plg_info(ctx->ins, "Successfully completed multipart upload " + "for %s, UploadId=%s", m_upload->s3_key, + m_upload->upload_id); + flb_http_client_destroy(c); + /* remove this upload from the file system */ + remove_upload_from_fs(ctx, m_upload); + return 0; + } + flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size, + "CompleteMultipartUpload", ctx->ins); + if (c->resp.payload != NULL) { + flb_plg_debug(ctx->ins, "Raw CompleteMultipartUpload response: %s", + c->resp.payload); + } + flb_http_client_destroy(c); + } + + flb_plg_error(ctx->ins, "CompleteMultipartUpload request failed"); + return -1; +} + + +int create_multipart_upload(struct flb_s3 *ctx, + struct multipart_upload *m_upload) +{ + flb_sds_t uri = NULL; + flb_sds_t tmp; + struct flb_http_client *c = NULL; + struct flb_aws_client *s3_client; + struct flb_aws_header *headers = NULL; + int num_headers = 0; + int ret; + + uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 8); + if (!uri) { + flb_errno(); + return -1; + } + + tmp = flb_sds_printf(&uri, "/%s%s?uploads=", ctx->bucket, m_upload->s3_key); + if (!tmp) { + flb_sds_destroy(uri); + return -1; + } + uri = tmp; + + s3_client = ctx->s3_client; + if (s3_plugin_under_test() == FLB_TRUE) { + c = 
mock_s3_call("TEST_CREATE_MULTIPART_UPLOAD_ERROR", "CreateMultipartUpload"); + } + else { + ret = create_headers(ctx, NULL, &headers, &num_headers, FLB_TRUE); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to create headers"); + flb_sds_destroy(uri); + return -1; + } + c = s3_client->client_vtable->request(s3_client, FLB_HTTP_POST, + uri, NULL, 0, headers, num_headers); + if (headers) { + flb_free(headers); + } + } + flb_sds_destroy(uri); + if (c) { + flb_plg_debug(ctx->ins, "CreateMultipartUpload http status=%d", + c->resp.status); + if (c->resp.status == 200) { + tmp = flb_aws_xml_get_val(c->resp.payload, c->resp.payload_size, + "<UploadId>", "</UploadId>"); + if (!tmp) { + flb_plg_error(ctx->ins, "Could not find upload ID in " + "CreateMultipartUpload response"); + flb_plg_debug(ctx->ins, "Raw CreateMultipartUpload response: %s", + c->resp.payload); + flb_http_client_destroy(c); + return -1; + } + m_upload->upload_id = tmp; + flb_plg_info(ctx->ins, "Successfully initiated multipart upload " + "for %s, UploadId=%s", m_upload->s3_key, + m_upload->upload_id); + flb_http_client_destroy(c); + return 0; + } + flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size, + "CreateMultipartUpload", ctx->ins); + if (c->resp.payload != NULL) { + flb_plg_debug(ctx->ins, "Raw CreateMultipartUpload response: %s", + c->resp.payload); + } + flb_http_client_destroy(c); + } + + flb_plg_error(ctx->ins, "CreateMultipartUpload request failed"); + return -1; +} + +/* gets the ETag value from response headers */ +flb_sds_t get_etag(char *response, size_t size) +{ + char *tmp; + int start; + int end; + int len; + int i = 0; + flb_sds_t etag; + + if (response == NULL) { + return NULL; + } + + tmp = strstr(response, "ETag:"); + if (!tmp) { + return NULL; + } + i = tmp - response; + + /* advance to end of ETag key */ + i += 5; + + /* advance across any whitespace and the opening quote */ + while (i < size && (response[i] == '\"' || isspace(response[i]) != 0)) { + i++; + } + start 
= i; + /* advance until we hit whitespace or the end quote */ + while (i < size && (response[i] != '\"' && isspace(response[i]) == 0)) { + i++; + } + end = i; + len = end - start; + + etag = flb_sds_create_len(response + start, len); + if (!etag) { + flb_errno(); + return NULL; + } + + return etag; +} + +int upload_part(struct flb_s3 *ctx, struct multipart_upload *m_upload, + char *body, size_t body_size) +{ + flb_sds_t uri = NULL; + flb_sds_t tmp; + int ret; + struct flb_http_client *c = NULL; + struct flb_aws_client *s3_client; + struct flb_aws_header *headers = NULL; + int num_headers = 0; + char body_md5[25]; + + uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 8); + if (!uri) { + flb_errno(); + return -1; + } + + tmp = flb_sds_printf(&uri, "/%s%s?partNumber=%d&uploadId=%s", + ctx->bucket, m_upload->s3_key, m_upload->part_number, + m_upload->upload_id); + if (!tmp) { + flb_errno(); + flb_sds_destroy(uri); + return -1; + } + uri = tmp; + + memset(body_md5, 0, sizeof(body_md5)); + if (ctx->send_content_md5 == FLB_TRUE) { + ret = get_md5_base64(body, body_size, body_md5, sizeof(body_md5)); + if (ret != 0) { + flb_plg_error(ctx->ins, "Failed to create Content-MD5 header"); + flb_sds_destroy(uri); + return -1; + } + + num_headers = 1; + headers = flb_malloc(sizeof(struct flb_aws_header) * num_headers); + if (headers == NULL) { + flb_errno(); + flb_sds_destroy(uri); + return -1; + } + + headers[0].key = "Content-MD5"; + headers[0].key_len = 11; + headers[0].val = body_md5; + headers[0].val_len = strlen(body_md5); + } + + s3_client = ctx->s3_client; + if (s3_plugin_under_test() == FLB_TRUE) { + c = mock_s3_call("TEST_UPLOAD_PART_ERROR", "UploadPart"); + } + else { + c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT, + uri, body, body_size, + headers, num_headers); + } + flb_free(headers); + flb_sds_destroy(uri); + if (c) { + flb_plg_info(ctx->ins, "UploadPart http status=%d", + c->resp.status); + if (c->resp.status == 200) { + tmp = 
get_etag(c->resp.data, c->resp.data_size); + if (!tmp) { + flb_plg_error(ctx->ins, "Could not find ETag in " + "UploadPart response"); + flb_plg_debug(ctx->ins, "Raw UploadPart response: %s", + c->resp.payload); + flb_http_client_destroy(c); + return -1; + } + m_upload->etags[m_upload->part_number - 1] = tmp; + flb_plg_info(ctx->ins, "Successfully uploaded part #%d " + "for %s, UploadId=%s, ETag=%s", m_upload->part_number, + m_upload->s3_key, m_upload->upload_id, tmp); + flb_http_client_destroy(c); + /* track how many bytes are have gone toward this upload */ + m_upload->bytes += body_size; + + /* finally, attempt to persist the data for this upload */ + ret = save_upload(ctx, m_upload, tmp); + if (ret == 0) { + flb_plg_debug(ctx->ins, "Successfully persisted upload data, UploadId=%s", + m_upload->upload_id); + } + else { + flb_plg_warn(ctx->ins, "Was not able to persisted upload data to disk; " + "if fluent bit dies without completing this upload the part " + "could be lost, UploadId=%s, ETag=%s", + m_upload->upload_id, tmp); + } + return 0; + } + flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size, + "UploadPart", ctx->ins); + if (c->resp.payload != NULL) { + flb_plg_debug(ctx->ins, "Raw UploadPart response: %s", + c->resp.payload); + } + flb_http_client_destroy(c); + } + + flb_plg_error(ctx->ins, "UploadPart request failed"); + return -1; +} |