summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/out_s3/s3.h
blob: e145b1ad6c9546745b2fee3a728e509e60d7f6a8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/*  Fluent Bit
 *  ==========
 *  Copyright (C) 2015-2022 The Fluent Bit Authors
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

#ifndef FLB_OUT_S3
#define FLB_OUT_S3

#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_aws_credentials.h>
#include <fluent-bit/flb_aws_util.h>

/* Upload data to S3 in 5MB chunks */
#define MIN_CHUNKED_UPLOAD_SIZE 5242880
#define MAX_CHUNKED_UPLOAD_SIZE 50000000
#define MAX_CHUNKED_UPLOAD_COMPRESS_SIZE 5000000000

#define UPLOAD_TIMER_MAX_WAIT 60000
#define UPLOAD_TIMER_MIN_WAIT 6000

#define MULTIPART_UPLOAD_STATE_NOT_CREATED              0
#define MULTIPART_UPLOAD_STATE_CREATED                  1
#define MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS     2

#define DEFAULT_FILE_SIZE     100000000
#define MAX_FILE_SIZE         50000000000
#define MAX_FILE_SIZE_STR     "50,000,000,000"

/* Allowed max file size 1 GB for publishing to S3 */
#define MAX_FILE_SIZE_PUT_OBJECT        1000000000 

#define DEFAULT_UPLOAD_TIMEOUT 3600

/*
 * If we see repeated errors on an upload/chunk, we will discard it
 * This saves us from scenarios where something goes wrong and an upload can
 * not proceed (may be some other process completed it or deleted the upload)
 * instead of erroring out forever, we eventually discard the upload.
 *
 * The same is done for chunks, just to be safe, even though realistically
 * I can't think of a reason why a chunk could become unsendable.
 */
#define MAX_UPLOAD_ERRORS 5

struct upload_queue {
    struct s3_file *upload_file;
    struct multipart_upload *m_upload_file;
    flb_sds_t tag;
    int tag_len;

    int retry_counter;
    time_t upload_time;

    struct mk_list _head;
};

struct multipart_upload {
    flb_sds_t s3_key;
    flb_sds_t tag;
    flb_sds_t upload_id;
    int upload_state;
    time_t init_time;

    /*
     * maximum of 10,000 parts in an upload, for each we need to store mapping
     * of Part Number to ETag
     */
    flb_sds_t etags[10000];
    int part_number;

    /*
     * we use async http, so we need to check that all part requests have
     * completed before we complete the upload
     */
    int parts_uploaded;

    /* ongoing tracker of how much data has been sent for this upload */
    size_t bytes;

    struct mk_list _head;

    /* see note for MAX_UPLOAD_ERRORS */
    int upload_errors;
    int complete_errors;
};

struct flb_s3 {
    char *bucket;
    char *region;
    char *s3_key_format;
    char *tag_delimiters;
    char *endpoint;
    char *sts_endpoint;
    char *canned_acl;
    char *content_type;
    char *storage_class;
    char *log_key;
    char *external_id;
    char *profile;
    int free_endpoint;
    int retry_requests;
    int use_put_object;
    int send_content_md5;
    int static_file_path;
    int compression;
    int port;
    int insecure;
    size_t store_dir_limit_size;

    /* track the total amount of buffered data */
    size_t current_buffer_size;

    struct flb_aws_provider *provider;
    struct flb_aws_provider *base_provider;
    /* tls instances can't be re-used; aws provider requires a separate one */
    struct flb_tls *provider_tls;
    /* one for the standard chain provider, one for sts assume role */
    struct flb_tls *sts_provider_tls;
    struct flb_tls *client_tls;

    struct flb_aws_client *s3_client;
    int json_date_format;
    flb_sds_t json_date_key;
    flb_sds_t date_key;

    flb_sds_t buffer_dir;

    char *store_dir;
    struct flb_fstore *fs;
    struct flb_fstore_stream *stream_active;  /* default active stream */
    struct flb_fstore_stream *stream_upload;  /* multipart upload stream */
    struct flb_fstore_stream *stream_metadata; /* s3 metadata stream */

    /*
     * used to track that unset buffers were found on startup that have not
     * been sent
     */
    int has_old_buffers;
    /* old multipart uploads read on start up */
    int has_old_uploads;

    struct mk_list uploads;

    int preserve_data_ordering;
    int upload_queue_success;
    struct mk_list upload_queue;

    size_t file_size;
    size_t upload_chunk_size;
    time_t upload_timeout;
    time_t retry_time;

    int timer_created;
    int timer_ms;
    int key_fmt_has_uuid;

    uint64_t seq_index;
    int key_fmt_has_seq_index;
    flb_sds_t metadata_dir;
    flb_sds_t seq_index_file;

    struct flb_output_instance *ins;
};

int upload_part(struct flb_s3 *ctx, struct multipart_upload *m_upload,
                char *body, size_t body_size);

int create_multipart_upload(struct flb_s3 *ctx,
                            struct multipart_upload *m_upload);

int complete_multipart_upload(struct flb_s3 *ctx,
                              struct multipart_upload *m_upload);

void multipart_read_uploads_from_fs(struct flb_s3 *ctx);

void multipart_upload_destroy(struct multipart_upload *m_upload);

struct flb_http_client *mock_s3_call(char *error_env_var, char *api);
int s3_plugin_under_test();

int get_md5_base64(char *buf, size_t buf_size, char *md5_str, size_t md5_str_size);

int create_headers(struct flb_s3 *ctx, char *body_md5,
                   struct flb_aws_header **headers, int *num_headers,
                   int multipart_upload);

#endif