diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:48 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:20:02 +0000 |
commit | 58daab21cd043e1dc37024a7f99b396788372918 (patch) | |
tree | 96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/src/aws | |
parent | Releasing debian version 1.43.2-1. (diff) | |
download | netdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz netdata-58daab21cd043e1dc37024a7f99b396788372918.zip |
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/src/aws')
-rw-r--r-- | fluent-bit/src/aws/CMakeLists.txt | 32 | ||||
-rw-r--r-- | fluent-bit/src/aws/compression/CMakeLists.txt | 6 | ||||
-rw-r--r-- | fluent-bit/src/aws/compression/arrow/CMakeLists.txt | 7 | ||||
-rw-r--r-- | fluent-bit/src/aws/compression/arrow/compress.c | 147 | ||||
-rw-r--r-- | fluent-bit/src/aws/compression/arrow/compress.h | 13 | ||||
-rw-r--r-- | fluent-bit/src/aws/flb_aws_compress.c | 245 | ||||
-rw-r--r-- | fluent-bit/src/aws/flb_aws_credentials.c | 862 | ||||
-rw-r--r-- | fluent-bit/src/aws/flb_aws_credentials_ec2.c | 371 | ||||
-rw-r--r-- | fluent-bit/src/aws/flb_aws_credentials_http.c | 566 | ||||
-rw-r--r-- | fluent-bit/src/aws/flb_aws_credentials_log.h | 39 | ||||
-rw-r--r-- | fluent-bit/src/aws/flb_aws_credentials_process.c | 783 | ||||
-rw-r--r-- | fluent-bit/src/aws/flb_aws_credentials_profile.c | 753 | ||||
-rw-r--r-- | fluent-bit/src/aws/flb_aws_credentials_sts.c | 958 | ||||
-rw-r--r-- | fluent-bit/src/aws/flb_aws_error_reporter.c | 276 | ||||
-rw-r--r-- | fluent-bit/src/aws/flb_aws_imds.c | 370 | ||||
-rw-r--r-- | fluent-bit/src/aws/flb_aws_util.c | 1047 |
16 files changed, 6475 insertions, 0 deletions
diff --git a/fluent-bit/src/aws/CMakeLists.txt b/fluent-bit/src/aws/CMakeLists.txt new file mode 100644 index 00000000..a6580e7a --- /dev/null +++ b/fluent-bit/src/aws/CMakeLists.txt @@ -0,0 +1,32 @@ +add_subdirectory(compression) + +include_directories( + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_BINARY_DIR} + ) + +set(src + "flb_aws_credentials_log.h" + "flb_aws_compress.c" + "flb_aws_util.c" + "flb_aws_credentials.c" + "flb_aws_credentials_sts.c" + "flb_aws_credentials_ec2.c" + "flb_aws_imds.c" + "flb_aws_credentials_http.c" + "flb_aws_credentials_profile.c" + ) + +if(FLB_HAVE_AWS_CREDENTIAL_PROCESS) + set(src + ${src} + "flb_aws_credentials_process.c" + ) +endif() + +add_library(flb-aws STATIC ${src}) +target_link_libraries(flb-aws flb-aws-compress) + +if(FLB_JEMALLOC) + target_link_libraries(flb-aws libjemalloc) +endif() diff --git a/fluent-bit/src/aws/compression/CMakeLists.txt b/fluent-bit/src/aws/compression/CMakeLists.txt new file mode 100644 index 00000000..02a1ba3a --- /dev/null +++ b/fluent-bit/src/aws/compression/CMakeLists.txt @@ -0,0 +1,6 @@ +add_library(flb-aws-compress INTERFACE) + +if(FLB_ARROW) + add_subdirectory(arrow EXCLUDE_FROM_ALL) + target_link_libraries(flb-aws-compress INTERFACE flb-aws-arrow) +endif() diff --git a/fluent-bit/src/aws/compression/arrow/CMakeLists.txt b/fluent-bit/src/aws/compression/arrow/CMakeLists.txt new file mode 100644 index 00000000..846f6544 --- /dev/null +++ b/fluent-bit/src/aws/compression/arrow/CMakeLists.txt @@ -0,0 +1,7 @@ +set(src + compress.c) + +add_library(flb-aws-arrow STATIC ${src}) + +target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS}) +target_link_libraries(flb-aws-arrow ${ARROW_GLIB_LDFLAGS}) diff --git a/fluent-bit/src/aws/compression/arrow/compress.c b/fluent-bit/src/aws/compression/arrow/compress.c new file mode 100644 index 00000000..a48b34f8 --- /dev/null +++ b/fluent-bit/src/aws/compression/arrow/compress.c @@ -0,0 +1,147 @@ +/* + * This converts S3 plugin's request buffer into Apache Arrow format. + * + * We use GLib binding to call Arrow functions (which is implemented + * in C++) from Fluent Bit. + * + * https://github.com/apache/arrow/tree/master/c_glib + */ + +#include <arrow-glib/arrow-glib.h> +#include <inttypes.h> + +/* + * GArrowTable is the central structure that represents "table" (a.k.a. + * data frame). + */ +static GArrowTable* parse_json(uint8_t *json, int size) +{ + GArrowJSONReader *reader; + GArrowBuffer *buffer; + GArrowBufferInputStream *input; + GArrowJSONReadOptions *options; + GArrowTable *table; + GError *error = NULL; + + buffer = garrow_buffer_new(json, size); + if (buffer == NULL) { + return NULL; + } + + input = garrow_buffer_input_stream_new(buffer); + if (input == NULL) { + g_object_unref(buffer); + return NULL; + } + + options = garrow_json_read_options_new(); + if (options == NULL) { + g_object_unref(buffer); + g_object_unref(input); + return NULL; + } + + reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error); + if (reader == NULL) { + g_error_free(error); + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + return NULL; + } + + table = garrow_json_reader_read(reader, &error); + if (table == NULL) { + g_error_free(error); + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + g_object_unref(reader); + return NULL; + } + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + g_object_unref(reader); + return table; +} + +static GArrowResizableBuffer* table_to_buffer(GArrowTable *table) +{ + GArrowResizableBuffer *buffer; + GArrowBufferOutputStream *sink; + GError *error = NULL; + gboolean success; + + buffer = garrow_resizable_buffer_new(0, &error); + if (buffer == NULL) { + g_error_free(error); + return NULL; + } + + sink = garrow_buffer_output_stream_new(buffer); + if (sink == NULL) { + g_object_unref(buffer); + return NULL; + } + + success = garrow_table_write_as_feather( + table, GARROW_OUTPUT_STREAM(sink), + NULL, &error); + if (!success) { + g_error_free(error); + g_object_unref(buffer); + g_object_unref(sink); + return NULL; + } + g_object_unref(sink); + return buffer; +} + +int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_size) +{ + GArrowTable *table; + GArrowResizableBuffer *buffer; + GBytes *bytes; + gconstpointer ptr; + gsize len; + uint8_t *buf; + + table = parse_json((uint8_t *) json, size); + if (table == NULL) { + return -1; + } + + buffer = table_to_buffer(table); + g_object_unref(table); + if (buffer == NULL) { + return -1; + } + + bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer)); + if (bytes == NULL) { + g_object_unref(buffer); + return -1; + } + + ptr = g_bytes_get_data(bytes, &len); + if (ptr == NULL) { + g_object_unref(buffer); + g_bytes_unref(bytes); + return -1; + } + + buf = malloc(len); + if (buf == NULL) { + g_object_unref(buffer); + g_bytes_unref(bytes); + return -1; + } + memcpy(buf, ptr, len); + *out_buf = (void *) buf; + *out_size = len; + + g_object_unref(buffer); + g_bytes_unref(bytes); + return 0; +} diff --git a/fluent-bit/src/aws/compression/arrow/compress.h b/fluent-bit/src/aws/compression/arrow/compress.h new file mode 100644 index 00000000..82e94f43 --- /dev/null +++ b/fluent-bit/src/aws/compression/arrow/compress.h @@ -0,0 +1,13 @@ +/* + * This function converts out_s3 buffer into Apache Arrow format. + * + * `json` is a string that contain (concatenated) JSON objects. + * + * `size` is the length of the json data (excluding the trailing + * null-terminator character). + * + * Return 0 on success (with `out_buf` and `out_size` updated), + * and -1 on failure + */ + +int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_size); diff --git a/fluent-bit/src/aws/flb_aws_compress.c b/fluent-bit/src/aws/flb_aws_compress.c new file mode 100644 index 00000000..e98ce831 --- /dev/null +++ b/fluent-bit/src/aws/flb_aws_compress.c @@ -0,0 +1,245 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2021 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_base64.h> + +#include <fluent-bit/aws/flb_aws_compress.h> +#include <fluent-bit/flb_gzip.h> + +#include <stdint.h> + +#ifdef FLB_HAVE_ARROW +#include "compression/arrow/compress.h" +#endif + +struct compression_option { + int compression_type; + char *compression_keyword; + int(*compress)(void *in_data, size_t in_len, void **out_data, size_t *out_len); +}; + +/* + * Library of compression options + * AWS plugins that support compression will have these options. + * Referenced function should return -1 on error and 0 on success. + */ +static const struct compression_option compression_options[] = { + /* FLB_AWS_COMPRESS_NONE which is 0 is reserved for array footer */ + { + FLB_AWS_COMPRESS_GZIP, + "gzip", + &flb_gzip_compress + }, +#ifdef FLB_HAVE_ARROW + { + FLB_AWS_COMPRESS_ARROW, + "arrow", + &out_s3_compress_arrow + }, +#endif + { 0 } +}; + +int flb_aws_compression_get_type(const char *compression_keyword) +{ + int ret; + const struct compression_option *o; + + o = compression_options; + + while (o->compression_type != 0) { + ret = strcmp(o->compression_keyword, compression_keyword); + if (ret == 0) { + return o->compression_type; + } + ++o; + } + + flb_error("[aws_compress] unknown compression type: %s", compression_keyword); + return -1; +} + +int flb_aws_compression_compress(int compression_type, void *in_data, size_t in_len, + void **out_data, size_t *out_len) +{ + const struct compression_option *o; + + o = compression_options; + + while (o->compression_type != 0) { + if (o->compression_type == compression_type) { + return o->compress(in_data, in_len, out_data, out_len); + } + ++o; + } + + flb_error("[aws_compress] invalid compression type: %i", compression_type); + flb_errno(); + return -1; +} + +int flb_aws_compression_b64_truncate_compress(int compression_type, size_t max_out_len, + void *in_data, size_t in_len, + void **out_data, size_t *out_len) +{ + static const void *truncation_suffix = "[Truncated...]"; + static const size_t truncation_suffix_len = 14; + static const double truncation_reduction_percent = 90; /* % out of 100 */ + static const int truncation_compression_max_attempts = 10; + + int ret; + int is_truncated; + int compression_attempts; + size_t truncated_in_len_prev; + size_t truncated_in_len; + void *truncated_in_buf; + void *compressed_buf; + size_t compressed_len; + size_t original_b64_compressed_len; + + unsigned char *b64_compressed_buf; + size_t b64_compressed_len; + size_t b64_actual_len; + + /* Iterative approach to truncation */ + truncated_in_len = in_len; + truncated_in_buf = in_data; + is_truncated = FLB_FALSE; + b64_compressed_len = SIZE_MAX; + compression_attempts = 0; + while (max_out_len < b64_compressed_len - 1) { + + /* Limit compression truncation attempts, just to be safe */ + if (compression_attempts >= truncation_compression_max_attempts) { + if (is_truncated) { + flb_free(truncated_in_buf); + } + flb_error("[aws_compress] truncation failed, too many compression attempts"); + return -1; + } + + ret = flb_aws_compression_compress(compression_type, truncated_in_buf, + truncated_in_len, &compressed_buf, + &compressed_len); + ++compression_attempts; + if (ret != 0) { + if (is_truncated) { + flb_free(truncated_in_buf); + } + return -1; + } + + /* Determine encoded base64 buffer size */ + b64_compressed_len = compressed_len / 3; /* Compute number of 4 sextet groups */ + b64_compressed_len += (compressed_len % 3 != 0); /* Add padding partial group */ + b64_compressed_len *= 4; /* Compute number of sextets */ + b64_compressed_len += 1; /* Add room for null character 0x00 */ + + /* Truncation needed */ + if (max_out_len < b64_compressed_len - 1) { + flb_debug("[aws_compress] iterative truncation round"); + + /* This compressed_buf is the wrong size. Free */ + flb_free(compressed_buf); + + /* Base case: input compressed empty string, output still too large */ + if (truncated_in_len == 0) { + if (is_truncated) { + flb_free(truncated_in_buf); + } + flb_error("[aws_compress] truncation failed, compressed empty input too " + "large"); + return -1; + } + + /* Calculate corrected input size */ + truncated_in_len_prev = truncated_in_len; + truncated_in_len = (max_out_len * truncated_in_len) / b64_compressed_len; + truncated_in_len = (truncated_in_len * truncation_reduction_percent) / 100; + + /* Ensure working down */ + if (truncated_in_len >= truncated_in_len_prev) { + truncated_in_len = truncated_in_len_prev - 1; + } + + /* Allocate truncation buffer */ + if (!is_truncated) { + is_truncated = FLB_TRUE; + original_b64_compressed_len = b64_compressed_len; + truncated_in_buf = flb_malloc(in_len); + if (!truncated_in_buf) { + flb_errno(); + return -1; + } + memcpy(truncated_in_buf, in_data, in_len); + } + + /* Slap on truncation suffix */ + if (truncated_in_len < truncation_suffix_len) { + /* No room for the truncation suffix. Terminal error */ + flb_error("[aws_compress] truncation failed, no room for suffix"); + flb_free(truncated_in_buf); + return -1; + } + memcpy((char *) truncated_in_buf + truncated_in_len - truncation_suffix_len, + truncation_suffix, truncation_suffix_len); + } + } + + /* Truncate buffer free and compression buffer allocation */ + if (is_truncated) { + flb_free(truncated_in_buf); + flb_warn("[aws_compress][size=%zu] Truncating input for compressed output " + "larger than %zu bytes, output from %zu to %zu bytes", + in_len, max_out_len, original_b64_compressed_len - 1, + b64_compressed_len - 1); + } + b64_compressed_buf = flb_malloc(b64_compressed_len); + if (!b64_compressed_buf) { + flb_errno(); + return -1; + } + + /* Base64 encode compressed out bytes */ + ret = flb_base64_encode(b64_compressed_buf, b64_compressed_len, &b64_actual_len, + compressed_buf, compressed_len); + flb_free(compressed_buf); + + if (ret == FLB_BASE64_ERR_BUFFER_TOO_SMALL) { + flb_error("[aws_compress] compressed log base64 buffer too small"); + return -1; /* not handle truncation at this point */ + } + if (ret != 0) { + flb_free(b64_compressed_buf); + return -1; + } + + /* Double check b64 buf len */ + if (b64_compressed_len - 1 != b64_actual_len) { + flb_error("[aws_compress] buffer len should be 1 greater than actual len"); + flb_free(b64_compressed_buf); + return -1; + } + + *out_data = b64_compressed_buf; + *out_len = b64_compressed_len - 1; /* disregard added null character */ + return 0; +} diff --git a/fluent-bit/src/aws/flb_aws_credentials.c b/fluent-bit/src/aws/flb_aws_credentials.c new file mode 100644 index 00000000..850142e2 --- /dev/null +++ b/fluent-bit/src/aws/flb_aws_credentials.c @@ -0,0 +1,862 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_aws_credentials.h> +#include <fluent-bit/flb_aws_util.h> +#include <fluent-bit/flb_jsmn.h> +#include <fluent-bit/flb_output_plugin.h> + +#include <stdlib.h> +#include <time.h> + +#define FIVE_MINUTES 300 +#define TWELVE_HOURS 43200 + +/* Credentials Environment Variables */ +#define AWS_ACCESS_KEY_ID "AWS_ACCESS_KEY_ID" +#define AWS_SECRET_ACCESS_KEY "AWS_SECRET_ACCESS_KEY" +#define AWS_SESSION_TOKEN "AWS_SESSION_TOKEN" + +#define EKS_POD_EXECUTION_ROLE "EKS_POD_EXECUTION_ROLE" + +/* declarations */ +static struct flb_aws_provider *standard_chain_create(struct flb_config + *config, + struct flb_tls *tls, + char *region, + char *sts_endpoint, + char *proxy, + struct + flb_aws_client_generator + *generator, + int eks_irsa, + char *profile); + + +/* + * The standard credential provider chain: + * 1. Environment variables + * 2. Shared credentials file (AWS Profile) + * 3. EKS OIDC + * 4. EC2 IMDS + * 5. ECS HTTP credentials endpoint + * + * This provider will evaluate each provider in order, returning the result + * from the first provider that returns valid credentials. + * + * Note: Client code should use this provider by default. + */ +struct flb_aws_provider_chain { + struct mk_list sub_providers; + + /* + * The standard chain provider picks the first successful provider and + * then uses it until a call to refresh is made. + */ + struct flb_aws_provider *sub_provider; +}; + +/* + * Iterates through the chain and returns credentials from the first provider + * that successfully returns creds. Caches this provider on the implementation. + */ +struct flb_aws_credentials *get_from_chain(struct flb_aws_provider_chain + *implementation) +{ + struct flb_aws_provider *sub_provider = NULL; + struct mk_list *tmp; + struct mk_list *head; + struct flb_aws_credentials *creds = NULL; + + /* find the first provider that produces a valid set of creds */ + mk_list_foreach_safe(head, tmp, &implementation->sub_providers) { + sub_provider = mk_list_entry(head, + struct flb_aws_provider, + _head); + creds = sub_provider->provider_vtable->get_credentials(sub_provider); + if (creds) { + implementation->sub_provider = sub_provider; + return creds; + } + } + + return NULL; +} + +struct flb_aws_credentials *get_credentials_fn_standard_chain(struct + flb_aws_provider + *provider) +{ + struct flb_aws_credentials *creds = NULL; + struct flb_aws_provider_chain *implementation = provider->implementation; + struct flb_aws_provider *sub_provider = implementation->sub_provider; + + if (sub_provider) { + return sub_provider->provider_vtable->get_credentials(sub_provider); + } + + if (try_lock_provider(provider)) { + creds = get_from_chain(implementation); + unlock_provider(provider); + return creds; + } + + /* + * We failed to lock the provider and sub_provider is unset. This means that + * another co-routine is selecting a provider from the chain. + */ + flb_warn("[aws_credentials] No cached credentials are available and " + "a credential refresh is already in progress. The current " + "co-routine will retry."); + return NULL; +} + +int init_fn_standard_chain(struct flb_aws_provider *provider) +{ + struct flb_aws_provider_chain *implementation = provider->implementation; + struct flb_aws_provider *sub_provider = NULL; + struct mk_list *tmp; + struct mk_list *head; + int ret = -1; + + if (try_lock_provider(provider)) { + /* find the first provider that indicates successful init */ + mk_list_foreach_safe(head, tmp, &implementation->sub_providers) { + sub_provider = mk_list_entry(head, + struct flb_aws_provider, + _head); + ret = sub_provider->provider_vtable->init(sub_provider); + if (ret >= 0) { + implementation->sub_provider = sub_provider; + break; + } + } + unlock_provider(provider); + } + + return ret; +} + +/* + * Client code should only call refresh if there has been an + * error from the AWS APIs indicating creds are expired/invalid. + * Refresh may change the current sub_provider. + */ +int refresh_fn_standard_chain(struct flb_aws_provider *provider) +{ + struct flb_aws_provider_chain *implementation = provider->implementation; + struct flb_aws_provider *sub_provider = NULL; + struct mk_list *tmp; + struct mk_list *head; + int ret = -1; + + if (try_lock_provider(provider)) { + /* find the first provider that indicates successful refresh */ + mk_list_foreach_safe(head, tmp, &implementation->sub_providers) { + sub_provider = mk_list_entry(head, + struct flb_aws_provider, + _head); + ret = sub_provider->provider_vtable->refresh(sub_provider); + if (ret >= 0) { + implementation->sub_provider = sub_provider; + break; + } + } + unlock_provider(provider); + } + + return ret; +} + +void sync_fn_standard_chain(struct flb_aws_provider *provider) +{ + struct flb_aws_provider_chain *implementation = provider->implementation; + struct flb_aws_provider *sub_provider = NULL; + struct mk_list *tmp; + struct mk_list *head; + + /* set all providers to sync mode */ + mk_list_foreach_safe(head, tmp, &implementation->sub_providers) { + sub_provider = mk_list_entry(head, + struct flb_aws_provider, + _head); + sub_provider->provider_vtable->sync(sub_provider); + } +} + +void async_fn_standard_chain(struct flb_aws_provider *provider) +{ + struct flb_aws_provider_chain *implementation = provider->implementation; + struct flb_aws_provider *sub_provider = NULL; + struct mk_list *tmp; + struct mk_list *head; + + /* set all providers to async mode */ + mk_list_foreach_safe(head, tmp, &implementation->sub_providers) { + sub_provider = mk_list_entry(head, + struct flb_aws_provider, + _head); + sub_provider->provider_vtable->async(sub_provider); + } +} + +void upstream_set_fn_standard_chain(struct flb_aws_provider *provider, + struct flb_output_instance *ins) +{ + struct flb_aws_provider_chain *implementation = provider->implementation; + struct flb_aws_provider *sub_provider = NULL; + struct mk_list *tmp; + struct mk_list *head; + + /* set all providers to async mode */ + mk_list_foreach_safe(head, tmp, &implementation->sub_providers) { + sub_provider = mk_list_entry(head, + struct flb_aws_provider, + _head); + sub_provider->provider_vtable->upstream_set(sub_provider, ins); + } +} + +void destroy_fn_standard_chain(struct flb_aws_provider *provider) { + struct flb_aws_provider *sub_provider; + struct flb_aws_provider_chain *implementation; + struct mk_list *tmp; + struct mk_list *head; + + implementation = provider->implementation; + + if (implementation) { + mk_list_foreach_safe(head, tmp, &implementation->sub_providers) { + sub_provider = mk_list_entry(head, struct flb_aws_provider, + _head); + mk_list_del(&sub_provider->_head); + flb_aws_provider_destroy(sub_provider); + } + + flb_free(implementation); + } +} + +static struct flb_aws_provider_vtable standard_chain_provider_vtable = { + .get_credentials = get_credentials_fn_standard_chain, + .init = init_fn_standard_chain, + .refresh = refresh_fn_standard_chain, + .destroy = destroy_fn_standard_chain, + .sync = sync_fn_standard_chain, + .async = async_fn_standard_chain, + .upstream_set = upstream_set_fn_standard_chain, +}; + +struct flb_aws_provider *flb_standard_chain_provider_create(struct flb_config + *config, + struct flb_tls *tls, + char *region, + char *sts_endpoint, + char *proxy, + struct + flb_aws_client_generator + *generator, + char *profile) +{ + struct flb_aws_provider *provider; + struct flb_aws_provider *tmp_provider; + char *eks_pod_role = NULL; + char *session_name; + + eks_pod_role = getenv(EKS_POD_EXECUTION_ROLE); + if (eks_pod_role && strlen(eks_pod_role) > 0) { + /* + * eks fargate + * standard chain will be base provider used to + * assume the EKS_POD_EXECUTION_ROLE + */ + flb_debug("[aws_credentials] Using EKS_POD_EXECUTION_ROLE=%s", eks_pod_role); + tmp_provider = standard_chain_create(config, tls, region, sts_endpoint, + proxy, generator, FLB_FALSE, profile); + + if (!tmp_provider) { + return NULL; + } + + session_name = flb_sts_session_name(); + if (!session_name) { + flb_error("Failed to generate random STS session name"); + flb_aws_provider_destroy(tmp_provider); + return NULL; + } + + provider = flb_sts_provider_create(config, tls, tmp_provider, NULL, + eks_pod_role, session_name, + region, sts_endpoint, + NULL, generator); + if (!provider) { + flb_error("Failed to create EKS Fargate Credential Provider"); + flb_aws_provider_destroy(tmp_provider); + return NULL; + } + /* session name can freed after provider is created */ + flb_free(session_name); + session_name = NULL; + + return provider; + } + + /* standard case- not in EKS Fargate */ + provider = standard_chain_create(config, tls, region, sts_endpoint, + proxy, generator, FLB_TRUE, profile); + return provider; +} + +struct flb_aws_provider *flb_managed_chain_provider_create(struct flb_output_instance + *ins, + struct flb_config + *config, + char *config_key_prefix, + char *proxy, + struct + flb_aws_client_generator + *generator) +{ + flb_sds_t config_key_region; + flb_sds_t config_key_sts_endpoint; + flb_sds_t config_key_role_arn; + flb_sds_t config_key_external_id; + flb_sds_t config_key_profile; + const char *region = NULL; + const char *sts_endpoint = NULL; + const char *role_arn = NULL; + const char *external_id = NULL; + const char *profile = NULL; + char *session_name = NULL; + int key_prefix_len; + int key_max_len; + + /* Provider managed dependencies */ + struct flb_aws_provider *aws_provider = NULL; + struct flb_aws_provider *base_aws_provider = NULL; + struct flb_tls *cred_tls = NULL; + struct flb_tls *sts_tls = NULL; + + /* Config keys */ + key_prefix_len = strlen(config_key_prefix); + key_max_len = key_prefix_len + 12; /* max length of + "region", "sts_endpoint", "role_arn", + "external_id" */ + + /* Evaluate full config keys */ + config_key_region = flb_sds_create_len(config_key_prefix, key_max_len); + strcpy(config_key_region + key_prefix_len, "region"); + config_key_sts_endpoint = flb_sds_create_len(config_key_prefix, key_max_len); + strcpy(config_key_sts_endpoint + key_prefix_len, "sts_endpoint"); + config_key_role_arn = flb_sds_create_len(config_key_prefix, key_max_len); + strcpy(config_key_role_arn + key_prefix_len, "role_arn"); + config_key_external_id = flb_sds_create_len(config_key_prefix, key_max_len); + strcpy(config_key_external_id + key_prefix_len, "external_id"); + config_key_profile = flb_sds_create_len(config_key_prefix, key_max_len); + strcpy(config_key_profile + key_prefix_len, "profile"); + + /* AWS provider needs a separate TLS instance */ + cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + ins->tls_debug, + ins->tls_vhost, + ins->tls_ca_path, + ins->tls_ca_file, + ins->tls_crt_file, + ins->tls_key_file, + ins->tls_key_passwd); + if (!cred_tls) { + flb_plg_error(ins, "Failed to create TLS instance for AWS Provider"); + flb_errno(); + goto error; + } + + region = flb_output_get_property(config_key_region, ins); + if (!region) { + flb_plg_error(ins, "aws_auth enabled but %s not set", config_key_region); + goto error; + } + + /* Use null sts_endpoint if none provided */ + sts_endpoint = flb_output_get_property(config_key_sts_endpoint, ins); + /* Get the profile from configuration */ + profile = flb_output_get_property(config_key_profile, ins); + aws_provider = flb_standard_chain_provider_create(config, + cred_tls, + (char *) region, + (char *) sts_endpoint, + NULL, + flb_aws_client_generator(), + profile); + if (!aws_provider) { + flb_plg_error(ins, "Failed to create AWS Credential Provider"); + goto error; + } + + role_arn = flb_output_get_property(config_key_role_arn, ins); + if (role_arn) { + /* Use the STS Provider */ + base_aws_provider = aws_provider; + external_id = flb_output_get_property(config_key_external_id, ins); + + session_name = flb_sts_session_name(); + if (!session_name) { + flb_plg_error(ins, "Failed to generate aws iam role " + "session name"); + goto error; + } + + /* STS provider needs yet another separate TLS instance */ + sts_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + ins->tls_debug, + ins->tls_vhost, + ins->tls_ca_path, + ins->tls_ca_file, + ins->tls_crt_file, + ins->tls_key_file, + ins->tls_key_passwd); + if (!sts_tls) { + flb_plg_error(ins, "Failed to create TLS instance for AWS STS Credential " + "Provider"); + flb_errno(); + goto error; + } + + aws_provider = flb_sts_provider_create(config, + sts_tls, + base_aws_provider, + (char *) external_id, + (char *) role_arn, + session_name, + (char *) region, + (char *) sts_endpoint, + NULL, + flb_aws_client_generator()); + if (!aws_provider) { + flb_plg_error(ins, "Failed to create AWS STS Credential " + "Provider"); + goto error; + } + } + + /* initialize credentials in sync mode */ + aws_provider->provider_vtable->sync(aws_provider); + aws_provider->provider_vtable->init(aws_provider); + + /* set back to async */ + aws_provider->provider_vtable->async(aws_provider); + + /* store dependencies in aws_provider for managed cleanup */ + aws_provider->base_aws_provider = base_aws_provider; + aws_provider->cred_tls = cred_tls; + aws_provider->sts_tls = sts_tls; + + goto cleanup; + +error: + if (aws_provider) { + /* disconnect dependencies */ + aws_provider->base_aws_provider = NULL; + aws_provider->cred_tls = NULL; + aws_provider->sts_tls = NULL; + /* destroy */ + flb_aws_provider_destroy(aws_provider); + } + /* free dependencies */ + if (base_aws_provider) { + flb_aws_provider_destroy(base_aws_provider); + } + if (cred_tls) { + flb_tls_destroy(cred_tls); + } + if (sts_tls) { + flb_tls_destroy(sts_tls); + } + aws_provider = NULL; + +cleanup: + if (config_key_region) { + flb_sds_destroy(config_key_region); + } + if (config_key_sts_endpoint) { + flb_sds_destroy(config_key_sts_endpoint); + } + if (config_key_role_arn) { + flb_sds_destroy(config_key_role_arn); + } + if (config_key_external_id) { + flb_sds_destroy(config_key_external_id); + } + if (session_name) { + flb_free(session_name); + } + + return aws_provider; +} + +static struct flb_aws_provider *standard_chain_create(struct flb_config + *config, + struct flb_tls *tls, + char *region, + char *sts_endpoint, + char *proxy, + struct + flb_aws_client_generator + *generator, + int eks_irsa, + char *profile) +{ + struct flb_aws_provider *sub_provider; + struct flb_aws_provider *provider; + struct flb_aws_provider_chain *implementation; + + provider = flb_calloc(1, sizeof(struct flb_aws_provider)); + + if (!provider) { + flb_errno(); + return NULL; + } + + pthread_mutex_init(&provider->lock, NULL); + + implementation = flb_calloc(1, sizeof(struct flb_aws_provider_chain)); + + if (!implementation) { + flb_errno(); + flb_free(provider); + return NULL; + } + + provider->provider_vtable = &standard_chain_provider_vtable; + provider->implementation = implementation; + + /* Create chain of providers */ + mk_list_init(&implementation->sub_providers); + + sub_provider = flb_aws_env_provider_create(); + if (!sub_provider) { + /* Env provider will only fail creation if a memory alloc failed */ + flb_aws_provider_destroy(provider); + return NULL; + } + flb_debug("[aws_credentials] Initialized Env Provider in standard chain"); + + mk_list_add(&sub_provider->_head, &implementation->sub_providers); + + flb_debug("[aws_credentials] creating profile %s provider", profile); + sub_provider = flb_profile_provider_create(profile); + if (sub_provider) { + /* Profile provider can fail if HOME env var is not set */; + mk_list_add(&sub_provider->_head, &implementation->sub_providers); + flb_debug("[aws_credentials] Initialized AWS Profile Provider in " + "standard chain"); + } + + if (eks_irsa == FLB_TRUE) { + sub_provider = flb_eks_provider_create(config, tls, region, sts_endpoint, proxy, generator); + if (sub_provider) { + /* EKS provider can fail if we are not running in k8s */; + mk_list_add(&sub_provider->_head, &implementation->sub_providers); + flb_debug("[aws_credentials] Initialized EKS Provider in standard chain"); + } + } + + sub_provider = flb_ecs_provider_create(config, generator); + if (sub_provider) { + /* ECS Provider will fail creation if we are not running in ECS */ + mk_list_add(&sub_provider->_head, &implementation->sub_providers); + flb_debug("[aws_credentials] Initialized ECS Provider in standard chain"); + } + + sub_provider = flb_ec2_provider_create(config, generator); + if (!sub_provider) { + /* EC2 provider will only fail creation if a memory alloc failed */ + flb_aws_provider_destroy(provider); + return NULL; + } + mk_list_add(&sub_provider->_head, &implementation->sub_providers); + flb_debug("[aws_credentials] Initialized EC2 Provider in standard chain"); + + return provider; +} + +/* Environment Provider */ +struct flb_aws_credentials *get_credentials_fn_environment(struct + flb_aws_provider + *provider) +{ + char *access_key = NULL; + char *secret_key = NULL; + char *session_token = NULL; + struct flb_aws_credentials *creds = NULL; + + flb_debug("[aws_credentials] Requesting credentials from the " + "env provider.."); + + access_key = getenv(AWS_ACCESS_KEY_ID); + if (!access_key || strlen(access_key) <= 0) { + return NULL; + } + + secret_key = getenv(AWS_SECRET_ACCESS_KEY); + if (!secret_key || strlen(secret_key) <= 0) { + return NULL; + } + + creds = flb_calloc(1, sizeof(struct flb_aws_credentials)); + if (!creds) { + flb_errno(); + return NULL; + } + + creds->access_key_id = flb_sds_create(access_key); + if (!creds->access_key_id) { + flb_aws_credentials_destroy(creds); + flb_errno(); + return NULL; + } + + creds->secret_access_key = flb_sds_create(secret_key); + if (!creds->secret_access_key) { + flb_aws_credentials_destroy(creds); + flb_errno(); + return NULL; + } + + session_token = getenv(AWS_SESSION_TOKEN); + if (session_token && strlen(session_token) > 0) { + creds->session_token = flb_sds_create(session_token); + if (!creds->session_token) { + flb_aws_credentials_destroy(creds); + flb_errno(); + return NULL; + } + } else { + creds->session_token = NULL; + } + + return creds; + +} + +int refresh_env(struct flb_aws_provider *provider) +{ + char *access_key = NULL; + char *secret_key = NULL; + + access_key = getenv(AWS_ACCESS_KEY_ID); + if (!access_key || strlen(access_key) <= 0) { + return -1; + } + + secret_key = getenv(AWS_SECRET_ACCESS_KEY); + if (!secret_key || strlen(secret_key) <= 0) { + return -1; + } + + return 0; +} + +/* + * For the env provider, refresh simply checks if the environment + * variables are available. + */ +int refresh_fn_environment(struct flb_aws_provider *provider) +{ + flb_debug("[aws_credentials] Refresh called on the env provider"); + + return refresh_env(provider); +} + +int init_fn_environment(struct flb_aws_provider *provider) +{ + flb_debug("[aws_credentials] Init called on the env provider"); + + return refresh_env(provider); +} + +/* + * sync and async are no-ops for the env provider because it does not make + * network IO calls + */ +void sync_fn_environment(struct flb_aws_provider *provider) +{ + return; +} + +void async_fn_environment(struct flb_aws_provider *provider) +{ + return; +} + +void upstream_set_fn_environment(struct flb_aws_provider *provider, + struct flb_output_instance *ins) +{ + return; +} + +/* Destroy is a no-op for the env provider */ +void destroy_fn_environment(struct flb_aws_provider *provider) { + return; +} + +static struct flb_aws_provider_vtable environment_provider_vtable = { + .get_credentials = get_credentials_fn_environment, + .init = init_fn_environment, + .refresh = refresh_fn_environment, + .destroy = destroy_fn_environment, + .sync = sync_fn_environment, + .async = async_fn_environment, + .upstream_set = upstream_set_fn_environment, +}; + +struct flb_aws_provider *flb_aws_env_provider_create() { + struct flb_aws_provider *provider = flb_calloc(1, sizeof( + struct flb_aws_provider)); + + if (!provider) { + flb_errno(); + return NULL; + } + + provider->provider_vtable = &environment_provider_vtable; + provider->implementation = NULL; + + return provider; +} + + +void flb_aws_credentials_destroy(struct flb_aws_credentials *creds) +{ + if (creds) { + if (creds->access_key_id) { + flb_sds_destroy(creds->access_key_id); + } + if (creds->secret_access_key) { + flb_sds_destroy(creds->secret_access_key); + } + if (creds->session_token) { + flb_sds_destroy(creds->session_token); + } + + flb_free(creds); + } +} + +void flb_aws_provider_destroy(struct flb_aws_provider *provider) +{ + if (provider) { + if (provider->implementation) { + provider->provider_vtable->destroy(provider); + } + + pthread_mutex_destroy(&provider->lock); + + /* free managed dependencies */ + if (provider->base_aws_provider) { + flb_aws_provider_destroy(provider->base_aws_provider); + } + if (provider->cred_tls) { + flb_tls_destroy(provider->cred_tls); + } + if (provider->sts_tls) { + flb_tls_destroy(provider->sts_tls); + } + + flb_free(provider); + } +} + +time_t timestamp_to_epoch(const char *timestamp) +{ + struct tm tm = {0}; + time_t seconds; + int r; + + r = sscanf(timestamp, "%d-%d-%dT%d:%d:%dZ", &tm.tm_year, &tm.tm_mon, + &tm.tm_mday, &tm.tm_hour, &tm.tm_min, &tm.tm_sec); + if (r != 6) { + return -1; + } + + tm.tm_year -= 1900; + tm.tm_mon -= 1; + tm.tm_isdst = -1; + seconds = timegm(&tm); + if (seconds < 0) { + return -1; + } + + return seconds; +} + +time_t flb_aws_cred_expiration(const char *timestamp) +{ + time_t now; + time_t expiration = timestamp_to_epoch(timestamp); + if (expiration < 0) { + flb_warn("[aws_credentials] Could not parse expiration: %s", timestamp); + return -1; + } + /* + * Sanity check - expiration should be ~10 minutes to 12 hours in the future + * (> 12 hours is impossible with the current APIs and would likely indicate + * a bug in how this code processes timestamps.) + */ + now = time(NULL); + if (expiration < (now + FIVE_MINUTES)) { + flb_warn("[aws_credentials] Credential expiration '%s' is less than " + "5 minutes in the future.", + timestamp); + } + if (expiration > (now + TWELVE_HOURS)) { + flb_warn("[aws_credentials] Credential expiration '%s' is greater than " + "12 hours in the future. This should not be possible.", + timestamp); + } + return expiration; +} + +/* + * Fluent Bit is now multi-threaded and asynchonous with coros. + * The trylock prevents deadlock, and protects the provider + * when a cred refresh happens. The refresh frees and + * sets the shared cred cache, a double free could occur + * if two threads do it at the same exact time. + */ + +/* Like a traditional try lock- it does not block if the lock is not obtained */ +int try_lock_provider(struct flb_aws_provider *provider) +{ + int ret = 0; + ret = pthread_mutex_trylock(&provider->lock); + if (ret != 0) { + return FLB_FALSE; + } + return FLB_TRUE; +} + +void unlock_provider(struct flb_aws_provider *provider) +{ + pthread_mutex_unlock(&provider->lock); +} diff --git a/fluent-bit/src/aws/flb_aws_credentials_ec2.c b/fluent-bit/src/aws/flb_aws_credentials_ec2.c new file mode 100644 index 00000000..5d2d8515 --- /dev/null +++ b/fluent-bit/src/aws/flb_aws_credentials_ec2.c @@ -0,0 +1,371 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_aws_credentials.h> +#include <fluent-bit/flb_aws_util.h> +#include <fluent-bit/flb_jsmn.h> +#include <fluent-bit/aws/flb_aws_imds.h> + +#include <stdlib.h> +#include <time.h> +#include <sys/types.h> +#include <sys/stat.h> + +#define AWS_IMDS_ROLE_PATH "/latest/meta-data/iam/security-credentials/" +#define AWS_IMDS_ROLE_PATH_LEN 43 + +struct flb_aws_provider_ec2; +static int get_creds_ec2(struct flb_aws_provider_ec2 *implementation); +static int ec2_credentials_request(struct flb_aws_provider_ec2 + *implementation, char *cred_path); + +/* EC2 IMDS Provider */ + +/* + * A provider that obtains credentials from EC2 IMDS. + */ +struct flb_aws_provider_ec2 { + struct flb_aws_credentials *creds; + time_t next_refresh; + + /* upstream connection to IMDS */ + struct flb_aws_client *client; + + /* IMDS interface */ + struct flb_aws_imds *imds_interface; +}; + +struct flb_aws_credentials *get_credentials_fn_ec2(struct flb_aws_provider + *provider) +{ + struct flb_aws_credentials *creds; + int refresh = FLB_FALSE; + struct flb_aws_provider_ec2 *implementation = provider->implementation; + + flb_debug("[aws_credentials] Requesting credentials from the " + "EC2 provider.."); + + /* a negative next_refresh means that auto-refresh is disabled */ + if (implementation->next_refresh > 0 + && time(NULL) > implementation->next_refresh) { + refresh = FLB_TRUE; + } + if (!implementation->creds || refresh == FLB_TRUE) { + if (try_lock_provider(provider)) { + get_creds_ec2(implementation); + unlock_provider(provider); + } + } + + if (!implementation->creds) { + /* + * We failed to lock the provider and creds are unset. This means that + * another co-routine is performing the refresh. + */ + flb_warn("[aws_credentials] No cached credentials are available and " + "a credential refresh is already in progress. The current " + "co-routine will retry."); + + return NULL; + } + + creds = flb_calloc(1, sizeof(struct flb_aws_credentials)); + if (!creds) { + flb_errno(); + return NULL; + } + + creds->access_key_id = flb_sds_create(implementation->creds->access_key_id); + if (!creds->access_key_id) { + flb_errno(); + flb_aws_credentials_destroy(creds); + return NULL; + } + + creds->secret_access_key = flb_sds_create(implementation->creds-> + secret_access_key); + if (!creds->secret_access_key) { + flb_errno(); + flb_aws_credentials_destroy(creds); + return NULL; + } + + if (implementation->creds->session_token) { + creds->session_token = flb_sds_create(implementation->creds-> + session_token); + if (!creds->session_token) { + flb_errno(); + flb_aws_credentials_destroy(creds); + return NULL; + } + + } else { + creds->session_token = NULL; + } + + return creds; +} + +int refresh_fn_ec2(struct flb_aws_provider *provider) { + struct flb_aws_provider_ec2 *implementation = provider->implementation; + int ret = -1; + + flb_debug("[aws_credentials] Refresh called on the EC2 IMDS provider"); + if (try_lock_provider(provider)) { + ret = get_creds_ec2(implementation); + unlock_provider(provider); + } + return ret; +} + +int init_fn_ec2(struct flb_aws_provider *provider) { + struct flb_aws_provider_ec2 *implementation = provider->implementation; + int ret = -1; + + implementation->client->debug_only = FLB_TRUE; + + flb_debug("[aws_credentials] Init called on the EC2 IMDS provider"); + if (try_lock_provider(provider)) { + ret = get_creds_ec2(implementation); + unlock_provider(provider); + } + + implementation->client->debug_only = FLB_FALSE; + return ret; +} + +void sync_fn_ec2(struct flb_aws_provider *provider) { + struct flb_aws_provider_ec2 *implementation = provider->implementation; + + flb_debug("[aws_credentials] Sync called on the EC2 provider"); + /* remove async flag */ + flb_stream_disable_async_mode(&implementation->client->upstream->base); +} + +void async_fn_ec2(struct flb_aws_provider *provider) { + struct flb_aws_provider_ec2 *implementation = provider->implementation; + + flb_debug("[aws_credentials] Async called on the EC2 provider"); + /* add async flag */ + flb_stream_enable_async_mode(&implementation->client->upstream->base); +} + +void upstream_set_fn_ec2(struct flb_aws_provider *provider, + struct flb_output_instance *ins) { + struct flb_aws_provider_ec2 *implementation = provider->implementation; + + flb_debug("[aws_credentials] upstream_set called on the EC2 provider"); + /* Make sure TLS is set to false before setting upstream, then reset it */ + ins->use_tls = FLB_FALSE; + flb_output_upstream_set(implementation->client->upstream, ins); + ins->use_tls = FLB_TRUE; +} + +void destroy_fn_ec2(struct flb_aws_provider *provider) { + struct flb_aws_provider_ec2 *implementation = provider->implementation; + + if (implementation) { + if (implementation->creds) { + flb_aws_credentials_destroy(implementation->creds); + } + + if (implementation->imds_interface) { + flb_aws_imds_destroy(implementation->imds_interface); + } + + if (implementation->client) { + flb_aws_client_destroy(implementation->client); + } + + flb_free(implementation); + provider->implementation = NULL; + } + + return; +} + +static struct flb_aws_provider_vtable ec2_provider_vtable = { + .get_credentials = get_credentials_fn_ec2, + .init = init_fn_ec2, + .refresh = refresh_fn_ec2, + .destroy = destroy_fn_ec2, + .sync = sync_fn_ec2, + .async = async_fn_ec2, + .upstream_set = upstream_set_fn_ec2, +}; + +struct flb_aws_provider *flb_ec2_provider_create(struct flb_config *config, + struct + flb_aws_client_generator + *generator) +{ + struct flb_aws_provider_ec2 *implementation; + struct flb_aws_provider *provider; + struct flb_upstream *upstream; + + provider = flb_calloc(1, sizeof(struct flb_aws_provider)); + + if (!provider) { + flb_errno(); + return NULL; + } + + pthread_mutex_init(&provider->lock, NULL); + + implementation = flb_calloc(1, sizeof(struct flb_aws_provider_ec2)); + + if (!implementation) { + flb_free(provider); + flb_errno(); + return NULL; + } + + provider->provider_vtable = &ec2_provider_vtable; + provider->implementation = implementation; + + upstream = flb_upstream_create(config, FLB_AWS_IMDS_HOST, FLB_AWS_IMDS_PORT, + FLB_IO_TCP, NULL); + if (!upstream) { + flb_aws_provider_destroy(provider); + flb_debug("[aws_credentials] unable to connect to EC2 IMDS."); + return NULL; + } + + /* IMDSv2 token request will timeout if hops = 1 and running within container */ + upstream->base.net.connect_timeout = FLB_AWS_IMDS_TIMEOUT; + upstream->base.net.io_timeout = FLB_AWS_IMDS_TIMEOUT; + upstream->base.net.keepalive = FLB_FALSE; /* On timeout, the connection is broken */ + + implementation->client = generator->create(); + if (!implementation->client) { + flb_aws_provider_destroy(provider); + flb_upstream_destroy(upstream); + flb_error("[aws_credentials] EC2 IMDS: client creation error"); + return NULL; + } + implementation->client->name = "ec2_imds_provider_client"; + implementation->client->has_auth = FLB_FALSE; + implementation->client->provider = NULL; + implementation->client->region = NULL; + implementation->client->service = NULL; + implementation->client->port = 80; + implementation->client->flags = 0; + implementation->client->proxy = NULL; + implementation->client->upstream = upstream; + + /* Use default imds configuration */ + implementation->imds_interface = flb_aws_imds_create(&flb_aws_imds_config_default, + implementation->client); + if (!implementation->imds_interface) { + flb_aws_provider_destroy(provider); + flb_error("[aws_credentials] EC2 IMDS configuration error"); + return NULL; + } + + return provider; +} + +/* Requests creds from IMDSv1 and sets them on the provider */ +static int get_creds_ec2(struct flb_aws_provider_ec2 *implementation) +{ + int ret; + flb_sds_t instance_role; + size_t instance_role_len; + char *cred_path; + size_t cred_path_size; + + flb_debug("[aws_credentials] requesting credentials from EC2 IMDS"); + + /* Get the name of the instance role */ + ret = flb_aws_imds_request(implementation->imds_interface, AWS_IMDS_ROLE_PATH, + &instance_role, &instance_role_len); + + if (ret < 0) { + return -1; + } + + flb_debug("[aws_credentials] Requesting credentials for instance role %s", + instance_role); + + /* Construct path where we will find the credentials */ + cred_path_size = sizeof(char) * (AWS_IMDS_ROLE_PATH_LEN + + instance_role_len) + 1; + cred_path = flb_malloc(cred_path_size); + if (!cred_path) { + flb_sds_destroy(instance_role); + flb_errno(); + return -1; + } + + ret = snprintf(cred_path, cred_path_size, "%s%s", AWS_IMDS_ROLE_PATH, + instance_role); + if (ret < 0) { + flb_sds_destroy(instance_role); + flb_free(cred_path); + flb_errno(); + return -1; + } + + /* request creds */ + ret = ec2_credentials_request(implementation, cred_path); + + flb_sds_destroy(instance_role); + flb_free(cred_path); + return ret; + +} + +static int ec2_credentials_request(struct flb_aws_provider_ec2 + *implementation, char *cred_path) +{ + int ret; + flb_sds_t credentials_response; + size_t credentials_response_len; + struct flb_aws_credentials *creds; + time_t expiration; + + ret = flb_aws_imds_request(implementation->imds_interface, cred_path, + &credentials_response, &credentials_response_len); + + if (ret < 0) { + return -1; + } + + creds = flb_parse_http_credentials(credentials_response, + credentials_response_len, + &expiration); + + if (creds == NULL) { + flb_sds_destroy(credentials_response); + return -1; + } + + /* destroy existing credentials first */ + flb_aws_credentials_destroy(implementation->creds); + implementation->creds = NULL; + /* set new creds */ + implementation->creds = creds; + implementation->next_refresh = expiration - FLB_AWS_REFRESH_WINDOW; + + flb_sds_destroy(credentials_response); + return 0; +} diff --git a/fluent-bit/src/aws/flb_aws_credentials_http.c b/fluent-bit/src/aws/flb_aws_credentials_http.c new file mode 100644 index 00000000..c08b6b55 --- /dev/null +++ b/fluent-bit/src/aws/flb_aws_credentials_http.c @@ -0,0 +1,566 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_aws_credentials.h> +#include <fluent-bit/flb_aws_util.h> + +#include <fluent-bit/flb_jsmn.h> +#include <stdlib.h> +#include <time.h> +#include <sys/types.h> +#include <sys/stat.h> + +#define AWS_CREDENTIAL_RESPONSE_ACCESS_KEY "AccessKeyId" +#define AWS_CREDENTIAL_RESPONSE_SECRET_KEY "SecretAccessKey" +#define AWS_HTTP_RESPONSE_TOKEN "Token" +#define AWS_CREDENTIAL_RESPONSE_EXPIRATION "Expiration" + +#define ECS_CREDENTIALS_HOST "169.254.170.2" +#define ECS_CREDENTIALS_HOST_LEN 13 +#define ECS_CREDENTIALS_PATH_ENV_VAR "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI" + + +/* Declarations */ +struct flb_aws_provider_http; +static int http_credentials_request(struct flb_aws_provider_http + *implementation); + + +/* + * HTTP Credentials Provider - retrieve credentials from a local http server + * Used to implement the ECS Credentials provider. + * Equivalent to: + * https://github.com/aws/aws-sdk-go/tree/master/aws/credentials/endpointcreds + */ + +struct flb_aws_provider_http { + struct flb_aws_credentials *creds; + time_t next_refresh; + + struct flb_aws_client *client; + + /* Host and Path to request credentials */ + flb_sds_t host; + flb_sds_t path; +}; + + +struct flb_aws_credentials *get_credentials_fn_http(struct flb_aws_provider + *provider) +{ + struct flb_aws_credentials *creds = NULL; + int refresh = FLB_FALSE; + struct flb_aws_provider_http *implementation = provider->implementation; + + flb_debug("[aws_credentials] Retrieving credentials from the " + "HTTP provider.."); + + /* a negative next_refresh means that auto-refresh is disabled */ + if (implementation->next_refresh > 0 + && time(NULL) > implementation->next_refresh) { + refresh = FLB_TRUE; + } + if (!implementation->creds || refresh == FLB_TRUE) { + if (try_lock_provider(provider)) { + http_credentials_request(implementation); + unlock_provider(provider); + } + } + + if (!implementation->creds) { + /* + * We failed to lock the provider and creds are unset. This means that + * another co-routine is performing the refresh. + */ + flb_warn("[aws_credentials] No cached credentials are available and " + "a credential refresh is already in progress. The current " + "co-routine will retry."); + + return NULL; + } + + creds = flb_calloc(1, sizeof(struct flb_aws_credentials)); + if (!creds) { + flb_errno(); + goto error; + } + + creds->access_key_id = flb_sds_create(implementation->creds->access_key_id); + if (!creds->access_key_id) { + flb_errno(); + goto error; + } + + creds->secret_access_key = flb_sds_create(implementation->creds-> + secret_access_key); + if (!creds->secret_access_key) { + flb_errno(); + goto error; + } + + if (implementation->creds->session_token) { + creds->session_token = flb_sds_create(implementation->creds-> + session_token); + if (!creds->session_token) { + flb_errno(); + goto error; + } + + } else { + creds->session_token = NULL; + } + + return creds; + +error: + flb_aws_credentials_destroy(creds); + return NULL; +} + +int refresh_fn_http(struct flb_aws_provider *provider) { + struct flb_aws_provider_http *implementation = provider->implementation; + int ret = -1; + flb_debug("[aws_credentials] Refresh called on the http provider"); + + if (try_lock_provider(provider)) { + ret = http_credentials_request(implementation); + unlock_provider(provider); + } + return ret; +} + +int init_fn_http(struct flb_aws_provider *provider) { + struct flb_aws_provider_http *implementation = provider->implementation; + int ret = -1; + flb_debug("[aws_credentials] Init called on the http provider"); + + implementation->client->debug_only = FLB_TRUE; + + if (try_lock_provider(provider)) { + ret = http_credentials_request(implementation); + unlock_provider(provider); + } + + implementation->client->debug_only = FLB_FALSE; + + return ret; +} + +void sync_fn_http(struct flb_aws_provider *provider) { + struct flb_aws_provider_http *implementation = provider->implementation; + + flb_debug("[aws_credentials] Sync called on the http provider"); + /* remove async flag */ + flb_stream_disable_async_mode(&implementation->client->upstream->base); +} + +void async_fn_http(struct flb_aws_provider *provider) { + struct flb_aws_provider_http *implementation = provider->implementation; + + flb_debug("[aws_credentials] Async called on the http provider"); + /* add async flag */ + flb_stream_enable_async_mode(&implementation->client->upstream->base); +} + +void upstream_set_fn_http(struct flb_aws_provider *provider, + struct flb_output_instance *ins) { + struct flb_aws_provider_http *implementation = provider->implementation; + + flb_debug("[aws_credentials] upstream_set called on the http provider"); + /* Make sure TLS is set to false before setting upstream, then reset it */ + ins->use_tls = FLB_FALSE; + flb_output_upstream_set(implementation->client->upstream, ins); + ins->use_tls = FLB_TRUE; +} + +void destroy_fn_http(struct flb_aws_provider *provider) { + struct flb_aws_provider_http *implementation = provider->implementation; + + if (implementation) { + if (implementation->creds) { + flb_aws_credentials_destroy(implementation->creds); + } + + if (implementation->client) { + flb_aws_client_destroy(implementation->client); + } + + if (implementation->host) { + flb_sds_destroy(implementation->host); + } + + if (implementation->path) { + flb_sds_destroy(implementation->path); + } + + flb_free(implementation); + provider->implementation = NULL; + } + + return; +} + +static struct flb_aws_provider_vtable http_provider_vtable = { + .get_credentials = get_credentials_fn_http, + .init = init_fn_http, + .refresh = refresh_fn_http, + .destroy = destroy_fn_http, + .sync = sync_fn_http, + .async = async_fn_http, + .upstream_set = upstream_set_fn_http, +}; + +struct flb_aws_provider *flb_http_provider_create(struct flb_config *config, + flb_sds_t host, + flb_sds_t path, + struct + flb_aws_client_generator + *generator) +{ + struct flb_aws_provider_http *implementation = NULL; + struct flb_aws_provider *provider = NULL; + struct flb_upstream *upstream = NULL; + + flb_debug("[aws_credentials] Configuring HTTP provider with %s:80%s", + host, path); + + provider = flb_calloc(1, sizeof(struct flb_aws_provider)); + + if (!provider) { + flb_errno(); + return NULL; + } + + pthread_mutex_init(&provider->lock, NULL); + + implementation = flb_calloc(1, sizeof(struct flb_aws_provider_http)); + + if (!implementation) { + flb_free(provider); + flb_errno(); + return NULL; + } + + provider->provider_vtable = &http_provider_vtable; + provider->implementation = implementation; + + implementation->host = host; + implementation->path = path; + + upstream = flb_upstream_create(config, host, 80, FLB_IO_TCP, NULL); + + if (!upstream) { + flb_aws_provider_destroy(provider); + flb_error("[aws_credentials] HTTP Provider: connection initialization " + "error"); + return NULL; + } + + upstream->base.net.connect_timeout = FLB_AWS_CREDENTIAL_NET_TIMEOUT; + + implementation->client = generator->create(); + if (!implementation->client) { + flb_aws_provider_destroy(provider); + flb_upstream_destroy(upstream); + flb_error("[aws_credentials] HTTP Provider: client creation error"); + return NULL; + } + implementation->client->name = "http_provider_client"; + implementation->client->has_auth = FLB_FALSE; + implementation->client->provider = NULL; + implementation->client->region = NULL; + implementation->client->service = NULL; + implementation->client->port = 80; + implementation->client->flags = 0; + implementation->client->proxy = NULL; + implementation->client->upstream = upstream; + + return provider; +} + +/* + * ECS Provider + * The ECS Provider is just a wrapper around the HTTP Provider + * with the ECS credentials endpoint. + */ + + struct flb_aws_provider *flb_ecs_provider_create(struct flb_config *config, + struct + flb_aws_client_generator + *generator) +{ + flb_sds_t host = NULL; + flb_sds_t path = NULL; + char *path_var = NULL; + + host = flb_sds_create_len(ECS_CREDENTIALS_HOST, ECS_CREDENTIALS_HOST_LEN); + if (!host) { + flb_errno(); + return NULL; + } + + path_var = getenv(ECS_CREDENTIALS_PATH_ENV_VAR); + if (path_var && strlen(path_var) > 0) { + path = flb_sds_create(path_var); + if (!path) { + flb_errno(); + flb_free(host); + return NULL; + } + + return flb_http_provider_create(config, host, path, generator); + } else { + flb_debug("[aws_credentials] Not initializing ECS Provider because" + " %s is not set", ECS_CREDENTIALS_PATH_ENV_VAR); + flb_sds_destroy(host); + return NULL; + } + +} + +static int http_credentials_request(struct flb_aws_provider_http + *implementation) +{ + char *response = NULL; + size_t response_len; + time_t expiration; + struct flb_aws_credentials *creds = NULL; + struct flb_aws_client *client = implementation->client; + struct flb_http_client *c = NULL; + + c = client->client_vtable->request(client, FLB_HTTP_GET, + implementation->path, NULL, 0, + NULL, 0); + + if (!c || c->resp.status != 200) { + flb_debug("[aws_credentials] http credentials request failed"); + if (c) { + flb_http_client_destroy(c); + } + return -1; + } + + response = c->resp.payload; + response_len = c->resp.payload_size; + + creds = flb_parse_http_credentials(response, response_len, &expiration); + if (!creds) { + flb_http_client_destroy(c); + return -1; + } + + /* destroy existing credentials */ + flb_aws_credentials_destroy(implementation->creds); + implementation->creds = NULL; + + implementation->creds = creds; + implementation->next_refresh = expiration - FLB_AWS_REFRESH_WINDOW; + flb_http_client_destroy(c); + return 0; +} + +/* + * All HTTP credentials endpoints (IMDS, ECS, custom) follow the same spec: + * { + * "AccessKeyId": "ACCESS_KEY_ID", + * "Expiration": "2019-12-18T21:27:58Z", + * "SecretAccessKey": "SECRET_ACCESS_KEY", + * "Token": "SECURITY_TOKEN_STRING" + * } + * (some implementations (IMDS) have additional fields) + * Returns NULL if any part of parsing was unsuccessful. + */ +struct flb_aws_credentials *flb_parse_http_credentials(char *response, + size_t response_len, + time_t *expiration) +{ + return flb_parse_json_credentials(response, response_len, AWS_HTTP_RESPONSE_TOKEN, + expiration); +} + +struct flb_aws_credentials *flb_parse_json_credentials(char *response, + size_t response_len, + char* session_token_field, + time_t *expiration) +{ + jsmntok_t *tokens = NULL; + const jsmntok_t *t = NULL; + char *current_token = NULL; + jsmn_parser parser; + int tokens_size = 50; + size_t size; + int ret; + struct flb_aws_credentials *creds = NULL; + int i = 0; + int len; + flb_sds_t tmp; + + /* + * Remove/reset existing value of expiration. + * Expiration should be in the response, but it is not + * strictly speaking needed. Fluent Bit logs a warning if it is missing. + */ + *expiration = -1; + + jsmn_init(&parser); + + size = sizeof(jsmntok_t) * tokens_size; + tokens = flb_calloc(1, size); + if (!tokens) { + goto error; + } + + ret = jsmn_parse(&parser, response, response_len, + tokens, tokens_size); + + if (ret == JSMN_ERROR_INVAL || ret == JSMN_ERROR_PART) { + flb_error("[aws_credentials] Could not parse credentials response" + " - invalid JSON."); + goto error; + } + + /* Shouldn't happen, but just in case, check for too many tokens error */ + if (ret == JSMN_ERROR_NOMEM) { + flb_error("[aws_credentials] Could not parse credentials response" + " - response contained more tokens than expected."); + goto error; + } + + /* return value is number of tokens parsed */ + tokens_size = ret; + + creds = flb_calloc(1, sizeof(struct flb_aws_credentials)); + if (!creds) { + flb_errno(); + goto error; + } + + /* + * jsmn will create an array of tokens like: + * key, value, key, value + */ + while (i < (tokens_size - 1)) { + t = &tokens[i]; + + if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) { + break; + } + + if (t->type == JSMN_STRING) { + current_token = &response[t->start]; + len = t->end - t->start; + + if (strncmp(current_token, AWS_CREDENTIAL_RESPONSE_ACCESS_KEY, len) == 0) + { + i++; + t = &tokens[i]; + current_token = &response[t->start]; + len = t->end - t->start; + if (creds->access_key_id != NULL) { + flb_error("Trying to double allocate access_key_id"); + goto error; + } + creds->access_key_id = flb_sds_create_len(current_token, len); + if (!creds->access_key_id) { + flb_errno(); + goto error; + } + continue; + } + if (strncmp(current_token, AWS_CREDENTIAL_RESPONSE_SECRET_KEY, len) == 0) + { + i++; + t = &tokens[i]; + current_token = &response[t->start]; + len = t->end - t->start; + if (creds->secret_access_key != NULL) { + flb_error("Trying to double allocate secret_access_key"); + goto error; + } + creds->secret_access_key = flb_sds_create_len(current_token, + len); + if (!creds->secret_access_key) { + flb_errno(); + goto error; + } + continue; + } + if (strncmp(current_token, session_token_field, len) == 0) { + i++; + t = &tokens[i]; + current_token = &response[t->start]; + len = t->end - t->start; + if (creds->session_token != NULL) { + flb_error("Trying to double allocate session_token"); + goto error; + } + creds->session_token = flb_sds_create_len(current_token, len); + if (!creds->session_token) { + flb_errno(); + goto error; + } + continue; + } + if (strncmp(current_token, AWS_CREDENTIAL_RESPONSE_EXPIRATION, len) == 0) + { + i++; + t = &tokens[i]; + current_token = &response[t->start]; + len = t->end - t->start; + tmp = flb_sds_create_len(current_token, len); + if (!tmp) { + flb_errno(); + goto error; + } + *expiration = flb_aws_cred_expiration(tmp); + flb_sds_destroy(tmp); + if (*expiration < 0) { + flb_warn("[aws_credentials] '%s' was invalid or " + "could not be parsed. Disabling auto-refresh of " + "credentials.", AWS_CREDENTIAL_RESPONSE_EXPIRATION); + } + } + } + + i++; + } + + if (creds->access_key_id == NULL) { + flb_error("[aws_credentials] Missing %s field in" + "credentials response", AWS_CREDENTIAL_RESPONSE_ACCESS_KEY); + goto error; + } + + if (creds->secret_access_key == NULL) { + flb_error("[aws_credentials] Missing %s field in" + "credentials response", AWS_CREDENTIAL_RESPONSE_SECRET_KEY); + goto error; + } + + flb_free(tokens); + return creds; + +error: + flb_aws_credentials_destroy(creds); + flb_free(tokens); + return NULL; +} diff --git a/fluent-bit/src/aws/flb_aws_credentials_log.h b/fluent-bit/src/aws/flb_aws_credentials_log.h new file mode 100644 index 00000000..6e9f0806 --- /dev/null +++ b/fluent-bit/src/aws/flb_aws_credentials_log.h @@ -0,0 +1,39 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2021 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_AWS_CREDENTIALS_LOG_H + +#define FLB_AWS_CREDENTIALS_LOG_H + +#include <fluent-bit/flb_log.h> + +#define AWS_CREDS_ERROR(format, ...) flb_error("[aws_credentials] " format, ##__VA_ARGS__) +#define AWS_CREDS_WARN(format, ...) flb_warn("[aws_credentials] " format, ##__VA_ARGS__) +#define AWS_CREDS_DEBUG(format, ...) flb_debug("[aws_credentials] " format, ##__VA_ARGS__) + +#define AWS_CREDS_ERROR_OR_DEBUG(debug_only, format, ...) do {\ + if (debug_only == FLB_TRUE) {\ + AWS_CREDS_DEBUG(format, ##__VA_ARGS__);\ + }\ + else {\ + AWS_CREDS_ERROR(format, ##__VA_ARGS__);\ + }\ +} while (0) + +#endif diff --git a/fluent-bit/src/aws/flb_aws_credentials_process.c b/fluent-bit/src/aws/flb_aws_credentials_process.c new file mode 100644 index 00000000..44c024ca --- /dev/null +++ b/fluent-bit/src/aws/flb_aws_credentials_process.c @@ -0,0 +1,783 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2021 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_aws_credentials.h> + +#include "flb_aws_credentials_log.h" + +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_pipe.h> +#include <fluent-bit/flb_time.h> + +#include <fcntl.h> +#include <poll.h> +#include <stdlib.h> +#include <sys/wait.h> + +#define DEV_NULL "/dev/null" + +#define MS_PER_SEC 1000 +#define MICROS_PER_MS 1000 +#define NS_PER_MS 1000000 + +#define CREDENTIAL_PROCESS_TIMEOUT_MS 60000 +#define CREDENTIAL_PROCESS_BUFFER_SIZE 8 * 1024 + +#define WAITPID_POLL_FREQUENCY_MS 20 +#define WAITPID_TIMEOUT_MS 10 * WAITPID_POLL_FREQUENCY_MS + +#define CREDENTIAL_PROCESS_RESPONSE_SESSION_TOKEN "SessionToken" + +/* Declarations */ +struct token_array; +static int new_token_array(struct token_array *arr, int cap); +static int append_token(struct token_array *arr, char* elem); + +struct readbuf; +static int new_readbuf(struct readbuf* buf, int cap); + +static int get_monotonic_time(struct flb_time* tm); + +static char* ltrim(char* input); +static int scan_credential_process_token_quoted(char *input); +static int scan_credential_process_token_unquoted(char *input); +static int credential_process_token_count(char* process); +static int parse_credential_process_token(char **input, char** out_token); + +static int read_until_block(char* name, flb_pipefd_t fd, struct readbuf* buf); +static int waitpid_timeout(char* name, pid_t pid, int* wstatus); + +struct process; +static int new_process(struct process* p, char** args); +static void exec_process_child(struct process* p); +static int exec_process(struct process* p); +static int read_from_process(struct process* p, struct readbuf* buf); +static int wait_process(struct process* p); +static void destroy_process(struct process* p); +/* End Declarations */ + +struct token_array { + char** tokens; + int len; + int cap; +}; + +/* + * Initializes a new token array with the given capacity. + * Returns 0 on success and < 0 on failure. + * The caller is responsible for calling `flb_free(arr->tokens)`. + */ +static int new_token_array(struct token_array *arr, int cap) +{ + *arr = (struct token_array) { .len = 0, .cap = cap }; + arr->tokens = flb_malloc(cap * sizeof(char*)); + if (!arr->tokens) { + flb_errno(); + return -1; + } + return 0; +} + +/* + * Appends the given token to the array, if there is capacity. + * Returns 0 on success and < 0 on failure. + */ +static int append_token(struct token_array *arr, char* token) +{ + if (arr->len >= arr->cap) { + /* This means there is a bug in credential_process_token_count. */ + AWS_CREDS_ERROR("append_token called on full token_array"); + return -1; + } + + (arr->tokens)[arr->len] = token; + arr->len++; + return 0; +} + +struct readbuf { + char* buf; + int len; + int cap; +}; + +/* + * Initializes a new buffer with the given capacity. + * Returns 0 on success and < 0 on failure. + * The caller is responsible for calling `flb_free(buf->buf)`. + */ +static int new_readbuf(struct readbuf* buf, int cap) +{ + *buf = (struct readbuf) { .len = 0, .cap = cap }; + buf->buf = flb_malloc(cap * sizeof(char)); + if (!buf->buf) { + flb_errno(); + return -1; + } + return 0; +} + +/* + * Fetches the current time from the monotonic clock. + * Returns 0 on success and < 0 on failure. + * This is useful for calculating deadlines that are not sensitive to changes + * in the system clock. + */ +static int get_monotonic_time(struct flb_time* tm) +{ + struct timespec ts; + if (clock_gettime(CLOCK_MONOTONIC, &ts) < 0) { + flb_errno(); + return -1; + } + flb_time_set(tm, ts.tv_sec, ts.tv_nsec); + return 0; +} + +/* + * Skips over any leading spaces in the input string, returning the remainder. + * If the entire string is consumed, returns the empty string (not NULL). + */ +static char* ltrim(char* input) +{ + while (*input == ' ') { + input++; + } + return input; +} + +/* + * Scans the unquoted token string at the start of the input string. + * The input must be the start of an unquoted token. + * Returns the token length on success, and < 0 on failure. + * This function does not add a null terminator to the token. + * The token length is the index where the null terminator must be placed. + * If the entire input is consumed, returns the length of the input string + * (excluding the null terminator). + */ +static int scan_credential_process_token_unquoted(char *input) +{ + int i; + + for (i = 0; input[i] != ' '; i++) { + if (input[i] == '\0') { + break; + } + if (input[i] == '"') { + AWS_CREDS_ERROR("unexpected quote in credential_process"); + return -1; + } + } + + return i; +} + +/* + * Scans the quoted token at the start of the input string. + * The input must be the string after the opening quote. + * Returns the token length on success, and < 0 on failure. + * This function does not add a null terminator to the token. + * The token length is the index where the null terminator must be placed. + */ +static int scan_credential_process_token_quoted(char *input) +{ + int i; + + for (i = 0; input[i] != '"'; i++) { + if (input[i] == '\0') { + AWS_CREDS_ERROR("unterminated quote in credential_process"); + return -1; + } + } + + if (input[i+1] != '\0' && input[i+1] != ' ') { + AWS_CREDS_ERROR("unexpected character %c after closing quote in " + "credential_process", input[i+1]); + return -1; + } + + return i; +} + +/* + * Counts the number of tokens in the input string, which is assumed to be the + * credential_process from the config file. + * Returns < 0 on failure. + */ +static int credential_process_token_count(char* process) +{ + int count = 0; + int i; + + while (1) { + process = ltrim(process); + if (*process == '\0') { + break; + } + + count++; + + if (*process == '"') { + process++; + i = scan_credential_process_token_quoted(process); + } + else { + i = scan_credential_process_token_unquoted(process); + } + + if (i < 0) { + return -1; + } + + process += i; + if (*process != '\0') { + process++; + } + } + + return count; +} + +/* + * Parses the input string, which is assumed to be the credential_process + * from the config file. The next token will be put in *out_token, and the + * remaining unprocessed input will be put in *input. + * Returns 0 on success and < 0 on failure. + * If there is an error, the value of *input and *out_token is not defined. + * If it succeeds and *out_token is NULL, then there are no more tokens, + * and this function should not be called again. + * *out_token will be some substring of the original *input, so it should not + * be freed. + */ +static int parse_credential_process_token(char** input, char** out_token) +{ + *out_token = NULL; + int i; + + if (!*input) { + AWS_CREDS_ERROR("parse_credential_process_token called after yielding last token"); + return -1; + } + + *input = ltrim(*input); + + if (**input == '\0') { + *input = NULL; + *out_token = NULL; + return 0; + } + + if (**input == '"') { + (*input)++; + i = scan_credential_process_token_quoted(*input); + } + else { + i = scan_credential_process_token_unquoted(*input); + } + + if (i < 0) { + return -1; + } + + *out_token = *input; + *input += i; + + if (**input != '\0') { + **input = '\0'; + (*input)++; + } + + return 0; +} + +/* See <fluent-bit/flb_aws_credentials.h>. */ +char** parse_credential_process(char* input) +{ + char* next_token = NULL; + struct token_array arr = { 0 }; + int token_count = credential_process_token_count(input); + + if (token_count < 0) { + goto error; + } + + /* Add one extra capacity for the NULL terminator. */ + if (new_token_array(&arr, token_count + 1) < 0) { + goto error; + } + + while (1) { + if (parse_credential_process_token(&input, &next_token) < 0) { + goto error; + } + + if (!next_token) { + break; + } + + if (append_token(&arr, next_token) < 0) { + goto error; + } + } + + if (append_token(&arr, NULL) < 0) { + goto error; + } + + return arr.tokens; + +error: + flb_free(arr.tokens); + return NULL; +} + +/* + * Reads from the pipe into the buffer until no more input is available. + * If the input is exhausted (EOF), returns 0. + * If reading would block (EWOULDBLOCK/EAGAIN), returns > 0. + * If an error occurs or the buffer is full, returns < 0. + */ +static int read_until_block(char* name, flb_pipefd_t fd, struct readbuf* buf) +{ + int result = -1; + + while (1) { + if (buf->len >= buf->cap) { + AWS_CREDS_ERROR("credential_process %s exceeded max buffer size", name); + return -1; + } + + result = flb_pipe_r(fd, buf->buf + buf->len, buf->cap - buf->len); + if (result < 0) { + if (FLB_PIPE_WOULDBLOCK()) { + return 1; + } + flb_errno(); + return -1; + } + else if (result == 0) { /* EOF */ + return 0; + } + else { + buf->len += result; + } + } +} + +/* + * Polls waitpid until the given process exits, or the timeout is reached. + * Returns 0 on success and < 0 on failure. + */ +static int waitpid_timeout(char* name, pid_t pid, int* wstatus) +{ + int result = -1; + int retries = WAITPID_TIMEOUT_MS / WAITPID_POLL_FREQUENCY_MS; + + while (1) { + result = waitpid(pid, wstatus, WNOHANG); + if (result < 0) { + flb_errno(); + return -1; + } + + if (result > 0) { + return 0; + } + + if (retries <= 0) { + AWS_CREDS_ERROR("timed out waiting for credential_process %s to exit", name); + return -1; + } + retries--; + + usleep(WAITPID_POLL_FREQUENCY_MS * MICROS_PER_MS); + } +} + +struct process { + int initialized; + char** args; + int stdin_stream; + flb_pipefd_t stdout_stream[2]; + int stderr_stream; + pid_t pid; +}; + +/* + * Initializes a new process with the given args. + * args is assumed to be a NULL terminated array, for use with execvp. + * It must have a least one element, and the first element is assumed to be the + * name/path of the executable. + * Returns 0 on success and < 0 on failure. + * The caller is responsible for calling `destroy_process(p)`. + */ +static int new_process(struct process* p, char** args) +{ + *p = (struct process) { + .initialized = FLB_TRUE, + .args = args, + .stdin_stream = -1, + .stdout_stream = {-1, -1}, + .stderr_stream = -1, + .pid = -1, + }; + + while ((p->stdin_stream = open(DEV_NULL, O_RDONLY|O_CLOEXEC)) < 0) { + if (errno != EINTR) { + flb_errno(); + return -1; + } + } + + if (flb_pipe_create(p->stdout_stream) < 0) {; + flb_errno(); + return -1; + } + + if (fcntl(p->stdout_stream[0], F_SETFL, O_CLOEXEC) < 0) { + flb_errno(); + return -1; + } + + if (fcntl(p->stdout_stream[1], F_SETFL, O_CLOEXEC) < 0) { + flb_errno(); + return -1; + } + + while ((p->stderr_stream = open(DEV_NULL, O_WRONLY|O_CLOEXEC)) < 0) { + if (errno != EINTR) { + flb_errno(); + return -1; + } + } + + return 0; +} + +/* + * Sets up the credential_process's stdin, stdout, and stderr, and exec's + * the actual process. + * For this function to return at all is an error. + * This function should not be called more than once. + */ +static void exec_process_child(struct process* p) +{ + while ((dup2(p->stdin_stream, STDIN_FILENO) < 0)) { + if (errno != EINTR) { + return; + } + } + while ((dup2(p->stdout_stream[1], STDOUT_FILENO) < 0)) { + if (errno != EINTR) { + return; + } + } + while ((dup2(p->stderr_stream, STDERR_FILENO) < 0)) { + if (errno != EINTR) { + return; + } + } + + close(p->stdin_stream); + flb_pipe_close(p->stdout_stream[0]); + flb_pipe_close(p->stdout_stream[1]); + close(p->stderr_stream); + + execvp(p->args[0], p->args); +} + +/* + * Forks the credential_process, but does not wait for it to finish. + * Returns 0 on success and < 0 on failure. + * This function should not be called more than once. + */ +static int exec_process(struct process* p) +{ + AWS_CREDS_DEBUG("executing credential_process %s", p->args[0]); + + p->pid = fork(); + if (p->pid < 0) { + flb_errno(); + return -1; + } + + if (p->pid == 0) { + exec_process_child(p); + + /* It should not be possible to reach this under normal circumstances. */ + exit(EXIT_FAILURE); + } + + close(p->stdin_stream); + p->stdin_stream = -1; + + flb_pipe_close(p->stdout_stream[1]); + p->stdout_stream[1] = -1; + + close(p->stderr_stream); + p->stderr_stream = -1; + + return 0; +} + +/* + * Reads from the credential_process's stdout into the given buffer. + * Returns 0 on success, and < 0 on failure or timeout. + * This function should not be called more than once. + */ +static int read_from_process(struct process* p, struct readbuf* buf) +{ + int result = -1; + struct pollfd pfd; + struct flb_time start, timeout, deadline, now, remaining; + int remaining_ms; + + if (fcntl(p->stdout_stream[0], F_SETFL, O_NONBLOCK) < 0) { + flb_errno(); + return -1; + } + + if (get_monotonic_time(&start) < 0) { + return -1; + } + + flb_time_set(&timeout, + (time_t) (CREDENTIAL_PROCESS_TIMEOUT_MS / MS_PER_SEC), + ((long) (CREDENTIAL_PROCESS_TIMEOUT_MS % MS_PER_SEC)) * NS_PER_MS); + + /* deadline = start + timeout */ + flb_time_add(&start, &timeout, &deadline); + + while (1) { + pfd = (struct pollfd) { + .fd = p->stdout_stream[0], + .events = POLLIN, + }; + + if (get_monotonic_time(&now) < 0) { + return -1; + } + + /* remaining = deadline - now */ + if (flb_time_diff(&deadline, &now, &remaining) < 0) { + AWS_CREDS_ERROR("credential_process %s timed out", p->args[0]); + return -1; + } + + /* + * poll uses millisecond resolution for the timeout. + * If there is less than a millisecond left, then for simplicity we'll just + * declare that it timed out. + */ + remaining_ms = (int) (flb_time_to_nanosec(&remaining) / NS_PER_MS); + if (remaining_ms <= 0) { + AWS_CREDS_ERROR("credential_process %s timed out", p->args[0]); + return -1; + } + + result = poll(&pfd, 1, remaining_ms); + if (result < 0) { + if (errno != EINTR) { + flb_errno(); + return -1; + } + continue; + } + + if (result == 0) { + AWS_CREDS_ERROR("credential_process %s timed out", p->args[0]); + return -1; + } + + if ((pfd.revents & POLLNVAL) == POLLNVAL) { + AWS_CREDS_ERROR("credential_process %s POLLNVAL", p->args[0]); + return -1; + } + + if ((pfd.revents & POLLERR) == POLLERR) { + AWS_CREDS_ERROR("credential_process %s POLLERR", p->args[0]); + return -1; + } + + if ((pfd.revents & POLLIN) == POLLIN || (pfd.revents & POLLHUP) == POLLHUP) { + result = read_until_block(p->args[0], p->stdout_stream[0], buf); + if (result <= 0) { + return result; + } + } + } +} + +/* + * Waits for the process to exit, up to a timeout. + * Returns 0 on success and < 0 on failure. + * This function should not be called more than once. + */ +static int wait_process(struct process* p) +{ + int wstatus; + + if (waitpid_timeout(p->args[0], p->pid, &wstatus) < 0) { + return -1; + } + p->pid = -1; + + if (!WIFEXITED(wstatus)) { + AWS_CREDS_ERROR("credential_process %s did not terminate normally", p->args[0]); + return -1; + } + + if (WEXITSTATUS(wstatus) != EXIT_SUCCESS) { + AWS_CREDS_ERROR("credential_process %s exited with status %d", p->args[0], + WEXITSTATUS(wstatus)); + return -1; + } + + AWS_CREDS_DEBUG("credential_process %s exited successfully", p->args[0]); + return 0; +} + +/* + * Release all resources associated with this process. + * Calling this function multiple times is a no-op. + * Since the process does not own p->args, it does not free it. + * Note that p->args will be set to NULL, so the caller must hold onto + * it separately in order to free it. + */ +static void destroy_process(struct process* p) +{ + if (p->initialized) { + if (p->stdin_stream >= 0) { + close(p->stdin_stream); + p->stdin_stream = -1; + } + if (p->stdout_stream[0] >= 0) { + close(p->stdout_stream[0]); + p->stdout_stream[0] = -1; + } + if (p->stdout_stream[1] >= 0) { + close(p->stdout_stream[1]); + p->stdout_stream[1] = -1; + } + if (p->stderr_stream >= 0) { + close(p->stderr_stream); + p->stderr_stream = -1; + } + + if (p->pid > 0) { + if (kill(p->pid, SIGKILL) < 0) { + flb_errno(); + AWS_CREDS_ERROR("could not kill credential_process %s (pid=%d) " + "during cleanup", p->args[0], p->pid); + } + else { + while (waitpid(p->pid, NULL, 0) < 0) { + if (errno != EINTR) { + flb_errno(); + break; + } + } + } + p->pid = -1; + } + + p->args = NULL; + + p->initialized = FLB_FALSE; + } +} + +/* See <fluent-bit/flb_aws_credentials.h>. */ +int exec_credential_process(char* process, struct flb_aws_credentials** creds, + time_t* expiration) +{ + char** args = NULL; + int result = -1; + struct process p = { 0 }; + struct readbuf buf = { 0 }; + *creds = NULL; + *expiration = 0; + + args = parse_credential_process(process); + if (!args) { + result = -1; + goto end; + } + + if (!args[0]) { + AWS_CREDS_ERROR("invalid credential_process"); + result = -1; + goto end; + } + + if (new_process(&p, args) < 0) { + result = -1; + goto end; + } + + if (new_readbuf(&buf, CREDENTIAL_PROCESS_BUFFER_SIZE) < 0) { + result = -1; + goto end; + } + + if (exec_process(&p) < 0) { + result = -1; + goto end; + } + + if (read_from_process(&p, &buf) < 0) { + result = -1; + goto end; + } + + if (wait_process(&p) < 0) { + result = -1; + goto end; + } + + *creds = flb_parse_json_credentials(buf.buf, buf.len, + CREDENTIAL_PROCESS_RESPONSE_SESSION_TOKEN, + expiration); + if (!*creds) { + AWS_CREDS_ERROR("could not parse credentials from credential_process %s", args[0]); + result = -1; + goto end; + } + + AWS_CREDS_DEBUG("successfully parsed credentials from credential_process %s", args[0]); + + result = 0; + +end: + destroy_process(&p); + + flb_free(buf.buf); + buf.buf = NULL; + + flb_free(args); + args = NULL; + + if (result < 0) { + flb_aws_credentials_destroy(*creds); + *creds = NULL; + } + + return result; +} diff --git a/fluent-bit/src/aws/flb_aws_credentials_profile.c b/fluent-bit/src/aws/flb_aws_credentials_profile.c new file mode 100644 index 00000000..6b3ab5db --- /dev/null +++ b/fluent-bit/src/aws/flb_aws_credentials_profile.c @@ -0,0 +1,753 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flb_aws_credentials_log.h" + +#include <fluent-bit/flb_aws_credentials.h> +#include <fluent-bit/flb_aws_util.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_sds.h> + +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <ctype.h> + +#define ACCESS_KEY_PROPERTY_NAME "aws_access_key_id" +#define SECRET_KEY_PROPERTY_NAME "aws_secret_access_key" +#define SESSION_TOKEN_PROPERTY_NAME "aws_session_token" +#define CREDENTIAL_PROCESS_PROPERTY_NAME "credential_process" + +#define AWS_PROFILE "AWS_PROFILE" +#define AWS_DEFAULT_PROFILE "AWS_DEFAULT_PROFILE" + +#define AWS_CONFIG_FILE "AWS_CONFIG_FILE" +#define AWS_SHARED_CREDENTIALS_FILE "AWS_SHARED_CREDENTIALS_FILE" + +#define DEFAULT_PROFILE "default" +#define CONFIG_PROFILE_PREFIX "profile " +#define CONFIG_PROFILE_PREFIX_LEN (sizeof(CONFIG_PROFILE_PREFIX)-1) + +/* Declarations */ +struct flb_aws_provider_profile; +static int refresh_credentials(struct flb_aws_provider_profile *implementation, + int debug_only); + +static int get_aws_shared_file_path(flb_sds_t* field, char* env_var, char* home_aws_path); + +static int parse_config_file(char *buf, char* profile, struct flb_aws_credentials** creds, + time_t* expiration, int debug_only); +static int parse_credentials_file(char *buf, char *profile, + struct flb_aws_credentials *creds, int debug_only); + +static int get_shared_config_credentials(char* config_path, + char*profile, + struct flb_aws_credentials** creds, + time_t* expiration, + int debug_only); +static int get_shared_credentials(char* credentials_path, + char* profile, + struct flb_aws_credentials** creds, + int debug_only); + +static flb_sds_t parse_property_value(char *s, int debug_only); +static char *parse_property_line(char *line); +static int has_profile(char *line, char* profile, int debug_only); +static int is_profile_line(char *line); +static int config_file_profile_matches(char *line, char *profile); + +/* + * A provider that reads from the shared credentials file. + */ +struct flb_aws_provider_profile { + struct flb_aws_credentials *creds; + time_t next_refresh; + + flb_sds_t profile; + flb_sds_t config_path; + flb_sds_t credentials_path; +}; + +struct flb_aws_credentials *get_credentials_fn_profile(struct flb_aws_provider + *provider) +{ + struct flb_aws_credentials *creds; + int ret; + struct flb_aws_provider_profile *implementation = provider->implementation; + + /* + * If next_refresh <= 0, it means we don't know how long the credentials + * are valid for. So we won't refresh them unless explicitly asked + * via refresh_fn_profile. + */ + if (!implementation->creds || (implementation->next_refresh > 0 && + time(NULL) >= implementation->next_refresh)) { + AWS_CREDS_DEBUG("Retrieving credentials for AWS Profile %s", + implementation->profile); + if (try_lock_provider(provider) == FLB_TRUE) { + ret = refresh_credentials(implementation, FLB_FALSE); + unlock_provider(provider); + if (ret < 0) { + AWS_CREDS_ERROR("Failed to retrieve credentials for AWS Profile %s", + implementation->profile); + return NULL; + } + } else { + AWS_CREDS_WARN("Another thread is refreshing credentials, will retry"); + return NULL; + } + } + + creds = flb_calloc(1, sizeof(struct flb_aws_credentials)); + if (!creds) { + flb_errno(); + goto error; + } + + creds->access_key_id = flb_sds_create(implementation->creds->access_key_id); + if (!creds->access_key_id) { + flb_errno(); + goto error; + } + + creds->secret_access_key = flb_sds_create(implementation-> + creds->secret_access_key); + if (!creds->secret_access_key) { + flb_errno(); + goto error; + } + + if (implementation->creds->session_token) { + creds->session_token = flb_sds_create(implementation-> + creds->session_token); + if (!creds->session_token) { + flb_errno(); + goto error; + } + + } else { + creds->session_token = NULL; + } + + return creds; + +error: + flb_aws_credentials_destroy(creds); + return NULL; +} + +int refresh_fn_profile(struct flb_aws_provider *provider) +{ + struct flb_aws_provider_profile *implementation = provider->implementation; + int ret = -1; + AWS_CREDS_DEBUG("Refresh called on the profile provider"); + if (try_lock_provider(provider) == FLB_TRUE) { + ret = refresh_credentials(implementation, FLB_FALSE); + unlock_provider(provider); + return ret; + } + return ret; +} + +int init_fn_profile(struct flb_aws_provider *provider) +{ + struct flb_aws_provider_profile *implementation = provider->implementation; + int ret = -1; + AWS_CREDS_DEBUG("Init called on the profile provider"); + if (try_lock_provider(provider) == FLB_TRUE) { + ret = refresh_credentials(implementation, FLB_TRUE); + unlock_provider(provider); + return ret; + } + return ret; +} + +/* + * Sync and Async are no-ops for the profile provider because it does not + * make network IO calls + */ +void sync_fn_profile(struct flb_aws_provider *provider) +{ + return; +} + +void async_fn_profile(struct flb_aws_provider *provider) +{ + return; +} + +void upstream_set_fn_profile(struct flb_aws_provider *provider, + struct flb_output_instance *ins) +{ + return; +} + +void destroy_fn_profile(struct flb_aws_provider *provider) +{ + struct flb_aws_provider_profile *implementation = provider->implementation; + + if (implementation) { + if (implementation->creds) { + flb_aws_credentials_destroy(implementation->creds); + } + + if (implementation->profile) { + flb_sds_destroy(implementation->profile); + } + + if (implementation->config_path) { + flb_sds_destroy(implementation->config_path); + } + + if (implementation->credentials_path) { + flb_sds_destroy(implementation->credentials_path); + } + + flb_free(implementation); + provider->implementation = NULL; + } + + return; +} + +static struct flb_aws_provider_vtable profile_provider_vtable = { + .get_credentials = get_credentials_fn_profile, + .init = init_fn_profile, + .refresh = refresh_fn_profile, + .destroy = destroy_fn_profile, + .sync = sync_fn_profile, + .async = async_fn_profile, + .upstream_set = upstream_set_fn_profile, +}; + +struct flb_aws_provider *flb_profile_provider_create(char* profile) +{ + struct flb_aws_provider *provider = NULL; + struct flb_aws_provider_profile *implementation = NULL; + int result = -1; + + provider = flb_calloc(1, sizeof(struct flb_aws_provider)); + + if (!provider) { + flb_errno(); + goto error; + } + + pthread_mutex_init(&provider->lock, NULL); + + implementation = flb_calloc(1, + sizeof( + struct flb_aws_provider_profile)); + + if (!implementation) { + flb_errno(); + goto error; + } + + provider->provider_vtable = &profile_provider_vtable; + provider->implementation = implementation; + + result = get_aws_shared_file_path(&implementation->config_path, AWS_CONFIG_FILE, + "/.aws/config"); + if (result < 0) { + goto error; + } + + result = get_aws_shared_file_path(&implementation->credentials_path, + AWS_SHARED_CREDENTIALS_FILE, "/.aws/credentials"); + if (result < 0) { + goto error; + } + + if (!implementation->config_path && !implementation->credentials_path) { + AWS_CREDS_WARN("Failed to initialize profile provider: " + "HOME, %s, and %s not set.", + AWS_CONFIG_FILE, AWS_SHARED_CREDENTIALS_FILE); + goto error; + } + + /* AWS profile name. */ + if (profile == NULL) { + profile = getenv(AWS_PROFILE); + } + if (profile && strlen(profile) > 0) { + goto set_profile; + } + + profile = getenv(AWS_DEFAULT_PROFILE); + if (profile && strlen(profile) > 0) { + goto set_profile; + } + + profile = DEFAULT_PROFILE; + +set_profile: + implementation->profile = flb_sds_create(profile); + if (!implementation->profile) { + flb_errno(); + goto error; + } + + return provider; + +error: + flb_aws_provider_destroy(provider); + return NULL; +} + + +/* + * Fetches the path of either the shared config file or the shared credentials file. + * Returns 0 on success and < 0 on failure. + * On success, the result will be stored in *field. + * + * If the given environment variable is set, then its value will be used verbatim. + * Else if $HOME is set, then it will be concatenated with home_aws_path. + * If neither is set, then *field will be set to NULL. This is not considered a failure. + * + * In practice, env_var will be "AWS_CONFIG_FILE" or "AWS_SHARED_CREDENTIALS_FILE", + * and home_aws_path will be "/.aws/config" or "/.aws/credentials". + */ +static int get_aws_shared_file_path(flb_sds_t* field, char* env_var, char* home_aws_path) +{ + char* path = NULL; + int result = -1; + flb_sds_t value = NULL; + + path = getenv(env_var); + if (path && *path) { + value = flb_sds_create(path); + if (!value) { + flb_errno(); + goto error; + } + } else { + path = getenv("HOME"); + if (path && *path) { + value = flb_sds_create(path); + if (!value) { + flb_errno(); + goto error; + } + + if (path[strlen(path) - 1] == '/') { + home_aws_path++; + } + result = flb_sds_cat_safe(&value, home_aws_path, strlen(home_aws_path)); + if (result < 0) { + flb_errno(); + goto error; + } + } + } + + *field = value; + return 0; + +error: + flb_sds_destroy(value); + return -1; +} + +static int is_profile_line(char *line) { + if (line[0] == '[') { + return FLB_TRUE; + } + return FLB_FALSE; +} + +/* Called on lines that have is_profile_line == True */ +static int has_profile(char *line, char* profile, int debug_only) { + char *end_bracket = strchr(line, ']'); + if (!end_bracket) { + if (debug_only) { + AWS_CREDS_DEBUG("Profile header has no ending bracket:\n %s", line); + } + else { + AWS_CREDS_WARN("Profile header has no ending bracket:\n %s", line); + } + return FLB_FALSE; + } + *end_bracket = '\0'; + + if (strcmp(&line[1], profile) == 0) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +/* + * Sets a null byte such that line becomes the property name + * Returns a pointer to the rest of the line (the value), if successful. + */ +static char *parse_property_line(char *line) { + int len = strlen(line); + int found_delimeter = FLB_FALSE; + int i = 0; + + if (isspace(line[0])) { + /* property line can not start with whitespace */ + return NULL; + } + + /* + * Go through the line char by char, once we find whitespace/= we are + * passed the property name. Return the first char of the property value. + * There should be a single "=" separating name and value. + */ + for (i=0; i < (len - 1); i++) { + if (isspace(line[i])) { + line[i] = '\0'; + } else if (found_delimeter == FLB_FALSE && line[i] == '=') { + found_delimeter = FLB_TRUE; + line[i] = '\0'; + } else if (found_delimeter == FLB_TRUE) { + return &line[i]; + } + } + + return NULL; +} + +/* called on the rest of a line after parse_property_line is called */ +static flb_sds_t parse_property_value(char *s, int debug_only) { + int len = strlen(s); + int i = 0; + char *val = NULL; + flb_sds_t prop; + + for (i=0; i < len; i++) { + if (isspace(s[i])) { + s[i] = '\0'; + continue; + } else if (!val) { + val = &s[i]; + } + } + + if (!val) { + AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Could not parse credential value from %s", s); + } + + prop = flb_sds_create(val); + if (!prop) { + flb_errno(); + return NULL; + } + + return prop; +} + +static int config_file_profile_matches(char *line, char *profile) { + char *current_profile = line + 1; + char* current_profile_end = strchr(current_profile, ']'); + + if (!current_profile_end) { + return FLB_FALSE; + } + *current_profile_end = '\0'; + + /* + * Non-default profiles look like `[profile <name>]`. + * The default profile can look like `[profile default]` or just `[default]`. + * This is different than the credentials file, where everything is `[<name>]`. + */ + if (strncmp(current_profile, CONFIG_PROFILE_PREFIX, CONFIG_PROFILE_PREFIX_LEN) != 0) { + if (strcmp(current_profile, DEFAULT_PROFILE) != 0) { + /* This is not a valid profile line. */ + return FLB_FALSE; + } + } else { + current_profile += CONFIG_PROFILE_PREFIX_LEN; + } + + if (strcmp(current_profile, profile) == 0) { + return FLB_TRUE; + } + return FLB_FALSE; +} + +static int parse_config_file(char *buf, char* profile, struct flb_aws_credentials** creds, + time_t* expiration, int debug_only) +{ + char *line = NULL; + char *line_end = NULL; + char *prop_val = NULL; + char *credential_process = NULL; + int found_profile = FLB_FALSE; + + for (line = buf; line[0] != '\0'; line = buf) { + /* + * Find the next newline and replace it with a null terminator. + * That way we can easily manipulate the current line as a string. + */ + line_end = strchr(line, '\n'); + if (line_end) { + *line_end = '\0'; + buf = line_end + 1; + } else { + buf = ""; + } + + if (found_profile != FLB_TRUE) { + if (is_profile_line(line) != FLB_TRUE) { + continue; + } + if (config_file_profile_matches(line, profile) != FLB_TRUE) { + continue; + } + found_profile = FLB_TRUE; + } else { + if (is_profile_line(line) == FLB_TRUE) { + break; + } + prop_val = parse_property_line(line); + if (strcmp(line, CREDENTIAL_PROCESS_PROPERTY_NAME) == 0) { + credential_process = prop_val; + } + } + } + + if (credential_process) { +#ifdef FLB_HAVE_AWS_CREDENTIAL_PROCESS + if (exec_credential_process(credential_process, creds, expiration) < 0) { + return -1; + } +#else + AWS_CREDS_WARN("credential_process not supported for this platform"); + return -1; +#endif + } + + return 0; +} + +/* + * Parses a shared credentials file. + * Expects the contents of 'creds' to be initialized to NULL (i.e use calloc). + */ +static int parse_credentials_file(char *buf, char *profile, + struct flb_aws_credentials *creds, int debug_only) +{ + char *line; + char *line_end; + char *prop_val = NULL; + int found_profile = FLB_FALSE; + + line = buf; + + while (line[0] != '\0') { + /* turn the line into a C string */ + line_end = strchr(line, '\n'); + if (line_end) { + *line_end = '\0'; + } + + if (is_profile_line(line) == FLB_TRUE) { + if (found_profile == FLB_TRUE) { + break; + } + if (has_profile(line, profile, debug_only)) { + found_profile = FLB_TRUE; + } + } else { + prop_val = parse_property_line(line); + if (prop_val && found_profile == FLB_TRUE) { + if (strcmp(line, ACCESS_KEY_PROPERTY_NAME) == 0) { + creds->access_key_id = parse_property_value(prop_val, + debug_only); + } + if (strcmp(line, SECRET_KEY_PROPERTY_NAME) == 0) { + creds->secret_access_key = parse_property_value(prop_val, + debug_only); + } + if (strcmp(line, SESSION_TOKEN_PROPERTY_NAME) == 0) { + creds->session_token = parse_property_value(prop_val, + debug_only); + } + } + } + + /* advance to next line */ + if (line_end) { + line = line_end + 1; + } else { + break; + } + } + + if (creds->access_key_id && creds->secret_access_key) { + return 0; + } + AWS_CREDS_ERROR_OR_DEBUG(debug_only, "%s and %s keys not parsed in shared " + "credentials file for profile %s.", ACCESS_KEY_PROPERTY_NAME, + SECRET_KEY_PROPERTY_NAME, profile); + return -1; +} + +static int get_shared_config_credentials(char* config_path, + char*profile, + struct flb_aws_credentials** creds, + time_t* expiration, + int debug_only) { + int result = -1; + char* buf = NULL; + size_t size; + *creds = NULL; + *expiration = 0; + + AWS_CREDS_DEBUG("Reading shared config file."); + + if (flb_read_file(config_path, &buf, &size) < 0) { + if (errno == ENOENT) { + AWS_CREDS_DEBUG("Shared config file %s does not exist", config_path); + result = 0; + goto end; + } + flb_errno(); + AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Could not read shared config file %s", + config_path); + result = -1; + goto end; + } + + if (parse_config_file(buf, profile, creds, expiration, debug_only) < 0) { + result = -1; + goto end; + } + + result = 0; + +end: + flb_free(buf); + return result; +} + +static int get_shared_credentials(char* credentials_path, + char* profile, + struct flb_aws_credentials** creds, + int debug_only) { + int result = -1; + char* buf = NULL; + size_t size; + *creds = NULL; + + *creds = flb_calloc(1, sizeof(struct flb_aws_credentials)); + if (!*creds) { + flb_errno(); + result = -1; + goto end; + } + + AWS_CREDS_DEBUG("Reading shared credentials file."); + + if (flb_read_file(credentials_path, &buf, &size) < 0) { + if (errno == ENOENT) { + AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Shared credentials file %s does not exist", + credentials_path); + } else { + flb_errno(); + AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Could not read shared credentials file %s", + credentials_path); + } + result = -1; + goto end; + } + + if (parse_credentials_file(buf, profile, *creds, debug_only) < 0) { + AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Could not parse shared credentials file: " + "valid profile with name '%s' not found", profile); + result = -1; + goto end; + } + + result = 0; + +end: + flb_free(buf); + + if (result < 0) { + flb_aws_credentials_destroy(*creds); + *creds = NULL; + } + + return result; +} + +static int refresh_credentials(struct flb_aws_provider_profile *implementation, + int debug_only) +{ + struct flb_aws_credentials *creds = NULL; + time_t expiration = 0; + int ret; + + if (implementation->config_path) { + ret = get_shared_config_credentials(implementation->config_path, + implementation->profile, + &creds, + &expiration, + debug_only); + if (ret < 0) { + goto error; + } + } + + /* + * If we did not find a credential_process in the shared config file, fall back to + * the shared credentials file. + */ + if (!creds) { + if (!implementation->credentials_path) { + AWS_CREDS_ERROR("shared config file contains no credential_process and " + "no shared credentials file was configured"); + goto error; + } + + ret = get_shared_credentials(implementation->credentials_path, + implementation->profile, + &creds, + debug_only); + if (ret < 0) { + goto error; + } + + /* The shared credentials file does not record when the credentials expire. */ + expiration = 0; + } + + /* unset and free existing credentials */ + flb_aws_credentials_destroy(implementation->creds); + implementation->creds = creds; + + if (expiration > 0) { + implementation->next_refresh = expiration - FLB_AWS_REFRESH_WINDOW; + } else { + implementation->next_refresh = 0; + } + + return 0; + +error: + flb_aws_credentials_destroy(creds); + return -1; +} diff --git a/fluent-bit/src/aws/flb_aws_credentials_sts.c b/fluent-bit/src/aws/flb_aws_credentials_sts.c new file mode 100644 index 00000000..d992485c --- /dev/null +++ b/fluent-bit/src/aws/flb_aws_credentials_sts.c @@ -0,0 +1,958 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_aws_credentials.h> +#include <fluent-bit/flb_aws_util.h> +#include <fluent-bit/flb_random.h> +#include <fluent-bit/flb_jsmn.h> + +#include <stdlib.h> +#include <time.h> +#include <string.h> + +#define STS_ASSUME_ROLE_URI_FORMAT "/?Version=2011-06-15&Action=%s\ +&RoleSessionName=%s&RoleArn=%s" +#define STS_ASSUME_ROLE_URI_BASE_LEN 54 + +/* + * The STS APIs return an XML document with credentials. + * The part of the document we care about looks like this: + * <Credentials> + * <AccessKeyId>akid</AccessKeyId> + * <SecretAccessKey>skid</SecretAccessKey> + * <SessionToken>token</SessionToken> + * <Expiration>2019-11-09T13:34:41Z</Expiration> + * </Credentials> + */ +#define CREDENTIALS_NODE "<Credentials>" +#define CREDENTIALS_NODE_LEN 13 +#define ACCESS_KEY_NODE "<AccessKeyId>" +#define ACCESS_KEY_NODE_LEN 13 +#define ACCESS_KEY_NODE_END "</AccessKeyId>" +#define SECRET_KEY_NODE "<SecretAccessKey>" +#define SECRET_KEY_NODE_LEN 17 +#define SECRET_KEY_NODE_END "</SecretAccessKey>" +#define SESSION_TOKEN_NODE "<SessionToken>" +#define SESSION_TOKEN_NODE_LEN 14 +#define SESSION_TOKEN_NODE_END "</SessionToken>" +#define EXPIRATION_NODE "<Expiration>" +#define EXPIRATION_NODE_LEN 12 +#define EXPIRATION_NODE_END "</Expiration>" + +#define TOKEN_FILE_ENV_VAR "AWS_WEB_IDENTITY_TOKEN_FILE" +#define ROLE_ARN_ENV_VAR "AWS_ROLE_ARN" +#define SESSION_NAME_ENV_VAR "AWS_ROLE_SESSION_NAME" + +#define SESSION_NAME_RANDOM_BYTE_LEN 32 + +struct flb_aws_provider_eks; +void bytes_to_string(unsigned char *data, char *buf, size_t len); +static int assume_with_web_identity(struct flb_aws_provider_eks + *implementation); +static int sts_assume_role_request(struct flb_aws_client *sts_client, + struct flb_aws_credentials **creds, + char *uri, + time_t *next_refresh); +static flb_sds_t get_node(char *cred_node, char* node_name, int node_name_len, char* node_end); + + +/* + * A provider that uses credentials from the base provider to call STS + * and assume an IAM Role. + */ +struct flb_aws_provider_sts { + int custom_endpoint; + struct flb_aws_provider *base_provider; + + struct flb_aws_credentials *creds; + time_t next_refresh; + + struct flb_aws_client *sts_client; + + /* Fluent Bit uses regional STS endpoints; this is a best practice. */ + char *endpoint; + + flb_sds_t uri; +}; + +struct flb_aws_credentials *get_credentials_fn_sts(struct flb_aws_provider + *provider) +{ + struct flb_aws_credentials *creds; + int refresh = FLB_FALSE; + struct flb_aws_provider_sts *implementation = provider->implementation; + + flb_debug("[aws_credentials] Requesting credentials from the " + "STS provider.."); + + /* a negative next_refresh means that auto-refresh is disabled */ + if (implementation->next_refresh > 0 + && time(NULL) > implementation->next_refresh) { + refresh = FLB_TRUE; + } + if (!implementation->creds || refresh == FLB_TRUE) { + /* credentials need to be refreshed/obtained */ + if (try_lock_provider(provider)) { + flb_debug("[aws_credentials] STS Provider: Refreshing credential " + "cache."); + sts_assume_role_request(implementation->sts_client, + &implementation->creds, + implementation->uri, + &implementation->next_refresh); + unlock_provider(provider); + } + } + + if (!implementation->creds) { + /* + * We failed to lock the provider and creds are unset. This means that + * another co-routine is performing the refresh. + */ + flb_warn("[aws_credentials] No cached credentials are available and " + "a credential refresh is already in progress. The current " + "co-routine will retry."); + + return NULL; + } + + /* return a copy of the existing cached credentials */ + creds = flb_calloc(1, sizeof(struct flb_aws_credentials)); + if (!creds) { + goto error; + } + + creds->access_key_id = flb_sds_create(implementation->creds->access_key_id); + if (!creds->access_key_id) { + goto error; + } + + creds->secret_access_key = flb_sds_create(implementation->creds-> + secret_access_key); + if (!creds->secret_access_key) { + goto error; + } + + if (implementation->creds->session_token) { + creds->session_token = flb_sds_create(implementation->creds-> + session_token); + if (!creds->session_token) { + goto error; + } + + } else { + creds->session_token = NULL; + } + + return creds; + +error: + flb_errno(); + flb_aws_credentials_destroy(creds); + return NULL; +} + +int refresh_fn_sts(struct flb_aws_provider *provider) { + int ret = -1; + struct flb_aws_provider_sts *implementation = provider->implementation; + + flb_debug("[aws_credentials] Refresh called on the STS provider"); + + if (try_lock_provider(provider)) { + ret = sts_assume_role_request(implementation->sts_client, + &implementation->creds, implementation->uri, + &implementation->next_refresh); + unlock_provider(provider); + } + return ret; +} + +int init_fn_sts(struct flb_aws_provider *provider) { + int ret = -1; + struct flb_aws_provider_sts *implementation = provider->implementation; + + flb_debug("[aws_credentials] Init called on the STS provider"); + + /* Call Init on the base provider first */ + implementation->base_provider->provider_vtable-> + init(implementation->base_provider); + + implementation->sts_client->debug_only = FLB_TRUE; + + if (try_lock_provider(provider)) { + ret = sts_assume_role_request(implementation->sts_client, + &implementation->creds, implementation->uri, + &implementation->next_refresh); + unlock_provider(provider); + } + + implementation->sts_client->debug_only = FLB_FALSE; + return ret; +} + +void sync_fn_sts(struct flb_aws_provider *provider) { + struct flb_aws_provider_sts *implementation = provider->implementation; + struct flb_aws_provider *base_provider = implementation->base_provider; + + flb_debug("[aws_credentials] Sync called on the STS provider"); + /* Remove async flag */ + flb_stream_disable_async_mode(&implementation->sts_client->upstream->base); + + /* we also need to call sync on the base_provider */ + base_provider->provider_vtable->sync(base_provider); +} + +void async_fn_sts(struct flb_aws_provider *provider) { + struct flb_aws_provider_sts *implementation = provider->implementation; + struct flb_aws_provider *base_provider = implementation->base_provider; + + flb_debug("[aws_credentials] Async called on the STS provider"); + /* Add async flag */ + flb_stream_enable_async_mode(&implementation->sts_client->upstream->base); + + /* we also need to call async on the base_provider */ + base_provider->provider_vtable->async(base_provider); +} + +void upstream_set_fn_sts(struct flb_aws_provider *provider, + struct flb_output_instance *ins) { + struct flb_aws_provider_sts *implementation = provider->implementation; + struct flb_aws_provider *base_provider = implementation->base_provider; + + flb_debug("[aws_credentials] upstream_set called on the STS provider"); + /* associate output and upstream */ + flb_output_upstream_set(implementation->sts_client->upstream, ins); + + /* we also need to call upstream_set on the base_provider */ + base_provider->provider_vtable->upstream_set(base_provider, ins); +} + +void destroy_fn_sts(struct flb_aws_provider *provider) { + struct flb_aws_provider_sts *implementation; + + implementation = provider->implementation; + + if (implementation) { + if (implementation->creds) { + flb_aws_credentials_destroy(implementation->creds); + } + + if (implementation->sts_client) { + flb_aws_client_destroy(implementation->sts_client); + } + + if (implementation->uri) { + flb_sds_destroy(implementation->uri); + } + + if (implementation->custom_endpoint == FLB_FALSE) { + flb_free(implementation->endpoint); + } + + flb_free(implementation); + provider->implementation = NULL; + } + + return; +} + +static struct flb_aws_provider_vtable sts_provider_vtable = { + .get_credentials = get_credentials_fn_sts, + .init = init_fn_sts, + .refresh = refresh_fn_sts, + .destroy = destroy_fn_sts, + .sync = sync_fn_sts, + .async = async_fn_sts, + .upstream_set = upstream_set_fn_sts, +}; + +struct flb_aws_provider *flb_sts_provider_create(struct flb_config *config, + struct flb_tls *tls, + struct flb_aws_provider + *base_provider, + char *external_id, + char *role_arn, + char *session_name, + char *region, + char *sts_endpoint, + char *proxy, + struct + flb_aws_client_generator + *generator) +{ + struct flb_aws_provider_sts *implementation = NULL; + struct flb_aws_provider *provider = NULL; + struct flb_upstream *upstream = NULL; + + provider = flb_calloc(1, sizeof(struct flb_aws_provider)); + if (!provider) { + flb_errno(); + return NULL; + } + + pthread_mutex_init(&provider->lock, NULL); + + implementation = flb_calloc(1, sizeof(struct flb_aws_provider_sts)); + if (!implementation) { + goto error; + } + + provider->provider_vtable = &sts_provider_vtable; + provider->implementation = implementation; + + implementation->uri = flb_sts_uri("AssumeRole", role_arn, session_name, + external_id, NULL); + if (!implementation->uri) { + goto error; + } + + if (sts_endpoint) { + implementation->endpoint = removeProtocol(sts_endpoint, "https://"); + implementation->custom_endpoint = FLB_TRUE; + } + else { + implementation->endpoint = flb_aws_endpoint("sts", region); + implementation->custom_endpoint = FLB_FALSE; + } + + if(!implementation->endpoint) { + goto error; + } + + implementation->base_provider = base_provider; + implementation->sts_client = generator->create(); + if (!implementation->sts_client) { + goto error; + } + implementation->sts_client->name = "sts_client_assume_role_provider"; + implementation->sts_client->has_auth = FLB_TRUE; + implementation->sts_client->provider = base_provider; + implementation->sts_client->region = region; + implementation->sts_client->service = "sts"; + implementation->sts_client->port = 443; + implementation->sts_client->flags = 0; + implementation->sts_client->proxy = proxy; + + upstream = flb_upstream_create(config, implementation->endpoint, 443, + FLB_IO_TLS, tls); + if (!upstream) { + flb_error("[aws_credentials] Connection initialization error"); + goto error; + } + + upstream->base.net.connect_timeout = FLB_AWS_CREDENTIAL_NET_TIMEOUT; + + implementation->sts_client->upstream = upstream; + implementation->sts_client->host = implementation->endpoint; + + return provider; + +error: + flb_errno(); + flb_aws_provider_destroy(provider); + return NULL; +} + +/* + * A provider that uses OIDC tokens provided by kubernetes to obtain + * AWS credentials. + * + * The AWS SDKs have defined a spec for an OIDC provider that obtains tokens + * from environment variables or the shared config file. + * This provider only contains the functionality needed for EKS- obtaining the + * location of the OIDC token from an environment variable. + */ +struct flb_aws_provider_eks { + int custom_endpoint; + struct flb_aws_credentials *creds; + /* + * Time to auto-refresh creds before they expire. A negative value disables + * auto-refresh. Client code can always force a refresh. + */ + time_t next_refresh; + + struct flb_aws_client *sts_client; + + /* Fluent Bit uses regional STS endpoints; this is a best practice. */ + char *endpoint; + + char *session_name; + /* session name can come from env or be generated by the provider */ + int free_session_name; + char *role_arn; + + char *token_file; +}; + + +struct flb_aws_credentials *get_credentials_fn_eks(struct flb_aws_provider + *provider) +{ + struct flb_aws_credentials *creds = NULL; + int refresh = FLB_FALSE; + struct flb_aws_provider_eks *implementation = provider->implementation; + + flb_debug("[aws_credentials] Requesting credentials from the " + "EKS provider.."); + + /* a negative next_refresh means that auto-refresh is disabled */ + if (implementation->next_refresh > 0 + && time(NULL) > implementation->next_refresh) { + refresh = FLB_TRUE; + } + if (!implementation->creds || refresh == FLB_TRUE) { + if (try_lock_provider(provider)) { + flb_debug("[aws_credentials] EKS Provider: Refreshing credential " + "cache."); + assume_with_web_identity(implementation); + unlock_provider(provider); + } + } + + if (!implementation->creds) { + /* + * We failed to lock the provider and creds are unset. This means that + * another co-routine is performing the refresh. + */ + flb_warn("[aws_credentials] No cached credentials are available and " + "a credential refresh is already in progress. The current " + "co-routine will retry."); + + return NULL; + } + + creds = flb_calloc(1, sizeof(struct flb_aws_credentials)); + if (!creds) { + goto error; + } + + creds->access_key_id = flb_sds_create(implementation->creds->access_key_id); + if (!creds->access_key_id) { + goto error; + } + + creds->secret_access_key = flb_sds_create(implementation->creds-> + secret_access_key); + if (!creds->secret_access_key) { + goto error; + } + + if (implementation->creds->session_token) { + creds->session_token = flb_sds_create(implementation->creds-> + session_token); + if (!creds->session_token) { + goto error; + } + + } + else { + creds->session_token = NULL; + } + + return creds; + +error: + flb_errno(); + flb_aws_credentials_destroy(creds); + return NULL; +} + +int refresh_fn_eks(struct flb_aws_provider *provider) { + int ret = -1; + struct flb_aws_provider_eks *implementation = provider->implementation; + + flb_debug("[aws_credentials] Refresh called on the EKS provider"); + if (try_lock_provider(provider)) { + ret = assume_with_web_identity(implementation); + unlock_provider(provider); + } + return ret; +} + +int init_fn_eks(struct flb_aws_provider *provider) { + int ret = -1; + struct flb_aws_provider_eks *implementation = provider->implementation; + + implementation->sts_client->debug_only = FLB_TRUE; + + flb_debug("[aws_credentials] Init called on the EKS provider"); + if (try_lock_provider(provider)) { + ret = assume_with_web_identity(implementation); + unlock_provider(provider); + } + + implementation->sts_client->debug_only = FLB_FALSE; + return ret; +} + +void sync_fn_eks(struct flb_aws_provider *provider) { + struct flb_aws_provider_eks *implementation = provider->implementation; + flb_debug("[aws_credentials] Sync called on the EKS provider"); + /* remove async flag */ + flb_stream_disable_async_mode(&implementation->sts_client->upstream->base); +} + +void async_fn_eks(struct flb_aws_provider *provider) { + struct flb_aws_provider_eks *implementation = provider->implementation; + flb_debug("[aws_credentials] Async called on the EKS provider"); + /* add async flag */ + flb_stream_enable_async_mode(&implementation->sts_client->upstream->base); +} + +void upstream_set_fn_eks(struct flb_aws_provider *provider, + struct flb_output_instance *ins) { + struct flb_aws_provider_eks *implementation = provider->implementation; + flb_debug("[aws_credentials] upstream_set called on the EKS provider"); + /* set upstream on output */ + flb_output_upstream_set(implementation->sts_client->upstream, ins); +} + +void destroy_fn_eks(struct flb_aws_provider *provider) { + struct flb_aws_provider_eks *implementation = provider-> + implementation; + if (implementation) { + if (implementation->creds) { + flb_aws_credentials_destroy(implementation->creds); + } + + if (implementation->sts_client) { + flb_aws_client_destroy(implementation->sts_client); + } + + if (implementation->custom_endpoint == FLB_FALSE) { + flb_free(implementation->endpoint); + } + if (implementation->free_session_name == FLB_TRUE) { + flb_free(implementation->session_name); + } + + flb_free(implementation); + provider->implementation = NULL; + } + + return; +} + +static struct flb_aws_provider_vtable eks_provider_vtable = { + .get_credentials = get_credentials_fn_eks, + .init = init_fn_eks, + .refresh = refresh_fn_eks, + .destroy = destroy_fn_eks, + .sync = sync_fn_eks, + .async = async_fn_eks, + .upstream_set = upstream_set_fn_eks, +}; + +struct flb_aws_provider *flb_eks_provider_create(struct flb_config *config, + struct flb_tls *tls, + char *region, + char *sts_endpoint, + char *proxy, + struct + flb_aws_client_generator + *generator) +{ + struct flb_aws_provider_eks *implementation = NULL; + struct flb_aws_provider *provider = NULL; + struct flb_upstream *upstream = NULL; + + provider = flb_calloc(1, sizeof(struct flb_aws_provider)); + + if (!provider) { + flb_errno(); + return NULL; + } + + pthread_mutex_init(&provider->lock, NULL); + + implementation = flb_calloc(1, sizeof(struct flb_aws_provider_eks)); + + if (!implementation) { + goto error; + } + + provider->provider_vtable = &eks_provider_vtable; + provider->implementation = implementation; + + /* session name either comes from the env var or is a random uuid */ + implementation->session_name = getenv(SESSION_NAME_ENV_VAR); + implementation->free_session_name = FLB_FALSE; + if (!implementation->session_name || + strlen(implementation->session_name) == 0) { + implementation->session_name = flb_sts_session_name(); + if (!implementation->session_name) { + goto error; + } + implementation->free_session_name = FLB_TRUE; + } + + implementation->role_arn = getenv(ROLE_ARN_ENV_VAR); + if (!implementation->role_arn || strlen(implementation->role_arn) == 0) { + flb_debug("[aws_credentials] Not initializing EKS provider because" + " %s was not set", ROLE_ARN_ENV_VAR); + flb_aws_provider_destroy(provider); + return NULL; + } + + implementation->token_file = getenv(TOKEN_FILE_ENV_VAR); + if (!implementation->token_file || strlen(implementation->token_file) == 0) + { + flb_debug("[aws_credentials] Not initializing EKS provider because" + " %s was not set", TOKEN_FILE_ENV_VAR); + flb_aws_provider_destroy(provider); + return NULL; + } + + if (sts_endpoint) { + implementation->endpoint = removeProtocol(sts_endpoint, "https://"); + implementation->custom_endpoint = FLB_TRUE; + } + else { + implementation->endpoint = flb_aws_endpoint("sts", region); + implementation->custom_endpoint = FLB_FALSE; + } + + if(!implementation->endpoint) { + goto error; + } + + implementation->sts_client = generator->create(); + if (!implementation->sts_client) { + goto error; + } + implementation->sts_client->name = "sts_client_eks_provider"; + /* AssumeRoleWithWebIdentity does not require sigv4 */ + implementation->sts_client->has_auth = FLB_FALSE; + implementation->sts_client->provider = NULL; + implementation->sts_client->region = region; + implementation->sts_client->service = "sts"; + implementation->sts_client->port = 443; + implementation->sts_client->flags = 0; + implementation->sts_client->proxy = proxy; + + upstream = flb_upstream_create(config, implementation->endpoint, 443, + FLB_IO_TLS, tls); + + if (!upstream) { + goto error; + } + + upstream->base.net.connect_timeout = FLB_AWS_CREDENTIAL_NET_TIMEOUT; + + implementation->sts_client->upstream = upstream; + implementation->sts_client->host = implementation->endpoint; + + return provider; + +error: + flb_errno(); + flb_aws_provider_destroy(provider); + return NULL; +} + +/* Generates string which can serve as a unique session name */ +char *flb_sts_session_name() { + unsigned char random_data[SESSION_NAME_RANDOM_BYTE_LEN]; + char *session_name = NULL; + int ret; + + ret = flb_random_bytes(random_data, SESSION_NAME_RANDOM_BYTE_LEN); + + if (ret != 0) { + flb_errno(); + + return NULL; + } + + session_name = flb_calloc(SESSION_NAME_RANDOM_BYTE_LEN + 1, + sizeof(char)); + if (session_name == NULL) { + flb_errno(); + + return NULL; + } + + bytes_to_string(random_data, session_name, SESSION_NAME_RANDOM_BYTE_LEN); + + return session_name; +} + +/* converts random bytes to a string we can safely put in a URL */ +void bytes_to_string(unsigned char *data, char *buf, size_t len) { + int index; + char charset[] = "0123456789" + "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + while (len-- > 0) { + index = (int) data[len]; + index = index % (sizeof(charset) - 1); + buf[len] = charset[index]; + } +} + +static int assume_with_web_identity(struct flb_aws_provider_eks + *implementation) +{ + int ret; + char *web_token = NULL; + size_t web_token_size; + flb_sds_t uri = NULL; + int init_mode = implementation->sts_client->debug_only; + + ret = flb_read_file(implementation->token_file, &web_token, + &web_token_size); + if (ret < 0) { + if (init_mode == FLB_TRUE) { + flb_debug("[aws_credentials] Could not read web identify token file"); + } else { + flb_error("[aws_credentials] Could not read web identify token file"); + } + return -1; + } + + uri = flb_sts_uri("AssumeRoleWithWebIdentity", implementation->role_arn, + implementation->session_name, NULL, web_token); + if (!uri) { + flb_free(web_token); + return -1; + } + + ret = sts_assume_role_request(implementation->sts_client, + &implementation->creds, uri, + &implementation->next_refresh); + flb_free(web_token); + flb_sds_destroy(uri); + return ret; +} + +static int sts_assume_role_request(struct flb_aws_client *sts_client, + struct flb_aws_credentials **creds, + char *uri, + time_t *next_refresh) +{ + time_t expiration; + struct flb_aws_credentials *credentials = NULL; + struct flb_http_client *c = NULL; + flb_sds_t error_type; + int init_mode = sts_client->debug_only; + + flb_debug("[aws_credentials] Calling STS.."); + + c = sts_client->client_vtable->request(sts_client, FLB_HTTP_GET, + uri, NULL, 0, NULL, 0); + + if (c && c->resp.status == 200) { + credentials = flb_parse_sts_resp(c->resp.payload, &expiration); + if (!credentials) { + if (init_mode == FLB_TRUE) { + flb_debug("[aws_credentials] Failed to parse response from STS"); + } + else { + flb_error("[aws_credentials] Failed to parse response from STS"); + } + flb_http_client_destroy(c); + return -1; + } + + /* unset and free existing credentials first */ + flb_aws_credentials_destroy(*creds); + *creds = NULL; + + *next_refresh = expiration - FLB_AWS_REFRESH_WINDOW; + *creds = credentials; + flb_http_client_destroy(c); + return 0; + } + + if (c && c->resp.payload_size > 0) { + error_type = flb_aws_error(c->resp.payload, c->resp.payload_size); + if (error_type) { + if (init_mode == FLB_TRUE) { + flb_debug("[aws_credentials] STS API responded with %s", error_type); + } + else { + flb_error("[aws_credentials] STS API responded with %s", error_type); + } + } else { + flb_debug("[aws_credentials] STS raw response: \n%s", + c->resp.payload); + } + } + + if (c) { + flb_http_client_destroy(c); + } + if (init_mode == FLB_TRUE) { + flb_debug("[aws_credentials] STS assume role request failed"); + } + else { + flb_error("[aws_credentials] STS assume role request failed"); + } + return -1; + +} + +/* + * The STS APIs return an XML document with credentials. + * The part of the document we care about looks like this: + * <Credentials> + * <AccessKeyId>akid</AccessKeyId> + * <SecretAccessKey>skid</SecretAccessKey> + * <SessionToken>token</SessionToken> + * <Expiration>2019-11-09T13:34:41Z</Expiration> + * </Credentials> + */ +struct flb_aws_credentials *flb_parse_sts_resp(char *response, + time_t *expiration) +{ + struct flb_aws_credentials *creds = NULL; + char *cred_node = NULL; + flb_sds_t tmp = NULL; + + cred_node = strstr(response, CREDENTIALS_NODE); + if (!cred_node) { + flb_error("[aws_credentials] Could not find '%s' node in sts response", + CREDENTIALS_NODE); + return NULL; + } + cred_node += CREDENTIALS_NODE_LEN; + + creds = flb_calloc(1, sizeof(struct flb_aws_credentials)); + if (!creds) { + flb_errno(); + return NULL; + } + + creds->access_key_id = get_node(cred_node, ACCESS_KEY_NODE, + ACCESS_KEY_NODE_LEN, ACCESS_KEY_NODE_END); + if (!creds->access_key_id) { + goto error; + } + + creds->secret_access_key = get_node(cred_node, SECRET_KEY_NODE, + SECRET_KEY_NODE_LEN, SECRET_KEY_NODE_END); + if (!creds->secret_access_key) { + goto error; + } + + creds->session_token = get_node(cred_node, SESSION_TOKEN_NODE, + SESSION_TOKEN_NODE_LEN, SESSION_TOKEN_NODE_END); + if (!creds->session_token) { + goto error; + } + + tmp = get_node(cred_node, EXPIRATION_NODE, EXPIRATION_NODE_LEN, EXPIRATION_NODE_END); + if (!tmp) { + goto error; + } + *expiration = flb_aws_cred_expiration(tmp); + + flb_sds_destroy(tmp); + return creds; + +error: + flb_aws_credentials_destroy(creds); + if (tmp) { + flb_sds_destroy(tmp); + } + return NULL; +} + +/* + * Constructs the STS request uri. + * external_id can be NULL. + */ +flb_sds_t flb_sts_uri(char *action, char *role_arn, char *session_name, + char *external_id, char *identity_token) +{ + flb_sds_t tmp; + flb_sds_t uri = NULL; + size_t len = STS_ASSUME_ROLE_URI_BASE_LEN; + + if (external_id) { + len += 12; /* will add "&ExternalId=" */ + len += strlen(external_id); + } + + if (identity_token) { + len += 18; /* will add "&WebIdentityToken=" */ + len += strlen(identity_token); + } + + + len += strlen(session_name); + len += strlen(role_arn); + len += strlen(action); + len++; /* null char */ + + uri = flb_sds_create_size(len); + if (!uri) { + return NULL; + } + + tmp = flb_sds_printf(&uri, STS_ASSUME_ROLE_URI_FORMAT, action, session_name, + role_arn); + if (!tmp) { + flb_sds_destroy(uri); + return NULL; + } + + if (external_id) { + flb_sds_printf(&uri, "&ExternalId=%s", external_id); + } + + if (identity_token) { + flb_sds_printf(&uri, "&WebIdentityToken=%s", identity_token); + } + + return uri; +} + +static flb_sds_t get_node(char *cred_node, char* node_name, int node_name_len, char* node_end) +{ + char *node = NULL; + char *end = NULL; + flb_sds_t val = NULL; + int len; + + node = strstr(cred_node, node_name); + if (!node) { + flb_error("[aws_credentials] Could not find '%s' node in sts response", + node_name); + return NULL; + } + node += node_name_len; + end = strstr(node, node_end); + if (!end) { + flb_error("[aws_credentials] Could not find end of '%s' node in " + "sts response", node_name); + return NULL; + } + len = end - node; + val = flb_sds_create_len(node, len); + if (!val) { + flb_errno(); + return NULL; + } + + return val; +} diff --git a/fluent-bit/src/aws/flb_aws_error_reporter.c b/fluent-bit/src/aws/flb_aws_error_reporter.c new file mode 100644 index 00000000..72da9666 --- /dev/null +++ b/fluent-bit/src/aws/flb_aws_error_reporter.c @@ -0,0 +1,276 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdio.h> +#include <stdlib.h> +#include <time.h> +#include <monkey/mk_core/mk_list.h> + +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_env.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/aws/flb_aws_error_reporter.h> + +/* helper function to get int type environment variable*/ +static int getenv_int(const char *name) { + char *value, *end; + long result; + + value = getenv(name); + if (!value) { + return 0; + } + + result = strtol(value, &end, 10); + if (*end != '\0') { + return 0; + } + return (int) result; +} + +/* create an error reporter*/ +struct flb_aws_error_reporter *flb_aws_error_reporter_create() +{ + char *path_var = NULL; + int ttl_var, status_message_length; + struct flb_aws_error_reporter *error_reporter; + FILE *f; + int ret; + + error_reporter = flb_calloc(1, sizeof(struct flb_aws_error_reporter)); + if (!error_reporter) { + flb_errno(); + return NULL; + } + + /* setup error report file path */ + path_var = getenv(STATUS_MESSAGE_FILE_PATH_ENV); + if (path_var == NULL) { + flb_free(error_reporter); + flb_errno(); + return NULL; + } + + error_reporter->file_path = flb_sds_create(path_var); + if (!error_reporter->file_path) { + flb_free(error_reporter); + flb_errno(); + return NULL; + } + + /* clean up existing file*/ + if ((f = fopen(error_reporter->file_path, "r")) != NULL) { + /* file exist, try delete it*/ + if (remove(error_reporter->file_path)) { + flb_free(error_reporter); + flb_errno(); + return NULL; + } + } + + /* setup error reporter message TTL */ + ttl_var = getenv_int(STATUS_MESSAGE_TTL_ENV); + if (ttl_var <= 0) { + ttl_var = STATUS_MESSAGE_TTL_DEFAULT; + } + error_reporter->ttl = ttl_var; + + /* setup error reporter file size */ + status_message_length = getenv_int(STATUS_MESSAGE_MAX_BYTE_LENGTH_ENV); + if(status_message_length <= 0) { + status_message_length = STATUS_MESSAGE_MAX_BYTE_LENGTH_DEFAULT; + } + error_reporter->max_size = status_message_length; + + /* create the message Linked Lists */ + mk_list_init(&error_reporter->messages); + + return error_reporter; +} + +/* error reporter write the error message into reporting file and memory*/ +int flb_aws_error_reporter_write(struct flb_aws_error_reporter *error_reporter, char *msg) +{ + struct mk_list *tmp; + struct mk_list *head; + struct flb_error_message *message; + struct flb_error_message *tmp_message; + flb_sds_t buf; + flb_sds_t buf_tmp; + int deleted_message_count = 0; + FILE *f; + + if (error_reporter == NULL) { + return -1; + } + + buf = flb_sds_create(msg); + if (!buf) { + flb_errno(); + return -1; + } + /* check if the message is the same with latest one in queue*/ + if (mk_list_is_empty(&error_reporter->messages) != 0) { + tmp_message = mk_list_entry_last(&error_reporter->messages, + struct flb_error_message, _head); + if (tmp_message->len == flb_sds_len(buf) && + flb_sds_cmp(tmp_message->data, buf, tmp_message->len) == 0) { + + tmp_message->timestamp = time(NULL); + flb_sds_destroy(buf); + return 0; + } + } + + message = flb_malloc(sizeof(struct flb_error_message)); + if (!message) { + flb_sds_destroy(buf); + flb_errno(); + return -1; + } + + /* check if new message is too large and truncate*/ + if (flb_sds_len(buf) > error_reporter->max_size) { + // truncate message + buf_tmp = flb_sds_copy(buf, msg, error_reporter->max_size); + if (!buf_tmp) { + flb_sds_destroy(buf); + flb_free(message); + return -1; + } + } + + message->data = flb_sds_create(buf); + if (!message->data) { + flb_sds_destroy(buf); + flb_free(message); + return -1; + } + + message->len = flb_sds_len(buf); + + /* clean up old message to provide enough space for new message*/ + mk_list_foreach_safe(head, tmp, &error_reporter->messages) { + tmp_message = mk_list_entry(head, struct flb_error_message, _head); + if (error_reporter->file_size + flb_sds_len(buf) <= error_reporter->max_size) { + break; + } + else { + error_reporter->file_size -= tmp_message->len; + deleted_message_count++; + mk_list_del(&tmp_message->_head); + flb_sds_destroy(tmp_message->data); + flb_free(tmp_message); + } + } + message->timestamp = time(NULL); + + mk_list_add(&message->_head, &error_reporter->messages); + error_reporter->file_size += message->len; + + if (deleted_message_count == 0) { + f = fopen(error_reporter->file_path, "a"); + fprintf(f, message->data); + } + else { + f = fopen(error_reporter->file_path, "w"); + mk_list_foreach_safe(head, tmp, &error_reporter->messages) { + tmp_message = mk_list_entry(head, struct flb_error_message, _head); + fprintf(f, tmp_message->data); + } + } + fclose(f); + + flb_sds_destroy(buf); + + return 0; + +} + +/* error reporter clean up the expired message based on TTL*/ +void flb_aws_error_reporter_clean(struct flb_aws_error_reporter *error_reporter) +{ + struct mk_list *tmp; + struct mk_list *head; + struct flb_error_message *message; + int expired_message_count = 0; + FILE *f; + + if (error_reporter == NULL) { + return; + } + + /* check the timestamp for every message and clean up expired messages*/ + mk_list_foreach_safe(head, tmp, &error_reporter->messages) { + message = mk_list_entry(head, struct flb_error_message, _head); + if (error_reporter->ttl > time(NULL) - message->timestamp) { + break; + } + error_reporter->file_size -= message->len; + mk_list_del(&message->_head); + flb_sds_destroy(message->data); + flb_free(message); + expired_message_count++; + } + + /* rewrite error report file if any message is cleaned up*/ + if (expired_message_count > 0) { + f = fopen(error_reporter->file_path, "w"); + mk_list_foreach_safe(head, tmp, &error_reporter->messages) { + message = mk_list_entry(head, struct flb_error_message, _head); + fprintf(f, message->data); + } + fclose(f); + } +} + +/* error reporter clean up when fluent bit shutdown*/ +void flb_aws_error_reporter_destroy(struct flb_aws_error_reporter *error_reporter) +{ + struct mk_list *tmp; + struct mk_list *head; + struct flb_error_message *message; + + if (error_reporter == NULL) { + return; + } + + if(error_reporter->file_path) { + flb_sds_destroy(error_reporter->file_path); + } + if (mk_list_is_empty(&error_reporter->messages) != 0) { + + mk_list_foreach_safe(head, tmp, &error_reporter->messages) { + message = mk_list_entry(head, struct flb_error_message, _head); + mk_list_del(&message->_head); + flb_sds_destroy(message->data); + flb_free(message); + } + mk_list_del(&error_reporter->messages); + } + + flb_free(error_reporter); +} + +/*check if system enable error reporting*/ +int is_error_reporting_enabled() +{ + return getenv(STATUS_MESSAGE_FILE_PATH_ENV) != NULL; +}
\ No newline at end of file diff --git a/fluent-bit/src/aws/flb_aws_imds.c b/fluent-bit/src/aws/flb_aws_imds.c new file mode 100644 index 00000000..0e54db16 --- /dev/null +++ b/fluent-bit/src/aws/flb_aws_imds.c @@ -0,0 +1,370 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/aws/flb_aws_imds.h> +#include <fluent-bit/flb_aws_credentials.h> +#include <fluent-bit/flb_aws_util.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_jsmn.h> + +#define FLB_AWS_IMDS_ROOT "/" +#define FLB_AWS_IMDS_V2_TOKEN_PATH "/latest/api/token" + +/* Request headers */ +static struct flb_aws_header imds_v2_token_ttl_header = { + .key = "X-aws-ec2-metadata-token-ttl-seconds", + .key_len = 36, + .val = "21600", /* 6 hours (ie maximum ttl) */ + .val_len = 5, +}; + +/* Request header templates */ +const static struct flb_aws_header imds_v2_token_token_header_template = { + .key = "X-aws-ec2-metadata-token", + .key_len = 24, + .val = "", /* Replace with token value */ + .val_len = 0, /* Replace with token length */ +}; + +/* Declarations */ +static int get_imds_version(struct flb_aws_imds *ctx); +static int refresh_imds_v2_token(struct flb_aws_imds *ctx); + +/* Default config values */ +const struct flb_aws_imds_config flb_aws_imds_config_default = { + FLB_AWS_IMDS_VERSION_EVALUATE}; + +/* Create IMDS context */ +struct flb_aws_imds *flb_aws_imds_create(const struct flb_aws_imds_config *imds_config, + struct flb_aws_client *ec2_imds_client) +{ + struct flb_aws_imds *ctx = NULL; + + /* Create context */ + ctx = flb_calloc(1, sizeof(struct flb_aws_imds)); + if (!ctx) { + flb_errno(); + return NULL; + } + + /* + * Set IMDS version to whatever is specified in config + * Version may be evaluated later if set to FLB_AWS_IMDS_VERSION_EVALUATE + */ + ctx->imds_version = imds_config->use_imds_version; + ctx->imds_v2_token = flb_sds_create_len("INVALID_TOKEN", 13); + ctx->imds_v2_token_len = 13; + + /* Detect IMDS support */ + if (!ec2_imds_client->upstream) { + flb_debug( + "[imds] unable to connect to EC2 IMDS. ec2_imds_client upstream is null"); + + flb_aws_imds_destroy(ctx); + return NULL; + } + if (0 != strncmp(ec2_imds_client->upstream->tcp_host, FLB_AWS_IMDS_HOST, + FLB_AWS_IMDS_HOST_LEN)) { + flb_debug("[imds] ec2_imds_client tcp host must be set to %s", FLB_AWS_IMDS_HOST); + flb_aws_imds_destroy(ctx); + return NULL; + } + if (ec2_imds_client->upstream->tcp_port != FLB_AWS_IMDS_PORT) { + flb_debug("[imds] ec2_imds_client tcp port must be set to %i", FLB_AWS_IMDS_PORT); + flb_aws_imds_destroy(ctx); + return NULL; + } + + /* Connect client */ + ctx->ec2_imds_client = ec2_imds_client; + return ctx; +} + +/* Destroy IMDS context */ +void flb_aws_imds_destroy(struct flb_aws_imds *ctx) +{ + if (ctx->imds_v2_token) { + flb_sds_destroy(ctx->imds_v2_token); + } + + flb_free(ctx); +} + +/* Get IMDS metadata */ +int flb_aws_imds_request(struct flb_aws_imds *ctx, const char *metadata_path, + flb_sds_t *metadata, size_t *metadata_len) +{ + return flb_aws_imds_request_by_key(ctx, metadata_path, metadata, metadata_len, NULL); +} + +/* Get IMDS metadata by key */ +int flb_aws_imds_request_by_key(struct flb_aws_imds *ctx, const char *metadata_path, + flb_sds_t *metadata, size_t *metadata_len, char *key) +{ + int ret; + flb_sds_t tmp; + + struct flb_http_client *c = NULL; + + struct flb_aws_client *ec2_imds_client = ctx->ec2_imds_client; + struct flb_aws_header token_header = imds_v2_token_token_header_template; + + /* Get IMDS version */ + int imds_version = get_imds_version(ctx); + + /* Abort on version detection failure */ + if (imds_version == FLB_AWS_IMDS_VERSION_EVALUATE) { + /* Exit gracefully allowing for retrys */ + flb_warn("[imds] unable to evaluate IMDS version"); + return -1; + } + + if (imds_version == FLB_AWS_IMDS_VERSION_2) { + token_header.val = ctx->imds_v2_token; + token_header.val_len = ctx->imds_v2_token_len; + flb_debug("[imds] using IMDSv2"); + } + else { + flb_debug("[imds] using IMDSv1"); + } + + c = ec2_imds_client->client_vtable->request( + ec2_imds_client, FLB_HTTP_GET, metadata_path, NULL, 0, &token_header, + (imds_version == FLB_AWS_IMDS_VERSION_1) ? 0 : 1); + if (!c) { + /* Exit gracefully allowing for retrys */ + flb_warn("[imds] failed to retrieve metadata"); + return -1; + } + + /* Detect invalid token */ + if (imds_version == FLB_AWS_IMDS_VERSION_2 && c->resp.status == 401) { + /* Refresh token and retry request */ + flb_http_client_destroy(c); + ret = refresh_imds_v2_token(ctx); + if (ret < 0) { + flb_debug("[imds] failed to refresh IMDSv2 token"); + return -1; + } + token_header.val = ctx->imds_v2_token; + token_header.val_len = ctx->imds_v2_token_len; + flb_debug("[imds] refreshed IMDSv2 token"); + c = ec2_imds_client->client_vtable->request( + ec2_imds_client, FLB_HTTP_GET, metadata_path, NULL, 0, &token_header, 1); + if (!c) { + /* Exit gracefully allowing for retries */ + flb_warn("[imds] failed to retrieve metadata"); + return -1; + } + } + + if (c->resp.status != 200) { + ret = -1; + if (c->resp.status == 404) { + ret = -2; + } + if (c->resp.payload_size > 0) { + flb_debug("[imds] metadata request failure response\n%s", c->resp.payload); + } + flb_http_client_destroy(c); + return ret; + } + + if (key != NULL) { + /* get the value of the key from payload json string */ + tmp = flb_json_get_val(c->resp.payload, c->resp.payload_size, key); + if (!tmp) { + tmp = flb_sds_create_len("NULL", 4); + flb_error("[imds] %s is undefined in EC2 instance", key); + } + } + else { + tmp = flb_sds_create_len(c->resp.payload, c->resp.payload_size); + } + + if (!tmp) { + flb_errno(); + flb_http_client_destroy(c); + return -1; + } + + *metadata = tmp; + *metadata_len = key == NULL ? c->resp.payload_size : strlen(tmp); + + flb_http_client_destroy(c); + return 0; +} + +/* Get VPC Id */ +flb_sds_t flb_aws_imds_get_vpc_id(struct flb_aws_imds *ctx) +{ + int ret; + flb_sds_t mac_id = NULL; + size_t mac_len = 0; + flb_sds_t vpc_id = NULL; + size_t vpc_id_len = 0; + + /* get EC2 instance Mac id first before getting VPC id */ + ret = flb_aws_imds_request(ctx, FLB_AWS_IMDS_MAC_PATH, &mac_id, &mac_len); + + if (ret < 0) { + flb_sds_destroy(mac_id); + return NULL; + } + + /* + * the VPC full path should be like: + * latest/meta-data/network/interfaces/macs/{mac_id}/vpc-id/" + */ + flb_sds_t vpc_path = flb_sds_create_size(70); + vpc_path = + flb_sds_printf(&vpc_path, "%s/%s/%s/", + "/latest/meta-data/network/interfaces/macs", mac_id, "vpc-id"); + ret = flb_aws_imds_request(ctx, vpc_path, &vpc_id, &vpc_id_len); + + flb_sds_destroy(mac_id); + flb_sds_destroy(vpc_path); + + return vpc_id; +} + +/* Obtain the IMDS version */ +static int get_imds_version(struct flb_aws_imds *ctx) +{ + int ret; + struct flb_aws_client *client = ctx->ec2_imds_client; + struct flb_aws_header invalid_token_header; + struct flb_http_client *c = NULL; + + if (ctx->imds_version != FLB_AWS_IMDS_VERSION_EVALUATE) { + return ctx->imds_version; + } + + /* + * Evaluate version + * To evaluate wether IMDSv2 is available, send an invalid token + * in IMDS request. If response status is 'Unauthorized', then IMDSv2 + * is available. + */ + invalid_token_header = imds_v2_token_token_header_template; + invalid_token_header.val = "INVALID"; + invalid_token_header.val_len = 7; + c = client->client_vtable->request(client, FLB_HTTP_GET, FLB_AWS_IMDS_ROOT, NULL, 0, + &invalid_token_header, 1); + + if (!c) { + flb_debug("[imds] imds endpoint unavailable"); + return FLB_AWS_IMDS_VERSION_EVALUATE; + } + + /* Unauthorized response means that IMDS version 2 is in use */ + if (c->resp.status == 401) { + ctx->imds_version = FLB_AWS_IMDS_VERSION_2; + ret = refresh_imds_v2_token(ctx); + if (ret == -1) { + /* + * Token cannot be refreshed, test IMDSv1 + * If IMDSv1 cannot be used, response will be status 401 + */ + flb_http_client_destroy(c); + ctx->imds_version = FLB_AWS_IMDS_VERSION_EVALUATE; + c = client->client_vtable->request(client, FLB_HTTP_GET, FLB_AWS_IMDS_ROOT, + NULL, 0, NULL, 0); + if (!c) { + flb_debug("[imds] imds v1 attempt, endpoint unavailable"); + return FLB_AWS_IMDS_VERSION_EVALUATE; + } + + if (c->resp.status == 200) { + flb_info("[imds] to use IMDSv2, set --http-put-response-hop-limit to 2"); + } + else { + /* IMDSv1 unavailable. IMDSv2 beyond network hop count */ + flb_warn("[imds] failed to retrieve IMDSv2 token and IMDSv1 unavailable. " + "This is likely due to instance-metadata-options " + "--http-put-response-hop-limit being set to 1 and --http-tokens " + "set to required. " + "To use IMDSv2, please set --http-put-response-hop-limit to 2 as " + "described https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/" + "configuring-instance-metadata-options.html"); + } + } + } + + /* + * Success means that IMDS version 1 is in use + */ + if (c->resp.status == 200) { + flb_warn("[imds] falling back on IMDSv1"); + ctx->imds_version = FLB_AWS_IMDS_VERSION_1; + } + + flb_http_client_destroy(c); + return ctx->imds_version; +} + +/* + * Get an IMDSv2 token + * Token preserved in imds context + */ +static int refresh_imds_v2_token(struct flb_aws_imds *ctx) +{ + struct flb_http_client *c = NULL; + struct flb_aws_client *ec2_imds_client = ctx->ec2_imds_client; + + c = ec2_imds_client->client_vtable->request(ec2_imds_client, FLB_HTTP_PUT, + FLB_AWS_IMDS_V2_TOKEN_PATH, NULL, 0, + &imds_v2_token_ttl_header, 1); + + if (!c) { + return -1; + } + + if (c->resp.status != 200) { + if (c->resp.payload_size > 0) { + flb_error("[imds] IMDSv2 token retrieval failure response\n%s", + c->resp.payload); + } + + flb_http_client_destroy(c); + return -1; + } + + /* Preserve token information in ctx */ + if (c->resp.payload_size > 0) { + if (ctx->imds_v2_token) { + flb_sds_destroy(ctx->imds_v2_token); + } + ctx->imds_v2_token = flb_sds_create_len(c->resp.payload, c->resp.payload_size); + if (!ctx->imds_v2_token) { + flb_errno(); + flb_http_client_destroy(c); + return -1; + } + ctx->imds_v2_token_len = c->resp.payload_size; + + flb_http_client_destroy(c); + return 0; + } + + flb_debug("[imds] IMDS metadata response was empty"); + flb_http_client_destroy(c); + return -1; +} diff --git a/fluent-bit/src/aws/flb_aws_util.c b/fluent-bit/src/aws/flb_aws_util.c new file mode 100644 index 00000000..533bba7e --- /dev/null +++ b/fluent-bit/src/aws/flb_aws_util.c @@ -0,0 +1,1047 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_sds.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_signv4.h> +#include <fluent-bit/flb_aws_util.h> +#include <fluent-bit/flb_aws_credentials.h> +#include <fluent-bit/flb_output_plugin.h> +#include <fluent-bit/flb_jsmn.h> +#include <fluent-bit/flb_env.h> + +#include <stdlib.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#define AWS_SERVICE_ENDPOINT_FORMAT "%s.%s.amazonaws.com" +#define AWS_SERVICE_ENDPOINT_BASE_LEN 15 + +#define TAG_PART_DESCRIPTOR "$TAG[%d]" +#define TAG_DESCRIPTOR "$TAG" +#define MAX_TAG_PARTS 10 +#define S3_KEY_SIZE 1024 +#define RANDOM_STRING "$UUID" +#define INDEX_STRING "$INDEX" +#define AWS_USER_AGENT_NONE "none" +#define AWS_USER_AGENT_ECS "ecs" +#define AWS_USER_AGENT_K8S "k8s" +#define AWS_ECS_METADATA_URI "ECS_CONTAINER_METADATA_URI_V4" +#define FLB_MAX_AWS_RESP_BUFFER_SIZE 0 /* 0 means unlimited capacity as per requirement */ + +#ifdef FLB_SYSTEM_WINDOWS +#define FLB_AWS_BASE_USER_AGENT "aws-fluent-bit-plugin-windows" +#define FLB_AWS_BASE_USER_AGENT_FORMAT "aws-fluent-bit-plugin-windows-%s" +#define FLB_AWS_BASE_USER_AGENT_LEN 29 +#else +#define FLB_AWS_BASE_USER_AGENT "aws-fluent-bit-plugin" +#define FLB_AWS_BASE_USER_AGENT_FORMAT "aws-fluent-bit-plugin-%s" +#define FLB_AWS_BASE_USER_AGENT_LEN 21 +#endif + +#define FLB_AWS_MILLISECOND_FORMATTER_LENGTH 3 +#define FLB_AWS_NANOSECOND_FORMATTER_LENGTH 9 +#define FLB_AWS_MILLISECOND_FORMATTER "%3N" +#define FLB_AWS_NANOSECOND_FORMATTER_N "%9N" +#define FLB_AWS_NANOSECOND_FORMATTER_L "%L" + +struct flb_http_client *request_do(struct flb_aws_client *aws_client, + int method, const char *uri, + const char *body, size_t body_len, + struct flb_aws_header *dynamic_headers, + size_t dynamic_headers_len); + +/* + * https://service.region.amazonaws.com(.cn) + */ +char *flb_aws_endpoint(char* service, char* region) +{ + char *endpoint = NULL; + size_t len = AWS_SERVICE_ENDPOINT_BASE_LEN; + int is_cn = FLB_FALSE; + int bytes; + + + /* In the China regions, ".cn" is appended to the URL */ + if (strcmp("cn-north-1", region) == 0) { + len += 3; + is_cn = FLB_TRUE; + } + if (strcmp("cn-northwest-1", region) == 0) { + len += 3; + is_cn = FLB_TRUE; + } + + len += strlen(service); + len += strlen(region); + len++; /* null byte */ + + endpoint = flb_calloc(len, sizeof(char)); + if (!endpoint) { + flb_errno(); + return NULL; + } + + bytes = snprintf(endpoint, len, AWS_SERVICE_ENDPOINT_FORMAT, service, region); + if (bytes < 0) { + flb_errno(); + flb_free(endpoint); + return NULL; + } + + if (is_cn) { + memcpy(endpoint + bytes, ".cn", 3); + endpoint[bytes + 3] = '\0'; + } + + return endpoint; + +} + +int flb_read_file(const char *path, char **out_buf, size_t *out_size) +{ + int ret; + long bytes; + char *buf = NULL; + struct stat st; + int fd; + + fd = open(path, O_RDONLY); + if (fd < 0) { + return -1; + } + + ret = fstat(fd, &st); + if (ret == -1) { + flb_errno(); + close(fd); + return -1; + } + + buf = flb_calloc(st.st_size + 1, sizeof(char)); + if (!buf) { + flb_errno(); + close(fd); + return -1; + } + + bytes = read(fd, buf, st.st_size); + if (bytes < 0) { + flb_errno(); + flb_free(buf); + close(fd); + return -1; + } + + /* fread does not add null byte */ + buf[st.st_size] = '\0'; + + close(fd); + *out_buf = buf; + *out_size = st.st_size; + + return 0; +} + + +char *removeProtocol (char *endpoint, char *protocol) { + if (strncmp(protocol, endpoint, strlen(protocol)) == 0){ + endpoint = endpoint + strlen(protocol); + } + return endpoint; +} + +struct flb_http_client *flb_aws_client_request(struct flb_aws_client *aws_client, + int method, const char *uri, + const char *body, size_t body_len, + struct flb_aws_header + *dynamic_headers, + size_t dynamic_headers_len) +{ + struct flb_http_client *c = NULL; + + c = request_do(aws_client, method, uri, body, body_len, + dynamic_headers, dynamic_headers_len); + + // Auto retry if request fails + if (c == NULL && aws_client->retry_requests) { + flb_debug("[aws_client] auto-retrying"); + c = request_do(aws_client, method, uri, body, body_len, + dynamic_headers, dynamic_headers_len); + } + + /* + * 400 or 403 could indicate an issue with credentials- so we check for auth + * specific error messages and then force a refresh on the provider. + * For safety a refresh can be performed only once + * per FLB_AWS_CREDENTIAL_REFRESH_LIMIT. + * + */ + if (c && (c->resp.status >= 400 && c->resp.status < 500)) { + if (aws_client->has_auth && time(NULL) > aws_client->refresh_limit) { + if (flb_aws_is_auth_error(c->resp.payload, c->resp.payload_size) + == FLB_TRUE) { + flb_info("[aws_client] auth error, refreshing creds"); + aws_client->refresh_limit = time(NULL) + + FLB_AWS_CREDENTIAL_REFRESH_LIMIT; + aws_client->provider->provider_vtable-> + refresh(aws_client->provider); + } + } + } + + return c; +} + +static struct flb_aws_client_vtable client_vtable = { + .request = flb_aws_client_request, +}; + +struct flb_aws_client *flb_aws_client_create() +{ + struct flb_aws_client *client = flb_calloc(1, sizeof(struct flb_aws_client)); + if (!client) { + flb_errno(); + return NULL; + } + client->client_vtable = &client_vtable; + client->retry_requests = FLB_FALSE; + client->debug_only = FLB_FALSE; + return client; +} + +/* Generator that returns clients with the default vtable */ + +static struct flb_aws_client_generator default_generator = { + .create = flb_aws_client_create, +}; + +struct flb_aws_client_generator *flb_aws_client_generator() +{ + return &default_generator; +} + +void flb_aws_client_destroy(struct flb_aws_client *aws_client) +{ + if (aws_client) { + if (aws_client->upstream) { + flb_upstream_destroy(aws_client->upstream); + } + if (aws_client->extra_user_agent) { + flb_sds_destroy(aws_client->extra_user_agent); + } + flb_free(aws_client); + } +} + +int flb_aws_is_auth_error(char *payload, size_t payload_size) +{ + flb_sds_t error = NULL; + + if (payload_size == 0) { + return FLB_FALSE; + } + + /* Fluent Bit calls the STS API which returns XML */ + if (strcasestr(payload, "InvalidClientTokenId") != NULL) { + return FLB_TRUE; + } + + if (strcasestr(payload, "AccessDenied") != NULL) { + return FLB_TRUE; + } + + if (strcasestr(payload, "Expired") != NULL) { + return FLB_TRUE; + } + + /* Most APIs we use return JSON */ + error = flb_aws_error(payload, payload_size); + if (error != NULL) { + if (strcmp(error, "ExpiredToken") == 0 || + strcmp(error, "ExpiredTokenException") == 0 || + strcmp(error, "AccessDeniedException") == 0 || + strcmp(error, "AccessDenied") == 0 || + strcmp(error, "IncompleteSignature") == 0 || + strcmp(error, "SignatureDoesNotMatch") == 0 || + strcmp(error, "MissingAuthenticationToken") == 0 || + strcmp(error, "InvalidClientTokenId") == 0 || + strcmp(error, "InvalidToken") == 0 || + strcmp(error, "InvalidAccessKeyId") == 0 || + strcmp(error, "UnrecognizedClientException") == 0) { + flb_sds_destroy(error); + return FLB_TRUE; + } + flb_sds_destroy(error); + } + + return FLB_FALSE; +} + +struct flb_http_client *request_do(struct flb_aws_client *aws_client, + int method, const char *uri, + const char *body, size_t body_len, + struct flb_aws_header *dynamic_headers, + size_t dynamic_headers_len) +{ + size_t b_sent; + int ret; + struct flb_connection *u_conn = NULL; + flb_sds_t signature = NULL; + int i; + int normalize_uri; + struct flb_aws_header header; + struct flb_http_client *c = NULL; + flb_sds_t tmp; + flb_sds_t user_agent_prefix; + flb_sds_t user_agent = NULL; + char *buf; + struct flb_env *env; + + u_conn = flb_upstream_conn_get(aws_client->upstream); + if (!u_conn) { + if (aws_client->debug_only == FLB_TRUE) { + flb_debug("[aws_client] connection initialization error"); + } + else { + flb_error("[aws_client] connection initialization error"); + } + return NULL; + } + + /* Compose HTTP request */ + c = flb_http_client(u_conn, method, uri, + body, body_len, + aws_client->host, aws_client->port, + aws_client->proxy, aws_client->flags); + + if (!c) { + if (aws_client->debug_only == FLB_TRUE) { + flb_debug("[aws_client] could not initialize request"); + } + else { + flb_error("[aws_client] could not initialize request"); + } + goto error; + } + + /* Increase the maximum HTTP response buffer size to fit large responses from AWS services */ + ret = flb_http_buffer_size(c, FLB_MAX_AWS_RESP_BUFFER_SIZE); + if (ret != 0) { + flb_warn("[aws_http_client] failed to increase max response buffer size"); + } + + /* Set AWS Fluent Bit user agent */ + env = aws_client->upstream->base.config->env; + buf = (char *) flb_env_get(env, "FLB_AWS_USER_AGENT"); + if (buf == NULL) { + if (getenv(AWS_ECS_METADATA_URI) != NULL) { + user_agent = AWS_USER_AGENT_ECS; + } + else { + buf = (char *) flb_env_get(env, AWS_USER_AGENT_K8S); + if (buf && strcasecmp(buf, "enabled") == 0) { + user_agent = AWS_USER_AGENT_K8S; + } + } + + if (user_agent == NULL) { + user_agent = AWS_USER_AGENT_NONE; + } + + flb_env_set(env, "FLB_AWS_USER_AGENT", user_agent); + } + if (aws_client->extra_user_agent == NULL) { + buf = (char *) flb_env_get(env, "FLB_AWS_USER_AGENT"); + tmp = flb_sds_create(buf); + if (!tmp) { + flb_errno(); + goto error; + } + aws_client->extra_user_agent = tmp; + tmp = NULL; + } + + /* Add AWS Fluent Bit user agent header */ + if (strcasecmp(aws_client->extra_user_agent, AWS_USER_AGENT_NONE) == 0) { + ret = flb_http_add_header(c, "User-Agent", 10, + FLB_AWS_BASE_USER_AGENT, FLB_AWS_BASE_USER_AGENT_LEN); + } + else { + user_agent_prefix = flb_sds_create_size(64); + if (!user_agent_prefix) { + flb_errno(); + flb_error("[aws_client] failed to create user agent"); + goto error; + } + tmp = flb_sds_printf(&user_agent_prefix, FLB_AWS_BASE_USER_AGENT_FORMAT, + aws_client->extra_user_agent); + if (!tmp) { + flb_errno(); + flb_sds_destroy(user_agent_prefix); + flb_error("[aws_client] failed to create user agent"); + goto error; + } + user_agent_prefix = tmp; + + ret = flb_http_add_header(c, "User-Agent", 10, user_agent_prefix, + flb_sds_len(user_agent_prefix)); + flb_sds_destroy(user_agent_prefix); + } + + if (ret < 0) { + if (aws_client->debug_only == FLB_TRUE) { + flb_debug("[aws_client] failed to add header to request"); + } + else { + flb_error("[aws_client] failed to add header to request"); + } + goto error; + } + + /* add headers */ + for (i = 0; i < aws_client->static_headers_len; i++) { + header = aws_client->static_headers[i]; + ret = flb_http_add_header(c, + header.key, header.key_len, + header.val, header.val_len); + if (ret < 0) { + if (aws_client->debug_only == FLB_TRUE) { + flb_debug("[aws_client] failed to add header to request"); + } + else { + flb_error("[aws_client] failed to add header to request"); + } + goto error; + } + } + + for (i = 0; i < dynamic_headers_len; i++) { + header = dynamic_headers[i]; + ret = flb_http_add_header(c, + header.key, header.key_len, + header.val, header.val_len); + if (ret < 0) { + if (aws_client->debug_only == FLB_TRUE) { + flb_debug("[aws_client] failed to add header to request"); + } + else { + flb_error("[aws_client] failed to add header to request"); + } + goto error; + } + } + + if (aws_client->has_auth) { + if (aws_client->s3_mode == S3_MODE_NONE) { + normalize_uri = FLB_TRUE; + } + else { + normalize_uri = FLB_FALSE; + } + signature = flb_signv4_do(c, normalize_uri, FLB_TRUE, time(NULL), + aws_client->region, aws_client->service, + aws_client->s3_mode, NULL, + aws_client->provider); + if (!signature) { + if (aws_client->debug_only == FLB_TRUE) { + flb_debug("[aws_client] could not sign request"); + } + else { + flb_error("[aws_client] could not sign request"); + } + goto error; + } + } + + /* Perform request */ + ret = flb_http_do(c, &b_sent); + + if (ret != 0 || c->resp.status != 200) { + flb_debug("[aws_client] %s: http_do=%i, HTTP Status: %i", + aws_client->host, ret, c->resp.status); + } + + if (ret != 0 && c != NULL) { + flb_http_client_destroy(c); + c = NULL; + } + + flb_upstream_conn_release(u_conn); + flb_sds_destroy(signature); + return c; + +error: + if (u_conn) { + flb_upstream_conn_release(u_conn); + } + if (signature) { + flb_sds_destroy(signature); + } + if (c) { + flb_http_client_destroy(c); + } + return NULL; +} + +void flb_aws_print_xml_error(char *response, size_t response_len, + char *api, struct flb_output_instance *ins) +{ + flb_sds_t error; + flb_sds_t message; + + error = flb_aws_xml_get_val(response, response_len, "<Code>", "</Code>"); + if (!error) { + flb_plg_error(ins, "%s: Could not parse response", api); + return; + } + + message = flb_aws_xml_get_val(response, response_len, "<Message>", "</Message>"); + if (!message) { + /* just print the error */ + flb_plg_error(ins, "%s API responded with error='%s'", api, error); + } + else { + flb_plg_error(ins, "%s API responded with error='%s', message='%s'", + api, error, message); + flb_sds_destroy(message); + } + + flb_sds_destroy(error); +} + +/* Parses AWS XML API Error responses and returns the value of the <code> tag */ +flb_sds_t flb_aws_xml_error(char *response, size_t response_len) +{ + return flb_aws_xml_get_val(response, response_len, "<Code>", "</Code>"); +} + +/* + * Parses an XML document and returns the value of the given tag + * Param `tag` should include angle brackets; ex "<code>" + * And param `end` should include end brackets: "</code>" + */ +flb_sds_t flb_aws_xml_get_val(char *response, size_t response_len, char *tag, char *tag_end) +{ + flb_sds_t val = NULL; + char *node = NULL; + char *end; + int len; + + if (response_len == 0) { + return NULL; + } + node = strstr(response, tag); + if (!node) { + flb_debug("[aws] Could not find '%s' tag in API response", tag); + return NULL; + } + + /* advance to end of tag */ + node += strlen(tag); + + end = strstr(node, tag_end); + if (!end) { + flb_error("[aws] Could not find end of '%s' node in xml", tag); + return NULL; + } + len = end - node; + val = flb_sds_create_len(node, len); + if (!val) { + flb_errno(); + return NULL; + } + + return val; +} + +void flb_aws_print_error(char *response, size_t response_len, + char *api, struct flb_output_instance *ins) +{ + flb_sds_t error; + flb_sds_t message; + + error = flb_json_get_val(response, response_len, "__type"); + if (!error) { + return; + } + + message = flb_json_get_val(response, response_len, "message"); + if (!message) { + /* just print the error */ + flb_plg_error(ins, "%s API responded with error='%s'", api, error); + } + else { + flb_plg_error(ins, "%s API responded with error='%s', message='%s'", + api, error, message); + flb_sds_destroy(message); + } + + flb_sds_destroy(error); +} + +/* parses AWS JSON API error responses and returns the value of the __type field */ +flb_sds_t flb_aws_error(char *response, size_t response_len) +{ + return flb_json_get_val(response, response_len, "__type"); +} + +/* gets the value of a key in a json string */ +flb_sds_t flb_json_get_val(char *response, size_t response_len, char *key) +{ + jsmntok_t *tokens = NULL; + const jsmntok_t *t = NULL; + char *current_token = NULL; + jsmn_parser parser; + int tokens_size = 50; + size_t size; + int ret; + int i = 0; + int len; + flb_sds_t error_type = NULL; + + jsmn_init(&parser); + + size = sizeof(jsmntok_t) * tokens_size; + tokens = flb_calloc(1, size); + if (!tokens) { + flb_errno(); + return NULL; + } + + ret = jsmn_parse(&parser, response, response_len, + tokens, tokens_size); + + if (ret == JSMN_ERROR_INVAL || ret == JSMN_ERROR_PART) { + flb_free(tokens); + flb_debug("[aws_client] Unable to parse API response- response is not" + " valid JSON."); + return NULL; + } + + /* return value is number of tokens parsed */ + tokens_size = ret; + + /* + * jsmn will create an array of tokens like: + * key, value, key, value + */ + while (i < (tokens_size - 1)) { + t = &tokens[i]; + + if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) { + break; + } + + if (t->type == JSMN_STRING) { + current_token = &response[t->start]; + + if (strncmp(current_token, key, strlen(key)) == 0) { + i++; + t = &tokens[i]; + current_token = &response[t->start]; + len = t->end - t->start; + error_type = flb_sds_create_len(current_token, len); + if (!error_type) { + flb_errno(); + flb_free(tokens); + return NULL; + } + break; + } + } + + i++; + } + flb_free(tokens); + return error_type; +} + +/* Generic replace function for strings. */ +static char* replace_uri_tokens(const char* original_string, const char* current_word, + const char* new_word) +{ + char *result; + int i = 0; + int count = 0; + int new_word_len = strlen(new_word); + int old_word_len = strlen(current_word); + + for (i = 0; original_string[i] != '\0'; i++) { + if (strstr(&original_string[i], current_word) == &original_string[i]) { + count++; + i += old_word_len - 1; + } + } + + result = flb_sds_create_size(i + count * (new_word_len - old_word_len) + 1); + if (!result) { + flb_errno(); + return NULL; + } + + i = 0; + while (*original_string) { + if (strstr(original_string, current_word) == original_string) { + strncpy(&result[i], new_word, new_word_len); + i += new_word_len; + original_string += old_word_len; + } + else + result[i++] = *original_string++; + } + + result[i] = '\0'; + return result; +} + +/* + * Linux has strtok_r as the concurrent safe version + * Windows has strtok_s + */ +char* strtok_concurrent( + char* str, + char* delimiters, + char** context +) +{ +#ifdef FLB_SYSTEM_WINDOWS + return strtok_s(str, delimiters, context); +#else + return strtok_r(str, delimiters, context); +#endif +} + +/* Constructs S3 object key as per the format. */ +flb_sds_t flb_get_s3_key(const char *format, time_t time, const char *tag, + char *tag_delimiter, uint64_t seq_index) +{ + int i = 0; + int ret = 0; + int seq_index_len; + char *tag_token = NULL; + char *key; + char *random_alphanumeric; + char *seq_index_str; + /* concurrent safe strtok_r requires a tracking ptr */ + char *strtok_saveptr; + int len; + flb_sds_t tmp = NULL; + flb_sds_t buf = NULL; + flb_sds_t s3_key = NULL; + flb_sds_t tmp_key = NULL; + flb_sds_t tmp_tag = NULL; + struct tm gmt = {0}; + + if (strlen(format) > S3_KEY_SIZE){ + flb_warn("[s3_key] Object key length is longer than the 1024 character limit."); + } + + tmp_tag = flb_sds_create_len(tag, strlen(tag)); + if(!tmp_tag){ + goto error; + } + + s3_key = flb_sds_create_len(format, strlen(format)); + if (!s3_key) { + goto error; + } + + /* Check if delimiter(s) specifed exists in the tag. */ + for (i = 0; i < strlen(tag_delimiter); i++){ + if (strchr(tag, tag_delimiter[i])){ + ret = 1; + break; + } + } + + tmp = flb_sds_create_len(TAG_PART_DESCRIPTOR, 5); + if (!tmp) { + goto error; + } + if (strstr(s3_key, tmp)){ + if(ret == 0){ + flb_warn("[s3_key] Invalid Tag delimiter: does not exist in tag. " + "tag=%s, format=%s", tag, format); + } + } + + flb_sds_destroy(tmp); + tmp = NULL; + + /* Split the string on the delimiters */ + tag_token = strtok_concurrent(tmp_tag, tag_delimiter, &strtok_saveptr); + + /* Find all occurences of $TAG[*] and + * replaces it with the right token from tag. + */ + i = 0; + while(tag_token != NULL && i < MAX_TAG_PARTS) { + buf = flb_sds_create_size(10); + if (!buf) { + goto error; + } + tmp = flb_sds_printf(&buf, TAG_PART_DESCRIPTOR, i); + if (!tmp) { + goto error; + } + + tmp_key = replace_uri_tokens(s3_key, tmp, tag_token); + if (!tmp_key) { + goto error; + } + + if(strlen(tmp_key) > S3_KEY_SIZE){ + flb_warn("[s3_key] Object key length is longer than the 1024 character limit."); + } + + if (buf != tmp) { + flb_sds_destroy(buf); + } + flb_sds_destroy(tmp); + tmp = NULL; + buf = NULL; + flb_sds_destroy(s3_key); + s3_key = tmp_key; + tmp_key = NULL; + + tag_token = strtok_concurrent(NULL, tag_delimiter, &strtok_saveptr); + i++; + } + + tmp = flb_sds_create_len(TAG_PART_DESCRIPTOR, 5); + if (!tmp) { + goto error; + } + + /* A match against "$TAG[" indicates an invalid or out of bounds tag part. */ + if (strstr(s3_key, tmp)){ + flb_warn("[s3_key] Invalid / Out of bounds tag part: At most 10 tag parts " + "($TAG[0] - $TAG[9]) can be processed. tag=%s, format=%s, delimiters=%s", + tag, format, tag_delimiter); + } + + /* Find all occurences of $TAG and replace with the entire tag. */ + tmp_key = replace_uri_tokens(s3_key, TAG_DESCRIPTOR, tag); + if (!tmp_key) { + goto error; + } + + if(strlen(tmp_key) > S3_KEY_SIZE){ + flb_warn("[s3_key] Object key length is longer than the 1024 character limit."); + } + + flb_sds_destroy(s3_key); + s3_key = tmp_key; + tmp_key = NULL; + + /* Find all occurences of $INDEX and replace with the appropriate index. */ + if (strstr((char *) format, INDEX_STRING)) { + seq_index_len = snprintf(NULL, 0, "%"PRIu64, seq_index); + seq_index_str = flb_calloc(seq_index_len + 1, sizeof(char)); + if (seq_index_str == NULL) { + goto error; + } + + sprintf(seq_index_str, "%"PRIu64, seq_index); + seq_index_str[seq_index_len] = '\0'; + tmp_key = replace_uri_tokens(s3_key, INDEX_STRING, seq_index_str); + if (tmp_key == NULL) { + flb_free(seq_index_str); + goto error; + } + if (strlen(tmp_key) > S3_KEY_SIZE) { + flb_warn("[s3_key] Object key length is longer than the 1024 character limit."); + } + + flb_sds_destroy(s3_key); + s3_key = tmp_key; + tmp_key = NULL; + flb_free(seq_index_str); + } + + /* Find all occurences of $UUID and replace with a random string. */ + random_alphanumeric = flb_sts_session_name(); + if (!random_alphanumeric) { + goto error; + } + /* only use 8 chars of the random string */ + random_alphanumeric[8] = '\0'; + tmp_key = replace_uri_tokens(s3_key, RANDOM_STRING, random_alphanumeric); + if (!tmp_key) { + flb_free(random_alphanumeric); + goto error; + } + + if(strlen(tmp_key) > S3_KEY_SIZE){ + flb_warn("[s3_key] Object key length is longer than the 1024 character limit."); + } + + flb_sds_destroy(s3_key); + s3_key = tmp_key; + tmp_key = NULL; + flb_free(random_alphanumeric); + + if (!gmtime_r(&time, &gmt)) { + flb_error("[s3_key] Failed to create timestamp."); + goto error; + } + + flb_sds_destroy(tmp); + tmp = NULL; + + /* A string no longer than S3_KEY_SIZE + 1 is created to store the formatted timestamp. */ + key = flb_calloc(1, (S3_KEY_SIZE + 1) * sizeof(char)); + if (!key) { + goto error; + } + + ret = strftime(key, S3_KEY_SIZE, s3_key, &gmt); + if(ret == 0){ + flb_warn("[s3_key] Object key length is longer than the 1024 character limit."); + } + flb_sds_destroy(s3_key); + + len = strlen(key); + if (len > S3_KEY_SIZE) { + len = S3_KEY_SIZE; + } + + s3_key = flb_sds_create_len(key, len); + flb_free(key); + if (!s3_key) { + goto error; + } + + flb_sds_destroy(tmp_tag); + tmp_tag = NULL; + return s3_key; + + error: + flb_errno(); + if (tmp_tag){ + flb_sds_destroy(tmp_tag); + } + if (s3_key){ + flb_sds_destroy(s3_key); + } + if (buf && buf != tmp){ + flb_sds_destroy(buf); + } + if (tmp){ + flb_sds_destroy(tmp); + } + if (tmp_key){ + flb_sds_destroy(tmp_key); + } + return NULL; +} + +/* + * This function is an extension to strftime which can support milliseconds with %3N, + * support nanoseconds with %9N or %L. The return value is the length of formatted + * time string. + */ +size_t flb_aws_strftime_precision(char **out_buf, const char *time_format, + struct flb_time *tms) +{ + char millisecond_str[FLB_AWS_MILLISECOND_FORMATTER_LENGTH+1]; + char nanosecond_str[FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1]; + char *tmp_parsed_time_str; + char *buf; + size_t out_size; + size_t tmp_parsed_time_str_len; + size_t time_format_len; + struct tm timestamp; + struct tm *tmp; + int i; + + /* + * Guess the max length needed for tmp_parsed_time_str and tmp_out_buf. The + * upper bound is 12*strlen(time_format) because the worst scenario will be only + * %c in time_format, and %c will be transfer to 24 chars long by function strftime(). + */ + time_format_len = strlen(time_format); + tmp_parsed_time_str_len = 12*time_format_len; + + /* + * Use tmp_parsed_time_str to buffer when replace %3N with milliseconds, replace + * %9N and %L with nanoseconds in time_format. + */ + tmp_parsed_time_str = (char *)flb_calloc(1, tmp_parsed_time_str_len*sizeof(char)); + if (!tmp_parsed_time_str) { + flb_errno(); + return 0; + } + + buf = (char *)flb_calloc(1, tmp_parsed_time_str_len*sizeof(char)); + if (!buf) { + flb_errno(); + flb_free(tmp_parsed_time_str); + return 0; + } + + /* Replace %3N to millisecond, %9N and %L to nanosecond in time_format. */ + snprintf(millisecond_str, FLB_AWS_MILLISECOND_FORMATTER_LENGTH+1, + "%" PRIu64, (uint64_t) tms->tm.tv_nsec / 1000000); + snprintf(nanosecond_str, FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1, + "%" PRIu64, (uint64_t) tms->tm.tv_nsec); + for (i = 0; i < time_format_len; i++) { + if (strncmp(time_format+i, FLB_AWS_MILLISECOND_FORMATTER, 3) == 0) { + strncat(tmp_parsed_time_str, millisecond_str, + FLB_AWS_MILLISECOND_FORMATTER_LENGTH+1); + i += 2; + } + else if (strncmp(time_format+i, FLB_AWS_NANOSECOND_FORMATTER_N, 3) == 0) { + strncat(tmp_parsed_time_str, nanosecond_str, + FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1); + i += 2; + } + else if (strncmp(time_format+i, FLB_AWS_NANOSECOND_FORMATTER_L, 2) == 0) { + strncat(tmp_parsed_time_str, nanosecond_str, + FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1); + i += 1; + } + else { + strncat(tmp_parsed_time_str,time_format+i,1); + } + } + + tmp = gmtime_r(&tms->tm.tv_sec, ×tamp); + if (!tmp) { + return 0; + } + + out_size = strftime(buf, tmp_parsed_time_str_len, + tmp_parsed_time_str, ×tamp); + + /* Check whether tmp_parsed_time_str_len is enough for tmp_out_buff */ + if (out_size == 0) { + flb_free(tmp_parsed_time_str); + flb_free(buf); + return 0; + } + + *out_buf = buf; + flb_free(tmp_parsed_time_str); + + return out_size; +} |