summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/aws/flb_aws_compress.c
blob: e98ce8318c354aeef56ece19670df856a15cbb66 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/*  Fluent Bit
 *  ==========
 *  Copyright (C) 2019-2021 The Fluent Bit Authors
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_base64.h>

#include <fluent-bit/aws/flb_aws_compress.h>
#include <fluent-bit/flb_gzip.h>

#include <stdint.h>
#include <string.h>

#ifdef FLB_HAVE_ARROW
#include "compression/arrow/compress.h"
#endif

/*
 * One entry of the compression lookup table: associates a compression type
 * identifier with its keyword and with the routine that does the compression.
 */
struct compression_option {
    /* Numeric identifier of the compression; 0 terminates the table */
    int compression_type;

    /* Keyword matched (via strcmp) by flb_aws_compression_get_type() */
    char *compression_keyword;

    /* Compression routine; expected to return 0 on success and -1 on error */
    int (*compress)(void *in_data, size_t in_len,
                    void **out_data, size_t *out_len);
};

/*
 * Table of the compression options available to AWS plugins that support
 * compression.
 *
 * Every compress callback listed here should return 0 on success and -1 on
 * error. The table is terminated by a zeroed entry: compression type 0
 * (FLB_AWS_COMPRESS_NONE) is reserved for that footer, so lookups stop when
 * they reach it.
 */
static const struct compression_option compression_options[] = {
    {
        .compression_type    = FLB_AWS_COMPRESS_GZIP,
        .compression_keyword = "gzip",
        .compress            = flb_gzip_compress
    },
#ifdef FLB_HAVE_ARROW
    {
        .compression_type    = FLB_AWS_COMPRESS_ARROW,
        .compression_keyword = "arrow",
        .compress            = out_s3_compress_arrow
    },
#endif
    /* Array footer */
    { 0 }
};

/*
 * Translate a compression keyword (e.g. "gzip") into its compression type.
 *
 * The keyword is compared, case-sensitively, against each entry of
 * compression_options until the zero-typed footer entry is reached.
 *
 * Returns the matching compression type, or -1 (after logging an error) when
 * the keyword is NULL or does not match any known option.
 */
int flb_aws_compression_get_type(const char *compression_keyword)
{
    const struct compression_option *o;

    /* strcmp() on a NULL pointer is undefined behavior; reject it up front */
    if (compression_keyword == NULL) {
        flb_error("[aws_compress] compression type not specified");
        return -1;
    }

    for (o = compression_options; o->compression_type != 0; ++o) {
        if (strcmp(o->compression_keyword, compression_keyword) == 0) {
            return o->compression_type;
        }
    }

    flb_error("[aws_compress] unknown compression type: %s", compression_keyword);
    return -1;
}

/*
 * Compress in_data using the routine registered for compression_type.
 *
 * The compression_options table is scanned up to its zero-typed footer; on a
 * match the call is forwarded to that entry's compress callback and its
 * return value is handed back unchanged.
 *
 * Returns -1 (after logging) when no entry matches compression_type.
 */
int flb_aws_compression_compress(int compression_type, void *in_data, size_t in_len,
                                void **out_data, size_t *out_len)
{
    const struct compression_option *entry;

    for (entry = compression_options; entry->compression_type != 0; entry++) {
        if (entry->compression_type != compression_type) {
            continue;
        }
        return entry->compress(in_data, in_len, out_data, out_len);
    }

    /* Reached the footer without finding the requested type */
    flb_error("[aws_compress] invalid compression type: %i", compression_type);
    /* NOTE(review): nothing above sets errno, so this may log a stale value */
    flb_errno();
    return -1;
}

/*
 * Compress in_data, base64 encode the compressed bytes and make sure the
 * encoded result is no longer than max_out_len bytes.
 *
 * When the encoded output is too large, the input is iteratively truncated
 * (working on a private copy, in_data itself is never modified), the marker
 * "[Truncated...]" is written over the tail of the truncated input, and the
 * compression is retried. At most 10 compression attempts are made.
 *
 * On success, returns 0 and stores in *out_data a buffer obtained from
 * flb_malloc() holding the base64 text; *out_len is the length of that text.
 * The buffer is allocated with one extra byte reserved for a terminating
 * null character, which is not counted in *out_len.
 * NOTE(review): the caller presumably owns *out_data and releases it with
 * flb_free() - confirm against callers.
 *
 * Returns -1 on failure: compression error, allocation failure, too many
 * attempts, no room left for the truncation suffix, or base64 encoding error.
 * No buffers allocated by this function remain allocated on failure.
 */
int flb_aws_compression_b64_truncate_compress(int compression_type, size_t max_out_len,
                                             void *in_data, size_t in_len,
                                             void **out_data, size_t *out_len)
{
    static const void *truncation_suffix = "[Truncated...]";
    static const size_t truncation_suffix_len = 14;
    static const double truncation_reduction_percent = 90; /* % out of 100 */
    static const int truncation_compression_max_attempts = 10;

    int ret;
    int is_truncated;
    int compression_attempts;
    size_t truncated_in_len_prev;
    size_t truncated_in_len;
    void *truncated_in_buf;
    void *compressed_buf;
    size_t compressed_len;
    size_t original_b64_compressed_len;

    unsigned char *b64_compressed_buf;
    size_t b64_compressed_len;
    size_t b64_actual_len;

    /* Iterative approach to truncation */
    truncated_in_len = in_len;
    truncated_in_buf = in_data;
    is_truncated = FLB_FALSE;
    compressed_buf = NULL;
    compressed_len = 0;
    original_b64_compressed_len = 0;
    b64_compressed_len = 0;
    compression_attempts = 0;

    /*
     * The input must always be compressed at least once, whatever the value
     * of max_out_len, so that compressed_buf and compressed_len are valid
     * when the loop ends.
     */
    do {

        /* Limit compression truncation attempts, just to be safe */
        if (compression_attempts >= truncation_compression_max_attempts) {
            if (is_truncated) {
                flb_free(truncated_in_buf);
            }
            flb_error("[aws_compress] truncation failed, too many compression attempts");
            return -1;
        }

        ret = flb_aws_compression_compress(compression_type, truncated_in_buf,
                                          truncated_in_len, &compressed_buf,
                                          &compressed_len);
        ++compression_attempts;
        if (ret != 0) {
            if (is_truncated) {
                flb_free(truncated_in_buf);
            }
            return -1;
        }

        /* Determine encoded base64 buffer size */
        b64_compressed_len = compressed_len / 3; /* Compute number of 4 sextet groups */
        b64_compressed_len += (compressed_len % 3 != 0); /* Add padding partial group */
        b64_compressed_len *= 4; /* Compute number of sextets */
        b64_compressed_len += 1; /* Add room for null character 0x00 */

        /* Truncation needed */
        if (max_out_len < b64_compressed_len - 1) {
            flb_debug("[aws_compress] iterative truncation round");

            /* This compressed_buf is the wrong size. Free */
            flb_free(compressed_buf);
            compressed_buf = NULL;

            /* Base case: input compressed empty string, output still too large */
            if (truncated_in_len == 0) {
                if (is_truncated) {
                    flb_free(truncated_in_buf);
                }
                flb_error("[aws_compress] truncation failed, compressed empty input too "
                         "large");
                return -1;
            }

            /*
             * Calculate corrected input size: scale the input by the ratio of
             * allowed to actual output size, then shrink a bit further.
             * NOTE(review): max_out_len * truncated_in_len is size_t
             * arithmetic and could wrap for very large values.
             */
            truncated_in_len_prev = truncated_in_len;
            truncated_in_len = (max_out_len * truncated_in_len) / b64_compressed_len;
            truncated_in_len = (truncated_in_len * truncation_reduction_percent) / 100;

            /* Ensure working down */
            if (truncated_in_len >= truncated_in_len_prev) {
                truncated_in_len = truncated_in_len_prev - 1;
            }

            /* Allocate truncation buffer (first truncation round only) */
            if (!is_truncated) {
                is_truncated = FLB_TRUE;
                original_b64_compressed_len = b64_compressed_len;
                truncated_in_buf = flb_malloc(in_len);
                if (!truncated_in_buf) {
                    flb_errno();
                    return -1;
                }
                memcpy(truncated_in_buf, in_data, in_len);
            }

            /* Slap on truncation suffix */
            if (truncated_in_len < truncation_suffix_len) {
                /* No room for the truncation suffix. Terminal error */
                flb_error("[aws_compress] truncation failed, no room for suffix");
                flb_free(truncated_in_buf);
                return -1;
            }
            memcpy((char *) truncated_in_buf + truncated_in_len - truncation_suffix_len,
                  truncation_suffix, truncation_suffix_len);
        }
    } while (max_out_len < b64_compressed_len - 1);

    /* Truncate buffer free and compression buffer allocation */
    if (is_truncated) {
        flb_free(truncated_in_buf);
        flb_warn("[aws_compress][size=%zu] Truncating input for compressed output "
                "larger than %zu bytes, output from %zu to %zu bytes",
                in_len, max_out_len, original_b64_compressed_len - 1,
                b64_compressed_len - 1);
    }
    b64_compressed_buf = flb_malloc(b64_compressed_len);
    if (!b64_compressed_buf) {
        flb_errno();
        /* The compressed data is no longer needed; do not leak it */
        flb_free(compressed_buf);
        return -1;
    }

    /* Base64 encode compressed out bytes */
    ret = flb_base64_encode(b64_compressed_buf, b64_compressed_len, &b64_actual_len,
                               compressed_buf, compressed_len);
    flb_free(compressed_buf);

    if (ret == FLB_BASE64_ERR_BUFFER_TOO_SMALL) {
        flb_error("[aws_compress] compressed log base64 buffer too small");
        /* Release the output buffer on this error path as well */
        flb_free(b64_compressed_buf);
        return -1; /* not handle truncation at this point */
    }
    if (ret != 0) {
        flb_free(b64_compressed_buf);
        return -1;
    }

    /* Double check b64 buf len */
    if (b64_compressed_len - 1 != b64_actual_len) {
        flb_error("[aws_compress] buffer len should be 1 greater than actual len");
        flb_free(b64_compressed_buf);
        return -1;
    }

    *out_data = b64_compressed_buf;
    *out_len = b64_compressed_len - 1; /* disregard added null character */
    return 0;
}