diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:48 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:20:02 +0000 |
commit | 58daab21cd043e1dc37024a7f99b396788372918 (patch) | |
tree | 96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/src/aws/flb_aws_compress.c | |
parent | Releasing debian version 1.43.2-1. (diff) | |
download | netdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz netdata-58daab21cd043e1dc37024a7f99b396788372918.zip |
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/src/aws/flb_aws_compress.c')
-rw-r--r-- | fluent-bit/src/aws/flb_aws_compress.c | 245 |
1 files changed, 245 insertions, 0 deletions
diff --git a/fluent-bit/src/aws/flb_aws_compress.c b/fluent-bit/src/aws/flb_aws_compress.c new file mode 100644 index 000000000..e98ce8318 --- /dev/null +++ b/fluent-bit/src/aws/flb_aws_compress.c @@ -0,0 +1,245 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2021 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_base64.h> + +#include <fluent-bit/aws/flb_aws_compress.h> +#include <fluent-bit/flb_gzip.h> + +#include <stdint.h> + +#ifdef FLB_HAVE_ARROW +#include "compression/arrow/compress.h" +#endif + +struct compression_option { + int compression_type; + char *compression_keyword; + int(*compress)(void *in_data, size_t in_len, void **out_data, size_t *out_len); +}; + +/* + * Library of compression options + * AWS plugins that support compression will have these options. + * Referenced function should return -1 on error and 0 on success. + */ +static const struct compression_option compression_options[] = { + /* FLB_AWS_COMPRESS_NONE which is 0 is reserved for array footer */ + { + FLB_AWS_COMPRESS_GZIP, + "gzip", + &flb_gzip_compress + }, +#ifdef FLB_HAVE_ARROW + { + FLB_AWS_COMPRESS_ARROW, + "arrow", + &out_s3_compress_arrow + }, +#endif + { 0 } +}; + +int flb_aws_compression_get_type(const char *compression_keyword) +{ + int ret; + const struct compression_option *o; + + o = compression_options; + + while (o->compression_type != 0) { + ret = strcmp(o->compression_keyword, compression_keyword); + if (ret == 0) { + return o->compression_type; + } + ++o; + } + + flb_error("[aws_compress] unknown compression type: %s", compression_keyword); + return -1; +} + +int flb_aws_compression_compress(int compression_type, void *in_data, size_t in_len, + void **out_data, size_t *out_len) +{ + const struct compression_option *o; + + o = compression_options; + + while (o->compression_type != 0) { + if (o->compression_type == compression_type) { + return o->compress(in_data, in_len, out_data, out_len); + } + ++o; + } + + flb_error("[aws_compress] invalid compression type: %i", compression_type); + flb_errno(); + return -1; +} + +int flb_aws_compression_b64_truncate_compress(int compression_type, size_t max_out_len, + void *in_data, size_t in_len, + void **out_data, size_t *out_len) +{ + static const void *truncation_suffix = "[Truncated...]"; + static const size_t truncation_suffix_len = 14; + static const double truncation_reduction_percent = 90; /* % out of 100 */ + static const int truncation_compression_max_attempts = 10; + + int ret; + int is_truncated; + int compression_attempts; + size_t truncated_in_len_prev; + size_t truncated_in_len; + void *truncated_in_buf; + void *compressed_buf; + size_t compressed_len; + size_t original_b64_compressed_len; + + unsigned char *b64_compressed_buf; + size_t b64_compressed_len; + size_t b64_actual_len; + + /* Iterative approach to truncation */ + truncated_in_len = in_len; + truncated_in_buf = in_data; + is_truncated = FLB_FALSE; + b64_compressed_len = SIZE_MAX; + compression_attempts = 0; + while (max_out_len < b64_compressed_len - 1) { + + /* Limit compression truncation attempts, just to be safe */ + if (compression_attempts >= truncation_compression_max_attempts) { + if (is_truncated) { + flb_free(truncated_in_buf); + } + flb_error("[aws_compress] truncation failed, too many compression attempts"); + return -1; + } + + ret = flb_aws_compression_compress(compression_type, truncated_in_buf, + truncated_in_len, &compressed_buf, + &compressed_len); + ++compression_attempts; + if (ret != 0) { + if (is_truncated) { + flb_free(truncated_in_buf); + } + return -1; + } + + /* Determine encoded base64 buffer size */ + b64_compressed_len = compressed_len / 3; /* Compute number of 4 sextet groups */ + b64_compressed_len += (compressed_len % 3 != 0); /* Add padding partial group */ + b64_compressed_len *= 4; /* Compute number of sextets */ + b64_compressed_len += 1; /* Add room for null character 0x00 */ + + /* Truncation needed */ + if (max_out_len < b64_compressed_len - 1) { + flb_debug("[aws_compress] iterative truncation round"); + + /* This compressed_buf is the wrong size. Free */ + flb_free(compressed_buf); + + /* Base case: input compressed empty string, output still too large */ + if (truncated_in_len == 0) { + if (is_truncated) { + flb_free(truncated_in_buf); + } + flb_error("[aws_compress] truncation failed, compressed empty input too " + "large"); + return -1; + } + + /* Calculate corrected input size */ + truncated_in_len_prev = truncated_in_len; + truncated_in_len = (max_out_len * truncated_in_len) / b64_compressed_len; + truncated_in_len = (truncated_in_len * truncation_reduction_percent) / 100; + + /* Ensure working down */ + if (truncated_in_len >= truncated_in_len_prev) { + truncated_in_len = truncated_in_len_prev - 1; + } + + /* Allocate truncation buffer */ + if (!is_truncated) { + is_truncated = FLB_TRUE; + original_b64_compressed_len = b64_compressed_len; + truncated_in_buf = flb_malloc(in_len); + if (!truncated_in_buf) { + flb_errno(); + return -1; + } + memcpy(truncated_in_buf, in_data, in_len); + } + + /* Slap on truncation suffix */ + if (truncated_in_len < truncation_suffix_len) { + /* No room for the truncation suffix. Terminal error */ + flb_error("[aws_compress] truncation failed, no room for suffix"); + flb_free(truncated_in_buf); + return -1; + } + memcpy((char *) truncated_in_buf + truncated_in_len - truncation_suffix_len, + truncation_suffix, truncation_suffix_len); + } + } + + /* Truncate buffer free and compression buffer allocation */ + if (is_truncated) { + flb_free(truncated_in_buf); + flb_warn("[aws_compress][size=%zu] Truncating input for compressed output " + "larger than %zu bytes, output from %zu to %zu bytes", + in_len, max_out_len, original_b64_compressed_len - 1, + b64_compressed_len - 1); + } + b64_compressed_buf = flb_malloc(b64_compressed_len); + if (!b64_compressed_buf) { + flb_errno(); + return -1; + } + + /* Base64 encode compressed out bytes */ + ret = flb_base64_encode(b64_compressed_buf, b64_compressed_len, &b64_actual_len, + compressed_buf, compressed_len); + flb_free(compressed_buf); + + if (ret == FLB_BASE64_ERR_BUFFER_TOO_SMALL) { + flb_error("[aws_compress] compressed log base64 buffer too small"); + return -1; /* not handle truncation at this point */ + } + if (ret != 0) { + flb_free(b64_compressed_buf); + return -1; + } + + /* Double check b64 buf len */ + if (b64_compressed_len - 1 != b64_actual_len) { + flb_error("[aws_compress] buffer len should be 1 greater than actual len"); + flb_free(b64_compressed_buf); + return -1; + } + + *out_data = b64_compressed_buf; + *out_len = b64_compressed_len - 1; /* disregard added null character */ + return 0; +} |