summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/aws
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/aws')
-rw-r--r--fluent-bit/src/aws/CMakeLists.txt32
-rw-r--r--fluent-bit/src/aws/compression/CMakeLists.txt6
-rw-r--r--fluent-bit/src/aws/compression/arrow/CMakeLists.txt7
-rw-r--r--fluent-bit/src/aws/compression/arrow/compress.c147
-rw-r--r--fluent-bit/src/aws/compression/arrow/compress.h13
-rw-r--r--fluent-bit/src/aws/flb_aws_compress.c245
-rw-r--r--fluent-bit/src/aws/flb_aws_credentials.c862
-rw-r--r--fluent-bit/src/aws/flb_aws_credentials_ec2.c371
-rw-r--r--fluent-bit/src/aws/flb_aws_credentials_http.c566
-rw-r--r--fluent-bit/src/aws/flb_aws_credentials_log.h39
-rw-r--r--fluent-bit/src/aws/flb_aws_credentials_process.c783
-rw-r--r--fluent-bit/src/aws/flb_aws_credentials_profile.c753
-rw-r--r--fluent-bit/src/aws/flb_aws_credentials_sts.c958
-rw-r--r--fluent-bit/src/aws/flb_aws_error_reporter.c276
-rw-r--r--fluent-bit/src/aws/flb_aws_imds.c370
-rw-r--r--fluent-bit/src/aws/flb_aws_util.c1047
16 files changed, 6475 insertions, 0 deletions
diff --git a/fluent-bit/src/aws/CMakeLists.txt b/fluent-bit/src/aws/CMakeLists.txt
new file mode 100644
index 000000000..a6580e7a5
--- /dev/null
+++ b/fluent-bit/src/aws/CMakeLists.txt
@@ -0,0 +1,32 @@
+add_subdirectory(compression)
+
+include_directories(
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_BINARY_DIR}
+ )
+
+set(src
+ "flb_aws_credentials_log.h"
+ "flb_aws_compress.c"
+ "flb_aws_util.c"
+ "flb_aws_credentials.c"
+ "flb_aws_credentials_sts.c"
+ "flb_aws_credentials_ec2.c"
+ "flb_aws_imds.c"
+ "flb_aws_credentials_http.c"
+ "flb_aws_credentials_profile.c"
+ )
+
+if(FLB_HAVE_AWS_CREDENTIAL_PROCESS)
+ set(src
+ ${src}
+ "flb_aws_credentials_process.c"
+ )
+endif()
+
+add_library(flb-aws STATIC ${src})
+target_link_libraries(flb-aws flb-aws-compress)
+
+if(FLB_JEMALLOC)
+ target_link_libraries(flb-aws libjemalloc)
+endif()
diff --git a/fluent-bit/src/aws/compression/CMakeLists.txt b/fluent-bit/src/aws/compression/CMakeLists.txt
new file mode 100644
index 000000000..02a1ba3a6
--- /dev/null
+++ b/fluent-bit/src/aws/compression/CMakeLists.txt
@@ -0,0 +1,6 @@
+add_library(flb-aws-compress INTERFACE)
+
+if(FLB_ARROW)
+ add_subdirectory(arrow EXCLUDE_FROM_ALL)
+ target_link_libraries(flb-aws-compress INTERFACE flb-aws-arrow)
+endif()
diff --git a/fluent-bit/src/aws/compression/arrow/CMakeLists.txt b/fluent-bit/src/aws/compression/arrow/CMakeLists.txt
new file mode 100644
index 000000000..846f65441
--- /dev/null
+++ b/fluent-bit/src/aws/compression/arrow/CMakeLists.txt
@@ -0,0 +1,7 @@
+set(src
+ compress.c)
+
+add_library(flb-aws-arrow STATIC ${src})
+
+target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS})
+target_link_libraries(flb-aws-arrow ${ARROW_GLIB_LDFLAGS})
diff --git a/fluent-bit/src/aws/compression/arrow/compress.c b/fluent-bit/src/aws/compression/arrow/compress.c
new file mode 100644
index 000000000..a48b34f80
--- /dev/null
+++ b/fluent-bit/src/aws/compression/arrow/compress.c
@@ -0,0 +1,147 @@
+/*
+ * This converts S3 plugin's request buffer into Apache Arrow format.
+ *
+ * We use GLib binding to call Arrow functions (which is implemented
+ * in C++) from Fluent Bit.
+ *
+ * https://github.com/apache/arrow/tree/master/c_glib
+ */
+
+#include <arrow-glib/arrow-glib.h>
+#include <inttypes.h>
+
+/*
+ * GArrowTable is the central structure that represents "table" (a.k.a.
+ * data frame).
+ */
+static GArrowTable* parse_json(uint8_t *json, int size)
+{
+ GArrowJSONReader *reader;
+ GArrowBuffer *buffer;
+ GArrowBufferInputStream *input;
+ GArrowJSONReadOptions *options;
+ GArrowTable *table;
+ GError *error = NULL;
+
+ buffer = garrow_buffer_new(json, size);
+ if (buffer == NULL) {
+ return NULL;
+ }
+
+ input = garrow_buffer_input_stream_new(buffer);
+ if (input == NULL) {
+ g_object_unref(buffer);
+ return NULL;
+ }
+
+ options = garrow_json_read_options_new();
+ if (options == NULL) {
+ g_object_unref(buffer);
+ g_object_unref(input);
+ return NULL;
+ }
+
+ reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error);
+ if (reader == NULL) {
+ g_error_free(error);
+ g_object_unref(buffer);
+ g_object_unref(input);
+ g_object_unref(options);
+ return NULL;
+ }
+
+ table = garrow_json_reader_read(reader, &error);
+ if (table == NULL) {
+ g_error_free(error);
+ g_object_unref(buffer);
+ g_object_unref(input);
+ g_object_unref(options);
+ g_object_unref(reader);
+ return NULL;
+ }
+ g_object_unref(buffer);
+ g_object_unref(input);
+ g_object_unref(options);
+ g_object_unref(reader);
+ return table;
+}
+
+static GArrowResizableBuffer* table_to_buffer(GArrowTable *table)
+{
+ GArrowResizableBuffer *buffer;
+ GArrowBufferOutputStream *sink;
+ GError *error = NULL;
+ gboolean success;
+
+ buffer = garrow_resizable_buffer_new(0, &error);
+ if (buffer == NULL) {
+ g_error_free(error);
+ return NULL;
+ }
+
+ sink = garrow_buffer_output_stream_new(buffer);
+ if (sink == NULL) {
+ g_object_unref(buffer);
+ return NULL;
+ }
+
+ success = garrow_table_write_as_feather(
+ table, GARROW_OUTPUT_STREAM(sink),
+ NULL, &error);
+ if (!success) {
+ g_error_free(error);
+ g_object_unref(buffer);
+ g_object_unref(sink);
+ return NULL;
+ }
+ g_object_unref(sink);
+ return buffer;
+}
+
+int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_size)
+{
+ GArrowTable *table;
+ GArrowResizableBuffer *buffer;
+ GBytes *bytes;
+ gconstpointer ptr;
+ gsize len;
+ uint8_t *buf;
+
+ table = parse_json((uint8_t *) json, size);
+ if (table == NULL) {
+ return -1;
+ }
+
+ buffer = table_to_buffer(table);
+ g_object_unref(table);
+ if (buffer == NULL) {
+ return -1;
+ }
+
+ bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer));
+ if (bytes == NULL) {
+ g_object_unref(buffer);
+ return -1;
+ }
+
+ ptr = g_bytes_get_data(bytes, &len);
+ if (ptr == NULL) {
+ g_object_unref(buffer);
+ g_bytes_unref(bytes);
+ return -1;
+ }
+
+ buf = malloc(len);
+ if (buf == NULL) {
+ g_object_unref(buffer);
+ g_bytes_unref(bytes);
+ return -1;
+ }
+ memcpy(buf, ptr, len);
+ *out_buf = (void *) buf;
+ *out_size = len;
+
+ g_object_unref(buffer);
+ g_bytes_unref(bytes);
+ return 0;
+}
diff --git a/fluent-bit/src/aws/compression/arrow/compress.h b/fluent-bit/src/aws/compression/arrow/compress.h
new file mode 100644
index 000000000..82e94f43c
--- /dev/null
+++ b/fluent-bit/src/aws/compression/arrow/compress.h
@@ -0,0 +1,13 @@
+/*
+ * This function converts out_s3 buffer into Apache Arrow format.
+ *
+ * `json` is a string that contain (concatenated) JSON objects.
+ *
+ * `size` is the length of the json data (excluding the trailing
+ * null-terminator character).
+ *
+ * Return 0 on success (with `out_buf` and `out_size` updated),
+ * and -1 on failure
+ */
+
+int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_size);
diff --git a/fluent-bit/src/aws/flb_aws_compress.c b/fluent-bit/src/aws/flb_aws_compress.c
new file mode 100644
index 000000000..e98ce8318
--- /dev/null
+++ b/fluent-bit/src/aws/flb_aws_compress.c
@@ -0,0 +1,245 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2019-2021 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_base64.h>
+
+#include <fluent-bit/aws/flb_aws_compress.h>
+#include <fluent-bit/flb_gzip.h>
+
+#include <stdint.h>
+
+#ifdef FLB_HAVE_ARROW
+#include "compression/arrow/compress.h"
+#endif
+
+struct compression_option {
+ int compression_type;
+ char *compression_keyword;
+ int(*compress)(void *in_data, size_t in_len, void **out_data, size_t *out_len);
+};
+
+/*
+ * Library of compression options
+ * AWS plugins that support compression will have these options.
+ * Referenced function should return -1 on error and 0 on success.
+ */
+static const struct compression_option compression_options[] = {
+ /* FLB_AWS_COMPRESS_NONE which is 0 is reserved for array footer */
+ {
+ FLB_AWS_COMPRESS_GZIP,
+ "gzip",
+ &flb_gzip_compress
+ },
+#ifdef FLB_HAVE_ARROW
+ {
+ FLB_AWS_COMPRESS_ARROW,
+ "arrow",
+ &out_s3_compress_arrow
+ },
+#endif
+ { 0 }
+};
+
+int flb_aws_compression_get_type(const char *compression_keyword)
+{
+ int ret;
+ const struct compression_option *o;
+
+ o = compression_options;
+
+ while (o->compression_type != 0) {
+ ret = strcmp(o->compression_keyword, compression_keyword);
+ if (ret == 0) {
+ return o->compression_type;
+ }
+ ++o;
+ }
+
+ flb_error("[aws_compress] unknown compression type: %s", compression_keyword);
+ return -1;
+}
+
+int flb_aws_compression_compress(int compression_type, void *in_data, size_t in_len,
+ void **out_data, size_t *out_len)
+{
+ const struct compression_option *o;
+
+ o = compression_options;
+
+ while (o->compression_type != 0) {
+ if (o->compression_type == compression_type) {
+ return o->compress(in_data, in_len, out_data, out_len);
+ }
+ ++o;
+ }
+
+ flb_error("[aws_compress] invalid compression type: %i", compression_type);
+ flb_errno();
+ return -1;
+}
+
+int flb_aws_compression_b64_truncate_compress(int compression_type, size_t max_out_len,
+ void *in_data, size_t in_len,
+ void **out_data, size_t *out_len)
+{
+ static const void *truncation_suffix = "[Truncated...]";
+ static const size_t truncation_suffix_len = 14;
+ static const double truncation_reduction_percent = 90; /* % out of 100 */
+ static const int truncation_compression_max_attempts = 10;
+
+ int ret;
+ int is_truncated;
+ int compression_attempts;
+ size_t truncated_in_len_prev;
+ size_t truncated_in_len;
+ void *truncated_in_buf;
+ void *compressed_buf;
+ size_t compressed_len;
+ size_t original_b64_compressed_len;
+
+ unsigned char *b64_compressed_buf;
+ size_t b64_compressed_len;
+ size_t b64_actual_len;
+
+ /* Iterative approach to truncation */
+ truncated_in_len = in_len;
+ truncated_in_buf = in_data;
+ is_truncated = FLB_FALSE;
+ b64_compressed_len = SIZE_MAX;
+ compression_attempts = 0;
+ while (max_out_len < b64_compressed_len - 1) {
+
+ /* Limit compression truncation attempts, just to be safe */
+ if (compression_attempts >= truncation_compression_max_attempts) {
+ if (is_truncated) {
+ flb_free(truncated_in_buf);
+ }
+ flb_error("[aws_compress] truncation failed, too many compression attempts");
+ return -1;
+ }
+
+ ret = flb_aws_compression_compress(compression_type, truncated_in_buf,
+ truncated_in_len, &compressed_buf,
+ &compressed_len);
+ ++compression_attempts;
+ if (ret != 0) {
+ if (is_truncated) {
+ flb_free(truncated_in_buf);
+ }
+ return -1;
+ }
+
+ /* Determine encoded base64 buffer size */
+ b64_compressed_len = compressed_len / 3; /* Compute number of 4 sextet groups */
+ b64_compressed_len += (compressed_len % 3 != 0); /* Add padding partial group */
+ b64_compressed_len *= 4; /* Compute number of sextets */
+ b64_compressed_len += 1; /* Add room for null character 0x00 */
+
+ /* Truncation needed */
+ if (max_out_len < b64_compressed_len - 1) {
+ flb_debug("[aws_compress] iterative truncation round");
+
+ /* This compressed_buf is the wrong size. Free */
+ flb_free(compressed_buf);
+
+ /* Base case: input compressed empty string, output still too large */
+ if (truncated_in_len == 0) {
+ if (is_truncated) {
+ flb_free(truncated_in_buf);
+ }
+ flb_error("[aws_compress] truncation failed, compressed empty input too "
+ "large");
+ return -1;
+ }
+
+ /* Calculate corrected input size */
+ truncated_in_len_prev = truncated_in_len;
+ truncated_in_len = (max_out_len * truncated_in_len) / b64_compressed_len;
+ truncated_in_len = (truncated_in_len * truncation_reduction_percent) / 100;
+
+ /* Ensure working down */
+ if (truncated_in_len >= truncated_in_len_prev) {
+ truncated_in_len = truncated_in_len_prev - 1;
+ }
+
+ /* Allocate truncation buffer */
+ if (!is_truncated) {
+ is_truncated = FLB_TRUE;
+ original_b64_compressed_len = b64_compressed_len;
+ truncated_in_buf = flb_malloc(in_len);
+ if (!truncated_in_buf) {
+ flb_errno();
+ return -1;
+ }
+ memcpy(truncated_in_buf, in_data, in_len);
+ }
+
+ /* Slap on truncation suffix */
+ if (truncated_in_len < truncation_suffix_len) {
+ /* No room for the truncation suffix. Terminal error */
+ flb_error("[aws_compress] truncation failed, no room for suffix");
+ flb_free(truncated_in_buf);
+ return -1;
+ }
+ memcpy((char *) truncated_in_buf + truncated_in_len - truncation_suffix_len,
+ truncation_suffix, truncation_suffix_len);
+ }
+ }
+
+ /* Truncate buffer free and compression buffer allocation */
+ if (is_truncated) {
+ flb_free(truncated_in_buf);
+ flb_warn("[aws_compress][size=%zu] Truncating input for compressed output "
+ "larger than %zu bytes, output from %zu to %zu bytes",
+ in_len, max_out_len, original_b64_compressed_len - 1,
+ b64_compressed_len - 1);
+ }
+ b64_compressed_buf = flb_malloc(b64_compressed_len);
+ if (!b64_compressed_buf) {
+ flb_errno();
+ return -1;
+ }
+
+ /* Base64 encode compressed out bytes */
+ ret = flb_base64_encode(b64_compressed_buf, b64_compressed_len, &b64_actual_len,
+ compressed_buf, compressed_len);
+ flb_free(compressed_buf);
+
+ if (ret == FLB_BASE64_ERR_BUFFER_TOO_SMALL) {
+ flb_error("[aws_compress] compressed log base64 buffer too small");
+ return -1; /* not handle truncation at this point */
+ }
+ if (ret != 0) {
+ flb_free(b64_compressed_buf);
+ return -1;
+ }
+
+ /* Double check b64 buf len */
+ if (b64_compressed_len - 1 != b64_actual_len) {
+ flb_error("[aws_compress] buffer len should be 1 greater than actual len");
+ flb_free(b64_compressed_buf);
+ return -1;
+ }
+
+ *out_data = b64_compressed_buf;
+ *out_len = b64_compressed_len - 1; /* disregard added null character */
+ return 0;
+}
diff --git a/fluent-bit/src/aws/flb_aws_credentials.c b/fluent-bit/src/aws/flb_aws_credentials.c
new file mode 100644
index 000000000..850142e24
--- /dev/null
+++ b/fluent-bit/src/aws/flb_aws_credentials.c
@@ -0,0 +1,862 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_aws_credentials.h>
+#include <fluent-bit/flb_aws_util.h>
+#include <fluent-bit/flb_jsmn.h>
+#include <fluent-bit/flb_output_plugin.h>
+
+#include <stdlib.h>
+#include <time.h>
+
+#define FIVE_MINUTES 300
+#define TWELVE_HOURS 43200
+
+/* Credentials Environment Variables */
+#define AWS_ACCESS_KEY_ID "AWS_ACCESS_KEY_ID"
+#define AWS_SECRET_ACCESS_KEY "AWS_SECRET_ACCESS_KEY"
+#define AWS_SESSION_TOKEN "AWS_SESSION_TOKEN"
+
+#define EKS_POD_EXECUTION_ROLE "EKS_POD_EXECUTION_ROLE"
+
+/* declarations */
+static struct flb_aws_provider *standard_chain_create(struct flb_config
+ *config,
+ struct flb_tls *tls,
+ char *region,
+ char *sts_endpoint,
+ char *proxy,
+ struct
+ flb_aws_client_generator
+ *generator,
+ int eks_irsa,
+ char *profile);
+
+
+/*
+ * The standard credential provider chain:
+ * 1. Environment variables
+ * 2. Shared credentials file (AWS Profile)
+ * 3. EKS OIDC
+ * 4. EC2 IMDS
+ * 5. ECS HTTP credentials endpoint
+ *
+ * This provider will evaluate each provider in order, returning the result
+ * from the first provider that returns valid credentials.
+ *
+ * Note: Client code should use this provider by default.
+ */
+struct flb_aws_provider_chain {
+ struct mk_list sub_providers;
+
+ /*
+ * The standard chain provider picks the first successful provider and
+ * then uses it until a call to refresh is made.
+ */
+ struct flb_aws_provider *sub_provider;
+};
+
+/*
+ * Iterates through the chain and returns credentials from the first provider
+ * that successfully returns creds. Caches this provider on the implementation.
+ */
+struct flb_aws_credentials *get_from_chain(struct flb_aws_provider_chain
+ *implementation)
+{
+ struct flb_aws_provider *sub_provider = NULL;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_aws_credentials *creds = NULL;
+
+ /* find the first provider that produces a valid set of creds */
+ mk_list_foreach_safe(head, tmp, &implementation->sub_providers) {
+ sub_provider = mk_list_entry(head,
+ struct flb_aws_provider,
+ _head);
+ creds = sub_provider->provider_vtable->get_credentials(sub_provider);
+ if (creds) {
+ implementation->sub_provider = sub_provider;
+ return creds;
+ }
+ }
+
+ return NULL;
+}
+
+struct flb_aws_credentials *get_credentials_fn_standard_chain(struct
+ flb_aws_provider
+ *provider)
+{
+ struct flb_aws_credentials *creds = NULL;
+ struct flb_aws_provider_chain *implementation = provider->implementation;
+ struct flb_aws_provider *sub_provider = implementation->sub_provider;
+
+ if (sub_provider) {
+ return sub_provider->provider_vtable->get_credentials(sub_provider);
+ }
+
+ if (try_lock_provider(provider)) {
+ creds = get_from_chain(implementation);
+ unlock_provider(provider);
+ return creds;
+ }
+
+ /*
+ * We failed to lock the provider and sub_provider is unset. This means that
+ * another co-routine is selecting a provider from the chain.
+ */
+ flb_warn("[aws_credentials] No cached credentials are available and "
+ "a credential refresh is already in progress. The current "
+ "co-routine will retry.");
+ return NULL;
+}
+
+int init_fn_standard_chain(struct flb_aws_provider *provider)
+{
+ struct flb_aws_provider_chain *implementation = provider->implementation;
+ struct flb_aws_provider *sub_provider = NULL;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ int ret = -1;
+
+ if (try_lock_provider(provider)) {
+ /* find the first provider that indicates successful init */
+ mk_list_foreach_safe(head, tmp, &implementation->sub_providers) {
+ sub_provider = mk_list_entry(head,
+ struct flb_aws_provider,
+ _head);
+ ret = sub_provider->provider_vtable->init(sub_provider);
+ if (ret >= 0) {
+ implementation->sub_provider = sub_provider;
+ break;
+ }
+ }
+ unlock_provider(provider);
+ }
+
+ return ret;
+}
+
+/*
+ * Client code should only call refresh if there has been an
+ * error from the AWS APIs indicating creds are expired/invalid.
+ * Refresh may change the current sub_provider.
+ */
+int refresh_fn_standard_chain(struct flb_aws_provider *provider)
+{
+ struct flb_aws_provider_chain *implementation = provider->implementation;
+ struct flb_aws_provider *sub_provider = NULL;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ int ret = -1;
+
+ if (try_lock_provider(provider)) {
+ /* find the first provider that indicates successful refresh */
+ mk_list_foreach_safe(head, tmp, &implementation->sub_providers) {
+ sub_provider = mk_list_entry(head,
+ struct flb_aws_provider,
+ _head);
+ ret = sub_provider->provider_vtable->refresh(sub_provider);
+ if (ret >= 0) {
+ implementation->sub_provider = sub_provider;
+ break;
+ }
+ }
+ unlock_provider(provider);
+ }
+
+ return ret;
+}
+
+void sync_fn_standard_chain(struct flb_aws_provider *provider)
+{
+ struct flb_aws_provider_chain *implementation = provider->implementation;
+ struct flb_aws_provider *sub_provider = NULL;
+ struct mk_list *tmp;
+ struct mk_list *head;
+
+ /* set all providers to sync mode */
+ mk_list_foreach_safe(head, tmp, &implementation->sub_providers) {
+ sub_provider = mk_list_entry(head,
+ struct flb_aws_provider,
+ _head);
+ sub_provider->provider_vtable->sync(sub_provider);
+ }
+}
+
+void async_fn_standard_chain(struct flb_aws_provider *provider)
+{
+ struct flb_aws_provider_chain *implementation = provider->implementation;
+ struct flb_aws_provider *sub_provider = NULL;
+ struct mk_list *tmp;
+ struct mk_list *head;
+
+ /* set all providers to async mode */
+ mk_list_foreach_safe(head, tmp, &implementation->sub_providers) {
+ sub_provider = mk_list_entry(head,
+ struct flb_aws_provider,
+ _head);
+ sub_provider->provider_vtable->async(sub_provider);
+ }
+}
+
+void upstream_set_fn_standard_chain(struct flb_aws_provider *provider,
+ struct flb_output_instance *ins)
+{
+ struct flb_aws_provider_chain *implementation = provider->implementation;
+ struct flb_aws_provider *sub_provider = NULL;
+ struct mk_list *tmp;
+ struct mk_list *head;
+
+ /* set all providers to async mode */
+ mk_list_foreach_safe(head, tmp, &implementation->sub_providers) {
+ sub_provider = mk_list_entry(head,
+ struct flb_aws_provider,
+ _head);
+ sub_provider->provider_vtable->upstream_set(sub_provider, ins);
+ }
+}
+
+void destroy_fn_standard_chain(struct flb_aws_provider *provider) {
+ struct flb_aws_provider *sub_provider;
+ struct flb_aws_provider_chain *implementation;
+ struct mk_list *tmp;
+ struct mk_list *head;
+
+ implementation = provider->implementation;
+
+ if (implementation) {
+ mk_list_foreach_safe(head, tmp, &implementation->sub_providers) {
+ sub_provider = mk_list_entry(head, struct flb_aws_provider,
+ _head);
+ mk_list_del(&sub_provider->_head);
+ flb_aws_provider_destroy(sub_provider);
+ }
+
+ flb_free(implementation);
+ }
+}
+
+static struct flb_aws_provider_vtable standard_chain_provider_vtable = {
+ .get_credentials = get_credentials_fn_standard_chain,
+ .init = init_fn_standard_chain,
+ .refresh = refresh_fn_standard_chain,
+ .destroy = destroy_fn_standard_chain,
+ .sync = sync_fn_standard_chain,
+ .async = async_fn_standard_chain,
+ .upstream_set = upstream_set_fn_standard_chain,
+};
+
+struct flb_aws_provider *flb_standard_chain_provider_create(struct flb_config
+ *config,
+ struct flb_tls *tls,
+ char *region,
+ char *sts_endpoint,
+ char *proxy,
+ struct
+ flb_aws_client_generator
+ *generator,
+ char *profile)
+{
+ struct flb_aws_provider *provider;
+ struct flb_aws_provider *tmp_provider;
+ char *eks_pod_role = NULL;
+ char *session_name;
+
+ eks_pod_role = getenv(EKS_POD_EXECUTION_ROLE);
+ if (eks_pod_role && strlen(eks_pod_role) > 0) {
+ /*
+ * eks fargate
+ * standard chain will be base provider used to
+ * assume the EKS_POD_EXECUTION_ROLE
+ */
+ flb_debug("[aws_credentials] Using EKS_POD_EXECUTION_ROLE=%s", eks_pod_role);
+ tmp_provider = standard_chain_create(config, tls, region, sts_endpoint,
+ proxy, generator, FLB_FALSE, profile);
+
+ if (!tmp_provider) {
+ return NULL;
+ }
+
+ session_name = flb_sts_session_name();
+ if (!session_name) {
+ flb_error("Failed to generate random STS session name");
+ flb_aws_provider_destroy(tmp_provider);
+ return NULL;
+ }
+
+ provider = flb_sts_provider_create(config, tls, tmp_provider, NULL,
+ eks_pod_role, session_name,
+ region, sts_endpoint,
+ NULL, generator);
+ if (!provider) {
+ flb_error("Failed to create EKS Fargate Credential Provider");
+ flb_aws_provider_destroy(tmp_provider);
+ return NULL;
+ }
+ /* session name can freed after provider is created */
+ flb_free(session_name);
+ session_name = NULL;
+
+ return provider;
+ }
+
+ /* standard case- not in EKS Fargate */
+ provider = standard_chain_create(config, tls, region, sts_endpoint,
+ proxy, generator, FLB_TRUE, profile);
+ return provider;
+}
+
+struct flb_aws_provider *flb_managed_chain_provider_create(struct flb_output_instance
+ *ins,
+ struct flb_config
+ *config,
+ char *config_key_prefix,
+ char *proxy,
+ struct
+ flb_aws_client_generator
+ *generator)
+{
+ flb_sds_t config_key_region;
+ flb_sds_t config_key_sts_endpoint;
+ flb_sds_t config_key_role_arn;
+ flb_sds_t config_key_external_id;
+ flb_sds_t config_key_profile;
+ const char *region = NULL;
+ const char *sts_endpoint = NULL;
+ const char *role_arn = NULL;
+ const char *external_id = NULL;
+ const char *profile = NULL;
+ char *session_name = NULL;
+ int key_prefix_len;
+ int key_max_len;
+
+ /* Provider managed dependencies */
+ struct flb_aws_provider *aws_provider = NULL;
+ struct flb_aws_provider *base_aws_provider = NULL;
+ struct flb_tls *cred_tls = NULL;
+ struct flb_tls *sts_tls = NULL;
+
+ /* Config keys */
+ key_prefix_len = strlen(config_key_prefix);
+ key_max_len = key_prefix_len + 12; /* max length of
+ "region", "sts_endpoint", "role_arn",
+ "external_id" */
+
+ /* Evaluate full config keys */
+ config_key_region = flb_sds_create_len(config_key_prefix, key_max_len);
+ strcpy(config_key_region + key_prefix_len, "region");
+ config_key_sts_endpoint = flb_sds_create_len(config_key_prefix, key_max_len);
+ strcpy(config_key_sts_endpoint + key_prefix_len, "sts_endpoint");
+ config_key_role_arn = flb_sds_create_len(config_key_prefix, key_max_len);
+ strcpy(config_key_role_arn + key_prefix_len, "role_arn");
+ config_key_external_id = flb_sds_create_len(config_key_prefix, key_max_len);
+ strcpy(config_key_external_id + key_prefix_len, "external_id");
+ config_key_profile = flb_sds_create_len(config_key_prefix, key_max_len);
+ strcpy(config_key_profile + key_prefix_len, "profile");
+
+ /* AWS provider needs a separate TLS instance */
+ cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
+ FLB_TRUE,
+ ins->tls_debug,
+ ins->tls_vhost,
+ ins->tls_ca_path,
+ ins->tls_ca_file,
+ ins->tls_crt_file,
+ ins->tls_key_file,
+ ins->tls_key_passwd);
+ if (!cred_tls) {
+ flb_plg_error(ins, "Failed to create TLS instance for AWS Provider");
+ flb_errno();
+ goto error;
+ }
+
+ region = flb_output_get_property(config_key_region, ins);
+ if (!region) {
+ flb_plg_error(ins, "aws_auth enabled but %s not set", config_key_region);
+ goto error;
+ }
+
+ /* Use null sts_endpoint if none provided */
+ sts_endpoint = flb_output_get_property(config_key_sts_endpoint, ins);
+ /* Get the profile from configuration */
+ profile = flb_output_get_property(config_key_profile, ins);
+ aws_provider = flb_standard_chain_provider_create(config,
+ cred_tls,
+ (char *) region,
+ (char *) sts_endpoint,
+ NULL,
+ flb_aws_client_generator(),
+ profile);
+ if (!aws_provider) {
+ flb_plg_error(ins, "Failed to create AWS Credential Provider");
+ goto error;
+ }
+
+ role_arn = flb_output_get_property(config_key_role_arn, ins);
+ if (role_arn) {
+ /* Use the STS Provider */
+ base_aws_provider = aws_provider;
+ external_id = flb_output_get_property(config_key_external_id, ins);
+
+ session_name = flb_sts_session_name();
+ if (!session_name) {
+ flb_plg_error(ins, "Failed to generate aws iam role "
+ "session name");
+ goto error;
+ }
+
+ /* STS provider needs yet another separate TLS instance */
+ sts_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
+ FLB_TRUE,
+ ins->tls_debug,
+ ins->tls_vhost,
+ ins->tls_ca_path,
+ ins->tls_ca_file,
+ ins->tls_crt_file,
+ ins->tls_key_file,
+ ins->tls_key_passwd);
+ if (!sts_tls) {
+ flb_plg_error(ins, "Failed to create TLS instance for AWS STS Credential "
+ "Provider");
+ flb_errno();
+ goto error;
+ }
+
+ aws_provider = flb_sts_provider_create(config,
+ sts_tls,
+ base_aws_provider,
+ (char *) external_id,
+ (char *) role_arn,
+ session_name,
+ (char *) region,
+ (char *) sts_endpoint,
+ NULL,
+ flb_aws_client_generator());
+ if (!aws_provider) {
+ flb_plg_error(ins, "Failed to create AWS STS Credential "
+ "Provider");
+ goto error;
+ }
+ }
+
+ /* initialize credentials in sync mode */
+ aws_provider->provider_vtable->sync(aws_provider);
+ aws_provider->provider_vtable->init(aws_provider);
+
+ /* set back to async */
+ aws_provider->provider_vtable->async(aws_provider);
+
+ /* store dependencies in aws_provider for managed cleanup */
+ aws_provider->base_aws_provider = base_aws_provider;
+ aws_provider->cred_tls = cred_tls;
+ aws_provider->sts_tls = sts_tls;
+
+ goto cleanup;
+
+error:
+ if (aws_provider) {
+ /* disconnect dependencies */
+ aws_provider->base_aws_provider = NULL;
+ aws_provider->cred_tls = NULL;
+ aws_provider->sts_tls = NULL;
+ /* destroy */
+ flb_aws_provider_destroy(aws_provider);
+ }
+ /* free dependencies */
+ if (base_aws_provider) {
+ flb_aws_provider_destroy(base_aws_provider);
+ }
+ if (cred_tls) {
+ flb_tls_destroy(cred_tls);
+ }
+ if (sts_tls) {
+ flb_tls_destroy(sts_tls);
+ }
+ aws_provider = NULL;
+
+cleanup:
+ if (config_key_region) {
+ flb_sds_destroy(config_key_region);
+ }
+ if (config_key_sts_endpoint) {
+ flb_sds_destroy(config_key_sts_endpoint);
+ }
+ if (config_key_role_arn) {
+ flb_sds_destroy(config_key_role_arn);
+ }
+ if (config_key_external_id) {
+ flb_sds_destroy(config_key_external_id);
+ }
+ if (session_name) {
+ flb_free(session_name);
+ }
+
+ return aws_provider;
+}
+
+static struct flb_aws_provider *standard_chain_create(struct flb_config
+ *config,
+ struct flb_tls *tls,
+ char *region,
+ char *sts_endpoint,
+ char *proxy,
+ struct
+ flb_aws_client_generator
+ *generator,
+ int eks_irsa,
+ char *profile)
+{
+ struct flb_aws_provider *sub_provider;
+ struct flb_aws_provider *provider;
+ struct flb_aws_provider_chain *implementation;
+
+ provider = flb_calloc(1, sizeof(struct flb_aws_provider));
+
+ if (!provider) {
+ flb_errno();
+ return NULL;
+ }
+
+ pthread_mutex_init(&provider->lock, NULL);
+
+ implementation = flb_calloc(1, sizeof(struct flb_aws_provider_chain));
+
+ if (!implementation) {
+ flb_errno();
+ flb_free(provider);
+ return NULL;
+ }
+
+ provider->provider_vtable = &standard_chain_provider_vtable;
+ provider->implementation = implementation;
+
+ /* Create chain of providers */
+ mk_list_init(&implementation->sub_providers);
+
+ sub_provider = flb_aws_env_provider_create();
+ if (!sub_provider) {
+ /* Env provider will only fail creation if a memory alloc failed */
+ flb_aws_provider_destroy(provider);
+ return NULL;
+ }
+ flb_debug("[aws_credentials] Initialized Env Provider in standard chain");
+
+ mk_list_add(&sub_provider->_head, &implementation->sub_providers);
+
+ flb_debug("[aws_credentials] creating profile %s provider", profile);
+ sub_provider = flb_profile_provider_create(profile);
+ if (sub_provider) {
+ /* Profile provider can fail if HOME env var is not set */;
+ mk_list_add(&sub_provider->_head, &implementation->sub_providers);
+ flb_debug("[aws_credentials] Initialized AWS Profile Provider in "
+ "standard chain");
+ }
+
+ if (eks_irsa == FLB_TRUE) {
+ sub_provider = flb_eks_provider_create(config, tls, region, sts_endpoint, proxy, generator);
+ if (sub_provider) {
+ /* EKS provider can fail if we are not running in k8s */;
+ mk_list_add(&sub_provider->_head, &implementation->sub_providers);
+ flb_debug("[aws_credentials] Initialized EKS Provider in standard chain");
+ }
+ }
+
+ sub_provider = flb_ecs_provider_create(config, generator);
+ if (sub_provider) {
+ /* ECS Provider will fail creation if we are not running in ECS */
+ mk_list_add(&sub_provider->_head, &implementation->sub_providers);
+ flb_debug("[aws_credentials] Initialized ECS Provider in standard chain");
+ }
+
+ sub_provider = flb_ec2_provider_create(config, generator);
+ if (!sub_provider) {
+ /* EC2 provider will only fail creation if a memory alloc failed */
+ flb_aws_provider_destroy(provider);
+ return NULL;
+ }
+ mk_list_add(&sub_provider->_head, &implementation->sub_providers);
+ flb_debug("[aws_credentials] Initialized EC2 Provider in standard chain");
+
+ return provider;
+}
+
+/* Environment Provider */
+struct flb_aws_credentials *get_credentials_fn_environment(struct
+ flb_aws_provider
+ *provider)
+{
+ char *access_key = NULL;
+ char *secret_key = NULL;
+ char *session_token = NULL;
+ struct flb_aws_credentials *creds = NULL;
+
+ flb_debug("[aws_credentials] Requesting credentials from the "
+ "env provider..");
+
+ access_key = getenv(AWS_ACCESS_KEY_ID);
+ if (!access_key || strlen(access_key) <= 0) {
+ return NULL;
+ }
+
+ secret_key = getenv(AWS_SECRET_ACCESS_KEY);
+ if (!secret_key || strlen(secret_key) <= 0) {
+ return NULL;
+ }
+
+ creds = flb_calloc(1, sizeof(struct flb_aws_credentials));
+ if (!creds) {
+ flb_errno();
+ return NULL;
+ }
+
+ creds->access_key_id = flb_sds_create(access_key);
+ if (!creds->access_key_id) {
+ flb_aws_credentials_destroy(creds);
+ flb_errno();
+ return NULL;
+ }
+
+ creds->secret_access_key = flb_sds_create(secret_key);
+ if (!creds->secret_access_key) {
+ flb_aws_credentials_destroy(creds);
+ flb_errno();
+ return NULL;
+ }
+
+ session_token = getenv(AWS_SESSION_TOKEN);
+ if (session_token && strlen(session_token) > 0) {
+ creds->session_token = flb_sds_create(session_token);
+ if (!creds->session_token) {
+ flb_aws_credentials_destroy(creds);
+ flb_errno();
+ return NULL;
+ }
+ } else {
+ creds->session_token = NULL;
+ }
+
+ return creds;
+
+}
+
+int refresh_env(struct flb_aws_provider *provider)
+{
+ char *access_key = NULL;
+ char *secret_key = NULL;
+
+ access_key = getenv(AWS_ACCESS_KEY_ID);
+ if (!access_key || strlen(access_key) <= 0) {
+ return -1;
+ }
+
+ secret_key = getenv(AWS_SECRET_ACCESS_KEY);
+ if (!secret_key || strlen(secret_key) <= 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ * For the env provider, refresh simply checks if the environment
+ * variables are available.
+ */
+int refresh_fn_environment(struct flb_aws_provider *provider)
+{
+ flb_debug("[aws_credentials] Refresh called on the env provider");
+
+ return refresh_env(provider);
+}
+
+int init_fn_environment(struct flb_aws_provider *provider)
+{
+ flb_debug("[aws_credentials] Init called on the env provider");
+
+ return refresh_env(provider);
+}
+
+/*
+ * sync and async are no-ops for the env provider because it does not make
+ * network IO calls
+ */
+void sync_fn_environment(struct flb_aws_provider *provider)
+{
+ return;
+}
+
+void async_fn_environment(struct flb_aws_provider *provider)
+{
+ return;
+}
+
+void upstream_set_fn_environment(struct flb_aws_provider *provider,
+ struct flb_output_instance *ins)
+{
+ return;
+}
+
+/* Destroy is a no-op for the env provider */
+void destroy_fn_environment(struct flb_aws_provider *provider) {
+ return;
+}
+
+static struct flb_aws_provider_vtable environment_provider_vtable = {
+ .get_credentials = get_credentials_fn_environment,
+ .init = init_fn_environment,
+ .refresh = refresh_fn_environment,
+ .destroy = destroy_fn_environment,
+ .sync = sync_fn_environment,
+ .async = async_fn_environment,
+ .upstream_set = upstream_set_fn_environment,
+};
+
+struct flb_aws_provider *flb_aws_env_provider_create() {
+ struct flb_aws_provider *provider = flb_calloc(1, sizeof(
+ struct flb_aws_provider));
+
+ if (!provider) {
+ flb_errno();
+ return NULL;
+ }
+
+ provider->provider_vtable = &environment_provider_vtable;
+ provider->implementation = NULL;
+
+ return provider;
+}
+
+
+void flb_aws_credentials_destroy(struct flb_aws_credentials *creds)
+{
+ if (creds) {
+ if (creds->access_key_id) {
+ flb_sds_destroy(creds->access_key_id);
+ }
+ if (creds->secret_access_key) {
+ flb_sds_destroy(creds->secret_access_key);
+ }
+ if (creds->session_token) {
+ flb_sds_destroy(creds->session_token);
+ }
+
+ flb_free(creds);
+ }
+}
+
+void flb_aws_provider_destroy(struct flb_aws_provider *provider)
+{
+ if (provider) {
+ if (provider->implementation) {
+ provider->provider_vtable->destroy(provider);
+ }
+
+ pthread_mutex_destroy(&provider->lock);
+
+ /* free managed dependencies */
+ if (provider->base_aws_provider) {
+ flb_aws_provider_destroy(provider->base_aws_provider);
+ }
+ if (provider->cred_tls) {
+ flb_tls_destroy(provider->cred_tls);
+ }
+ if (provider->sts_tls) {
+ flb_tls_destroy(provider->sts_tls);
+ }
+
+ flb_free(provider);
+ }
+}
+
+time_t timestamp_to_epoch(const char *timestamp)
+{
+ struct tm tm = {0};
+ time_t seconds;
+ int r;
+
+ r = sscanf(timestamp, "%d-%d-%dT%d:%d:%dZ", &tm.tm_year, &tm.tm_mon,
+ &tm.tm_mday, &tm.tm_hour, &tm.tm_min, &tm.tm_sec);
+ if (r != 6) {
+ return -1;
+ }
+
+ tm.tm_year -= 1900;
+ tm.tm_mon -= 1;
+ tm.tm_isdst = -1;
+ seconds = timegm(&tm);
+ if (seconds < 0) {
+ return -1;
+ }
+
+ return seconds;
+}
+
+time_t flb_aws_cred_expiration(const char *timestamp)
+{
+ time_t now;
+ time_t expiration = timestamp_to_epoch(timestamp);
+ if (expiration < 0) {
+ flb_warn("[aws_credentials] Could not parse expiration: %s", timestamp);
+ return -1;
+ }
+ /*
+ * Sanity check - expiration should be ~10 minutes to 12 hours in the future
+ * (> 12 hours is impossible with the current APIs and would likely indicate
+ * a bug in how this code processes timestamps.)
+ */
+ now = time(NULL);
+ if (expiration < (now + FIVE_MINUTES)) {
+ flb_warn("[aws_credentials] Credential expiration '%s' is less than "
+ "5 minutes in the future.",
+ timestamp);
+ }
+ if (expiration > (now + TWELVE_HOURS)) {
+ flb_warn("[aws_credentials] Credential expiration '%s' is greater than "
+ "12 hours in the future. This should not be possible.",
+ timestamp);
+ }
+ return expiration;
+}
+
+/*
+ * Fluent Bit is now multi-threaded and asynchonous with coros.
+ * The trylock prevents deadlock, and protects the provider
+ * when a cred refresh happens. The refresh frees and
+ * sets the shared cred cache, a double free could occur
+ * if two threads do it at the same exact time.
+ */
+
+/* Like a traditional try lock- it does not block if the lock is not obtained */
+int try_lock_provider(struct flb_aws_provider *provider)
+{
+ int ret = 0;
+ ret = pthread_mutex_trylock(&provider->lock);
+ if (ret != 0) {
+ return FLB_FALSE;
+ }
+ return FLB_TRUE;
+}
+
+void unlock_provider(struct flb_aws_provider *provider)
+{
+ pthread_mutex_unlock(&provider->lock);
+}
diff --git a/fluent-bit/src/aws/flb_aws_credentials_ec2.c b/fluent-bit/src/aws/flb_aws_credentials_ec2.c
new file mode 100644
index 000000000..5d2d8515c
--- /dev/null
+++ b/fluent-bit/src/aws/flb_aws_credentials_ec2.c
@@ -0,0 +1,371 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_aws_credentials.h>
+#include <fluent-bit/flb_aws_util.h>
+#include <fluent-bit/flb_jsmn.h>
+#include <fluent-bit/aws/flb_aws_imds.h>
+
+#include <stdlib.h>
+#include <time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#define AWS_IMDS_ROLE_PATH "/latest/meta-data/iam/security-credentials/"
+#define AWS_IMDS_ROLE_PATH_LEN 43
+
+struct flb_aws_provider_ec2;
+static int get_creds_ec2(struct flb_aws_provider_ec2 *implementation);
+static int ec2_credentials_request(struct flb_aws_provider_ec2
+ *implementation, char *cred_path);
+
+/* EC2 IMDS Provider */
+
+/*
+ * A provider that obtains credentials from EC2 IMDS.
+ */
+struct flb_aws_provider_ec2 {
+ struct flb_aws_credentials *creds;
+ time_t next_refresh;
+
+ /* upstream connection to IMDS */
+ struct flb_aws_client *client;
+
+ /* IMDS interface */
+ struct flb_aws_imds *imds_interface;
+};
+
+struct flb_aws_credentials *get_credentials_fn_ec2(struct flb_aws_provider
+ *provider)
+{
+ struct flb_aws_credentials *creds;
+ int refresh = FLB_FALSE;
+ struct flb_aws_provider_ec2 *implementation = provider->implementation;
+
+ flb_debug("[aws_credentials] Requesting credentials from the "
+ "EC2 provider..");
+
+ /* a negative next_refresh means that auto-refresh is disabled */
+ if (implementation->next_refresh > 0
+ && time(NULL) > implementation->next_refresh) {
+ refresh = FLB_TRUE;
+ }
+ if (!implementation->creds || refresh == FLB_TRUE) {
+ if (try_lock_provider(provider)) {
+ get_creds_ec2(implementation);
+ unlock_provider(provider);
+ }
+ }
+
+ if (!implementation->creds) {
+ /*
+ * We failed to lock the provider and creds are unset. This means that
+ * another co-routine is performing the refresh.
+ */
+ flb_warn("[aws_credentials] No cached credentials are available and "
+ "a credential refresh is already in progress. The current "
+ "co-routine will retry.");
+
+ return NULL;
+ }
+
+ creds = flb_calloc(1, sizeof(struct flb_aws_credentials));
+ if (!creds) {
+ flb_errno();
+ return NULL;
+ }
+
+ creds->access_key_id = flb_sds_create(implementation->creds->access_key_id);
+ if (!creds->access_key_id) {
+ flb_errno();
+ flb_aws_credentials_destroy(creds);
+ return NULL;
+ }
+
+ creds->secret_access_key = flb_sds_create(implementation->creds->
+ secret_access_key);
+ if (!creds->secret_access_key) {
+ flb_errno();
+ flb_aws_credentials_destroy(creds);
+ return NULL;
+ }
+
+ if (implementation->creds->session_token) {
+ creds->session_token = flb_sds_create(implementation->creds->
+ session_token);
+ if (!creds->session_token) {
+ flb_errno();
+ flb_aws_credentials_destroy(creds);
+ return NULL;
+ }
+
+ } else {
+ creds->session_token = NULL;
+ }
+
+ return creds;
+}
+
+int refresh_fn_ec2(struct flb_aws_provider *provider) {
+ struct flb_aws_provider_ec2 *implementation = provider->implementation;
+ int ret = -1;
+
+ flb_debug("[aws_credentials] Refresh called on the EC2 IMDS provider");
+ if (try_lock_provider(provider)) {
+ ret = get_creds_ec2(implementation);
+ unlock_provider(provider);
+ }
+ return ret;
+}
+
+int init_fn_ec2(struct flb_aws_provider *provider) {
+ struct flb_aws_provider_ec2 *implementation = provider->implementation;
+ int ret = -1;
+
+ implementation->client->debug_only = FLB_TRUE;
+
+ flb_debug("[aws_credentials] Init called on the EC2 IMDS provider");
+ if (try_lock_provider(provider)) {
+ ret = get_creds_ec2(implementation);
+ unlock_provider(provider);
+ }
+
+ implementation->client->debug_only = FLB_FALSE;
+ return ret;
+}
+
+void sync_fn_ec2(struct flb_aws_provider *provider) {
+ struct flb_aws_provider_ec2 *implementation = provider->implementation;
+
+ flb_debug("[aws_credentials] Sync called on the EC2 provider");
+ /* remove async flag */
+ flb_stream_disable_async_mode(&implementation->client->upstream->base);
+}
+
+void async_fn_ec2(struct flb_aws_provider *provider) {
+ struct flb_aws_provider_ec2 *implementation = provider->implementation;
+
+ flb_debug("[aws_credentials] Async called on the EC2 provider");
+ /* add async flag */
+ flb_stream_enable_async_mode(&implementation->client->upstream->base);
+}
+
+void upstream_set_fn_ec2(struct flb_aws_provider *provider,
+ struct flb_output_instance *ins) {
+ struct flb_aws_provider_ec2 *implementation = provider->implementation;
+
+ flb_debug("[aws_credentials] upstream_set called on the EC2 provider");
+ /* Make sure TLS is set to false before setting upstream, then reset it */
+ ins->use_tls = FLB_FALSE;
+ flb_output_upstream_set(implementation->client->upstream, ins);
+ ins->use_tls = FLB_TRUE;
+}
+
+void destroy_fn_ec2(struct flb_aws_provider *provider) {
+ struct flb_aws_provider_ec2 *implementation = provider->implementation;
+
+ if (implementation) {
+ if (implementation->creds) {
+ flb_aws_credentials_destroy(implementation->creds);
+ }
+
+ if (implementation->imds_interface) {
+ flb_aws_imds_destroy(implementation->imds_interface);
+ }
+
+ if (implementation->client) {
+ flb_aws_client_destroy(implementation->client);
+ }
+
+ flb_free(implementation);
+ provider->implementation = NULL;
+ }
+
+ return;
+}
+
+static struct flb_aws_provider_vtable ec2_provider_vtable = {
+ .get_credentials = get_credentials_fn_ec2,
+ .init = init_fn_ec2,
+ .refresh = refresh_fn_ec2,
+ .destroy = destroy_fn_ec2,
+ .sync = sync_fn_ec2,
+ .async = async_fn_ec2,
+ .upstream_set = upstream_set_fn_ec2,
+};
+
+struct flb_aws_provider *flb_ec2_provider_create(struct flb_config *config,
+ struct
+ flb_aws_client_generator
+ *generator)
+{
+ struct flb_aws_provider_ec2 *implementation;
+ struct flb_aws_provider *provider;
+ struct flb_upstream *upstream;
+
+ provider = flb_calloc(1, sizeof(struct flb_aws_provider));
+
+ if (!provider) {
+ flb_errno();
+ return NULL;
+ }
+
+ pthread_mutex_init(&provider->lock, NULL);
+
+ implementation = flb_calloc(1, sizeof(struct flb_aws_provider_ec2));
+
+ if (!implementation) {
+ flb_free(provider);
+ flb_errno();
+ return NULL;
+ }
+
+ provider->provider_vtable = &ec2_provider_vtable;
+ provider->implementation = implementation;
+
+ upstream = flb_upstream_create(config, FLB_AWS_IMDS_HOST, FLB_AWS_IMDS_PORT,
+ FLB_IO_TCP, NULL);
+ if (!upstream) {
+ flb_aws_provider_destroy(provider);
+ flb_debug("[aws_credentials] unable to connect to EC2 IMDS.");
+ return NULL;
+ }
+
+ /* IMDSv2 token request will timeout if hops = 1 and running within container */
+ upstream->base.net.connect_timeout = FLB_AWS_IMDS_TIMEOUT;
+ upstream->base.net.io_timeout = FLB_AWS_IMDS_TIMEOUT;
+ upstream->base.net.keepalive = FLB_FALSE; /* On timeout, the connection is broken */
+
+ implementation->client = generator->create();
+ if (!implementation->client) {
+ flb_aws_provider_destroy(provider);
+ flb_upstream_destroy(upstream);
+ flb_error("[aws_credentials] EC2 IMDS: client creation error");
+ return NULL;
+ }
+ implementation->client->name = "ec2_imds_provider_client";
+ implementation->client->has_auth = FLB_FALSE;
+ implementation->client->provider = NULL;
+ implementation->client->region = NULL;
+ implementation->client->service = NULL;
+ implementation->client->port = 80;
+ implementation->client->flags = 0;
+ implementation->client->proxy = NULL;
+ implementation->client->upstream = upstream;
+
+ /* Use default imds configuration */
+ implementation->imds_interface = flb_aws_imds_create(&flb_aws_imds_config_default,
+ implementation->client);
+ if (!implementation->imds_interface) {
+ flb_aws_provider_destroy(provider);
+ flb_error("[aws_credentials] EC2 IMDS configuration error");
+ return NULL;
+ }
+
+ return provider;
+}
+
+/* Requests creds from IMDSv1 and sets them on the provider */
+static int get_creds_ec2(struct flb_aws_provider_ec2 *implementation)
+{
+ int ret;
+ flb_sds_t instance_role;
+ size_t instance_role_len;
+ char *cred_path;
+ size_t cred_path_size;
+
+ flb_debug("[aws_credentials] requesting credentials from EC2 IMDS");
+
+ /* Get the name of the instance role */
+ ret = flb_aws_imds_request(implementation->imds_interface, AWS_IMDS_ROLE_PATH,
+ &instance_role, &instance_role_len);
+
+ if (ret < 0) {
+ return -1;
+ }
+
+ flb_debug("[aws_credentials] Requesting credentials for instance role %s",
+ instance_role);
+
+ /* Construct path where we will find the credentials */
+ cred_path_size = sizeof(char) * (AWS_IMDS_ROLE_PATH_LEN +
+ instance_role_len) + 1;
+ cred_path = flb_malloc(cred_path_size);
+ if (!cred_path) {
+ flb_sds_destroy(instance_role);
+ flb_errno();
+ return -1;
+ }
+
+ ret = snprintf(cred_path, cred_path_size, "%s%s", AWS_IMDS_ROLE_PATH,
+ instance_role);
+ if (ret < 0) {
+ flb_sds_destroy(instance_role);
+ flb_free(cred_path);
+ flb_errno();
+ return -1;
+ }
+
+ /* request creds */
+ ret = ec2_credentials_request(implementation, cred_path);
+
+ flb_sds_destroy(instance_role);
+ flb_free(cred_path);
+ return ret;
+
+}
+
+static int ec2_credentials_request(struct flb_aws_provider_ec2
+ *implementation, char *cred_path)
+{
+ int ret;
+ flb_sds_t credentials_response;
+ size_t credentials_response_len;
+ struct flb_aws_credentials *creds;
+ time_t expiration;
+
+ ret = flb_aws_imds_request(implementation->imds_interface, cred_path,
+ &credentials_response, &credentials_response_len);
+
+ if (ret < 0) {
+ return -1;
+ }
+
+ creds = flb_parse_http_credentials(credentials_response,
+ credentials_response_len,
+ &expiration);
+
+ if (creds == NULL) {
+ flb_sds_destroy(credentials_response);
+ return -1;
+ }
+
+ /* destroy existing credentials first */
+ flb_aws_credentials_destroy(implementation->creds);
+ implementation->creds = NULL;
+ /* set new creds */
+ implementation->creds = creds;
+ implementation->next_refresh = expiration - FLB_AWS_REFRESH_WINDOW;
+
+ flb_sds_destroy(credentials_response);
+ return 0;
+}
diff --git a/fluent-bit/src/aws/flb_aws_credentials_http.c b/fluent-bit/src/aws/flb_aws_credentials_http.c
new file mode 100644
index 000000000..c08b6b559
--- /dev/null
+++ b/fluent-bit/src/aws/flb_aws_credentials_http.c
@@ -0,0 +1,566 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_aws_credentials.h>
+#include <fluent-bit/flb_aws_util.h>
+
+#include <fluent-bit/flb_jsmn.h>
+#include <stdlib.h>
+#include <time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#define AWS_CREDENTIAL_RESPONSE_ACCESS_KEY "AccessKeyId"
+#define AWS_CREDENTIAL_RESPONSE_SECRET_KEY "SecretAccessKey"
+#define AWS_HTTP_RESPONSE_TOKEN "Token"
+#define AWS_CREDENTIAL_RESPONSE_EXPIRATION "Expiration"
+
+#define ECS_CREDENTIALS_HOST "169.254.170.2"
+#define ECS_CREDENTIALS_HOST_LEN 13
+#define ECS_CREDENTIALS_PATH_ENV_VAR "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"
+
+
+/* Declarations */
+struct flb_aws_provider_http;
+static int http_credentials_request(struct flb_aws_provider_http
+ *implementation);
+
+
+/*
+ * HTTP Credentials Provider - retrieve credentials from a local http server
+ * Used to implement the ECS Credentials provider.
+ * Equivalent to:
+ * https://github.com/aws/aws-sdk-go/tree/master/aws/credentials/endpointcreds
+ */
+
+struct flb_aws_provider_http {
+ struct flb_aws_credentials *creds;
+ time_t next_refresh;
+
+ struct flb_aws_client *client;
+
+ /* Host and Path to request credentials */
+ flb_sds_t host;
+ flb_sds_t path;
+};
+
+
+struct flb_aws_credentials *get_credentials_fn_http(struct flb_aws_provider
+ *provider)
+{
+ struct flb_aws_credentials *creds = NULL;
+ int refresh = FLB_FALSE;
+ struct flb_aws_provider_http *implementation = provider->implementation;
+
+ flb_debug("[aws_credentials] Retrieving credentials from the "
+ "HTTP provider..");
+
+ /* a negative next_refresh means that auto-refresh is disabled */
+ if (implementation->next_refresh > 0
+ && time(NULL) > implementation->next_refresh) {
+ refresh = FLB_TRUE;
+ }
+ if (!implementation->creds || refresh == FLB_TRUE) {
+ if (try_lock_provider(provider)) {
+ http_credentials_request(implementation);
+ unlock_provider(provider);
+ }
+ }
+
+ if (!implementation->creds) {
+ /*
+ * We failed to lock the provider and creds are unset. This means that
+ * another co-routine is performing the refresh.
+ */
+ flb_warn("[aws_credentials] No cached credentials are available and "
+ "a credential refresh is already in progress. The current "
+ "co-routine will retry.");
+
+ return NULL;
+ }
+
+ creds = flb_calloc(1, sizeof(struct flb_aws_credentials));
+ if (!creds) {
+ flb_errno();
+ goto error;
+ }
+
+ creds->access_key_id = flb_sds_create(implementation->creds->access_key_id);
+ if (!creds->access_key_id) {
+ flb_errno();
+ goto error;
+ }
+
+ creds->secret_access_key = flb_sds_create(implementation->creds->
+ secret_access_key);
+ if (!creds->secret_access_key) {
+ flb_errno();
+ goto error;
+ }
+
+ if (implementation->creds->session_token) {
+ creds->session_token = flb_sds_create(implementation->creds->
+ session_token);
+ if (!creds->session_token) {
+ flb_errno();
+ goto error;
+ }
+
+ } else {
+ creds->session_token = NULL;
+ }
+
+ return creds;
+
+error:
+ flb_aws_credentials_destroy(creds);
+ return NULL;
+}
+
+int refresh_fn_http(struct flb_aws_provider *provider) {
+ struct flb_aws_provider_http *implementation = provider->implementation;
+ int ret = -1;
+ flb_debug("[aws_credentials] Refresh called on the http provider");
+
+ if (try_lock_provider(provider)) {
+ ret = http_credentials_request(implementation);
+ unlock_provider(provider);
+ }
+ return ret;
+}
+
+int init_fn_http(struct flb_aws_provider *provider) {
+ struct flb_aws_provider_http *implementation = provider->implementation;
+ int ret = -1;
+ flb_debug("[aws_credentials] Init called on the http provider");
+
+ implementation->client->debug_only = FLB_TRUE;
+
+ if (try_lock_provider(provider)) {
+ ret = http_credentials_request(implementation);
+ unlock_provider(provider);
+ }
+
+ implementation->client->debug_only = FLB_FALSE;
+
+ return ret;
+}
+
+void sync_fn_http(struct flb_aws_provider *provider) {
+ struct flb_aws_provider_http *implementation = provider->implementation;
+
+ flb_debug("[aws_credentials] Sync called on the http provider");
+ /* remove async flag */
+ flb_stream_disable_async_mode(&implementation->client->upstream->base);
+}
+
+void async_fn_http(struct flb_aws_provider *provider) {
+ struct flb_aws_provider_http *implementation = provider->implementation;
+
+ flb_debug("[aws_credentials] Async called on the http provider");
+ /* add async flag */
+ flb_stream_enable_async_mode(&implementation->client->upstream->base);
+}
+
+void upstream_set_fn_http(struct flb_aws_provider *provider,
+ struct flb_output_instance *ins) {
+ struct flb_aws_provider_http *implementation = provider->implementation;
+
+ flb_debug("[aws_credentials] upstream_set called on the http provider");
+ /* Make sure TLS is set to false before setting upstream, then reset it */
+ ins->use_tls = FLB_FALSE;
+ flb_output_upstream_set(implementation->client->upstream, ins);
+ ins->use_tls = FLB_TRUE;
+}
+
+void destroy_fn_http(struct flb_aws_provider *provider) {
+ struct flb_aws_provider_http *implementation = provider->implementation;
+
+ if (implementation) {
+ if (implementation->creds) {
+ flb_aws_credentials_destroy(implementation->creds);
+ }
+
+ if (implementation->client) {
+ flb_aws_client_destroy(implementation->client);
+ }
+
+ if (implementation->host) {
+ flb_sds_destroy(implementation->host);
+ }
+
+ if (implementation->path) {
+ flb_sds_destroy(implementation->path);
+ }
+
+ flb_free(implementation);
+ provider->implementation = NULL;
+ }
+
+ return;
+}
+
+static struct flb_aws_provider_vtable http_provider_vtable = {
+ .get_credentials = get_credentials_fn_http,
+ .init = init_fn_http,
+ .refresh = refresh_fn_http,
+ .destroy = destroy_fn_http,
+ .sync = sync_fn_http,
+ .async = async_fn_http,
+ .upstream_set = upstream_set_fn_http,
+};
+
+struct flb_aws_provider *flb_http_provider_create(struct flb_config *config,
+ flb_sds_t host,
+ flb_sds_t path,
+ struct
+ flb_aws_client_generator
+ *generator)
+{
+ struct flb_aws_provider_http *implementation = NULL;
+ struct flb_aws_provider *provider = NULL;
+ struct flb_upstream *upstream = NULL;
+
+ flb_debug("[aws_credentials] Configuring HTTP provider with %s:80%s",
+ host, path);
+
+ provider = flb_calloc(1, sizeof(struct flb_aws_provider));
+
+ if (!provider) {
+ flb_errno();
+ return NULL;
+ }
+
+ pthread_mutex_init(&provider->lock, NULL);
+
+ implementation = flb_calloc(1, sizeof(struct flb_aws_provider_http));
+
+ if (!implementation) {
+ flb_free(provider);
+ flb_errno();
+ return NULL;
+ }
+
+ provider->provider_vtable = &http_provider_vtable;
+ provider->implementation = implementation;
+
+ implementation->host = host;
+ implementation->path = path;
+
+ upstream = flb_upstream_create(config, host, 80, FLB_IO_TCP, NULL);
+
+ if (!upstream) {
+ flb_aws_provider_destroy(provider);
+ flb_error("[aws_credentials] HTTP Provider: connection initialization "
+ "error");
+ return NULL;
+ }
+
+ upstream->base.net.connect_timeout = FLB_AWS_CREDENTIAL_NET_TIMEOUT;
+
+ implementation->client = generator->create();
+ if (!implementation->client) {
+ flb_aws_provider_destroy(provider);
+ flb_upstream_destroy(upstream);
+ flb_error("[aws_credentials] HTTP Provider: client creation error");
+ return NULL;
+ }
+ implementation->client->name = "http_provider_client";
+ implementation->client->has_auth = FLB_FALSE;
+ implementation->client->provider = NULL;
+ implementation->client->region = NULL;
+ implementation->client->service = NULL;
+ implementation->client->port = 80;
+ implementation->client->flags = 0;
+ implementation->client->proxy = NULL;
+ implementation->client->upstream = upstream;
+
+ return provider;
+}
+
+/*
+ * ECS Provider
+ * The ECS Provider is just a wrapper around the HTTP Provider
+ * with the ECS credentials endpoint.
+ */
+
+ struct flb_aws_provider *flb_ecs_provider_create(struct flb_config *config,
+ struct
+ flb_aws_client_generator
+ *generator)
+{
+ flb_sds_t host = NULL;
+ flb_sds_t path = NULL;
+ char *path_var = NULL;
+
+ host = flb_sds_create_len(ECS_CREDENTIALS_HOST, ECS_CREDENTIALS_HOST_LEN);
+ if (!host) {
+ flb_errno();
+ return NULL;
+ }
+
+ path_var = getenv(ECS_CREDENTIALS_PATH_ENV_VAR);
+ if (path_var && strlen(path_var) > 0) {
+ path = flb_sds_create(path_var);
+ if (!path) {
+ flb_errno();
+ flb_free(host);
+ return NULL;
+ }
+
+ return flb_http_provider_create(config, host, path, generator);
+ } else {
+ flb_debug("[aws_credentials] Not initializing ECS Provider because"
+ " %s is not set", ECS_CREDENTIALS_PATH_ENV_VAR);
+ flb_sds_destroy(host);
+ return NULL;
+ }
+
+}
+
+static int http_credentials_request(struct flb_aws_provider_http
+ *implementation)
+{
+ char *response = NULL;
+ size_t response_len;
+ time_t expiration;
+ struct flb_aws_credentials *creds = NULL;
+ struct flb_aws_client *client = implementation->client;
+ struct flb_http_client *c = NULL;
+
+ c = client->client_vtable->request(client, FLB_HTTP_GET,
+ implementation->path, NULL, 0,
+ NULL, 0);
+
+ if (!c || c->resp.status != 200) {
+ flb_debug("[aws_credentials] http credentials request failed");
+ if (c) {
+ flb_http_client_destroy(c);
+ }
+ return -1;
+ }
+
+ response = c->resp.payload;
+ response_len = c->resp.payload_size;
+
+ creds = flb_parse_http_credentials(response, response_len, &expiration);
+ if (!creds) {
+ flb_http_client_destroy(c);
+ return -1;
+ }
+
+ /* destroy existing credentials */
+ flb_aws_credentials_destroy(implementation->creds);
+ implementation->creds = NULL;
+
+ implementation->creds = creds;
+ implementation->next_refresh = expiration - FLB_AWS_REFRESH_WINDOW;
+ flb_http_client_destroy(c);
+ return 0;
+}
+
+/*
+ * All HTTP credentials endpoints (IMDS, ECS, custom) follow the same spec:
+ * {
+ * "AccessKeyId": "ACCESS_KEY_ID",
+ * "Expiration": "2019-12-18T21:27:58Z",
+ * "SecretAccessKey": "SECRET_ACCESS_KEY",
+ * "Token": "SECURITY_TOKEN_STRING"
+ * }
+ * (some implementations (IMDS) have additional fields)
+ * Returns NULL if any part of parsing was unsuccessful.
+ */
+struct flb_aws_credentials *flb_parse_http_credentials(char *response,
+ size_t response_len,
+ time_t *expiration)
+{
+ return flb_parse_json_credentials(response, response_len, AWS_HTTP_RESPONSE_TOKEN,
+ expiration);
+}
+
+struct flb_aws_credentials *flb_parse_json_credentials(char *response,
+ size_t response_len,
+ char* session_token_field,
+ time_t *expiration)
+{
+ jsmntok_t *tokens = NULL;
+ const jsmntok_t *t = NULL;
+ char *current_token = NULL;
+ jsmn_parser parser;
+ int tokens_size = 50;
+ size_t size;
+ int ret;
+ struct flb_aws_credentials *creds = NULL;
+ int i = 0;
+ int len;
+ flb_sds_t tmp;
+
+ /*
+ * Remove/reset existing value of expiration.
+ * Expiration should be in the response, but it is not
+ * strictly speaking needed. Fluent Bit logs a warning if it is missing.
+ */
+ *expiration = -1;
+
+ jsmn_init(&parser);
+
+ size = sizeof(jsmntok_t) * tokens_size;
+ tokens = flb_calloc(1, size);
+ if (!tokens) {
+ goto error;
+ }
+
+ ret = jsmn_parse(&parser, response, response_len,
+ tokens, tokens_size);
+
+ if (ret == JSMN_ERROR_INVAL || ret == JSMN_ERROR_PART) {
+ flb_error("[aws_credentials] Could not parse credentials response"
+ " - invalid JSON.");
+ goto error;
+ }
+
+ /* Shouldn't happen, but just in case, check for too many tokens error */
+ if (ret == JSMN_ERROR_NOMEM) {
+ flb_error("[aws_credentials] Could not parse credentials response"
+ " - response contained more tokens than expected.");
+ goto error;
+ }
+
+ /* return value is number of tokens parsed */
+ tokens_size = ret;
+
+ creds = flb_calloc(1, sizeof(struct flb_aws_credentials));
+ if (!creds) {
+ flb_errno();
+ goto error;
+ }
+
+ /*
+ * jsmn will create an array of tokens like:
+ * key, value, key, value
+ */
+ while (i < (tokens_size - 1)) {
+ t = &tokens[i];
+
+ if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) {
+ break;
+ }
+
+ if (t->type == JSMN_STRING) {
+ current_token = &response[t->start];
+ len = t->end - t->start;
+
+ if (strncmp(current_token, AWS_CREDENTIAL_RESPONSE_ACCESS_KEY, len) == 0)
+ {
+ i++;
+ t = &tokens[i];
+ current_token = &response[t->start];
+ len = t->end - t->start;
+ if (creds->access_key_id != NULL) {
+ flb_error("Trying to double allocate access_key_id");
+ goto error;
+ }
+ creds->access_key_id = flb_sds_create_len(current_token, len);
+ if (!creds->access_key_id) {
+ flb_errno();
+ goto error;
+ }
+ continue;
+ }
+ if (strncmp(current_token, AWS_CREDENTIAL_RESPONSE_SECRET_KEY, len) == 0)
+ {
+ i++;
+ t = &tokens[i];
+ current_token = &response[t->start];
+ len = t->end - t->start;
+ if (creds->secret_access_key != NULL) {
+ flb_error("Trying to double allocate secret_access_key");
+ goto error;
+ }
+ creds->secret_access_key = flb_sds_create_len(current_token,
+ len);
+ if (!creds->secret_access_key) {
+ flb_errno();
+ goto error;
+ }
+ continue;
+ }
+ if (strncmp(current_token, session_token_field, len) == 0) {
+ i++;
+ t = &tokens[i];
+ current_token = &response[t->start];
+ len = t->end - t->start;
+ if (creds->session_token != NULL) {
+ flb_error("Trying to double allocate session_token");
+ goto error;
+ }
+ creds->session_token = flb_sds_create_len(current_token, len);
+ if (!creds->session_token) {
+ flb_errno();
+ goto error;
+ }
+ continue;
+ }
+ if (strncmp(current_token, AWS_CREDENTIAL_RESPONSE_EXPIRATION, len) == 0)
+ {
+ i++;
+ t = &tokens[i];
+ current_token = &response[t->start];
+ len = t->end - t->start;
+ tmp = flb_sds_create_len(current_token, len);
+ if (!tmp) {
+ flb_errno();
+ goto error;
+ }
+ *expiration = flb_aws_cred_expiration(tmp);
+ flb_sds_destroy(tmp);
+ if (*expiration < 0) {
+ flb_warn("[aws_credentials] '%s' was invalid or "
+ "could not be parsed. Disabling auto-refresh of "
+ "credentials.", AWS_CREDENTIAL_RESPONSE_EXPIRATION);
+ }
+ }
+ }
+
+ i++;
+ }
+
+ if (creds->access_key_id == NULL) {
+ flb_error("[aws_credentials] Missing %s field in"
+ "credentials response", AWS_CREDENTIAL_RESPONSE_ACCESS_KEY);
+ goto error;
+ }
+
+ if (creds->secret_access_key == NULL) {
+ flb_error("[aws_credentials] Missing %s field in"
+ "credentials response", AWS_CREDENTIAL_RESPONSE_SECRET_KEY);
+ goto error;
+ }
+
+ flb_free(tokens);
+ return creds;
+
+error:
+ flb_aws_credentials_destroy(creds);
+ flb_free(tokens);
+ return NULL;
+}
diff --git a/fluent-bit/src/aws/flb_aws_credentials_log.h b/fluent-bit/src/aws/flb_aws_credentials_log.h
new file mode 100644
index 000000000..6e9f0806d
--- /dev/null
+++ b/fluent-bit/src/aws/flb_aws_credentials_log.h
@@ -0,0 +1,39 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2021 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_AWS_CREDENTIALS_LOG_H
+
+#define FLB_AWS_CREDENTIALS_LOG_H
+
+#include <fluent-bit/flb_log.h>
+
+#define AWS_CREDS_ERROR(format, ...) flb_error("[aws_credentials] " format, ##__VA_ARGS__)
+#define AWS_CREDS_WARN(format, ...) flb_warn("[aws_credentials] " format, ##__VA_ARGS__)
+#define AWS_CREDS_DEBUG(format, ...) flb_debug("[aws_credentials] " format, ##__VA_ARGS__)
+
+#define AWS_CREDS_ERROR_OR_DEBUG(debug_only, format, ...) do {\
+ if (debug_only == FLB_TRUE) {\
+ AWS_CREDS_DEBUG(format, ##__VA_ARGS__);\
+ }\
+ else {\
+ AWS_CREDS_ERROR(format, ##__VA_ARGS__);\
+ }\
+} while (0)
+
+#endif
diff --git a/fluent-bit/src/aws/flb_aws_credentials_process.c b/fluent-bit/src/aws/flb_aws_credentials_process.c
new file mode 100644
index 000000000..44c024ca7
--- /dev/null
+++ b/fluent-bit/src/aws/flb_aws_credentials_process.c
@@ -0,0 +1,783 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2021 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_aws_credentials.h>
+
+#include "flb_aws_credentials_log.h"
+
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_pipe.h>
+#include <fluent-bit/flb_time.h>
+
+#include <fcntl.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <sys/wait.h>
+
+#define DEV_NULL "/dev/null"
+
+#define MS_PER_SEC 1000
+#define MICROS_PER_MS 1000
+#define NS_PER_MS 1000000
+
+#define CREDENTIAL_PROCESS_TIMEOUT_MS 60000
+#define CREDENTIAL_PROCESS_BUFFER_SIZE 8 * 1024
+
+#define WAITPID_POLL_FREQUENCY_MS 20
+#define WAITPID_TIMEOUT_MS 10 * WAITPID_POLL_FREQUENCY_MS
+
+#define CREDENTIAL_PROCESS_RESPONSE_SESSION_TOKEN "SessionToken"
+
+/* Declarations */
+struct token_array;
+static int new_token_array(struct token_array *arr, int cap);
+static int append_token(struct token_array *arr, char* elem);
+
+struct readbuf;
+static int new_readbuf(struct readbuf* buf, int cap);
+
+static int get_monotonic_time(struct flb_time* tm);
+
+static char* ltrim(char* input);
+static int scan_credential_process_token_quoted(char *input);
+static int scan_credential_process_token_unquoted(char *input);
+static int credential_process_token_count(char* process);
+static int parse_credential_process_token(char **input, char** out_token);
+
+static int read_until_block(char* name, flb_pipefd_t fd, struct readbuf* buf);
+static int waitpid_timeout(char* name, pid_t pid, int* wstatus);
+
+struct process;
+static int new_process(struct process* p, char** args);
+static void exec_process_child(struct process* p);
+static int exec_process(struct process* p);
+static int read_from_process(struct process* p, struct readbuf* buf);
+static int wait_process(struct process* p);
+static void destroy_process(struct process* p);
+/* End Declarations */
+
+struct token_array {
+ char** tokens;
+ int len;
+ int cap;
+};
+
+/*
+ * Initializes a new token array with the given capacity.
+ * Returns 0 on success and < 0 on failure.
+ * The caller is responsible for calling `flb_free(arr->tokens)`.
+ */
+static int new_token_array(struct token_array *arr, int cap)
+{
+ *arr = (struct token_array) { .len = 0, .cap = cap };
+ arr->tokens = flb_malloc(cap * sizeof(char*));
+ if (!arr->tokens) {
+ flb_errno();
+ return -1;
+ }
+ return 0;
+}
+
+/*
+ * Appends the given token to the array, if there is capacity.
+ * Returns 0 on success and < 0 on failure.
+ */
+static int append_token(struct token_array *arr, char* token)
+{
+ if (arr->len >= arr->cap) {
+ /* This means there is a bug in credential_process_token_count. */
+ AWS_CREDS_ERROR("append_token called on full token_array");
+ return -1;
+ }
+
+ (arr->tokens)[arr->len] = token;
+ arr->len++;
+ return 0;
+}
+
+struct readbuf {
+ char* buf;
+ int len;
+ int cap;
+};
+
+/*
+ * Initializes a new buffer with the given capacity.
+ * Returns 0 on success and < 0 on failure.
+ * The caller is responsible for calling `flb_free(buf->buf)`.
+ */
+static int new_readbuf(struct readbuf* buf, int cap)
+{
+ *buf = (struct readbuf) { .len = 0, .cap = cap };
+ buf->buf = flb_malloc(cap * sizeof(char));
+ if (!buf->buf) {
+ flb_errno();
+ return -1;
+ }
+ return 0;
+}
+
+/*
+ * Fetches the current time from the monotonic clock.
+ * Returns 0 on success and < 0 on failure.
+ * This is useful for calculating deadlines that are not sensitive to changes
+ * in the system clock.
+ */
+static int get_monotonic_time(struct flb_time* tm)
+{
+ struct timespec ts;
+ if (clock_gettime(CLOCK_MONOTONIC, &ts) < 0) {
+ flb_errno();
+ return -1;
+ }
+ flb_time_set(tm, ts.tv_sec, ts.tv_nsec);
+ return 0;
+}
+
+/*
+ * Skips over any leading spaces in the input string, returning the remainder.
+ * If the entire string is consumed, returns the empty string (not NULL).
+ */
+static char* ltrim(char* input)
+{
+ while (*input == ' ') {
+ input++;
+ }
+ return input;
+}
+
+/*
+ * Scans the unquoted token string at the start of the input string.
+ * The input must be the start of an unquoted token.
+ * Returns the token length on success, and < 0 on failure.
+ * This function does not add a null terminator to the token.
+ * The token length is the index where the null terminator must be placed.
+ * If the entire input is consumed, returns the length of the input string
+ * (excluding the null terminator).
+ */
+static int scan_credential_process_token_unquoted(char *input)
+{
+ int i;
+
+ for (i = 0; input[i] != ' '; i++) {
+ if (input[i] == '\0') {
+ break;
+ }
+ if (input[i] == '"') {
+ AWS_CREDS_ERROR("unexpected quote in credential_process");
+ return -1;
+ }
+ }
+
+ return i;
+}
+
+/*
+ * Scans the quoted token at the start of the input string.
+ * The input must be the string after the opening quote.
+ * Returns the token length on success, and < 0 on failure.
+ * This function does not add a null terminator to the token.
+ * The token length is the index where the null terminator must be placed.
+ */
+static int scan_credential_process_token_quoted(char *input)
+{
+ int i;
+
+ for (i = 0; input[i] != '"'; i++) {
+ if (input[i] == '\0') {
+ AWS_CREDS_ERROR("unterminated quote in credential_process");
+ return -1;
+ }
+ }
+
+ if (input[i+1] != '\0' && input[i+1] != ' ') {
+ AWS_CREDS_ERROR("unexpected character %c after closing quote in "
+ "credential_process", input[i+1]);
+ return -1;
+ }
+
+ return i;
+}
+
+/*
+ * Counts the number of tokens in the input string, which is assumed to be the
+ * credential_process from the config file.
+ * Returns < 0 on failure.
+ */
+static int credential_process_token_count(char* process)
+{
+ int count = 0;
+ int i;
+
+ while (1) {
+ process = ltrim(process);
+ if (*process == '\0') {
+ break;
+ }
+
+ count++;
+
+ if (*process == '"') {
+ process++;
+ i = scan_credential_process_token_quoted(process);
+ }
+ else {
+ i = scan_credential_process_token_unquoted(process);
+ }
+
+ if (i < 0) {
+ return -1;
+ }
+
+ process += i;
+ if (*process != '\0') {
+ process++;
+ }
+ }
+
+ return count;
+}
+
+/*
+ * Parses the input string, which is assumed to be the credential_process
+ * from the config file. The next token will be put in *out_token, and the
+ * remaining unprocessed input will be put in *input.
+ * Returns 0 on success and < 0 on failure.
+ * If there is an error, the value of *input and *out_token is not defined.
+ * If it succeeds and *out_token is NULL, then there are no more tokens,
+ * and this function should not be called again.
+ * *out_token will be some substring of the original *input, so it should not
+ * be freed.
+ */
+static int parse_credential_process_token(char** input, char** out_token)
+{
+ *out_token = NULL;
+ int i;
+
+ if (!*input) {
+ AWS_CREDS_ERROR("parse_credential_process_token called after yielding last token");
+ return -1;
+ }
+
+ *input = ltrim(*input);
+
+ if (**input == '\0') {
+ *input = NULL;
+ *out_token = NULL;
+ return 0;
+ }
+
+ if (**input == '"') {
+ (*input)++;
+ i = scan_credential_process_token_quoted(*input);
+ }
+ else {
+ i = scan_credential_process_token_unquoted(*input);
+ }
+
+ if (i < 0) {
+ return -1;
+ }
+
+ *out_token = *input;
+ *input += i;
+
+ if (**input != '\0') {
+ **input = '\0';
+ (*input)++;
+ }
+
+ return 0;
+}
+
+/* See <fluent-bit/flb_aws_credentials.h>. */
+char** parse_credential_process(char* input)
+{
+ char* next_token = NULL;
+ struct token_array arr = { 0 };
+ int token_count = credential_process_token_count(input);
+
+ if (token_count < 0) {
+ goto error;
+ }
+
+ /* Add one extra capacity for the NULL terminator. */
+ if (new_token_array(&arr, token_count + 1) < 0) {
+ goto error;
+ }
+
+ while (1) {
+ if (parse_credential_process_token(&input, &next_token) < 0) {
+ goto error;
+ }
+
+ if (!next_token) {
+ break;
+ }
+
+ if (append_token(&arr, next_token) < 0) {
+ goto error;
+ }
+ }
+
+ if (append_token(&arr, NULL) < 0) {
+ goto error;
+ }
+
+ return arr.tokens;
+
+error:
+ flb_free(arr.tokens);
+ return NULL;
+}
+
+/*
+ * Reads from the pipe into the buffer until no more input is available.
+ * If the input is exhausted (EOF), returns 0.
+ * If reading would block (EWOULDBLOCK/EAGAIN), returns > 0.
+ * If an error occurs or the buffer is full, returns < 0.
+ */
+static int read_until_block(char* name, flb_pipefd_t fd, struct readbuf* buf)
+{
+ int result = -1;
+
+ while (1) {
+ if (buf->len >= buf->cap) {
+ AWS_CREDS_ERROR("credential_process %s exceeded max buffer size", name);
+ return -1;
+ }
+
+ result = flb_pipe_r(fd, buf->buf + buf->len, buf->cap - buf->len);
+ if (result < 0) {
+ if (FLB_PIPE_WOULDBLOCK()) {
+ return 1;
+ }
+ flb_errno();
+ return -1;
+ }
+ else if (result == 0) { /* EOF */
+ return 0;
+ }
+ else {
+ buf->len += result;
+ }
+ }
+}
+
+/*
+ * Polls waitpid until the given process exits, or the timeout is reached.
+ * Returns 0 on success and < 0 on failure.
+ */
+static int waitpid_timeout(char* name, pid_t pid, int* wstatus)
+{
+ int result = -1;
+ int retries = WAITPID_TIMEOUT_MS / WAITPID_POLL_FREQUENCY_MS;
+
+ while (1) {
+ result = waitpid(pid, wstatus, WNOHANG);
+ if (result < 0) {
+ flb_errno();
+ return -1;
+ }
+
+ if (result > 0) {
+ return 0;
+ }
+
+ if (retries <= 0) {
+ AWS_CREDS_ERROR("timed out waiting for credential_process %s to exit", name);
+ return -1;
+ }
+ retries--;
+
+ usleep(WAITPID_POLL_FREQUENCY_MS * MICROS_PER_MS);
+ }
+}
+
+struct process {
+ int initialized;
+ char** args;
+ int stdin_stream;
+ flb_pipefd_t stdout_stream[2];
+ int stderr_stream;
+ pid_t pid;
+};
+
+/*
+ * Initializes a new process with the given args.
+ * args is assumed to be a NULL terminated array, for use with execvp.
+ * It must have a least one element, and the first element is assumed to be the
+ * name/path of the executable.
+ * Returns 0 on success and < 0 on failure.
+ * The caller is responsible for calling `destroy_process(p)`.
+ */
+static int new_process(struct process* p, char** args)
+{
+ *p = (struct process) {
+ .initialized = FLB_TRUE,
+ .args = args,
+ .stdin_stream = -1,
+ .stdout_stream = {-1, -1},
+ .stderr_stream = -1,
+ .pid = -1,
+ };
+
+ while ((p->stdin_stream = open(DEV_NULL, O_RDONLY|O_CLOEXEC)) < 0) {
+ if (errno != EINTR) {
+ flb_errno();
+ return -1;
+ }
+ }
+
+ if (flb_pipe_create(p->stdout_stream) < 0) {;
+ flb_errno();
+ return -1;
+ }
+
+ if (fcntl(p->stdout_stream[0], F_SETFL, O_CLOEXEC) < 0) {
+ flb_errno();
+ return -1;
+ }
+
+ if (fcntl(p->stdout_stream[1], F_SETFL, O_CLOEXEC) < 0) {
+ flb_errno();
+ return -1;
+ }
+
+ while ((p->stderr_stream = open(DEV_NULL, O_WRONLY|O_CLOEXEC)) < 0) {
+ if (errno != EINTR) {
+ flb_errno();
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * Sets up the credential_process's stdin, stdout, and stderr, and exec's
+ * the actual process.
+ * For this function to return at all is an error.
+ * This function should not be called more than once.
+ */
+static void exec_process_child(struct process* p)
+{
+ while ((dup2(p->stdin_stream, STDIN_FILENO) < 0)) {
+ if (errno != EINTR) {
+ return;
+ }
+ }
+ while ((dup2(p->stdout_stream[1], STDOUT_FILENO) < 0)) {
+ if (errno != EINTR) {
+ return;
+ }
+ }
+ while ((dup2(p->stderr_stream, STDERR_FILENO) < 0)) {
+ if (errno != EINTR) {
+ return;
+ }
+ }
+
+ close(p->stdin_stream);
+ flb_pipe_close(p->stdout_stream[0]);
+ flb_pipe_close(p->stdout_stream[1]);
+ close(p->stderr_stream);
+
+ execvp(p->args[0], p->args);
+}
+
+/*
+ * Forks the credential_process, but does not wait for it to finish.
+ * Returns 0 on success and < 0 on failure.
+ * This function should not be called more than once.
+ */
+static int exec_process(struct process* p)
+{
+ AWS_CREDS_DEBUG("executing credential_process %s", p->args[0]);
+
+ p->pid = fork();
+ if (p->pid < 0) {
+ flb_errno();
+ return -1;
+ }
+
+ if (p->pid == 0) {
+ exec_process_child(p);
+
+ /* It should not be possible to reach this under normal circumstances. */
+ exit(EXIT_FAILURE);
+ }
+
+ close(p->stdin_stream);
+ p->stdin_stream = -1;
+
+ flb_pipe_close(p->stdout_stream[1]);
+ p->stdout_stream[1] = -1;
+
+ close(p->stderr_stream);
+ p->stderr_stream = -1;
+
+ return 0;
+}
+
+/*
+ * Reads from the credential_process's stdout into the given buffer.
+ * Returns 0 on success, and < 0 on failure or timeout.
+ * This function should not be called more than once.
+ */
+static int read_from_process(struct process* p, struct readbuf* buf)
+{
+ int result = -1;
+ struct pollfd pfd;
+ struct flb_time start, timeout, deadline, now, remaining;
+ int remaining_ms;
+
+ if (fcntl(p->stdout_stream[0], F_SETFL, O_NONBLOCK) < 0) {
+ flb_errno();
+ return -1;
+ }
+
+ if (get_monotonic_time(&start) < 0) {
+ return -1;
+ }
+
+ flb_time_set(&timeout,
+ (time_t) (CREDENTIAL_PROCESS_TIMEOUT_MS / MS_PER_SEC),
+ ((long) (CREDENTIAL_PROCESS_TIMEOUT_MS % MS_PER_SEC)) * NS_PER_MS);
+
+ /* deadline = start + timeout */
+ flb_time_add(&start, &timeout, &deadline);
+
+ while (1) {
+ pfd = (struct pollfd) {
+ .fd = p->stdout_stream[0],
+ .events = POLLIN,
+ };
+
+ if (get_monotonic_time(&now) < 0) {
+ return -1;
+ }
+
+ /* remaining = deadline - now */
+ if (flb_time_diff(&deadline, &now, &remaining) < 0) {
+ AWS_CREDS_ERROR("credential_process %s timed out", p->args[0]);
+ return -1;
+ }
+
+ /*
+ * poll uses millisecond resolution for the timeout.
+ * If there is less than a millisecond left, then for simplicity we'll just
+ * declare that it timed out.
+ */
+ remaining_ms = (int) (flb_time_to_nanosec(&remaining) / NS_PER_MS);
+ if (remaining_ms <= 0) {
+ AWS_CREDS_ERROR("credential_process %s timed out", p->args[0]);
+ return -1;
+ }
+
+ result = poll(&pfd, 1, remaining_ms);
+ if (result < 0) {
+ if (errno != EINTR) {
+ flb_errno();
+ return -1;
+ }
+ continue;
+ }
+
+ if (result == 0) {
+ AWS_CREDS_ERROR("credential_process %s timed out", p->args[0]);
+ return -1;
+ }
+
+ if ((pfd.revents & POLLNVAL) == POLLNVAL) {
+ AWS_CREDS_ERROR("credential_process %s POLLNVAL", p->args[0]);
+ return -1;
+ }
+
+ if ((pfd.revents & POLLERR) == POLLERR) {
+ AWS_CREDS_ERROR("credential_process %s POLLERR", p->args[0]);
+ return -1;
+ }
+
+ if ((pfd.revents & POLLIN) == POLLIN || (pfd.revents & POLLHUP) == POLLHUP) {
+ result = read_until_block(p->args[0], p->stdout_stream[0], buf);
+ if (result <= 0) {
+ return result;
+ }
+ }
+ }
+}
+
+/*
+ * Waits for the process to exit, up to a timeout.
+ * Returns 0 on success and < 0 on failure.
+ * This function should not be called more than once.
+ */
+static int wait_process(struct process* p)
+{
+ int wstatus;
+
+ if (waitpid_timeout(p->args[0], p->pid, &wstatus) < 0) {
+ return -1;
+ }
+ p->pid = -1;
+
+ if (!WIFEXITED(wstatus)) {
+ AWS_CREDS_ERROR("credential_process %s did not terminate normally", p->args[0]);
+ return -1;
+ }
+
+ if (WEXITSTATUS(wstatus) != EXIT_SUCCESS) {
+ AWS_CREDS_ERROR("credential_process %s exited with status %d", p->args[0],
+ WEXITSTATUS(wstatus));
+ return -1;
+ }
+
+ AWS_CREDS_DEBUG("credential_process %s exited successfully", p->args[0]);
+ return 0;
+}
+
+/*
+ * Release all resources associated with this process.
+ * Calling this function multiple times is a no-op.
+ * Since the process does not own p->args, it does not free it.
+ * Note that p->args will be set to NULL, so the caller must hold onto
+ * it separately in order to free it.
+ */
+static void destroy_process(struct process* p)
+{
+ if (p->initialized) {
+ if (p->stdin_stream >= 0) {
+ close(p->stdin_stream);
+ p->stdin_stream = -1;
+ }
+ if (p->stdout_stream[0] >= 0) {
+ close(p->stdout_stream[0]);
+ p->stdout_stream[0] = -1;
+ }
+ if (p->stdout_stream[1] >= 0) {
+ close(p->stdout_stream[1]);
+ p->stdout_stream[1] = -1;
+ }
+ if (p->stderr_stream >= 0) {
+ close(p->stderr_stream);
+ p->stderr_stream = -1;
+ }
+
+ if (p->pid > 0) {
+ if (kill(p->pid, SIGKILL) < 0) {
+ flb_errno();
+ AWS_CREDS_ERROR("could not kill credential_process %s (pid=%d) "
+ "during cleanup", p->args[0], p->pid);
+ }
+ else {
+ while (waitpid(p->pid, NULL, 0) < 0) {
+ if (errno != EINTR) {
+ flb_errno();
+ break;
+ }
+ }
+ }
+ p->pid = -1;
+ }
+
+ p->args = NULL;
+
+ p->initialized = FLB_FALSE;
+ }
+}
+
+/* See <fluent-bit/flb_aws_credentials.h>. */
+int exec_credential_process(char* process, struct flb_aws_credentials** creds,
+ time_t* expiration)
+{
+ char** args = NULL;
+ int result = -1;
+ struct process p = { 0 };
+ struct readbuf buf = { 0 };
+ *creds = NULL;
+ *expiration = 0;
+
+ args = parse_credential_process(process);
+ if (!args) {
+ result = -1;
+ goto end;
+ }
+
+ if (!args[0]) {
+ AWS_CREDS_ERROR("invalid credential_process");
+ result = -1;
+ goto end;
+ }
+
+ if (new_process(&p, args) < 0) {
+ result = -1;
+ goto end;
+ }
+
+ if (new_readbuf(&buf, CREDENTIAL_PROCESS_BUFFER_SIZE) < 0) {
+ result = -1;
+ goto end;
+ }
+
+ if (exec_process(&p) < 0) {
+ result = -1;
+ goto end;
+ }
+
+ if (read_from_process(&p, &buf) < 0) {
+ result = -1;
+ goto end;
+ }
+
+ if (wait_process(&p) < 0) {
+ result = -1;
+ goto end;
+ }
+
+ *creds = flb_parse_json_credentials(buf.buf, buf.len,
+ CREDENTIAL_PROCESS_RESPONSE_SESSION_TOKEN,
+ expiration);
+ if (!*creds) {
+ AWS_CREDS_ERROR("could not parse credentials from credential_process %s", args[0]);
+ result = -1;
+ goto end;
+ }
+
+ AWS_CREDS_DEBUG("successfully parsed credentials from credential_process %s", args[0]);
+
+ result = 0;
+
+end:
+ destroy_process(&p);
+
+ flb_free(buf.buf);
+ buf.buf = NULL;
+
+ flb_free(args);
+ args = NULL;
+
+ if (result < 0) {
+ flb_aws_credentials_destroy(*creds);
+ *creds = NULL;
+ }
+
+ return result;
+}
diff --git a/fluent-bit/src/aws/flb_aws_credentials_profile.c b/fluent-bit/src/aws/flb_aws_credentials_profile.c
new file mode 100644
index 000000000..6b3ab5dbd
--- /dev/null
+++ b/fluent-bit/src/aws/flb_aws_credentials_profile.c
@@ -0,0 +1,753 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "flb_aws_credentials_log.h"
+
+#include <fluent-bit/flb_aws_credentials.h>
+#include <fluent-bit/flb_aws_util.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_sds.h>
+
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <ctype.h>
+
+#define ACCESS_KEY_PROPERTY_NAME "aws_access_key_id"
+#define SECRET_KEY_PROPERTY_NAME "aws_secret_access_key"
+#define SESSION_TOKEN_PROPERTY_NAME "aws_session_token"
+#define CREDENTIAL_PROCESS_PROPERTY_NAME "credential_process"
+
+#define AWS_PROFILE "AWS_PROFILE"
+#define AWS_DEFAULT_PROFILE "AWS_DEFAULT_PROFILE"
+
+#define AWS_CONFIG_FILE "AWS_CONFIG_FILE"
+#define AWS_SHARED_CREDENTIALS_FILE "AWS_SHARED_CREDENTIALS_FILE"
+
+#define DEFAULT_PROFILE "default"
+#define CONFIG_PROFILE_PREFIX "profile "
+#define CONFIG_PROFILE_PREFIX_LEN (sizeof(CONFIG_PROFILE_PREFIX)-1)
+
+/* Declarations */
+struct flb_aws_provider_profile;
+static int refresh_credentials(struct flb_aws_provider_profile *implementation,
+ int debug_only);
+
+static int get_aws_shared_file_path(flb_sds_t* field, char* env_var, char* home_aws_path);
+
+static int parse_config_file(char *buf, char* profile, struct flb_aws_credentials** creds,
+ time_t* expiration, int debug_only);
+static int parse_credentials_file(char *buf, char *profile,
+ struct flb_aws_credentials *creds, int debug_only);
+
+static int get_shared_config_credentials(char* config_path,
+ char*profile,
+ struct flb_aws_credentials** creds,
+ time_t* expiration,
+ int debug_only);
+static int get_shared_credentials(char* credentials_path,
+ char* profile,
+ struct flb_aws_credentials** creds,
+ int debug_only);
+
+static flb_sds_t parse_property_value(char *s, int debug_only);
+static char *parse_property_line(char *line);
+static int has_profile(char *line, char* profile, int debug_only);
+static int is_profile_line(char *line);
+static int config_file_profile_matches(char *line, char *profile);
+
+/*
+ * A provider that reads from the shared credentials file.
+ */
+struct flb_aws_provider_profile {
+ struct flb_aws_credentials *creds;
+ time_t next_refresh;
+
+ flb_sds_t profile;
+ flb_sds_t config_path;
+ flb_sds_t credentials_path;
+};
+
+struct flb_aws_credentials *get_credentials_fn_profile(struct flb_aws_provider
+ *provider)
+{
+ struct flb_aws_credentials *creds;
+ int ret;
+ struct flb_aws_provider_profile *implementation = provider->implementation;
+
+ /*
+ * If next_refresh <= 0, it means we don't know how long the credentials
+ * are valid for. So we won't refresh them unless explicitly asked
+ * via refresh_fn_profile.
+ */
+ if (!implementation->creds || (implementation->next_refresh > 0 &&
+ time(NULL) >= implementation->next_refresh)) {
+ AWS_CREDS_DEBUG("Retrieving credentials for AWS Profile %s",
+ implementation->profile);
+ if (try_lock_provider(provider) == FLB_TRUE) {
+ ret = refresh_credentials(implementation, FLB_FALSE);
+ unlock_provider(provider);
+ if (ret < 0) {
+ AWS_CREDS_ERROR("Failed to retrieve credentials for AWS Profile %s",
+ implementation->profile);
+ return NULL;
+ }
+ } else {
+ AWS_CREDS_WARN("Another thread is refreshing credentials, will retry");
+ return NULL;
+ }
+ }
+
+ creds = flb_calloc(1, sizeof(struct flb_aws_credentials));
+ if (!creds) {
+ flb_errno();
+ goto error;
+ }
+
+ creds->access_key_id = flb_sds_create(implementation->creds->access_key_id);
+ if (!creds->access_key_id) {
+ flb_errno();
+ goto error;
+ }
+
+ creds->secret_access_key = flb_sds_create(implementation->
+ creds->secret_access_key);
+ if (!creds->secret_access_key) {
+ flb_errno();
+ goto error;
+ }
+
+ if (implementation->creds->session_token) {
+ creds->session_token = flb_sds_create(implementation->
+ creds->session_token);
+ if (!creds->session_token) {
+ flb_errno();
+ goto error;
+ }
+
+ } else {
+ creds->session_token = NULL;
+ }
+
+ return creds;
+
+error:
+ flb_aws_credentials_destroy(creds);
+ return NULL;
+}
+
+int refresh_fn_profile(struct flb_aws_provider *provider)
+{
+ struct flb_aws_provider_profile *implementation = provider->implementation;
+ int ret = -1;
+ AWS_CREDS_DEBUG("Refresh called on the profile provider");
+ if (try_lock_provider(provider) == FLB_TRUE) {
+ ret = refresh_credentials(implementation, FLB_FALSE);
+ unlock_provider(provider);
+ return ret;
+ }
+ return ret;
+}
+
+int init_fn_profile(struct flb_aws_provider *provider)
+{
+ struct flb_aws_provider_profile *implementation = provider->implementation;
+ int ret = -1;
+ AWS_CREDS_DEBUG("Init called on the profile provider");
+ if (try_lock_provider(provider) == FLB_TRUE) {
+ ret = refresh_credentials(implementation, FLB_TRUE);
+ unlock_provider(provider);
+ return ret;
+ }
+ return ret;
+}
+
+/*
+ * Sync and Async are no-ops for the profile provider because it does not
+ * make network IO calls
+ */
+void sync_fn_profile(struct flb_aws_provider *provider)
+{
+ return;
+}
+
+void async_fn_profile(struct flb_aws_provider *provider)
+{
+ return;
+}
+
+void upstream_set_fn_profile(struct flb_aws_provider *provider,
+ struct flb_output_instance *ins)
+{
+ return;
+}
+
+void destroy_fn_profile(struct flb_aws_provider *provider)
+{
+ struct flb_aws_provider_profile *implementation = provider->implementation;
+
+ if (implementation) {
+ if (implementation->creds) {
+ flb_aws_credentials_destroy(implementation->creds);
+ }
+
+ if (implementation->profile) {
+ flb_sds_destroy(implementation->profile);
+ }
+
+ if (implementation->config_path) {
+ flb_sds_destroy(implementation->config_path);
+ }
+
+ if (implementation->credentials_path) {
+ flb_sds_destroy(implementation->credentials_path);
+ }
+
+ flb_free(implementation);
+ provider->implementation = NULL;
+ }
+
+ return;
+}
+
+static struct flb_aws_provider_vtable profile_provider_vtable = {
+ .get_credentials = get_credentials_fn_profile,
+ .init = init_fn_profile,
+ .refresh = refresh_fn_profile,
+ .destroy = destroy_fn_profile,
+ .sync = sync_fn_profile,
+ .async = async_fn_profile,
+ .upstream_set = upstream_set_fn_profile,
+};
+
+struct flb_aws_provider *flb_profile_provider_create(char* profile)
+{
+ struct flb_aws_provider *provider = NULL;
+ struct flb_aws_provider_profile *implementation = NULL;
+ int result = -1;
+
+ provider = flb_calloc(1, sizeof(struct flb_aws_provider));
+
+ if (!provider) {
+ flb_errno();
+ goto error;
+ }
+
+ pthread_mutex_init(&provider->lock, NULL);
+
+ implementation = flb_calloc(1,
+ sizeof(
+ struct flb_aws_provider_profile));
+
+ if (!implementation) {
+ flb_errno();
+ goto error;
+ }
+
+ provider->provider_vtable = &profile_provider_vtable;
+ provider->implementation = implementation;
+
+ result = get_aws_shared_file_path(&implementation->config_path, AWS_CONFIG_FILE,
+ "/.aws/config");
+ if (result < 0) {
+ goto error;
+ }
+
+ result = get_aws_shared_file_path(&implementation->credentials_path,
+ AWS_SHARED_CREDENTIALS_FILE, "/.aws/credentials");
+ if (result < 0) {
+ goto error;
+ }
+
+ if (!implementation->config_path && !implementation->credentials_path) {
+ AWS_CREDS_WARN("Failed to initialize profile provider: "
+ "HOME, %s, and %s not set.",
+ AWS_CONFIG_FILE, AWS_SHARED_CREDENTIALS_FILE);
+ goto error;
+ }
+
+ /* AWS profile name. */
+ if (profile == NULL) {
+ profile = getenv(AWS_PROFILE);
+ }
+ if (profile && strlen(profile) > 0) {
+ goto set_profile;
+ }
+
+ profile = getenv(AWS_DEFAULT_PROFILE);
+ if (profile && strlen(profile) > 0) {
+ goto set_profile;
+ }
+
+ profile = DEFAULT_PROFILE;
+
+set_profile:
+ implementation->profile = flb_sds_create(profile);
+ if (!implementation->profile) {
+ flb_errno();
+ goto error;
+ }
+
+ return provider;
+
+error:
+ flb_aws_provider_destroy(provider);
+ return NULL;
+}
+
+
+/*
+ * Fetches the path of either the shared config file or the shared credentials file.
+ * Returns 0 on success and < 0 on failure.
+ * On success, the result will be stored in *field.
+ *
+ * If the given environment variable is set, then its value will be used verbatim.
+ * Else if $HOME is set, then it will be concatenated with home_aws_path.
+ * If neither is set, then *field will be set to NULL. This is not considered a failure.
+ *
+ * In practice, env_var will be "AWS_CONFIG_FILE" or "AWS_SHARED_CREDENTIALS_FILE",
+ * and home_aws_path will be "/.aws/config" or "/.aws/credentials".
+ */
+static int get_aws_shared_file_path(flb_sds_t* field, char* env_var, char* home_aws_path)
+{
+ char* path = NULL;
+ int result = -1;
+ flb_sds_t value = NULL;
+
+ path = getenv(env_var);
+ if (path && *path) {
+ value = flb_sds_create(path);
+ if (!value) {
+ flb_errno();
+ goto error;
+ }
+ } else {
+ path = getenv("HOME");
+ if (path && *path) {
+ value = flb_sds_create(path);
+ if (!value) {
+ flb_errno();
+ goto error;
+ }
+
+ if (path[strlen(path) - 1] == '/') {
+ home_aws_path++;
+ }
+ result = flb_sds_cat_safe(&value, home_aws_path, strlen(home_aws_path));
+ if (result < 0) {
+ flb_errno();
+ goto error;
+ }
+ }
+ }
+
+ *field = value;
+ return 0;
+
+error:
+ flb_sds_destroy(value);
+ return -1;
+}
+
+static int is_profile_line(char *line) {
+ if (line[0] == '[') {
+ return FLB_TRUE;
+ }
+ return FLB_FALSE;
+}
+
+/* Called on lines that have is_profile_line == True */
+static int has_profile(char *line, char* profile, int debug_only) {
+ char *end_bracket = strchr(line, ']');
+ if (!end_bracket) {
+ if (debug_only) {
+ AWS_CREDS_DEBUG("Profile header has no ending bracket:\n %s", line);
+ }
+ else {
+ AWS_CREDS_WARN("Profile header has no ending bracket:\n %s", line);
+ }
+ return FLB_FALSE;
+ }
+ *end_bracket = '\0';
+
+ if (strcmp(&line[1], profile) == 0) {
+ return FLB_TRUE;
+ }
+
+ return FLB_FALSE;
+}
+
+/*
+ * Sets a null byte such that line becomes the property name
+ * Returns a pointer to the rest of the line (the value), if successful.
+ */
+static char *parse_property_line(char *line) {
+ int len = strlen(line);
+ int found_delimeter = FLB_FALSE;
+ int i = 0;
+
+ if (isspace(line[0])) {
+ /* property line can not start with whitespace */
+ return NULL;
+ }
+
+ /*
+ * Go through the line char by char, once we find whitespace/= we are
+ * passed the property name. Return the first char of the property value.
+ * There should be a single "=" separating name and value.
+ */
+ for (i=0; i < (len - 1); i++) {
+ if (isspace(line[i])) {
+ line[i] = '\0';
+ } else if (found_delimeter == FLB_FALSE && line[i] == '=') {
+ found_delimeter = FLB_TRUE;
+ line[i] = '\0';
+ } else if (found_delimeter == FLB_TRUE) {
+ return &line[i];
+ }
+ }
+
+ return NULL;
+}
+
+/* called on the rest of a line after parse_property_line is called */
+static flb_sds_t parse_property_value(char *s, int debug_only) {
+ int len = strlen(s);
+ int i = 0;
+ char *val = NULL;
+ flb_sds_t prop;
+
+ for (i=0; i < len; i++) {
+ if (isspace(s[i])) {
+ s[i] = '\0';
+ continue;
+ } else if (!val) {
+ val = &s[i];
+ }
+ }
+
+ if (!val) {
+ AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Could not parse credential value from %s", s);
+ }
+
+ prop = flb_sds_create(val);
+ if (!prop) {
+ flb_errno();
+ return NULL;
+ }
+
+ return prop;
+}
+
+static int config_file_profile_matches(char *line, char *profile) {
+ char *current_profile = line + 1;
+ char* current_profile_end = strchr(current_profile, ']');
+
+ if (!current_profile_end) {
+ return FLB_FALSE;
+ }
+ *current_profile_end = '\0';
+
+ /*
+ * Non-default profiles look like `[profile <name>]`.
+ * The default profile can look like `[profile default]` or just `[default]`.
+ * This is different than the credentials file, where everything is `[<name>]`.
+ */
+ if (strncmp(current_profile, CONFIG_PROFILE_PREFIX, CONFIG_PROFILE_PREFIX_LEN) != 0) {
+ if (strcmp(current_profile, DEFAULT_PROFILE) != 0) {
+ /* This is not a valid profile line. */
+ return FLB_FALSE;
+ }
+ } else {
+ current_profile += CONFIG_PROFILE_PREFIX_LEN;
+ }
+
+ if (strcmp(current_profile, profile) == 0) {
+ return FLB_TRUE;
+ }
+ return FLB_FALSE;
+}
+
+static int parse_config_file(char *buf, char* profile, struct flb_aws_credentials** creds,
+ time_t* expiration, int debug_only)
+{
+ char *line = NULL;
+ char *line_end = NULL;
+ char *prop_val = NULL;
+ char *credential_process = NULL;
+ int found_profile = FLB_FALSE;
+
+ for (line = buf; line[0] != '\0'; line = buf) {
+ /*
+ * Find the next newline and replace it with a null terminator.
+ * That way we can easily manipulate the current line as a string.
+ */
+ line_end = strchr(line, '\n');
+ if (line_end) {
+ *line_end = '\0';
+ buf = line_end + 1;
+ } else {
+ buf = "";
+ }
+
+ if (found_profile != FLB_TRUE) {
+ if (is_profile_line(line) != FLB_TRUE) {
+ continue;
+ }
+ if (config_file_profile_matches(line, profile) != FLB_TRUE) {
+ continue;
+ }
+ found_profile = FLB_TRUE;
+ } else {
+ if (is_profile_line(line) == FLB_TRUE) {
+ break;
+ }
+ prop_val = parse_property_line(line);
+ if (strcmp(line, CREDENTIAL_PROCESS_PROPERTY_NAME) == 0) {
+ credential_process = prop_val;
+ }
+ }
+ }
+
+ if (credential_process) {
+#ifdef FLB_HAVE_AWS_CREDENTIAL_PROCESS
+ if (exec_credential_process(credential_process, creds, expiration) < 0) {
+ return -1;
+ }
+#else
+ AWS_CREDS_WARN("credential_process not supported for this platform");
+ return -1;
+#endif
+ }
+
+ return 0;
+}
+
+/*
+ * Parses a shared credentials file.
+ * Expects the contents of 'creds' to be initialized to NULL (i.e use calloc).
+ */
+static int parse_credentials_file(char *buf, char *profile,
+ struct flb_aws_credentials *creds, int debug_only)
+{
+ char *line;
+ char *line_end;
+ char *prop_val = NULL;
+ int found_profile = FLB_FALSE;
+
+ line = buf;
+
+ while (line[0] != '\0') {
+ /* turn the line into a C string */
+ line_end = strchr(line, '\n');
+ if (line_end) {
+ *line_end = '\0';
+ }
+
+ if (is_profile_line(line) == FLB_TRUE) {
+ if (found_profile == FLB_TRUE) {
+ break;
+ }
+ if (has_profile(line, profile, debug_only)) {
+ found_profile = FLB_TRUE;
+ }
+ } else {
+ prop_val = parse_property_line(line);
+ if (prop_val && found_profile == FLB_TRUE) {
+ if (strcmp(line, ACCESS_KEY_PROPERTY_NAME) == 0) {
+ creds->access_key_id = parse_property_value(prop_val,
+ debug_only);
+ }
+ if (strcmp(line, SECRET_KEY_PROPERTY_NAME) == 0) {
+ creds->secret_access_key = parse_property_value(prop_val,
+ debug_only);
+ }
+ if (strcmp(line, SESSION_TOKEN_PROPERTY_NAME) == 0) {
+ creds->session_token = parse_property_value(prop_val,
+ debug_only);
+ }
+ }
+ }
+
+ /* advance to next line */
+ if (line_end) {
+ line = line_end + 1;
+ } else {
+ break;
+ }
+ }
+
+ if (creds->access_key_id && creds->secret_access_key) {
+ return 0;
+ }
+ AWS_CREDS_ERROR_OR_DEBUG(debug_only, "%s and %s keys not parsed in shared "
+ "credentials file for profile %s.", ACCESS_KEY_PROPERTY_NAME,
+ SECRET_KEY_PROPERTY_NAME, profile);
+ return -1;
+}
+
+static int get_shared_config_credentials(char* config_path,
+ char*profile,
+ struct flb_aws_credentials** creds,
+ time_t* expiration,
+ int debug_only) {
+ int result = -1;
+ char* buf = NULL;
+ size_t size;
+ *creds = NULL;
+ *expiration = 0;
+
+ AWS_CREDS_DEBUG("Reading shared config file.");
+
+ if (flb_read_file(config_path, &buf, &size) < 0) {
+ if (errno == ENOENT) {
+ AWS_CREDS_DEBUG("Shared config file %s does not exist", config_path);
+ result = 0;
+ goto end;
+ }
+ flb_errno();
+ AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Could not read shared config file %s",
+ config_path);
+ result = -1;
+ goto end;
+ }
+
+ if (parse_config_file(buf, profile, creds, expiration, debug_only) < 0) {
+ result = -1;
+ goto end;
+ }
+
+ result = 0;
+
+end:
+ flb_free(buf);
+ return result;
+}
+
+static int get_shared_credentials(char* credentials_path,
+ char* profile,
+ struct flb_aws_credentials** creds,
+ int debug_only) {
+ int result = -1;
+ char* buf = NULL;
+ size_t size;
+ *creds = NULL;
+
+ *creds = flb_calloc(1, sizeof(struct flb_aws_credentials));
+ if (!*creds) {
+ flb_errno();
+ result = -1;
+ goto end;
+ }
+
+ AWS_CREDS_DEBUG("Reading shared credentials file.");
+
+ if (flb_read_file(credentials_path, &buf, &size) < 0) {
+ if (errno == ENOENT) {
+ AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Shared credentials file %s does not exist",
+ credentials_path);
+ } else {
+ flb_errno();
+ AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Could not read shared credentials file %s",
+ credentials_path);
+ }
+ result = -1;
+ goto end;
+ }
+
+ if (parse_credentials_file(buf, profile, *creds, debug_only) < 0) {
+ AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Could not parse shared credentials file: "
+ "valid profile with name '%s' not found", profile);
+ result = -1;
+ goto end;
+ }
+
+ result = 0;
+
+end:
+ flb_free(buf);
+
+ if (result < 0) {
+ flb_aws_credentials_destroy(*creds);
+ *creds = NULL;
+ }
+
+ return result;
+}
+
+static int refresh_credentials(struct flb_aws_provider_profile *implementation,
+ int debug_only)
+{
+ struct flb_aws_credentials *creds = NULL;
+ time_t expiration = 0;
+ int ret;
+
+ if (implementation->config_path) {
+ ret = get_shared_config_credentials(implementation->config_path,
+ implementation->profile,
+ &creds,
+ &expiration,
+ debug_only);
+ if (ret < 0) {
+ goto error;
+ }
+ }
+
+ /*
+ * If we did not find a credential_process in the shared config file, fall back to
+ * the shared credentials file.
+ */
+ if (!creds) {
+ if (!implementation->credentials_path) {
+ AWS_CREDS_ERROR("shared config file contains no credential_process and "
+ "no shared credentials file was configured");
+ goto error;
+ }
+
+ ret = get_shared_credentials(implementation->credentials_path,
+ implementation->profile,
+ &creds,
+ debug_only);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* The shared credentials file does not record when the credentials expire. */
+ expiration = 0;
+ }
+
+ /* unset and free existing credentials */
+ flb_aws_credentials_destroy(implementation->creds);
+ implementation->creds = creds;
+
+ if (expiration > 0) {
+ implementation->next_refresh = expiration - FLB_AWS_REFRESH_WINDOW;
+ } else {
+ implementation->next_refresh = 0;
+ }
+
+ return 0;
+
+error:
+ flb_aws_credentials_destroy(creds);
+ return -1;
+}
diff --git a/fluent-bit/src/aws/flb_aws_credentials_sts.c b/fluent-bit/src/aws/flb_aws_credentials_sts.c
new file mode 100644
index 000000000..d992485cc
--- /dev/null
+++ b/fluent-bit/src/aws/flb_aws_credentials_sts.c
@@ -0,0 +1,958 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_aws_credentials.h>
+#include <fluent-bit/flb_aws_util.h>
+#include <fluent-bit/flb_random.h>
+#include <fluent-bit/flb_jsmn.h>
+
+#include <stdlib.h>
+#include <time.h>
+#include <string.h>
+
+#define STS_ASSUME_ROLE_URI_FORMAT "/?Version=2011-06-15&Action=%s\
+&RoleSessionName=%s&RoleArn=%s"
+#define STS_ASSUME_ROLE_URI_BASE_LEN 54
+
+/*
+ * The STS APIs return an XML document with credentials.
+ * The part of the document we care about looks like this:
+ * <Credentials>
+ * <AccessKeyId>akid</AccessKeyId>
+ * <SecretAccessKey>skid</SecretAccessKey>
+ * <SessionToken>token</SessionToken>
+ * <Expiration>2019-11-09T13:34:41Z</Expiration>
+ * </Credentials>
+ */
+#define CREDENTIALS_NODE "<Credentials>"
+#define CREDENTIALS_NODE_LEN 13
+#define ACCESS_KEY_NODE "<AccessKeyId>"
+#define ACCESS_KEY_NODE_LEN 13
+#define ACCESS_KEY_NODE_END "</AccessKeyId>"
+#define SECRET_KEY_NODE "<SecretAccessKey>"
+#define SECRET_KEY_NODE_LEN 17
+#define SECRET_KEY_NODE_END "</SecretAccessKey>"
+#define SESSION_TOKEN_NODE "<SessionToken>"
+#define SESSION_TOKEN_NODE_LEN 14
+#define SESSION_TOKEN_NODE_END "</SessionToken>"
+#define EXPIRATION_NODE "<Expiration>"
+#define EXPIRATION_NODE_LEN 12
+#define EXPIRATION_NODE_END "</Expiration>"
+
+#define TOKEN_FILE_ENV_VAR "AWS_WEB_IDENTITY_TOKEN_FILE"
+#define ROLE_ARN_ENV_VAR "AWS_ROLE_ARN"
+#define SESSION_NAME_ENV_VAR "AWS_ROLE_SESSION_NAME"
+
+#define SESSION_NAME_RANDOM_BYTE_LEN 32
+
+struct flb_aws_provider_eks;
+void bytes_to_string(unsigned char *data, char *buf, size_t len);
+static int assume_with_web_identity(struct flb_aws_provider_eks
+ *implementation);
+static int sts_assume_role_request(struct flb_aws_client *sts_client,
+ struct flb_aws_credentials **creds,
+ char *uri,
+ time_t *next_refresh);
+static flb_sds_t get_node(char *cred_node, char* node_name, int node_name_len, char* node_end);
+
+
+/*
+ * A provider that uses credentials from the base provider to call STS
+ * and assume an IAM Role.
+ */
+struct flb_aws_provider_sts {
+ int custom_endpoint;
+ struct flb_aws_provider *base_provider;
+
+ struct flb_aws_credentials *creds;
+ time_t next_refresh;
+
+ struct flb_aws_client *sts_client;
+
+ /* Fluent Bit uses regional STS endpoints; this is a best practice. */
+ char *endpoint;
+
+ flb_sds_t uri;
+};
+
+struct flb_aws_credentials *get_credentials_fn_sts(struct flb_aws_provider
+ *provider)
+{
+ struct flb_aws_credentials *creds;
+ int refresh = FLB_FALSE;
+ struct flb_aws_provider_sts *implementation = provider->implementation;
+
+ flb_debug("[aws_credentials] Requesting credentials from the "
+ "STS provider..");
+
+ /* a negative next_refresh means that auto-refresh is disabled */
+ if (implementation->next_refresh > 0
+ && time(NULL) > implementation->next_refresh) {
+ refresh = FLB_TRUE;
+ }
+ if (!implementation->creds || refresh == FLB_TRUE) {
+ /* credentials need to be refreshed/obtained */
+ if (try_lock_provider(provider)) {
+ flb_debug("[aws_credentials] STS Provider: Refreshing credential "
+ "cache.");
+ sts_assume_role_request(implementation->sts_client,
+ &implementation->creds,
+ implementation->uri,
+ &implementation->next_refresh);
+ unlock_provider(provider);
+ }
+ }
+
+ if (!implementation->creds) {
+ /*
+ * We failed to lock the provider and creds are unset. This means that
+ * another co-routine is performing the refresh.
+ */
+ flb_warn("[aws_credentials] No cached credentials are available and "
+ "a credential refresh is already in progress. The current "
+ "co-routine will retry.");
+
+ return NULL;
+ }
+
+ /* return a copy of the existing cached credentials */
+ creds = flb_calloc(1, sizeof(struct flb_aws_credentials));
+ if (!creds) {
+ goto error;
+ }
+
+ creds->access_key_id = flb_sds_create(implementation->creds->access_key_id);
+ if (!creds->access_key_id) {
+ goto error;
+ }
+
+ creds->secret_access_key = flb_sds_create(implementation->creds->
+ secret_access_key);
+ if (!creds->secret_access_key) {
+ goto error;
+ }
+
+ if (implementation->creds->session_token) {
+ creds->session_token = flb_sds_create(implementation->creds->
+ session_token);
+ if (!creds->session_token) {
+ goto error;
+ }
+
+ } else {
+ creds->session_token = NULL;
+ }
+
+ return creds;
+
+error:
+ flb_errno();
+ flb_aws_credentials_destroy(creds);
+ return NULL;
+}
+
+int refresh_fn_sts(struct flb_aws_provider *provider) {
+ int ret = -1;
+ struct flb_aws_provider_sts *implementation = provider->implementation;
+
+ flb_debug("[aws_credentials] Refresh called on the STS provider");
+
+ if (try_lock_provider(provider)) {
+ ret = sts_assume_role_request(implementation->sts_client,
+ &implementation->creds, implementation->uri,
+ &implementation->next_refresh);
+ unlock_provider(provider);
+ }
+ return ret;
+}
+
+int init_fn_sts(struct flb_aws_provider *provider) {
+ int ret = -1;
+ struct flb_aws_provider_sts *implementation = provider->implementation;
+
+ flb_debug("[aws_credentials] Init called on the STS provider");
+
+ /* Call Init on the base provider first */
+ implementation->base_provider->provider_vtable->
+ init(implementation->base_provider);
+
+ implementation->sts_client->debug_only = FLB_TRUE;
+
+ if (try_lock_provider(provider)) {
+ ret = sts_assume_role_request(implementation->sts_client,
+ &implementation->creds, implementation->uri,
+ &implementation->next_refresh);
+ unlock_provider(provider);
+ }
+
+ implementation->sts_client->debug_only = FLB_FALSE;
+ return ret;
+}
+
+void sync_fn_sts(struct flb_aws_provider *provider) {
+ struct flb_aws_provider_sts *implementation = provider->implementation;
+ struct flb_aws_provider *base_provider = implementation->base_provider;
+
+ flb_debug("[aws_credentials] Sync called on the STS provider");
+ /* Remove async flag */
+ flb_stream_disable_async_mode(&implementation->sts_client->upstream->base);
+
+ /* we also need to call sync on the base_provider */
+ base_provider->provider_vtable->sync(base_provider);
+}
+
+void async_fn_sts(struct flb_aws_provider *provider) {
+ struct flb_aws_provider_sts *implementation = provider->implementation;
+ struct flb_aws_provider *base_provider = implementation->base_provider;
+
+ flb_debug("[aws_credentials] Async called on the STS provider");
+ /* Add async flag */
+ flb_stream_enable_async_mode(&implementation->sts_client->upstream->base);
+
+ /* we also need to call async on the base_provider */
+ base_provider->provider_vtable->async(base_provider);
+}
+
+void upstream_set_fn_sts(struct flb_aws_provider *provider,
+ struct flb_output_instance *ins) {
+ struct flb_aws_provider_sts *implementation = provider->implementation;
+ struct flb_aws_provider *base_provider = implementation->base_provider;
+
+ flb_debug("[aws_credentials] upstream_set called on the STS provider");
+ /* associate output and upstream */
+ flb_output_upstream_set(implementation->sts_client->upstream, ins);
+
+ /* we also need to call upstream_set on the base_provider */
+ base_provider->provider_vtable->upstream_set(base_provider, ins);
+}
+
+void destroy_fn_sts(struct flb_aws_provider *provider) {
+ struct flb_aws_provider_sts *implementation;
+
+ implementation = provider->implementation;
+
+ if (implementation) {
+ if (implementation->creds) {
+ flb_aws_credentials_destroy(implementation->creds);
+ }
+
+ if (implementation->sts_client) {
+ flb_aws_client_destroy(implementation->sts_client);
+ }
+
+ if (implementation->uri) {
+ flb_sds_destroy(implementation->uri);
+ }
+
+ if (implementation->custom_endpoint == FLB_FALSE) {
+ flb_free(implementation->endpoint);
+ }
+
+ flb_free(implementation);
+ provider->implementation = NULL;
+ }
+
+ return;
+}
+
+static struct flb_aws_provider_vtable sts_provider_vtable = {
+ .get_credentials = get_credentials_fn_sts,
+ .init = init_fn_sts,
+ .refresh = refresh_fn_sts,
+ .destroy = destroy_fn_sts,
+ .sync = sync_fn_sts,
+ .async = async_fn_sts,
+ .upstream_set = upstream_set_fn_sts,
+};
+
+struct flb_aws_provider *flb_sts_provider_create(struct flb_config *config,
+ struct flb_tls *tls,
+ struct flb_aws_provider
+ *base_provider,
+ char *external_id,
+ char *role_arn,
+ char *session_name,
+ char *region,
+ char *sts_endpoint,
+ char *proxy,
+ struct
+ flb_aws_client_generator
+ *generator)
+{
+ struct flb_aws_provider_sts *implementation = NULL;
+ struct flb_aws_provider *provider = NULL;
+ struct flb_upstream *upstream = NULL;
+
+ provider = flb_calloc(1, sizeof(struct flb_aws_provider));
+ if (!provider) {
+ flb_errno();
+ return NULL;
+ }
+
+ pthread_mutex_init(&provider->lock, NULL);
+
+ implementation = flb_calloc(1, sizeof(struct flb_aws_provider_sts));
+ if (!implementation) {
+ goto error;
+ }
+
+ provider->provider_vtable = &sts_provider_vtable;
+ provider->implementation = implementation;
+
+ implementation->uri = flb_sts_uri("AssumeRole", role_arn, session_name,
+ external_id, NULL);
+ if (!implementation->uri) {
+ goto error;
+ }
+
+ if (sts_endpoint) {
+ implementation->endpoint = removeProtocol(sts_endpoint, "https://");
+ implementation->custom_endpoint = FLB_TRUE;
+ }
+ else {
+ implementation->endpoint = flb_aws_endpoint("sts", region);
+ implementation->custom_endpoint = FLB_FALSE;
+ }
+
+ if(!implementation->endpoint) {
+ goto error;
+ }
+
+ implementation->base_provider = base_provider;
+ implementation->sts_client = generator->create();
+ if (!implementation->sts_client) {
+ goto error;
+ }
+ implementation->sts_client->name = "sts_client_assume_role_provider";
+ implementation->sts_client->has_auth = FLB_TRUE;
+ implementation->sts_client->provider = base_provider;
+ implementation->sts_client->region = region;
+ implementation->sts_client->service = "sts";
+ implementation->sts_client->port = 443;
+ implementation->sts_client->flags = 0;
+ implementation->sts_client->proxy = proxy;
+
+ upstream = flb_upstream_create(config, implementation->endpoint, 443,
+ FLB_IO_TLS, tls);
+ if (!upstream) {
+ flb_error("[aws_credentials] Connection initialization error");
+ goto error;
+ }
+
+ upstream->base.net.connect_timeout = FLB_AWS_CREDENTIAL_NET_TIMEOUT;
+
+ implementation->sts_client->upstream = upstream;
+ implementation->sts_client->host = implementation->endpoint;
+
+ return provider;
+
+error:
+ flb_errno();
+ flb_aws_provider_destroy(provider);
+ return NULL;
+}
+
+/*
+ * A provider that uses OIDC tokens provided by kubernetes to obtain
+ * AWS credentials.
+ *
+ * The AWS SDKs have defined a spec for an OIDC provider that obtains tokens
+ * from environment variables or the shared config file.
+ * This provider only contains the functionality needed for EKS- obtaining the
+ * location of the OIDC token from an environment variable.
+ */
+struct flb_aws_provider_eks {
+ int custom_endpoint;
+ struct flb_aws_credentials *creds;
+ /*
+ * Time to auto-refresh creds before they expire. A negative value disables
+ * auto-refresh. Client code can always force a refresh.
+ */
+ time_t next_refresh;
+
+ struct flb_aws_client *sts_client;
+
+ /* Fluent Bit uses regional STS endpoints; this is a best practice. */
+ char *endpoint;
+
+ char *session_name;
+ /* session name can come from env or be generated by the provider */
+ int free_session_name;
+ char *role_arn;
+
+ char *token_file;
+};
+
+
+struct flb_aws_credentials *get_credentials_fn_eks(struct flb_aws_provider
+ *provider)
+{
+ struct flb_aws_credentials *creds = NULL;
+ int refresh = FLB_FALSE;
+ struct flb_aws_provider_eks *implementation = provider->implementation;
+
+ flb_debug("[aws_credentials] Requesting credentials from the "
+ "EKS provider..");
+
+ /* a negative next_refresh means that auto-refresh is disabled */
+ if (implementation->next_refresh > 0
+ && time(NULL) > implementation->next_refresh) {
+ refresh = FLB_TRUE;
+ }
+ if (!implementation->creds || refresh == FLB_TRUE) {
+ if (try_lock_provider(provider)) {
+ flb_debug("[aws_credentials] EKS Provider: Refreshing credential "
+ "cache.");
+ assume_with_web_identity(implementation);
+ unlock_provider(provider);
+ }
+ }
+
+ if (!implementation->creds) {
+ /*
+ * We failed to lock the provider and creds are unset. This means that
+ * another co-routine is performing the refresh.
+ */
+ flb_warn("[aws_credentials] No cached credentials are available and "
+ "a credential refresh is already in progress. The current "
+ "co-routine will retry.");
+
+ return NULL;
+ }
+
+ creds = flb_calloc(1, sizeof(struct flb_aws_credentials));
+ if (!creds) {
+ goto error;
+ }
+
+ creds->access_key_id = flb_sds_create(implementation->creds->access_key_id);
+ if (!creds->access_key_id) {
+ goto error;
+ }
+
+ creds->secret_access_key = flb_sds_create(implementation->creds->
+ secret_access_key);
+ if (!creds->secret_access_key) {
+ goto error;
+ }
+
+ if (implementation->creds->session_token) {
+ creds->session_token = flb_sds_create(implementation->creds->
+ session_token);
+ if (!creds->session_token) {
+ goto error;
+ }
+
+ }
+ else {
+ creds->session_token = NULL;
+ }
+
+ return creds;
+
+error:
+ flb_errno();
+ flb_aws_credentials_destroy(creds);
+ return NULL;
+}
+
+int refresh_fn_eks(struct flb_aws_provider *provider) {
+ int ret = -1;
+ struct flb_aws_provider_eks *implementation = provider->implementation;
+
+ flb_debug("[aws_credentials] Refresh called on the EKS provider");
+ if (try_lock_provider(provider)) {
+ ret = assume_with_web_identity(implementation);
+ unlock_provider(provider);
+ }
+ return ret;
+}
+
+int init_fn_eks(struct flb_aws_provider *provider) {
+ int ret = -1;
+ struct flb_aws_provider_eks *implementation = provider->implementation;
+
+ implementation->sts_client->debug_only = FLB_TRUE;
+
+ flb_debug("[aws_credentials] Init called on the EKS provider");
+ if (try_lock_provider(provider)) {
+ ret = assume_with_web_identity(implementation);
+ unlock_provider(provider);
+ }
+
+ implementation->sts_client->debug_only = FLB_FALSE;
+ return ret;
+}
+
+void sync_fn_eks(struct flb_aws_provider *provider) {
+ struct flb_aws_provider_eks *implementation = provider->implementation;
+ flb_debug("[aws_credentials] Sync called on the EKS provider");
+ /* remove async flag */
+ flb_stream_disable_async_mode(&implementation->sts_client->upstream->base);
+}
+
+void async_fn_eks(struct flb_aws_provider *provider) {
+ struct flb_aws_provider_eks *implementation = provider->implementation;
+ flb_debug("[aws_credentials] Async called on the EKS provider");
+ /* add async flag */
+ flb_stream_enable_async_mode(&implementation->sts_client->upstream->base);
+}
+
+void upstream_set_fn_eks(struct flb_aws_provider *provider,
+ struct flb_output_instance *ins) {
+ struct flb_aws_provider_eks *implementation = provider->implementation;
+ flb_debug("[aws_credentials] upstream_set called on the EKS provider");
+ /* set upstream on output */
+ flb_output_upstream_set(implementation->sts_client->upstream, ins);
+}
+
+void destroy_fn_eks(struct flb_aws_provider *provider) {
+ struct flb_aws_provider_eks *implementation = provider->
+ implementation;
+ if (implementation) {
+ if (implementation->creds) {
+ flb_aws_credentials_destroy(implementation->creds);
+ }
+
+ if (implementation->sts_client) {
+ flb_aws_client_destroy(implementation->sts_client);
+ }
+
+ if (implementation->custom_endpoint == FLB_FALSE) {
+ flb_free(implementation->endpoint);
+ }
+ if (implementation->free_session_name == FLB_TRUE) {
+ flb_free(implementation->session_name);
+ }
+
+ flb_free(implementation);
+ provider->implementation = NULL;
+ }
+
+ return;
+}
+
+static struct flb_aws_provider_vtable eks_provider_vtable = {
+ .get_credentials = get_credentials_fn_eks,
+ .init = init_fn_eks,
+ .refresh = refresh_fn_eks,
+ .destroy = destroy_fn_eks,
+ .sync = sync_fn_eks,
+ .async = async_fn_eks,
+ .upstream_set = upstream_set_fn_eks,
+};
+
+struct flb_aws_provider *flb_eks_provider_create(struct flb_config *config,
+ struct flb_tls *tls,
+ char *region,
+ char *sts_endpoint,
+ char *proxy,
+ struct
+ flb_aws_client_generator
+ *generator)
+{
+ struct flb_aws_provider_eks *implementation = NULL;
+ struct flb_aws_provider *provider = NULL;
+ struct flb_upstream *upstream = NULL;
+
+ provider = flb_calloc(1, sizeof(struct flb_aws_provider));
+
+ if (!provider) {
+ flb_errno();
+ return NULL;
+ }
+
+ pthread_mutex_init(&provider->lock, NULL);
+
+ implementation = flb_calloc(1, sizeof(struct flb_aws_provider_eks));
+
+ if (!implementation) {
+ goto error;
+ }
+
+ provider->provider_vtable = &eks_provider_vtable;
+ provider->implementation = implementation;
+
+ /* session name either comes from the env var or is a random uuid */
+ implementation->session_name = getenv(SESSION_NAME_ENV_VAR);
+ implementation->free_session_name = FLB_FALSE;
+ if (!implementation->session_name ||
+ strlen(implementation->session_name) == 0) {
+ implementation->session_name = flb_sts_session_name();
+ if (!implementation->session_name) {
+ goto error;
+ }
+ implementation->free_session_name = FLB_TRUE;
+ }
+
+ implementation->role_arn = getenv(ROLE_ARN_ENV_VAR);
+ if (!implementation->role_arn || strlen(implementation->role_arn) == 0) {
+ flb_debug("[aws_credentials] Not initializing EKS provider because"
+ " %s was not set", ROLE_ARN_ENV_VAR);
+ flb_aws_provider_destroy(provider);
+ return NULL;
+ }
+
+ implementation->token_file = getenv(TOKEN_FILE_ENV_VAR);
+ if (!implementation->token_file || strlen(implementation->token_file) == 0)
+ {
+ flb_debug("[aws_credentials] Not initializing EKS provider because"
+ " %s was not set", TOKEN_FILE_ENV_VAR);
+ flb_aws_provider_destroy(provider);
+ return NULL;
+ }
+
+ if (sts_endpoint) {
+ implementation->endpoint = removeProtocol(sts_endpoint, "https://");
+ implementation->custom_endpoint = FLB_TRUE;
+ }
+ else {
+ implementation->endpoint = flb_aws_endpoint("sts", region);
+ implementation->custom_endpoint = FLB_FALSE;
+ }
+
+ if(!implementation->endpoint) {
+ goto error;
+ }
+
+ implementation->sts_client = generator->create();
+ if (!implementation->sts_client) {
+ goto error;
+ }
+ implementation->sts_client->name = "sts_client_eks_provider";
+ /* AssumeRoleWithWebIdentity does not require sigv4 */
+ implementation->sts_client->has_auth = FLB_FALSE;
+ implementation->sts_client->provider = NULL;
+ implementation->sts_client->region = region;
+ implementation->sts_client->service = "sts";
+ implementation->sts_client->port = 443;
+ implementation->sts_client->flags = 0;
+ implementation->sts_client->proxy = proxy;
+
+ upstream = flb_upstream_create(config, implementation->endpoint, 443,
+ FLB_IO_TLS, tls);
+
+ if (!upstream) {
+ goto error;
+ }
+
+ upstream->base.net.connect_timeout = FLB_AWS_CREDENTIAL_NET_TIMEOUT;
+
+ implementation->sts_client->upstream = upstream;
+ implementation->sts_client->host = implementation->endpoint;
+
+ return provider;
+
+error:
+ flb_errno();
+ flb_aws_provider_destroy(provider);
+ return NULL;
+}
+
+/* Generates string which can serve as a unique session name */
+char *flb_sts_session_name() {
+ unsigned char random_data[SESSION_NAME_RANDOM_BYTE_LEN];
+ char *session_name = NULL;
+ int ret;
+
+ ret = flb_random_bytes(random_data, SESSION_NAME_RANDOM_BYTE_LEN);
+
+ if (ret != 0) {
+ flb_errno();
+
+ return NULL;
+ }
+
+ session_name = flb_calloc(SESSION_NAME_RANDOM_BYTE_LEN + 1,
+ sizeof(char));
+ if (session_name == NULL) {
+ flb_errno();
+
+ return NULL;
+ }
+
+ bytes_to_string(random_data, session_name, SESSION_NAME_RANDOM_BYTE_LEN);
+
+ return session_name;
+}
+
+/* converts random bytes to a string we can safely put in a URL */
+void bytes_to_string(unsigned char *data, char *buf, size_t len) {
+ int index;
+ char charset[] = "0123456789"
+ "abcdefghijklmnopqrstuvwxyz"
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+ while (len-- > 0) {
+ index = (int) data[len];
+ index = index % (sizeof(charset) - 1);
+ buf[len] = charset[index];
+ }
+}
+
+static int assume_with_web_identity(struct flb_aws_provider_eks
+ *implementation)
+{
+ int ret;
+ char *web_token = NULL;
+ size_t web_token_size;
+ flb_sds_t uri = NULL;
+ int init_mode = implementation->sts_client->debug_only;
+
+ ret = flb_read_file(implementation->token_file, &web_token,
+ &web_token_size);
+ if (ret < 0) {
+ if (init_mode == FLB_TRUE) {
+ flb_debug("[aws_credentials] Could not read web identify token file");
+ } else {
+ flb_error("[aws_credentials] Could not read web identify token file");
+ }
+ return -1;
+ }
+
+ uri = flb_sts_uri("AssumeRoleWithWebIdentity", implementation->role_arn,
+ implementation->session_name, NULL, web_token);
+ if (!uri) {
+ flb_free(web_token);
+ return -1;
+ }
+
+ ret = sts_assume_role_request(implementation->sts_client,
+ &implementation->creds, uri,
+ &implementation->next_refresh);
+ flb_free(web_token);
+ flb_sds_destroy(uri);
+ return ret;
+}
+
+static int sts_assume_role_request(struct flb_aws_client *sts_client,
+ struct flb_aws_credentials **creds,
+ char *uri,
+ time_t *next_refresh)
+{
+ time_t expiration;
+ struct flb_aws_credentials *credentials = NULL;
+ struct flb_http_client *c = NULL;
+ flb_sds_t error_type;
+ int init_mode = sts_client->debug_only;
+
+ flb_debug("[aws_credentials] Calling STS..");
+
+ c = sts_client->client_vtable->request(sts_client, FLB_HTTP_GET,
+ uri, NULL, 0, NULL, 0);
+
+ if (c && c->resp.status == 200) {
+ credentials = flb_parse_sts_resp(c->resp.payload, &expiration);
+ if (!credentials) {
+ if (init_mode == FLB_TRUE) {
+ flb_debug("[aws_credentials] Failed to parse response from STS");
+ }
+ else {
+ flb_error("[aws_credentials] Failed to parse response from STS");
+ }
+ flb_http_client_destroy(c);
+ return -1;
+ }
+
+ /* unset and free existing credentials first */
+ flb_aws_credentials_destroy(*creds);
+ *creds = NULL;
+
+ *next_refresh = expiration - FLB_AWS_REFRESH_WINDOW;
+ *creds = credentials;
+ flb_http_client_destroy(c);
+ return 0;
+ }
+
+ if (c && c->resp.payload_size > 0) {
+ error_type = flb_aws_error(c->resp.payload, c->resp.payload_size);
+ if (error_type) {
+ if (init_mode == FLB_TRUE) {
+ flb_debug("[aws_credentials] STS API responded with %s", error_type);
+ }
+ else {
+ flb_error("[aws_credentials] STS API responded with %s", error_type);
+ }
+ } else {
+ flb_debug("[aws_credentials] STS raw response: \n%s",
+ c->resp.payload);
+ }
+ }
+
+ if (c) {
+ flb_http_client_destroy(c);
+ }
+ if (init_mode == FLB_TRUE) {
+ flb_debug("[aws_credentials] STS assume role request failed");
+ }
+ else {
+ flb_error("[aws_credentials] STS assume role request failed");
+ }
+ return -1;
+
+}
+
+/*
+ * The STS APIs return an XML document with credentials.
+ * The part of the document we care about looks like this:
+ * <Credentials>
+ * <AccessKeyId>akid</AccessKeyId>
+ * <SecretAccessKey>skid</SecretAccessKey>
+ * <SessionToken>token</SessionToken>
+ * <Expiration>2019-11-09T13:34:41Z</Expiration>
+ * </Credentials>
+ */
+struct flb_aws_credentials *flb_parse_sts_resp(char *response,
+ time_t *expiration)
+{
+ struct flb_aws_credentials *creds = NULL;
+ char *cred_node = NULL;
+ flb_sds_t tmp = NULL;
+
+ cred_node = strstr(response, CREDENTIALS_NODE);
+ if (!cred_node) {
+ flb_error("[aws_credentials] Could not find '%s' node in sts response",
+ CREDENTIALS_NODE);
+ return NULL;
+ }
+ cred_node += CREDENTIALS_NODE_LEN;
+
+ creds = flb_calloc(1, sizeof(struct flb_aws_credentials));
+ if (!creds) {
+ flb_errno();
+ return NULL;
+ }
+
+ creds->access_key_id = get_node(cred_node, ACCESS_KEY_NODE,
+ ACCESS_KEY_NODE_LEN, ACCESS_KEY_NODE_END);
+ if (!creds->access_key_id) {
+ goto error;
+ }
+
+ creds->secret_access_key = get_node(cred_node, SECRET_KEY_NODE,
+ SECRET_KEY_NODE_LEN, SECRET_KEY_NODE_END);
+ if (!creds->secret_access_key) {
+ goto error;
+ }
+
+ creds->session_token = get_node(cred_node, SESSION_TOKEN_NODE,
+ SESSION_TOKEN_NODE_LEN, SESSION_TOKEN_NODE_END);
+ if (!creds->session_token) {
+ goto error;
+ }
+
+ tmp = get_node(cred_node, EXPIRATION_NODE, EXPIRATION_NODE_LEN, EXPIRATION_NODE_END);
+ if (!tmp) {
+ goto error;
+ }
+ *expiration = flb_aws_cred_expiration(tmp);
+
+ flb_sds_destroy(tmp);
+ return creds;
+
+error:
+ flb_aws_credentials_destroy(creds);
+ if (tmp) {
+ flb_sds_destroy(tmp);
+ }
+ return NULL;
+}
+
+/*
+ * Constructs the STS request uri.
+ * external_id can be NULL.
+ */
+flb_sds_t flb_sts_uri(char *action, char *role_arn, char *session_name,
+ char *external_id, char *identity_token)
+{
+ flb_sds_t tmp;
+ flb_sds_t uri = NULL;
+ size_t len = STS_ASSUME_ROLE_URI_BASE_LEN;
+
+ if (external_id) {
+ len += 12; /* will add "&ExternalId=" */
+ len += strlen(external_id);
+ }
+
+ if (identity_token) {
+ len += 18; /* will add "&WebIdentityToken=" */
+ len += strlen(identity_token);
+ }
+
+
+ len += strlen(session_name);
+ len += strlen(role_arn);
+ len += strlen(action);
+ len++; /* null char */
+
+ uri = flb_sds_create_size(len);
+ if (!uri) {
+ return NULL;
+ }
+
+ tmp = flb_sds_printf(&uri, STS_ASSUME_ROLE_URI_FORMAT, action, session_name,
+ role_arn);
+ if (!tmp) {
+ flb_sds_destroy(uri);
+ return NULL;
+ }
+
+ if (external_id) {
+ flb_sds_printf(&uri, "&ExternalId=%s", external_id);
+ }
+
+ if (identity_token) {
+ flb_sds_printf(&uri, "&WebIdentityToken=%s", identity_token);
+ }
+
+ return uri;
+}
+
+static flb_sds_t get_node(char *cred_node, char* node_name, int node_name_len, char* node_end)
+{
+ char *node = NULL;
+ char *end = NULL;
+ flb_sds_t val = NULL;
+ int len;
+
+ node = strstr(cred_node, node_name);
+ if (!node) {
+ flb_error("[aws_credentials] Could not find '%s' node in sts response",
+ node_name);
+ return NULL;
+ }
+ node += node_name_len;
+ end = strstr(node, node_end);
+ if (!end) {
+ flb_error("[aws_credentials] Could not find end of '%s' node in "
+ "sts response", node_name);
+ return NULL;
+ }
+ len = end - node;
+ val = flb_sds_create_len(node, len);
+ if (!val) {
+ flb_errno();
+ return NULL;
+ }
+
+ return val;
+}
diff --git a/fluent-bit/src/aws/flb_aws_error_reporter.c b/fluent-bit/src/aws/flb_aws_error_reporter.c
new file mode 100644
index 000000000..72da9666c
--- /dev/null
+++ b/fluent-bit/src/aws/flb_aws_error_reporter.c
@@ -0,0 +1,276 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#include <monkey/mk_core/mk_list.h>
+
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_env.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/aws/flb_aws_error_reporter.h>
+
+/* helper function to get int type environment variable*/
+static int getenv_int(const char *name) {
+ char *value, *end;
+ long result;
+
+ value = getenv(name);
+ if (!value) {
+ return 0;
+ }
+
+ result = strtol(value, &end, 10);
+ if (*end != '\0') {
+ return 0;
+ }
+ return (int) result;
+}
+
+/* create an error reporter*/
+struct flb_aws_error_reporter *flb_aws_error_reporter_create()
+{
+ char *path_var = NULL;
+ int ttl_var, status_message_length;
+ struct flb_aws_error_reporter *error_reporter;
+ FILE *f;
+ int ret;
+
+ error_reporter = flb_calloc(1, sizeof(struct flb_aws_error_reporter));
+ if (!error_reporter) {
+ flb_errno();
+ return NULL;
+ }
+
+ /* setup error report file path */
+ path_var = getenv(STATUS_MESSAGE_FILE_PATH_ENV);
+ if (path_var == NULL) {
+ flb_free(error_reporter);
+ flb_errno();
+ return NULL;
+ }
+
+ error_reporter->file_path = flb_sds_create(path_var);
+ if (!error_reporter->file_path) {
+ flb_free(error_reporter);
+ flb_errno();
+ return NULL;
+ }
+
+ /* clean up existing file*/
+ if ((f = fopen(error_reporter->file_path, "r")) != NULL) {
+ /* file exist, try delete it*/
+ if (remove(error_reporter->file_path)) {
+ flb_free(error_reporter);
+ flb_errno();
+ return NULL;
+ }
+ }
+
+ /* setup error reporter message TTL */
+ ttl_var = getenv_int(STATUS_MESSAGE_TTL_ENV);
+ if (ttl_var <= 0) {
+ ttl_var = STATUS_MESSAGE_TTL_DEFAULT;
+ }
+ error_reporter->ttl = ttl_var;
+
+ /* setup error reporter file size */
+ status_message_length = getenv_int(STATUS_MESSAGE_MAX_BYTE_LENGTH_ENV);
+ if(status_message_length <= 0) {
+ status_message_length = STATUS_MESSAGE_MAX_BYTE_LENGTH_DEFAULT;
+ }
+ error_reporter->max_size = status_message_length;
+
+ /* create the message Linked Lists */
+ mk_list_init(&error_reporter->messages);
+
+ return error_reporter;
+}
+
+/* error reporter write the error message into reporting file and memory*/
+int flb_aws_error_reporter_write(struct flb_aws_error_reporter *error_reporter, char *msg)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_error_message *message;
+ struct flb_error_message *tmp_message;
+ flb_sds_t buf;
+ flb_sds_t buf_tmp;
+ int deleted_message_count = 0;
+ FILE *f;
+
+ if (error_reporter == NULL) {
+ return -1;
+ }
+
+ buf = flb_sds_create(msg);
+ if (!buf) {
+ flb_errno();
+ return -1;
+ }
+ /* check if the message is the same with latest one in queue*/
+ if (mk_list_is_empty(&error_reporter->messages) != 0) {
+ tmp_message = mk_list_entry_last(&error_reporter->messages,
+ struct flb_error_message, _head);
+ if (tmp_message->len == flb_sds_len(buf) &&
+ flb_sds_cmp(tmp_message->data, buf, tmp_message->len) == 0) {
+
+ tmp_message->timestamp = time(NULL);
+ flb_sds_destroy(buf);
+ return 0;
+ }
+ }
+
+ message = flb_malloc(sizeof(struct flb_error_message));
+ if (!message) {
+ flb_sds_destroy(buf);
+ flb_errno();
+ return -1;
+ }
+
+ /* check if new message is too large and truncate*/
+ if (flb_sds_len(buf) > error_reporter->max_size) {
+ // truncate message
+ buf_tmp = flb_sds_copy(buf, msg, error_reporter->max_size);
+ if (!buf_tmp) {
+ flb_sds_destroy(buf);
+ flb_free(message);
+ return -1;
+ }
+ }
+
+ message->data = flb_sds_create(buf);
+ if (!message->data) {
+ flb_sds_destroy(buf);
+ flb_free(message);
+ return -1;
+ }
+
+ message->len = flb_sds_len(buf);
+
+ /* clean up old message to provide enough space for new message*/
+ mk_list_foreach_safe(head, tmp, &error_reporter->messages) {
+ tmp_message = mk_list_entry(head, struct flb_error_message, _head);
+ if (error_reporter->file_size + flb_sds_len(buf) <= error_reporter->max_size) {
+ break;
+ }
+ else {
+ error_reporter->file_size -= tmp_message->len;
+ deleted_message_count++;
+ mk_list_del(&tmp_message->_head);
+ flb_sds_destroy(tmp_message->data);
+ flb_free(tmp_message);
+ }
+ }
+ message->timestamp = time(NULL);
+
+ mk_list_add(&message->_head, &error_reporter->messages);
+ error_reporter->file_size += message->len;
+
+ if (deleted_message_count == 0) {
+ f = fopen(error_reporter->file_path, "a");
+ fprintf(f, message->data);
+ }
+ else {
+ f = fopen(error_reporter->file_path, "w");
+ mk_list_foreach_safe(head, tmp, &error_reporter->messages) {
+ tmp_message = mk_list_entry(head, struct flb_error_message, _head);
+ fprintf(f, tmp_message->data);
+ }
+ }
+ fclose(f);
+
+ flb_sds_destroy(buf);
+
+ return 0;
+
+}
+
+/* error reporter clean up the expired message based on TTL*/
+void flb_aws_error_reporter_clean(struct flb_aws_error_reporter *error_reporter)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_error_message *message;
+ int expired_message_count = 0;
+ FILE *f;
+
+ if (error_reporter == NULL) {
+ return;
+ }
+
+ /* check the timestamp for every message and clean up expired messages*/
+ mk_list_foreach_safe(head, tmp, &error_reporter->messages) {
+ message = mk_list_entry(head, struct flb_error_message, _head);
+ if (error_reporter->ttl > time(NULL) - message->timestamp) {
+ break;
+ }
+ error_reporter->file_size -= message->len;
+ mk_list_del(&message->_head);
+ flb_sds_destroy(message->data);
+ flb_free(message);
+ expired_message_count++;
+ }
+
+ /* rewrite error report file if any message is cleaned up*/
+ if (expired_message_count > 0) {
+ f = fopen(error_reporter->file_path, "w");
+ mk_list_foreach_safe(head, tmp, &error_reporter->messages) {
+ message = mk_list_entry(head, struct flb_error_message, _head);
+ fprintf(f, message->data);
+ }
+ fclose(f);
+ }
+}
+
+/* error reporter clean up when fluent bit shutdown*/
+void flb_aws_error_reporter_destroy(struct flb_aws_error_reporter *error_reporter)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_error_message *message;
+
+ if (error_reporter == NULL) {
+ return;
+ }
+
+ if(error_reporter->file_path) {
+ flb_sds_destroy(error_reporter->file_path);
+ }
+ if (mk_list_is_empty(&error_reporter->messages) != 0) {
+
+ mk_list_foreach_safe(head, tmp, &error_reporter->messages) {
+ message = mk_list_entry(head, struct flb_error_message, _head);
+ mk_list_del(&message->_head);
+ flb_sds_destroy(message->data);
+ flb_free(message);
+ }
+ mk_list_del(&error_reporter->messages);
+ }
+
+ flb_free(error_reporter);
+}
+
+/*check if system enable error reporting*/
+int is_error_reporting_enabled()
+{
+ return getenv(STATUS_MESSAGE_FILE_PATH_ENV) != NULL;
+} \ No newline at end of file
diff --git a/fluent-bit/src/aws/flb_aws_imds.c b/fluent-bit/src/aws/flb_aws_imds.c
new file mode 100644
index 000000000..0e54db161
--- /dev/null
+++ b/fluent-bit/src/aws/flb_aws_imds.c
@@ -0,0 +1,370 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/aws/flb_aws_imds.h>
+#include <fluent-bit/flb_aws_credentials.h>
+#include <fluent-bit/flb_aws_util.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_jsmn.h>
+
+#define FLB_AWS_IMDS_ROOT "/"
+#define FLB_AWS_IMDS_V2_TOKEN_PATH "/latest/api/token"
+
+/* Request headers */
+static struct flb_aws_header imds_v2_token_ttl_header = {
+ .key = "X-aws-ec2-metadata-token-ttl-seconds",
+ .key_len = 36,
+ .val = "21600", /* 6 hours (ie maximum ttl) */
+ .val_len = 5,
+};
+
+/* Request header templates */
+const static struct flb_aws_header imds_v2_token_token_header_template = {
+ .key = "X-aws-ec2-metadata-token",
+ .key_len = 24,
+ .val = "", /* Replace with token value */
+ .val_len = 0, /* Replace with token length */
+};
+
+/* Declarations */
+static int get_imds_version(struct flb_aws_imds *ctx);
+static int refresh_imds_v2_token(struct flb_aws_imds *ctx);
+
+/* Default config values */
+const struct flb_aws_imds_config flb_aws_imds_config_default = {
+ FLB_AWS_IMDS_VERSION_EVALUATE};
+
+/* Create IMDS context */
+struct flb_aws_imds *flb_aws_imds_create(const struct flb_aws_imds_config *imds_config,
+ struct flb_aws_client *ec2_imds_client)
+{
+ struct flb_aws_imds *ctx = NULL;
+
+ /* Create context */
+ ctx = flb_calloc(1, sizeof(struct flb_aws_imds));
+ if (!ctx) {
+ flb_errno();
+ return NULL;
+ }
+
+ /*
+ * Set IMDS version to whatever is specified in config
+ * Version may be evaluated later if set to FLB_AWS_IMDS_VERSION_EVALUATE
+ */
+ ctx->imds_version = imds_config->use_imds_version;
+ ctx->imds_v2_token = flb_sds_create_len("INVALID_TOKEN", 13);
+ ctx->imds_v2_token_len = 13;
+
+ /* Detect IMDS support */
+ if (!ec2_imds_client->upstream) {
+ flb_debug(
+ "[imds] unable to connect to EC2 IMDS. ec2_imds_client upstream is null");
+
+ flb_aws_imds_destroy(ctx);
+ return NULL;
+ }
+ if (0 != strncmp(ec2_imds_client->upstream->tcp_host, FLB_AWS_IMDS_HOST,
+ FLB_AWS_IMDS_HOST_LEN)) {
+ flb_debug("[imds] ec2_imds_client tcp host must be set to %s", FLB_AWS_IMDS_HOST);
+ flb_aws_imds_destroy(ctx);
+ return NULL;
+ }
+ if (ec2_imds_client->upstream->tcp_port != FLB_AWS_IMDS_PORT) {
+ flb_debug("[imds] ec2_imds_client tcp port must be set to %i", FLB_AWS_IMDS_PORT);
+ flb_aws_imds_destroy(ctx);
+ return NULL;
+ }
+
+ /* Connect client */
+ ctx->ec2_imds_client = ec2_imds_client;
+ return ctx;
+}
+
+/* Destroy IMDS context */
+void flb_aws_imds_destroy(struct flb_aws_imds *ctx)
+{
+ if (ctx->imds_v2_token) {
+ flb_sds_destroy(ctx->imds_v2_token);
+ }
+
+ flb_free(ctx);
+}
+
+/* Get IMDS metadata */
+int flb_aws_imds_request(struct flb_aws_imds *ctx, const char *metadata_path,
+ flb_sds_t *metadata, size_t *metadata_len)
+{
+ return flb_aws_imds_request_by_key(ctx, metadata_path, metadata, metadata_len, NULL);
+}
+
+/* Get IMDS metadata by key */
+int flb_aws_imds_request_by_key(struct flb_aws_imds *ctx, const char *metadata_path,
+ flb_sds_t *metadata, size_t *metadata_len, char *key)
+{
+ int ret;
+ flb_sds_t tmp;
+
+ struct flb_http_client *c = NULL;
+
+ struct flb_aws_client *ec2_imds_client = ctx->ec2_imds_client;
+ struct flb_aws_header token_header = imds_v2_token_token_header_template;
+
+ /* Get IMDS version */
+ int imds_version = get_imds_version(ctx);
+
+ /* Abort on version detection failure */
+ if (imds_version == FLB_AWS_IMDS_VERSION_EVALUATE) {
+ /* Exit gracefully allowing for retrys */
+ flb_warn("[imds] unable to evaluate IMDS version");
+ return -1;
+ }
+
+ if (imds_version == FLB_AWS_IMDS_VERSION_2) {
+ token_header.val = ctx->imds_v2_token;
+ token_header.val_len = ctx->imds_v2_token_len;
+ flb_debug("[imds] using IMDSv2");
+ }
+ else {
+ flb_debug("[imds] using IMDSv1");
+ }
+
+ c = ec2_imds_client->client_vtable->request(
+ ec2_imds_client, FLB_HTTP_GET, metadata_path, NULL, 0, &token_header,
+ (imds_version == FLB_AWS_IMDS_VERSION_1) ? 0 : 1);
+ if (!c) {
+ /* Exit gracefully allowing for retrys */
+ flb_warn("[imds] failed to retrieve metadata");
+ return -1;
+ }
+
+ /* Detect invalid token */
+ if (imds_version == FLB_AWS_IMDS_VERSION_2 && c->resp.status == 401) {
+ /* Refresh token and retry request */
+ flb_http_client_destroy(c);
+ ret = refresh_imds_v2_token(ctx);
+ if (ret < 0) {
+ flb_debug("[imds] failed to refresh IMDSv2 token");
+ return -1;
+ }
+ token_header.val = ctx->imds_v2_token;
+ token_header.val_len = ctx->imds_v2_token_len;
+ flb_debug("[imds] refreshed IMDSv2 token");
+ c = ec2_imds_client->client_vtable->request(
+ ec2_imds_client, FLB_HTTP_GET, metadata_path, NULL, 0, &token_header, 1);
+ if (!c) {
+ /* Exit gracefully allowing for retries */
+ flb_warn("[imds] failed to retrieve metadata");
+ return -1;
+ }
+ }
+
+ if (c->resp.status != 200) {
+ ret = -1;
+ if (c->resp.status == 404) {
+ ret = -2;
+ }
+ if (c->resp.payload_size > 0) {
+ flb_debug("[imds] metadata request failure response\n%s", c->resp.payload);
+ }
+ flb_http_client_destroy(c);
+ return ret;
+ }
+
+ if (key != NULL) {
+ /* get the value of the key from payload json string */
+ tmp = flb_json_get_val(c->resp.payload, c->resp.payload_size, key);
+ if (!tmp) {
+ tmp = flb_sds_create_len("NULL", 4);
+ flb_error("[imds] %s is undefined in EC2 instance", key);
+ }
+ }
+ else {
+ tmp = flb_sds_create_len(c->resp.payload, c->resp.payload_size);
+ }
+
+ if (!tmp) {
+ flb_errno();
+ flb_http_client_destroy(c);
+ return -1;
+ }
+
+ *metadata = tmp;
+ *metadata_len = key == NULL ? c->resp.payload_size : strlen(tmp);
+
+ flb_http_client_destroy(c);
+ return 0;
+}
+
+/* Get VPC Id */
+flb_sds_t flb_aws_imds_get_vpc_id(struct flb_aws_imds *ctx)
+{
+ int ret;
+ flb_sds_t mac_id = NULL;
+ size_t mac_len = 0;
+ flb_sds_t vpc_id = NULL;
+ size_t vpc_id_len = 0;
+
+ /* get EC2 instance Mac id first before getting VPC id */
+ ret = flb_aws_imds_request(ctx, FLB_AWS_IMDS_MAC_PATH, &mac_id, &mac_len);
+
+ if (ret < 0) {
+ flb_sds_destroy(mac_id);
+ return NULL;
+ }
+
+ /*
+ * the VPC full path should be like:
+ * latest/meta-data/network/interfaces/macs/{mac_id}/vpc-id/"
+ */
+ flb_sds_t vpc_path = flb_sds_create_size(70);
+ vpc_path =
+ flb_sds_printf(&vpc_path, "%s/%s/%s/",
+ "/latest/meta-data/network/interfaces/macs", mac_id, "vpc-id");
+ ret = flb_aws_imds_request(ctx, vpc_path, &vpc_id, &vpc_id_len);
+
+ flb_sds_destroy(mac_id);
+ flb_sds_destroy(vpc_path);
+
+ return vpc_id;
+}
+
+/* Obtain the IMDS version */
+static int get_imds_version(struct flb_aws_imds *ctx)
+{
+ int ret;
+ struct flb_aws_client *client = ctx->ec2_imds_client;
+ struct flb_aws_header invalid_token_header;
+ struct flb_http_client *c = NULL;
+
+ if (ctx->imds_version != FLB_AWS_IMDS_VERSION_EVALUATE) {
+ return ctx->imds_version;
+ }
+
+ /*
+ * Evaluate version
+ * To evaluate wether IMDSv2 is available, send an invalid token
+ * in IMDS request. If response status is 'Unauthorized', then IMDSv2
+ * is available.
+ */
+ invalid_token_header = imds_v2_token_token_header_template;
+ invalid_token_header.val = "INVALID";
+ invalid_token_header.val_len = 7;
+ c = client->client_vtable->request(client, FLB_HTTP_GET, FLB_AWS_IMDS_ROOT, NULL, 0,
+ &invalid_token_header, 1);
+
+ if (!c) {
+ flb_debug("[imds] imds endpoint unavailable");
+ return FLB_AWS_IMDS_VERSION_EVALUATE;
+ }
+
+ /* Unauthorized response means that IMDS version 2 is in use */
+ if (c->resp.status == 401) {
+ ctx->imds_version = FLB_AWS_IMDS_VERSION_2;
+ ret = refresh_imds_v2_token(ctx);
+ if (ret == -1) {
+ /*
+ * Token cannot be refreshed, test IMDSv1
+ * If IMDSv1 cannot be used, response will be status 401
+ */
+ flb_http_client_destroy(c);
+ ctx->imds_version = FLB_AWS_IMDS_VERSION_EVALUATE;
+ c = client->client_vtable->request(client, FLB_HTTP_GET, FLB_AWS_IMDS_ROOT,
+ NULL, 0, NULL, 0);
+ if (!c) {
+ flb_debug("[imds] imds v1 attempt, endpoint unavailable");
+ return FLB_AWS_IMDS_VERSION_EVALUATE;
+ }
+
+ if (c->resp.status == 200) {
+ flb_info("[imds] to use IMDSv2, set --http-put-response-hop-limit to 2");
+ }
+ else {
+ /* IMDSv1 unavailable. IMDSv2 beyond network hop count */
+ flb_warn("[imds] failed to retrieve IMDSv2 token and IMDSv1 unavailable. "
+ "This is likely due to instance-metadata-options "
+ "--http-put-response-hop-limit being set to 1 and --http-tokens "
+ "set to required. "
+ "To use IMDSv2, please set --http-put-response-hop-limit to 2 as "
+ "described https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/"
+ "configuring-instance-metadata-options.html");
+ }
+ }
+ }
+
+ /*
+ * Success means that IMDS version 1 is in use
+ */
+ if (c->resp.status == 200) {
+ flb_warn("[imds] falling back on IMDSv1");
+ ctx->imds_version = FLB_AWS_IMDS_VERSION_1;
+ }
+
+ flb_http_client_destroy(c);
+ return ctx->imds_version;
+}
+
+/*
+ * Get an IMDSv2 token
+ * Token preserved in imds context
+ */
+static int refresh_imds_v2_token(struct flb_aws_imds *ctx)
+{
+ struct flb_http_client *c = NULL;
+ struct flb_aws_client *ec2_imds_client = ctx->ec2_imds_client;
+
+ c = ec2_imds_client->client_vtable->request(ec2_imds_client, FLB_HTTP_PUT,
+ FLB_AWS_IMDS_V2_TOKEN_PATH, NULL, 0,
+ &imds_v2_token_ttl_header, 1);
+
+ if (!c) {
+ return -1;
+ }
+
+ if (c->resp.status != 200) {
+ if (c->resp.payload_size > 0) {
+ flb_error("[imds] IMDSv2 token retrieval failure response\n%s",
+ c->resp.payload);
+ }
+
+ flb_http_client_destroy(c);
+ return -1;
+ }
+
+ /* Preserve token information in ctx */
+ if (c->resp.payload_size > 0) {
+ if (ctx->imds_v2_token) {
+ flb_sds_destroy(ctx->imds_v2_token);
+ }
+ ctx->imds_v2_token = flb_sds_create_len(c->resp.payload, c->resp.payload_size);
+ if (!ctx->imds_v2_token) {
+ flb_errno();
+ flb_http_client_destroy(c);
+ return -1;
+ }
+ ctx->imds_v2_token_len = c->resp.payload_size;
+
+ flb_http_client_destroy(c);
+ return 0;
+ }
+
+ flb_debug("[imds] IMDS metadata response was empty");
+ flb_http_client_destroy(c);
+ return -1;
+}
diff --git a/fluent-bit/src/aws/flb_aws_util.c b/fluent-bit/src/aws/flb_aws_util.c
new file mode 100644
index 000000000..533bba7eb
--- /dev/null
+++ b/fluent-bit/src/aws/flb_aws_util.c
@@ -0,0 +1,1047 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_signv4.h>
+#include <fluent-bit/flb_aws_util.h>
+#include <fluent-bit/flb_aws_credentials.h>
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_jsmn.h>
+#include <fluent-bit/flb_env.h>
+
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#define AWS_SERVICE_ENDPOINT_FORMAT "%s.%s.amazonaws.com"
+#define AWS_SERVICE_ENDPOINT_BASE_LEN 15
+
+#define TAG_PART_DESCRIPTOR "$TAG[%d]"
+#define TAG_DESCRIPTOR "$TAG"
+#define MAX_TAG_PARTS 10
+#define S3_KEY_SIZE 1024
+#define RANDOM_STRING "$UUID"
+#define INDEX_STRING "$INDEX"
+#define AWS_USER_AGENT_NONE "none"
+#define AWS_USER_AGENT_ECS "ecs"
+#define AWS_USER_AGENT_K8S "k8s"
+#define AWS_ECS_METADATA_URI "ECS_CONTAINER_METADATA_URI_V4"
+#define FLB_MAX_AWS_RESP_BUFFER_SIZE 0 /* 0 means unlimited capacity as per requirement */
+
+#ifdef FLB_SYSTEM_WINDOWS
+#define FLB_AWS_BASE_USER_AGENT "aws-fluent-bit-plugin-windows"
+#define FLB_AWS_BASE_USER_AGENT_FORMAT "aws-fluent-bit-plugin-windows-%s"
+#define FLB_AWS_BASE_USER_AGENT_LEN 29
+#else
+#define FLB_AWS_BASE_USER_AGENT "aws-fluent-bit-plugin"
+#define FLB_AWS_BASE_USER_AGENT_FORMAT "aws-fluent-bit-plugin-%s"
+#define FLB_AWS_BASE_USER_AGENT_LEN 21
+#endif
+
+#define FLB_AWS_MILLISECOND_FORMATTER_LENGTH 3
+#define FLB_AWS_NANOSECOND_FORMATTER_LENGTH 9
+#define FLB_AWS_MILLISECOND_FORMATTER "%3N"
+#define FLB_AWS_NANOSECOND_FORMATTER_N "%9N"
+#define FLB_AWS_NANOSECOND_FORMATTER_L "%L"
+
+struct flb_http_client *request_do(struct flb_aws_client *aws_client,
+ int method, const char *uri,
+ const char *body, size_t body_len,
+ struct flb_aws_header *dynamic_headers,
+ size_t dynamic_headers_len);
+
+/*
+ * https://service.region.amazonaws.com(.cn)
+ */
+char *flb_aws_endpoint(char* service, char* region)
+{
+ char *endpoint = NULL;
+ size_t len = AWS_SERVICE_ENDPOINT_BASE_LEN;
+ int is_cn = FLB_FALSE;
+ int bytes;
+
+
+ /* In the China regions, ".cn" is appended to the URL */
+ if (strcmp("cn-north-1", region) == 0) {
+ len += 3;
+ is_cn = FLB_TRUE;
+ }
+ if (strcmp("cn-northwest-1", region) == 0) {
+ len += 3;
+ is_cn = FLB_TRUE;
+ }
+
+ len += strlen(service);
+ len += strlen(region);
+ len++; /* null byte */
+
+ endpoint = flb_calloc(len, sizeof(char));
+ if (!endpoint) {
+ flb_errno();
+ return NULL;
+ }
+
+ bytes = snprintf(endpoint, len, AWS_SERVICE_ENDPOINT_FORMAT, service, region);
+ if (bytes < 0) {
+ flb_errno();
+ flb_free(endpoint);
+ return NULL;
+ }
+
+ if (is_cn) {
+ memcpy(endpoint + bytes, ".cn", 3);
+ endpoint[bytes + 3] = '\0';
+ }
+
+ return endpoint;
+
+}
+
+int flb_read_file(const char *path, char **out_buf, size_t *out_size)
+{
+ int ret;
+ long bytes;
+ char *buf = NULL;
+ struct stat st;
+ int fd;
+
+ fd = open(path, O_RDONLY);
+ if (fd < 0) {
+ return -1;
+ }
+
+ ret = fstat(fd, &st);
+ if (ret == -1) {
+ flb_errno();
+ close(fd);
+ return -1;
+ }
+
+ buf = flb_calloc(st.st_size + 1, sizeof(char));
+ if (!buf) {
+ flb_errno();
+ close(fd);
+ return -1;
+ }
+
+ bytes = read(fd, buf, st.st_size);
+ if (bytes < 0) {
+ flb_errno();
+ flb_free(buf);
+ close(fd);
+ return -1;
+ }
+
+ /* fread does not add null byte */
+ buf[st.st_size] = '\0';
+
+ close(fd);
+ *out_buf = buf;
+ *out_size = st.st_size;
+
+ return 0;
+}
+
+
+char *removeProtocol (char *endpoint, char *protocol) {
+ if (strncmp(protocol, endpoint, strlen(protocol)) == 0){
+ endpoint = endpoint + strlen(protocol);
+ }
+ return endpoint;
+}
+
+struct flb_http_client *flb_aws_client_request(struct flb_aws_client *aws_client,
+ int method, const char *uri,
+ const char *body, size_t body_len,
+ struct flb_aws_header
+ *dynamic_headers,
+ size_t dynamic_headers_len)
+{
+ struct flb_http_client *c = NULL;
+
+ c = request_do(aws_client, method, uri, body, body_len,
+ dynamic_headers, dynamic_headers_len);
+
+ // Auto retry if request fails
+ if (c == NULL && aws_client->retry_requests) {
+ flb_debug("[aws_client] auto-retrying");
+ c = request_do(aws_client, method, uri, body, body_len,
+ dynamic_headers, dynamic_headers_len);
+ }
+
+ /*
+ * 400 or 403 could indicate an issue with credentials- so we check for auth
+ * specific error messages and then force a refresh on the provider.
+ * For safety a refresh can be performed only once
+ * per FLB_AWS_CREDENTIAL_REFRESH_LIMIT.
+ *
+ */
+ if (c && (c->resp.status >= 400 && c->resp.status < 500)) {
+ if (aws_client->has_auth && time(NULL) > aws_client->refresh_limit) {
+ if (flb_aws_is_auth_error(c->resp.payload, c->resp.payload_size)
+ == FLB_TRUE) {
+ flb_info("[aws_client] auth error, refreshing creds");
+ aws_client->refresh_limit = time(NULL)
+ + FLB_AWS_CREDENTIAL_REFRESH_LIMIT;
+ aws_client->provider->provider_vtable->
+ refresh(aws_client->provider);
+ }
+ }
+ }
+
+ return c;
+}
+
+static struct flb_aws_client_vtable client_vtable = {
+ .request = flb_aws_client_request,
+};
+
+struct flb_aws_client *flb_aws_client_create()
+{
+ struct flb_aws_client *client = flb_calloc(1, sizeof(struct flb_aws_client));
+ if (!client) {
+ flb_errno();
+ return NULL;
+ }
+ client->client_vtable = &client_vtable;
+ client->retry_requests = FLB_FALSE;
+ client->debug_only = FLB_FALSE;
+ return client;
+}
+
+/* Generator that returns clients with the default vtable */
+
+static struct flb_aws_client_generator default_generator = {
+ .create = flb_aws_client_create,
+};
+
+struct flb_aws_client_generator *flb_aws_client_generator()
+{
+ return &default_generator;
+}
+
+void flb_aws_client_destroy(struct flb_aws_client *aws_client)
+{
+ if (aws_client) {
+ if (aws_client->upstream) {
+ flb_upstream_destroy(aws_client->upstream);
+ }
+ if (aws_client->extra_user_agent) {
+ flb_sds_destroy(aws_client->extra_user_agent);
+ }
+ flb_free(aws_client);
+ }
+}
+
+int flb_aws_is_auth_error(char *payload, size_t payload_size)
+{
+ flb_sds_t error = NULL;
+
+ if (payload_size == 0) {
+ return FLB_FALSE;
+ }
+
+ /* Fluent Bit calls the STS API which returns XML */
+ if (strcasestr(payload, "InvalidClientTokenId") != NULL) {
+ return FLB_TRUE;
+ }
+
+ if (strcasestr(payload, "AccessDenied") != NULL) {
+ return FLB_TRUE;
+ }
+
+ if (strcasestr(payload, "Expired") != NULL) {
+ return FLB_TRUE;
+ }
+
+ /* Most APIs we use return JSON */
+ error = flb_aws_error(payload, payload_size);
+ if (error != NULL) {
+ if (strcmp(error, "ExpiredToken") == 0 ||
+ strcmp(error, "ExpiredTokenException") == 0 ||
+ strcmp(error, "AccessDeniedException") == 0 ||
+ strcmp(error, "AccessDenied") == 0 ||
+ strcmp(error, "IncompleteSignature") == 0 ||
+ strcmp(error, "SignatureDoesNotMatch") == 0 ||
+ strcmp(error, "MissingAuthenticationToken") == 0 ||
+ strcmp(error, "InvalidClientTokenId") == 0 ||
+ strcmp(error, "InvalidToken") == 0 ||
+ strcmp(error, "InvalidAccessKeyId") == 0 ||
+ strcmp(error, "UnrecognizedClientException") == 0) {
+ flb_sds_destroy(error);
+ return FLB_TRUE;
+ }
+ flb_sds_destroy(error);
+ }
+
+ return FLB_FALSE;
+}
+
+struct flb_http_client *request_do(struct flb_aws_client *aws_client,
+ int method, const char *uri,
+ const char *body, size_t body_len,
+ struct flb_aws_header *dynamic_headers,
+ size_t dynamic_headers_len)
+{
+ size_t b_sent;
+ int ret;
+ struct flb_connection *u_conn = NULL;
+ flb_sds_t signature = NULL;
+ int i;
+ int normalize_uri;
+ struct flb_aws_header header;
+ struct flb_http_client *c = NULL;
+ flb_sds_t tmp;
+ flb_sds_t user_agent_prefix;
+ flb_sds_t user_agent = NULL;
+ char *buf;
+ struct flb_env *env;
+
+ u_conn = flb_upstream_conn_get(aws_client->upstream);
+ if (!u_conn) {
+ if (aws_client->debug_only == FLB_TRUE) {
+ flb_debug("[aws_client] connection initialization error");
+ }
+ else {
+ flb_error("[aws_client] connection initialization error");
+ }
+ return NULL;
+ }
+
+ /* Compose HTTP request */
+ c = flb_http_client(u_conn, method, uri,
+ body, body_len,
+ aws_client->host, aws_client->port,
+ aws_client->proxy, aws_client->flags);
+
+ if (!c) {
+ if (aws_client->debug_only == FLB_TRUE) {
+ flb_debug("[aws_client] could not initialize request");
+ }
+ else {
+ flb_error("[aws_client] could not initialize request");
+ }
+ goto error;
+ }
+
+ /* Increase the maximum HTTP response buffer size to fit large responses from AWS services */
+ ret = flb_http_buffer_size(c, FLB_MAX_AWS_RESP_BUFFER_SIZE);
+ if (ret != 0) {
+ flb_warn("[aws_http_client] failed to increase max response buffer size");
+ }
+
+ /* Set AWS Fluent Bit user agent */
+ env = aws_client->upstream->base.config->env;
+ buf = (char *) flb_env_get(env, "FLB_AWS_USER_AGENT");
+ if (buf == NULL) {
+ if (getenv(AWS_ECS_METADATA_URI) != NULL) {
+ user_agent = AWS_USER_AGENT_ECS;
+ }
+ else {
+ buf = (char *) flb_env_get(env, AWS_USER_AGENT_K8S);
+ if (buf && strcasecmp(buf, "enabled") == 0) {
+ user_agent = AWS_USER_AGENT_K8S;
+ }
+ }
+
+ if (user_agent == NULL) {
+ user_agent = AWS_USER_AGENT_NONE;
+ }
+
+ flb_env_set(env, "FLB_AWS_USER_AGENT", user_agent);
+ }
+ if (aws_client->extra_user_agent == NULL) {
+ buf = (char *) flb_env_get(env, "FLB_AWS_USER_AGENT");
+ tmp = flb_sds_create(buf);
+ if (!tmp) {
+ flb_errno();
+ goto error;
+ }
+ aws_client->extra_user_agent = tmp;
+ tmp = NULL;
+ }
+
+ /* Add AWS Fluent Bit user agent header */
+ if (strcasecmp(aws_client->extra_user_agent, AWS_USER_AGENT_NONE) == 0) {
+ ret = flb_http_add_header(c, "User-Agent", 10,
+ FLB_AWS_BASE_USER_AGENT, FLB_AWS_BASE_USER_AGENT_LEN);
+ }
+ else {
+ user_agent_prefix = flb_sds_create_size(64);
+ if (!user_agent_prefix) {
+ flb_errno();
+ flb_error("[aws_client] failed to create user agent");
+ goto error;
+ }
+ tmp = flb_sds_printf(&user_agent_prefix, FLB_AWS_BASE_USER_AGENT_FORMAT,
+ aws_client->extra_user_agent);
+ if (!tmp) {
+ flb_errno();
+ flb_sds_destroy(user_agent_prefix);
+ flb_error("[aws_client] failed to create user agent");
+ goto error;
+ }
+ user_agent_prefix = tmp;
+
+ ret = flb_http_add_header(c, "User-Agent", 10, user_agent_prefix,
+ flb_sds_len(user_agent_prefix));
+ flb_sds_destroy(user_agent_prefix);
+ }
+
+ if (ret < 0) {
+ if (aws_client->debug_only == FLB_TRUE) {
+ flb_debug("[aws_client] failed to add header to request");
+ }
+ else {
+ flb_error("[aws_client] failed to add header to request");
+ }
+ goto error;
+ }
+
+ /* add headers */
+ for (i = 0; i < aws_client->static_headers_len; i++) {
+ header = aws_client->static_headers[i];
+ ret = flb_http_add_header(c,
+ header.key, header.key_len,
+ header.val, header.val_len);
+ if (ret < 0) {
+ if (aws_client->debug_only == FLB_TRUE) {
+ flb_debug("[aws_client] failed to add header to request");
+ }
+ else {
+ flb_error("[aws_client] failed to add header to request");
+ }
+ goto error;
+ }
+ }
+
+ for (i = 0; i < dynamic_headers_len; i++) {
+ header = dynamic_headers[i];
+ ret = flb_http_add_header(c,
+ header.key, header.key_len,
+ header.val, header.val_len);
+ if (ret < 0) {
+ if (aws_client->debug_only == FLB_TRUE) {
+ flb_debug("[aws_client] failed to add header to request");
+ }
+ else {
+ flb_error("[aws_client] failed to add header to request");
+ }
+ goto error;
+ }
+ }
+
+ if (aws_client->has_auth) {
+ if (aws_client->s3_mode == S3_MODE_NONE) {
+ normalize_uri = FLB_TRUE;
+ }
+ else {
+ normalize_uri = FLB_FALSE;
+ }
+ signature = flb_signv4_do(c, normalize_uri, FLB_TRUE, time(NULL),
+ aws_client->region, aws_client->service,
+ aws_client->s3_mode, NULL,
+ aws_client->provider);
+ if (!signature) {
+ if (aws_client->debug_only == FLB_TRUE) {
+ flb_debug("[aws_client] could not sign request");
+ }
+ else {
+ flb_error("[aws_client] could not sign request");
+ }
+ goto error;
+ }
+ }
+
+ /* Perform request */
+ ret = flb_http_do(c, &b_sent);
+
+ if (ret != 0 || c->resp.status != 200) {
+ flb_debug("[aws_client] %s: http_do=%i, HTTP Status: %i",
+ aws_client->host, ret, c->resp.status);
+ }
+
+ if (ret != 0 && c != NULL) {
+ flb_http_client_destroy(c);
+ c = NULL;
+ }
+
+ flb_upstream_conn_release(u_conn);
+ flb_sds_destroy(signature);
+ return c;
+
+error:
+ if (u_conn) {
+ flb_upstream_conn_release(u_conn);
+ }
+ if (signature) {
+ flb_sds_destroy(signature);
+ }
+ if (c) {
+ flb_http_client_destroy(c);
+ }
+ return NULL;
+}
+
+void flb_aws_print_xml_error(char *response, size_t response_len,
+ char *api, struct flb_output_instance *ins)
+{
+ flb_sds_t error;
+ flb_sds_t message;
+
+ error = flb_aws_xml_get_val(response, response_len, "<Code>", "</Code>");
+ if (!error) {
+ flb_plg_error(ins, "%s: Could not parse response", api);
+ return;
+ }
+
+ message = flb_aws_xml_get_val(response, response_len, "<Message>", "</Message>");
+ if (!message) {
+ /* just print the error */
+ flb_plg_error(ins, "%s API responded with error='%s'", api, error);
+ }
+ else {
+ flb_plg_error(ins, "%s API responded with error='%s', message='%s'",
+ api, error, message);
+ flb_sds_destroy(message);
+ }
+
+ flb_sds_destroy(error);
+}
+
+/* Parses AWS XML API Error responses and returns the value of the <code> tag */
+flb_sds_t flb_aws_xml_error(char *response, size_t response_len)
+{
+ return flb_aws_xml_get_val(response, response_len, "<Code>", "</Code>");
+}
+
+/*
+ * Parses an XML document and returns the value of the given tag
+ * Param `tag` should include angle brackets; ex "<code>"
+ * And param `end` should include end brackets: "</code>"
+ */
+flb_sds_t flb_aws_xml_get_val(char *response, size_t response_len, char *tag, char *tag_end)
+{
+ flb_sds_t val = NULL;
+ char *node = NULL;
+ char *end;
+ int len;
+
+ if (response_len == 0) {
+ return NULL;
+ }
+ node = strstr(response, tag);
+ if (!node) {
+ flb_debug("[aws] Could not find '%s' tag in API response", tag);
+ return NULL;
+ }
+
+ /* advance to end of tag */
+ node += strlen(tag);
+
+ end = strstr(node, tag_end);
+ if (!end) {
+ flb_error("[aws] Could not find end of '%s' node in xml", tag);
+ return NULL;
+ }
+ len = end - node;
+ val = flb_sds_create_len(node, len);
+ if (!val) {
+ flb_errno();
+ return NULL;
+ }
+
+ return val;
+}
+
+void flb_aws_print_error(char *response, size_t response_len,
+ char *api, struct flb_output_instance *ins)
+{
+ flb_sds_t error;
+ flb_sds_t message;
+
+ error = flb_json_get_val(response, response_len, "__type");
+ if (!error) {
+ return;
+ }
+
+ message = flb_json_get_val(response, response_len, "message");
+ if (!message) {
+ /* just print the error */
+ flb_plg_error(ins, "%s API responded with error='%s'", api, error);
+ }
+ else {
+ flb_plg_error(ins, "%s API responded with error='%s', message='%s'",
+ api, error, message);
+ flb_sds_destroy(message);
+ }
+
+ flb_sds_destroy(error);
+}
+
+/* parses AWS JSON API error responses and returns the value of the __type field */
+flb_sds_t flb_aws_error(char *response, size_t response_len)
+{
+ return flb_json_get_val(response, response_len, "__type");
+}
+
+/* gets the value of a key in a json string */
+flb_sds_t flb_json_get_val(char *response, size_t response_len, char *key)
+{
+ jsmntok_t *tokens = NULL;
+ const jsmntok_t *t = NULL;
+ char *current_token = NULL;
+ jsmn_parser parser;
+ int tokens_size = 50;
+ size_t size;
+ int ret;
+ int i = 0;
+ int len;
+ flb_sds_t error_type = NULL;
+
+ jsmn_init(&parser);
+
+ size = sizeof(jsmntok_t) * tokens_size;
+ tokens = flb_calloc(1, size);
+ if (!tokens) {
+ flb_errno();
+ return NULL;
+ }
+
+ ret = jsmn_parse(&parser, response, response_len,
+ tokens, tokens_size);
+
+ if (ret == JSMN_ERROR_INVAL || ret == JSMN_ERROR_PART) {
+ flb_free(tokens);
+ flb_debug("[aws_client] Unable to parse API response- response is not"
+ " valid JSON.");
+ return NULL;
+ }
+
+ /* return value is number of tokens parsed */
+ tokens_size = ret;
+
+ /*
+ * jsmn will create an array of tokens like:
+ * key, value, key, value
+ */
+ while (i < (tokens_size - 1)) {
+ t = &tokens[i];
+
+ if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) {
+ break;
+ }
+
+ if (t->type == JSMN_STRING) {
+ current_token = &response[t->start];
+
+ if (strncmp(current_token, key, strlen(key)) == 0) {
+ i++;
+ t = &tokens[i];
+ current_token = &response[t->start];
+ len = t->end - t->start;
+ error_type = flb_sds_create_len(current_token, len);
+ if (!error_type) {
+ flb_errno();
+ flb_free(tokens);
+ return NULL;
+ }
+ break;
+ }
+ }
+
+ i++;
+ }
+ flb_free(tokens);
+ return error_type;
+}
+
+/* Generic replace function for strings. */
+static char* replace_uri_tokens(const char* original_string, const char* current_word,
+ const char* new_word)
+{
+ char *result;
+ int i = 0;
+ int count = 0;
+ int new_word_len = strlen(new_word);
+ int old_word_len = strlen(current_word);
+
+ for (i = 0; original_string[i] != '\0'; i++) {
+ if (strstr(&original_string[i], current_word) == &original_string[i]) {
+ count++;
+ i += old_word_len - 1;
+ }
+ }
+
+ result = flb_sds_create_size(i + count * (new_word_len - old_word_len) + 1);
+ if (!result) {
+ flb_errno();
+ return NULL;
+ }
+
+ i = 0;
+ while (*original_string) {
+ if (strstr(original_string, current_word) == original_string) {
+ strncpy(&result[i], new_word, new_word_len);
+ i += new_word_len;
+ original_string += old_word_len;
+ }
+ else
+ result[i++] = *original_string++;
+ }
+
+ result[i] = '\0';
+ return result;
+}
+
+/*
+ * Linux has strtok_r as the concurrent safe version
+ * Windows has strtok_s
+ */
+char* strtok_concurrent(
+ char* str,
+ char* delimiters,
+ char** context
+)
+{
+#ifdef FLB_SYSTEM_WINDOWS
+ return strtok_s(str, delimiters, context);
+#else
+ return strtok_r(str, delimiters, context);
+#endif
+}
+
+/* Constructs S3 object key as per the format. */
+flb_sds_t flb_get_s3_key(const char *format, time_t time, const char *tag,
+ char *tag_delimiter, uint64_t seq_index)
+{
+ int i = 0;
+ int ret = 0;
+ int seq_index_len;
+ char *tag_token = NULL;
+ char *key;
+ char *random_alphanumeric;
+ char *seq_index_str;
+ /* concurrent safe strtok_r requires a tracking ptr */
+ char *strtok_saveptr;
+ int len;
+ flb_sds_t tmp = NULL;
+ flb_sds_t buf = NULL;
+ flb_sds_t s3_key = NULL;
+ flb_sds_t tmp_key = NULL;
+ flb_sds_t tmp_tag = NULL;
+ struct tm gmt = {0};
+
+ if (strlen(format) > S3_KEY_SIZE){
+ flb_warn("[s3_key] Object key length is longer than the 1024 character limit.");
+ }
+
+ tmp_tag = flb_sds_create_len(tag, strlen(tag));
+ if(!tmp_tag){
+ goto error;
+ }
+
+ s3_key = flb_sds_create_len(format, strlen(format));
+ if (!s3_key) {
+ goto error;
+ }
+
+ /* Check if delimiter(s) specifed exists in the tag. */
+ for (i = 0; i < strlen(tag_delimiter); i++){
+ if (strchr(tag, tag_delimiter[i])){
+ ret = 1;
+ break;
+ }
+ }
+
+ tmp = flb_sds_create_len(TAG_PART_DESCRIPTOR, 5);
+ if (!tmp) {
+ goto error;
+ }
+ if (strstr(s3_key, tmp)){
+ if(ret == 0){
+ flb_warn("[s3_key] Invalid Tag delimiter: does not exist in tag. "
+ "tag=%s, format=%s", tag, format);
+ }
+ }
+
+ flb_sds_destroy(tmp);
+ tmp = NULL;
+
+ /* Split the string on the delimiters */
+ tag_token = strtok_concurrent(tmp_tag, tag_delimiter, &strtok_saveptr);
+
+ /* Find all occurences of $TAG[*] and
+ * replaces it with the right token from tag.
+ */
+ i = 0;
+ while(tag_token != NULL && i < MAX_TAG_PARTS) {
+ buf = flb_sds_create_size(10);
+ if (!buf) {
+ goto error;
+ }
+ tmp = flb_sds_printf(&buf, TAG_PART_DESCRIPTOR, i);
+ if (!tmp) {
+ goto error;
+ }
+
+ tmp_key = replace_uri_tokens(s3_key, tmp, tag_token);
+ if (!tmp_key) {
+ goto error;
+ }
+
+ if(strlen(tmp_key) > S3_KEY_SIZE){
+ flb_warn("[s3_key] Object key length is longer than the 1024 character limit.");
+ }
+
+ if (buf != tmp) {
+ flb_sds_destroy(buf);
+ }
+ flb_sds_destroy(tmp);
+ tmp = NULL;
+ buf = NULL;
+ flb_sds_destroy(s3_key);
+ s3_key = tmp_key;
+ tmp_key = NULL;
+
+ tag_token = strtok_concurrent(NULL, tag_delimiter, &strtok_saveptr);
+ i++;
+ }
+
+ tmp = flb_sds_create_len(TAG_PART_DESCRIPTOR, 5);
+ if (!tmp) {
+ goto error;
+ }
+
+ /* A match against "$TAG[" indicates an invalid or out of bounds tag part. */
+ if (strstr(s3_key, tmp)){
+ flb_warn("[s3_key] Invalid / Out of bounds tag part: At most 10 tag parts "
+ "($TAG[0] - $TAG[9]) can be processed. tag=%s, format=%s, delimiters=%s",
+ tag, format, tag_delimiter);
+ }
+
+ /* Find all occurences of $TAG and replace with the entire tag. */
+ tmp_key = replace_uri_tokens(s3_key, TAG_DESCRIPTOR, tag);
+ if (!tmp_key) {
+ goto error;
+ }
+
+ if(strlen(tmp_key) > S3_KEY_SIZE){
+ flb_warn("[s3_key] Object key length is longer than the 1024 character limit.");
+ }
+
+ flb_sds_destroy(s3_key);
+ s3_key = tmp_key;
+ tmp_key = NULL;
+
+ /* Find all occurences of $INDEX and replace with the appropriate index. */
+ if (strstr((char *) format, INDEX_STRING)) {
+ seq_index_len = snprintf(NULL, 0, "%"PRIu64, seq_index);
+ seq_index_str = flb_calloc(seq_index_len + 1, sizeof(char));
+ if (seq_index_str == NULL) {
+ goto error;
+ }
+
+ sprintf(seq_index_str, "%"PRIu64, seq_index);
+ seq_index_str[seq_index_len] = '\0';
+ tmp_key = replace_uri_tokens(s3_key, INDEX_STRING, seq_index_str);
+ if (tmp_key == NULL) {
+ flb_free(seq_index_str);
+ goto error;
+ }
+ if (strlen(tmp_key) > S3_KEY_SIZE) {
+ flb_warn("[s3_key] Object key length is longer than the 1024 character limit.");
+ }
+
+ flb_sds_destroy(s3_key);
+ s3_key = tmp_key;
+ tmp_key = NULL;
+ flb_free(seq_index_str);
+ }
+
+ /* Find all occurences of $UUID and replace with a random string. */
+ random_alphanumeric = flb_sts_session_name();
+ if (!random_alphanumeric) {
+ goto error;
+ }
+ /* only use 8 chars of the random string */
+ random_alphanumeric[8] = '\0';
+ tmp_key = replace_uri_tokens(s3_key, RANDOM_STRING, random_alphanumeric);
+ if (!tmp_key) {
+ flb_free(random_alphanumeric);
+ goto error;
+ }
+
+ if(strlen(tmp_key) > S3_KEY_SIZE){
+ flb_warn("[s3_key] Object key length is longer than the 1024 character limit.");
+ }
+
+ flb_sds_destroy(s3_key);
+ s3_key = tmp_key;
+ tmp_key = NULL;
+ flb_free(random_alphanumeric);
+
+ if (!gmtime_r(&time, &gmt)) {
+ flb_error("[s3_key] Failed to create timestamp.");
+ goto error;
+ }
+
+ flb_sds_destroy(tmp);
+ tmp = NULL;
+
+ /* A string no longer than S3_KEY_SIZE + 1 is created to store the formatted timestamp. */
+ key = flb_calloc(1, (S3_KEY_SIZE + 1) * sizeof(char));
+ if (!key) {
+ goto error;
+ }
+
+ ret = strftime(key, S3_KEY_SIZE, s3_key, &gmt);
+ if(ret == 0){
+ flb_warn("[s3_key] Object key length is longer than the 1024 character limit.");
+ }
+ flb_sds_destroy(s3_key);
+
+ len = strlen(key);
+ if (len > S3_KEY_SIZE) {
+ len = S3_KEY_SIZE;
+ }
+
+ s3_key = flb_sds_create_len(key, len);
+ flb_free(key);
+ if (!s3_key) {
+ goto error;
+ }
+
+ flb_sds_destroy(tmp_tag);
+ tmp_tag = NULL;
+ return s3_key;
+
+ error:
+ flb_errno();
+ if (tmp_tag){
+ flb_sds_destroy(tmp_tag);
+ }
+ if (s3_key){
+ flb_sds_destroy(s3_key);
+ }
+ if (buf && buf != tmp){
+ flb_sds_destroy(buf);
+ }
+ if (tmp){
+ flb_sds_destroy(tmp);
+ }
+ if (tmp_key){
+ flb_sds_destroy(tmp_key);
+ }
+ return NULL;
+}
+
+/*
+ * This function is an extension to strftime which can support milliseconds with %3N,
+ * support nanoseconds with %9N or %L. The return value is the length of formatted
+ * time string.
+ */
+size_t flb_aws_strftime_precision(char **out_buf, const char *time_format,
+ struct flb_time *tms)
+{
+ char millisecond_str[FLB_AWS_MILLISECOND_FORMATTER_LENGTH+1];
+ char nanosecond_str[FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1];
+ char *tmp_parsed_time_str;
+ char *buf;
+ size_t out_size;
+ size_t tmp_parsed_time_str_len;
+ size_t time_format_len;
+ struct tm timestamp;
+ struct tm *tmp;
+ int i;
+
+ /*
+ * Guess the max length needed for tmp_parsed_time_str and tmp_out_buf. The
+ * upper bound is 12*strlen(time_format) because the worst scenario will be only
+ * %c in time_format, and %c will be transfer to 24 chars long by function strftime().
+ */
+ time_format_len = strlen(time_format);
+ tmp_parsed_time_str_len = 12*time_format_len;
+
+ /*
+ * Use tmp_parsed_time_str to buffer when replace %3N with milliseconds, replace
+ * %9N and %L with nanoseconds in time_format.
+ */
+ tmp_parsed_time_str = (char *)flb_calloc(1, tmp_parsed_time_str_len*sizeof(char));
+ if (!tmp_parsed_time_str) {
+ flb_errno();
+ return 0;
+ }
+
+ buf = (char *)flb_calloc(1, tmp_parsed_time_str_len*sizeof(char));
+ if (!buf) {
+ flb_errno();
+ flb_free(tmp_parsed_time_str);
+ return 0;
+ }
+
+ /* Replace %3N to millisecond, %9N and %L to nanosecond in time_format. */
+ snprintf(millisecond_str, FLB_AWS_MILLISECOND_FORMATTER_LENGTH+1,
+ "%" PRIu64, (uint64_t) tms->tm.tv_nsec / 1000000);
+ snprintf(nanosecond_str, FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1,
+ "%" PRIu64, (uint64_t) tms->tm.tv_nsec);
+ for (i = 0; i < time_format_len; i++) {
+ if (strncmp(time_format+i, FLB_AWS_MILLISECOND_FORMATTER, 3) == 0) {
+ strncat(tmp_parsed_time_str, millisecond_str,
+ FLB_AWS_MILLISECOND_FORMATTER_LENGTH+1);
+ i += 2;
+ }
+ else if (strncmp(time_format+i, FLB_AWS_NANOSECOND_FORMATTER_N, 3) == 0) {
+ strncat(tmp_parsed_time_str, nanosecond_str,
+ FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1);
+ i += 2;
+ }
+ else if (strncmp(time_format+i, FLB_AWS_NANOSECOND_FORMATTER_L, 2) == 0) {
+ strncat(tmp_parsed_time_str, nanosecond_str,
+ FLB_AWS_NANOSECOND_FORMATTER_LENGTH+1);
+ i += 1;
+ }
+ else {
+ strncat(tmp_parsed_time_str,time_format+i,1);
+ }
+ }
+
+ tmp = gmtime_r(&tms->tm.tv_sec, &timestamp);
+ if (!tmp) {
+ return 0;
+ }
+
+ out_size = strftime(buf, tmp_parsed_time_str_len,
+ tmp_parsed_time_str, &timestamp);
+
+ /* Check whether tmp_parsed_time_str_len is enough for tmp_out_buff */
+ if (out_size == 0) {
+ flb_free(tmp_parsed_time_str);
+ flb_free(buf);
+ return 0;
+ }
+
+ *out_buf = buf;
+ flb_free(tmp_parsed_time_str);
+
+ return out_size;
+}