summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/filter_kubernetes/kube_meta.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/filter_kubernetes/kube_meta.c')
-rw-r--r--src/fluent-bit/plugins/filter_kubernetes/kube_meta.c1650
1 files changed, 1650 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/filter_kubernetes/kube_meta.c b/src/fluent-bit/plugins/filter_kubernetes/kube_meta.c
new file mode 100644
index 000000000..fbad2bb02
--- /dev/null
+++ b/src/fluent-bit/plugins/filter_kubernetes/kube_meta.c
@@ -0,0 +1,1650 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_filter_plugin.h>
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_hash_table.h>
+#include <fluent-bit/flb_regex.h>
+#include <fluent-bit/flb_io.h>
+#include <fluent-bit/flb_upstream.h>
+#include <fluent-bit/flb_http_client.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_env.h>
+#include <fluent-bit/tls/flb_tls.h>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <msgpack.h>
+
+#include "kube_conf.h"
+#include "kube_meta.h"
+#include "kube_property.h"
+
+#define FLB_KUBE_META_CONTAINER_STATUSES_KEY "containerStatuses"
+#define FLB_KUBE_META_CONTAINER_STATUSES_KEY_LEN \
+ (sizeof(FLB_KUBE_META_CONTAINER_STATUSES_KEY) - 1)
+#define FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY "initContainerStatuses"
+#define FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY_LEN \
+ (sizeof(FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY) - 1)
+#define FLB_KUBE_TOKEN_BUF_SIZE 8192 /* 8KB */
+
+static int file_to_buffer(const char *path,
+ char **out_buf, size_t *out_size)
+{
+ int ret;
+ char *buf;
+ ssize_t bytes;
+ FILE *fp;
+ struct stat st;
+
+ if (!(fp = fopen(path, "r"))) {
+ return -1;
+ }
+
+ ret = stat(path, &st);
+ if (ret == -1) {
+ flb_errno();
+ fclose(fp);
+ return -1;
+ }
+
+ buf = flb_calloc(1, (st.st_size + 1));
+ if (!buf) {
+ flb_errno();
+ fclose(fp);
+ return -1;
+ }
+
+ bytes = fread(buf, st.st_size, 1, fp);
+ if (bytes < 1) {
+ flb_free(buf);
+ fclose(fp);
+ return -1;
+ }
+
+ fclose(fp);
+
+ *out_buf = buf;
+ *out_size = st.st_size;
+
+ return 0;
+}
+
+#ifdef FLB_HAVE_KUBE_TOKEN_COMMAND
+/* Run command to get Kubernetes authorization token */
+static int get_token_with_command(const char *command,
+ char **out_buf, size_t *out_size)
+{
+ FILE *fp;
+ char buf[FLB_KUBE_TOKEN_BUF_SIZE];
+ char *temp;
+ char *res;
+ size_t size = 0;
+ size_t len = 0;
+
+ fp = popen(command, "r");
+ if (fp == NULL) {
+ return -1;
+ }
+
+ res = flb_calloc(1, FLB_KUBE_TOKEN_BUF_SIZE);
+ if (!res) {
+ flb_errno();
+ pclose(fp);
+ return -1;
+ }
+
+ while (fgets(buf, sizeof(buf), fp) != NULL) {
+ len = strlen(buf);
+ if (len >= FLB_KUBE_TOKEN_BUF_SIZE - 1) {
+ temp = flb_realloc(res, (FLB_KUBE_TOKEN_BUF_SIZE + size) * 2);
+ if (temp == NULL) {
+ flb_errno();
+ flb_free(res);
+ pclose(fp);
+ return -1;
+ }
+ res = temp;
+ }
+ strcpy(res + size, buf);
+ size += len;
+ }
+
+ if (strlen(res) < 1) {
+ flb_free(res);
+ pclose(fp);
+ return -1;
+ }
+
+ pclose(fp);
+
+ *out_buf = res;
+ *out_size = strlen(res);
+
+ return 0;
+}
+#endif
+
+/* Set K8s Authorization Token and get HTTP Auth Header */
+static int get_http_auth_header(struct flb_kube *ctx)
+{
+ int ret;
+ char *temp;
+ char *tk = NULL;
+ size_t tk_size = 0;
+
+ if (ctx->kube_token_command != NULL) {
+#ifdef FLB_HAVE_KUBE_TOKEN_COMMAND
+ ret = get_token_with_command(ctx->kube_token_command, &tk, &tk_size);
+#else
+ ret = -1;
+#endif
+ if (ret == -1) {
+ flb_plg_warn(ctx->ins, "failed to run command %s", ctx->kube_token_command);
+ }
+ }
+ else {
+ ret = file_to_buffer(ctx->token_file, &tk, &tk_size);
+ if (ret == -1) {
+ flb_plg_warn(ctx->ins, "cannot open %s", FLB_KUBE_TOKEN);
+ }
+ flb_plg_info(ctx->ins, " token updated");
+ }
+ ctx->kube_token_create = time(NULL);
+
+ /* Token */
+ if (ctx->token != NULL) {
+ flb_free(ctx->token);
+ }
+ ctx->token = tk;
+ ctx->token_len = tk_size;
+
+ /* HTTP Auth Header */
+ if (ctx->auth == NULL) {
+ ctx->auth = flb_malloc(tk_size + 32);
+ }
+ else if (ctx->auth_len < tk_size + 32) {
+ temp = flb_realloc(ctx->auth, tk_size + 32);
+ if (temp == NULL) {
+ flb_free(ctx->auth);
+ ctx->auth = NULL;
+ return -1;
+ }
+ ctx->auth = temp;
+ }
+
+ if (!ctx->auth) {
+ return -1;
+ }
+ ctx->auth_len = snprintf(ctx->auth, tk_size + 32,
+ "Bearer %s",
+ tk);
+
+ return 0;
+}
+
+/* Refresh HTTP Auth Header if K8s Authorization Token is expired */
+static int refresh_token_if_needed(struct flb_kube *ctx)
+{
+ int expired = 0;
+ int ret;
+
+ if (ctx->kube_token_create > 0) {
+ if (time(NULL) > ctx->kube_token_create + ctx->kube_token_ttl) {
+ expired = FLB_TRUE;
+ }
+ }
+
+ if (expired || ctx->kube_token_create == 0) {
+ ret = get_http_auth_header(ctx);
+ if (ret == -1) {
+ flb_plg_warn(ctx->ins, "failed to set http auth header");
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static void expose_k8s_meta(struct flb_kube *ctx)
+{
+ char *tmp;
+ struct flb_env *env;
+
+ env = ctx->config->env;
+
+ flb_env_set(env, "k8s", "enabled");
+ flb_env_set(env, "k8s.namespace", ctx->namespace);
+ flb_env_set(env, "k8s.pod_name", ctx->podname);
+
+ tmp = (char *) flb_env_get(env, "NODE_NAME");
+ if (tmp) {
+ flb_env_set(env, "k8s.node_name", tmp);
+ }
+}
+
+/* Load local information from a POD context */
+static int get_local_pod_info(struct flb_kube *ctx)
+{
+ int ret;
+ char *ns;
+ size_t ns_size;
+ char *hostname;
+
+ /* Get the namespace name */
+ ret = file_to_buffer(FLB_KUBE_NAMESPACE, &ns, &ns_size);
+ if (ret == -1) {
+ /*
+ * If it fails, it's just informational, as likely the caller
+ * wanted to connect using the Proxy instead from inside a POD.
+ */
+ flb_plg_warn(ctx->ins, "cannot open %s", FLB_KUBE_NAMESPACE);
+ return FLB_FALSE;
+ }
+
+ /* Namespace */
+ ctx->namespace = ns;
+ ctx->namespace_len = ns_size;
+
+ /* POD Name */
+ hostname = getenv("HOSTNAME");
+ if (hostname) {
+ ctx->podname = flb_strdup(hostname);
+ ctx->podname_len = strlen(ctx->podname);
+ }
+ else {
+ char tmp[256];
+ gethostname(tmp, 256);
+ ctx->podname = flb_strdup(tmp);
+ ctx->podname_len = strlen(ctx->podname);
+ }
+
+ /* If a namespace was recognized, a token is mandatory */
+ /* Use the token to get HTTP Auth Header*/
+ ret = get_http_auth_header(ctx);
+ if (ret == -1) {
+ flb_plg_warn(ctx->ins, "failed to set http auth header");
+ return FLB_FALSE;
+ }
+
+ expose_k8s_meta(ctx);
+ return FLB_TRUE;
+}
+
+/*
+ * If a file exists called namespace_podname.meta, load it and use it.
+ * If not, fall back to API. This is primarily for diagnostic purposes,
+ * e.g. debugging new parsers.
+ */
+static int get_meta_file_info(struct flb_kube *ctx, const char *namespace,
+ const char *podname, char **buffer, size_t *size,
+ int *root_type) {
+
+ int fd = -1;
+ char *payload = NULL;
+ size_t payload_size = 0;
+ struct stat sb;
+ int packed = -1;
+ int ret;
+ char uri[1024];
+
+ if (ctx->meta_preload_cache_dir && namespace && podname) {
+
+ ret = snprintf(uri, sizeof(uri) - 1, "%s/%s_%s.meta",
+ ctx->meta_preload_cache_dir, namespace, podname);
+ if (ret > 0) {
+ fd = open(uri, O_RDONLY, 0);
+ if (fd != -1) {
+ if (fstat(fd, &sb) == 0) {
+ payload = flb_malloc(sb.st_size);
+ if (!payload) {
+ flb_errno();
+ }
+ else {
+ ret = read(fd, payload, sb.st_size);
+ if (ret == sb.st_size) {
+ payload_size = ret;
+ }
+ }
+ }
+ close(fd);
+ }
+ }
+
+ if (payload_size) {
+ packed = flb_pack_json(payload, payload_size,
+ buffer, size, root_type,
+ NULL);
+ }
+
+ if (payload) {
+ flb_free(payload);
+ }
+ }
+
+ return packed;
+}
+
+/* Gather metadata from HTTP Request,
+ * this could send out HTTP Request either to KUBE Server API or Kubelet
+ */
+static int get_meta_info_from_request(struct flb_kube *ctx,
+ const char *namespace,
+ const char *podname,
+ char **buffer, size_t *size,
+ int *root_type,
+ char* uri)
+{
+ struct flb_http_client *c;
+ struct flb_connection *u_conn;
+ int ret;
+ size_t b_sent;
+ int packed;
+
+ if (!ctx->upstream) {
+ return -1;
+ }
+
+ u_conn = flb_upstream_conn_get(ctx->upstream);
+
+ if (!u_conn) {
+ flb_plg_error(ctx->ins, "kubelet upstream connection error");
+ return -1;
+ }
+
+ ret = refresh_token_if_needed(ctx);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "failed to refresh token");
+ flb_upstream_conn_release(u_conn);
+ return -1;
+ }
+
+ /* Compose HTTP Client request*/
+ c = flb_http_client(u_conn, FLB_HTTP_GET,
+ uri,
+ NULL, 0, NULL, 0, NULL, 0);
+ flb_http_buffer_size(c, ctx->buffer_size);
+
+ flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
+ flb_http_add_header(c, "Connection", 10, "close", 5);
+ if (ctx->auth_len > 0) {
+ flb_http_add_header(c, "Authorization", 13, ctx->auth, ctx->auth_len);
+ }
+
+ ret = flb_http_do(c, &b_sent);
+ flb_plg_debug(ctx->ins, "Request (ns=%s, pod=%s) http_do=%i, "
+ "HTTP Status: %i",
+ namespace, podname, ret, c->resp.status);
+
+ if (ret != 0 || c->resp.status != 200) {
+ if (c->resp.payload_size > 0) {
+ flb_plg_debug(ctx->ins, "HTTP response\n%s",
+ c->resp.payload);
+ }
+ flb_http_client_destroy(c);
+ flb_upstream_conn_release(u_conn);
+ return -1;
+ }
+
+ packed = flb_pack_json(c->resp.payload, c->resp.payload_size,
+ buffer, size, root_type, NULL);
+
+ /* release resources */
+ flb_http_client_destroy(c);
+ flb_upstream_conn_release(u_conn);
+
+ return packed;
+
+}
+
+/* Gather pods list information from Kubelet */
+static int get_pods_from_kubelet(struct flb_kube *ctx,
+ const char *namespace, const char *podname,
+ char **out_buf, size_t *out_size)
+{
+ int ret;
+ int packed = -1;
+ int root_type;
+ char uri[1024];
+ char *buf;
+ size_t size;
+
+ *out_buf = NULL;
+ *out_size = 0;
+
+ /* used for unit test purposes*/
+ packed = get_meta_file_info(ctx, namespace, podname, &buf, &size,
+ &root_type);
+
+ if (packed == -1) {
+
+ ret = snprintf(uri, sizeof(uri) - 1, FLB_KUBELET_PODS);
+ if (ret == -1) {
+ return -1;
+ }
+ flb_plg_debug(ctx->ins,
+ "Send out request to Kubelet for pods information.");
+ packed = get_meta_info_from_request(ctx, namespace, podname,
+ &buf, &size, &root_type, uri);
+ }
+
+ /* validate pack */
+ if (packed == -1) {
+ return -1;
+ }
+
+ *out_buf = buf;
+ *out_size = size;
+
+ return 0;
+}
+
+/* Gather metadata from API Server */
+static int get_api_server_info(struct flb_kube *ctx,
+ const char *namespace, const char *podname,
+ char **out_buf, size_t *out_size)
+{
+ int ret;
+ int packed = -1;
+ int root_type;
+ char uri[1024];
+ char *buf;
+ size_t size;
+
+ *out_buf = NULL;
+ *out_size = 0;
+
+ /* used for unit test purposes*/
+ packed = get_meta_file_info(ctx, namespace, podname,
+ &buf, &size, &root_type);
+
+ if (packed == -1) {
+
+ ret = snprintf(uri, sizeof(uri) - 1, FLB_KUBE_API_FMT, namespace,
+ podname);
+
+ if (ret == -1) {
+ return -1;
+ }
+ flb_plg_debug(ctx->ins,
+ "Send out request to API Server for pods information");
+ packed = get_meta_info_from_request(ctx, namespace, podname,
+ &buf, &size, &root_type, uri);
+ }
+
+ /* validate pack */
+ if (packed == -1) {
+ return -1;
+ }
+
+ *out_buf = buf;
+ *out_size = size;
+
+ return 0;
+}
+
+static void cb_results(const char *name, const char *value,
+ size_t vlen, void *data)
+{
+ struct flb_kube_meta *meta = data;
+
+ if (vlen == 0) {
+ return;
+ }
+
+ if (meta->podname == NULL && strcmp(name, "pod_name") == 0) {
+ meta->podname = flb_strndup(value, vlen);
+ meta->podname_len = vlen;
+ meta->fields++;
+ }
+ else if (meta->namespace == NULL &&
+ strcmp(name, "namespace_name") == 0) {
+ meta->namespace = flb_strndup(value, vlen);
+ meta->namespace_len = vlen;
+ meta->fields++;
+ }
+ else if (meta->container_name == NULL &&
+ strcmp(name, "container_name") == 0) {
+ meta->container_name = flb_strndup(value, vlen);
+ meta->container_name_len = vlen;
+ meta->fields++;
+ }
+ else if (meta->docker_id == NULL &&
+ strcmp(name, "docker_id") == 0) {
+ meta->docker_id = flb_strndup(value, vlen);
+ meta->docker_id_len = vlen;
+ meta->fields++;
+ }
+ else if (meta->container_hash == NULL &&
+ strcmp(name, "container_hash") == 0) {
+ meta->container_hash = flb_strndup(value, vlen);
+ meta->container_hash_len = vlen;
+ meta->fields++;
+ }
+
+ return;
+}
+
+static int extract_hash(const char * im, int sz, const char ** out, int * outsz)
+{
+ char * colon = NULL;
+ char * slash = NULL;
+
+ *out = NULL;
+ *outsz = 0;
+
+ if (sz <= 1) {
+ return -1;
+ }
+
+ colon = memchr(im, ':', sz);
+
+ if (colon == NULL) {
+ return -1;
+ } else {
+ slash = colon;
+ while ((im + sz - slash + 1) > 0 && *(slash + 1) == '/') {
+ slash++;
+ }
+ if (slash == colon) {
+ slash = NULL;
+ }
+ }
+
+ if (slash == NULL && (im + sz - colon) > 0) {
+ *out = im;
+ }
+
+ if (slash != NULL && (colon - slash) < 0 && (im + sz - slash) > 0) {
+ *out = slash + 1;
+ }
+
+ if (*out) {
+ *outsz = im + sz - *out;
+ return 0;
+ }
+ return -1;
+}
+
+/*
+ * As per Kubernetes Pod spec,
+ * https://kubernetes.io/docs/concepts/workloads/pods/pod/, we look
+ * for status.{initContainerStatuses, containerStatuses}.{containerID, imageID, image}
+ * where status.{initContainerStatuses, containerStatus}.name == our container
+ * name
+ * status:
+ * ...
+ * containerStatuses:
+ * - containerID: XXX
+ * image: YYY
+ * imageID: ZZZ
+ * ...
+ * name: nginx-ingress-microk8s
+*/
+static void extract_container_hash(struct flb_kube_meta *meta,
+ msgpack_object status)
+{
+ int i;
+ msgpack_object k, v;
+ int docker_id_len = 0;
+ int container_hash_len = 0;
+ int container_image_len = 0;
+ const char *container_hash;
+ const char *docker_id;
+ const char *container_image;
+ const char *tmp;
+ int tmp_len = 0;
+ int name_found = FLB_FALSE;
+ /* Process status/containerStatus map for docker_id, container_hash, container_image */
+ for (i = 0;
+ (meta->docker_id_len == 0 || meta->container_hash_len == 0 ||
+ meta->container_image_len == 0) &&
+ i < status.via.map.size; i++) {
+ k = status.via.map.ptr[i].key;
+ if ((k.via.str.size == FLB_KUBE_META_CONTAINER_STATUSES_KEY_LEN &&
+ strncmp(k.via.str.ptr,
+ FLB_KUBE_META_CONTAINER_STATUSES_KEY,
+ FLB_KUBE_META_CONTAINER_STATUSES_KEY_LEN) == 0) ||
+ (k.via.str.size == FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY_LEN &&
+ strncmp(k.via.str.ptr,
+ FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY,
+ FLB_KUBE_META_INIT_CONTAINER_STATUSES_KEY_LEN) == 0)) {
+ int j;
+ v = status.via.map.ptr[i].val;
+ for (j = 0;
+ (meta->docker_id_len == 0 ||
+ meta->container_hash_len == 0 ||
+ meta->container_image_len == 0) && j < v.via.array.size;
+ j++) {
+ int l;
+ msgpack_object k1, k2;
+ msgpack_object_str v2;
+ k1 = v.via.array.ptr[j];
+ for (l = 0;
+ (meta->docker_id_len == 0 ||
+ meta->container_hash_len == 0 ||
+ meta->container_image_len == 0) &&
+ l < k1.via.map.size; l++) {
+ k2 = k1.via.map.ptr[l].key;
+ v2 = k1.via.map.ptr[l].val.via.str;
+ if (k2.via.str.size == sizeof("name") - 1 &&
+ !strncmp(k2.via.str.ptr, "name", k2.via.str.size)) {
+ if (v2.size == meta->container_name_len &&
+ !strncmp(v2.ptr,
+ meta->container_name,
+ meta->container_name_len)) {
+ name_found = FLB_TRUE;
+ }
+ else {
+ break;
+ }
+ }
+ else if (k2.via.str.size == sizeof("containerID") - 1 &&
+ !strncmp(k2.via.str.ptr,
+ "containerID",
+ k2.via.str.size)) {
+ if (extract_hash(v2.ptr, v2.size, &tmp, &tmp_len) == 0) {
+ docker_id = tmp;
+ docker_id_len = tmp_len;
+ }
+ }
+ else if (k2.via.str.size == sizeof("imageID") - 1 &&
+ !strncmp(k2.via.str.ptr,
+ "imageID",
+ k2.via.str.size)) {
+ if (extract_hash(v2.ptr, v2.size, &tmp, &tmp_len) == 0) {
+ container_hash = tmp;
+ container_hash_len = tmp_len;
+ }
+ }
+ else if (k2.via.str.size == sizeof("image") - 1 &&
+ !strncmp(k2.via.str.ptr,
+ "image",
+ k2.via.str.size)) {
+ container_image = v2.ptr;
+ container_image_len = v2.size;
+ }
+ }
+ if (name_found) {
+ if (container_hash_len && !meta->container_hash_len) {
+ meta->container_hash_len = container_hash_len;
+ meta->container_hash = flb_strndup(container_hash,
+ container_hash_len);
+ meta->fields++;
+ }
+ if (docker_id_len && !meta->docker_id_len) {
+ meta->docker_id_len = docker_id_len;
+ meta->docker_id = flb_strndup(docker_id, docker_id_len);
+ meta->fields++;
+ }
+ if (container_image_len && !meta->container_image_len) {
+ meta->container_image_len = container_image_len;
+ meta->container_image = flb_strndup(container_image, container_image_len);
+ meta->fields++;
+ }
+ return;
+ }
+ }
+ }
+ }
+}
+
+static int search_podname_and_namespace(struct flb_kube_meta *meta,
+ struct flb_kube *ctx,
+ msgpack_object map)
+{
+ int i;
+ int podname_found = FLB_FALSE;
+ int namespace_found = FLB_FALSE;
+ int target_podname_found = FLB_FALSE;
+ int target_namespace_found = FLB_FALSE;
+
+ msgpack_object k;
+ msgpack_object v;
+
+ for (i = 0; (!podname_found || !namespace_found) &&
+ i < map.via.map.size; i++) {
+
+ k = map.via.map.ptr[i].key;
+ v = map.via.map.ptr[i].val;
+ if (k.via.str.size == 4 && !strncmp(k.via.str.ptr, "name", 4)) {
+
+ podname_found = FLB_TRUE;
+ if (!strncmp(v.via.str.ptr, meta->podname, meta->podname_len)) {
+ target_podname_found = FLB_TRUE;
+ }
+
+ }
+ else if (k.via.str.size == 9 && !strncmp(k.via.str.ptr,
+ "namespace", 9)) {
+
+ namespace_found = FLB_TRUE;
+ if (!strncmp((char *)v.via.str.ptr,
+ meta->namespace,
+ meta->namespace_len)) {
+ target_namespace_found = FLB_TRUE;
+ }
+ }
+ }
+
+ if (!target_podname_found || !target_namespace_found) {
+ return -1;
+ }
+
+ return 0;
+}
+
+static int search_metadata_in_items(struct flb_kube_meta *meta,
+ struct flb_kube *ctx,
+ msgpack_object items_array,
+ msgpack_object *target_item_map)
+{
+ int i, j;
+
+ int target_found = FLB_FALSE;
+ msgpack_object item_info_map;
+ msgpack_object k;
+ msgpack_object v;
+
+ for (i = 0; !target_found && i < items_array.via.array.size; i++) {
+
+ item_info_map = items_array.via.array.ptr[i];
+ if (item_info_map.type != MSGPACK_OBJECT_MAP) {
+ continue;
+ }
+
+ for (j = 0; j < item_info_map.via.map.size; j++) {
+
+ k = item_info_map.via.map.ptr[j].key;
+ if (k.via.str.size == 8 &&
+ !strncmp(k.via.str.ptr, "metadata", 8)) {
+
+ v = item_info_map.via.map.ptr[j].val;
+ if (search_podname_and_namespace(meta, ctx, v) == 0) {
+ target_found = FLB_TRUE;
+ *target_item_map = item_info_map;
+ flb_plg_debug(ctx->ins,
+ "kubelet find pod: %s and ns: %s match",
+ meta->podname, meta->namespace);
+ }
+ break;
+ }
+ }
+ }
+
+ if (!target_found) {
+ flb_plg_debug(ctx->ins,
+ "kubelet didn't find pod: %s, ns: %s match",
+ meta->podname, meta->namespace);
+ return -1;
+ }
+ return 0;
+}
+
+/* At this point map points to the ROOT map, eg:
+ *
+ * {
+ * "kind": "PodList",
+ * "apiVersion": "v1",
+ * "metadata": {},
+ * "items": [{
+ * "metadata": {
+ * "name": "fluent-bit-rz47v",
+ * "generateName": "fluent-bit-",
+ * "namespace": "kube-system",
+ * "selfLink": "/api/v1/namespaces/kube-system/pods/fluent-bit-rz47v",
+ * ....
+ * }
+ * }]
+ *
+ */
+static int search_item_in_items(struct flb_kube_meta *meta,
+ struct flb_kube *ctx,
+ msgpack_object api_map,
+ msgpack_object *target_item_map)
+{
+
+ int i;
+ int items_array_found = FLB_FALSE;
+
+ msgpack_object k;
+ msgpack_object v;
+ msgpack_object items_array;
+
+ for (i = 0; !items_array_found && i < api_map.via.map.size; i++) {
+
+ k = api_map.via.map.ptr[i].key;
+ if (k.via.str.size == 5 && !strncmp(k.via.str.ptr, "items", 5)) {
+
+ v = api_map.via.map.ptr[i].val;
+ if (v.type == MSGPACK_OBJECT_ARRAY) {
+ items_array = v;
+ items_array_found = FLB_TRUE;
+ }
+ }
+ }
+
+ int ret = search_metadata_in_items(meta, ctx, items_array,
+ target_item_map);
+
+ return ret;
+}
+
+
+static int merge_meta_from_tag(struct flb_kube *ctx, struct flb_kube_meta *meta,
+ char **out_buf, size_t *out_size)
+{
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ struct flb_mp_map_header mh;
+
+ /* Initialize output msgpack buffer */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ flb_mp_map_header_init(&mh, &mp_pck);
+
+ if (meta->podname != NULL) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 8);
+ msgpack_pack_str_body(&mp_pck, "pod_name", 8);
+ msgpack_pack_str(&mp_pck, meta->podname_len);
+ msgpack_pack_str_body(&mp_pck, meta->podname, meta->podname_len);
+ }
+
+ if (meta->namespace != NULL) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 14);
+ msgpack_pack_str_body(&mp_pck, "namespace_name", 14);
+ msgpack_pack_str(&mp_pck, meta->namespace_len);
+ msgpack_pack_str_body(&mp_pck, meta->namespace, meta->namespace_len);
+ }
+
+ if (meta->container_name != NULL) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 14);
+ msgpack_pack_str_body(&mp_pck, "container_name", 14);
+ msgpack_pack_str(&mp_pck, meta->container_name_len);
+ msgpack_pack_str_body(&mp_pck, meta->container_name,
+ meta->container_name_len);
+ }
+ if (meta->docker_id != NULL) {
+ flb_mp_map_header_append(&mh);
+ msgpack_pack_str(&mp_pck, 9);
+ msgpack_pack_str_body(&mp_pck, "docker_id", 9);
+ msgpack_pack_str(&mp_pck, meta->docker_id_len);
+ msgpack_pack_str_body(&mp_pck, meta->docker_id,
+ meta->docker_id_len);
+ }
+
+ flb_mp_map_header_end(&mh);
+
+ /* Set outgoing msgpack buffer */
+ *out_buf = mp_sbuf.data;
+ *out_size = mp_sbuf.size;
+
+ return 0;
+}
+
+static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx,
+ const char *api_buf, size_t api_size,
+ char **out_buf, size_t *out_size)
+{
+ int i;
+ int ret;
+ int map_size = 0;
+ int meta_found = FLB_FALSE;
+ int spec_found = FLB_FALSE;
+ int status_found = FLB_FALSE;
+ int target_found = FLB_TRUE;
+ int have_uid = -1;
+ int have_labels = -1;
+ int have_annotations = -1;
+ int have_nodename = -1;
+ size_t off = 0;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+
+ msgpack_unpacked api_result;
+ msgpack_unpacked meta_result;
+ msgpack_object item_result;
+ msgpack_object k;
+ msgpack_object v;
+ msgpack_object meta_val;
+ msgpack_object spec_val;
+ msgpack_object status_val;
+ msgpack_object api_map;
+ msgpack_object ann_map;
+ struct flb_kube_props props = {0};
+
+ /*
+ * - reg_buf: is a msgpack Map containing meta captured using Regex
+ *
+ * - api_buf: metadata associated to namespace and POD Name coming from
+ * the API server.
+ *
+ * When merging data we aim to add the following keys from the API server:
+ *
+ * - pod_id
+ * - labels
+ * - annotations
+ */
+
+ /* Initialize output msgpack buffer */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ /* Iterate API server msgpack and lookup specific fields */
+ if (api_buf != NULL) {
+ msgpack_unpacked_init(&api_result);
+ ret = msgpack_unpack_next(&api_result, api_buf, api_size, &off);
+ if (ret == MSGPACK_UNPACK_SUCCESS) {
+
+ if (ctx->use_kubelet) {
+ ret = search_item_in_items(meta, ctx, api_result.data, &item_result);
+ if (ret == -1) {
+ target_found = FLB_FALSE;
+ }
+ api_map = target_found ? item_result : api_result.data;
+ } else {
+ api_map = api_result.data;
+ }
+
+ /* At this point map points to the ROOT map, eg:
+ *
+ * {
+ * "kind": "Pod",
+ * "apiVersion": "v1",
+ * "metadata": {
+ * "name": "fluent-bit-rz47v",
+ * "generateName": "fluent-bit-",
+ * "namespace": "kube-system",
+ * "selfLink": "/api/v1/namespaces/kube-system/pods/fluent-bit-rz47v",
+ * ....
+ * }
+ *
+ * We are interested into the 'metadata' map value.
+ * We are also interested in the spec.nodeName.
+ * We are also interested in the status.containerStatuses.
+ */
+ for (i = 0; target_found && !(meta_found && spec_found && status_found) &&
+ i < api_map.via.map.size; i++) {
+ k = api_map.via.map.ptr[i].key;
+ if (k.via.str.size == 8 && !strncmp(k.via.str.ptr, "metadata", 8)) {
+ meta_val = api_map.via.map.ptr[i].val;
+ if (meta_val.type == MSGPACK_OBJECT_MAP) {
+ meta_found = FLB_TRUE;
+ }
+ }
+ else if (k.via.str.size == 4 && !strncmp(k.via.str.ptr, "spec", 4)) {
+ spec_val = api_map.via.map.ptr[i].val;
+ spec_found = FLB_TRUE;
+ }
+ else if (k.via.str.size == 6 && !strncmp(k.via.str.ptr, "status", 6)) {
+ status_val = api_map.via.map.ptr[i].val;
+ status_found = FLB_TRUE;
+ }
+ }
+
+ if (meta_found == FLB_TRUE) {
+ /* Process metadata map value */
+ msgpack_unpacked_init(&meta_result);
+ for (i = 0; i < meta_val.via.map.size; i++) {
+ k = meta_val.via.map.ptr[i].key;
+
+ char *ptr = (char *) k.via.str.ptr;
+ size_t size = k.via.str.size;
+
+ if (size == 3 && strncmp(ptr, "uid", 3) == 0) {
+ have_uid = i;
+ map_size++;
+ }
+ else if (size == 6 && strncmp(ptr, "labels", 6) == 0) {
+ have_labels = i;
+ if (ctx->labels == FLB_TRUE) {
+ map_size++;
+ }
+ }
+
+ else if (size == 11 && strncmp(ptr, "annotations", 11) == 0) {
+ have_annotations = i;
+ if (ctx->annotations == FLB_TRUE) {
+ map_size++;
+ }
+ }
+
+ if (have_uid >= 0 && have_labels >= 0 && have_annotations >= 0) {
+ break;
+ }
+ }
+ }
+
+ /* Process spec map value for nodeName */
+ if (spec_found == FLB_TRUE) {
+ for (i = 0; i < spec_val.via.map.size; i++) {
+ k = spec_val.via.map.ptr[i].key;
+ if (k.via.str.size == 8 &&
+ strncmp(k.via.str.ptr, "nodeName", 8) == 0) {
+ have_nodename = i;
+ map_size++;
+ break;
+ }
+ }
+ }
+
+ if ((!meta->container_hash || !meta->docker_id || !meta->container_image) && status_found) {
+ extract_container_hash(meta, status_val);
+ }
+ }
+ }
+
+ /* Set map size: current + pod_id, labels and annotations */
+ map_size += meta->fields;
+
+ /* Append Regex fields */
+ msgpack_pack_map(&mp_pck, map_size);
+ if (meta->podname != NULL) {
+ msgpack_pack_str(&mp_pck, 8);
+ msgpack_pack_str_body(&mp_pck, "pod_name", 8);
+ msgpack_pack_str(&mp_pck, meta->podname_len);
+ msgpack_pack_str_body(&mp_pck, meta->podname, meta->podname_len);
+ }
+ if (meta->namespace != NULL) {
+ msgpack_pack_str(&mp_pck, 14);
+ msgpack_pack_str_body(&mp_pck, "namespace_name", 14);
+ msgpack_pack_str(&mp_pck, meta->namespace_len);
+ msgpack_pack_str_body(&mp_pck, meta->namespace, meta->namespace_len);
+ }
+
+ /* Append API Server content */
+ if (have_uid >= 0) {
+ v = meta_val.via.map.ptr[have_uid].val;
+
+ msgpack_pack_str(&mp_pck, 6);
+ msgpack_pack_str_body(&mp_pck, "pod_id", 6);
+ msgpack_pack_object(&mp_pck, v);
+ }
+
+ if (have_labels >= 0 && ctx->labels == FLB_TRUE) {
+ k = meta_val.via.map.ptr[have_labels].key;
+ v = meta_val.via.map.ptr[have_labels].val;
+
+ msgpack_pack_object(&mp_pck, k);
+ msgpack_pack_object(&mp_pck, v);
+ }
+
+ if (have_annotations >= 0 && ctx->annotations == FLB_TRUE) {
+ k = meta_val.via.map.ptr[have_annotations].key;
+ v = meta_val.via.map.ptr[have_annotations].val;
+
+ msgpack_pack_object(&mp_pck, k);
+ msgpack_pack_object(&mp_pck, v);
+ }
+
+ if (have_nodename >= 0) {
+ v = spec_val.via.map.ptr[have_nodename].val;
+
+ msgpack_pack_str(&mp_pck, 4);
+ msgpack_pack_str_body(&mp_pck, "host", 4);
+ msgpack_pack_object(&mp_pck, v);
+ }
+
+ if (meta->container_name != NULL) {
+ msgpack_pack_str(&mp_pck, 14);
+ msgpack_pack_str_body(&mp_pck, "container_name", 14);
+ msgpack_pack_str(&mp_pck, meta->container_name_len);
+ msgpack_pack_str_body(&mp_pck, meta->container_name,
+ meta->container_name_len);
+ }
+ if (meta->docker_id != NULL) {
+ msgpack_pack_str(&mp_pck, 9);
+ msgpack_pack_str_body(&mp_pck, "docker_id", 9);
+ msgpack_pack_str(&mp_pck, meta->docker_id_len);
+ msgpack_pack_str_body(&mp_pck, meta->docker_id,
+ meta->docker_id_len);
+ }
+ if (meta->container_hash != NULL) {
+ msgpack_pack_str(&mp_pck, 14);
+ msgpack_pack_str_body(&mp_pck, "container_hash", 14);
+ msgpack_pack_str(&mp_pck, meta->container_hash_len);
+ msgpack_pack_str_body(&mp_pck, meta->container_hash,
+ meta->container_hash_len);
+ }
+ if (meta->container_image != NULL) {
+ msgpack_pack_str(&mp_pck, 15);
+ msgpack_pack_str_body(&mp_pck, "container_image", 15);
+ msgpack_pack_str(&mp_pck, meta->container_image_len);
+ msgpack_pack_str_body(&mp_pck, meta->container_image,
+ meta->container_image_len);
+ }
+
+ /* Process configuration suggested through Annotations */
+ if (have_annotations >= 0) {
+ ann_map = meta_val.via.map.ptr[have_annotations].val;
+
+ /* Iterate annotations keys and look for 'logging' key */
+ if (ann_map.type == MSGPACK_OBJECT_MAP) {
+ for (i = 0; i < ann_map.via.map.size; i++) {
+ k = ann_map.via.map.ptr[i].key;
+ v = ann_map.via.map.ptr[i].val;
+
+ if (k.via.str.size > 13 && /* >= 'fluentbit.io/' */
+ strncmp(k.via.str.ptr, "fluentbit.io/", 13) == 0) {
+
+ /* Validate and set the property */
+ flb_kube_prop_set(ctx, meta,
+ k.via.str.ptr + 13,
+ k.via.str.size - 13,
+ v.via.str.ptr,
+ v.via.str.size,
+ &props);
+ }
+ }
+ }
+
+ /* Pack Annotation properties */
+ void *prop_buf;
+ size_t prop_size;
+ flb_kube_prop_pack(&props, &prop_buf, &prop_size);
+ msgpack_sbuffer_write(&mp_sbuf, prop_buf, prop_size);
+ flb_kube_prop_destroy(&props);
+ flb_free(prop_buf);
+ }
+
+ if (api_buf != NULL) {
+ msgpack_unpacked_destroy(&api_result);
+ if (meta_found == FLB_TRUE) {
+ msgpack_unpacked_destroy(&meta_result);
+ }
+ }
+
+ /* Set outgoing msgpack buffer */
+ *out_buf = mp_sbuf.data;
+ *out_size = mp_sbuf.size;
+
+ return 0;
+}
+
+static inline int extract_meta(struct flb_kube *ctx,
+ const char *tag, int tag_len,
+ const char *data, size_t data_size,
+ struct flb_kube_meta *meta)
+{
+ int i;
+ size_t off = 0;
+ ssize_t n;
+ int kube_tag_len;
+ const char *kube_tag_str;
+ const char *container = NULL;
+ int container_found = FLB_FALSE;
+ int container_length = 0;
+ struct flb_regex_search result;
+ msgpack_unpacked mp_result;
+ msgpack_object root;
+ msgpack_object map;
+ msgpack_object key;
+ msgpack_object val;
+
+ /* Reset meta context */
+ memset(meta, '\0', sizeof(struct flb_kube_meta));
+
+ /* Journald */
+ if (ctx->use_journal == FLB_TRUE) {
+ off = 0;
+ msgpack_unpacked_init(&mp_result);
+ while (msgpack_unpack_next(&mp_result, data, data_size, &off) == MSGPACK_UNPACK_SUCCESS) {
+ root = mp_result.data;
+ if (root.type != MSGPACK_OBJECT_ARRAY) {
+ continue;
+ }
+
+ /* Lookup the CONTAINER_NAME key/value */
+ map = root.via.array.ptr[1];
+ for (i = 0; i < map.via.map.size; i++) {
+ key = map.via.map.ptr[i].key;
+ if (key.via.str.size != 14) {
+ continue;
+ }
+
+ if (strncmp(key.via.str.ptr, "CONTAINER_NAME", 14) == 0) {
+ val = map.via.map.ptr[i].val;
+ container = val.via.str.ptr;
+ container_length = val.via.str.size;
+ container_found = FLB_TRUE;
+ break;
+ }
+ }
+
+ if (container_found == FLB_TRUE) {
+ break;
+ }
+ }
+
+ if (container_found == FLB_FALSE) {
+ msgpack_unpacked_destroy(&mp_result);
+ return -1;
+ }
+ n = flb_regex_do(ctx->regex,
+ container, container_length,
+ &result);
+ msgpack_unpacked_destroy(&mp_result);
+ }
+ else {
+ /*
+ * Lookup metadata using regular expression. In order to let the
+ * regex work we need to know before hand what's the Tag prefix
+ * set and make sure the adjustment can be done.
+ */
+ kube_tag_len = flb_sds_len(ctx->kube_tag_prefix);
+ if (kube_tag_len + 1 >= tag_len) {
+ flb_plg_error(ctx->ins, "incoming record tag (%s) is shorter "
+ "than kube_tag_prefix value (%s), skip filter",
+ tag, ctx->kube_tag_prefix);
+ return -1;
+ }
+ kube_tag_str = tag + kube_tag_len;
+ kube_tag_len = tag_len - kube_tag_len;
+
+ n = flb_regex_do(ctx->regex, kube_tag_str, kube_tag_len, &result);
+ }
+
+ if (n <= 0) {
+ flb_plg_warn(ctx->ins, "invalid pattern for given tag %s", tag);
+ return -1;
+ }
+
+ /* Parse the regex results */
+ flb_regex_parse(ctx->regex, &result, cb_results, meta);
+
+ /* Compose API server cache key */
+ if (meta->podname && meta->namespace) {
+ /* calculate estimated buffer size */
+ n = meta->namespace_len + 1 + meta->podname_len + 1;
+ if (meta->container_name) {
+ n += meta->container_name_len + 1;
+ }
+ if (ctx->cache_use_docker_id && meta->docker_id) {
+ n += meta->docker_id_len + 1;
+ }
+ meta->cache_key = flb_malloc(n);
+ if (!meta->cache_key) {
+ flb_errno();
+ return -1;
+ }
+
+ /* Copy namespace */
+ memcpy(meta->cache_key, meta->namespace, meta->namespace_len);
+ off = meta->namespace_len;
+
+ /* Separator */
+ meta->cache_key[off++] = ':';
+
+ /* Copy podname */
+ memcpy(meta->cache_key + off, meta->podname, meta->podname_len);
+ off += meta->podname_len;
+
+ if (meta->container_name) {
+ /* Separator */
+ meta->cache_key[off++] = ':';
+ memcpy(meta->cache_key + off, meta->container_name, meta->container_name_len);
+ off += meta->container_name_len;
+ }
+
+ if (ctx->cache_use_docker_id && meta->docker_id) {
+ /* Separator */
+ meta->cache_key[off++] = ':';
+ memcpy(meta->cache_key + off, meta->docker_id, meta->docker_id_len);
+ off += meta->docker_id_len;
+ }
+
+ meta->cache_key[off] = '\0';
+ meta->cache_key_len = off;
+ }
+ else {
+ meta->cache_key = NULL;
+ meta->cache_key_len = 0;
+ }
+
+ return 0;
+}
+
+/*
+ * Given a fixed meta data (namespace and podname), get API server information
+ * and merge buffers.
+ */
+static int get_and_merge_meta(struct flb_kube *ctx, struct flb_kube_meta *meta,
+ char **out_buf, size_t *out_size)
+{
+ int ret;
+ char *api_buf;
+ size_t api_size;
+
+ if (ctx->use_tag_for_meta) {
+ ret = merge_meta_from_tag(ctx, meta, out_buf, out_size);
+ return ret;
+ }
+ else if (ctx->use_kubelet) {
+ ret = get_pods_from_kubelet(ctx, meta->namespace, meta->podname,
+ &api_buf, &api_size);
+ }
+ else {
+ ret = get_api_server_info(ctx, meta->namespace, meta->podname,
+ &api_buf, &api_size);
+ }
+ if (ret == -1) {
+ return -1;
+ }
+
+ ret = merge_meta(meta, ctx,
+ api_buf, api_size,
+ out_buf, out_size);
+
+ if (api_buf != NULL) {
+ flb_free(api_buf);
+ }
+
+ return ret;
+}
+
+/*
+ * Work around kubernetes/kubernetes/issues/78479 by waiting
+ * for DNS to start up.
+ */
+static int wait_for_dns(struct flb_kube *ctx)
+{
+ int i;
+ struct addrinfo *res;
+ struct addrinfo hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+
+ for (i = 0; i < ctx->dns_retries; i++) {
+ if (getaddrinfo(ctx->api_host, NULL, &hints, &res) == 0) {
+ freeaddrinfo(res);
+ return 0;
+ }
+ flb_plg_info(ctx->ins, "host: %s Wait %i secs until DNS starts up (%i/%i)",
+ ctx->api_host, ctx->dns_wait_time, i + 1, ctx->dns_retries);
+ sleep(ctx->dns_wait_time);
+ }
+ return -1;
+}
+
+static int flb_kube_network_init(struct flb_kube *ctx, struct flb_config *config)
+{
+ int io_type = FLB_IO_TCP;
+
+ ctx->upstream = NULL;
+
+ if (ctx->api_https == FLB_TRUE) {
+ if (!ctx->tls_ca_path && !ctx->tls_ca_file) {
+ ctx->tls_ca_file = flb_strdup(FLB_KUBE_CA);
+ }
+ ctx->tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
+ ctx->tls_verify,
+ ctx->tls_debug,
+ ctx->tls_vhost,
+ ctx->tls_ca_path,
+ ctx->tls_ca_file,
+ NULL, NULL, NULL);
+ if (!ctx->tls) {
+ return -1;
+ }
+
+ io_type = FLB_IO_TLS;
+ }
+
+ /* Create an Upstream context */
+ ctx->upstream = flb_upstream_create(config,
+ ctx->api_host,
+ ctx->api_port,
+ io_type,
+ ctx->tls);
+ if (!ctx->upstream) {
+ /* note: if ctx->tls.context is set, it's destroyed upon context exit */
+ flb_plg_debug(ctx->ins, "kube network init create upstream failed");
+ return -1;
+ }
+
+ /* Remove async flag from upstream */
+ flb_stream_disable_async_mode(&ctx->upstream->base);
+
+ return 0;
+}
+
+/* Initialize local context */
+int flb_kube_meta_init(struct flb_kube *ctx, struct flb_config *config)
+{
+ int ret;
+ char *meta_buf;
+ size_t meta_size;
+
+ if (ctx->dummy_meta == FLB_TRUE) {
+ flb_plg_warn(ctx->ins, "using Dummy Metadata");
+ return 0;
+ }
+
+ if (ctx->use_tag_for_meta) {
+ flb_plg_info(ctx->ins, "no network access required (OK)");
+ return 0;
+ }
+
+ /* Init network */
+ flb_kube_network_init(ctx, config);
+
+ /* Gather local info */
+ ret = get_local_pod_info(ctx);
+ if (ret == FLB_TRUE && !ctx->use_tag_for_meta) {
+ flb_plg_info(ctx->ins, "local POD info OK");
+
+ ret = wait_for_dns(ctx);
+ if (ret == -1) {
+ flb_plg_warn(ctx->ins, "could not resolve %s", ctx->api_host);
+ return -1;
+ }
+
+ if (ctx->use_kubelet) {
+ /* Gather info from Kubelet */
+ flb_plg_info(ctx->ins, "testing connectivity with Kubelet...");
+ ret = get_pods_from_kubelet(ctx, ctx->namespace, ctx->podname,
+ &meta_buf, &meta_size);
+ }
+ else {
+ /* Gather info from API server */
+ flb_plg_info(ctx->ins, "testing connectivity with API server...");
+ ret = get_api_server_info(ctx, ctx->namespace, ctx->podname,
+ &meta_buf, &meta_size);
+ }
+ if (ret == -1) {
+ if (!ctx->podname) {
+ flb_plg_warn(ctx->ins, "could not get meta for local POD");
+ }
+ else {
+ flb_plg_warn(ctx->ins, "could not get meta for POD %s",
+ ctx->podname);
+ }
+ return -1;
+ }
+ flb_plg_info(ctx->ins, "connectivity OK");
+ flb_free(meta_buf);
+ }
+ else {
+ flb_plg_info(ctx->ins, "Fluent Bit not running in a POD");
+ }
+
+ return 0;
+}
+
+int flb_kube_dummy_meta_get(char **out_buf, size_t *out_size)
+{
+ int len;
+ time_t t;
+ char stime[32];
+ struct tm result;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+
+ t = time(NULL);
+ localtime_r(&t, &result);
+#ifdef FLB_SYSTEM_WINDOWS
+ asctime_s(stime, sizeof(stime), &result);
+#else
+ asctime_r(&result, stime);
+#endif
+ len = strlen(stime) - 1;
+
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str(&mp_pck, 5 /* dummy */ );
+ msgpack_pack_str_body(&mp_pck, "dummy", 5);
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, stime, len);
+
+ *out_buf = mp_sbuf.data;
+ *out_size = mp_sbuf.size;
+
+ return 0;
+}
+
+int flb_kube_meta_get(struct flb_kube *ctx,
+ const char *tag, int tag_len,
+ const char *data, size_t data_size,
+ const char **out_buf, size_t *out_size,
+ struct flb_kube_meta *meta,
+ struct flb_kube_props *props)
+{
+ int id;
+ int ret;
+ const char *hash_meta_buf;
+ char *tmp_hash_meta_buf;
+ size_t off = 0;
+ size_t hash_meta_size;
+ msgpack_unpacked result;
+
+ /* Get metadata from tag or record (cache key is the important one) */
+ ret = extract_meta(ctx, tag, tag_len, data, data_size, meta);
+ if (ret != 0) {
+ return -1;
+ }
+
+ /* Check if we have some data associated to the cache key */
+ ret = flb_hash_table_get(ctx->hash_table,
+ meta->cache_key, meta->cache_key_len,
+ (void *) &hash_meta_buf, &hash_meta_size);
+ if (ret == -1) {
+ /* Retrieve API server meta and merge with local meta */
+ ret = get_and_merge_meta(ctx, meta,
+ &tmp_hash_meta_buf, &hash_meta_size);
+ if (ret == -1) {
+ *out_buf = NULL;
+ *out_size = 0;
+ return 0;
+ }
+
+ id = flb_hash_table_add(ctx->hash_table,
+ meta->cache_key, meta->cache_key_len,
+ tmp_hash_meta_buf, hash_meta_size);
+ if (id >= 0) {
+ /*
+ * Release the original buffer created on extract_meta() as a new
+ * copy have been generated into the hash table, then re-set
+ * the outgoing buffer and size.
+ */
+ flb_free(tmp_hash_meta_buf);
+ flb_hash_table_get_by_id(ctx->hash_table, id, meta->cache_key,
+ &hash_meta_buf, &hash_meta_size);
+ }
+ }
+
+ /*
+ * The retrieved buffer may have two serialized items:
+ *
+ * [0] = kubernetes metadata (annotations, labels)
+ * [1] = Annotation properties
+ *
+ * note: annotation properties are optional.
+ */
+ msgpack_unpacked_init(&result);
+
+ /* Unpack to get the offset/bytes of the first item */
+ msgpack_unpack_next(&result, hash_meta_buf, hash_meta_size, &off);
+
+ /* Set the pointer and proper size for the caller */
+ *out_buf = hash_meta_buf;
+ *out_size = off;
+
+ /* A new unpack_next() call will succeed If annotation properties exists */
+ ret = msgpack_unpack_next(&result, hash_meta_buf, hash_meta_size, &off);
+ if (ret == MSGPACK_UNPACK_SUCCESS) {
+ /* Unpack the remaining data into properties structure */
+ flb_kube_prop_unpack(props,
+ hash_meta_buf + *out_size,
+ hash_meta_size - *out_size);
+ }
+ msgpack_unpacked_destroy(&result);
+
+ return 0;
+}
+
+int flb_kube_meta_release(struct flb_kube_meta *meta)
+{
+ int r = 0;
+
+ if (meta->namespace) {
+ flb_free(meta->namespace);
+ r++;
+ }
+
+ if (meta->podname) {
+ flb_free(meta->podname);
+ r++;
+ }
+
+ if (meta->container_name) {
+ flb_free(meta->container_name);
+ r++;
+ }
+
+ if (meta->docker_id) {
+ flb_free(meta->docker_id);
+ r++;
+ }
+
+ if (meta->container_hash) {
+ flb_free(meta->container_hash);
+ r++;
+ }
+
+ if (meta->container_image) {
+ flb_free(meta->container_image);
+ r++;
+ }
+
+ if (meta->cache_key) {
+ flb_free(meta->cache_key);
+ }
+
+ return r;
+}