summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/filter_kubernetes/kube_meta.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/filter_kubernetes/kube_meta.c')
-rw-r--r--src/fluent-bit/plugins/filter_kubernetes/kube_meta.c1650
1 files changed, 0 insertions, 1650 deletions
diff --git a/src/fluent-bit/plugins/filter_kubernetes/kube_meta.c b/src/fluent-bit/plugins/filter_kubernetes/kube_meta.c
deleted file mode 100644
index fbad2bb02..000000000
--- a/src/fluent-bit/plugins/filter_kubernetes/kube_meta.c
+++ /dev/null
@@ -1,1650 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_filter_plugin.h>
-#include <fluent-bit/flb_compat.h>
-#include <fluent-bit/flb_hash_table.h>
-#include <fluent-bit/flb_regex.h>
-#include <fluent-bit/flb_io.h>
-#include <fluent-bit/flb_upstream.h>
-#include <fluent-bit/flb_http_client.h>
-#include <fluent-bit/flb_pack.h>
-#include <fluent-bit/flb_env.h>
-#include <fluent-bit/tls/flb_tls.h>
-
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-#include <msgpack.h>
-
-#include "kube_conf.h"
-#include "kube_meta.h"
-#include "kube_property.h"
-
-#define FLB_KUBE_META_CONTAINER_STATUSES_KEY "containerStatuses"
-#define FLB_KUBE_META_CONTAINER_STATUSES_KEY_LEN \
- (sizeof(FLB_KUBE_META_CONTAINER_STATUSES_KEY) - 1)
-#define FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY "initContainerStatuses"
-#define FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY_LEN \
- (sizeof(FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY) - 1)
-#define FLB_KUBE_TOKEN_BUF_SIZE 8192 /* 8KB */
-
-static int file_to_buffer(const char *path,
- char **out_buf, size_t *out_size)
-{
- int ret;
- char *buf;
- ssize_t bytes;
- FILE *fp;
- struct stat st;
-
- if (!(fp = fopen(path, "r"))) {
- return -1;
- }
-
- ret = stat(path, &st);
- if (ret == -1) {
- flb_errno();
- fclose(fp);
- return -1;
- }
-
- buf = flb_calloc(1, (st.st_size + 1));
- if (!buf) {
- flb_errno();
- fclose(fp);
- return -1;
- }
-
- bytes = fread(buf, st.st_size, 1, fp);
- if (bytes < 1) {
- flb_free(buf);
- fclose(fp);
- return -1;
- }
-
- fclose(fp);
-
- *out_buf = buf;
- *out_size = st.st_size;
-
- return 0;
-}
-
-#ifdef FLB_HAVE_KUBE_TOKEN_COMMAND
-/* Run command to get Kubernetes authorization token */
-static int get_token_with_command(const char *command,
- char **out_buf, size_t *out_size)
-{
- FILE *fp;
- char buf[FLB_KUBE_TOKEN_BUF_SIZE];
- char *temp;
- char *res;
- size_t size = 0;
- size_t len = 0;
-
- fp = popen(command, "r");
- if (fp == NULL) {
- return -1;
- }
-
- res = flb_calloc(1, FLB_KUBE_TOKEN_BUF_SIZE);
- if (!res) {
- flb_errno();
- pclose(fp);
- return -1;
- }
-
- while (fgets(buf, sizeof(buf), fp) != NULL) {
- len = strlen(buf);
- if (len >= FLB_KUBE_TOKEN_BUF_SIZE - 1) {
- temp = flb_realloc(res, (FLB_KUBE_TOKEN_BUF_SIZE + size) * 2);
- if (temp == NULL) {
- flb_errno();
- flb_free(res);
- pclose(fp);
- return -1;
- }
- res = temp;
- }
- strcpy(res + size, buf);
- size += len;
- }
-
- if (strlen(res) < 1) {
- flb_free(res);
- pclose(fp);
- return -1;
- }
-
- pclose(fp);
-
- *out_buf = res;
- *out_size = strlen(res);
-
- return 0;
-}
-#endif
-
-/* Set K8s Authorization Token and get HTTP Auth Header */
-static int get_http_auth_header(struct flb_kube *ctx)
-{
- int ret;
- char *temp;
- char *tk = NULL;
- size_t tk_size = 0;
-
- if (ctx->kube_token_command != NULL) {
-#ifdef FLB_HAVE_KUBE_TOKEN_COMMAND
- ret = get_token_with_command(ctx->kube_token_command, &tk, &tk_size);
-#else
- ret = -1;
-#endif
- if (ret == -1) {
- flb_plg_warn(ctx->ins, "failed to run command %s", ctx->kube_token_command);
- }
- }
- else {
- ret = file_to_buffer(ctx->token_file, &tk, &tk_size);
- if (ret == -1) {
- flb_plg_warn(ctx->ins, "cannot open %s", FLB_KUBE_TOKEN);
- }
- flb_plg_info(ctx->ins, " token updated");
- }
- ctx->kube_token_create = time(NULL);
-
- /* Token */
- if (ctx->token != NULL) {
- flb_free(ctx->token);
- }
- ctx->token = tk;
- ctx->token_len = tk_size;
-
- /* HTTP Auth Header */
- if (ctx->auth == NULL) {
- ctx->auth = flb_malloc(tk_size + 32);
- }
- else if (ctx->auth_len < tk_size + 32) {
- temp = flb_realloc(ctx->auth, tk_size + 32);
- if (temp == NULL) {
- flb_free(ctx->auth);
- ctx->auth = NULL;
- return -1;
- }
- ctx->auth = temp;
- }
-
- if (!ctx->auth) {
- return -1;
- }
- ctx->auth_len = snprintf(ctx->auth, tk_size + 32,
- "Bearer %s",
- tk);
-
- return 0;
-}
-
-/* Refresh HTTP Auth Header if K8s Authorization Token is expired */
-static int refresh_token_if_needed(struct flb_kube *ctx)
-{
- int expired = 0;
- int ret;
-
- if (ctx->kube_token_create > 0) {
- if (time(NULL) > ctx->kube_token_create + ctx->kube_token_ttl) {
- expired = FLB_TRUE;
- }
- }
-
- if (expired || ctx->kube_token_create == 0) {
- ret = get_http_auth_header(ctx);
- if (ret == -1) {
- flb_plg_warn(ctx->ins, "failed to set http auth header");
- return -1;
- }
- }
-
- return 0;
-}
-
-static void expose_k8s_meta(struct flb_kube *ctx)
-{
- char *tmp;
- struct flb_env *env;
-
- env = ctx->config->env;
-
- flb_env_set(env, "k8s", "enabled");
- flb_env_set(env, "k8s.namespace", ctx->namespace);
- flb_env_set(env, "k8s.pod_name", ctx->podname);
-
- tmp = (char *) flb_env_get(env, "NODE_NAME");
- if (tmp) {
- flb_env_set(env, "k8s.node_name", tmp);
- }
-}
-
-/* Load local information from a POD context */
-static int get_local_pod_info(struct flb_kube *ctx)
-{
- int ret;
- char *ns;
- size_t ns_size;
- char *hostname;
-
- /* Get the namespace name */
- ret = file_to_buffer(FLB_KUBE_NAMESPACE, &ns, &ns_size);
- if (ret == -1) {
- /*
- * If it fails, it's just informational, as likely the caller
- * wanted to connect using the Proxy instead from inside a POD.
- */
- flb_plg_warn(ctx->ins, "cannot open %s", FLB_KUBE_NAMESPACE);
- return FLB_FALSE;
- }
-
- /* Namespace */
- ctx->namespace = ns;
- ctx->namespace_len = ns_size;
-
- /* POD Name */
- hostname = getenv("HOSTNAME");
- if (hostname) {
- ctx->podname = flb_strdup(hostname);
- ctx->podname_len = strlen(ctx->podname);
- }
- else {
- char tmp[256];
- gethostname(tmp, 256);
- ctx->podname = flb_strdup(tmp);
- ctx->podname_len = strlen(ctx->podname);
- }
-
- /* If a namespace was recognized, a token is mandatory */
- /* Use the token to get HTTP Auth Header*/
- ret = get_http_auth_header(ctx);
- if (ret == -1) {
- flb_plg_warn(ctx->ins, "failed to set http auth header");
- return FLB_FALSE;
- }
-
- expose_k8s_meta(ctx);
- return FLB_TRUE;
-}
-
-/*
- * If a file exists called namespace_podname.meta, load it and use it.
- * If not, fall back to API. This is primarily for diagnostic purposes,
- * e.g. debugging new parsers.
- */
-static int get_meta_file_info(struct flb_kube *ctx, const char *namespace,
- const char *podname, char **buffer, size_t *size,
- int *root_type) {
-
- int fd = -1;
- char *payload = NULL;
- size_t payload_size = 0;
- struct stat sb;
- int packed = -1;
- int ret;
- char uri[1024];
-
- if (ctx->meta_preload_cache_dir && namespace && podname) {
-
- ret = snprintf(uri, sizeof(uri) - 1, "%s/%s_%s.meta",
- ctx->meta_preload_cache_dir, namespace, podname);
- if (ret > 0) {
- fd = open(uri, O_RDONLY, 0);
- if (fd != -1) {
- if (fstat(fd, &sb) == 0) {
- payload = flb_malloc(sb.st_size);
- if (!payload) {
- flb_errno();
- }
- else {
- ret = read(fd, payload, sb.st_size);
- if (ret == sb.st_size) {
- payload_size = ret;
- }
- }
- }
- close(fd);
- }
- }
-
- if (payload_size) {
- packed = flb_pack_json(payload, payload_size,
- buffer, size, root_type,
- NULL);
- }
-
- if (payload) {
- flb_free(payload);
- }
- }
-
- return packed;
-}
-
-/* Gather metadata from HTTP Request,
- * this could send out HTTP Request either to KUBE Server API or Kubelet
- */
-static int get_meta_info_from_request(struct flb_kube *ctx,
- const char *namespace,
- const char *podname,
- char **buffer, size_t *size,
- int *root_type,
- char* uri)
-{
- struct flb_http_client *c;
- struct flb_connection *u_conn;
- int ret;
- size_t b_sent;
- int packed;
-
- if (!ctx->upstream) {
- return -1;
- }
-
- u_conn = flb_upstream_conn_get(ctx->upstream);
-
- if (!u_conn) {
- flb_plg_error(ctx->ins, "kubelet upstream connection error");
- return -1;
- }
-
- ret = refresh_token_if_needed(ctx);
- if (ret == -1) {
- flb_plg_error(ctx->ins, "failed to refresh token");
- flb_upstream_conn_release(u_conn);
- return -1;
- }
-
- /* Compose HTTP Client request*/
- c = flb_http_client(u_conn, FLB_HTTP_GET,
- uri,
- NULL, 0, NULL, 0, NULL, 0);
- flb_http_buffer_size(c, ctx->buffer_size);
-
- flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
- flb_http_add_header(c, "Connection", 10, "close", 5);
- if (ctx->auth_len > 0) {
- flb_http_add_header(c, "Authorization", 13, ctx->auth, ctx->auth_len);
- }
-
- ret = flb_http_do(c, &b_sent);
- flb_plg_debug(ctx->ins, "Request (ns=%s, pod=%s) http_do=%i, "
- "HTTP Status: %i",
- namespace, podname, ret, c->resp.status);
-
- if (ret != 0 || c->resp.status != 200) {
- if (c->resp.payload_size > 0) {
- flb_plg_debug(ctx->ins, "HTTP response\n%s",
- c->resp.payload);
- }
- flb_http_client_destroy(c);
- flb_upstream_conn_release(u_conn);
- return -1;
- }
-
- packed = flb_pack_json(c->resp.payload, c->resp.payload_size,
- buffer, size, root_type, NULL);
-
- /* release resources */
- flb_http_client_destroy(c);
- flb_upstream_conn_release(u_conn);
-
- return packed;
-
-}
-
-/* Gather pods list information from Kubelet */
-static int get_pods_from_kubelet(struct flb_kube *ctx,
- const char *namespace, const char *podname,
- char **out_buf, size_t *out_size)
-{
- int ret;
- int packed = -1;
- int root_type;
- char uri[1024];
- char *buf;
- size_t size;
-
- *out_buf = NULL;
- *out_size = 0;
-
- /* used for unit test purposes*/
- packed = get_meta_file_info(ctx, namespace, podname, &buf, &size,
- &root_type);
-
- if (packed == -1) {
-
- ret = snprintf(uri, sizeof(uri) - 1, FLB_KUBELET_PODS);
- if (ret == -1) {
- return -1;
- }
- flb_plg_debug(ctx->ins,
- "Send out request to Kubelet for pods information.");
- packed = get_meta_info_from_request(ctx, namespace, podname,
- &buf, &size, &root_type, uri);
- }
-
- /* validate pack */
- if (packed == -1) {
- return -1;
- }
-
- *out_buf = buf;
- *out_size = size;
-
- return 0;
-}
-
-/* Gather metadata from API Server */
-static int get_api_server_info(struct flb_kube *ctx,
- const char *namespace, const char *podname,
- char **out_buf, size_t *out_size)
-{
- int ret;
- int packed = -1;
- int root_type;
- char uri[1024];
- char *buf;
- size_t size;
-
- *out_buf = NULL;
- *out_size = 0;
-
- /* used for unit test purposes*/
- packed = get_meta_file_info(ctx, namespace, podname,
- &buf, &size, &root_type);
-
- if (packed == -1) {
-
- ret = snprintf(uri, sizeof(uri) - 1, FLB_KUBE_API_FMT, namespace,
- podname);
-
- if (ret == -1) {
- return -1;
- }
- flb_plg_debug(ctx->ins,
- "Send out request to API Server for pods information");
- packed = get_meta_info_from_request(ctx, namespace, podname,
- &buf, &size, &root_type, uri);
- }
-
- /* validate pack */
- if (packed == -1) {
- return -1;
- }
-
- *out_buf = buf;
- *out_size = size;
-
- return 0;
-}
-
-static void cb_results(const char *name, const char *value,
- size_t vlen, void *data)
-{
- struct flb_kube_meta *meta = data;
-
- if (vlen == 0) {
- return;
- }
-
- if (meta->podname == NULL && strcmp(name, "pod_name") == 0) {
- meta->podname = flb_strndup(value, vlen);
- meta->podname_len = vlen;
- meta->fields++;
- }
- else if (meta->namespace == NULL &&
- strcmp(name, "namespace_name") == 0) {
- meta->namespace = flb_strndup(value, vlen);
- meta->namespace_len = vlen;
- meta->fields++;
- }
- else if (meta->container_name == NULL &&
- strcmp(name, "container_name") == 0) {
- meta->container_name = flb_strndup(value, vlen);
- meta->container_name_len = vlen;
- meta->fields++;
- }
- else if (meta->docker_id == NULL &&
- strcmp(name, "docker_id") == 0) {
- meta->docker_id = flb_strndup(value, vlen);
- meta->docker_id_len = vlen;
- meta->fields++;
- }
- else if (meta->container_hash == NULL &&
- strcmp(name, "container_hash") == 0) {
- meta->container_hash = flb_strndup(value, vlen);
- meta->container_hash_len = vlen;
- meta->fields++;
- }
-
- return;
-}
-
-static int extract_hash(const char * im, int sz, const char ** out, int * outsz)
-{
- char * colon = NULL;
- char * slash = NULL;
-
- *out = NULL;
- *outsz = 0;
-
- if (sz <= 1) {
- return -1;
- }
-
- colon = memchr(im, ':', sz);
-
- if (colon == NULL) {
- return -1;
- } else {
- slash = colon;
- while ((im + sz - slash + 1) > 0 && *(slash + 1) == '/') {
- slash++;
- }
- if (slash == colon) {
- slash = NULL;
- }
- }
-
- if (slash == NULL && (im + sz - colon) > 0) {
- *out = im;
- }
-
- if (slash != NULL && (colon - slash) < 0 && (im + sz - slash) > 0) {
- *out = slash + 1;
- }
-
- if (*out) {
- *outsz = im + sz - *out;
- return 0;
- }
- return -1;
-}
-
-/*
- * As per Kubernetes Pod spec,
- * https://kubernetes.io/docs/concepts/workloads/pods/pod/, we look
- * for status.{initContainerStatuses, containerStatuses}.{containerID, imageID, image}
- * where status.{initContainerStatuses, containerStatus}.name == our container
- * name
- * status:
- * ...
- * containerStatuses:
- * - containerID: XXX
- * image: YYY
- * imageID: ZZZ
- * ...
- * name: nginx-ingress-microk8s
-*/
-static void extract_container_hash(struct flb_kube_meta *meta,
- msgpack_object status)
-{
- int i;
- msgpack_object k, v;
- int docker_id_len = 0;
- int container_hash_len = 0;
- int container_image_len = 0;
- const char *container_hash;
- const char *docker_id;
- const char *container_image;
- const char *tmp;
- int tmp_len = 0;
- int name_found = FLB_FALSE;
- /* Process status/containerStatus map for docker_id, container_hash, container_image */
- for (i = 0;
- (meta->docker_id_len == 0 || meta->container_hash_len == 0 ||
- meta->container_image_len == 0) &&
- i < status.via.map.size; i++) {
- k = status.via.map.ptr[i].key;
- if ((k.via.str.size == FLB_KUBE_META_CONTAINER_STATUSES_KEY_LEN &&
- strncmp(k.via.str.ptr,
- FLB_KUBE_META_CONTAINER_STATUSES_KEY,
- FLB_KUBE_META_CONTAINER_STATUSES_KEY_LEN) == 0) ||
- (k.via.str.size == FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY_LEN &&
- strncmp(k.via.str.ptr,
- FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY,
- FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY_LEN) == 0)) {
- int j;
- v = status.via.map.ptr[i].val;
- for (j = 0;
- (meta->docker_id_len == 0 ||
- meta->container_hash_len == 0 ||
- meta->container_image_len == 0) && j < v.via.array.size;
- j++) {
- int l;
- msgpack_object k1, k2;
- msgpack_object_str v2;
- k1 = v.via.array.ptr[j];
- for (l = 0;
- (meta->docker_id_len == 0 ||
- meta->container_hash_len == 0 ||
- meta->container_image_len == 0) &&
- l < k1.via.map.size; l++) {
- k2 = k1.via.map.ptr[l].key;
- v2 = k1.via.map.ptr[l].val.via.str;
- if (k2.via.str.size == sizeof("name") - 1 &&
- !strncmp(k2.via.str.ptr, "name", k2.via.str.size)) {
- if (v2.size == meta->container_name_len &&
- !strncmp(v2.ptr,
- meta->container_name,
- meta->container_name_len)) {
- name_found = FLB_TRUE;
- }
- else {
- break;
- }
- }
- else if (k2.via.str.size == sizeof("containerID") - 1 &&
- !strncmp(k2.via.str.ptr,
- "containerID",
- k2.via.str.size)) {
- if (extract_hash(v2.ptr, v2.size, &tmp, &tmp_len) == 0) {
- docker_id = tmp;
- docker_id_len = tmp_len;
- }
- }
- else if (k2.via.str.size == sizeof("imageID") - 1 &&
- !strncmp(k2.via.str.ptr,
- "imageID",
- k2.via.str.size)) {
- if (extract_hash(v2.ptr, v2.size, &tmp, &tmp_len) == 0) {
- container_hash = tmp;
- container_hash_len = tmp_len;
- }
- }
- else if (k2.via.str.size == sizeof("image") - 1 &&
- !strncmp(k2.via.str.ptr,
- "image",
- k2.via.str.size)) {
- container_image = v2.ptr;
- container_image_len = v2.size;
- }
- }
- if (name_found) {
- if (container_hash_len && !meta->container_hash_len) {
- meta->container_hash_len = container_hash_len;
- meta->container_hash = flb_strndup(container_hash,
- container_hash_len);
- meta->fields++;
- }
- if (docker_id_len && !meta->docker_id_len) {
- meta->docker_id_len = docker_id_len;
- meta->docker_id = flb_strndup(docker_id, docker_id_len);
- meta->fields++;
- }
- if (container_image_len && !meta->container_image_len) {
- meta->container_image_len = container_image_len;
- meta->container_image = flb_strndup(container_image, container_image_len);
- meta->fields++;
- }
- return;
- }
- }
- }
- }
-}
-
-static int search_podname_and_namespace(struct flb_kube_meta *meta,
- struct flb_kube *ctx,
- msgpack_object map)
-{
- int i;
- int podname_found = FLB_FALSE;
- int namespace_found = FLB_FALSE;
- int target_podname_found = FLB_FALSE;
- int target_namespace_found = FLB_FALSE;
-
- msgpack_object k;
- msgpack_object v;
-
- for (i = 0; (!podname_found || !namespace_found) &&
- i < map.via.map.size; i++) {
-
- k = map.via.map.ptr[i].key;
- v = map.via.map.ptr[i].val;
- if (k.via.str.size == 4 && !strncmp(k.via.str.ptr, "name", 4)) {
-
- podname_found = FLB_TRUE;
- if (!strncmp(v.via.str.ptr, meta->podname, meta->podname_len)) {
- target_podname_found = FLB_TRUE;
- }
-
- }
- else if (k.via.str.size == 9 && !strncmp(k.via.str.ptr,
- "namespace", 9)) {
-
- namespace_found = FLB_TRUE;
- if (!strncmp((char *)v.via.str.ptr,
- meta->namespace,
- meta->namespace_len)) {
- target_namespace_found = FLB_TRUE;
- }
- }
- }
-
- if (!target_podname_found || !target_namespace_found) {
- return -1;
- }
-
- return 0;
-}
-
-static int search_metadata_in_items(struct flb_kube_meta *meta,
- struct flb_kube *ctx,
- msgpack_object items_array,
- msgpack_object *target_item_map)
-{
- int i, j;
-
- int target_found = FLB_FALSE;
- msgpack_object item_info_map;
- msgpack_object k;
- msgpack_object v;
-
- for (i = 0; !target_found && i < items_array.via.array.size; i++) {
-
- item_info_map = items_array.via.array.ptr[i];
- if (item_info_map.type != MSGPACK_OBJECT_MAP) {
- continue;
- }
-
- for (j = 0; j < item_info_map.via.map.size; j++) {
-
- k = item_info_map.via.map.ptr[j].key;
- if (k.via.str.size == 8 &&
- !strncmp(k.via.str.ptr, "metadata", 8)) {
-
- v = item_info_map.via.map.ptr[j].val;
- if (search_podname_and_namespace(meta, ctx, v) == 0) {
- target_found = FLB_TRUE;
- *target_item_map = item_info_map;
- flb_plg_debug(ctx->ins,
- "kubelet find pod: %s and ns: %s match",
- meta->podname, meta->namespace);
- }
- break;
- }
- }
- }
-
- if (!target_found) {
- flb_plg_debug(ctx->ins,
- "kubelet didn't find pod: %s, ns: %s match",
- meta->podname, meta->namespace);
- return -1;
- }
- return 0;
-}
-
-/* At this point map points to the ROOT map, eg:
- *
- * {
- * "kind": "PodList",
- * "apiVersion": "v1",
- * "metadata": {},
- * "items": [{
- * "metadata": {
- * "name": "fluent-bit-rz47v",
- * "generateName": "fluent-bit-",
- * "namespace": "kube-system",
- * "selfLink": "/api/v1/namespaces/kube-system/pods/fluent-bit-rz47v",
- * ....
- * }
- * }]
- *
- */
-static int search_item_in_items(struct flb_kube_meta *meta,
- struct flb_kube *ctx,
- msgpack_object api_map,
- msgpack_object *target_item_map)
-{
-
- int i;
- int items_array_found = FLB_FALSE;
-
- msgpack_object k;
- msgpack_object v;
- msgpack_object items_array;
-
- for (i = 0; !items_array_found && i < api_map.via.map.size; i++) {
-
- k = api_map.via.map.ptr[i].key;
- if (k.via.str.size == 5 && !strncmp(k.via.str.ptr, "items", 5)) {
-
- v = api_map.via.map.ptr[i].val;
- if (v.type == MSGPACK_OBJECT_ARRAY) {
- items_array = v;
- items_array_found = FLB_TRUE;
- }
- }
- }
-
- int ret = search_metadata_in_items(meta, ctx, items_array,
- target_item_map);
-
- return ret;
-}
-
-
-static int merge_meta_from_tag(struct flb_kube *ctx, struct flb_kube_meta *meta,
- char **out_buf, size_t *out_size)
-{
- msgpack_sbuffer mp_sbuf;
- msgpack_packer mp_pck;
- struct flb_mp_map_header mh;
-
- /* Initialize output msgpack buffer */
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
-
- flb_mp_map_header_init(&mh, &mp_pck);
-
- if (meta->podname != NULL) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 8);
- msgpack_pack_str_body(&mp_pck, "pod_name", 8);
- msgpack_pack_str(&mp_pck, meta->podname_len);
- msgpack_pack_str_body(&mp_pck, meta->podname, meta->podname_len);
- }
-
- if (meta->namespace != NULL) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 14);
- msgpack_pack_str_body(&mp_pck, "namespace_name", 14);
- msgpack_pack_str(&mp_pck, meta->namespace_len);
- msgpack_pack_str_body(&mp_pck, meta->namespace, meta->namespace_len);
- }
-
- if (meta->container_name != NULL) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 14);
- msgpack_pack_str_body(&mp_pck, "container_name", 14);
- msgpack_pack_str(&mp_pck, meta->container_name_len);
- msgpack_pack_str_body(&mp_pck, meta->container_name,
- meta->container_name_len);
- }
- if (meta->docker_id != NULL) {
- flb_mp_map_header_append(&mh);
- msgpack_pack_str(&mp_pck, 9);
- msgpack_pack_str_body(&mp_pck, "docker_id", 9);
- msgpack_pack_str(&mp_pck, meta->docker_id_len);
- msgpack_pack_str_body(&mp_pck, meta->docker_id,
- meta->docker_id_len);
- }
-
- flb_mp_map_header_end(&mh);
-
- /* Set outgoing msgpack buffer */
- *out_buf = mp_sbuf.data;
- *out_size = mp_sbuf.size;
-
- return 0;
-}
-
-static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx,
- const char *api_buf, size_t api_size,
- char **out_buf, size_t *out_size)
-{
- int i;
- int ret;
- int map_size = 0;
- int meta_found = FLB_FALSE;
- int spec_found = FLB_FALSE;
- int status_found = FLB_FALSE;
- int target_found = FLB_TRUE;
- int have_uid = -1;
- int have_labels = -1;
- int have_annotations = -1;
- int have_nodename = -1;
- size_t off = 0;
- msgpack_sbuffer mp_sbuf;
- msgpack_packer mp_pck;
-
- msgpack_unpacked api_result;
- msgpack_unpacked meta_result;
- msgpack_object item_result;
- msgpack_object k;
- msgpack_object v;
- msgpack_object meta_val;
- msgpack_object spec_val;
- msgpack_object status_val;
- msgpack_object api_map;
- msgpack_object ann_map;
- struct flb_kube_props props = {0};
-
- /*
- * - reg_buf: is a msgpack Map containing meta captured using Regex
- *
- * - api_buf: metadata associated to namespace and POD Name coming from
- * the API server.
- *
- * When merging data we aim to add the following keys from the API server:
- *
- * - pod_id
- * - labels
- * - annotations
- */
-
- /* Initialize output msgpack buffer */
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
-
- /* Iterate API server msgpack and lookup specific fields */
- if (api_buf != NULL) {
- msgpack_unpacked_init(&api_result);
- ret = msgpack_unpack_next(&api_result, api_buf, api_size, &off);
- if (ret == MSGPACK_UNPACK_SUCCESS) {
-
- if (ctx->use_kubelet) {
- ret = search_item_in_items(meta, ctx, api_result.data, &item_result);
- if (ret == -1) {
- target_found = FLB_FALSE;
- }
- api_map = target_found ? item_result : api_result.data;
- } else {
- api_map = api_result.data;
- }
-
- /* At this point map points to the ROOT map, eg:
- *
- * {
- * "kind": "Pod",
- * "apiVersion": "v1",
- * "metadata": {
- * "name": "fluent-bit-rz47v",
- * "generateName": "fluent-bit-",
- * "namespace": "kube-system",
- * "selfLink": "/api/v1/namespaces/kube-system/pods/fluent-bit-rz47v",
- * ....
- * }
- *
- * We are interested into the 'metadata' map value.
- * We are also interested in the spec.nodeName.
- * We are also interested in the status.containerStatuses.
- */
- for (i = 0; target_found && !(meta_found && spec_found && status_found) &&
- i < api_map.via.map.size; i++) {
- k = api_map.via.map.ptr[i].key;
- if (k.via.str.size == 8 && !strncmp(k.via.str.ptr, "metadata", 8)) {
- meta_val = api_map.via.map.ptr[i].val;
- if (meta_val.type == MSGPACK_OBJECT_MAP) {
- meta_found = FLB_TRUE;
- }
- }
- else if (k.via.str.size == 4 && !strncmp(k.via.str.ptr, "spec", 4)) {
- spec_val = api_map.via.map.ptr[i].val;
- spec_found = FLB_TRUE;
- }
- else if (k.via.str.size == 6 && !strncmp(k.via.str.ptr, "status", 6)) {
- status_val = api_map.via.map.ptr[i].val;
- status_found = FLB_TRUE;
- }
- }
-
- if (meta_found == FLB_TRUE) {
- /* Process metadata map value */
- msgpack_unpacked_init(&meta_result);
- for (i = 0; i < meta_val.via.map.size; i++) {
- k = meta_val.via.map.ptr[i].key;
-
- char *ptr = (char *) k.via.str.ptr;
- size_t size = k.via.str.size;
-
- if (size == 3 && strncmp(ptr, "uid", 3) == 0) {
- have_uid = i;
- map_size++;
- }
- else if (size == 6 && strncmp(ptr, "labels", 6) == 0) {
- have_labels = i;
- if (ctx->labels == FLB_TRUE) {
- map_size++;
- }
- }
-
- else if (size == 11 && strncmp(ptr, "annotations", 11) == 0) {
- have_annotations = i;
- if (ctx->annotations == FLB_TRUE) {
- map_size++;
- }
- }
-
- if (have_uid >= 0 && have_labels >= 0 && have_annotations >= 0) {
- break;
- }
- }
- }
-
- /* Process spec map value for nodeName */
- if (spec_found == FLB_TRUE) {
- for (i = 0; i < spec_val.via.map.size; i++) {
- k = spec_val.via.map.ptr[i].key;
- if (k.via.str.size == 8 &&
- strncmp(k.via.str.ptr, "nodeName", 8) == 0) {
- have_nodename = i;
- map_size++;
- break;
- }
- }
- }
-
- if ((!meta->container_hash || !meta->docker_id || !meta->container_image) && status_found) {
- extract_container_hash(meta, status_val);
- }
- }
- }
-
- /* Set map size: current + pod_id, labels and annotations */
- map_size += meta->fields;
-
- /* Append Regex fields */
- msgpack_pack_map(&mp_pck, map_size);
- if (meta->podname != NULL) {
- msgpack_pack_str(&mp_pck, 8);
- msgpack_pack_str_body(&mp_pck, "pod_name", 8);
- msgpack_pack_str(&mp_pck, meta->podname_len);
- msgpack_pack_str_body(&mp_pck, meta->podname, meta->podname_len);
- }
- if (meta->namespace != NULL) {
- msgpack_pack_str(&mp_pck, 14);
- msgpack_pack_str_body(&mp_pck, "namespace_name", 14);
- msgpack_pack_str(&mp_pck, meta->namespace_len);
- msgpack_pack_str_body(&mp_pck, meta->namespace, meta->namespace_len);
- }
-
- /* Append API Server content */
- if (have_uid >= 0) {
- v = meta_val.via.map.ptr[have_uid].val;
-
- msgpack_pack_str(&mp_pck, 6);
- msgpack_pack_str_body(&mp_pck, "pod_id", 6);
- msgpack_pack_object(&mp_pck, v);
- }
-
- if (have_labels >= 0 && ctx->labels == FLB_TRUE) {
- k = meta_val.via.map.ptr[have_labels].key;
- v = meta_val.via.map.ptr[have_labels].val;
-
- msgpack_pack_object(&mp_pck, k);
- msgpack_pack_object(&mp_pck, v);
- }
-
- if (have_annotations >= 0 && ctx->annotations == FLB_TRUE) {
- k = meta_val.via.map.ptr[have_annotations].key;
- v = meta_val.via.map.ptr[have_annotations].val;
-
- msgpack_pack_object(&mp_pck, k);
- msgpack_pack_object(&mp_pck, v);
- }
-
- if (have_nodename >= 0) {
- v = spec_val.via.map.ptr[have_nodename].val;
-
- msgpack_pack_str(&mp_pck, 4);
- msgpack_pack_str_body(&mp_pck, "host", 4);
- msgpack_pack_object(&mp_pck, v);
- }
-
- if (meta->container_name != NULL) {
- msgpack_pack_str(&mp_pck, 14);
- msgpack_pack_str_body(&mp_pck, "container_name", 14);
- msgpack_pack_str(&mp_pck, meta->container_name_len);
- msgpack_pack_str_body(&mp_pck, meta->container_name,
- meta->container_name_len);
- }
- if (meta->docker_id != NULL) {
- msgpack_pack_str(&mp_pck, 9);
- msgpack_pack_str_body(&mp_pck, "docker_id", 9);
- msgpack_pack_str(&mp_pck, meta->docker_id_len);
- msgpack_pack_str_body(&mp_pck, meta->docker_id,
- meta->docker_id_len);
- }
- if (meta->container_hash != NULL) {
- msgpack_pack_str(&mp_pck, 14);
- msgpack_pack_str_body(&mp_pck, "container_hash", 14);
- msgpack_pack_str(&mp_pck, meta->container_hash_len);
- msgpack_pack_str_body(&mp_pck, meta->container_hash,
- meta->container_hash_len);
- }
- if (meta->container_image != NULL) {
- msgpack_pack_str(&mp_pck, 15);
- msgpack_pack_str_body(&mp_pck, "container_image", 15);
- msgpack_pack_str(&mp_pck, meta->container_image_len);
- msgpack_pack_str_body(&mp_pck, meta->container_image,
- meta->container_image_len);
- }
-
- /* Process configuration suggested through Annotations */
- if (have_annotations >= 0) {
- ann_map = meta_val.via.map.ptr[have_annotations].val;
-
- /* Iterate annotations keys and look for 'logging' key */
- if (ann_map.type == MSGPACK_OBJECT_MAP) {
- for (i = 0; i < ann_map.via.map.size; i++) {
- k = ann_map.via.map.ptr[i].key;
- v = ann_map.via.map.ptr[i].val;
-
- if (k.via.str.size > 13 && /* >= 'fluentbit.io/' */
- strncmp(k.via.str.ptr, "fluentbit.io/", 13) == 0) {
-
- /* Validate and set the property */
- flb_kube_prop_set(ctx, meta,
- k.via.str.ptr + 13,
- k.via.str.size - 13,
- v.via.str.ptr,
- v.via.str.size,
- &props);
- }
- }
- }
-
- /* Pack Annotation properties */
- void *prop_buf;
- size_t prop_size;
- flb_kube_prop_pack(&props, &prop_buf, &prop_size);
- msgpack_sbuffer_write(&mp_sbuf, prop_buf, prop_size);
- flb_kube_prop_destroy(&props);
- flb_free(prop_buf);
- }
-
- if (api_buf != NULL) {
- msgpack_unpacked_destroy(&api_result);
- if (meta_found == FLB_TRUE) {
- msgpack_unpacked_destroy(&meta_result);
- }
- }
-
- /* Set outgoing msgpack buffer */
- *out_buf = mp_sbuf.data;
- *out_size = mp_sbuf.size;
-
- return 0;
-}
-
-static inline int extract_meta(struct flb_kube *ctx,
- const char *tag, int tag_len,
- const char *data, size_t data_size,
- struct flb_kube_meta *meta)
-{
- int i;
- size_t off = 0;
- ssize_t n;
- int kube_tag_len;
- const char *kube_tag_str;
- const char *container = NULL;
- int container_found = FLB_FALSE;
- int container_length = 0;
- struct flb_regex_search result;
- msgpack_unpacked mp_result;
- msgpack_object root;
- msgpack_object map;
- msgpack_object key;
- msgpack_object val;
-
- /* Reset meta context */
- memset(meta, '\0', sizeof(struct flb_kube_meta));
-
- /* Journald */
- if (ctx->use_journal == FLB_TRUE) {
- off = 0;
- msgpack_unpacked_init(&mp_result);
- while (msgpack_unpack_next(&mp_result, data, data_size, &off) == MSGPACK_UNPACK_SUCCESS) {
- root = mp_result.data;
- if (root.type != MSGPACK_OBJECT_ARRAY) {
- continue;
- }
-
- /* Lookup the CONTAINER_NAME key/value */
- map = root.via.array.ptr[1];
- for (i = 0; i < map.via.map.size; i++) {
- key = map.via.map.ptr[i].key;
- if (key.via.str.size != 14) {
- continue;
- }
-
- if (strncmp(key.via.str.ptr, "CONTAINER_NAME", 14) == 0) {
- val = map.via.map.ptr[i].val;
- container = val.via.str.ptr;
- container_length = val.via.str.size;
- container_found = FLB_TRUE;
- break;
- }
- }
-
- if (container_found == FLB_TRUE) {
- break;
- }
- }
-
- if (container_found == FLB_FALSE) {
- msgpack_unpacked_destroy(&mp_result);
- return -1;
- }
- n = flb_regex_do(ctx->regex,
- container, container_length,
- &result);
- msgpack_unpacked_destroy(&mp_result);
- }
- else {
- /*
- * Lookup metadata using regular expression. In order to let the
- * regex work we need to know before hand what's the Tag prefix
- * set and make sure the adjustment can be done.
- */
- kube_tag_len = flb_sds_len(ctx->kube_tag_prefix);
- if (kube_tag_len + 1 >= tag_len) {
- flb_plg_error(ctx->ins, "incoming record tag (%s) is shorter "
- "than kube_tag_prefix value (%s), skip filter",
- tag, ctx->kube_tag_prefix);
- return -1;
- }
- kube_tag_str = tag + kube_tag_len;
- kube_tag_len = tag_len - kube_tag_len;
-
- n = flb_regex_do(ctx->regex, kube_tag_str, kube_tag_len, &result);
- }
-
- if (n <= 0) {
- flb_plg_warn(ctx->ins, "invalid pattern for given tag %s", tag);
- return -1;
- }
-
- /* Parse the regex results */
- flb_regex_parse(ctx->regex, &result, cb_results, meta);
-
- /* Compose API server cache key */
- if (meta->podname && meta->namespace) {
- /* calculate estimated buffer size */
- n = meta->namespace_len + 1 + meta->podname_len + 1;
- if (meta->container_name) {
- n += meta->container_name_len + 1;
- }
- if (ctx->cache_use_docker_id && meta->docker_id) {
- n += meta->docker_id_len + 1;
- }
- meta->cache_key = flb_malloc(n);
- if (!meta->cache_key) {
- flb_errno();
- return -1;
- }
-
- /* Copy namespace */
- memcpy(meta->cache_key, meta->namespace, meta->namespace_len);
- off = meta->namespace_len;
-
- /* Separator */
- meta->cache_key[off++] = ':';
-
- /* Copy podname */
- memcpy(meta->cache_key + off, meta->podname, meta->podname_len);
- off += meta->podname_len;
-
- if (meta->container_name) {
- /* Separator */
- meta->cache_key[off++] = ':';
- memcpy(meta->cache_key + off, meta->container_name, meta->container_name_len);
- off += meta->container_name_len;
- }
-
- if (ctx->cache_use_docker_id && meta->docker_id) {
- /* Separator */
- meta->cache_key[off++] = ':';
- memcpy(meta->cache_key + off, meta->docker_id, meta->docker_id_len);
- off += meta->docker_id_len;
- }
-
- meta->cache_key[off] = '\0';
- meta->cache_key_len = off;
- }
- else {
- meta->cache_key = NULL;
- meta->cache_key_len = 0;
- }
-
- return 0;
-}
-
-/*
- * Given a fixed meta data (namespace and podname), get API server information
- * and merge buffers.
- */
-static int get_and_merge_meta(struct flb_kube *ctx, struct flb_kube_meta *meta,
- char **out_buf, size_t *out_size)
-{
- int ret;
- char *api_buf;
- size_t api_size;
-
- if (ctx->use_tag_for_meta) {
- ret = merge_meta_from_tag(ctx, meta, out_buf, out_size);
- return ret;
- }
- else if (ctx->use_kubelet) {
- ret = get_pods_from_kubelet(ctx, meta->namespace, meta->podname,
- &api_buf, &api_size);
- }
- else {
- ret = get_api_server_info(ctx, meta->namespace, meta->podname,
- &api_buf, &api_size);
- }
- if (ret == -1) {
- return -1;
- }
-
- ret = merge_meta(meta, ctx,
- api_buf, api_size,
- out_buf, out_size);
-
- if (api_buf != NULL) {
- flb_free(api_buf);
- }
-
- return ret;
-}
-
-/*
- * Work around kubernetes/kubernetes/issues/78479 by waiting
- * for DNS to start up.
- */
-static int wait_for_dns(struct flb_kube *ctx)
-{
- int i;
- struct addrinfo *res;
- struct addrinfo hints;
-
- memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
-
- for (i = 0; i < ctx->dns_retries; i++) {
- if (getaddrinfo(ctx->api_host, NULL, &hints, &res) == 0) {
- freeaddrinfo(res);
- return 0;
- }
- flb_plg_info(ctx->ins, "host: %s Wait %i secs until DNS starts up (%i/%i)",
- ctx->api_host, ctx->dns_wait_time, i + 1, ctx->dns_retries);
- sleep(ctx->dns_wait_time);
- }
- return -1;
-}
-
-static int flb_kube_network_init(struct flb_kube *ctx, struct flb_config *config)
-{
- int io_type = FLB_IO_TCP;
-
- ctx->upstream = NULL;
-
- if (ctx->api_https == FLB_TRUE) {
- if (!ctx->tls_ca_path && !ctx->tls_ca_file) {
- ctx->tls_ca_file = flb_strdup(FLB_KUBE_CA);
- }
- ctx->tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
- ctx->tls_verify,
- ctx->tls_debug,
- ctx->tls_vhost,
- ctx->tls_ca_path,
- ctx->tls_ca_file,
- NULL, NULL, NULL);
- if (!ctx->tls) {
- return -1;
- }
-
- io_type = FLB_IO_TLS;
- }
-
- /* Create an Upstream context */
- ctx->upstream = flb_upstream_create(config,
- ctx->api_host,
- ctx->api_port,
- io_type,
- ctx->tls);
- if (!ctx->upstream) {
- /* note: if ctx->tls.context is set, it's destroyed upon context exit */
- flb_plg_debug(ctx->ins, "kube network init create upstream failed");
- return -1;
- }
-
- /* Remove async flag from upstream */
- flb_stream_disable_async_mode(&ctx->upstream->base);
-
- return 0;
-}
-
-/* Initialize local context */
-int flb_kube_meta_init(struct flb_kube *ctx, struct flb_config *config)
-{
- int ret;
- char *meta_buf;
- size_t meta_size;
-
- if (ctx->dummy_meta == FLB_TRUE) {
- flb_plg_warn(ctx->ins, "using Dummy Metadata");
- return 0;
- }
-
- if (ctx->use_tag_for_meta) {
- flb_plg_info(ctx->ins, "no network access required (OK)");
- return 0;
- }
-
- /* Init network */
- flb_kube_network_init(ctx, config);
-
- /* Gather local info */
- ret = get_local_pod_info(ctx);
- if (ret == FLB_TRUE && !ctx->use_tag_for_meta) {
- flb_plg_info(ctx->ins, "local POD info OK");
-
- ret = wait_for_dns(ctx);
- if (ret == -1) {
- flb_plg_warn(ctx->ins, "could not resolve %s", ctx->api_host);
- return -1;
- }
-
- if (ctx->use_kubelet) {
- /* Gather info from Kubelet */
- flb_plg_info(ctx->ins, "testing connectivity with Kubelet...");
- ret = get_pods_from_kubelet(ctx, ctx->namespace, ctx->podname,
- &meta_buf, &meta_size);
- }
- else {
- /* Gather info from API server */
- flb_plg_info(ctx->ins, "testing connectivity with API server...");
- ret = get_api_server_info(ctx, ctx->namespace, ctx->podname,
- &meta_buf, &meta_size);
- }
- if (ret == -1) {
- if (!ctx->podname) {
- flb_plg_warn(ctx->ins, "could not get meta for local POD");
- }
- else {
- flb_plg_warn(ctx->ins, "could not get meta for POD %s",
- ctx->podname);
- }
- return -1;
- }
- flb_plg_info(ctx->ins, "connectivity OK");
- flb_free(meta_buf);
- }
- else {
- flb_plg_info(ctx->ins, "Fluent Bit not running in a POD");
- }
-
- return 0;
-}
-
-int flb_kube_dummy_meta_get(char **out_buf, size_t *out_size)
-{
- int len;
- time_t t;
- char stime[32];
- struct tm result;
- msgpack_sbuffer mp_sbuf;
- msgpack_packer mp_pck;
-
- t = time(NULL);
- localtime_r(&t, &result);
-#ifdef FLB_SYSTEM_WINDOWS
- asctime_s(stime, sizeof(stime), &result);
-#else
- asctime_r(&result, stime);
-#endif
- len = strlen(stime) - 1;
-
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
-
- msgpack_pack_map(&mp_pck, 1);
- msgpack_pack_str(&mp_pck, 5 /* dummy */ );
- msgpack_pack_str_body(&mp_pck, "dummy", 5);
- msgpack_pack_str(&mp_pck, len);
- msgpack_pack_str_body(&mp_pck, stime, len);
-
- *out_buf = mp_sbuf.data;
- *out_size = mp_sbuf.size;
-
- return 0;
-}
-
-int flb_kube_meta_get(struct flb_kube *ctx,
- const char *tag, int tag_len,
- const char *data, size_t data_size,
- const char **out_buf, size_t *out_size,
- struct flb_kube_meta *meta,
- struct flb_kube_props *props)
-{
- int id;
- int ret;
- const char *hash_meta_buf;
- char *tmp_hash_meta_buf;
- size_t off = 0;
- size_t hash_meta_size;
- msgpack_unpacked result;
-
- /* Get metadata from tag or record (cache key is the important one) */
- ret = extract_meta(ctx, tag, tag_len, data, data_size, meta);
- if (ret != 0) {
- return -1;
- }
-
- /* Check if we have some data associated to the cache key */
- ret = flb_hash_table_get(ctx->hash_table,
- meta->cache_key, meta->cache_key_len,
- (void *) &hash_meta_buf, &hash_meta_size);
- if (ret == -1) {
- /* Retrieve API server meta and merge with local meta */
- ret = get_and_merge_meta(ctx, meta,
- &tmp_hash_meta_buf, &hash_meta_size);
- if (ret == -1) {
- *out_buf = NULL;
- *out_size = 0;
- return 0;
- }
-
- id = flb_hash_table_add(ctx->hash_table,
- meta->cache_key, meta->cache_key_len,
- tmp_hash_meta_buf, hash_meta_size);
- if (id >= 0) {
- /*
- * Release the original buffer created on extract_meta() as a new
- * copy have been generated into the hash table, then re-set
- * the outgoing buffer and size.
- */
- flb_free(tmp_hash_meta_buf);
- flb_hash_table_get_by_id(ctx->hash_table, id, meta->cache_key,
- &hash_meta_buf, &hash_meta_size);
- }
- }
-
- /*
- * The retrieved buffer may have two serialized items:
- *
- * [0] = kubernetes metadata (annotations, labels)
- * [1] = Annotation properties
- *
- * note: annotation properties are optional.
- */
- msgpack_unpacked_init(&result);
-
- /* Unpack to get the offset/bytes of the first item */
- msgpack_unpack_next(&result, hash_meta_buf, hash_meta_size, &off);
-
- /* Set the pointer and proper size for the caller */
- *out_buf = hash_meta_buf;
- *out_size = off;
-
- /* A new unpack_next() call will succeed If annotation properties exists */
- ret = msgpack_unpack_next(&result, hash_meta_buf, hash_meta_size, &off);
- if (ret == MSGPACK_UNPACK_SUCCESS) {
- /* Unpack the remaining data into properties structure */
- flb_kube_prop_unpack(props,
- hash_meta_buf + *out_size,
- hash_meta_size - *out_size);
- }
- msgpack_unpacked_destroy(&result);
-
- return 0;
-}
-
-int flb_kube_meta_release(struct flb_kube_meta *meta)
-{
- int r = 0;
-
- if (meta->namespace) {
- flb_free(meta->namespace);
- r++;
- }
-
- if (meta->podname) {
- flb_free(meta->podname);
- r++;
- }
-
- if (meta->container_name) {
- flb_free(meta->container_name);
- r++;
- }
-
- if (meta->docker_id) {
- flb_free(meta->docker_id);
- r++;
- }
-
- if (meta->container_hash) {
- flb_free(meta->container_hash);
- r++;
- }
-
- if (meta->container_image) {
- flb_free(meta->container_image);
- r++;
- }
-
- if (meta->cache_key) {
- flb_free(meta->cache_key);
- }
-
- return r;
-}