summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/filter_ecs/ecs.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/filter_ecs/ecs.c')
-rw-r--r--src/fluent-bit/plugins/filter_ecs/ecs.c1760
1 files changed, 1760 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/filter_ecs/ecs.c b/src/fluent-bit/plugins/filter_ecs/ecs.c
new file mode 100644
index 000000000..82339e60e
--- /dev/null
+++ b/src/fluent-bit/plugins/filter_ecs/ecs.c
@@ -0,0 +1,1760 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <fluent-bit/flb_aws_util.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_filter_plugin.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_str.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_sds.h>
+#include <fluent-bit/flb_upstream.h>
+#include <fluent-bit/flb_io.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_env.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+
+#include <monkey/mk_core/mk_list.h>
+#include <msgpack.h>
+#include <stdlib.h>
+#include <errno.h>
+
+#include "ecs.h"
+
+static int get_ecs_cluster_metadata(struct flb_filter_ecs *ctx);
+static void flb_filter_ecs_destroy(struct flb_filter_ecs *ctx);
+
+/* cluster meta is static so we can expose it on global ctx for other plugins to use */
+static void expose_ecs_cluster_meta(struct flb_filter_ecs *ctx)
+{
+ struct flb_env *env;
+ struct flb_config *config = ctx->ins->config;
+
+ env = config->env;
+
+ flb_env_set(env, "ecs", "enabled");
+
+ if (ctx->cluster_metadata.cluster_name) {
+ flb_env_set(env,
+ "aws.ecs.cluster_name",
+ ctx->cluster_metadata.cluster_name);
+ }
+
+ if (ctx->cluster_metadata.container_instance_arn) {
+ flb_env_set(env,
+ "aws.ecs.container_instance_arn",
+ ctx->cluster_metadata.container_instance_arn);
+ }
+
+ if (ctx->cluster_metadata.container_instance_id) {
+ flb_env_set(env,
+ "aws.ecs.container_instance_id",
+ ctx->cluster_metadata.container_instance_id);
+ }
+
+ if (ctx->cluster_metadata.ecs_agent_version) {
+ flb_env_set(env,
+ "aws.ecs.ecs_agent_version",
+ ctx->cluster_metadata.container_instance_id);
+ }
+}
+
+static int cb_ecs_init(struct flb_filter_instance *f_ins,
+ struct flb_config *config,
+ void *data)
+{
+ int ret;
+ struct flb_filter_ecs *ctx = NULL;
+ struct mk_list *head;
+ struct mk_list *split;
+ struct flb_kv *kv;
+ struct flb_split_entry *sentry;
+ int list_size;
+ struct flb_ecs_metadata_key *ecs_meta = NULL;
+ (void) data;
+
+ /* Create context */
+ ctx = flb_calloc(1, sizeof(struct flb_filter_ecs));
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+
+ ctx->ins = f_ins;
+
+ /* Populate context with config map defaults and incoming properties */
+ ret = flb_filter_config_map_set(f_ins, (void *) ctx);
+ if (ret == -1) {
+ flb_plg_error(f_ins, "configuration error");
+ flb_free(ctx);
+ return -1;
+ }
+
+ mk_list_init(&ctx->metadata_keys);
+ ctx->metadata_keys_len = 0;
+ mk_list_init(&ctx->metadata_buffers);
+
+ mk_list_foreach(head, &f_ins->properties) {
+ kv = mk_list_entry(head, struct flb_kv, _head);
+
+ if (strcasecmp(kv->key, "add") == 0) {
+ split = flb_utils_split(kv->val, ' ', 2);
+ list_size = mk_list_size(split);
+
+ if (list_size == 0 || list_size > 2) {
+ flb_plg_error(ctx->ins, "Invalid config for %s", kv->key);
+ flb_utils_split_free(split);
+ goto error;
+ }
+
+ sentry = mk_list_entry_first(split, struct flb_split_entry, _head);
+ ecs_meta = flb_calloc(1, sizeof(struct flb_ecs_metadata_key));
+ if (!ecs_meta) {
+ flb_errno();
+ flb_utils_split_free(split);
+ goto error;
+ }
+
+ ecs_meta->key = flb_sds_create_len(sentry->value, sentry->len);
+ if (!ecs_meta->key) {
+ flb_errno();
+ flb_utils_split_free(split);
+ goto error;
+ }
+
+ sentry = mk_list_entry_last(split, struct flb_split_entry, _head);
+
+ ecs_meta->template = flb_sds_create_len(sentry->value, sentry->len);
+ if (!ecs_meta->template) {
+ flb_errno();
+ flb_utils_split_free(split);
+ goto error;
+ }
+
+ ecs_meta->ra = flb_ra_create(ecs_meta->template, FLB_FALSE);
+ if (ecs_meta->ra == NULL) {
+ flb_plg_error(ctx->ins, "Could not parse template for `%s`", ecs_meta->key);
+ flb_utils_split_free(split);
+ goto error;
+ }
+
+ mk_list_add(&ecs_meta->_head, &ctx->metadata_keys);
+ ctx->metadata_keys_len++;
+ flb_utils_split_free(split);
+ }
+ }
+
+ ctx->ecs_upstream = flb_upstream_create(config,
+ ctx->ecs_host,
+ ctx->ecs_port,
+ FLB_IO_TCP,
+ NULL);
+
+ if (!ctx->ecs_upstream) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "Could not create upstream connection to ECS Agent");
+ goto error;
+ }
+
+ flb_stream_disable_async_mode(&ctx->ecs_upstream->base);
+ ctx->has_cluster_metadata = FLB_FALSE;
+
+ /* entries are only evicted when TTL is reached and a get is issued */
+ ctx->container_hash_table = flb_hash_table_create_with_ttl(ctx->ecs_meta_cache_ttl,
+ FLB_HASH_TABLE_EVICT_OLDER,
+ FLB_ECS_FILTER_HASH_TABLE_SIZE,
+ FLB_ECS_FILTER_HASH_TABLE_SIZE);
+ if (!ctx->container_hash_table) {
+ flb_plg_error(f_ins, "failed to create container_hash_table");
+ goto error;
+ }
+
+ ctx->failed_metadata_request_tags = flb_hash_table_create_with_ttl(ctx->ecs_meta_cache_ttl,
+ FLB_HASH_TABLE_EVICT_OLDER,
+ FLB_ECS_FILTER_HASH_TABLE_SIZE,
+ FLB_ECS_FILTER_HASH_TABLE_SIZE);
+ if (!ctx->failed_metadata_request_tags) {
+ flb_plg_error(f_ins, "failed to create failed_metadata_request_tags table");
+ goto error;
+ }
+
+ ctx->ecs_tag_prefix_len = strlen(ctx->ecs_tag_prefix);
+
+ /* attempt to get metadata in init, can retry in cb_filter */
+ ret = get_ecs_cluster_metadata(ctx);
+
+ flb_filter_set_context(f_ins, ctx);
+ return 0;
+
+error:
+ flb_plg_error(ctx->ins, "Initialization failed.");
+ flb_filter_ecs_destroy(ctx);
+ return -1;
+}
+
+static int plugin_under_test()
+{
+ if (getenv("FLB_ECS_PLUGIN_UNDER_TEST") != NULL) {
+ return FLB_TRUE;
+ }
+
+ return FLB_FALSE;
+}
+
+static char *mock_error_response(char *error_env_var)
+{
+ char *err_val = NULL;
+ char *error = NULL;
+ int len = 0;
+
+ err_val = getenv(error_env_var);
+ if (err_val != NULL && strlen(err_val) > 0) {
+ error = flb_malloc(strlen(err_val) + sizeof(char));
+ if (error == NULL) {
+ flb_errno();
+ return NULL;
+ }
+
+ len = strlen(err_val);
+ memcpy(error, err_val, len);
+ error[len] = '\0';
+ return error;
+ }
+
+ return NULL;
+}
+
+static struct flb_http_client *mock_http_call(char *error_env_var, char *api)
+{
+ /* create an http client so that we can set the response */
+ struct flb_http_client *c = NULL;
+ char *error = mock_error_response(error_env_var);
+
+ c = flb_calloc(1, sizeof(struct flb_http_client));
+ if (!c) {
+ flb_errno();
+ flb_free(error);
+ return NULL;
+ }
+ mk_list_init(&c->headers);
+
+ if (error != NULL) {
+ c->resp.status = 400;
+ /* resp.data is freed on destroy, payload is supposed to reference it */
+ c->resp.data = error;
+ c->resp.payload = c->resp.data;
+ c->resp.payload_size = strlen(error);
+ }
+ else {
+ c->resp.status = 200;
+ if (strcmp(api, "Cluster") == 0) {
+ /* mocked success response */
+ c->resp.payload = "{\"Cluster\": \"cluster_name\",\"ContainerInstanceArn\": \"arn:aws:ecs:region:aws_account_id:container-instance/cluster_name/container_instance_id\",\"Version\": \"Amazon ECS Agent - v1.30.0 (02ff320c)\"}";
+ c->resp.payload_size = strlen(c->resp.payload);
+ }
+ else {
+ c->resp.payload = "{\"Arn\": \"arn:aws:ecs:us-west-2:012345678910:task/default/e01d58a8-151b-40e8-bc01-22647b9ecfec\",\"Containers\": [{\"DockerId\": \"79c796ed2a7f864f485c76f83f3165488097279d296a7c05bd5201a1c69b2920\",\"DockerName\": \"ecs-nginx-efs-2-nginx-9ac0808dd0afa495f001\",\"Name\": \"nginx\"}],\"DesiredStatus\": \"RUNNING\",\"Family\": \"nginx-efs\",\"KnownStatus\": \"RUNNING\",\"Version\": \"2\"}";
+ c->resp.payload_size = strlen(c->resp.payload);
+ }
+ }
+
+ return c;
+}
+
+/*
+ * Both container instance and task ARNs have the ID at the end after last '/'
+ */
+static flb_sds_t parse_id_from_arn(const char *arn, int len)
+{
+ int i;
+ flb_sds_t ID = NULL;
+ int last_slash = 0;
+ int id_start = 0;
+
+ for (i = 0; i < len; i++) {
+ if (arn[i] == '/') {
+ last_slash = i;
+ }
+ }
+
+ if (last_slash == 0 || last_slash >= len - 2) {
+ return NULL;
+ }
+ id_start = last_slash + 1;
+
+ ID = flb_sds_create_len(arn + id_start, len - id_start);
+ if (ID == NULL) {
+ flb_errno();
+ return NULL;
+ }
+
+ return ID;
+}
+
+/*
+ * This deserializes the msgpack metadata buf to msgpack_object
+ * which can be used with flb_ra_translate in the main filter callback
+ */
+static int flb_ecs_metadata_buffer_init(struct flb_filter_ecs *ctx,
+ struct flb_ecs_metadata_buffer *meta)
+{
+ msgpack_unpacked result;
+ msgpack_object root;
+ size_t off = 0;
+ int ret;
+
+ msgpack_unpacked_init(&result);
+ ret = msgpack_unpack_next(&result, meta->buf, meta->size, &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ flb_plg_error(ctx->ins, "Cannot unpack flb_ecs_metadata_buffer");
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ root = result.data;
+ if (root.type != MSGPACK_OBJECT_MAP) {
+ flb_plg_error(ctx->ins, "Cannot unpack flb_ecs_metadata_buffer, msgpack_type=%i",
+ root.type);
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ meta->unpacked = result;
+ meta->obj = root;
+ meta->last_used_time = time(NULL);
+ meta->free_packer = FLB_TRUE;
+
+ return 0;
+}
+
+static void flb_ecs_metadata_buffer_destroy(struct flb_ecs_metadata_buffer *meta)
+{
+ if (meta) {
+ flb_free(meta->buf);
+ if (meta->free_packer == FLB_TRUE) {
+ msgpack_unpacked_destroy(&meta->unpacked);
+ }
+ if (meta->id) {
+ flb_sds_destroy(meta->id);
+ }
+ flb_free(meta);
+ }
+}
+
+/*
+ * Get cluster and container instance info, which are static and never change
+ */
+static int get_ecs_cluster_metadata(struct flb_filter_ecs *ctx)
+{
+ struct flb_http_client *c;
+ struct flb_connection *u_conn;
+ int ret;
+ int root_type;
+ int found_cluster = FLB_FALSE;
+ int found_version = FLB_FALSE;
+ int found_instance = FLB_FALSE;
+ int free_conn = FLB_FALSE;
+ int i;
+ int len;
+ char *buffer;
+ size_t size;
+ size_t b_sent;
+ size_t off = 0;
+ msgpack_unpacked result;
+ msgpack_object root;
+ msgpack_object key;
+ msgpack_object val;
+ msgpack_sbuffer tmp_sbuf;
+ msgpack_packer tmp_pck;
+ flb_sds_t container_instance_id = NULL;
+ flb_sds_t tmp = NULL;
+
+ /* Compose HTTP Client request*/
+ if (plugin_under_test() == FLB_TRUE) {
+ c = mock_http_call("TEST_CLUSTER_ERROR", "Cluster");
+ ret = 0;
+ }
+ else {
+ u_conn = flb_upstream_conn_get(ctx->ecs_upstream);
+
+ if (!u_conn) {
+ flb_plg_error(ctx->ins, "ECS agent introspection endpoint connection error");
+ return -1;
+ }
+ free_conn = FLB_TRUE;
+ c = flb_http_client(u_conn, FLB_HTTP_GET,
+ FLB_ECS_FILTER_CLUSTER_PATH,
+ NULL, 0,
+ ctx->ecs_host, ctx->ecs_port,
+ NULL, 0);
+ flb_http_buffer_size(c, 0); /* 0 means unlimited */
+
+ flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
+
+ ret = flb_http_do(c, &b_sent);
+ flb_plg_debug(ctx->ins, "http_do=%i, "
+ "HTTP Status: %i",
+ ret, c->resp.status);
+ }
+
+ if (ret != 0 || c->resp.status != 200) {
+ if (c->resp.payload_size > 0) {
+ flb_plg_warn(ctx->ins, "Failed to get metadata from %s, will retry",
+ FLB_ECS_FILTER_CLUSTER_PATH);
+ flb_plg_debug(ctx->ins, "HTTP response\n%s",
+ c->resp.payload);
+ } else {
+ flb_plg_warn(ctx->ins, "%s response status was %d with no payload, will retry",
+ FLB_ECS_FILTER_CLUSTER_PATH,
+ c->resp.status);
+ }
+ flb_http_client_destroy(c);
+ if (free_conn == FLB_TRUE) {
+ flb_upstream_conn_release(u_conn);
+ }
+ return -1;
+ }
+
+ if (free_conn == FLB_TRUE) {
+ flb_upstream_conn_release(u_conn);
+ }
+
+ ret = flb_pack_json(c->resp.payload, c->resp.payload_size,
+ &buffer, &size, &root_type, NULL);
+
+ if (ret < 0) {
+ flb_plg_warn(ctx->ins, "Could not parse response from %s; response=\n%s",
+ FLB_ECS_FILTER_CLUSTER_PATH, c->resp.payload);
+ flb_http_client_destroy(c);
+ return -1;
+ }
+
+ /* parse metadata response */
+ msgpack_unpacked_init(&result);
+ ret = msgpack_unpack_next(&result, buffer, size, &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ flb_plg_error(ctx->ins, "Cannot unpack %s response to find metadata\n%s",
+ FLB_ECS_FILTER_CLUSTER_PATH, c->resp.payload);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ flb_http_client_destroy(c);
+ return -1;
+ }
+
+ flb_http_client_destroy(c);
+
+ root = result.data;
+ if (root.type != MSGPACK_OBJECT_MAP) {
+ flb_plg_error(ctx->ins, "%s response parsing failed, msgpack_type=%i",
+ FLB_ECS_FILTER_CLUSTER_PATH,
+ root.type);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ /*
+Metadata Response:
+{
+ "Cluster": "cluster_name",
+ "ContainerInstanceArn": "arn:aws:ecs:region:aws_account_id:container-instance/cluster_name/container_instance_id",
+ "Version": "Amazon ECS Agent - v1.30.0 (02ff320c)"
+}
+But our metadata keys names are:
+{
+ "ClusterName": "cluster_name",
+ "ContainerInstanceArn": "arn:aws:ecs:region:aws_account_id:container-instance/cluster_name/container_instance_id",
+ "ContainerInstanceID": "container_instance_id"
+ "ECSAgentVersion": "Amazon ECS Agent - v1.30.0 (02ff320c)"
+}
+ */
+
+ for (i = 0; i < root.via.map.size; i++) {
+ key = root.via.map.ptr[i].key;
+ if (key.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "%s response parsing failed, msgpack key type=%i",
+ FLB_ECS_FILTER_CLUSTER_PATH,
+ key.type);
+ continue;
+ }
+
+ if (key.via.str.size == 7 && strncmp(key.via.str.ptr, "Cluster", 7) == 0) {
+ val = root.via.map.ptr[i].val;
+ if (val.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Cluster' value type=%i",
+ val.type);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ found_cluster = FLB_TRUE;
+ if (ctx->cluster_metadata.cluster_name == NULL) {
+ tmp = flb_sds_create_len(val.via.str.ptr, (int) val.via.str.size);
+ if (!tmp) {
+ flb_errno();
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+ ctx->cluster_metadata.cluster_name = tmp;
+ }
+
+ }
+ else if (key.via.str.size == 20 && strncmp(key.via.str.ptr, "ContainerInstanceArn", 20) == 0) {
+ val = root.via.map.ptr[i].val;
+ if (val.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "metadata parsing: unexpected 'ContainerInstanceArn' value type=%i",
+ val.type);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ /* first the ARN */
+ found_instance = FLB_TRUE;
+ if (ctx->cluster_metadata.container_instance_arn == NULL) {
+ tmp = flb_sds_create_len(val.via.str.ptr, (int) val.via.str.size);
+ if (!tmp) {
+ flb_errno();
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+ ctx->cluster_metadata.container_instance_arn = tmp;
+ }
+
+ /* then the ID */
+ if (ctx->cluster_metadata.container_instance_id == NULL) {
+ container_instance_id = parse_id_from_arn(val.via.str.ptr, (int) val.via.str.size);
+ if (container_instance_id == NULL) {
+ flb_plg_error(ctx->ins, "metadata parsing: failed to get ID from %.*s",
+ (int) val.via.str.size, val.via.str.ptr);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+ ctx->cluster_metadata.container_instance_id = container_instance_id;
+ }
+
+ } else if (key.via.str.size == 7 && strncmp(key.via.str.ptr, "Version", 7) == 0) {
+ val = root.via.map.ptr[i].val;
+ if (val.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Version' value type=%i",
+ val.type);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+
+ found_version = FLB_TRUE;
+ if (ctx->cluster_metadata.ecs_agent_version == NULL) {
+ tmp = flb_sds_create_len(val.via.str.ptr, (int) val.via.str.size);
+ if (!tmp) {
+ flb_errno();
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+ ctx->cluster_metadata.ecs_agent_version = tmp;
+ }
+ }
+
+ }
+
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+
+ if (found_cluster == FLB_FALSE) {
+ flb_plg_error(ctx->ins, "Could not parse 'Cluster' from %s response",
+ FLB_ECS_FILTER_CLUSTER_PATH);
+ return -1;
+ }
+ if (found_instance == FLB_FALSE) {
+ flb_plg_error(ctx->ins, "Could not parse 'ContainerInstanceArn' from %s response",
+ FLB_ECS_FILTER_CLUSTER_PATH);
+ return -1;
+ }
+ if (found_version == FLB_FALSE) {
+ flb_plg_error(ctx->ins, "Could not parse 'Version' from %s response",
+ FLB_ECS_FILTER_CLUSTER_PATH);
+ return -1;
+ }
+
+ /*
+ * We also create a standalone cluster metadata msgpack object
+ * This is used as a fallback for logs when we can't find the
+ * task metadata for a log. It is valid to attach cluster meta
+ * to eg. Docker daemon logs which are not an AWS ECS Task via
+ * the `cluster_metadata_only` setting.
+ */
+ msgpack_sbuffer_init(&tmp_sbuf);
+ msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
+ msgpack_pack_map(&tmp_pck, 4);
+
+ msgpack_pack_str(&tmp_pck, 11);
+ msgpack_pack_str_body(&tmp_pck,
+ "ClusterName",
+ 11);
+ len = flb_sds_len(ctx->cluster_metadata.cluster_name);
+ msgpack_pack_str(&tmp_pck, len);
+ msgpack_pack_str_body(&tmp_pck,
+ ctx->cluster_metadata.cluster_name,
+ len);
+
+ msgpack_pack_str(&tmp_pck, 20);
+ msgpack_pack_str_body(&tmp_pck,
+ "ContainerInstanceArn",
+ 20);
+ len = flb_sds_len(ctx->cluster_metadata.container_instance_arn);
+ msgpack_pack_str(&tmp_pck, len);
+ msgpack_pack_str_body(&tmp_pck,
+ ctx->cluster_metadata.container_instance_arn,
+ len);
+
+ msgpack_pack_str(&tmp_pck, 19);
+ msgpack_pack_str_body(&tmp_pck,
+ "ContainerInstanceID",
+ 19);
+ len = flb_sds_len(ctx->cluster_metadata.container_instance_id);
+ msgpack_pack_str(&tmp_pck, len);
+ msgpack_pack_str_body(&tmp_pck,
+ ctx->cluster_metadata.container_instance_id,
+ len);
+
+ msgpack_pack_str(&tmp_pck, 15);
+ msgpack_pack_str_body(&tmp_pck,
+ "ECSAgentVersion",
+ 15);
+ len = flb_sds_len(ctx->cluster_metadata.ecs_agent_version);
+ msgpack_pack_str(&tmp_pck, len);
+ msgpack_pack_str_body(&tmp_pck,
+ ctx->cluster_metadata.ecs_agent_version,
+ len);
+
+ ctx->cluster_meta_buf.buf = tmp_sbuf.data;
+ ctx->cluster_meta_buf.size = tmp_sbuf.size;
+
+ ret = flb_ecs_metadata_buffer_init(ctx, &ctx->cluster_meta_buf);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Could not init metadata buffer from %s response",
+ FLB_ECS_FILTER_CLUSTER_PATH);
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ ctx->cluster_meta_buf.buf = NULL;
+ ctx->cluster_meta_buf.size = 0;
+ return -1;
+ }
+
+ ctx->has_cluster_metadata = FLB_TRUE;
+ expose_ecs_cluster_meta(ctx);
+ return 0;
+}
+
+/*
+ * This is the helper function used by get_task_metadata()
+ * that actually creates the final metadata msgpack buffer
+ * with our final key names.
+ * It collects cluster, task, and container metadata into one
+The new metadata msgpack is flat and looks like:
+{
+ "ContainerID": "79c796ed2a7f864f485c76f83f3165488097279d296a7c05bd5201a1c69b2920",
+ "DockerContainerName": "ecs-nginx-efs-2-nginx-9ac0808dd0afa495f001",
+ "ECSContainerName": "nginx",
+
+ "ClusterName": "cluster_name",
+ "ContainerInstanceArn": "arn:aws:ecs:region:aws_account_id:container-instance/cluster_name/container_instance_id",
+ "ContainerInstanceID": "container_instance_id"
+ "ECSAgentVersion": "Amazon ECS Agent - v1.30.0 (02ff320c)"
+
+ "TaskARN": "arn:aws:ecs:us-west-2:012345678910:task/default/example5-58ff-46c9-ae05-543f8example",
+ "TaskID: "example5-58ff-46c9-ae05-543f8example",
+ "TaskDefinitionFamily": "hello_world",
+ "TaskDefinitionVersion": "8",
+}
+ */
+static int process_container_response(struct flb_filter_ecs *ctx,
+ msgpack_object container,
+ struct flb_ecs_task_metadata task_meta)
+{
+ int ret;
+ int found_id = FLB_FALSE;
+ int found_ecs_name = FLB_FALSE;
+ int found_docker_name = FLB_FALSE;
+ int i;
+ int len;
+ struct flb_ecs_metadata_buffer *cont_meta_buf;
+ msgpack_object key;
+ msgpack_object val;
+ msgpack_sbuffer tmp_sbuf;
+ msgpack_packer tmp_pck;
+ flb_sds_t short_id = NULL;
+
+ /*
+ * We copy the metadata response to a new buffer
+ * So we can define the metadata key names
+ */
+ msgpack_sbuffer_init(&tmp_sbuf);
+ msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
+
+ /* 3 container metadata keys, 4 for instance/cluster, 4 for the task */
+ msgpack_pack_map(&tmp_pck, 11);
+
+ /* 1st- process/pack the raw container metadata response */
+ for (i = 0; i < container.via.map.size; i++) {
+ key = container.via.map.ptr[i].key;
+ if (key.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "Container metadata parsing failed, msgpack key type=%i",
+ key.type);
+ continue;
+ }
+
+ if (key.via.str.size == 8 && strncmp(key.via.str.ptr, "DockerId", 8) == 0) {
+ val = container.via.map.ptr[i].val;
+ if (val.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "metadata parsing: unexpected 'DockerId' value type=%i",
+ val.type);
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ if (short_id != NULL) {
+ flb_sds_destroy(short_id);
+ }
+ return -1;
+ }
+
+ /* save the short ID for hash table key */
+ short_id = flb_sds_create_len(val.via.str.ptr, 12);
+ if (!short_id) {
+ flb_errno();
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ return -1;
+ }
+
+ found_id = FLB_TRUE;
+ msgpack_pack_str(&tmp_pck, 11);
+ msgpack_pack_str_body(&tmp_pck,
+ "ContainerID",
+ 11);
+ msgpack_pack_str(&tmp_pck, (int) val.via.str.size);
+ msgpack_pack_str_body(&tmp_pck,
+ val.via.str.ptr,
+ (int) val.via.str.size);
+ }
+ else if (key.via.str.size == 10 && strncmp(key.via.str.ptr, "DockerName", 10) == 0) {
+ val = container.via.map.ptr[i].val;
+ if (val.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "metadata parsing: unexpected 'DockerName' value type=%i",
+ val.type);
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ if (short_id != NULL) {
+ flb_sds_destroy(short_id);
+ }
+ return -1;
+ }
+
+ /* first pack the ARN */
+ found_docker_name = FLB_TRUE;
+ msgpack_pack_str(&tmp_pck, 19);
+ msgpack_pack_str_body(&tmp_pck,
+ "DockerContainerName",
+ 19);
+ msgpack_pack_str(&tmp_pck, (int) val.via.str.size);
+ msgpack_pack_str_body(&tmp_pck,
+ val.via.str.ptr,
+ (int) val.via.str.size);
+ } else if (key.via.str.size == 4 && strncmp(key.via.str.ptr, "Name", 4) == 0) {
+ val = container.via.map.ptr[i].val;
+ if (val.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Name' value type=%i",
+ val.type);
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ if (short_id != NULL) {
+ flb_sds_destroy(short_id);
+ }
+ return -1;
+ }
+
+ found_ecs_name = FLB_TRUE;
+ msgpack_pack_str(&tmp_pck, 16);
+ msgpack_pack_str_body(&tmp_pck,
+ "ECSContainerName",
+ 16);
+ msgpack_pack_str(&tmp_pck, (int) val.via.str.size);
+ msgpack_pack_str_body(&tmp_pck,
+ val.via.str.ptr,
+ (int) val.via.str.size);
+ }
+ }
+
+ if (found_id == FLB_FALSE) {
+ flb_plg_error(ctx->ins, "Could not parse Task 'DockerId' from container response");
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ return -1;
+ }
+ if (found_docker_name == FLB_FALSE) {
+ flb_plg_error(ctx->ins, "Could not parse 'DockerName' from container response");
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ if (short_id != NULL) {
+ flb_sds_destroy(short_id);
+ }
+ return -1;
+ }
+ if (found_ecs_name == FLB_FALSE) {
+ flb_plg_error(ctx->ins, "Could not parse 'Name' from container response");
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ if (short_id != NULL) {
+ flb_sds_destroy(short_id);
+ }
+ return -1;
+ }
+
+ /* 2nd - Add the task fields from the task_meta temp buf we were given */
+ msgpack_pack_str(&tmp_pck, 20);
+ msgpack_pack_str_body(&tmp_pck,
+ "TaskDefinitionFamily",
+ 20);
+ msgpack_pack_str(&tmp_pck, task_meta.task_def_family_len);
+ msgpack_pack_str_body(&tmp_pck,
+ task_meta.task_def_family,
+ task_meta.task_def_family_len);
+
+ msgpack_pack_str(&tmp_pck, 7);
+ msgpack_pack_str_body(&tmp_pck,
+ "TaskARN",
+ 7);
+ msgpack_pack_str(&tmp_pck, task_meta.task_arn_len);
+ msgpack_pack_str_body(&tmp_pck,
+ task_meta.task_arn,
+ task_meta.task_arn_len);
+ msgpack_pack_str(&tmp_pck, 6);
+ msgpack_pack_str_body(&tmp_pck,
+ "TaskID",
+ 6);
+ msgpack_pack_str(&tmp_pck, task_meta.task_id_len);
+ msgpack_pack_str_body(&tmp_pck,
+ task_meta.task_id,
+ task_meta.task_id_len);
+
+ msgpack_pack_str(&tmp_pck, 21);
+ msgpack_pack_str_body(&tmp_pck,
+ "TaskDefinitionVersion",
+ 21);
+ msgpack_pack_str(&tmp_pck, task_meta.task_def_version_len);
+ msgpack_pack_str_body(&tmp_pck,
+ task_meta.task_def_version,
+ task_meta.task_def_version_len);
+
+ /* 3rd - Add the static cluster fields from the plugin context */
+ msgpack_pack_str(&tmp_pck, 11);
+ msgpack_pack_str_body(&tmp_pck,
+ "ClusterName",
+ 11);
+ len = flb_sds_len(ctx->cluster_metadata.cluster_name);
+ msgpack_pack_str(&tmp_pck, len);
+ msgpack_pack_str_body(&tmp_pck,
+ ctx->cluster_metadata.cluster_name,
+ len);
+
+ msgpack_pack_str(&tmp_pck, 20);
+ msgpack_pack_str_body(&tmp_pck,
+ "ContainerInstanceArn",
+ 20);
+ len = flb_sds_len(ctx->cluster_metadata.container_instance_arn);
+ msgpack_pack_str(&tmp_pck, len);
+ msgpack_pack_str_body(&tmp_pck,
+ ctx->cluster_metadata.container_instance_arn,
+ len);
+
+ msgpack_pack_str(&tmp_pck, 19);
+ msgpack_pack_str_body(&tmp_pck,
+ "ContainerInstanceID",
+ 19);
+ len = flb_sds_len(ctx->cluster_metadata.container_instance_id);
+ msgpack_pack_str(&tmp_pck, len);
+ msgpack_pack_str_body(&tmp_pck,
+ ctx->cluster_metadata.container_instance_id,
+ len);
+
+ msgpack_pack_str(&tmp_pck, 15);
+ msgpack_pack_str_body(&tmp_pck,
+ "ECSAgentVersion",
+ 15);
+ len = flb_sds_len(ctx->cluster_metadata.ecs_agent_version);
+ msgpack_pack_str(&tmp_pck, len);
+ msgpack_pack_str_body(&tmp_pck,
+ ctx->cluster_metadata.ecs_agent_version,
+ len);
+
+ cont_meta_buf = flb_calloc(1, sizeof(struct flb_ecs_metadata_buffer));
+ if (!cont_meta_buf) {
+ flb_errno();
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ flb_sds_destroy(short_id);
+ return -1;
+ }
+
+ cont_meta_buf->buf = tmp_sbuf.data;
+ cont_meta_buf->size = tmp_sbuf.size;
+
+ ret = flb_ecs_metadata_buffer_init(ctx, cont_meta_buf);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Could not init metadata buffer from container response");
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ flb_free(cont_meta_buf);
+ flb_sds_destroy(short_id);
+ return -1;
+ }
+ cont_meta_buf->id = short_id;
+ mk_list_add(&cont_meta_buf->_head, &ctx->metadata_buffers);
+
+ /*
+ * Size is set to 0 so the table just stores our pointer
+ * Otherwise it will try to copy the memory to a new buffer
+ */
+ ret = flb_hash_table_add(ctx->container_hash_table,
+ short_id, strlen(short_id),
+ cont_meta_buf, 0);
+
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "Could not add container ID %s to metadata hash table",
+ short_id);
+ flb_ecs_metadata_buffer_destroy(cont_meta_buf);
+ } else {
+ ret = 0;
+ flb_plg_debug(ctx->ins, "Added `%s` to container metadata hash table",
+ short_id);
+ }
+ return ret;
+}
+
+/*
+ * Gets the container and task metadata for a task via a container's
+ * 12 char short ID. This can be used with the ECS Agent
+ * Introspection API: http://localhost:51678/v1/tasks?dockerid={short_id}
+ * Entries in the hash table will be added for all containers in the task
+ */
+static int get_task_metadata(struct flb_filter_ecs *ctx, char* short_id)
+{
+ struct flb_http_client *c;
+ struct flb_connection *u_conn;
+ int ret;
+ int root_type;
+ int found_task = FLB_FALSE;
+ int found_version = FLB_FALSE;
+ int found_family = FLB_FALSE;
+ int found_containers = FLB_FALSE;
+ int free_conn = FLB_FALSE;
+ int i;
+ int k;
+ char *buffer;
+ size_t size;
+ size_t b_sent;
+ size_t off = 0;
+ msgpack_unpacked result;
+ msgpack_object root;
+ msgpack_object key;
+ msgpack_object val;
+ msgpack_object container;
+ flb_sds_t tmp;
+ flb_sds_t http_path;
+ flb_sds_t task_id = NULL;
+ struct flb_ecs_task_metadata task_meta;
+
+ tmp = flb_sds_create_size(64);
+ if (!tmp) {
+ return -1;
+ }
+ http_path = flb_sds_printf(&tmp, FLB_ECS_FILTER_TASK_PATH_FORMAT, short_id);
+ if (!http_path) {
+ flb_sds_destroy(tmp);
+ return -1;
+ }
+
+ /* Compose HTTP Client request*/
+ if (plugin_under_test() == FLB_TRUE) {
+ c = mock_http_call("TEST_TASK_ERROR", "Task");
+ ret = 0;
+ }
+ else {
+ u_conn = flb_upstream_conn_get(ctx->ecs_upstream);
+
+ if (!u_conn) {
+ flb_plg_error(ctx->ins, "ECS agent introspection endpoint connection error");
+ flb_sds_destroy(http_path);
+ return -1;
+ }
+ free_conn = FLB_TRUE;
+ c = flb_http_client(u_conn, FLB_HTTP_GET,
+ http_path,
+ NULL, 0,
+ ctx->ecs_host, ctx->ecs_port,
+ NULL, 0);
+ flb_http_buffer_size(c, 0); /* 0 means unlimited */
+
+ flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
+
+ ret = flb_http_do(c, &b_sent);
+ flb_plg_debug(ctx->ins, "http_do=%i, "
+ "HTTP Status: %i",
+ ret, c->resp.status);
+ }
+
+ if (ret != 0 || c->resp.status != 200) {
+ if (c->resp.payload_size > 0) {
+ flb_plg_warn(ctx->ins, "Failed to get metadata from %s, will retry",
+ http_path);
+ flb_plg_debug(ctx->ins, "HTTP response\n%s",
+ c->resp.payload);
+ } else {
+ flb_plg_warn(ctx->ins, "%s response status was %d with no payload, will retry",
+ http_path,
+ c->resp.status);
+ }
+ flb_http_client_destroy(c);
+ if (free_conn == FLB_TRUE) {
+ flb_upstream_conn_release(u_conn);
+ }
+ flb_sds_destroy(http_path);
+ return -1;
+ }
+
+ if (free_conn == FLB_TRUE) {
+ flb_upstream_conn_release(u_conn);
+ }
+
+ ret = flb_pack_json(c->resp.payload, c->resp.payload_size,
+ &buffer, &size, &root_type, NULL);
+
+ if (ret < 0) {
+ flb_plg_warn(ctx->ins, "Could not parse response from %s; response=\n%s",
+ http_path, c->resp.payload);
+ flb_sds_destroy(http_path);
+ flb_http_client_destroy(c);
+ return -1;
+ }
+
+ /* parse metadata response */
+ msgpack_unpacked_init(&result);
+ ret = msgpack_unpack_next(&result, buffer, size, &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ flb_plg_error(ctx->ins, "Cannot unpack %s response to find metadata\n%s",
+ http_path, c->resp.payload);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ flb_sds_destroy(http_path);
+ flb_http_client_destroy(c);
+ return -1;
+ }
+
+ flb_http_client_destroy(c);
+
+ root = result.data;
+ if (root.type != MSGPACK_OBJECT_MAP) {
+ flb_plg_error(ctx->ins, "%s response parsing failed, msgpack_type=%i",
+ http_path,
+ root.type);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ flb_sds_destroy(http_path);
+ return -1;
+ }
+
+ /*
+Metadata Response:
+{
+ "Arn": "arn:aws:ecs:us-west-2:012345678910:task/default/e01d58a8-151b-40e8-bc01-22647b9ecfec",
+ "Containers": [
+ {
+ "DockerId": "79c796ed2a7f864f485c76f83f3165488097279d296a7c05bd5201a1c69b2920",
+ "DockerName": "ecs-nginx-efs-2-nginx-9ac0808dd0afa495f001",
+ "Name": "nginx"
+ }
+ ],
+ "DesiredStatus": "RUNNING",
+ "Family": "nginx-efs",
+ "KnownStatus": "RUNNING",
+ "Version": "2"
+}
+ */
+
+ for (i = 0; i < root.via.map.size; i++) {
+ key = root.via.map.ptr[i].key;
+ if (key.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "%s response parsing failed, msgpack key type=%i",
+ http_path,
+ key.type);
+ continue;
+ }
+
+ if (key.via.str.size == 6 && strncmp(key.via.str.ptr, "Family", 6) == 0) {
+ val = root.via.map.ptr[i].val;
+ if (val.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Family' value type=%i",
+ val.type);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ flb_sds_destroy(http_path);
+ if (task_id) {
+ flb_sds_destroy(task_id);
+ }
+ return -1;
+ }
+ found_family = FLB_TRUE;
+ task_meta.task_def_family = val.via.str.ptr;
+ task_meta.task_def_family_len = (int) val.via.str.size;
+ }
+ else if (key.via.str.size == 3 && strncmp(key.via.str.ptr, "Arn", 3) == 0) {
+ val = root.via.map.ptr[i].val;
+ if (val.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Arn' value type=%i",
+ val.type);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ flb_sds_destroy(http_path);
+ if (task_id) {
+ flb_sds_destroy(task_id);
+ }
+ return -1;
+ }
+ /* first get the ARN */
+ found_task = FLB_TRUE;
+ task_meta.task_arn = val.via.str.ptr;
+ task_meta.task_arn_len = (int) val.via.str.size;
+
+ /* then get the ID */
+ task_id = parse_id_from_arn(val.via.str.ptr, (int) val.via.str.size);
+ if (task_id == NULL) {
+ flb_plg_error(ctx->ins, "metadata parsing: failed to get ID from %.*s",
+ (int) val.via.str.size, val.via.str.ptr);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ flb_sds_destroy(http_path);
+ if (task_id) {
+ flb_sds_destroy(task_id);
+ }
+ return -1;
+ }
+
+ task_meta.task_id = task_id;
+ task_meta.task_id_len = flb_sds_len(task_id);
+ } else if (key.via.str.size == 7 && strncmp(key.via.str.ptr, "Version", 7) == 0) {
+ val = root.via.map.ptr[i].val;
+ if (val.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Version' value type=%i",
+ val.type);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ flb_sds_destroy(http_path);
+ if (task_id) {
+ flb_sds_destroy(task_id);
+ }
+ return -1;
+ }
+ found_version = FLB_TRUE;
+ task_meta.task_def_version = val.via.str.ptr;
+ task_meta.task_def_version_len = (int) val.via.str.size;
+ } else if (key.via.str.size == 10 && strncmp(key.via.str.ptr, "Containers", 10) == 0) {
+ val = root.via.map.ptr[i].val;
+ if (val.type != MSGPACK_OBJECT_ARRAY ) {
+ flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Containers' value type=%i",
+ val.type);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ flb_sds_destroy(http_path);
+ if (task_id) {
+ flb_sds_destroy(task_id);
+ }
+ return -1;
+ }
+ found_containers = FLB_TRUE;
+ }
+ }
+
+ if (found_task == FLB_FALSE) {
+ flb_plg_error(ctx->ins, "Could not parse Task 'Arn' from %s response",
+ http_path);
+ flb_sds_destroy(http_path);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+ if (found_family == FLB_FALSE) {
+ flb_plg_error(ctx->ins, "Could not parse 'Family' from %s response",
+ http_path);
+ flb_sds_destroy(http_path);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ if (task_id) {
+ flb_sds_destroy(task_id);
+ }
+ return -1;
+ }
+ if (found_version == FLB_FALSE) {
+ flb_plg_error(ctx->ins, "Could not parse 'Version' from %s response",
+ http_path);
+ flb_sds_destroy(http_path);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ if (task_id) {
+ flb_sds_destroy(task_id);
+ }
+ return -1;
+ }
+ if (found_containers == FLB_FALSE) {
+ flb_plg_error(ctx->ins, "Could not parse 'Containers' from %s response",
+ http_path);
+ flb_sds_destroy(http_path);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ if (task_id) {
+ flb_sds_destroy(task_id);
+ }
+ return -1;
+ }
+
+ /*
+ * Process metadata response a 2nd time to get the Containers list
+ * This is because we need one complete metadata buf per container
+ * with all task metadata. So we collect task before we process containers.
+ */
+ for (i = 0; i < root.via.map.size; i++) {
+ key = root.via.map.ptr[i].key;
+ if (key.type != MSGPACK_OBJECT_STR) {
+ flb_plg_error(ctx->ins, "%s response parsing failed, msgpack key type=%i",
+ http_path,
+ key.type);
+ continue;
+ }
+
+ if (key.via.str.size == 10 && strncmp(key.via.str.ptr, "Containers", 10) == 0) {
+ val = root.via.map.ptr[i].val;
+ if (val.type != MSGPACK_OBJECT_ARRAY ) {
+ flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Containers' value type=%i",
+ val.type);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ flb_sds_destroy(http_path);
+ flb_sds_destroy(task_id);
+ return -1;
+ }
+
+ /* iterate through list of containers and process them*/
+ for (k = 0; k < val.via.array.size; k++) {
+ container = val.via.array.ptr[k];
+ if (container.type != MSGPACK_OBJECT_MAP) {
+ flb_plg_error(ctx->ins, "metadata parsing: unexpected 'Containers[%d]' inner value type=%i",
+ k,
+ container.type);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ flb_sds_destroy(http_path);
+ flb_sds_destroy(task_id);
+ return -1;
+ }
+ ret = process_container_response(ctx, container, task_meta);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "metadata parsing: failed to parse 'Containers[%d]'",
+ k);
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ flb_sds_destroy(http_path);
+ flb_sds_destroy(task_id);
+ return -1;
+ }
+ }
+ }
+ }
+
+ flb_free(buffer);
+ msgpack_unpacked_destroy(&result);
+ flb_sds_destroy(task_id);
+ flb_sds_destroy(http_path);
+ return 0;
+}
+
+static int get_metadata_by_id(struct flb_filter_ecs *ctx,
+ const char *tag, int tag_len,
+ struct flb_ecs_metadata_buffer **metadata_buffer)
+{
+ flb_sds_t container_short_id = NULL;
+ const char *tmp;
+ int ret;
+ size_t size;
+
+ if (ctx->ecs_tag_prefix_len + 12 > tag_len) {
+ flb_plg_warn(ctx->ins, "Tag '%s' length check failed: tag is expected "
+ "to be or be prefixed with '{ecs_tag_prefix}{12 character container short ID}'",
+ tag);
+ return -1;
+ }
+
+ ret = strncmp(ctx->ecs_tag_prefix, tag, ctx->ecs_tag_prefix_len);
+ if (ret != 0) {
+ flb_plg_warn(ctx->ins, "Tag '%s' is not prefixed with ecs_tag_prefix '%s'",
+ tag, ctx->ecs_tag_prefix);
+ return -1;
+ }
+
+ tmp = tag + ctx->ecs_tag_prefix_len;
+ container_short_id = flb_sds_create_len(tmp, 12);
+ if (!container_short_id) {
+ flb_errno();
+ return -1;
+ }
+
+ /* get metadata for this container */
+ ret = flb_hash_table_get(ctx->container_hash_table,
+ container_short_id, flb_sds_len(container_short_id),
+ (void **) metadata_buffer, &size);
+
+ if (ret == -1) {
+ /* try fetch metadata */
+ ret = get_task_metadata(ctx, container_short_id);
+ if (ret < 0) {
+ flb_plg_info(ctx->ins, "Requesting metadata from ECS Agent introspection endpoint failed for tag %s",
+ tag);
+ flb_sds_destroy(container_short_id);
+ return -1;
+ }
+ /* get from hash table */
+ ret = flb_hash_table_get(ctx->container_hash_table,
+ container_short_id, flb_sds_len(container_short_id),
+ (void **) metadata_buffer, &size);
+ }
+
+ flb_sds_destroy(container_short_id);
+ return ret;
+}
+
+static void clean_old_metadata_buffers(struct flb_filter_ecs *ctx)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_ecs_metadata_buffer *buf;
+ time_t now = time(NULL);
+
+ mk_list_foreach_safe(head, tmp, &ctx->metadata_buffers) {
+ buf = mk_list_entry(head, struct flb_ecs_metadata_buffer, _head);
+ if (now > (buf->last_used_time + ctx->ecs_meta_cache_ttl)) {
+ flb_plg_debug(ctx->ins, "cleaning buffer: now=%ld, ttl=%d, last_used_time=%ld",
+ (long)now, ctx->ecs_meta_cache_ttl, (long)buf->last_used_time);
+ mk_list_del(&buf->_head);
+ flb_hash_table_del(ctx->container_hash_table, buf->id);
+ flb_ecs_metadata_buffer_destroy(buf);
+ }
+ }
+}
+
+static int is_tag_marked_failed(struct flb_filter_ecs *ctx,
+ const char *tag, int tag_len)
+{
+ int ret;
+ int *val = NULL;
+ size_t val_size;
+
+ ret = flb_hash_table_get(ctx->failed_metadata_request_tags,
+ tag, tag_len,
+ (void **) &val, &val_size);
+ if (ret != -1) {
+ if (*val >= ctx->agent_endpoint_retries) {
+ return FLB_TRUE;
+ }
+ }
+
+ return FLB_FALSE;
+}
+
+static void mark_tag_failed(struct flb_filter_ecs *ctx,
+ const char *tag, int tag_len)
+{
+ int ret;
+ int *val = NULL;
+ int *new_val = NULL;
+ size_t val_size;
+
+ ret = flb_hash_table_get(ctx->failed_metadata_request_tags,
+ tag, tag_len,
+ (void **) &val, &val_size);
+
+ if (ret == -1) {
+ /* hash table copies memory to new heap block */
+ val = flb_malloc(sizeof(int));
+ if (!val) {
+ flb_errno();
+ return;
+ }
+ *val = 1;
+ flb_hash_table_add(ctx->failed_metadata_request_tags,
+ tag, tag_len,
+ val, sizeof(int));
+ /* hash table will contain a copy */
+ flb_free(val);
+ } else {
+ /*
+ * val is memory returned from hash table
+ * if we simply update the value here and call flb_hash_add
+ * it first frees the old memory (which is what we passed it)
+ * then tries to copy over the memory we passed in to a new location
+ * flb_hash stores all entries as if they were strings, so we also
+ * can't simply increment the value returned by flb_hash_get
+ */
+ new_val = flb_malloc(sizeof(int));
+ if (!new_val) {
+ flb_errno();
+ return;
+ }
+ /* increment number of failed metadata requests for this tag */
+ *new_val = *val + 1;
+ flb_hash_table_add(ctx->failed_metadata_request_tags,
+ tag, tag_len,
+ new_val, sizeof(int));
+ flb_plg_info(ctx->ins, "Failed to get ECS Metadata for tag %s %d times. "
+ "This might be because the logs for this tag do not come from an ECS Task Container. "
+ "This plugin will retry metadata requests at most %d times total for this tag.",
+ tag, *new_val, ctx->agent_endpoint_retries);
+ flb_free(new_val);
+ }
+}
+
+static int cb_ecs_filter(const void *data, size_t bytes,
+ const char *tag, int tag_len,
+ void **out_buf, size_t *out_size,
+ struct flb_filter_instance *f_ins,
+ struct flb_input_instance *i_ins,
+ void *context,
+ struct flb_config *config)
+{
+ struct flb_filter_ecs *ctx = context;
+ int i = 0;
+ int ret;
+ int check = FLB_FALSE;
+ msgpack_object *obj;
+ msgpack_object_kv *kv;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_ecs_metadata_key *metadata_key;
+ struct flb_ecs_metadata_buffer *metadata_buffer;
+ flb_sds_t val;
+ struct flb_log_event_encoder log_encoder;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+
+ (void) f_ins;
+ (void) i_ins;
+ (void) config;
+
+ /* First check that the static cluster metadata has been retrieved */
+ if (ctx->has_cluster_metadata == FLB_FALSE) {
+ ret = get_ecs_cluster_metadata(ctx);
+ if (ret < 0) {
+ flb_plg_warn(ctx->ins, "Could not retrieve cluster metadata "
+ "from ECS Agent");
+ return FLB_FILTER_NOTOUCH;
+ }
+ }
+
+ /* check if the current tag is marked as failed */
+ check = is_tag_marked_failed(ctx, tag, tag_len);
+ if (check == FLB_TRUE) {
+ flb_plg_debug(ctx->ins, "Failed to get ECS Metadata for tag %s %d times. "
+ "Will not attempt to retry the metadata request. Will attach cluster metadata only.",
+ tag, ctx->agent_endpoint_retries);
+ }
+
+ if (check == FLB_FALSE && ctx->cluster_metadata_only == FLB_FALSE) {
+ ret = get_metadata_by_id(ctx, tag, tag_len, &metadata_buffer);
+ if (ret == -1) {
+ flb_plg_info(ctx->ins, "Failed to get ECS Task metadata for %s, "
+ "falling back to process cluster metadata only. If "
+ "this is intentional, set `Cluster_Metadata_Only On`",
+ tag);
+ mark_tag_failed(ctx, tag, tag_len);
+ metadata_buffer = &ctx->cluster_meta_buf;
+ }
+ } else {
+ metadata_buffer = &ctx->cluster_meta_buf;
+ }
+
+ metadata_buffer->last_used_time = time(NULL);
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ ret = flb_log_event_encoder_init(&log_encoder,
+ FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event encoder initialization error : %d", ret);
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ obj = log_event.body;
+
+ ret = flb_log_event_encoder_begin_record(&log_encoder);
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_set_timestamp(
+ &log_encoder, &log_event.timestamp);
+ }
+
+ /* iterate through the old record map and add it to the new buffer */
+ kv = obj->via.map.ptr;
+ for(i=0;
+ i < obj->via.map.size &&
+ ret == FLB_EVENT_ENCODER_SUCCESS;
+ i++) {
+ ret = flb_log_event_encoder_append_body_values(
+ &log_encoder,
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].key),
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].val));
+ }
+
+ /* append new keys */
+ mk_list_foreach_safe(head, tmp, &ctx->metadata_keys) {
+ metadata_key = mk_list_entry(head, struct flb_ecs_metadata_key, _head);
+ val = flb_ra_translate(metadata_key->ra, NULL, 0,
+ metadata_buffer->obj, NULL);
+ if (!val) {
+ flb_plg_info(ctx->ins, "Translation failed for %s : %s",
+ metadata_key->key, metadata_key->template);
+
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_log_event_encoder_destroy(&log_encoder);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ ret = flb_log_event_encoder_append_body_values(
+ &log_encoder,
+ FLB_LOG_EVENT_STRING_VALUE(metadata_key->key,
+ flb_sds_len(metadata_key->key)),
+ FLB_LOG_EVENT_STRING_VALUE(val, flb_sds_len(val)));
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_info(ctx->ins,
+ "Metadata appendage failed for %.*s",
+ (int) flb_sds_len(metadata_key->key),
+ metadata_key->key);
+
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_log_event_encoder_destroy(&log_encoder);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ flb_sds_destroy(val);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ flb_log_event_encoder_commit_record(&log_encoder);
+ }
+ }
+
+ if (ctx->cluster_metadata_only == FLB_FALSE) {
+ clean_old_metadata_buffers(ctx);
+ }
+
+ if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA &&
+ log_decoder.offset == bytes) {
+ ret = FLB_EVENT_ENCODER_SUCCESS;
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ *out_buf = log_encoder.output_buffer;
+ *out_size = log_encoder.output_length;
+
+ ret = FLB_FILTER_MODIFIED;
+
+ flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder);
+ }
+ else {
+ flb_plg_error(ctx->ins,
+ "Log event encoder error : %d", ret);
+
+ ret = FLB_FILTER_NOTOUCH;
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_log_event_encoder_destroy(&log_encoder);
+
+ return ret;
+}
+
+static void flb_ecs_metadata_key_destroy(struct flb_ecs_metadata_key *metadata_key)
+{
+ if (metadata_key) {
+ if (metadata_key->key) {
+ flb_sds_destroy(metadata_key->key);
+ }
+ if (metadata_key->template) {
+ flb_sds_destroy(metadata_key->template);
+ }
+ if (metadata_key->ra) {
+ flb_ra_destroy(metadata_key->ra);
+ }
+ flb_free(metadata_key);
+ }
+}
+
+static void flb_filter_ecs_destroy(struct flb_filter_ecs *ctx)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_ecs_metadata_key *metadata_key;
+ struct flb_ecs_metadata_buffer *buf;
+
+ if (ctx) {
+ if (ctx->ecs_upstream) {
+ flb_upstream_destroy(ctx->ecs_upstream);
+ }
+ if (ctx->cluster_metadata.cluster_name) {
+ flb_sds_destroy(ctx->cluster_metadata.cluster_name);
+ }
+ if (ctx->cluster_metadata.container_instance_arn) {
+ flb_sds_destroy(ctx->cluster_metadata.container_instance_arn);
+ }
+ if (ctx->cluster_metadata.container_instance_id) {
+ flb_sds_destroy(ctx->cluster_metadata.container_instance_id);
+ }
+ if (ctx->cluster_metadata.ecs_agent_version) {
+ flb_sds_destroy(ctx->cluster_metadata.ecs_agent_version);
+ }
+ if (ctx->cluster_meta_buf.buf) {
+ flb_free(ctx->cluster_meta_buf.buf);
+ msgpack_unpacked_destroy(&ctx->cluster_meta_buf.unpacked);
+ }
+ mk_list_foreach_safe(head, tmp, &ctx->metadata_keys) {
+ metadata_key = mk_list_entry(head, struct flb_ecs_metadata_key, _head);
+ mk_list_del(&metadata_key->_head);
+ flb_ecs_metadata_key_destroy(metadata_key);
+ }
+ mk_list_foreach_safe(head, tmp, &ctx->metadata_buffers) {
+ buf = mk_list_entry(head, struct flb_ecs_metadata_buffer, _head);
+ mk_list_del(&buf->_head);
+ flb_hash_table_del(ctx->container_hash_table, buf->id);
+ flb_ecs_metadata_buffer_destroy(buf);
+ }
+ if (ctx->container_hash_table) {
+ flb_hash_table_destroy(ctx->container_hash_table);
+ }
+ if (ctx->failed_metadata_request_tags) {
+ flb_hash_table_destroy(ctx->failed_metadata_request_tags);
+ }
+ flb_free(ctx);
+ }
+}
+
+static int cb_ecs_exit(void *data, struct flb_config *config)
+{
+ struct flb_filter_ecs *ctx = data;
+
+ flb_filter_ecs_destroy(ctx);
+ return 0;
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] = {
+
+ {
+ FLB_CONFIG_MAP_STR, "add", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
+ "Add a metadata key/value pair with the given key and given value from the given template. "
+ "Format is `Add KEY TEMPLATE`."
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "ecs_tag_prefix", "",
+ 0, FLB_TRUE, offsetof(struct flb_filter_ecs, ecs_tag_prefix),
+ "This filter must obtain the 12 character container short ID to query "
+ "for ECS Task metadata. The filter removes the prefx from the tag and then assumes "
+ "the next 12 characters are the short container ID. If the container short ID, "
+ "is not found in the tag, the filter can/must fallback to only attaching cluster metadata "
+ "(cluster name, container instance ID/ARN, and ECS Agent version)."
+ },
+
+ {
+ FLB_CONFIG_MAP_BOOL, "cluster_metadata_only", "false",
+ 0, FLB_TRUE, offsetof(struct flb_filter_ecs, cluster_metadata_only),
+ "Only attempt to attach the cluster related metadata to logs "
+ "(cluster name, container instance ID/ARN, and ECS Agent version). "
+ "With this option off, if this filter can not obtain the task metadata for a log, it will "
+ "output errors. Use this option if you have logs that are not part of an "
+ "ECS task (ex: Docker Daemon logs)."
+ },
+
+ {
+ FLB_CONFIG_MAP_TIME, "ecs_meta_cache_ttl", "3600",
+ 0, FLB_TRUE, offsetof(struct flb_filter_ecs, ecs_meta_cache_ttl),
+ "Configurable TTL for cached ECS Task Metadata. Default 3600s (1 hour)"
+ "For example, set this value to 600 or 600s or 10m and cache entries "
+ "which have been created more than 10 minutes will be evicted."
+ "Cache eviction is needed to purge task metadata for tasks that "
+ "have been stopped."
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "ecs_meta_host", FLB_ECS_FILTER_HOST,
+ 0, FLB_TRUE, offsetof(struct flb_filter_ecs, ecs_host),
+ "The host name at which the ECS Agent Introspection endpoint is reachable. "
+ "Defaults to 127.0.0.1"
+ },
+
+ {
+ FLB_CONFIG_MAP_INT, "ecs_meta_port", FLB_ECS_FILTER_PORT,
+ 0, FLB_TRUE, offsetof(struct flb_filter_ecs, ecs_port),
+ "The port at which the ECS Agent Introspection endpoint is reachable. "
+ "Defaults to 51678"
+ },
+
+ {
+ FLB_CONFIG_MAP_INT, "agent_endpoint_retries", FLB_ECS_FILTER_METADATA_RETRIES,
+ 0, FLB_TRUE, offsetof(struct flb_filter_ecs, agent_endpoint_retries),
+ "Number of retries for failed metadata requests to ECS Agent Introspection "
+ "endpoint. The most common cause of failed metadata requests is that the "
+ "container the metadata request was made for is not part of an ECS Task. "
+ "Check if you have non-task containers and docker dual logging enabled."
+ },
+
+ {0}
+};
+
+struct flb_filter_plugin filter_ecs_plugin = {
+ .name = "ecs",
+ .description = "Add AWS ECS Metadata",
+ .cb_init = cb_ecs_init,
+ .cb_filter = cb_ecs_filter,
+ .cb_exit = cb_ecs_exit,
+ .config_map = config_map,
+ .flags = 0
+};