summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/filter_aws/aws.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/filter_aws/aws.c')
-rw-r--r--src/fluent-bit/plugins/filter_aws/aws.c1062
1 files changed, 1062 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/filter_aws/aws.c b/src/fluent-bit/plugins/filter_aws/aws.c
new file mode 100644
index 000000000..726b8b709
--- /dev/null
+++ b/src/fluent-bit/plugins/filter_aws/aws.c
@@ -0,0 +1,1062 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <fluent-bit/flb_aws_util.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_filter_plugin.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_str.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_upstream.h>
+#include <fluent-bit/flb_io.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_env.h>
+#include <fluent-bit/aws/flb_aws_imds.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+
+#include <monkey/mk_core/mk_list.h>
+#include <msgpack.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <string.h>
+
+#include "aws.h"
+
+static int get_ec2_metadata(struct flb_filter_aws *ctx);
+
+static void expose_aws_meta(struct flb_filter_aws *ctx)
+{
+ struct flb_env *env;
+ struct flb_config *config = ctx->ins->config;
+
+ env = config->env;
+
+ flb_env_set(env, "aws", "enabled");
+
+ if (ctx->availability_zone_include) {
+ flb_env_set(env,
+ "aws." FLB_FILTER_AWS_AVAILABILITY_ZONE_KEY,
+ ctx->availability_zone);
+ }
+
+ if (ctx->instance_id_include) {
+ flb_env_set(env,
+ "aws." FLB_FILTER_AWS_INSTANCE_ID_KEY,
+ ctx->instance_id);
+ }
+
+ if (ctx->instance_type_include) {
+ flb_env_set(env,
+ "aws." FLB_FILTER_AWS_INSTANCE_TYPE_KEY,
+ ctx->instance_type);
+ }
+
+ if (ctx->private_ip_include) {
+ flb_env_set(env,
+ "aws." FLB_FILTER_AWS_PRIVATE_IP_KEY,
+ ctx->private_ip);
+ }
+
+ if (ctx->vpc_id_include) {
+ flb_env_set(env,
+ "aws." FLB_FILTER_AWS_VPC_ID_KEY,
+ ctx->vpc_id);
+ }
+
+ if (ctx->ami_id_include) {
+ flb_env_set(env,
+ "aws." FLB_FILTER_AWS_AMI_ID_KEY,
+ ctx->ami_id);
+ }
+
+ if (ctx->account_id_include) {
+ flb_env_set(env,
+ "aws." FLB_FILTER_AWS_ACCOUNT_ID_KEY,
+ ctx->account_id);
+ }
+
+ if (ctx->hostname_include) {
+ flb_env_set(env,
+ "aws." FLB_FILTER_AWS_HOSTNAME_KEY,
+ ctx->hostname);
+ }
+}
+
+static int cb_aws_init(struct flb_filter_instance *f_ins,
+ struct flb_config *config,
+ void *data)
+{
+ int imds_version = FLB_AWS_IMDS_VERSION_2;
+ int ret;
+ struct flb_filter_aws *ctx = NULL;
+ struct flb_filter_aws_init_options *options = data;
+ const char *tmp = NULL;
+
+ /* Create context */
+ ctx = flb_calloc(1, sizeof(struct flb_filter_aws));
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+
+ ctx->options = options;
+ ctx->ins = f_ins;
+
+ tmp = flb_filter_get_property("imds_version", f_ins);
+ if (tmp != NULL) {
+ if (strcasecmp(tmp, "v1") == 0) {
+ imds_version = FLB_AWS_IMDS_VERSION_1;
+ }
+ else if (strcasecmp(tmp, "v2") != 0) {
+ flb_plg_error(ctx->ins, "Invalid value %s for config option "
+ "'imds_version'. Valid values are 'v1' and 'v2'",
+ tmp);
+ flb_free(ctx);
+ return -1;
+ }
+ }
+
+ struct flb_aws_client_generator *generator;
+ if (options && options->client_generator) {
+ generator = options->client_generator;
+ } else {
+ generator = flb_aws_client_generator();
+ }
+ ctx->aws_ec2_filter_client = generator->create();
+ ctx->aws_ec2_filter_client->name = "ec2_imds_provider_client";
+ ctx->aws_ec2_filter_client->has_auth = FLB_FALSE;
+ ctx->aws_ec2_filter_client->provider = NULL;
+ ctx->aws_ec2_filter_client->region = NULL;
+ ctx->aws_ec2_filter_client->service = NULL;
+ ctx->aws_ec2_filter_client->port = FLB_AWS_IMDS_PORT;
+ ctx->aws_ec2_filter_client->flags = 0;
+ ctx->aws_ec2_filter_client->proxy = NULL;
+
+ struct flb_upstream *upstream;
+ upstream = flb_upstream_create(config, FLB_AWS_IMDS_HOST, FLB_AWS_IMDS_PORT,
+ FLB_IO_TCP, NULL);
+ if (!upstream) {
+ flb_plg_debug(ctx->ins, "unable to connect to EC2 IMDS");
+ return -1;
+ }
+
+ /* IMDSv2 token request will timeout if hops = 1 and running within container */
+ upstream->base.net.connect_timeout = FLB_AWS_IMDS_TIMEOUT;
+ upstream->base.net.io_timeout = FLB_AWS_IMDS_TIMEOUT;
+ upstream->base.net.keepalive = FLB_FALSE; /* On timeout, the connection is broken */
+ ctx->aws_ec2_filter_client->upstream = upstream;
+ flb_stream_disable_async_mode(&ctx->aws_ec2_filter_client->upstream->base);
+
+ ctx->client_imds = flb_aws_imds_create(&flb_aws_imds_config_default,
+ ctx->aws_ec2_filter_client);
+ if (!ctx->client_imds) {
+ flb_plg_error(ctx->ins, "failed to create aws client");
+ flb_free(ctx);
+ return -1;
+ }
+ ctx->client_imds->imds_version = imds_version;
+
+ /* Populate context with config map defaults and incoming properties */
+ ret = flb_filter_config_map_set(f_ins, (void *) ctx);
+ if (ret == -1) {
+ flb_plg_error(f_ins, "configuration error");
+ flb_free(ctx);
+ return -1;
+ }
+
+ ctx->metadata_retrieved = FLB_FALSE;
+
+ /* Retrieve metadata */
+ ret = get_ec2_metadata(ctx);
+ if (ret < 0) {
+ /* If the metadata fetch fails, the plugin continues to work. */
+ /* Every flush will attempt to fetch ec2 metadata, if needed. */
+ /* In the error is unrecoverable (-3), it exits and does not retry. */
+ if (ret == -3) {
+ flb_free(ctx);
+ return -1;
+ }
+ }
+ else {
+ expose_aws_meta(ctx);
+ }
+
+ flb_filter_set_context(f_ins, ctx);
+ return 0;
+}
+
+
+/* Get VPC ID from the metadata server.
+ * Initializes ctx->vpc_id and ctx->vpc_id_len.
+ */
+static int get_vpc_id(struct flb_filter_aws *ctx)
+{
+ ctx->vpc_id = flb_aws_imds_get_vpc_id(ctx->client_imds);
+ if (ctx->vpc_id == NULL) {
+ return -1;
+ }
+ ctx->vpc_id_len = flb_sds_len(ctx->vpc_id);
+ return 0;
+}
+
+void flb_filter_aws_tags_destroy(struct flb_filter_aws *ctx)
+{
+ int i;
+ if (!ctx) {
+ return;
+ }
+ if (ctx->tag_keys) {
+ for (i = 0; i < ctx->tags_count; i++) {
+ if (ctx->tag_keys[i]) {
+ flb_sds_destroy(ctx->tag_keys[i]);
+ }
+ }
+ flb_free(ctx->tag_keys);
+ ctx->tag_keys = NULL;
+ }
+ if (ctx->tag_values) {
+ for (i = 0; i < ctx->tags_count; i++) {
+ if (ctx->tag_values[i]) {
+ flb_sds_destroy(ctx->tag_values[i]);
+ }
+ }
+ flb_free(ctx->tag_values);
+ ctx->tag_values = NULL;
+ }
+ if (ctx->tag_keys_len) {
+ flb_free(ctx->tag_keys_len);
+ }
+ ctx->tag_keys_len = NULL;
+ if (ctx->tag_values_len) {
+ flb_free(ctx->tag_values_len);
+ }
+ ctx->tag_values_len = NULL;
+ if (ctx->tag_is_enabled) {
+ flb_free(ctx->tag_is_enabled);
+ }
+ ctx->tag_is_enabled = NULL;
+ ctx->tags_count = 0;
+}
+
+/* Get EC2 instance tag keys from /latest/meta-data/tags/instance.
+ * Initializes ctx->tags_count, ctx->tag_keys and ctx->tag_keys_len.
+ *
+ * In case EC2 metadata server doesn't return tags, either due to the fact that tags are
+ * disabled in the metadata server or EC2 has no tags, function returns -2.
+ */
+static int get_ec2_tag_keys(struct flb_filter_aws *ctx)
+{
+ int ret;
+ flb_sds_t tags_list = NULL;
+ size_t len = 0;
+ size_t tag_index = 0;
+ size_t tag_start = 0;
+ size_t tag_end = 0;
+ flb_sds_t tag_key;
+ flb_sds_t tmp;
+ size_t tag_key_len;
+ int i;
+
+ /* get a list of tag keys from the meta data server */
+ ret = flb_aws_imds_request(ctx->client_imds, FLB_AWS_IMDS_INSTANCE_TAG, &tags_list,
+ &len);
+ if (ret < 0) {
+ ctx->tags_count = 0;
+ if (ret == -2) { /* if there are no tags, response status code is 404 */
+ flb_plg_warn(ctx->ins, "EC2 instance metadata tag request returned 404. "
+ "This likely indicates your instance has no tags "
+ "or the EC2 tagging metadata API is not enabled");
+ return -2;
+ }
+ flb_sds_destroy(tags_list);
+ return -1;
+ }
+
+ /* if endpoint returned 200, normally at least 1 tag should be present */
+ /* for the sake of correctness, let's check the edge case when response is empty */
+ if (len == 0) {
+ ctx->tags_count = 0;
+ flb_sds_destroy(tags_list);
+ return -1;
+ }
+
+ /* count number of tag keys and allocate memory for pointers and lengths */
+ /* since get_metadata returned 0, we assume there is at least 1 tag */
+ /* \n is separator, therefore number of items = number of \n + 1 */
+ ctx->tags_count = 1;
+ for (i = 0; i < len; i++) {
+ if (tags_list[i] == '\n') {
+ ctx->tags_count++;
+ }
+ }
+ ctx->tag_keys = flb_calloc(ctx->tags_count, sizeof(flb_sds_t));
+ if (!ctx->tag_keys) {
+ flb_errno();
+ flb_sds_destroy(tags_list);
+ return -1;
+ }
+ ctx->tag_keys_len = flb_calloc(ctx->tags_count, sizeof(size_t*));
+ if (!ctx->tag_keys_len) {
+ flb_errno();
+ flb_sds_destroy(tags_list);
+ return -1;
+ }
+
+ /* go over the response and initialize tag_keys values */
+ /* code below finds two indices which define tag key and copies them to ctx */
+ while (tag_end <= len) {
+ /* replace \n with \0 to 'clearly' separate tag key strings */
+ if (tags_list[tag_end] == '\n') {
+ tags_list[tag_end] = '\0';
+ }
+ if ((tags_list[tag_end] == '\0' || tag_end == len) && (tag_start < tag_end)) {
+ /* length of tag key characters is the difference between start and end */
+ /* for instance, if tag name is 'Name\0...', the corresponding values are */
+ /* tag_start = 0, points to 'N' */
+ /* tag_end = 4, points to '\0' just after 'e' */
+ /* f.e.: 4 - 0 = 4, which is equal to len("Name") */
+ tag_key_len = tag_end - tag_start;
+ ctx->tag_keys_len[tag_index] = tag_key_len;
+
+ /* allocate new memory for the tag key value */
+ /* + 1, because we need one more character for \0 */
+ tmp = flb_sds_create_size(tag_key_len + 1);
+ if (!tmp) {
+ flb_errno();
+ flb_sds_destroy(tags_list);
+ return -2;
+ }
+ tmp[tag_key_len] = '\0';
+ ctx->tag_keys[tag_index] = tmp;
+
+ /* tag_key points to the first character of tag key as char* */
+ tag_key = tags_list + tag_start;
+ memcpy(ctx->tag_keys[tag_index], tag_key, tag_key_len);
+
+ tag_index++;
+ tag_start = tag_end + 1;
+ }
+ tag_end++;
+ }
+
+ flb_sds_destroy(tags_list);
+
+ return ret;
+}
+
+/* Get EC2 instance tag values from /latest/meta-data/tags/instance/{tag_key}.
+ * Initializes ctx->tag_values and ctx->tag_values_len.
+ */
+static int get_ec2_tag_values(struct flb_filter_aws *ctx)
+{
+ int ret;
+ size_t i;
+ flb_sds_t tag_value = NULL;
+ size_t tag_value_len = 0;
+ size_t tag_value_path_len;
+ flb_sds_t tag_value_path;
+ flb_sds_t tmp;
+
+ /* initialize array for the tag values */
+ ctx->tag_values = flb_calloc(ctx->tags_count, sizeof(flb_sds_t));
+ if (!ctx->tag_values) {
+ flb_errno();
+ return -1;
+ }
+ ctx->tag_values_len = flb_calloc(ctx->tags_count, sizeof(size_t));
+ if (!ctx->tag_values_len) {
+ flb_errno();
+ return -1;
+ }
+
+ for (i = 0; i < ctx->tags_count; i++) {
+ /* fetch tag value using path: /latest/meta-data/tags/instance/{tag_name} */
+ tag_value_path_len = ctx->tag_keys_len[i] + 1 +
+ strlen(FLB_AWS_IMDS_INSTANCE_TAG);
+ tag_value_path = flb_sds_create_size(tag_value_path_len + 1);
+ if (!tag_value_path) {
+ flb_errno();
+ return -1;
+ }
+ tmp = flb_sds_printf(&tag_value_path, "%s/%s",
+ FLB_AWS_IMDS_INSTANCE_TAG,
+ ctx->tag_keys[i]);
+ if (!tmp) {
+ flb_errno();
+ flb_sds_destroy(tag_value_path);
+ return -1;
+ }
+ tag_value_path = tmp;
+
+ ret = flb_aws_imds_request(ctx->client_imds, tag_value_path, &tag_value,
+ &tag_value_len);
+ if (ret < 0) {
+ flb_sds_destroy(tag_value_path);
+ if (ret == -2) {
+ flb_plg_error(ctx->ins, "no value for tag %s", ctx->tag_keys[i]);
+ } else {
+ flb_plg_error(ctx->ins, "could not fetch value for tag %s",
+ ctx->tag_keys[i]);
+ }
+ return ret;
+ }
+
+ ctx->tag_values[i] = tag_value;
+ ctx->tag_values_len[i] = tag_value_len;
+
+ flb_sds_destroy(tag_value_path);
+ }
+
+ return 0;
+}
+
+static int tag_is_present_in_list(struct flb_filter_aws *ctx, flb_sds_t tag,
+ flb_sds_t *tags, int tags_n)
+{
+ int i;
+ for (i = 0; i < tags_n; i++) {
+ if (strcmp(tag, tags[i]) == 0) {
+ return FLB_TRUE;
+ }
+ }
+ return FLB_FALSE;
+}
+
+static int tags_split(char *tags, flb_sds_t **tags_list, int *tags_list_n) {
+ flb_sds_t token;
+ int i;
+ int n;
+ n = 1;
+ for (i = 0; i < strlen(tags); i++) {
+ if (tags[i] == ',') {
+ n++;
+ }
+ }
+
+ *tags_list = flb_calloc(sizeof(flb_sds_t), n);
+ if (*tags_list == NULL) {
+ return -2;
+ }
+
+ token = strtok(tags, ",");
+ i = 0;
+ while (token != NULL) {
+ (*tags_list)[i] = token;
+ i++;
+ token = strtok(NULL, ",");
+ }
+
+ *tags_list_n = n;
+
+ return 0;
+}
+
+static int get_ec2_tag_enabled(struct flb_filter_aws *ctx)
+{
+ const char *tags_include;
+ const char *tags_exclude;
+ char *tags_copy;
+ flb_sds_t *tags;
+ int tags_n;
+ int i;
+ int tag_present;
+ int result;
+
+ /* if there are no tags, there is no need to evaluate which tag is enabled */
+ if (ctx->tags_count == 0) {
+ return 0;
+ }
+
+
+ /* allocate memory for 'tag_is_enabled' for all tags */
+ ctx->tag_is_enabled = flb_calloc(ctx->tags_count, sizeof(int));
+ if (!ctx->tag_is_enabled) {
+ flb_plg_error(ctx->ins, "Failed to allocate memory for tag_is_enabled");
+ return -1;
+ }
+
+ /* if tags_include and tags_exclude are not defined, set all tags as enabled */
+ for (i = 0; i < ctx->tags_count; i++) {
+ ctx->tag_is_enabled[i] = FLB_TRUE;
+ }
+
+ /* apply tags_included configuration */
+ tags_include = flb_filter_get_property("tags_include", ctx->ins);
+ if (tags_include) {
+ /* copy const string in order to use strtok which modifes the string */
+ tags_copy = flb_strdup(tags_include);
+ if (!tags_copy) {
+ return -1;
+ }
+ result = tags_split(tags_copy, &tags, &tags_n);
+ if (result < 0) {
+ free(tags_copy);
+ return -1;
+ }
+ for (i = 0; i < ctx->tags_count; i++) {
+ tag_present = tag_is_present_in_list(ctx, ctx->tag_keys[i], tags, tags_n);
+ /* tag is enabled if present in included list */
+ ctx->tag_is_enabled[i] = tag_present;
+ }
+ free(tags_copy);
+ free(tags);
+ }
+
+ /* apply tags_excluded configuration, only if tags_included is not defined */
+ tags_exclude = flb_filter_get_property("tags_exclude", ctx->ins);
+ if (tags_include && tags_exclude) {
+ flb_plg_error(ctx->ins, "configuration is invalid, both tags_include"
+ " and tags_exclude are specified at the same time");
+ return -3;
+ }
+ if (!tags_include && tags_exclude) {
+ /* copy const string in order to use strtok which modifes the string */
+ tags_copy = flb_strdup(tags_exclude);
+ if (!tags_copy) {
+ return -1;
+ }
+ result = tags_split(tags_copy, &tags, &tags_n);
+ if (result < 0) {
+ free(tags_copy);
+ return -1;
+ }
+ for (i = 0; i < ctx->tags_count; i++) {
+ tag_present = tag_is_present_in_list(ctx, ctx->tag_keys[i], tags, tags_n);
+ if (tag_present == FLB_TRUE) {
+ /* tag is excluded, so should be disabled */
+ ctx->tag_is_enabled[i] = FLB_FALSE;
+ } else {
+ /* tag is not excluded, therefore should be enabled */
+ ctx->tag_is_enabled[i] = FLB_TRUE;
+ }
+ }
+ free(tags_copy);
+ free(tags);
+ }
+
+ return 0;
+}
+
+static int get_ec2_tags(struct flb_filter_aws *ctx)
+{
+ int i;
+ int ret;
+
+ ctx->tags_fetched = FLB_FALSE;
+
+ /* get_ec2_tags function might be called multiple times, so we need to always */
+ /* free memory for tags in case of previous allocations */
+ flb_filter_aws_tags_destroy(ctx);
+
+ ret = get_ec2_tag_keys(ctx);
+ if (ret < 0) {
+ flb_filter_aws_tags_destroy(ctx);
+ if (ret == -2) {
+ /* -2 means there are no tags, */
+ /* to avoid requesting ec2 tags repeatedly for each flush */
+ /* it marks fetching tags as done */
+ ctx->tags_fetched = FLB_TRUE;
+ return 0;
+ }
+ return ret;
+ }
+ ret = get_ec2_tag_values(ctx);
+ if (ret < 0) {
+ flb_filter_aws_tags_destroy(ctx);
+ return ret;
+ }
+
+ ret = get_ec2_tag_enabled(ctx);
+ if (ret < 0) {
+ flb_filter_aws_tags_destroy(ctx);
+ return ret;
+ }
+
+ /* log tags debug information */
+ for (i = 0; i < ctx->tags_count; i++) {
+ flb_plg_debug(ctx->ins, "found tag %s which is included=%d",
+ ctx->tag_keys[i], ctx->tag_is_enabled[i]);
+ }
+
+ ctx->tags_fetched = FLB_TRUE;
+ return 0;
+}
+
+/*
+ * Makes a call to IMDS to set get the values of all metadata fields.
+ * It can be called repeatedly if some metadata calls initially do not succeed.
+ */
+static int get_ec2_metadata(struct flb_filter_aws *ctx)
+{
+ int ret;
+ int i;
+
+ if (ctx->instance_id_include && !ctx->instance_id) {
+ ret = flb_aws_imds_request(ctx->client_imds, FLB_AWS_IMDS_INSTANCE_ID_PATH,
+ &ctx->instance_id,
+ &ctx->instance_id_len);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Failed to get instance ID");
+ return -1;
+ }
+ ctx->new_keys++;
+ }
+
+ if (ctx->availability_zone_include && !ctx->availability_zone) {
+ ret = flb_aws_imds_request(ctx->client_imds, FLB_AWS_IMDS_AZ_PATH,
+ &ctx->availability_zone,
+ &ctx->availability_zone_len);
+
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Failed to get instance AZ");
+ return -1;
+ }
+ ctx->new_keys++;
+ }
+
+ if (ctx->instance_type_include && !ctx->instance_type) {
+ ret = flb_aws_imds_request(ctx->client_imds, FLB_AWS_IMDS_INSTANCE_TYPE_PATH,
+ &ctx->instance_type, &ctx->instance_type_len);
+
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Failed to get instance type");
+ return -1;
+ }
+ ctx->new_keys++;
+ }
+
+ if (ctx->private_ip_include && !ctx->private_ip) {
+ ret = flb_aws_imds_request(ctx->client_imds, FLB_AWS_IMDS_PRIVATE_IP_PATH,
+ &ctx->private_ip, &ctx->private_ip_len);
+
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Failed to get instance private IP");
+ return -1;
+ }
+ ctx->new_keys++;
+ }
+
+ if (ctx->vpc_id_include && !ctx->vpc_id) {
+ ret = get_vpc_id(ctx);
+
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Failed to get instance VPC ID");
+ return -1;
+ }
+ ctx->new_keys++;
+ }
+
+ if (ctx->ami_id_include && !ctx->ami_id) {
+ ret = flb_aws_imds_request(ctx->client_imds, FLB_AWS_IMDS_AMI_ID_PATH,
+ &ctx->ami_id, &ctx->ami_id_len);
+
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Failed to get AMI ID");
+ return -1;
+ }
+ ctx->new_keys++;
+ }
+
+ if (ctx->account_id_include && !ctx->account_id) {
+ ret = flb_aws_imds_request_by_key(ctx->client_imds, FLB_AWS_IMDS_ACCOUNT_ID_PATH,
+ &ctx->account_id, &ctx->account_id_len,
+ "accountId");
+
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Failed to get Account ID");
+ return -1;
+ }
+ ctx->new_keys++;
+ }
+
+ if (ctx->hostname_include && !ctx->hostname) {
+ ret = flb_aws_imds_request(ctx->client_imds, FLB_AWS_IMDS_HOSTNAME_PATH,
+ &ctx->hostname, &ctx->hostname_len);
+
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Failed to get Hostname");
+ return -1;
+ }
+ ctx->new_keys++;
+ }
+
+ if (ctx->tags_enabled && !ctx->tags_fetched) {
+ ret = get_ec2_tags(ctx);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Failed to get instance EC2 Tags");
+ return ret;
+ }
+ for (i = 0; i < ctx->tags_count; i++) {
+ if (ctx->tag_is_enabled[i] == FLB_TRUE) {
+ ctx->new_keys++;
+ }
+ }
+ }
+
+ ctx->metadata_retrieved = FLB_TRUE;
+ return 0;
+}
+
+static int cb_aws_filter(const void *data, size_t bytes,
+ const char *tag, int tag_len,
+ void **out_buf, size_t *out_size,
+ struct flb_filter_instance *f_ins,
+ struct flb_input_instance *i_ins,
+ void *context,
+ struct flb_config *config)
+{
+ struct flb_filter_aws *ctx = context;
+ int i = 0;
+ int ret;
+ msgpack_object *obj;
+ msgpack_object_kv *kv;
+ struct flb_log_event_encoder log_encoder;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+
+ (void) f_ins;
+ (void) i_ins;
+ (void) config;
+
+ /* First check that the metadata has been retrieved */
+ if (!ctx->metadata_retrieved) {
+ ret = get_ec2_metadata(ctx);
+ if (ret < 0) {
+ return FLB_FILTER_NOTOUCH;
+ }
+ expose_aws_meta(ctx);
+ }
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ ret = flb_log_event_encoder_init(&log_encoder,
+ FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event encoder initialization error : %d", ret);
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ obj = log_event.body;
+
+ ret = flb_log_event_encoder_begin_record(&log_encoder);
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_set_timestamp(
+ &log_encoder,
+ &log_event.timestamp);
+ }
+
+ /* iterate through the old record map and add it to the new buffer */
+ kv = obj->via.map.ptr;
+
+ for(i=0;
+ i < obj->via.map.size &&
+ ret == FLB_EVENT_ENCODER_SUCCESS;
+ i++) {
+ ret = flb_log_event_encoder_append_body_values(
+ &log_encoder,
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].key),
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].val));
+ }
+
+ /* append new keys */
+ if (ctx->availability_zone_include &&
+ ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_values(
+ &log_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(FLB_FILTER_AWS_AVAILABILITY_ZONE_KEY),
+ FLB_LOG_EVENT_STRING_VALUE(ctx->availability_zone,
+ ctx->availability_zone_len));
+ }
+
+ if (ctx->instance_id_include &&
+ ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_values(
+ &log_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(FLB_FILTER_AWS_INSTANCE_ID_KEY),
+ FLB_LOG_EVENT_STRING_VALUE(ctx->instance_id,
+ ctx->instance_id_len));
+ }
+
+ if (ctx->instance_type_include &&
+ ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_values(
+ &log_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(FLB_FILTER_AWS_INSTANCE_TYPE_KEY),
+ FLB_LOG_EVENT_STRING_VALUE(ctx->instance_type,
+ ctx->instance_type_len));
+ }
+
+ if (ctx->private_ip_include &&
+ ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_values(
+ &log_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(FLB_FILTER_AWS_PRIVATE_IP_KEY),
+ FLB_LOG_EVENT_STRING_VALUE(ctx->private_ip,
+ ctx->private_ip_len));
+ }
+
+ if (ctx->vpc_id_include &&
+ ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_values(
+ &log_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(FLB_FILTER_AWS_VPC_ID_KEY),
+ FLB_LOG_EVENT_STRING_VALUE(ctx->vpc_id,
+ ctx->vpc_id_len));
+ }
+
+ if (ctx->ami_id_include &&
+ ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_values(
+ &log_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(FLB_FILTER_AWS_AMI_ID_KEY),
+ FLB_LOG_EVENT_STRING_VALUE(ctx->ami_id,
+ ctx->ami_id_len));
+ }
+
+ if (ctx->account_id_include &&
+ ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_values(
+ &log_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(FLB_FILTER_AWS_ACCOUNT_ID_KEY),
+ FLB_LOG_EVENT_STRING_VALUE(ctx->account_id,
+ ctx->account_id_len));
+ }
+
+ if (ctx->hostname_include &&
+ ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_body_values(
+ &log_encoder,
+ FLB_LOG_EVENT_CSTRING_VALUE(FLB_FILTER_AWS_HOSTNAME_KEY),
+ FLB_LOG_EVENT_STRING_VALUE(ctx->hostname,
+ ctx->hostname_len));
+ }
+
+ if (ctx->tags_enabled && ctx->tags_fetched) {
+ for (i = 0;
+ i < ctx->tags_count &&
+ ret == FLB_EVENT_ENCODER_SUCCESS;
+ i++) {
+ if (ctx->tag_is_enabled[i] == FLB_TRUE) {
+ ret = flb_log_event_encoder_append_body_values(
+ &log_encoder,
+ FLB_LOG_EVENT_STRING_VALUE(ctx->tag_keys[i],
+ ctx->tag_keys_len[i]),
+ FLB_LOG_EVENT_STRING_VALUE(ctx->tag_values[i],
+ ctx->tag_values_len[i]));
+ }
+ }
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_commit_record(&log_encoder);
+ }
+ }
+
+ if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA &&
+ log_decoder.offset == bytes) {
+ ret = FLB_EVENT_ENCODER_SUCCESS;
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ *out_buf = log_encoder.output_buffer;
+ *out_size = log_encoder.output_length;
+
+ ret = FLB_FILTER_MODIFIED;
+
+ flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder);
+ }
+ else {
+ flb_plg_error(ctx->ins,
+ "Log event encoder error : %d", ret);
+
+ ret = FLB_FILTER_NOTOUCH;
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_log_event_encoder_destroy(&log_encoder);
+
+ return ret;
+}
+
+static void flb_filter_aws_destroy(struct flb_filter_aws *ctx)
+{
+ if (ctx->options == NULL) {
+ /* non null options are only provided by unit tests and since */
+ /* aws client mock must clean up the memory with some special behaviour */
+ /* if options are NOT null (which means we are running unit tests), */
+ /* we rely on unit tests to perform memory cleanup */
+ if (ctx->aws_ec2_filter_client) {
+ flb_aws_client_destroy(ctx->aws_ec2_filter_client);
+ }
+ }
+ if (ctx->client_imds) {
+ flb_aws_imds_destroy(ctx->client_imds);
+ }
+
+ if (ctx->availability_zone) {
+ flb_sds_destroy(ctx->availability_zone);
+ }
+
+ if (ctx->instance_id) {
+ flb_sds_destroy(ctx->instance_id);
+ }
+
+ if (ctx->instance_type) {
+ flb_sds_destroy(ctx->instance_type);
+ }
+
+ if (ctx->private_ip) {
+ flb_sds_destroy(ctx->private_ip);
+ }
+
+ if (ctx->vpc_id) {
+ flb_sds_destroy(ctx->vpc_id);
+ }
+
+ if (ctx->ami_id) {
+ flb_sds_destroy(ctx->ami_id);
+ }
+
+ if (ctx->account_id) {
+ flb_sds_destroy(ctx->account_id);
+ }
+
+ if (ctx->hostname) {
+ flb_sds_destroy(ctx->hostname);
+ }
+
+ flb_filter_aws_tags_destroy(ctx);
+
+ flb_free(ctx);
+}
+
+static int cb_aws_exit(void *data, struct flb_config *config)
+{
+ struct flb_filter_aws *ctx = data;
+
+ if (ctx != NULL) {
+ flb_filter_aws_destroy(ctx);
+ }
+ return 0;
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "imds_version", "v2",
+ 0, FLB_FALSE, 0,
+ "Specifies which version of the EC2 instance metadata service"
+ " will be used: 'v1' or 'v2'. 'v2' may not work"
+ " if you run Fluent Bit in a container."
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "az", "true",
+ 0, FLB_TRUE, offsetof(struct flb_filter_aws, availability_zone_include),
+ "Enable EC2 instance availability zone"
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "ec2_instance_id", "true",
+ 0, FLB_TRUE, offsetof(struct flb_filter_aws, instance_id_include),
+ "Enable EC2 instance ID"
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "ec2_instance_type", "false",
+ 0, FLB_TRUE, offsetof(struct flb_filter_aws, instance_type_include),
+ "Enable EC2 instance type"
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "private_ip", "false",
+ 0, FLB_TRUE, offsetof(struct flb_filter_aws, private_ip_include),
+ "Enable EC2 instance private IP"
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "vpc_id", "false",
+ 0, FLB_TRUE, offsetof(struct flb_filter_aws, vpc_id_include),
+ "Enable EC2 instance VPC ID"
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "ami_id", "false",
+ 0, FLB_TRUE, offsetof(struct flb_filter_aws, ami_id_include),
+ "Enable EC2 instance Image ID"
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "account_id", "false",
+ 0, FLB_TRUE, offsetof(struct flb_filter_aws, account_id_include),
+ "Enable EC2 instance Account ID"
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "hostname", "false",
+ 0, FLB_TRUE, offsetof(struct flb_filter_aws, hostname_include),
+ "Enable EC2 instance hostname"
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "tags_enabled", "false",
+ 0, FLB_TRUE, offsetof(struct flb_filter_aws, tags_enabled),
+ "Enable EC2 instance tags, "
+ "injects all tags if tags_include and tags_exclude are empty"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "tags_include", "",
+ 0, FLB_FALSE, 0,
+ "Defines list of specific EC2 tag keys to inject into the logs; "
+ "tag keys must be separated by \",\" character; "
+ "tags which are not present in this list will be ignored; "
+ "e.g.: \"Name,tag1,tag2\""
+ },
+ {
+ FLB_CONFIG_MAP_STR, "tags_exclude", "",
+ 0, FLB_FALSE, 0,
+ "Defines list of specific EC2 tag keys not to inject into the logs; "
+ "tag keys must be separated by \",\" character; "
+ "if both tags_include and tags_exclude are specified, configuration is invalid"
+ " and plugin fails"
+ },
+ {0}
+};
+
+struct flb_filter_plugin filter_aws_plugin = {
+ .name = "aws",
+ .description = "Add AWS Metadata",
+ .cb_init = cb_aws_init,
+ .cb_filter = cb_aws_filter,
+ .cb_exit = cb_aws_exit,
+ .config_map = config_map,
+ .flags = 0
+};