summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/aws/flb_aws_util.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/aws/flb_aws_util.c')
-rw-r--r--fluent-bit/src/aws/flb_aws_util.c1047
1 files changed, 1047 insertions, 0 deletions
diff --git a/fluent-bit/src/aws/flb_aws_util.c b/fluent-bit/src/aws/flb_aws_util.c
new file mode 100644
index 000000000..533bba7eb
--- /dev/null
+++ b/fluent-bit/src/aws/flb_aws_util.c
@@ -0,0 +1,1047 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_signv4.h>
+#include <fluent-bit/flb_aws_util.h>
+#include <fluent-bit/flb_aws_credentials.h>
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_jsmn.h>
+#include <fluent-bit/flb_env.h>
+
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#define AWS_SERVICE_ENDPOINT_FORMAT "%s.%s.amazonaws.com"
+#define AWS_SERVICE_ENDPOINT_BASE_LEN 15
+
+#define TAG_PART_DESCRIPTOR "$TAG[%d]"
+#define TAG_DESCRIPTOR "$TAG"
+#define MAX_TAG_PARTS 10
+#define S3_KEY_SIZE 1024
+#define RANDOM_STRING "$UUID"
+#define INDEX_STRING "$INDEX"
+#define AWS_USER_AGENT_NONE "none"
+#define AWS_USER_AGENT_ECS "ecs"
+#define AWS_USER_AGENT_K8S "k8s"
+#define AWS_ECS_METADATA_URI "ECS_CONTAINER_METADATA_URI_V4"
+#define FLB_MAX_AWS_RESP_BUFFER_SIZE 0 /* 0 means unlimited capacity as per requirement */
+
+#ifdef FLB_SYSTEM_WINDOWS
+#define FLB_AWS_BASE_USER_AGENT "aws-fluent-bit-plugin-windows"
+#define FLB_AWS_BASE_USER_AGENT_FORMAT "aws-fluent-bit-plugin-windows-%s"
+#define FLB_AWS_BASE_USER_AGENT_LEN 29
+#else
+#define FLB_AWS_BASE_USER_AGENT "aws-fluent-bit-plugin"
+#define FLB_AWS_BASE_USER_AGENT_FORMAT "aws-fluent-bit-plugin-%s"
+#define FLB_AWS_BASE_USER_AGENT_LEN 21
+#endif
+
+#define FLB_AWS_MILLISECOND_FORMATTER_LENGTH 3
+#define FLB_AWS_NANOSECOND_FORMATTER_LENGTH 9
+#define FLB_AWS_MILLISECOND_FORMATTER "%3N"
+#define FLB_AWS_NANOSECOND_FORMATTER_N "%9N"
+#define FLB_AWS_NANOSECOND_FORMATTER_L "%L"
+
+struct flb_http_client *request_do(struct flb_aws_client *aws_client,
+ int method, const char *uri,
+ const char *body, size_t body_len,
+ struct flb_aws_header *dynamic_headers,
+ size_t dynamic_headers_len);
+
+/*
+ * https://service.region.amazonaws.com(.cn)
+ */
+char *flb_aws_endpoint(char* service, char* region)
+{
+ char *endpoint = NULL;
+ size_t len = AWS_SERVICE_ENDPOINT_BASE_LEN;
+ int is_cn = FLB_FALSE;
+ int bytes;
+
+
+ /* In the China regions, ".cn" is appended to the URL */
+ if (strcmp("cn-north-1", region) == 0) {
+ len += 3;
+ is_cn = FLB_TRUE;
+ }
+ if (strcmp("cn-northwest-1", region) == 0) {
+ len += 3;
+ is_cn = FLB_TRUE;
+ }
+
+ len += strlen(service);
+ len += strlen(region);
+ len++; /* null byte */
+
+ endpoint = flb_calloc(len, sizeof(char));
+ if (!endpoint) {
+ flb_errno();
+ return NULL;
+ }
+
+ bytes = snprintf(endpoint, len, AWS_SERVICE_ENDPOINT_FORMAT, service, region);
+ if (bytes < 0) {
+ flb_errno();
+ flb_free(endpoint);
+ return NULL;
+ }
+
+ if (is_cn) {
+ memcpy(endpoint + bytes, ".cn", 3);
+ endpoint[bytes + 3] = '\0';
+ }
+
+ return endpoint;
+
+}
+
+int flb_read_file(const char *path, char **out_buf, size_t *out_size)
+{
+ int ret;
+ long bytes;
+ char *buf = NULL;
+ struct stat st;
+ int fd;
+
+ fd = open(path, O_RDONLY);
+ if (fd < 0) {
+ return -1;
+ }
+
+ ret = fstat(fd, &st);
+ if (ret == -1) {
+ flb_errno();
+ close(fd);
+ return -1;
+ }
+
+ buf = flb_calloc(st.st_size + 1, sizeof(char));
+ if (!buf) {
+ flb_errno();
+ close(fd);
+ return -1;
+ }
+
+ bytes = read(fd, buf, st.st_size);
+ if (bytes < 0) {
+ flb_errno();
+ flb_free(buf);
+ close(fd);
+ return -1;
+ }
+
+ /* fread does not add null byte */
+ buf[st.st_size] = '\0';
+
+ close(fd);
+ *out_buf = buf;
+ *out_size = st.st_size;
+
+ return 0;
+}
+
+
+char *removeProtocol (char *endpoint, char *protocol) {
+ if (strncmp(protocol, endpoint, strlen(protocol)) == 0){
+ endpoint = endpoint + strlen(protocol);
+ }
+ return endpoint;
+}
+
+struct flb_http_client *flb_aws_client_request(struct flb_aws_client *aws_client,
+ int method, const char *uri,
+ const char *body, size_t body_len,
+ struct flb_aws_header
+ *dynamic_headers,
+ size_t dynamic_headers_len)
+{
+ struct flb_http_client *c = NULL;
+
+ c = request_do(aws_client, method, uri, body, body_len,
+ dynamic_headers, dynamic_headers_len);
+
+ // Auto retry if request fails
+ if (c == NULL && aws_client->retry_requests) {
+ flb_debug("[aws_client] auto-retrying");
+ c = request_do(aws_client, method, uri, body, body_len,
+ dynamic_headers, dynamic_headers_len);
+ }
+
+ /*
+ * 400 or 403 could indicate an issue with credentials- so we check for auth
+ * specific error messages and then force a refresh on the provider.
+ * For safety a refresh can be performed only once
+ * per FLB_AWS_CREDENTIAL_REFRESH_LIMIT.
+ *
+ */
+ if (c && (c->resp.status >= 400 && c->resp.status < 500)) {
+ if (aws_client->has_auth && time(NULL) > aws_client->refresh_limit) {
+ if (flb_aws_is_auth_error(c->resp.payload, c->resp.payload_size)
+ == FLB_TRUE) {
+ flb_info("[aws_client] auth error, refreshing creds");
+ aws_client->refresh_limit = time(NULL)
+ + FLB_AWS_CREDENTIAL_REFRESH_LIMIT;
+ aws_client->provider->provider_vtable->
+ refresh(aws_client->provider);
+ }
+ }
+ }
+
+ return c;
+}
+
+static struct flb_aws_client_vtable client_vtable = {
+ .request = flb_aws_client_request,
+};
+
+struct flb_aws_client *flb_aws_client_create()
+{
+ struct flb_aws_client *client = flb_calloc(1, sizeof(struct flb_aws_client));
+ if (!client) {
+ flb_errno();
+ return NULL;
+ }
+ client->client_vtable = &client_vtable;
+ client->retry_requests = FLB_FALSE;
+ client->debug_only = FLB_FALSE;
+ return client;
+}
+
+/* Generator that returns clients with the default vtable */
+
+static struct flb_aws_client_generator default_generator = {
+ .create = flb_aws_client_create,
+};
+
+struct flb_aws_client_generator *flb_aws_client_generator()
+{
+ return &default_generator;
+}
+
+void flb_aws_client_destroy(struct flb_aws_client *aws_client)
+{
+ if (aws_client) {
+ if (aws_client->upstream) {
+ flb_upstream_destroy(aws_client->upstream);
+ }
+ if (aws_client->extra_user_agent) {
+ flb_sds_destroy(aws_client->extra_user_agent);
+ }
+ flb_free(aws_client);
+ }
+}
+
+int flb_aws_is_auth_error(char *payload, size_t payload_size)
+{
+ flb_sds_t error = NULL;
+
+ if (payload_size == 0) {
+ return FLB_FALSE;
+ }
+
+ /* Fluent Bit calls the STS API which returns XML */
+ if (strcasestr(payload, "InvalidClientTokenId") != NULL) {
+ return FLB_TRUE;
+ }
+
+ if (strcasestr(payload, "AccessDenied") != NULL) {
+ return FLB_TRUE;
+ }
+
+ if (strcasestr(payload, "Expired") != NULL) {
+ return FLB_TRUE;
+ }
+
+ /* Most APIs we use return JSON */
+ error = flb_aws_error(payload, payload_size);
+ if (error != NULL) {
+ if (strcmp(error, "ExpiredToken") == 0 ||
+ strcmp(error, "ExpiredTokenException") == 0 ||
+ strcmp(error, "AccessDeniedException") == 0 ||
+ strcmp(error, "AccessDenied") == 0 ||
+ strcmp(error, "IncompleteSignature") == 0 ||
+ strcmp(error, "SignatureDoesNotMatch") == 0 ||
+ strcmp(error, "MissingAuthenticationToken") == 0 ||
+ strcmp(error, "InvalidClientTokenId") == 0 ||
+ strcmp(error, "InvalidToken") == 0 ||
+ strcmp(error, "InvalidAccessKeyId") == 0 ||
+ strcmp(error, "UnrecognizedClientException") == 0) {
+ flb_sds_destroy(error);
+ return FLB_TRUE;
+ }
+ flb_sds_destroy(error);
+ }
+
+ return FLB_FALSE;
+}
+
+struct flb_http_client *request_do(struct flb_aws_client *aws_client,
+ int method, const char *uri,
+ const char *body, size_t body_len,
+ struct flb_aws_header *dynamic_headers,
+ size_t dynamic_headers_len)
+{
+ size_t b_sent;
+ int ret;
+ struct flb_connection *u_conn = NULL;
+ flb_sds_t signature = NULL;
+ int i;
+ int normalize_uri;
+ struct flb_aws_header header;
+ struct flb_http_client *c = NULL;
+ flb_sds_t tmp;
+ flb_sds_t user_agent_prefix;
+ flb_sds_t user_agent = NULL;
+ char *buf;
+ struct flb_env *env;
+
+ u_conn = flb_upstream_conn_get(aws_client->upstream);
+ if (!u_conn) {
+ if (aws_client->debug_only == FLB_TRUE) {
+ flb_debug("[aws_client] connection initialization error");
+ }
+ else {
+ flb_error("[aws_client] connection initialization error");
+ }
+ return NULL;
+ }
+
+ /* Compose HTTP request */
+ c = flb_http_client(u_conn, method, uri,
+ body, body_len,
+ aws_client->host, aws_client->port,
+ aws_client->proxy, aws_client->flags);
+
+ if (!c) {
+ if (aws_client->debug_only == FLB_TRUE) {
+ flb_debug("[aws_client] could not initialize request");
+ }
+ else {
+ flb_error("[aws_client] could not initialize request");
+ }
+ goto error;
+ }
+
+ /* Increase the maximum HTTP response buffer size to fit large responses from AWS services */
+ ret = flb_http_buffer_size(c, FLB_MAX_AWS_RESP_BUFFER_SIZE);
+ if (ret != 0) {
+ flb_warn("[aws_http_client] failed to increase max response buffer size");
+ }
+
+ /* Set AWS Fluent Bit user agent */
+ env = aws_client->upstream->base.config->env;
+ buf = (char *) flb_env_get(env, "FLB_AWS_USER_AGENT");
+ if (buf == NULL) {
+ if (getenv(AWS_ECS_METADATA_URI) != NULL) {
+ user_agent = AWS_USER_AGENT_ECS;
+ }
+ else {
+ buf = (char *) flb_env_get(env, AWS_USER_AGENT_K8S);
+ if (buf && strcasecmp(buf, "enabled") == 0) {
+ user_agent = AWS_USER_AGENT_K8S;
+ }
+ }
+
+ if (user_agent == NULL) {
+ user_agent = AWS_USER_AGENT_NONE;
+ }
+
+ flb_env_set(env, "FLB_AWS_USER_AGENT", user_agent);
+ }
+ if (aws_client->extra_user_agent == NULL) {
+ buf = (char *) flb_env_get(env, "FLB_AWS_USER_AGENT");
+ tmp = flb_sds_create(buf);
+ if (!tmp) {
+ flb_errno();
+ goto error;
+ }
+ aws_client->extra_user_agent = tmp;
+ tmp = NULL;
+ }
+
+ /* Add AWS Fluent Bit user agent header */
+ if (strcasecmp(aws_client->extra_user_agent, AWS_USER_AGENT_NONE) == 0) {
+ ret = flb_http_add_header(c, "User-Agent", 10,
+ FLB_AWS_BASE_USER_AGENT, FLB_AWS_BASE_USER_AGENT_LEN);
+ }
+ else {
+ user_agent_prefix = flb_sds_create_size(64);
+ if (!user_agent_prefix) {
+ flb_errno();
+ flb_error("[aws_client] failed to create user agent");
+ goto error;
+ }
+ tmp = flb_sds_printf(&user_agent_prefix, FLB_AWS_BASE_USER_AGENT_FORMAT,
+ aws_client->extra_user_agent);
+ if (!tmp) {
+ flb_errno();
+ flb_sds_destroy(user_agent_prefix);
+ flb_error("[aws_client] failed to create user agent");
+ goto error;
+ }
+ user_agent_prefix = tmp;
+
+ ret = flb_http_add_header(c, "User-Agent", 10, user_agent_prefix,
+ flb_sds_len(user_agent_prefix));
+ flb_sds_destroy(user_agent_prefix);
+ }
+
+ if (ret < 0) {
+ if (aws_client->debug_only == FLB_TRUE) {
+ flb_debug("[aws_client] failed to add header to request");
+ }
+ else {
+ flb_error("[aws_client] failed to add header to request");
+ }
+ goto error;
+ }
+
+ /* add headers */
+ for (i = 0; i < aws_client->static_headers_len; i++) {
+ header = aws_client->static_headers[i];
+ ret = flb_http_add_header(c,
+ header.key, header.key_len,
+ header.val, header.val_len);
+ if (ret < 0) {
+ if (aws_client->debug_only == FLB_TRUE) {
+ flb_debug("[aws_client] failed to add header to request");
+ }
+ else {
+ flb_error("[aws_client] failed to add header to request");
+ }
+ goto error;
+ }
+ }
+
+ for (i = 0; i < dynamic_headers_len; i++) {
+ header = dynamic_headers[i];
+ ret = flb_http_add_header(c,
+ header.key, header.key_len,
+ header.val, header.val_len);
+ if (ret < 0) {
+ if (aws_client->debug_only == FLB_TRUE) {
+ flb_debug("[aws_client] failed to add header to request");
+ }
+ else {
+ flb_error("[aws_client] failed to add header to request");
+ }
+ goto error;
+ }
+ }
+
+ if (aws_client->has_auth) {
+ if (aws_client->s3_mode == S3_MODE_NONE) {
+ normalize_uri = FLB_TRUE;
+ }
+ else {
+ normalize_uri = FLB_FALSE;
+ }
+ signature = flb_signv4_do(c, normalize_uri, FLB_TRUE, time(NULL),
+ aws_client->region, aws_client->service,
+ aws_client->s3_mode, NULL,
+ aws_client->provider);
+ if (!signature) {
+ if (aws_client->debug_only == FLB_TRUE) {
+ flb_debug("[aws_client] could not sign request");
+ }
+ else {
+ flb_error("[aws_client] could not sign request");
+ }
+ goto error;
+ }
+ }
+
+ /* Perform request */
+ ret = flb_http_do(c, &b_sent);
+
+ if (ret != 0 || c->resp.status != 200) {
+ flb_debug("[aws_client] %s: http_do=%i, HTTP Status: %i",
+ aws_client->host, ret, c->resp.status);
+ }
+
+ if (ret != 0 && c != NULL) {
+ flb_http_client_destroy(c);
+ c = NULL;
+ }
+
+ flb_upstream_conn_release(u_conn);
+ flb_sds_destroy(signature);
+ return c;
+
+error:
+ if (u_conn) {
+ flb_upstream_conn_release(u_conn);
+ }
+ if (signature) {
+ flb_sds_destroy(signature);
+ }
+ if (c) {
+ flb_http_client_destroy(c);
+ }
+ return NULL;
+}
+
+void flb_aws_print_xml_error(char *response, size_t response_len,
+ char *api, struct flb_output_instance *ins)
+{
+ flb_sds_t error;
+ flb_sds_t message;
+
+ error = flb_aws_xml_get_val(response, response_len, "<Code>", "</Code>");
+ if (!error) {
+ flb_plg_error(ins, "%s: Could not parse response", api);
+ return;
+ }
+
+ message = flb_aws_xml_get_val(response, response_len, "<Message>", "</Message>");
+ if (!message) {
+ /* just print the error */
+ flb_plg_error(ins, "%s API responded with error='%s'", api, error);
+ }
+ else {
+ flb_plg_error(ins, "%s API responded with error='%s', message='%s'",
+ api, error, message);
+ flb_sds_destroy(message);
+ }
+
+ flb_sds_destroy(error);
+}
+
+/* Parses AWS XML API Error responses and returns the value of the <code> tag */
+flb_sds_t flb_aws_xml_error(char *response, size_t response_len)
+{
+ return flb_aws_xml_get_val(response, response_len, "<Code>", "</Code>");
+}
+
+/*
+ * Parses an XML document and returns the value of the given tag
+ * Param `tag` should include angle brackets; ex "<code>"
+ * And param `end` should include end brackets: "</code>"
+ */
+flb_sds_t flb_aws_xml_get_val(char *response, size_t response_len, char *tag, char *tag_end)
+{
+ flb_sds_t val = NULL;
+ char *node = NULL;
+ char *end;
+ int len;
+
+ if (response_len == 0) {
+ return NULL;
+ }
+ node = strstr(response, tag);
+ if (!node) {
+ flb_debug("[aws] Could not find '%s' tag in API response", tag);
+ return NULL;
+ }
+
+ /* advance to end of tag */
+ node += strlen(tag);
+
+ end = strstr(node, tag_end);
+ if (!end) {
+ flb_error("[aws] Could not find end of '%s' node in xml", tag);
+ return NULL;
+ }
+ len = end - node;
+ val = flb_sds_create_len(node, len);
+ if (!val) {
+ flb_errno();
+ return NULL;
+ }
+
+ return val;
+}
+
+void flb_aws_print_error(char *response, size_t response_len,
+ char *api, struct flb_output_instance *ins)
+{
+ flb_sds_t error;
+ flb_sds_t message;
+
+ error = flb_json_get_val(response, response_len, "__type");
+ if (!error) {
+ return;
+ }
+
+ message = flb_json_get_val(response, response_len, "message");
+ if (!message) {
+ /* just print the error */
+ flb_plg_error(ins, "%s API responded with error='%s'", api, error);
+ }
+ else {
+ flb_plg_error(ins, "%s API responded with error='%s', message='%s'",
+ api, error, message);
+ flb_sds_destroy(message);
+ }
+
+ flb_sds_destroy(error);
+}
+
+/* parses AWS JSON API error responses and returns the value of the __type field */
+flb_sds_t flb_aws_error(char *response, size_t response_len)
+{
+ return flb_json_get_val(response, response_len, "__type");
+}
+
+/* gets the value of a key in a json string */
+flb_sds_t flb_json_get_val(char *response, size_t response_len, char *key)
+{
+ jsmntok_t *tokens = NULL;
+ const jsmntok_t *t = NULL;
+ char *current_token = NULL;
+ jsmn_parser parser;
+ int tokens_size = 50;
+ size_t size;
+ int ret;
+ int i = 0;
+ int len;
+ flb_sds_t error_type = NULL;
+
+ jsmn_init(&parser);
+
+ size = sizeof(jsmntok_t) * tokens_size;
+ tokens = flb_calloc(1, size);
+ if (!tokens) {
+ flb_errno();
+ return NULL;
+ }
+
+ ret = jsmn_parse(&parser, response, response_len,
+ tokens, tokens_size);
+
+ if (ret == JSMN_ERROR_INVAL || ret == JSMN_ERROR_PART) {
+ flb_free(tokens);
+ flb_debug("[aws_client] Unable to parse API response- response is not"
+ " valid JSON.");
+ return NULL;
+ }
+
+ /* return value is number of tokens parsed */
+ tokens_size = ret;
+
+ /*
+ * jsmn will create an array of tokens like:
+ * key, value, key, value
+ */
+ while (i < (tokens_size - 1)) {
+ t = &tokens[i];
+
+ if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) {
+ break;
+ }
+
+ if (t->type == JSMN_STRING) {
+ current_token = &response[t->start];
+
+ if (strncmp(current_token, key, strlen(key)) == 0) {
+ i++;
+ t = &tokens[i];
+ current_token = &response[t->start];
+ len = t->end - t->start;
+ error_type = flb_sds_create_len(current_token, len);
+ if (!error_type) {
+ flb_errno();
+ flb_free(tokens);
+ return NULL;
+ }
+ break;
+ }
+ }
+
+ i++;
+ }
+ flb_free(tokens);
+ return error_type;
+}
+
+/* Generic replace function for strings. */
+static char* replace_uri_tokens(const char* original_string, const char* current_word,
+ const char* new_word)
+{
+ char *result;
+ int i = 0;
+ int count = 0;
+ int new_word_len = strlen(new_word);
+ int old_word_len = strlen(current_word);
+
+ for (i = 0; original_string[i] != '\0'; i++) {
+ if (strstr(&original_string[i], current_word) == &original_string[i]) {
+ count++;
+ i += old_word_len - 1;
+ }
+ }
+
+ result = flb_sds_create_size(i + count * (new_word_len - old_word_len) + 1);
+ if (!result) {
+ flb_errno();
+ return NULL;
+ }
+
+ i = 0;
+ while (*original_string) {
+ if (strstr(original_string, current_word) == original_string) {
+ strncpy(&result[i], new_word, new_word_len);
+ i += new_word_len;
+ original_string += old_word_len;
+ }
+ else
+ result[i++] = *original_string++;
+ }
+
+ result[i] = '\0';
+ return result;
+}
+
+/*
+ * Linux has strtok_r as the concurrent safe version
+ * Windows has strtok_s
+ */
+char* strtok_concurrent(
+ char* str,
+ char* delimiters,
+ char** context
+)
+{
+#ifdef FLB_SYSTEM_WINDOWS
+ return strtok_s(str, delimiters, context);
+#else
+ return strtok_r(str, delimiters, context);
+#endif
+}
+
+/* Constructs S3 object key as per the format. */
+flb_sds_t flb_get_s3_key(const char *format, time_t time, const char *tag,
+ char *tag_delimiter, uint64_t seq_index)
+{
+ int i = 0;
+ int ret = 0;
+ int seq_index_len;
+ char *tag_token = NULL;
+ char *key;
+ char *random_alphanumeric;
+ char *seq_index_str;
+ /* concurrent safe strtok_r requires a tracking ptr */
+ char *strtok_saveptr;
+ int len;
+ flb_sds_t tmp = NULL;
+ flb_sds_t buf = NULL;
+ flb_sds_t s3_key = NULL;
+ flb_sds_t tmp_key = NULL;
+ flb_sds_t tmp_tag = NULL;
+ struct tm gmt = {0};
+
+ if (strlen(format) > S3_KEY_SIZE){
+ flb_warn("[s3_key] Object key length is longer than the 1024 character limit.");
+ }
+
+ tmp_tag = flb_sds_create_len(tag, strlen(tag));
+ if(!tmp_tag){
+ goto error;
+ }
+
+ s3_key = flb_sds_create_len(format, strlen(format));
+ if (!s3_key) {
+ goto error;
+ }
+
+ /* Check if delimiter(s) specifed exists in the tag. */
+ for (i = 0; i < strlen(tag_delimiter); i++){
+ if (strchr(tag, tag_delimiter[i])){
+ ret = 1;
+ break;
+ }
+ }
+
+ tmp = flb_sds_create_len(TAG_PART_DESCRIPTOR, 5);
+ if (!tmp) {
+ goto error;
+ }
+ if (strstr(s3_key, tmp)){
+ if(ret == 0){
+ flb_warn("[s3_key] Invalid Tag delimiter: does not exist in tag. "
+ "tag=%s, format=%s", tag, format);
+ }
+ }
+
+ flb_sds_destroy(tmp);
+ tmp = NULL;
+
+ /* Split the string on the delimiters */
+ tag_token = strtok_concurrent(tmp_tag, tag_delimiter, &strtok_saveptr);
+
+ /* Find all occurences of $TAG[*] and
+ * replaces it with the right token from tag.
+ */
+ i = 0;
+ while(tag_token != NULL && i < MAX_TAG_PARTS) {
+ buf = flb_sds_create_size(10);
+ if (!buf) {
+ goto error;
+ }
+ tmp = flb_sds_printf(&buf, TAG_PART_DESCRIPTOR, i);
+ if (!tmp) {
+ goto error;
+ }
+
+ tmp_key = replace_uri_tokens(s3_key, tmp, tag_token);
+ if (!tmp_key) {
+ goto error;
+ }
+
+ if(strlen(tmp_key) > S3_KEY_SIZE){
+ flb_warn("[s3_key] Object key length is longer than the 1024 character limit.");
+ }
+
+ if (buf != tmp) {
+ flb_sds_destroy(buf);
+ }
+ flb_sds_destroy(tmp);
+ tmp = NULL;
+ buf = NULL;
+ flb_sds_destroy(s3_key);
+ s3_key = tmp_key;
+ tmp_key = NULL;
+
+ tag_token = strtok_concurrent(NULL, tag_delimiter, &strtok_saveptr);
+ i++;
+ }
+
+ tmp = flb_sds_create_len(TAG_PART_DESCRIPTOR, 5);
+ if (!tmp) {
+ goto error;
+ }
+
+ /* A match against "$TAG[" indicates an invalid or out of bounds tag part. */
+ if (strstr(s3_key, tmp)){
+ flb_warn("[s3_key] Invalid / Out of bounds tag part: At most 10 tag parts "
+ "($TAG[0] - $TAG[9]) can be processed. tag=%s, format=%s, delimiters=%s",
+ tag, format, tag_delimiter);
+ }
+
+ /* Find all occurences of $TAG and replace with the entire tag. */
+ tmp_key = replace_uri_tokens(s3_key, TAG_DESCRIPTOR, tag);
+ if (!tmp_key) {
+ goto error;
+ }
+
+ if(strlen(tmp_key) > S3_KEY_SIZE){
+ flb_warn("[s3_key] Object key length is longer than the 1024 character limit.");
+ }
+
+ flb_sds_destroy(s3_key);
+ s3_key = tmp_key;
+ tmp_key = NULL;
+
+ /* Find all occurences of $INDEX and replace with the appropriate index. */
+ if (strstr((char *) format, INDEX_STRING)) {
+ seq_index_len = snprintf(NULL, 0, "%"PRIu64, seq_index);
+ seq_index_str = flb_calloc(seq_index_len + 1, sizeof(char));
+ if (seq_index_str == NULL) {
+ goto error;
+ }
+
+ sprintf(seq_index_str, "%"PRIu64, seq_index);
+ seq_index_str[seq_index_len] = '\0';
+ tmp_key = replace_uri_tokens(s3_key, INDEX_STRING, seq_index_str);
+ if (tmp_key == NULL) {
+ flb_free(seq_index_str);
+ goto error;
+ }
+ if (strlen(tmp_key) > S3_KEY_SIZE) {
+ flb_warn("[s3_key] Object key length is longer than the 1024 character limit.");
+ }
+
+ flb_sds_destroy(s3_key);
+ s3_key = tmp_key;
+ tmp_key = NULL;
+ flb_free(seq_index_str);
+ }
+
+ /* Find all occurences of $UUID and replace with a random string. */
+ random_alphanumeric = flb_sts_session_name();
+ if (!random_alphanumeric) {
+ goto error;
+ }
+ /* only use 8 chars of the random string */
+ random_alphanumeric[8] = '\0';
+ tmp_key = replace_uri_tokens(s3_key, RANDOM_STRING, random_alphanumeric);
+ if (!tmp_key) {
+ flb_free(random_alphanumeric);
+ goto error;
+ }
+
+ if(strlen(tmp_key) > S3_KEY_SIZE){
+ flb_warn("[s3_key] Object key length is longer than the 1024 character limit.");
+ }
+
+ flb_sds_destroy(s3_key);
+ s3_key = tmp_key;
+ tmp_key = NULL;
+ flb_free(random_alphanumeric);
+
+ if (!gmtime_r(&time, &gmt)) {
+ flb_error("[s3_key] Failed to create timestamp.");
+ goto error;
+ }
+
+ flb_sds_destroy(tmp);
+ tmp = NULL;
+
+ /* A string no longer than S3_KEY_SIZE + 1 is created to store the formatted timestamp. */
+ key = flb_calloc(1, (S3_KEY_SIZE + 1) * sizeof(char));
+ if (!key) {
+ goto error;
+ }
+
+ ret = strftime(key, S3_KEY_SIZE, s3_key, &gmt);
+ if(ret == 0){
+ flb_warn("[s3_key] Object key length is longer than the 1024 character limit.");
+ }
+ flb_sds_destroy(s3_key);
+
+ len = strlen(key);
+ if (len > S3_KEY_SIZE) {
+ len = S3_KEY_SIZE;
+ }
+
+ s3_key = flb_sds_create_len(key, len);
+ flb_free(key);
+ if (!s3_key) {
+ goto error;
+ }
+
+ flb_sds_destroy(tmp_tag);
+ tmp_tag = NULL;
+ return s3_key;
+
+ error:
+ flb_errno();
+ if (tmp_tag){
+ flb_sds_destroy(tmp_tag);
+ }
+ if (s3_key){
+ flb_sds_destroy(s3_key);
+ }
+ if (buf && buf != tmp){
+ flb_sds_destroy(buf);
+ }
+ if (tmp){
+ flb_sds_destroy(tmp);
+ }
+ if (tmp_key){
+ flb_sds_destroy(tmp_key);
+ }
+ return NULL;
+}
+
+/*
+ * This function is an extension to strftime which can support milliseconds with %3N,
+ * support nanoseconds with %9N or %L. The return value is the length of formatted
+ * time string.
+ */
+size_t flb_aws_strftime_precision(char **out_buf, const char *time_format,
+ struct flb_time *tms)
+{
+ char millisecond_str[FLB_AWS_MILLISECOND_FORMATTER_LENGTH+1];
+ char nanosecond_str[FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1];
+ char *tmp_parsed_time_str;
+ char *buf;
+ size_t out_size;
+ size_t tmp_parsed_time_str_len;
+ size_t time_format_len;
+ struct tm timestamp;
+ struct tm *tmp;
+ int i;
+
+ /*
+ * Guess the max length needed for tmp_parsed_time_str and tmp_out_buf. The
+ * upper bound is 12*strlen(time_format) because the worst scenario will be only
+ * %c in time_format, and %c will be transfer to 24 chars long by function strftime().
+ */
+ time_format_len = strlen(time_format);
+ tmp_parsed_time_str_len = 12*time_format_len;
+
+ /*
+ * Use tmp_parsed_time_str to buffer when replace %3N with milliseconds, replace
+ * %9N and %L with nanoseconds in time_format.
+ */
+ tmp_parsed_time_str = (char *)flb_calloc(1, tmp_parsed_time_str_len*sizeof(char));
+ if (!tmp_parsed_time_str) {
+ flb_errno();
+ return 0;
+ }
+
+ buf = (char *)flb_calloc(1, tmp_parsed_time_str_len*sizeof(char));
+ if (!buf) {
+ flb_errno();
+ flb_free(tmp_parsed_time_str);
+ return 0;
+ }
+
+ /* Replace %3N to millisecond, %9N and %L to nanosecond in time_format. */
+ snprintf(millisecond_str, FLB_AWS_MILLISECOND_FORMATTER_LENGTH+1,
+ "%" PRIu64, (uint64_t) tms->tm.tv_nsec / 1000000);
+ snprintf(nanosecond_str, FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1,
+ "%" PRIu64, (uint64_t) tms->tm.tv_nsec);
+ for (i = 0; i < time_format_len; i++) {
+ if (strncmp(time_format+i, FLB_AWS_MILLISECOND_FORMATTER, 3) == 0) {
+ strncat(tmp_parsed_time_str, millisecond_str,
+ FLB_AWS_MILLISECOND_FORMATTER_LENGTH+1);
+ i += 2;
+ }
+ else if (strncmp(time_format+i, FLB_AWS_NANOSECOND_FORMATTER_N, 3) == 0) {
+ strncat(tmp_parsed_time_str, nanosecond_str,
+ FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1);
+ i += 2;
+ }
+ else if (strncmp(time_format+i, FLB_AWS_NANOSECOND_FORMATTER_L, 2) == 0) {
+ strncat(tmp_parsed_time_str, nanosecond_str,
+ FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1);
+ i += 1;
+ }
+ else {
+ strncat(tmp_parsed_time_str,time_format+i,1);
+ }
+ }
+
+ tmp = gmtime_r(&tms->tm.tv_sec, &timestamp);
+ if (!tmp) {
+ return 0;
+ }
+
+ out_size = strftime(buf, tmp_parsed_time_str_len,
+ tmp_parsed_time_str, &timestamp);
+
+ /* Check whether tmp_parsed_time_str_len is enough for tmp_out_buff */
+ if (out_size == 0) {
+ flb_free(tmp_parsed_time_str);
+ flb_free(buf);
+ return 0;
+ }
+
+ *out_buf = buf;
+ flb_free(tmp_parsed_time_str);
+
+ return out_size;
+}