diff options
Diffstat (limited to 'src/fluent-bit/plugins/in_kubernetes_events')
6 files changed, 0 insertions, 1465 deletions
diff --git a/src/fluent-bit/plugins/in_kubernetes_events/CMakeLists.txt b/src/fluent-bit/plugins/in_kubernetes_events/CMakeLists.txt deleted file mode 100644 index b860a55e3..000000000 --- a/src/fluent-bit/plugins/in_kubernetes_events/CMakeLists.txt +++ /dev/null @@ -1,5 +0,0 @@ -set(src - kubernetes_events_conf.c - kubernetes_events.c) - -FLB_PLUGIN(in_kubernetes_events "${src}" "") diff --git a/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events.c b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events.c deleted file mode 100644 index 97719fba6..000000000 --- a/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events.c +++ /dev/null @@ -1,921 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2023 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#include <sys/types.h> -#include <sys/stat.h> - -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_network.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_error.h> -#include <fluent-bit/flb_compat.h> -#include <fluent-bit/flb_ra_key.h> -#include <fluent-bit/flb_time.h> -#include <fluent-bit/flb_strptime.h> -#include <fluent-bit/flb_parser.h> -#include <fluent-bit/flb_log_event_encoder.h> -#include <fluent-bit/flb_compat.h> - -#include "kubernetes_events.h" -#include "kubernetes_events_conf.h" - -#ifdef FLB_HAVE_SQLDB -#include "kubernetes_events_sql.h" -static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *item); -#endif - -static int file_to_buffer(const char *path, - char **out_buf, size_t *out_size) -{ - int ret; - int len; - char *buf; - ssize_t bytes; - FILE *fp; - struct stat st; - - if (!(fp = fopen(path, "r"))) { - return -1; - } - - ret = stat(path, &st); - if (ret == -1) { - flb_errno(); - fclose(fp); - return -1; - } - - buf = flb_calloc(1, (st.st_size + 1)); - if (!buf) { - flb_errno(); - fclose(fp); - return -1; - } - - bytes = fread(buf, st.st_size, 1, fp); - if (bytes < 1) { - flb_free(buf); - fclose(fp); - return -1; - } - - fclose(fp); - - // trim new lines - for (len = st.st_size; len > 0; len--) { - if (buf[len-1] != '\n' && buf[len-1] != '\r') { - break; - } - } - buf[len] = '\0'; - - *out_buf = buf; - *out_size = len; - - return 0; -} - -/* Set K8s Authorization Token and get HTTP Auth Header */ -static int get_http_auth_header(struct k8s_events *ctx) -{ - int ret; - char *temp; - char *tk = NULL; - size_t tk_size = 0; - - if (!ctx->token_file || strlen(ctx->token_file) == 0) { - return 0; - } - - ret = file_to_buffer(ctx->token_file, &tk, &tk_size); - if (ret == -1) { - flb_plg_warn(ctx->ins, "cannot open %s", ctx->token_file); - return -1; - } - ctx->token_created = time(NULL); - - /* Token */ - if (ctx->token != NULL) { - flb_free(ctx->token); - } - ctx->token = tk; - ctx->token_len = tk_size; - - /* HTTP Auth Header */ - if (ctx->auth == NULL) { - ctx->auth = flb_malloc(tk_size + 32); - } - else if (ctx->auth_len < tk_size + 32) { - temp = flb_realloc(ctx->auth, tk_size + 32); - if (temp == NULL) { - flb_errno(); - flb_free(ctx->auth); - ctx->auth = NULL; - return -1; - } - ctx->auth = temp; - } - - if (!ctx->auth) { - return -1; - } - - ctx->auth_len = snprintf(ctx->auth, tk_size + 32, "Bearer %s", tk); - return 0; -} - -/* Refresh HTTP Auth Header if K8s Authorization Token is expired */ -static int refresh_token_if_needed(struct k8s_events *ctx) -{ - int expired = FLB_FALSE; - int ret; - - if (!ctx->token_file || strlen(ctx->token_file) == 0) { - return 0; - } - - if (ctx->token_created > 0) { - if (time(NULL) > ctx->token_created + ctx->token_ttl) { - expired = FLB_TRUE; - } - } - - if (expired || ctx->token_created == 0) { - ret = get_http_auth_header(ctx); - if (ret == -1) { - return -1; - } - } - - return 0; -} -static int timestamp_lookup(struct k8s_events *ctx, char *ts, struct flb_time *time) -{ - struct flb_tm tm = { 0 }; - - if (flb_strptime(ts, "%Y-%m-%dT%H:%M:%SZ", &tm) == NULL) { - return -1; - } - - time->tm.tv_sec = flb_parser_tm2time(&tm); - time->tm.tv_nsec = 0; - - return 0; -} - -static msgpack_object *record_get_field_ptr(msgpack_object *obj, const char *fieldname) -{ - int i; - msgpack_object *k; - msgpack_object *v; - - if (obj->type != MSGPACK_OBJECT_MAP) { - return NULL; - } - - for (i = 0; i < obj->via.map.size; i++) { - k = &obj->via.map.ptr[i].key; - if (k->type != MSGPACK_OBJECT_STR) { - continue; - } - - if (strncmp(k->via.str.ptr, fieldname, strlen(fieldname)) == 0) { - v = &obj->via.map.ptr[i].val; - return v; - } - } - return NULL; -} - -static int record_get_field_sds(msgpack_object *obj, const char *fieldname, flb_sds_t *val) -{ - msgpack_object *v; - - v = record_get_field_ptr(obj, fieldname); - if (v == NULL) { - return 0; - } - if (v->type != MSGPACK_OBJECT_STR) { - return -1; - } - - *val = flb_sds_create_len(v->via.str.ptr, v->via.str.size); - return 0; -} - -static int record_get_field_time(msgpack_object *obj, const char *fieldname, time_t *val) -{ - msgpack_object *v; - struct flb_tm tm = { 0 }; - - v = record_get_field_ptr(obj, fieldname); - if (v == NULL) { - return -1; - } - if (v->type != MSGPACK_OBJECT_STR) { - return -1; - } - - if (flb_strptime(v->via.str.ptr, "%Y-%m-%dT%H:%M:%SZ", &tm) == NULL) { - return -2; - } - - *val = mktime(&tm.tm); - return 0; -} - -static int record_get_field_uint64(msgpack_object *obj, const char *fieldname, uint64_t *val) -{ - msgpack_object *v; - char *end; - - v = record_get_field_ptr(obj, fieldname); - if (v == NULL) { - return -1; - } - - // attempt to parse string as number... - if (v->type == MSGPACK_OBJECT_STR) { - *val = strtoul(v->via.str.ptr, &end, 10); - if (end == NULL || (end < v->via.str.ptr + v->via.str.size)) { - return -1; - } - return 0; - } - if (v->type == MSGPACK_OBJECT_POSITIVE_INTEGER) { - *val = v->via.u64; - return 0; - } - if (v->type == MSGPACK_OBJECT_NEGATIVE_INTEGER) { - *val = (uint64_t)v->via.i64; - return 0; - } - return -1; -} - -static int item_get_timestamp(msgpack_object *obj, time_t *event_time) -{ - int ret; - msgpack_object *metadata; - - // some events can have lastTimestamp and firstTimestamp set to - // NULL while having metadata.creationTimestamp set. - ret = record_get_field_time(obj, "lastTimestamp", event_time); - if (ret != -1) { - return FLB_TRUE; - } - - ret = record_get_field_time(obj, "firstTimestamp", event_time); - if (ret != -1) { - return FLB_TRUE; - } - - metadata = record_get_field_ptr(obj, "metadata"); - if (metadata == NULL) { - return FLB_FALSE; - } - - ret = record_get_field_time(metadata, "creationTimestamp", event_time); - if (ret != -1) { - return FLB_TRUE; - } - - return FLB_FALSE; -} - -static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj) -{ - int ret; - time_t event_time; - time_t now; - msgpack_object *metadata; - flb_sds_t uid; - uint64_t resource_version; - - ret = item_get_timestamp(obj, &event_time); - if (ret == -FLB_FALSE) { - flb_plg_error(ctx->ins, "Cannot get timestamp for item in response"); - return FLB_FALSE; - } - - now = (time_t)(cfl_time_now() / 1000000000); - if (event_time < (now - ctx->retention_time)) { - flb_plg_debug(ctx->ins, "Item is older than retention_time: %ld < %ld", - event_time, (now - ctx->retention_time)); - return FLB_TRUE; - } - - metadata = record_get_field_ptr(obj, "metadata"); - if (metadata == NULL) { - flb_plg_error(ctx->ins, "Cannot unpack item metadata in response"); - return FLB_FALSE; - } - - ret = record_get_field_uint64(metadata, "resourceVersion", &resource_version); - if (ret == -1) { - flb_plg_error(ctx->ins, "Cannot get resourceVersion for item in response"); - return FLB_FALSE; - } - - ret = record_get_field_sds(metadata, "uid", &uid); - if (ret == -1) { - flb_plg_error(ctx->ins, "Cannot get resourceVersion for item in response"); - return FLB_FALSE; - } - - -#ifdef FLB_HAVE_SQLDB - bool exists; - - - if (ctx->db) { - sqlite3_bind_text(ctx->stmt_get_kubernetes_event_exists_by_uid, - 1, uid, -1, NULL); - ret = sqlite3_step(ctx->stmt_get_kubernetes_event_exists_by_uid); - if (ret != SQLITE_ROW) { - if (ret != SQLITE_DONE) { - flb_plg_error(ctx->ins, "cannot execute kubernetes event exists"); - } - sqlite3_clear_bindings(ctx->stmt_get_kubernetes_event_exists_by_uid); - sqlite3_reset(ctx->stmt_get_kubernetes_event_exists_by_uid); - flb_sds_destroy(uid); - return FLB_FALSE; - } - - exists = sqlite3_column_int64(ctx->stmt_get_kubernetes_event_exists_by_uid, 0); - - flb_plg_debug(ctx->ins, "is_filtered: uid=%s exists=%d", uid, exists); - sqlite3_clear_bindings(ctx->stmt_get_kubernetes_event_exists_by_uid); - sqlite3_reset(ctx->stmt_get_kubernetes_event_exists_by_uid); - flb_sds_destroy(uid); - - return exists > 0 ? FLB_TRUE : FLB_FALSE; - } -#endif - - // check if this is an old event. - if (ctx->last_resource_version && resource_version <= ctx->last_resource_version) { - flb_plg_debug(ctx->ins, "skipping old object: %lu (< %lu)", resource_version, - ctx->last_resource_version); - flb_sds_destroy(uid); - return FLB_TRUE; - } - - flb_sds_destroy(uid); - return FLB_FALSE; -} - -static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size, uint64_t *max_resource_version, flb_sds_t *continue_token) -{ - int i; - int ret = -1; - int root_type; - size_t consumed = 0; - char *buf_data; - size_t buf_size; - size_t off = 0; - struct flb_time ts; - struct flb_ra_value *rval; - uint64_t resource_version; - msgpack_unpacked result; - msgpack_object root; - msgpack_object k; - msgpack_object *items = NULL; - msgpack_object *item = NULL; - msgpack_object *item_metadata = NULL; - msgpack_object *metadata = NULL; - - - ret = flb_pack_json(in_data, in_size, &buf_data, &buf_size, &root_type, &consumed); - if (ret == -1) { - flb_plg_error(ctx->ins, "could not process payload, incomplete or bad formed JSON"); - goto json_error; - } - - /* unpack */ - msgpack_unpacked_init(&result); - ret = msgpack_unpack_next(&result, buf_data, buf_size, &off); - if (ret != MSGPACK_UNPACK_SUCCESS) { - flb_plg_error(ctx->ins, "Cannot unpack response"); - goto unpack_error; - } - - /* lookup the items array */ - root = result.data; - if (root.type != MSGPACK_OBJECT_MAP) { - return -1; - } - - // Traverse the EventList for the metadata (for the continue token) and the items. - // https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/event-v1/#EventList - for (i = 0; i < root.via.map.size; i++) { - k = root.via.map.ptr[i].key; - if (k.type != MSGPACK_OBJECT_STR) { - continue; - } - - if (strncmp(k.via.str.ptr, "items", 5) == 0) { - items = &root.via.map.ptr[i].val; - if (items->type != MSGPACK_OBJECT_ARRAY) { - flb_plg_error(ctx->ins, "Cannot unpack items"); - goto msg_error; - } - } - - if (strncmp(k.via.str.ptr, "metadata", 8) == 0) { - metadata = &root.via.map.ptr[i].val; - if (metadata->type != MSGPACK_OBJECT_MAP) { - flb_plg_error(ctx->ins, "Cannot unpack metadata"); - goto msg_error; - } - } - } - - if (items == NULL) { - flb_plg_error(ctx->ins, "Cannot find items in response"); - goto msg_error; - } - - if (metadata == NULL) { - flb_plg_error(ctx->ins, "Cannot find metatada in response"); - goto msg_error; - } - - ret = record_get_field_sds(metadata, "continue", continue_token); - if (ret == -1) { - if (ret == -1) { - flb_plg_error(ctx->ins, "Cannot process continue token"); - goto msg_error; - } - } - - for (i = 0; i < items->via.array.size; i++) { - if (items->via.array.ptr[i].type != MSGPACK_OBJECT_MAP) { - flb_plg_warn(ctx->ins, "Event that is not a map"); - continue; - } - item_metadata = record_get_field_ptr(&items->via.array.ptr[i], "metadata"); - if (item_metadata == NULL) { - flb_plg_warn(ctx->ins, "Event without metadata"); - continue; - } - ret = record_get_field_uint64(item_metadata, - "resourceVersion", &resource_version); - if (ret == -1) { - continue; - } - if (resource_version > *max_resource_version) { - *max_resource_version = resource_version; - } - } - - /* reset the log encoder */ - flb_log_event_encoder_reset(ctx->encoder); - - /* print every item from the items array */ - for (i = 0; i < items->via.array.size; i++) { - item = &items->via.array.ptr[i]; - if (item->type != MSGPACK_OBJECT_MAP) { - flb_plg_error(ctx->ins, "Cannot unpack item in response"); - goto msg_error; - } - - if (check_event_is_filtered(ctx, item) == FLB_TRUE) { - continue; - } - -#ifdef FLB_HAVE_SQLDB - if (ctx->db) { - k8s_events_sql_insert_event(ctx, item); - } -#endif - - /* get event timestamp */ - rval = flb_ra_get_value_object(ctx->ra_timestamp, *item); - if (!rval || rval->type != FLB_RA_STRING) { - flb_plg_error(ctx->ins, "cannot retrieve event timestamp"); - goto msg_error; - } - - /* convert timestamp */ - ret = timestamp_lookup(ctx, rval->val.string, &ts); - if (ret == -1) { - flb_plg_error(ctx->ins, "cannot lookup event timestamp"); - flb_ra_key_value_destroy(rval); - goto msg_error; - } - - /* encode content as a log event */ - flb_log_event_encoder_begin_record(ctx->encoder); - flb_log_event_encoder_set_timestamp(ctx->encoder, &ts); - - ret = flb_log_event_encoder_set_body_from_msgpack_object(ctx->encoder, item); - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(ctx->encoder); - } else { - flb_plg_warn(ctx->ins, "unable to encode: %lu", resource_version); - } - flb_ra_key_value_destroy(rval); - } - - if (ctx->encoder->output_length > 0) { - flb_input_log_append(ctx->ins, NULL, 0, - ctx->encoder->output_buffer, - ctx->encoder->output_length); - } - -msg_error: - msgpack_unpacked_destroy(&result); -unpack_error: - flb_free(buf_data); -json_error: - return ret; -} - -static struct flb_http_client *make_event_api_request(struct k8s_events *ctx, - struct flb_connection *u_conn, - flb_sds_t continue_token) -{ - flb_sds_t url; - struct flb_http_client *c; - - - if (continue_token == NULL && ctx->limit_request == 0 && ctx->namespace == NULL) { - return flb_http_client(u_conn, FLB_HTTP_GET, K8S_EVENTS_KUBE_API_URI, - NULL, 0, ctx->api_host, ctx->api_port, NULL, 0); - } - - if (ctx->namespace == NULL) { - url = flb_sds_create(K8S_EVENTS_KUBE_API_URI); - } else { - url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) + - strlen(ctx->namespace)); - flb_sds_printf(&url, K8S_EVENTS_KUBE_NAMESPACE_API_URI, ctx->namespace); - } - - flb_sds_cat_safe(&url, "?", 1); - if (ctx->limit_request) { - if (continue_token != NULL) { - flb_sds_printf(&url, "continue=%s&", continue_token); - } - flb_sds_printf(&url, "limit=%d", ctx->limit_request); - } - c = flb_http_client(u_conn, FLB_HTTP_GET, url, - NULL, 0, ctx->api_host, ctx->api_port, NULL, 0); - flb_sds_destroy(url); - return c; -} - -#ifdef FLB_HAVE_SQLDB - -static int k8s_events_cleanup_db(struct flb_input_instance *ins, - struct flb_config *config, void *in_context) -{ - int ret; - struct k8s_events *ctx = (struct k8s_events *)in_context; - time_t retention_time_ago; - time_t now = (cfl_time_now() / 1000000000); - - if (ctx->db == NULL) { - FLB_INPUT_RETURN(0); - } - - retention_time_ago = now - (ctx->retention_time); - sqlite3_bind_int64(ctx->stmt_delete_old_kubernetes_events, - 1, (int64_t)retention_time_ago); - ret = sqlite3_step(ctx->stmt_delete_old_kubernetes_events); - if (ret != SQLITE_ROW && ret != SQLITE_DONE) { - flb_plg_error(ctx->ins, "cannot execute delete old kubernetes events"); - } - - sqlite3_clear_bindings(ctx->stmt_delete_old_kubernetes_events); - sqlite3_reset(ctx->stmt_delete_old_kubernetes_events); - - FLB_INPUT_RETURN(0); -} - -static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *item) -{ - int ret; - uint64_t resource_version; - time_t last; - msgpack_object *meta; - flb_sds_t uid; - - - meta = record_get_field_ptr(item, "meta"); - if (meta == NULL) { - flb_plg_error(ctx->ins, "unable to find metadata to save event"); - return -1; - } - - ret = record_get_field_uint64(meta, "resourceVersion", &resource_version); - if (ret == -1) { - flb_plg_error(ctx->ins, "unable to find resourceVersion in metadata to save event"); - return -1; - } - - ret = record_get_field_sds(meta, "uid", &uid); - if (ret == -1) { - flb_plg_error(ctx->ins, "unable to find uid in metadata to save event"); - return -1; - } - - ret = item_get_timestamp(item, &last); - if (ret == -FLB_FALSE) { - flb_plg_error(ctx->ins, "Cannot get timestamp for item to save it"); - return -1; - } - - if (ret == -2) { - flb_plg_error(ctx->ins, "unable to parse lastTimestamp in item to save event"); - flb_sds_destroy(uid); - return -1; - } - - /* Bind parameters */ - sqlite3_bind_text(ctx->stmt_insert_kubernetes_event, 1, uid, -1, 0); - sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 2, resource_version); - sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 3, (int64_t)last); - - /* Run the insert */ - ret = sqlite3_step(ctx->stmt_insert_kubernetes_event); - if (ret != SQLITE_DONE) { - sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event); - sqlite3_reset(ctx->stmt_insert_kubernetes_event); - flb_plg_error(ctx->ins, "cannot execute insert kubernetes event %s inode=%lu", - uid, resource_version); - flb_sds_destroy(uid); - return -1; - } - - flb_plg_debug(ctx->ins, - "inserted k8s event: uid=%s, resource_version=%lu, last=%ld", - uid, resource_version, last); - sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event); - sqlite3_reset(ctx->stmt_insert_kubernetes_event); - - flb_sds_destroy(uid); - return flb_sqldb_last_id(ctx->db); -} - -#endif - -static int k8s_events_collect(struct flb_input_instance *ins, - struct flb_config *config, void *in_context) -{ - int ret; - size_t b_sent; - struct flb_connection *u_conn = NULL; - struct flb_http_client *c = NULL; - struct k8s_events *ctx = in_context; - flb_sds_t continue_token = NULL; - uint64_t max_resource_version = 0; - - if (pthread_mutex_trylock(&ctx->lock) != 0) { - FLB_INPUT_RETURN(0); - } - - u_conn = flb_upstream_conn_get(ctx->upstream); - if (!u_conn) { - flb_plg_error(ins, "upstream connection initialization error"); - goto exit; - } - - ret = refresh_token_if_needed(ctx); - if (ret == -1) { - flb_plg_error(ctx->ins, "failed to refresh token"); - goto exit; - } - - do { - c = make_event_api_request(ctx, u_conn, continue_token); - if (continue_token != NULL) { - flb_sds_destroy(continue_token); - continue_token = NULL; - } - if (!c) { - flb_plg_error(ins, "unable to create http client"); - goto exit; - } - flb_http_buffer_size(c, 0); - - flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); - if (ctx->auth_len > 0) { - flb_http_add_header(c, "Authorization", 13, ctx->auth, ctx->auth_len); - } - - ret = flb_http_do(c, &b_sent); - if (ret != 0) { - flb_plg_error(ins, "http do error"); - goto exit; - } - - if (c->resp.status == 200) { - ret = process_events(ctx, c->resp.payload, c->resp.payload_size, &max_resource_version, &continue_token); - } - else { - if (c->resp.payload_size > 0) { - flb_plg_error(ctx->ins, "http_status=%i:\n%s", c->resp.status, c->resp.payload); - } - else { - flb_plg_error(ctx->ins, "http_status=%i", c->resp.status); - } - } - flb_http_client_destroy(c); - c = NULL; - } while(continue_token != NULL); - - if (max_resource_version > ctx->last_resource_version) { - flb_plg_debug(ctx->ins, "set last resourceVersion=%lu", max_resource_version); - ctx->last_resource_version = max_resource_version; - } - -exit: - pthread_mutex_unlock(&ctx->lock); - if (c) { - flb_http_client_destroy(c); - } - if (u_conn) { - flb_upstream_conn_release(u_conn); - } - FLB_INPUT_RETURN(0); -} - -static int k8s_events_init(struct flb_input_instance *ins, - struct flb_config *config, void *data) -{ - struct k8s_events *ctx = NULL; - - ctx = k8s_events_conf_create(ins); - if (!ctx) { - return -1; - } - - ctx->coll_id = flb_input_set_collector_time(ins, - k8s_events_collect, - ctx->interval_sec, - ctx->interval_nsec, - config); - -#ifdef FLB_HAVE_SQLDB - if (ctx->db) { - ctx->coll_cleanup_id = flb_input_set_collector_time(ins, - k8s_events_cleanup_db, - ctx->interval_sec, - ctx->interval_nsec, - config); - } -#endif - - return 0; -} - -static int k8s_events_exit(void *data, struct flb_config *config) -{ - struct k8s_events *ctx = data; - - if (!ctx) { - return 0; - } - - k8s_events_conf_destroy(ctx); - return 0; -} - -/* Configuration properties map */ -static struct flb_config_map config_map[] = { - /* Full Kubernetes API server URL */ - { - FLB_CONFIG_MAP_STR, "kube_url", "https://kubernetes.default.svc", - 0, FLB_FALSE, 0, - "Kubernetes API server URL" - }, - - /* Refresh interval */ - { - FLB_CONFIG_MAP_INT, "interval_sec", DEFAULT_INTERVAL_SEC, - 0, FLB_TRUE, offsetof(struct k8s_events, interval_sec), - "Set the polling interval for each channel" - }, - { - FLB_CONFIG_MAP_INT, "interval_nsec", DEFAULT_INTERVAL_NSEC, - 0, FLB_TRUE, offsetof(struct k8s_events, interval_nsec), - "Set the polling interval for each channel (sub seconds)" - }, - - /* TLS: set debug 'level' */ - { - FLB_CONFIG_MAP_INT, "tls.debug", "0", - 0, FLB_TRUE, offsetof(struct k8s_events, tls_debug), - "set TLS debug level: 0 (no debug), 1 (error), " - "2 (state change), 3 (info) and 4 (verbose)" - }, - - /* TLS: enable verification */ - { - FLB_CONFIG_MAP_BOOL, "tls.verify", "true", - 0, FLB_TRUE, offsetof(struct k8s_events, tls_verify), - "enable or disable verification of TLS peer certificate" - }, - - /* TLS: set tls.vhost feature */ - { - FLB_CONFIG_MAP_STR, "tls.vhost", NULL, - 0, FLB_TRUE, offsetof(struct k8s_events, tls_vhost), - "set optional TLS virtual host" - }, - - /* Kubernetes TLS: CA file */ - { - FLB_CONFIG_MAP_STR, "kube_ca_file", K8S_EVENTS_KUBE_CA, - 0, FLB_TRUE, offsetof(struct k8s_events, tls_ca_file), - "Kubernetes TLS CA file" - }, - - /* Kubernetes TLS: CA certs path */ - { - FLB_CONFIG_MAP_STR, "kube_ca_path", NULL, - 0, FLB_TRUE, offsetof(struct k8s_events, tls_ca_path), - "Kubernetes TLS ca path" - }, - - /* Kubernetes Token file */ - { - FLB_CONFIG_MAP_STR, "kube_token_file", K8S_EVENTS_KUBE_TOKEN, - 0, FLB_TRUE, offsetof(struct k8s_events, token_file), - "Kubernetes authorization token file" - }, - - /* Kubernetes Token file TTL */ - { - FLB_CONFIG_MAP_TIME, "kube_token_ttl", "10m", - 0, FLB_TRUE, offsetof(struct k8s_events, token_ttl), - "kubernetes token ttl, until it is reread from the token file. Default: 10m" - }, - - { - FLB_CONFIG_MAP_INT, "kube_request_limit", "0", - 0, FLB_TRUE, offsetof(struct k8s_events, limit_request), - "kubernetes limit parameter for events query, no limit applied when set to 0" - }, - - { - FLB_CONFIG_MAP_TIME, "kube_retention_time", "1h", - 0, FLB_TRUE, offsetof(struct k8s_events, retention_time), - "kubernetes retention time for events. Default: 1h" - }, - - { - FLB_CONFIG_MAP_STR, "kube_namespace", NULL, - 0, FLB_TRUE, offsetof(struct k8s_events, namespace), - "kubernetes namespace to get events from, gets event from all namespaces by default." - }, - -#ifdef FLB_HAVE_SQLDB - { - FLB_CONFIG_MAP_STR, "db", NULL, - 0, FLB_FALSE, 0, - "set a database file to keep track of recorded kubernetes events." - }, - { - FLB_CONFIG_MAP_STR, "db.sync", "normal", - 0, FLB_FALSE, 0, - "set a database sync method. values: extra, full, normal and off." - }, -#endif - - /* EOF */ - {0} -}; - -/* Plugin reference */ -struct flb_input_plugin in_kubernetes_events_plugin = { - .name = "kubernetes_events", - .description = "Kubernetes Events", - .cb_init = k8s_events_init, - .cb_pre_run = NULL, - .cb_collect = k8s_events_collect, - .cb_flush_buf = NULL, - .cb_exit = k8s_events_exit, - .config_map = config_map, - .flags = FLB_INPUT_NET | FLB_INPUT_CORO | FLB_INPUT_THREADED -}; diff --git a/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events.h b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events.h deleted file mode 100644 index 3afd48570..000000000 --- a/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events.h +++ /dev/null @@ -1,106 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2023 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_IN_KUBERNETES_EVENTS_H -#define FLB_IN_KUBERNETES_EVENTS_H - -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_upstream.h> -#include <fluent-bit/flb_record_accessor.h> -#include <fluent-bit/flb_sqldb.h> - -#define DEFAULT_INTERVAL_SEC "0" -#define DEFAULT_INTERVAL_NSEC "500000000" - -/* Filter context */ -struct k8s_events { - int coll_id; - int coll_cleanup_id; - int interval_sec; /* interval collection time (Second) */ - int interval_nsec; /* interval collection time (Nanosecond) */ - int retention_time; /* retention time limit, default 1 hour */ - - /* Configuration parameters */ - char *api_host; - int api_port; - int api_https; - int tls_debug; - int tls_verify; - int kube_token_ttl; - flb_sds_t namespace; - - /* API Server end point */ - char kube_url[1024]; - - /* TLS CA certificate file */ - char *tls_ca_path; - char *tls_ca_file; - - /* TLS virtual host (optional), set by configmap */ - flb_sds_t tls_vhost; - - /* Kubernetes Token from FLB_KUBE_TOKEN file */ - char *token_file; - char *token; - int token_ttl; - size_t token_len; - int token_created; - - /* Pre-formatted HTTP Authorization header value */ - char *auth; - size_t auth_len; - - int dns_retries; - int dns_wait_time; - - struct flb_tls *tls; - - struct flb_log_event_encoder *encoder; - - /* record accessor */ - struct flb_record_accessor *ra_timestamp; - struct flb_record_accessor *ra_resource_version; - - /* others */ - struct flb_config *config; - struct flb_upstream *upstream; - struct flb_input_instance *ins; - - /* limit for event queries */ - int limit_request; - /* last highest seen resource_version */ - uint64_t last_resource_version; - -#ifdef FLB_HAVE_SQLDB - /* State database */ - struct flb_sqldb *db; - int db_sync; - int db_locking; - flb_sds_t db_journal_mode; - sqlite3_stmt *stmt_get_kubernetes_event_exists_by_uid; - sqlite3_stmt *stmt_insert_kubernetes_event; - sqlite3_stmt *stmt_delete_old_kubernetes_events; -#endif - - /* concurrency lock */ - pthread_mutex_t lock; -}; - -#endif
\ No newline at end of file diff --git a/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.c b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.c deleted file mode 100644 index 4f67d8cfc..000000000 --- a/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.c +++ /dev/null @@ -1,326 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2023 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "kubernetes_events_conf.h" - -#ifdef FLB_HAVE_SQLDB -#include "kubernetes_events_sql.h" - - -/* Open or create database required by tail plugin */ -static struct flb_sqldb *flb_kubernetes_event_db_open(const char *path, - struct flb_input_instance *in, - struct k8s_events *ctx, - struct flb_config *config) -{ - int ret; - char tmp[64]; - struct flb_sqldb *db; - - /* Open/create the database */ - db = flb_sqldb_open(path, in->name, config); - if (!db) { - return NULL; - } - - /* Create table schema if it don't exists */ - ret = flb_sqldb_query(db, SQL_CREATE_KUBERNETES_EVENTS, NULL, NULL); - if (ret != FLB_OK) { - flb_plg_error(ctx->ins, "db: could not create 'in_kubernetes_events' table"); - flb_sqldb_close(db); - return NULL; - } - - if (ctx->db_sync >= 0) { - snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_SYNC, - ctx->db_sync); - ret = flb_sqldb_query(db, tmp, NULL, NULL); - if (ret != FLB_OK) { - flb_plg_error(ctx->ins, "db could not set pragma 'sync'"); - flb_sqldb_close(db); - return NULL; - } - } - - if (ctx->db_locking == FLB_TRUE) { - ret = flb_sqldb_query(db, SQL_PRAGMA_LOCKING_MODE, NULL, NULL); - if (ret != FLB_OK) { - flb_plg_error(ctx->ins, "db: could not set pragma 'locking_mode'"); - flb_sqldb_close(db); - return NULL; - } - } - - if (ctx->db_journal_mode) { - snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_JOURNAL_MODE, - ctx->db_journal_mode); - ret = flb_sqldb_query(db, tmp, NULL, NULL); - if (ret != FLB_OK) { - flb_plg_error(ctx->ins, "db could not set pragma 'journal_mode'"); - flb_sqldb_close(db); - return NULL; - } - } - - return db; -} - -static int flb_kubernetes_event_db_close(struct flb_sqldb *db) -{ - flb_sqldb_close(db); - return 0; -} - -#endif - -static int network_init(struct k8s_events *ctx, struct flb_config *config) -{ - int io_type = FLB_IO_TCP; - - ctx->upstream = NULL; - - if (ctx->api_https == FLB_TRUE) { - if (!ctx->tls_ca_path && !ctx->tls_ca_file) { - ctx->tls_ca_file = flb_strdup(K8S_EVENTS_KUBE_CA); - } - - /* create a custom TLS context since we use user-defined certs */ - ctx->tls = flb_tls_create(FLB_TLS_CLIENT_MODE, - ctx->tls_verify, - ctx->tls_debug, - ctx->tls_vhost, - ctx->tls_ca_path, - ctx->tls_ca_file, - NULL, NULL, NULL); - if (!ctx->tls) { - return -1; - } - - io_type = FLB_IO_TLS; - } - - /* Create an Upstream context */ - ctx->upstream = flb_upstream_create(config, - ctx->api_host, - ctx->api_port, - io_type, - ctx->tls); - if (!ctx->upstream) { - flb_plg_error(ctx->ins, "network initialization failed"); - return -1; - } - - return 0; -} - -struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins) -{ - int off; - int ret; - const char *p; - const char *url; - const char *tmp; - struct k8s_events *ctx = NULL; - pthread_mutexattr_t attr; - - ctx = flb_calloc(1, sizeof(struct k8s_events)); - if (!ctx) { - flb_errno(); - return NULL; - } - ctx->ins = ins; - - pthread_mutexattr_init(&attr); - pthread_mutex_init(&ctx->lock, &attr); - - /* Load the config map */ - ret = flb_input_config_map_set(ins, (void *) ctx); - if (ret == -1) { - flb_free(ctx); - return NULL; - } - flb_input_set_context(ins, ctx); - - ctx->encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); - if (!ctx->encoder) { - flb_plg_error(ins, "could not initialize event encoder"); - k8s_events_conf_destroy(ctx); - return NULL; - } - - /* Record accessor pattern */ - ctx->ra_timestamp = flb_ra_create(K8S_EVENTS_RA_TIMESTAMP, FLB_TRUE); - if (!ctx->ra_timestamp) { - flb_plg_error(ctx->ins, - "could not create record accessor for metadata items"); - k8s_events_conf_destroy(ctx); - return NULL; - } - - ctx->ra_resource_version = flb_ra_create(K8S_EVENTS_RA_RESOURCE_VERSION, FLB_TRUE); - if (!ctx->ra_resource_version) { - flb_plg_error(ctx->ins, "could not create record accessor for resource version"); - k8s_events_conf_destroy(ctx); - return NULL; - } - - /* Get Kubernetes API server */ - url = flb_input_get_property("kube_url", ins); - if (!url) { - ctx->api_host = flb_strdup(K8S_EVENTS_KUBE_API_HOST); - ctx->api_port = K8S_EVENTS_KUBE_API_PORT; - ctx->api_https = FLB_TRUE; - } - else { - tmp = url; - - /* Check the protocol */ - if (strncmp(tmp, "http://", 7) == 0) { - off = 7; - ctx->api_https = FLB_FALSE; - } - else if (strncmp(tmp, "https://", 8) == 0) { - off = 8; - ctx->api_https = FLB_TRUE; - } - else { - k8s_events_conf_destroy(ctx); - return NULL; - } - - /* Get hostname and TCP port */ - p = url + off; - tmp = strchr(p, ':'); - if (tmp) { - ctx->api_host = flb_strndup(p, tmp - p); - tmp++; - ctx->api_port = atoi(tmp); - } - else { - ctx->api_host = flb_strdup(p); - ctx->api_port = K8S_EVENTS_KUBE_API_PORT; - } - } - snprintf(ctx->kube_url, sizeof(ctx->kube_url) - 1, - "%s://%s:%i", - ctx->api_https ? "https" : "http", - ctx->api_host, ctx->api_port); - - flb_plg_info(ctx->ins, "API server: %s", ctx->kube_url); - - /* network setup */ - ret = network_init(ctx, ins->config); - if (ret == -1) { - k8s_events_conf_destroy(ctx); - return NULL; - } - -#ifdef FLB_HAVE_SQLDB - /* Initialize database */ - tmp = flb_input_get_property("db", ins); - if (tmp) { - ctx->db = flb_kubernetes_event_db_open(tmp, ins, ctx, ins->config); - if (!ctx->db) { - flb_plg_error(ctx->ins, "could not open/create database"); - k8s_events_conf_destroy(ctx); - return NULL; - } - } - - if (ctx->db) { - ret = sqlite3_prepare_v2(ctx->db->handler, - SQL_KUBERNETES_EVENT_EXISTS_BY_UID, - -1, - &ctx->stmt_get_kubernetes_event_exists_by_uid, - 0); - if (ret != SQLITE_OK) { - flb_plg_error(ctx->ins, "error preparing database SQL statement: stmt_get_kubernetes_event_exists_by_uid"); - k8s_events_conf_destroy(ctx); - return NULL; - } - - ret = sqlite3_prepare_v2(ctx->db->handler, - SQL_INSERT_KUBERNETES_EVENTS, - -1, - &ctx->stmt_insert_kubernetes_event, - 0); - if (ret != SQLITE_OK) { - flb_plg_error(ctx->ins, "error preparing database SQL statement: stmt_insert_kubernetes_event"); - k8s_events_conf_destroy(ctx); - return NULL; - } - - ret = sqlite3_prepare_v2(ctx->db->handler, - SQL_DELETE_OLD_KUBERNETES_EVENTS, - -1, - &ctx->stmt_delete_old_kubernetes_events, - 0); - if (ret != SQLITE_OK) { - flb_plg_error(ctx->ins, "error preparing database SQL statement: stmt_delete_old_kubernetes_events"); - k8s_events_conf_destroy(ctx); - return NULL; - } - } -#endif - - return ctx; -} - -void k8s_events_conf_destroy(struct k8s_events *ctx) -{ - if (ctx->ra_timestamp) { - flb_ra_destroy(ctx->ra_timestamp); - } - - if (ctx->ra_resource_version) { - flb_ra_destroy(ctx->ra_resource_version); - } - - if (ctx->upstream) { - flb_upstream_destroy(ctx->upstream); - } - - if (ctx->encoder) { - flb_log_event_encoder_destroy(ctx->encoder); - } - - if (ctx->api_host) { - flb_free(ctx->api_host); - } - if (ctx->token) { - flb_free(ctx->token); - } - if (ctx->auth) { - flb_free(ctx->auth); - } - -#ifdef FLB_HAVE_TLS - if (ctx->tls) { - flb_tls_destroy(ctx->tls); - } -#endif - -#ifdef FLB_HAVE_SQLDB - if (ctx->db) { - flb_kubernetes_event_db_close(ctx->db); - } -#endif - - flb_free(ctx); -} diff --git a/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.h b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.h deleted file mode 100644 index 9d6b54197..000000000 --- a/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.h +++ /dev/null @@ -1,47 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2023 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_IN_KUBERNETES_EVENTS_CONF_H -#define FLB_IN_KUBERNETES_EVENTS_CONF_H - -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_record_accessor.h> -#include <fluent-bit/flb_log_event_encoder.h> - -#include "kubernetes_events.h" - -/* Kubernetes API server info */ -#define K8S_EVENTS_KUBE_API_HOST "kubernetes.default.svc" -#define K8S_EVENTS_KUBE_API_PORT 443 -// /apis/events.k8s.io/v1/events -// /apis/events.k8s.io/v1/namespaces/{namespace}/events -#define K8S_EVENTS_KUBE_API_URI "/api/v1/events" -#define K8S_EVENTS_KUBE_NAMESPACE_API_URI "/api/v1/namespaces/%s/events" - -/* secrets */ -#define K8S_EVENTS_KUBE_TOKEN "/var/run/secrets/kubernetes.io/serviceaccount/token" -#define K8S_EVENTS_KUBE_CA "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" - -#define K8S_EVENTS_RA_TIMESTAMP "$metadata['creationTimestamp']" -#define K8S_EVENTS_RA_RESOURCE_VERSION "$metadata['resourceVersion']" - -struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins); -void k8s_events_conf_destroy(struct k8s_events *ctx); - -#endif
\ No newline at end of file diff --git a/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_sql.h b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_sql.h deleted file mode 100644 index 3076791cc..000000000 --- a/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_sql.h +++ /dev/null @@ -1,60 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2023 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FLB_KUBERNETES_EVENTS_SQL_H -#define FLB_KUBERNETES_EVENTS_SQL_H - -/* - * In Fluent Bit we try to have a common convention for table names, - * if the table belongs to an input/output plugin, use the plugins name - * with the name of the object or type. - * - * in_kubernetes_events plugin table to track kubernetes events: - * in_kubernetes_events - */ -#define SQL_CREATE_KUBERNETES_EVENTS \ - "CREATE TABLE IF NOT EXISTS in_kubernetes_events (" \ - " id INTEGER PRIMARY KEY," \ - " uid TEXT NOT NULL," \ - " resourceVersion INTEGER NOT NULL," \ - " created INTEGER NOT NULL" \ - ");" - -#define SQL_KUBERNETES_EVENT_EXISTS_BY_UID \ - "SELECT COUNT(id) " \ - " FROM in_kubernetes_events " \ - " WHERE uid=@uid;" - -#define SQL_INSERT_KUBERNETES_EVENTS \ - "INSERT INTO in_kubernetes_events (uid, resourceVersion, created)" \ - " VALUES (@uid, @resourceVersion, @created);" - -#define SQL_DELETE_OLD_KUBERNETES_EVENTS \ - "DELETE FROM in_kubernetes_events WHERE created <= @createdBefore;" - -#define SQL_PRAGMA_SYNC \ - "PRAGMA synchronous=%i;" - -#define SQL_PRAGMA_JOURNAL_MODE \ - "PRAGMA journal_mode=%s;" - -#define SQL_PRAGMA_LOCKING_MODE \ - "PRAGMA locking_mode=EXCLUSIVE;" - -#endif |