diff options
Diffstat (limited to 'src/fluent-bit/plugins/filter_kubernetes/kube_meta.c')
-rw-r--r-- | src/fluent-bit/plugins/filter_kubernetes/kube_meta.c | 1650 |
1 files changed, 1650 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/filter_kubernetes/kube_meta.c b/src/fluent-bit/plugins/filter_kubernetes/kube_meta.c new file mode 100644 index 000000000..fbad2bb02 --- /dev/null +++ b/src/fluent-bit/plugins/filter_kubernetes/kube_meta.c @@ -0,0 +1,1650 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_filter_plugin.h> +#include <fluent-bit/flb_compat.h> +#include <fluent-bit/flb_hash_table.h> +#include <fluent-bit/flb_regex.h> +#include <fluent-bit/flb_io.h> +#include <fluent-bit/flb_upstream.h> +#include <fluent-bit/flb_http_client.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_env.h> +#include <fluent-bit/tls/flb_tls.h> + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <msgpack.h> + +#include "kube_conf.h" +#include "kube_meta.h" +#include "kube_property.h" + +#define FLB_KUBE_META_CONTAINER_STATUSES_KEY "containerStatuses" +#define FLB_KUBE_META_CONTAINER_STATUSES_KEY_LEN \ + (sizeof(FLB_KUBE_META_CONTAINER_STATUSES_KEY) - 1) +#define FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY "initContainerStatuses" +#define FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY_LEN \ + (sizeof(FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY) - 1) +#define FLB_KUBE_TOKEN_BUF_SIZE 8192 /* 8KB */ + +static int file_to_buffer(const char *path, + char **out_buf, size_t *out_size) +{ + int ret; + char *buf; + ssize_t bytes; + FILE *fp; + struct stat st; + + if (!(fp = fopen(path, "r"))) { + return -1; + } + + ret = stat(path, &st); + if (ret == -1) { + flb_errno(); + fclose(fp); + return -1; + } + + buf = flb_calloc(1, (st.st_size + 1)); + if (!buf) { + flb_errno(); + fclose(fp); + return -1; + } + + bytes = fread(buf, st.st_size, 1, fp); + if (bytes < 1) { + flb_free(buf); + fclose(fp); + return -1; + } + + fclose(fp); + + *out_buf = buf; + *out_size = st.st_size; + + return 0; +} + +#ifdef FLB_HAVE_KUBE_TOKEN_COMMAND +/* Run command to get Kubernetes authorization token */ +static int get_token_with_command(const char *command, + char **out_buf, size_t *out_size) +{ + FILE *fp; + char buf[FLB_KUBE_TOKEN_BUF_SIZE]; + char *temp; + char *res; + size_t size = 0; + size_t len = 0; + + fp = popen(command, "r"); + if (fp == NULL) { + return -1; + } + + res = flb_calloc(1, FLB_KUBE_TOKEN_BUF_SIZE); + if (!res) { + flb_errno(); + pclose(fp); + return -1; + } + + while (fgets(buf, sizeof(buf), fp) != NULL) { + len = strlen(buf); + if (len >= FLB_KUBE_TOKEN_BUF_SIZE - 1) { + temp = flb_realloc(res, (FLB_KUBE_TOKEN_BUF_SIZE + size) * 2); + if (temp == NULL) { + flb_errno(); + flb_free(res); + pclose(fp); + return -1; + } + res = temp; + } + strcpy(res + size, buf); + size += len; + } + + if (strlen(res) < 1) { + flb_free(res); + pclose(fp); + return -1; + } + + pclose(fp); + + *out_buf = res; + *out_size = strlen(res); + + return 0; +} +#endif + +/* Set K8s Authorization Token and get HTTP Auth Header */ +static int get_http_auth_header(struct flb_kube *ctx) +{ + int ret; + char *temp; + char *tk = NULL; + size_t tk_size = 0; + + if (ctx->kube_token_command != NULL) { +#ifdef FLB_HAVE_KUBE_TOKEN_COMMAND + ret = get_token_with_command(ctx->kube_token_command, &tk, &tk_size); +#else + ret = -1; +#endif + if (ret == -1) { + flb_plg_warn(ctx->ins, "failed to run command %s", ctx->kube_token_command); + } + } + else { + ret = file_to_buffer(ctx->token_file, &tk, &tk_size); + if (ret == -1) { + flb_plg_warn(ctx->ins, "cannot open %s", FLB_KUBE_TOKEN); + } + flb_plg_info(ctx->ins, " token updated"); + } + ctx->kube_token_create = time(NULL); + + /* Token */ + if (ctx->token != NULL) { + flb_free(ctx->token); + } + ctx->token = tk; + ctx->token_len = tk_size; + + /* HTTP Auth Header */ + if (ctx->auth == NULL) { + ctx->auth = flb_malloc(tk_size + 32); + } + else if (ctx->auth_len < tk_size + 32) { + temp = flb_realloc(ctx->auth, tk_size + 32); + if (temp == NULL) { + flb_free(ctx->auth); + ctx->auth = NULL; + return -1; + } + ctx->auth = temp; + } + + if (!ctx->auth) { + return -1; + } + ctx->auth_len = snprintf(ctx->auth, tk_size + 32, + "Bearer %s", + tk); + + return 0; +} + +/* Refresh HTTP Auth Header if K8s Authorization Token is expired */ +static int refresh_token_if_needed(struct flb_kube *ctx) +{ + int expired = 0; + int ret; + + if (ctx->kube_token_create > 0) { + if (time(NULL) > ctx->kube_token_create + ctx->kube_token_ttl) { + expired = FLB_TRUE; + } + } + + if (expired || ctx->kube_token_create == 0) { + ret = get_http_auth_header(ctx); + if (ret == -1) { + flb_plg_warn(ctx->ins, "failed to set http auth header"); + return -1; + } + } + + return 0; +} + +static void expose_k8s_meta(struct flb_kube *ctx) +{ + char *tmp; + struct flb_env *env; + + env = ctx->config->env; + + flb_env_set(env, "k8s", "enabled"); + flb_env_set(env, "k8s.namespace", ctx->namespace); + flb_env_set(env, "k8s.pod_name", ctx->podname); + + tmp = (char *) flb_env_get(env, "NODE_NAME"); + if (tmp) { + flb_env_set(env, "k8s.node_name", tmp); + } +} + +/* Load local information from a POD context */ +static int get_local_pod_info(struct flb_kube *ctx) +{ + int ret; + char *ns; + size_t ns_size; + char *hostname; + + /* Get the namespace name */ + ret = file_to_buffer(FLB_KUBE_NAMESPACE, &ns, &ns_size); + if (ret == -1) { + /* + * If it fails, it's just informational, as likely the caller + * wanted to connect using the Proxy instead from inside a POD. + */ + flb_plg_warn(ctx->ins, "cannot open %s", FLB_KUBE_NAMESPACE); + return FLB_FALSE; + } + + /* Namespace */ + ctx->namespace = ns; + ctx->namespace_len = ns_size; + + /* POD Name */ + hostname = getenv("HOSTNAME"); + if (hostname) { + ctx->podname = flb_strdup(hostname); + ctx->podname_len = strlen(ctx->podname); + } + else { + char tmp[256]; + gethostname(tmp, 256); + ctx->podname = flb_strdup(tmp); + ctx->podname_len = strlen(ctx->podname); + } + + /* If a namespace was recognized, a token is mandatory */ + /* Use the token to get HTTP Auth Header*/ + ret = get_http_auth_header(ctx); + if (ret == -1) { + flb_plg_warn(ctx->ins, "failed to set http auth header"); + return FLB_FALSE; + } + + expose_k8s_meta(ctx); + return FLB_TRUE; +} + +/* + * If a file exists called namespace_podname.meta, load it and use it. + * If not, fall back to API. This is primarily for diagnostic purposes, + * e.g. debugging new parsers. + */ +static int get_meta_file_info(struct flb_kube *ctx, const char *namespace, + const char *podname, char **buffer, size_t *size, + int *root_type) { + + int fd = -1; + char *payload = NULL; + size_t payload_size = 0; + struct stat sb; + int packed = -1; + int ret; + char uri[1024]; + + if (ctx->meta_preload_cache_dir && namespace && podname) { + + ret = snprintf(uri, sizeof(uri) - 1, "%s/%s_%s.meta", + ctx->meta_preload_cache_dir, namespace, podname); + if (ret > 0) { + fd = open(uri, O_RDONLY, 0); + if (fd != -1) { + if (fstat(fd, &sb) == 0) { + payload = flb_malloc(sb.st_size); + if (!payload) { + flb_errno(); + } + else { + ret = read(fd, payload, sb.st_size); + if (ret == sb.st_size) { + payload_size = ret; + } + } + } + close(fd); + } + } + + if (payload_size) { + packed = flb_pack_json(payload, payload_size, + buffer, size, root_type, + NULL); + } + + if (payload) { + flb_free(payload); + } + } + + return packed; +} + +/* Gather metadata from HTTP Request, + * this could send out HTTP Request either to KUBE Server API or Kubelet + */ +static int get_meta_info_from_request(struct flb_kube *ctx, + const char *namespace, + const char *podname, + char **buffer, size_t *size, + int *root_type, + char* uri) +{ + struct flb_http_client *c; + struct flb_connection *u_conn; + int ret; + size_t b_sent; + int packed; + + if (!ctx->upstream) { + return -1; + } + + u_conn = flb_upstream_conn_get(ctx->upstream); + + if (!u_conn) { + flb_plg_error(ctx->ins, "kubelet upstream connection error"); + return -1; + } + + ret = refresh_token_if_needed(ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "failed to refresh token"); + flb_upstream_conn_release(u_conn); + return -1; + } + + /* Compose HTTP Client request*/ + c = flb_http_client(u_conn, FLB_HTTP_GET, + uri, + NULL, 0, NULL, 0, NULL, 0); + flb_http_buffer_size(c, ctx->buffer_size); + + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + flb_http_add_header(c, "Connection", 10, "close", 5); + if (ctx->auth_len > 0) { + flb_http_add_header(c, "Authorization", 13, ctx->auth, ctx->auth_len); + } + + ret = flb_http_do(c, &b_sent); + flb_plg_debug(ctx->ins, "Request (ns=%s, pod=%s) http_do=%i, " + "HTTP Status: %i", + namespace, podname, ret, c->resp.status); + + if (ret != 0 || c->resp.status != 200) { + if (c->resp.payload_size > 0) { + flb_plg_debug(ctx->ins, "HTTP response\n%s", + c->resp.payload); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + return -1; + } + + packed = flb_pack_json(c->resp.payload, c->resp.payload_size, + buffer, size, root_type, NULL); + + /* release resources */ + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + + return packed; + +} + +/* Gather pods list information from Kubelet */ +static int get_pods_from_kubelet(struct flb_kube *ctx, + const char *namespace, const char *podname, + char **out_buf, size_t *out_size) +{ + int ret; + int packed = -1; + int root_type; + char uri[1024]; + char *buf; + size_t size; + + *out_buf = NULL; + *out_size = 0; + + /* used for unit test purposes*/ + packed = get_meta_file_info(ctx, namespace, podname, &buf, &size, + &root_type); + + if (packed == -1) { + + ret = snprintf(uri, sizeof(uri) - 1, FLB_KUBELET_PODS); + if (ret == -1) { + return -1; + } + flb_plg_debug(ctx->ins, + "Send out request to Kubelet for pods information."); + packed = get_meta_info_from_request(ctx, namespace, podname, + &buf, &size, &root_type, uri); + } + + /* validate pack */ + if (packed == -1) { + return -1; + } + + *out_buf = buf; + *out_size = size; + + return 0; +} + +/* Gather metadata from API Server */ +static int get_api_server_info(struct flb_kube *ctx, + const char *namespace, const char *podname, + char **out_buf, size_t *out_size) +{ + int ret; + int packed = -1; + int root_type; + char uri[1024]; + char *buf; + size_t size; + + *out_buf = NULL; + *out_size = 0; + + /* used for unit test purposes*/ + packed = get_meta_file_info(ctx, namespace, podname, + &buf, &size, &root_type); + + if (packed == -1) { + + ret = snprintf(uri, sizeof(uri) - 1, FLB_KUBE_API_FMT, namespace, + podname); + + if (ret == -1) { + return -1; + } + flb_plg_debug(ctx->ins, + "Send out request to API Server for pods information"); + packed = get_meta_info_from_request(ctx, namespace, podname, + &buf, &size, &root_type, uri); + } + + /* validate pack */ + if (packed == -1) { + return -1; + } + + *out_buf = buf; + *out_size = size; + + return 0; +} + +static void cb_results(const char *name, const char *value, + size_t vlen, void *data) +{ + struct flb_kube_meta *meta = data; + + if (vlen == 0) { + return; + } + + if (meta->podname == NULL && strcmp(name, "pod_name") == 0) { + meta->podname = flb_strndup(value, vlen); + meta->podname_len = vlen; + meta->fields++; + } + else if (meta->namespace == NULL && + strcmp(name, "namespace_name") == 0) { + meta->namespace = flb_strndup(value, vlen); + meta->namespace_len = vlen; + meta->fields++; + } + else if (meta->container_name == NULL && + strcmp(name, "container_name") == 0) { + meta->container_name = flb_strndup(value, vlen); + meta->container_name_len = vlen; + meta->fields++; + } + else if (meta->docker_id == NULL && + strcmp(name, "docker_id") == 0) { + meta->docker_id = flb_strndup(value, vlen); + meta->docker_id_len = vlen; + meta->fields++; + } + else if (meta->container_hash == NULL && + strcmp(name, "container_hash") == 0) { + meta->container_hash = flb_strndup(value, vlen); + meta->container_hash_len = vlen; + meta->fields++; + } + + return; +} + +static int extract_hash(const char * im, int sz, const char ** out, int * outsz) +{ + char * colon = NULL; + char * slash = NULL; + + *out = NULL; + *outsz = 0; + + if (sz <= 1) { + return -1; + } + + colon = memchr(im, ':', sz); + + if (colon == NULL) { + return -1; + } else { + slash = colon; + while ((im + sz - slash + 1) > 0 && *(slash + 1) == '/') { + slash++; + } + if (slash == colon) { + slash = NULL; + } + } + + if (slash == NULL && (im + sz - colon) > 0) { + *out = im; + } + + if (slash != NULL && (colon - slash) < 0 && (im + sz - slash) > 0) { + *out = slash + 1; + } + + if (*out) { + *outsz = im + sz - *out; + return 0; + } + return -1; +} + +/* + * As per Kubernetes Pod spec, + * https://kubernetes.io/docs/concepts/workloads/pods/pod/, we look + * for status.{initContainerStatuses, containerStatuses}.{containerID, imageID, image} + * where status.{initContainerStatuses, containerStatus}.name == our container + * name + * status: + * ... + * containerStatuses: + * - containerID: XXX + * image: YYY + * imageID: ZZZ + * ... + * name: nginx-ingress-microk8s +*/ +static void extract_container_hash(struct flb_kube_meta *meta, + msgpack_object status) +{ + int i; + msgpack_object k, v; + int docker_id_len = 0; + int container_hash_len = 0; + int container_image_len = 0; + const char *container_hash; + const char *docker_id; + const char *container_image; + const char *tmp; + int tmp_len = 0; + int name_found = FLB_FALSE; + /* Process status/containerStatus map for docker_id, container_hash, container_image */ + for (i = 0; + (meta->docker_id_len == 0 || meta->container_hash_len == 0 || + meta->container_image_len == 0) && + i < status.via.map.size; i++) { + k = status.via.map.ptr[i].key; + if ((k.via.str.size == FLB_KUBE_META_CONTAINER_STATUSES_KEY_LEN && + strncmp(k.via.str.ptr, + FLB_KUBE_META_CONTAINER_STATUSES_KEY, + FLB_KUBE_META_CONTAINER_STATUSES_KEY_LEN) == 0) || + (k.via.str.size == FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY_LEN && + strncmp(k.via.str.ptr, + FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY, + FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY_LEN) == 0)) { + int j; + v = status.via.map.ptr[i].val; + for (j = 0; + (meta->docker_id_len == 0 || + meta->container_hash_len == 0 || + meta->container_image_len == 0) && j < v.via.array.size; + j++) { + int l; + msgpack_object k1, k2; + msgpack_object_str v2; + k1 = v.via.array.ptr[j]; + for (l = 0; + (meta->docker_id_len == 0 || + meta->container_hash_len == 0 || + meta->container_image_len == 0) && + l < k1.via.map.size; l++) { + k2 = k1.via.map.ptr[l].key; + v2 = k1.via.map.ptr[l].val.via.str; + if (k2.via.str.size == sizeof("name") - 1 && + !strncmp(k2.via.str.ptr, "name", k2.via.str.size)) { + if (v2.size == meta->container_name_len && + !strncmp(v2.ptr, + meta->container_name, + meta->container_name_len)) { + name_found = FLB_TRUE; + } + else { + break; + } + } + else if (k2.via.str.size == sizeof("containerID") - 1 && + !strncmp(k2.via.str.ptr, + "containerID", + k2.via.str.size)) { + if (extract_hash(v2.ptr, v2.size, &tmp, &tmp_len) == 0) { + docker_id = tmp; + docker_id_len = tmp_len; + } + } + else if (k2.via.str.size == sizeof("imageID") - 1 && + !strncmp(k2.via.str.ptr, + "imageID", + k2.via.str.size)) { + if (extract_hash(v2.ptr, v2.size, &tmp, &tmp_len) == 0) { + container_hash = tmp; + container_hash_len = tmp_len; + } + } + else if (k2.via.str.size == sizeof("image") - 1 && + !strncmp(k2.via.str.ptr, + "image", + k2.via.str.size)) { + container_image = v2.ptr; + container_image_len = v2.size; + } + } + if (name_found) { + if (container_hash_len && !meta->container_hash_len) { + meta->container_hash_len = container_hash_len; + meta->container_hash = flb_strndup(container_hash, + container_hash_len); + meta->fields++; + } + if (docker_id_len && !meta->docker_id_len) { + meta->docker_id_len = docker_id_len; + meta->docker_id = flb_strndup(docker_id, docker_id_len); + meta->fields++; + } + if (container_image_len && !meta->container_image_len) { + meta->container_image_len = container_image_len; + meta->container_image = flb_strndup(container_image, container_image_len); + meta->fields++; + } + return; + } + } + } + } +} + +static int search_podname_and_namespace(struct flb_kube_meta *meta, + struct flb_kube *ctx, + msgpack_object map) +{ + int i; + int podname_found = FLB_FALSE; + int namespace_found = FLB_FALSE; + int target_podname_found = FLB_FALSE; + int target_namespace_found = FLB_FALSE; + + msgpack_object k; + msgpack_object v; + + for (i = 0; (!podname_found || !namespace_found) && + i < map.via.map.size; i++) { + + k = map.via.map.ptr[i].key; + v = map.via.map.ptr[i].val; + if (k.via.str.size == 4 && !strncmp(k.via.str.ptr, "name", 4)) { + + podname_found = FLB_TRUE; + if (!strncmp(v.via.str.ptr, meta->podname, meta->podname_len)) { + target_podname_found = FLB_TRUE; + } + + } + else if (k.via.str.size == 9 && !strncmp(k.via.str.ptr, + "namespace", 9)) { + + namespace_found = FLB_TRUE; + if (!strncmp((char *)v.via.str.ptr, + meta->namespace, + meta->namespace_len)) { + target_namespace_found = FLB_TRUE; + } + } + } + + if (!target_podname_found || !target_namespace_found) { + return -1; + } + + return 0; +} + +static int search_metadata_in_items(struct flb_kube_meta *meta, + struct flb_kube *ctx, + msgpack_object items_array, + msgpack_object *target_item_map) +{ + int i, j; + + int target_found = FLB_FALSE; + msgpack_object item_info_map; + msgpack_object k; + msgpack_object v; + + for (i = 0; !target_found && i < items_array.via.array.size; i++) { + + item_info_map = items_array.via.array.ptr[i]; + if (item_info_map.type != MSGPACK_OBJECT_MAP) { + continue; + } + + for (j = 0; j < item_info_map.via.map.size; j++) { + + k = item_info_map.via.map.ptr[j].key; + if (k.via.str.size == 8 && + !strncmp(k.via.str.ptr, "metadata", 8)) { + + v = item_info_map.via.map.ptr[j].val; + if (search_podname_and_namespace(meta, ctx, v) == 0) { + target_found = FLB_TRUE; + *target_item_map = item_info_map; + flb_plg_debug(ctx->ins, + "kubelet find pod: %s and ns: %s match", + meta->podname, meta->namespace); + } + break; + } + } + } + + if (!target_found) { + flb_plg_debug(ctx->ins, + "kubelet didn't find pod: %s, ns: %s match", + meta->podname, meta->namespace); + return -1; + } + return 0; +} + +/* At this point map points to the ROOT map, eg: + * + * { + * "kind": "PodList", + * "apiVersion": "v1", + * "metadata": {}, + * "items": [{ + * "metadata": { + * "name": "fluent-bit-rz47v", + * "generateName": "fluent-bit-", + * "namespace": "kube-system", + * "selfLink": "/api/v1/namespaces/kube-system/pods/fluent-bit-rz47v", + * .... + * } + * }] + * + */ +static int search_item_in_items(struct flb_kube_meta *meta, + struct flb_kube *ctx, + msgpack_object api_map, + msgpack_object *target_item_map) +{ + + int i; + int items_array_found = FLB_FALSE; + + msgpack_object k; + msgpack_object v; + msgpack_object items_array; + + for (i = 0; !items_array_found && i < api_map.via.map.size; i++) { + + k = api_map.via.map.ptr[i].key; + if (k.via.str.size == 5 && !strncmp(k.via.str.ptr, "items", 5)) { + + v = api_map.via.map.ptr[i].val; + if (v.type == MSGPACK_OBJECT_ARRAY) { + items_array = v; + items_array_found = FLB_TRUE; + } + } + } + + int ret = search_metadata_in_items(meta, ctx, items_array, + target_item_map); + + return ret; +} + + +static int merge_meta_from_tag(struct flb_kube *ctx, struct flb_kube_meta *meta, + char **out_buf, size_t *out_size) +{ + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + struct flb_mp_map_header mh; + + /* Initialize output msgpack buffer */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + flb_mp_map_header_init(&mh, &mp_pck); + + if (meta->podname != NULL) { + flb_mp_map_header_append(&mh); + msgpack_pack_str(&mp_pck, 8); + msgpack_pack_str_body(&mp_pck, "pod_name", 8); + msgpack_pack_str(&mp_pck, meta->podname_len); + msgpack_pack_str_body(&mp_pck, meta->podname, meta->podname_len); + } + + if (meta->namespace != NULL) { + flb_mp_map_header_append(&mh); + msgpack_pack_str(&mp_pck, 14); + msgpack_pack_str_body(&mp_pck, "namespace_name", 14); + msgpack_pack_str(&mp_pck, meta->namespace_len); + msgpack_pack_str_body(&mp_pck, meta->namespace, meta->namespace_len); + } + + if (meta->container_name != NULL) { + flb_mp_map_header_append(&mh); + msgpack_pack_str(&mp_pck, 14); + msgpack_pack_str_body(&mp_pck, "container_name", 14); + msgpack_pack_str(&mp_pck, meta->container_name_len); + msgpack_pack_str_body(&mp_pck, meta->container_name, + meta->container_name_len); + } + if (meta->docker_id != NULL) { + flb_mp_map_header_append(&mh); + msgpack_pack_str(&mp_pck, 9); + msgpack_pack_str_body(&mp_pck, "docker_id", 9); + msgpack_pack_str(&mp_pck, meta->docker_id_len); + msgpack_pack_str_body(&mp_pck, meta->docker_id, + meta->docker_id_len); + } + + flb_mp_map_header_end(&mh); + + /* Set outgoing msgpack buffer */ + *out_buf = mp_sbuf.data; + *out_size = mp_sbuf.size; + + return 0; +} + +static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx, + const char *api_buf, size_t api_size, + char **out_buf, size_t *out_size) +{ + int i; + int ret; + int map_size = 0; + int meta_found = FLB_FALSE; + int spec_found = FLB_FALSE; + int status_found = FLB_FALSE; + int target_found = FLB_TRUE; + int have_uid = -1; + int have_labels = -1; + int have_annotations = -1; + int have_nodename = -1; + size_t off = 0; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + + msgpack_unpacked api_result; + msgpack_unpacked meta_result; + msgpack_object item_result; + msgpack_object k; + msgpack_object v; + msgpack_object meta_val; + msgpack_object spec_val; + msgpack_object status_val; + msgpack_object api_map; + msgpack_object ann_map; + struct flb_kube_props props = {0}; + + /* + * - reg_buf: is a msgpack Map containing meta captured using Regex + * + * - api_buf: metadata associated to namespace and POD Name coming from + * the API server. + * + * When merging data we aim to add the following keys from the API server: + * + * - pod_id + * - labels + * - annotations + */ + + /* Initialize output msgpack buffer */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + /* Iterate API server msgpack and lookup specific fields */ + if (api_buf != NULL) { + msgpack_unpacked_init(&api_result); + ret = msgpack_unpack_next(&api_result, api_buf, api_size, &off); + if (ret == MSGPACK_UNPACK_SUCCESS) { + + if (ctx->use_kubelet) { + ret = search_item_in_items(meta, ctx, api_result.data, &item_result); + if (ret == -1) { + target_found = FLB_FALSE; + } + api_map = target_found ? item_result : api_result.data; + } else { + api_map = api_result.data; + } + + /* At this point map points to the ROOT map, eg: + * + * { + * "kind": "Pod", + * "apiVersion": "v1", + * "metadata": { + * "name": "fluent-bit-rz47v", + * "generateName": "fluent-bit-", + * "namespace": "kube-system", + * "selfLink": "/api/v1/namespaces/kube-system/pods/fluent-bit-rz47v", + * .... + * } + * + * We are interested into the 'metadata' map value. + * We are also interested in the spec.nodeName. + * We are also interested in the status.containerStatuses. + */ + for (i = 0; target_found && !(meta_found && spec_found && status_found) && + i < api_map.via.map.size; i++) { + k = api_map.via.map.ptr[i].key; + if (k.via.str.size == 8 && !strncmp(k.via.str.ptr, "metadata", 8)) { + meta_val = api_map.via.map.ptr[i].val; + if (meta_val.type == MSGPACK_OBJECT_MAP) { + meta_found = FLB_TRUE; + } + } + else if (k.via.str.size == 4 && !strncmp(k.via.str.ptr, "spec", 4)) { + spec_val = api_map.via.map.ptr[i].val; + spec_found = FLB_TRUE; + } + else if (k.via.str.size == 6 && !strncmp(k.via.str.ptr, "status", 6)) { + status_val = api_map.via.map.ptr[i].val; + status_found = FLB_TRUE; + } + } + + if (meta_found == FLB_TRUE) { + /* Process metadata map value */ + msgpack_unpacked_init(&meta_result); + for (i = 0; i < meta_val.via.map.size; i++) { + k = meta_val.via.map.ptr[i].key; + + char *ptr = (char *) k.via.str.ptr; + size_t size = k.via.str.size; + + if (size == 3 && strncmp(ptr, "uid", 3) == 0) { + have_uid = i; + map_size++; + } + else if (size == 6 && strncmp(ptr, "labels", 6) == 0) { + have_labels = i; + if (ctx->labels == FLB_TRUE) { + map_size++; + } + } + + else if (size == 11 && strncmp(ptr, "annotations", 11) == 0) { + have_annotations = i; + if (ctx->annotations == FLB_TRUE) { + map_size++; + } + } + + if (have_uid >= 0 && have_labels >= 0 && have_annotations >= 0) { + break; + } + } + } + + /* Process spec map value for nodeName */ + if (spec_found == FLB_TRUE) { + for (i = 0; i < spec_val.via.map.size; i++) { + k = spec_val.via.map.ptr[i].key; + if (k.via.str.size == 8 && + strncmp(k.via.str.ptr, "nodeName", 8) == 0) { + have_nodename = i; + map_size++; + break; + } + } + } + + if ((!meta->container_hash || !meta->docker_id || !meta->container_image) && status_found) { + extract_container_hash(meta, status_val); + } + } + } + + /* Set map size: current + pod_id, labels and annotations */ + map_size += meta->fields; + + /* Append Regex fields */ + msgpack_pack_map(&mp_pck, map_size); + if (meta->podname != NULL) { + msgpack_pack_str(&mp_pck, 8); + msgpack_pack_str_body(&mp_pck, "pod_name", 8); + msgpack_pack_str(&mp_pck, meta->podname_len); + msgpack_pack_str_body(&mp_pck, meta->podname, meta->podname_len); + } + if (meta->namespace != NULL) { + msgpack_pack_str(&mp_pck, 14); + msgpack_pack_str_body(&mp_pck, "namespace_name", 14); + msgpack_pack_str(&mp_pck, meta->namespace_len); + msgpack_pack_str_body(&mp_pck, meta->namespace, meta->namespace_len); + } + + /* Append API Server content */ + if (have_uid >= 0) { + v = meta_val.via.map.ptr[have_uid].val; + + msgpack_pack_str(&mp_pck, 6); + msgpack_pack_str_body(&mp_pck, "pod_id", 6); + msgpack_pack_object(&mp_pck, v); + } + + if (have_labels >= 0 && ctx->labels == FLB_TRUE) { + k = meta_val.via.map.ptr[have_labels].key; + v = meta_val.via.map.ptr[have_labels].val; + + msgpack_pack_object(&mp_pck, k); + msgpack_pack_object(&mp_pck, v); + } + + if (have_annotations >= 0 && ctx->annotations == FLB_TRUE) { + k = meta_val.via.map.ptr[have_annotations].key; + v = meta_val.via.map.ptr[have_annotations].val; + + msgpack_pack_object(&mp_pck, k); + msgpack_pack_object(&mp_pck, v); + } + + if (have_nodename >= 0) { + v = spec_val.via.map.ptr[have_nodename].val; + + msgpack_pack_str(&mp_pck, 4); + msgpack_pack_str_body(&mp_pck, "host", 4); + msgpack_pack_object(&mp_pck, v); + } + + if (meta->container_name != NULL) { + msgpack_pack_str(&mp_pck, 14); + msgpack_pack_str_body(&mp_pck, "container_name", 14); + msgpack_pack_str(&mp_pck, meta->container_name_len); + msgpack_pack_str_body(&mp_pck, meta->container_name, + meta->container_name_len); + } + if (meta->docker_id != NULL) { + msgpack_pack_str(&mp_pck, 9); + msgpack_pack_str_body(&mp_pck, "docker_id", 9); + msgpack_pack_str(&mp_pck, meta->docker_id_len); + msgpack_pack_str_body(&mp_pck, meta->docker_id, + meta->docker_id_len); + } + if (meta->container_hash != NULL) { + msgpack_pack_str(&mp_pck, 14); + msgpack_pack_str_body(&mp_pck, "container_hash", 14); + msgpack_pack_str(&mp_pck, meta->container_hash_len); + msgpack_pack_str_body(&mp_pck, meta->container_hash, + meta->container_hash_len); + } + if (meta->container_image != NULL) { + msgpack_pack_str(&mp_pck, 15); + msgpack_pack_str_body(&mp_pck, "container_image", 15); + msgpack_pack_str(&mp_pck, meta->container_image_len); + msgpack_pack_str_body(&mp_pck, meta->container_image, + meta->container_image_len); + } + + /* Process configuration suggested through Annotations */ + if (have_annotations >= 0) { + ann_map = meta_val.via.map.ptr[have_annotations].val; + + /* Iterate annotations keys and look for 'logging' key */ + if (ann_map.type == MSGPACK_OBJECT_MAP) { + for (i = 0; i < ann_map.via.map.size; i++) { + k = ann_map.via.map.ptr[i].key; + v = ann_map.via.map.ptr[i].val; + + if (k.via.str.size > 13 && /* >= 'fluentbit.io/' */ + strncmp(k.via.str.ptr, "fluentbit.io/", 13) == 0) { + + /* Validate and set the property */ + flb_kube_prop_set(ctx, meta, + k.via.str.ptr + 13, + k.via.str.size - 13, + v.via.str.ptr, + v.via.str.size, + &props); + } + } + } + + /* Pack Annotation properties */ + void *prop_buf; + size_t prop_size; + flb_kube_prop_pack(&props, &prop_buf, &prop_size); + msgpack_sbuffer_write(&mp_sbuf, prop_buf, prop_size); + flb_kube_prop_destroy(&props); + flb_free(prop_buf); + } + + if (api_buf != NULL) { + msgpack_unpacked_destroy(&api_result); + if (meta_found == FLB_TRUE) { + msgpack_unpacked_destroy(&meta_result); + } + } + + /* Set outgoing msgpack buffer */ + *out_buf = mp_sbuf.data; + *out_size = mp_sbuf.size; + + return 0; +} + +static inline int extract_meta(struct flb_kube *ctx, + const char *tag, int tag_len, + const char *data, size_t data_size, + struct flb_kube_meta *meta) +{ + int i; + size_t off = 0; + ssize_t n; + int kube_tag_len; + const char *kube_tag_str; + const char *container = NULL; + int container_found = FLB_FALSE; + int container_length = 0; + struct flb_regex_search result; + msgpack_unpacked mp_result; + msgpack_object root; + msgpack_object map; + msgpack_object key; + msgpack_object val; + + /* Reset meta context */ + memset(meta, '\0', sizeof(struct flb_kube_meta)); + + /* Journald */ + if (ctx->use_journal == FLB_TRUE) { + off = 0; + msgpack_unpacked_init(&mp_result); + while (msgpack_unpack_next(&mp_result, data, data_size, &off) == MSGPACK_UNPACK_SUCCESS) { + root = mp_result.data; + if (root.type != MSGPACK_OBJECT_ARRAY) { + continue; + } + + /* Lookup the CONTAINER_NAME key/value */ + map = root.via.array.ptr[1]; + for (i = 0; i < map.via.map.size; i++) { + key = map.via.map.ptr[i].key; + if (key.via.str.size != 14) { + continue; + } + + if (strncmp(key.via.str.ptr, "CONTAINER_NAME", 14) == 0) { + val = map.via.map.ptr[i].val; + container = val.via.str.ptr; + container_length = val.via.str.size; + container_found = FLB_TRUE; + break; + } + } + + if (container_found == FLB_TRUE) { + break; + } + } + + if (container_found == FLB_FALSE) { + msgpack_unpacked_destroy(&mp_result); + return -1; + } + n = flb_regex_do(ctx->regex, + container, container_length, + &result); + msgpack_unpacked_destroy(&mp_result); + } + else { + /* + * Lookup metadata using regular expression. In order to let the + * regex work we need to know before hand what's the Tag prefix + * set and make sure the adjustment can be done. + */ + kube_tag_len = flb_sds_len(ctx->kube_tag_prefix); + if (kube_tag_len + 1 >= tag_len) { + flb_plg_error(ctx->ins, "incoming record tag (%s) is shorter " + "than kube_tag_prefix value (%s), skip filter", + tag, ctx->kube_tag_prefix); + return -1; + } + kube_tag_str = tag + kube_tag_len; + kube_tag_len = tag_len - kube_tag_len; + + n = flb_regex_do(ctx->regex, kube_tag_str, kube_tag_len, &result); + } + + if (n <= 0) { + flb_plg_warn(ctx->ins, "invalid pattern for given tag %s", tag); + return -1; + } + + /* Parse the regex results */ + flb_regex_parse(ctx->regex, &result, cb_results, meta); + + /* Compose API server cache key */ + if (meta->podname && meta->namespace) { + /* calculate estimated buffer size */ + n = meta->namespace_len + 1 + meta->podname_len + 1; + if (meta->container_name) { + n += meta->container_name_len + 1; + } + if (ctx->cache_use_docker_id && meta->docker_id) { + n += meta->docker_id_len + 1; + } + meta->cache_key = flb_malloc(n); + if (!meta->cache_key) { + flb_errno(); + return -1; + } + + /* Copy namespace */ + memcpy(meta->cache_key, meta->namespace, meta->namespace_len); + off = meta->namespace_len; + + /* Separator */ + meta->cache_key[off++] = ':'; + + /* Copy podname */ + memcpy(meta->cache_key + off, meta->podname, meta->podname_len); + off += meta->podname_len; + + if (meta->container_name) { + /* Separator */ + meta->cache_key[off++] = ':'; + memcpy(meta->cache_key + off, meta->container_name, meta->container_name_len); + off += meta->container_name_len; + } + + if (ctx->cache_use_docker_id && meta->docker_id) { + /* Separator */ + meta->cache_key[off++] = ':'; + memcpy(meta->cache_key + off, meta->docker_id, meta->docker_id_len); + off += meta->docker_id_len; + } + + meta->cache_key[off] = '\0'; + meta->cache_key_len = off; + } + else { + meta->cache_key = NULL; + meta->cache_key_len = 0; + } + + return 0; +} + +/* + * Given a fixed meta data (namespace and podname), get API server information + * and merge buffers. + */ +static int get_and_merge_meta(struct flb_kube *ctx, struct flb_kube_meta *meta, + char **out_buf, size_t *out_size) +{ + int ret; + char *api_buf; + size_t api_size; + + if (ctx->use_tag_for_meta) { + ret = merge_meta_from_tag(ctx, meta, out_buf, out_size); + return ret; + } + else if (ctx->use_kubelet) { + ret = get_pods_from_kubelet(ctx, meta->namespace, meta->podname, + &api_buf, &api_size); + } + else { + ret = get_api_server_info(ctx, meta->namespace, meta->podname, + &api_buf, &api_size); + } + if (ret == -1) { + return -1; + } + + ret = merge_meta(meta, ctx, + api_buf, api_size, + out_buf, out_size); + + if (api_buf != NULL) { + flb_free(api_buf); + } + + return ret; +} + +/* + * Work around kubernetes/kubernetes/issues/78479 by waiting + * for DNS to start up. + */ +static int wait_for_dns(struct flb_kube *ctx) +{ + int i; + struct addrinfo *res; + struct addrinfo hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + for (i = 0; i < ctx->dns_retries; i++) { + if (getaddrinfo(ctx->api_host, NULL, &hints, &res) == 0) { + freeaddrinfo(res); + return 0; + } + flb_plg_info(ctx->ins, "host: %s Wait %i secs until DNS starts up (%i/%i)", + ctx->api_host, ctx->dns_wait_time, i + 1, ctx->dns_retries); + sleep(ctx->dns_wait_time); + } + return -1; +} + +static int flb_kube_network_init(struct flb_kube *ctx, struct flb_config *config) +{ + int io_type = FLB_IO_TCP; + + ctx->upstream = NULL; + + if (ctx->api_https == FLB_TRUE) { + if (!ctx->tls_ca_path && !ctx->tls_ca_file) { + ctx->tls_ca_file = flb_strdup(FLB_KUBE_CA); + } + ctx->tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + ctx->tls_verify, + ctx->tls_debug, + ctx->tls_vhost, + ctx->tls_ca_path, + ctx->tls_ca_file, + NULL, NULL, NULL); + if (!ctx->tls) { + return -1; + } + + io_type = FLB_IO_TLS; + } + + /* Create an Upstream context */ + ctx->upstream = flb_upstream_create(config, + ctx->api_host, + ctx->api_port, + io_type, + ctx->tls); + if (!ctx->upstream) { + /* note: if ctx->tls.context is set, it's destroyed upon context exit */ + flb_plg_debug(ctx->ins, "kube network init create upstream failed"); + return -1; + } + + /* Remove async flag from upstream */ + flb_stream_disable_async_mode(&ctx->upstream->base); + + return 0; +} + +/* Initialize local context */ +int flb_kube_meta_init(struct flb_kube *ctx, struct flb_config *config) +{ + int ret; + char *meta_buf; + size_t meta_size; + + if (ctx->dummy_meta == FLB_TRUE) { + flb_plg_warn(ctx->ins, "using Dummy Metadata"); + return 0; + } + + if (ctx->use_tag_for_meta) { + flb_plg_info(ctx->ins, "no network access required (OK)"); + return 0; + } + + /* Init network */ + flb_kube_network_init(ctx, config); + + /* Gather local info */ + ret = get_local_pod_info(ctx); + if (ret == FLB_TRUE && !ctx->use_tag_for_meta) { + flb_plg_info(ctx->ins, "local POD info OK"); + + ret = wait_for_dns(ctx); + if (ret == -1) { + flb_plg_warn(ctx->ins, "could not resolve %s", ctx->api_host); + return -1; + } + + if (ctx->use_kubelet) { + /* Gather info from Kubelet */ + flb_plg_info(ctx->ins, "testing connectivity with Kubelet..."); + ret = get_pods_from_kubelet(ctx, ctx->namespace, ctx->podname, + &meta_buf, &meta_size); + } + else { + /* Gather info from API server */ + flb_plg_info(ctx->ins, "testing connectivity with API server..."); + ret = get_api_server_info(ctx, ctx->namespace, ctx->podname, + &meta_buf, &meta_size); + } + if (ret == -1) { + if (!ctx->podname) { + flb_plg_warn(ctx->ins, "could not get meta for local POD"); + } + else { + flb_plg_warn(ctx->ins, "could not get meta for POD %s", + ctx->podname); + } + return -1; + } + flb_plg_info(ctx->ins, "connectivity OK"); + flb_free(meta_buf); + } + else { + flb_plg_info(ctx->ins, "Fluent Bit not running in a POD"); + } + + return 0; +} + +int flb_kube_dummy_meta_get(char **out_buf, size_t *out_size) +{ + int len; + time_t t; + char stime[32]; + struct tm result; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + + t = time(NULL); + localtime_r(&t, &result); +#ifdef FLB_SYSTEM_WINDOWS + asctime_s(stime, sizeof(stime), &result); +#else + asctime_r(&result, stime); +#endif + len = strlen(stime) - 1; + + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str(&mp_pck, 5 /* dummy */ ); + msgpack_pack_str_body(&mp_pck, "dummy", 5); + msgpack_pack_str(&mp_pck, len); + msgpack_pack_str_body(&mp_pck, stime, len); + + *out_buf = mp_sbuf.data; + *out_size = mp_sbuf.size; + + return 0; +} + +int flb_kube_meta_get(struct flb_kube *ctx, + const char *tag, int tag_len, + const char *data, size_t data_size, + const char **out_buf, size_t *out_size, + struct flb_kube_meta *meta, + struct flb_kube_props *props) +{ + int id; + int ret; + const char *hash_meta_buf; + char *tmp_hash_meta_buf; + size_t off = 0; + size_t hash_meta_size; + msgpack_unpacked result; + + /* Get metadata from tag or record (cache key is the important one) */ + ret = extract_meta(ctx, tag, tag_len, data, data_size, meta); + if (ret != 0) { + return -1; + } + + /* Check if we have some data associated to the cache key */ + ret = flb_hash_table_get(ctx->hash_table, + meta->cache_key, meta->cache_key_len, + (void *) &hash_meta_buf, &hash_meta_size); + if (ret == -1) { + /* Retrieve API server meta and merge with local meta */ + ret = get_and_merge_meta(ctx, meta, + &tmp_hash_meta_buf, &hash_meta_size); + if (ret == -1) { + *out_buf = NULL; + *out_size = 0; + return 0; + } + + id = flb_hash_table_add(ctx->hash_table, + meta->cache_key, meta->cache_key_len, + tmp_hash_meta_buf, hash_meta_size); + if (id >= 0) { + /* + * Release the original buffer created on extract_meta() as a new + * copy have been generated into the hash table, then re-set + * the outgoing buffer and size. + */ + flb_free(tmp_hash_meta_buf); + flb_hash_table_get_by_id(ctx->hash_table, id, meta->cache_key, + &hash_meta_buf, &hash_meta_size); + } + } + + /* + * The retrieved buffer may have two serialized items: + * + * [0] = kubernetes metadata (annotations, labels) + * [1] = Annotation properties + * + * note: annotation properties are optional. + */ + msgpack_unpacked_init(&result); + + /* Unpack to get the offset/bytes of the first item */ + msgpack_unpack_next(&result, hash_meta_buf, hash_meta_size, &off); + + /* Set the pointer and proper size for the caller */ + *out_buf = hash_meta_buf; + *out_size = off; + + /* A new unpack_next() call will succeed If annotation properties exists */ + ret = msgpack_unpack_next(&result, hash_meta_buf, hash_meta_size, &off); + if (ret == MSGPACK_UNPACK_SUCCESS) { + /* Unpack the remaining data into properties structure */ + flb_kube_prop_unpack(props, + hash_meta_buf + *out_size, + hash_meta_size - *out_size); + } + msgpack_unpacked_destroy(&result); + + return 0; +} + +int flb_kube_meta_release(struct flb_kube_meta *meta) +{ + int r = 0; + + if (meta->namespace) { + flb_free(meta->namespace); + r++; + } + + if (meta->podname) { + flb_free(meta->podname); + r++; + } + + if (meta->container_name) { + flb_free(meta->container_name); + r++; + } + + if (meta->docker_id) { + flb_free(meta->docker_id); + r++; + } + + if (meta->container_hash) { + flb_free(meta->container_hash); + r++; + } + + if (meta->container_image) { + flb_free(meta->container_image); + r++; + } + + if (meta->cache_key) { + flb_free(meta->cache_key); + } + + return r; +} |