summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/in_kubernetes_events
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/in_kubernetes_events')
-rw-r--r--src/fluent-bit/plugins/in_kubernetes_events/CMakeLists.txt5
-rw-r--r--src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events.c921
-rw-r--r--src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events.h106
-rw-r--r--src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.c326
-rw-r--r--src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.h47
-rw-r--r--src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_sql.h60
6 files changed, 1465 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/in_kubernetes_events/CMakeLists.txt b/src/fluent-bit/plugins/in_kubernetes_events/CMakeLists.txt
new file mode 100644
index 000000000..b860a55e3
--- /dev/null
+++ b/src/fluent-bit/plugins/in_kubernetes_events/CMakeLists.txt
@@ -0,0 +1,5 @@
+set(src
+ kubernetes_events_conf.c
+ kubernetes_events.c)
+
+FLB_PLUGIN(in_kubernetes_events "${src}" "")
diff --git a/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events.c b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events.c
new file mode 100644
index 000000000..97719fba6
--- /dev/null
+++ b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events.c
@@ -0,0 +1,921 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_network.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_error.h>
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_ra_key.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_strptime.h>
+#include <fluent-bit/flb_parser.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+#include <fluent-bit/flb_compat.h>
+
+#include "kubernetes_events.h"
+#include "kubernetes_events_conf.h"
+
+#ifdef FLB_HAVE_SQLDB
+#include "kubernetes_events_sql.h"
+static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *item);
+#endif
+
+static int file_to_buffer(const char *path,
+ char **out_buf, size_t *out_size)
+{
+ int ret;
+ int len;
+ char *buf;
+ ssize_t bytes;
+ FILE *fp;
+ struct stat st;
+
+ if (!(fp = fopen(path, "r"))) {
+ return -1;
+ }
+
+ ret = stat(path, &st);
+ if (ret == -1) {
+ flb_errno();
+ fclose(fp);
+ return -1;
+ }
+
+ buf = flb_calloc(1, (st.st_size + 1));
+ if (!buf) {
+ flb_errno();
+ fclose(fp);
+ return -1;
+ }
+
+ bytes = fread(buf, st.st_size, 1, fp);
+ if (bytes < 1) {
+ flb_free(buf);
+ fclose(fp);
+ return -1;
+ }
+
+ fclose(fp);
+
+ // trim new lines
+ for (len = st.st_size; len > 0; len--) {
+ if (buf[len-1] != '\n' && buf[len-1] != '\r') {
+ break;
+ }
+ }
+ buf[len] = '\0';
+
+ *out_buf = buf;
+ *out_size = len;
+
+ return 0;
+}
+
+/* Set K8s Authorization Token and get HTTP Auth Header */
+static int get_http_auth_header(struct k8s_events *ctx)
+{
+ int ret;
+ char *temp;
+ char *tk = NULL;
+ size_t tk_size = 0;
+
+ if (!ctx->token_file || strlen(ctx->token_file) == 0) {
+ return 0;
+ }
+
+ ret = file_to_buffer(ctx->token_file, &tk, &tk_size);
+ if (ret == -1) {
+ flb_plg_warn(ctx->ins, "cannot open %s", ctx->token_file);
+ return -1;
+ }
+ ctx->token_created = time(NULL);
+
+ /* Token */
+ if (ctx->token != NULL) {
+ flb_free(ctx->token);
+ }
+ ctx->token = tk;
+ ctx->token_len = tk_size;
+
+ /* HTTP Auth Header */
+ if (ctx->auth == NULL) {
+ ctx->auth = flb_malloc(tk_size + 32);
+ }
+ else if (ctx->auth_len < tk_size + 32) {
+ temp = flb_realloc(ctx->auth, tk_size + 32);
+ if (temp == NULL) {
+ flb_errno();
+ flb_free(ctx->auth);
+ ctx->auth = NULL;
+ return -1;
+ }
+ ctx->auth = temp;
+ }
+
+ if (!ctx->auth) {
+ return -1;
+ }
+
+ ctx->auth_len = snprintf(ctx->auth, tk_size + 32, "Bearer %s", tk);
+ return 0;
+}
+
+/* Refresh HTTP Auth Header if K8s Authorization Token is expired */
+static int refresh_token_if_needed(struct k8s_events *ctx)
+{
+ int expired = FLB_FALSE;
+ int ret;
+
+ if (!ctx->token_file || strlen(ctx->token_file) == 0) {
+ return 0;
+ }
+
+ if (ctx->token_created > 0) {
+ if (time(NULL) > ctx->token_created + ctx->token_ttl) {
+ expired = FLB_TRUE;
+ }
+ }
+
+ if (expired || ctx->token_created == 0) {
+ ret = get_http_auth_header(ctx);
+ if (ret == -1) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+static int timestamp_lookup(struct k8s_events *ctx, char *ts, struct flb_time *time)
+{
+ struct flb_tm tm = { 0 };
+
+ if (flb_strptime(ts, "%Y-%m-%dT%H:%M:%SZ", &tm) == NULL) {
+ return -1;
+ }
+
+ time->tm.tv_sec = flb_parser_tm2time(&tm);
+ time->tm.tv_nsec = 0;
+
+ return 0;
+}
+
+static msgpack_object *record_get_field_ptr(msgpack_object *obj, const char *fieldname)
+{
+ int i;
+ msgpack_object *k;
+ msgpack_object *v;
+
+ if (obj->type != MSGPACK_OBJECT_MAP) {
+ return NULL;
+ }
+
+ for (i = 0; i < obj->via.map.size; i++) {
+ k = &obj->via.map.ptr[i].key;
+ if (k->type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+
+ if (strncmp(k->via.str.ptr, fieldname, strlen(fieldname)) == 0) {
+ v = &obj->via.map.ptr[i].val;
+ return v;
+ }
+ }
+ return NULL;
+}
+
+static int record_get_field_sds(msgpack_object *obj, const char *fieldname, flb_sds_t *val)
+{
+ msgpack_object *v;
+
+ v = record_get_field_ptr(obj, fieldname);
+ if (v == NULL) {
+ return 0;
+ }
+ if (v->type != MSGPACK_OBJECT_STR) {
+ return -1;
+ }
+
+ *val = flb_sds_create_len(v->via.str.ptr, v->via.str.size);
+ return 0;
+}
+
+static int record_get_field_time(msgpack_object *obj, const char *fieldname, time_t *val)
+{
+ msgpack_object *v;
+ struct flb_tm tm = { 0 };
+
+ v = record_get_field_ptr(obj, fieldname);
+ if (v == NULL) {
+ return -1;
+ }
+ if (v->type != MSGPACK_OBJECT_STR) {
+ return -1;
+ }
+
+ if (flb_strptime(v->via.str.ptr, "%Y-%m-%dT%H:%M:%SZ", &tm) == NULL) {
+ return -2;
+ }
+
+ *val = mktime(&tm.tm);
+ return 0;
+}
+
+static int record_get_field_uint64(msgpack_object *obj, const char *fieldname, uint64_t *val)
+{
+ msgpack_object *v;
+ char *end;
+
+ v = record_get_field_ptr(obj, fieldname);
+ if (v == NULL) {
+ return -1;
+ }
+
+ // attempt to parse string as number...
+ if (v->type == MSGPACK_OBJECT_STR) {
+ *val = strtoul(v->via.str.ptr, &end, 10);
+ if (end == NULL || (end < v->via.str.ptr + v->via.str.size)) {
+ return -1;
+ }
+ return 0;
+ }
+ if (v->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
+ *val = v->via.u64;
+ return 0;
+ }
+ if (v->type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
+ *val = (uint64_t)v->via.i64;
+ return 0;
+ }
+ return -1;
+}
+
+static int item_get_timestamp(msgpack_object *obj, time_t *event_time)
+{
+ int ret;
+ msgpack_object *metadata;
+
+ // some events can have lastTimestamp and firstTimestamp set to
+ // NULL while having metadata.creationTimestamp set.
+ ret = record_get_field_time(obj, "lastTimestamp", event_time);
+ if (ret != -1) {
+ return FLB_TRUE;
+ }
+
+ ret = record_get_field_time(obj, "firstTimestamp", event_time);
+ if (ret != -1) {
+ return FLB_TRUE;
+ }
+
+ metadata = record_get_field_ptr(obj, "metadata");
+ if (metadata == NULL) {
+ return FLB_FALSE;
+ }
+
+ ret = record_get_field_time(metadata, "creationTimestamp", event_time);
+ if (ret != -1) {
+ return FLB_TRUE;
+ }
+
+ return FLB_FALSE;
+}
+
+static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj)
+{
+ int ret;
+ time_t event_time;
+ time_t now;
+ msgpack_object *metadata;
+ flb_sds_t uid;
+ uint64_t resource_version;
+
+ ret = item_get_timestamp(obj, &event_time);
+ if (ret == -FLB_FALSE) {
+ flb_plg_error(ctx->ins, "Cannot get timestamp for item in response");
+ return FLB_FALSE;
+ }
+
+ now = (time_t)(cfl_time_now() / 1000000000);
+ if (event_time < (now - ctx->retention_time)) {
+ flb_plg_debug(ctx->ins, "Item is older than retention_time: %ld < %ld",
+ event_time, (now - ctx->retention_time));
+ return FLB_TRUE;
+ }
+
+ metadata = record_get_field_ptr(obj, "metadata");
+ if (metadata == NULL) {
+ flb_plg_error(ctx->ins, "Cannot unpack item metadata in response");
+ return FLB_FALSE;
+ }
+
+ ret = record_get_field_uint64(metadata, "resourceVersion", &resource_version);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "Cannot get resourceVersion for item in response");
+ return FLB_FALSE;
+ }
+
+ ret = record_get_field_sds(metadata, "uid", &uid);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "Cannot get resourceVersion for item in response");
+ return FLB_FALSE;
+ }
+
+
+#ifdef FLB_HAVE_SQLDB
+ bool exists;
+
+
+ if (ctx->db) {
+ sqlite3_bind_text(ctx->stmt_get_kubernetes_event_exists_by_uid,
+ 1, uid, -1, NULL);
+ ret = sqlite3_step(ctx->stmt_get_kubernetes_event_exists_by_uid);
+ if (ret != SQLITE_ROW) {
+ if (ret != SQLITE_DONE) {
+ flb_plg_error(ctx->ins, "cannot execute kubernetes event exists");
+ }
+ sqlite3_clear_bindings(ctx->stmt_get_kubernetes_event_exists_by_uid);
+ sqlite3_reset(ctx->stmt_get_kubernetes_event_exists_by_uid);
+ flb_sds_destroy(uid);
+ return FLB_FALSE;
+ }
+
+ exists = sqlite3_column_int64(ctx->stmt_get_kubernetes_event_exists_by_uid, 0);
+
+ flb_plg_debug(ctx->ins, "is_filtered: uid=%s exists=%d", uid, exists);
+ sqlite3_clear_bindings(ctx->stmt_get_kubernetes_event_exists_by_uid);
+ sqlite3_reset(ctx->stmt_get_kubernetes_event_exists_by_uid);
+ flb_sds_destroy(uid);
+
+ return exists > 0 ? FLB_TRUE : FLB_FALSE;
+ }
+#endif
+
+ // check if this is an old event.
+ if (ctx->last_resource_version && resource_version <= ctx->last_resource_version) {
+ flb_plg_debug(ctx->ins, "skipping old object: %lu (< %lu)", resource_version,
+ ctx->last_resource_version);
+ flb_sds_destroy(uid);
+ return FLB_TRUE;
+ }
+
+ flb_sds_destroy(uid);
+ return FLB_FALSE;
+}
+
+static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size, uint64_t *max_resource_version, flb_sds_t *continue_token)
+{
+ int i;
+ int ret = -1;
+ int root_type;
+ size_t consumed = 0;
+ char *buf_data;
+ size_t buf_size;
+ size_t off = 0;
+ struct flb_time ts;
+ struct flb_ra_value *rval;
+ uint64_t resource_version;
+ msgpack_unpacked result;
+ msgpack_object root;
+ msgpack_object k;
+ msgpack_object *items = NULL;
+ msgpack_object *item = NULL;
+ msgpack_object *item_metadata = NULL;
+ msgpack_object *metadata = NULL;
+
+
+ ret = flb_pack_json(in_data, in_size, &buf_data, &buf_size, &root_type, &consumed);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not process payload, incomplete or bad formed JSON");
+ goto json_error;
+ }
+
+ /* unpack */
+ msgpack_unpacked_init(&result);
+ ret = msgpack_unpack_next(&result, buf_data, buf_size, &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ flb_plg_error(ctx->ins, "Cannot unpack response");
+ goto unpack_error;
+ }
+
+ /* lookup the items array */
+ root = result.data;
+ if (root.type != MSGPACK_OBJECT_MAP) {
+ return -1;
+ }
+
+ // Traverse the EventList for the metadata (for the continue token) and the items.
+ // https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/event-v1/#EventList
+ for (i = 0; i < root.via.map.size; i++) {
+ k = root.via.map.ptr[i].key;
+ if (k.type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+
+ if (strncmp(k.via.str.ptr, "items", 5) == 0) {
+ items = &root.via.map.ptr[i].val;
+ if (items->type != MSGPACK_OBJECT_ARRAY) {
+ flb_plg_error(ctx->ins, "Cannot unpack items");
+ goto msg_error;
+ }
+ }
+
+ if (strncmp(k.via.str.ptr, "metadata", 8) == 0) {
+ metadata = &root.via.map.ptr[i].val;
+ if (metadata->type != MSGPACK_OBJECT_MAP) {
+ flb_plg_error(ctx->ins, "Cannot unpack metadata");
+ goto msg_error;
+ }
+ }
+ }
+
+ if (items == NULL) {
+ flb_plg_error(ctx->ins, "Cannot find items in response");
+ goto msg_error;
+ }
+
+ if (metadata == NULL) {
+ flb_plg_error(ctx->ins, "Cannot find metatada in response");
+ goto msg_error;
+ }
+
+ ret = record_get_field_sds(metadata, "continue", continue_token);
+ if (ret == -1) {
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "Cannot process continue token");
+ goto msg_error;
+ }
+ }
+
+ for (i = 0; i < items->via.array.size; i++) {
+ if (items->via.array.ptr[i].type != MSGPACK_OBJECT_MAP) {
+ flb_plg_warn(ctx->ins, "Event that is not a map");
+ continue;
+ }
+ item_metadata = record_get_field_ptr(&items->via.array.ptr[i], "metadata");
+ if (item_metadata == NULL) {
+ flb_plg_warn(ctx->ins, "Event without metadata");
+ continue;
+ }
+ ret = record_get_field_uint64(item_metadata,
+ "resourceVersion", &resource_version);
+ if (ret == -1) {
+ continue;
+ }
+ if (resource_version > *max_resource_version) {
+ *max_resource_version = resource_version;
+ }
+ }
+
+ /* reset the log encoder */
+ flb_log_event_encoder_reset(ctx->encoder);
+
+ /* print every item from the items array */
+ for (i = 0; i < items->via.array.size; i++) {
+ item = &items->via.array.ptr[i];
+ if (item->type != MSGPACK_OBJECT_MAP) {
+ flb_plg_error(ctx->ins, "Cannot unpack item in response");
+ goto msg_error;
+ }
+
+ if (check_event_is_filtered(ctx, item) == FLB_TRUE) {
+ continue;
+ }
+
+#ifdef FLB_HAVE_SQLDB
+ if (ctx->db) {
+ k8s_events_sql_insert_event(ctx, item);
+ }
+#endif
+
+ /* get event timestamp */
+ rval = flb_ra_get_value_object(ctx->ra_timestamp, *item);
+ if (!rval || rval->type != FLB_RA_STRING) {
+ flb_plg_error(ctx->ins, "cannot retrieve event timestamp");
+ goto msg_error;
+ }
+
+ /* convert timestamp */
+ ret = timestamp_lookup(ctx, rval->val.string, &ts);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "cannot lookup event timestamp");
+ flb_ra_key_value_destroy(rval);
+ goto msg_error;
+ }
+
+ /* encode content as a log event */
+ flb_log_event_encoder_begin_record(ctx->encoder);
+ flb_log_event_encoder_set_timestamp(ctx->encoder, &ts);
+
+ ret = flb_log_event_encoder_set_body_from_msgpack_object(ctx->encoder, item);
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_commit_record(ctx->encoder);
+ } else {
+ flb_plg_warn(ctx->ins, "unable to encode: %lu", resource_version);
+ }
+ flb_ra_key_value_destroy(rval);
+ }
+
+ if (ctx->encoder->output_length > 0) {
+ flb_input_log_append(ctx->ins, NULL, 0,
+ ctx->encoder->output_buffer,
+ ctx->encoder->output_length);
+ }
+
+msg_error:
+ msgpack_unpacked_destroy(&result);
+unpack_error:
+ flb_free(buf_data);
+json_error:
+ return ret;
+}
+
+static struct flb_http_client *make_event_api_request(struct k8s_events *ctx,
+ struct flb_connection *u_conn,
+ flb_sds_t continue_token)
+{
+ flb_sds_t url;
+ struct flb_http_client *c;
+
+
+ if (continue_token == NULL && ctx->limit_request == 0 && ctx->namespace == NULL) {
+ return flb_http_client(u_conn, FLB_HTTP_GET, K8S_EVENTS_KUBE_API_URI,
+ NULL, 0, ctx->api_host, ctx->api_port, NULL, 0);
+ }
+
+ if (ctx->namespace == NULL) {
+ url = flb_sds_create(K8S_EVENTS_KUBE_API_URI);
+ } else {
+ url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) +
+ strlen(ctx->namespace));
+ flb_sds_printf(&url, K8S_EVENTS_KUBE_NAMESPACE_API_URI, ctx->namespace);
+ }
+
+ flb_sds_cat_safe(&url, "?", 1);
+ if (ctx->limit_request) {
+ if (continue_token != NULL) {
+ flb_sds_printf(&url, "continue=%s&", continue_token);
+ }
+ flb_sds_printf(&url, "limit=%d", ctx->limit_request);
+ }
+ c = flb_http_client(u_conn, FLB_HTTP_GET, url,
+ NULL, 0, ctx->api_host, ctx->api_port, NULL, 0);
+ flb_sds_destroy(url);
+ return c;
+}
+
+#ifdef FLB_HAVE_SQLDB
+
+static int k8s_events_cleanup_db(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ int ret;
+ struct k8s_events *ctx = (struct k8s_events *)in_context;
+ time_t retention_time_ago;
+ time_t now = (cfl_time_now() / 1000000000);
+
+ if (ctx->db == NULL) {
+ FLB_INPUT_RETURN(0);
+ }
+
+ retention_time_ago = now - (ctx->retention_time);
+ sqlite3_bind_int64(ctx->stmt_delete_old_kubernetes_events,
+ 1, (int64_t)retention_time_ago);
+ ret = sqlite3_step(ctx->stmt_delete_old_kubernetes_events);
+ if (ret != SQLITE_ROW && ret != SQLITE_DONE) {
+ flb_plg_error(ctx->ins, "cannot execute delete old kubernetes events");
+ }
+
+ sqlite3_clear_bindings(ctx->stmt_delete_old_kubernetes_events);
+ sqlite3_reset(ctx->stmt_delete_old_kubernetes_events);
+
+ FLB_INPUT_RETURN(0);
+}
+
+static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *item)
+{
+ int ret;
+ uint64_t resource_version;
+ time_t last;
+ msgpack_object *meta;
+ flb_sds_t uid;
+
+
+ meta = record_get_field_ptr(item, "meta");
+ if (meta == NULL) {
+ flb_plg_error(ctx->ins, "unable to find metadata to save event");
+ return -1;
+ }
+
+ ret = record_get_field_uint64(meta, "resourceVersion", &resource_version);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "unable to find resourceVersion in metadata to save event");
+ return -1;
+ }
+
+ ret = record_get_field_sds(meta, "uid", &uid);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "unable to find uid in metadata to save event");
+ return -1;
+ }
+
+ ret = item_get_timestamp(item, &last);
+ if (ret == -FLB_FALSE) {
+ flb_plg_error(ctx->ins, "Cannot get timestamp for item to save it");
+ return -1;
+ }
+
+ if (ret == -2) {
+ flb_plg_error(ctx->ins, "unable to parse lastTimestamp in item to save event");
+ flb_sds_destroy(uid);
+ return -1;
+ }
+
+ /* Bind parameters */
+ sqlite3_bind_text(ctx->stmt_insert_kubernetes_event, 1, uid, -1, 0);
+ sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 2, resource_version);
+ sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 3, (int64_t)last);
+
+ /* Run the insert */
+ ret = sqlite3_step(ctx->stmt_insert_kubernetes_event);
+ if (ret != SQLITE_DONE) {
+ sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event);
+ sqlite3_reset(ctx->stmt_insert_kubernetes_event);
+ flb_plg_error(ctx->ins, "cannot execute insert kubernetes event %s inode=%lu",
+ uid, resource_version);
+ flb_sds_destroy(uid);
+ return -1;
+ }
+
+ flb_plg_debug(ctx->ins,
+ "inserted k8s event: uid=%s, resource_version=%lu, last=%ld",
+ uid, resource_version, last);
+ sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event);
+ sqlite3_reset(ctx->stmt_insert_kubernetes_event);
+
+ flb_sds_destroy(uid);
+ return flb_sqldb_last_id(ctx->db);
+}
+
+#endif
+
+static int k8s_events_collect(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ int ret;
+ size_t b_sent;
+ struct flb_connection *u_conn = NULL;
+ struct flb_http_client *c = NULL;
+ struct k8s_events *ctx = in_context;
+ flb_sds_t continue_token = NULL;
+ uint64_t max_resource_version = 0;
+
+ if (pthread_mutex_trylock(&ctx->lock) != 0) {
+ FLB_INPUT_RETURN(0);
+ }
+
+ u_conn = flb_upstream_conn_get(ctx->upstream);
+ if (!u_conn) {
+ flb_plg_error(ins, "upstream connection initialization error");
+ goto exit;
+ }
+
+ ret = refresh_token_if_needed(ctx);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "failed to refresh token");
+ goto exit;
+ }
+
+ do {
+ c = make_event_api_request(ctx, u_conn, continue_token);
+ if (continue_token != NULL) {
+ flb_sds_destroy(continue_token);
+ continue_token = NULL;
+ }
+ if (!c) {
+ flb_plg_error(ins, "unable to create http client");
+ goto exit;
+ }
+ flb_http_buffer_size(c, 0);
+
+ flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
+ if (ctx->auth_len > 0) {
+ flb_http_add_header(c, "Authorization", 13, ctx->auth, ctx->auth_len);
+ }
+
+ ret = flb_http_do(c, &b_sent);
+ if (ret != 0) {
+ flb_plg_error(ins, "http do error");
+ goto exit;
+ }
+
+ if (c->resp.status == 200) {
+ ret = process_events(ctx, c->resp.payload, c->resp.payload_size, &max_resource_version, &continue_token);
+ }
+ else {
+ if (c->resp.payload_size > 0) {
+ flb_plg_error(ctx->ins, "http_status=%i:\n%s", c->resp.status, c->resp.payload);
+ }
+ else {
+ flb_plg_error(ctx->ins, "http_status=%i", c->resp.status);
+ }
+ }
+ flb_http_client_destroy(c);
+ c = NULL;
+ } while(continue_token != NULL);
+
+ if (max_resource_version > ctx->last_resource_version) {
+ flb_plg_debug(ctx->ins, "set last resourceVersion=%lu", max_resource_version);
+ ctx->last_resource_version = max_resource_version;
+ }
+
+exit:
+ pthread_mutex_unlock(&ctx->lock);
+ if (c) {
+ flb_http_client_destroy(c);
+ }
+ if (u_conn) {
+ flb_upstream_conn_release(u_conn);
+ }
+ FLB_INPUT_RETURN(0);
+}
+
+static int k8s_events_init(struct flb_input_instance *ins,
+ struct flb_config *config, void *data)
+{
+ struct k8s_events *ctx = NULL;
+
+ ctx = k8s_events_conf_create(ins);
+ if (!ctx) {
+ return -1;
+ }
+
+ ctx->coll_id = flb_input_set_collector_time(ins,
+ k8s_events_collect,
+ ctx->interval_sec,
+ ctx->interval_nsec,
+ config);
+
+#ifdef FLB_HAVE_SQLDB
+ if (ctx->db) {
+ ctx->coll_cleanup_id = flb_input_set_collector_time(ins,
+ k8s_events_cleanup_db,
+ ctx->interval_sec,
+ ctx->interval_nsec,
+ config);
+ }
+#endif
+
+ return 0;
+}
+
+static int k8s_events_exit(void *data, struct flb_config *config)
+{
+ struct k8s_events *ctx = data;
+
+ if (!ctx) {
+ return 0;
+ }
+
+ k8s_events_conf_destroy(ctx);
+ return 0;
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] = {
+ /* Full Kubernetes API server URL */
+ {
+ FLB_CONFIG_MAP_STR, "kube_url", "https://kubernetes.default.svc",
+ 0, FLB_FALSE, 0,
+ "Kubernetes API server URL"
+ },
+
+ /* Refresh interval */
+ {
+ FLB_CONFIG_MAP_INT, "interval_sec", DEFAULT_INTERVAL_SEC,
+ 0, FLB_TRUE, offsetof(struct k8s_events, interval_sec),
+ "Set the polling interval for each channel"
+ },
+ {
+ FLB_CONFIG_MAP_INT, "interval_nsec", DEFAULT_INTERVAL_NSEC,
+ 0, FLB_TRUE, offsetof(struct k8s_events, interval_nsec),
+ "Set the polling interval for each channel (sub seconds)"
+ },
+
+ /* TLS: set debug 'level' */
+ {
+ FLB_CONFIG_MAP_INT, "tls.debug", "0",
+ 0, FLB_TRUE, offsetof(struct k8s_events, tls_debug),
+ "set TLS debug level: 0 (no debug), 1 (error), "
+ "2 (state change), 3 (info) and 4 (verbose)"
+ },
+
+ /* TLS: enable verification */
+ {
+ FLB_CONFIG_MAP_BOOL, "tls.verify", "true",
+ 0, FLB_TRUE, offsetof(struct k8s_events, tls_verify),
+ "enable or disable verification of TLS peer certificate"
+ },
+
+ /* TLS: set tls.vhost feature */
+ {
+ FLB_CONFIG_MAP_STR, "tls.vhost", NULL,
+ 0, FLB_TRUE, offsetof(struct k8s_events, tls_vhost),
+ "set optional TLS virtual host"
+ },
+
+ /* Kubernetes TLS: CA file */
+ {
+ FLB_CONFIG_MAP_STR, "kube_ca_file", K8S_EVENTS_KUBE_CA,
+ 0, FLB_TRUE, offsetof(struct k8s_events, tls_ca_file),
+ "Kubernetes TLS CA file"
+ },
+
+ /* Kubernetes TLS: CA certs path */
+ {
+ FLB_CONFIG_MAP_STR, "kube_ca_path", NULL,
+ 0, FLB_TRUE, offsetof(struct k8s_events, tls_ca_path),
+ "Kubernetes TLS ca path"
+ },
+
+ /* Kubernetes Token file */
+ {
+ FLB_CONFIG_MAP_STR, "kube_token_file", K8S_EVENTS_KUBE_TOKEN,
+ 0, FLB_TRUE, offsetof(struct k8s_events, token_file),
+ "Kubernetes authorization token file"
+ },
+
+ /* Kubernetes Token file TTL */
+ {
+ FLB_CONFIG_MAP_TIME, "kube_token_ttl", "10m",
+ 0, FLB_TRUE, offsetof(struct k8s_events, token_ttl),
+ "kubernetes token ttl, until it is reread from the token file. Default: 10m"
+ },
+
+ {
+ FLB_CONFIG_MAP_INT, "kube_request_limit", "0",
+ 0, FLB_TRUE, offsetof(struct k8s_events, limit_request),
+ "kubernetes limit parameter for events query, no limit applied when set to 0"
+ },
+
+ {
+ FLB_CONFIG_MAP_TIME, "kube_retention_time", "1h",
+ 0, FLB_TRUE, offsetof(struct k8s_events, retention_time),
+ "kubernetes retention time for events. Default: 1h"
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "kube_namespace", NULL,
+ 0, FLB_TRUE, offsetof(struct k8s_events, namespace),
+ "kubernetes namespace to get events from, gets event from all namespaces by default."
+ },
+
+#ifdef FLB_HAVE_SQLDB
+ {
+ FLB_CONFIG_MAP_STR, "db", NULL,
+ 0, FLB_FALSE, 0,
+ "set a database file to keep track of recorded kubernetes events."
+ },
+ {
+ FLB_CONFIG_MAP_STR, "db.sync", "normal",
+ 0, FLB_FALSE, 0,
+ "set a database sync method. values: extra, full, normal and off."
+ },
+#endif
+
+ /* EOF */
+ {0}
+};
+
+/* Plugin reference */
+struct flb_input_plugin in_kubernetes_events_plugin = {
+ .name = "kubernetes_events",
+ .description = "Kubernetes Events",
+ .cb_init = k8s_events_init,
+ .cb_pre_run = NULL,
+ .cb_collect = k8s_events_collect,
+ .cb_flush_buf = NULL,
+ .cb_exit = k8s_events_exit,
+ .config_map = config_map,
+ .flags = FLB_INPUT_NET | FLB_INPUT_CORO | FLB_INPUT_THREADED
+};
diff --git a/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events.h b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events.h
new file mode 100644
index 000000000..3afd48570
--- /dev/null
+++ b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events.h
@@ -0,0 +1,106 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_KUBERNETES_EVENTS_H
+#define FLB_IN_KUBERNETES_EVENTS_H
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_upstream.h>
+#include <fluent-bit/flb_record_accessor.h>
+#include <fluent-bit/flb_sqldb.h>
+
+#define DEFAULT_INTERVAL_SEC "0"
+#define DEFAULT_INTERVAL_NSEC "500000000"
+
+/* Filter context */
+struct k8s_events {
+ int coll_id;
+ int coll_cleanup_id;
+ int interval_sec; /* interval collection time (Second) */
+ int interval_nsec; /* interval collection time (Nanosecond) */
+ int retention_time; /* retention time limit, default 1 hour */
+
+ /* Configuration parameters */
+ char *api_host;
+ int api_port;
+ int api_https;
+ int tls_debug;
+ int tls_verify;
+ int kube_token_ttl;
+ flb_sds_t namespace;
+
+ /* API Server end point */
+ char kube_url[1024];
+
+ /* TLS CA certificate file */
+ char *tls_ca_path;
+ char *tls_ca_file;
+
+ /* TLS virtual host (optional), set by configmap */
+ flb_sds_t tls_vhost;
+
+ /* Kubernetes Token from FLB_KUBE_TOKEN file */
+ char *token_file;
+ char *token;
+ int token_ttl;
+ size_t token_len;
+ int token_created;
+
+ /* Pre-formatted HTTP Authorization header value */
+ char *auth;
+ size_t auth_len;
+
+ int dns_retries;
+ int dns_wait_time;
+
+ struct flb_tls *tls;
+
+ struct flb_log_event_encoder *encoder;
+
+ /* record accessor */
+ struct flb_record_accessor *ra_timestamp;
+ struct flb_record_accessor *ra_resource_version;
+
+ /* others */
+ struct flb_config *config;
+ struct flb_upstream *upstream;
+ struct flb_input_instance *ins;
+
+ /* limit for event queries */
+ int limit_request;
+ /* last highest seen resource_version */
+ uint64_t last_resource_version;
+
+#ifdef FLB_HAVE_SQLDB
+ /* State database */
+ struct flb_sqldb *db;
+ int db_sync;
+ int db_locking;
+ flb_sds_t db_journal_mode;
+ sqlite3_stmt *stmt_get_kubernetes_event_exists_by_uid;
+ sqlite3_stmt *stmt_insert_kubernetes_event;
+ sqlite3_stmt *stmt_delete_old_kubernetes_events;
+#endif
+
+ /* concurrency lock */
+ pthread_mutex_t lock;
+};
+
+#endif \ No newline at end of file
diff --git a/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.c b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.c
new file mode 100644
index 000000000..4f67d8cfc
--- /dev/null
+++ b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.c
@@ -0,0 +1,326 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "kubernetes_events_conf.h"
+
+#ifdef FLB_HAVE_SQLDB
+#include "kubernetes_events_sql.h"
+
+
+/* Open or create database required by tail plugin */
+static struct flb_sqldb *flb_kubernetes_event_db_open(const char *path,
+ struct flb_input_instance *in,
+ struct k8s_events *ctx,
+ struct flb_config *config)
+{
+ int ret;
+ char tmp[64];
+ struct flb_sqldb *db;
+
+ /* Open/create the database */
+ db = flb_sqldb_open(path, in->name, config);
+ if (!db) {
+ return NULL;
+ }
+
+ /* Create table schema if it don't exists */
+ ret = flb_sqldb_query(db, SQL_CREATE_KUBERNETES_EVENTS, NULL, NULL);
+ if (ret != FLB_OK) {
+ flb_plg_error(ctx->ins, "db: could not create 'in_kubernetes_events' table");
+ flb_sqldb_close(db);
+ return NULL;
+ }
+
+ if (ctx->db_sync >= 0) {
+ snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_SYNC,
+ ctx->db_sync);
+ ret = flb_sqldb_query(db, tmp, NULL, NULL);
+ if (ret != FLB_OK) {
+ flb_plg_error(ctx->ins, "db could not set pragma 'sync'");
+ flb_sqldb_close(db);
+ return NULL;
+ }
+ }
+
+ if (ctx->db_locking == FLB_TRUE) {
+ ret = flb_sqldb_query(db, SQL_PRAGMA_LOCKING_MODE, NULL, NULL);
+ if (ret != FLB_OK) {
+ flb_plg_error(ctx->ins, "db: could not set pragma 'locking_mode'");
+ flb_sqldb_close(db);
+ return NULL;
+ }
+ }
+
+ if (ctx->db_journal_mode) {
+ snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_JOURNAL_MODE,
+ ctx->db_journal_mode);
+ ret = flb_sqldb_query(db, tmp, NULL, NULL);
+ if (ret != FLB_OK) {
+ flb_plg_error(ctx->ins, "db could not set pragma 'journal_mode'");
+ flb_sqldb_close(db);
+ return NULL;
+ }
+ }
+
+ return db;
+}
+
+static int flb_kubernetes_event_db_close(struct flb_sqldb *db)
+{
+ flb_sqldb_close(db);
+ return 0;
+}
+
+#endif
+
+static int network_init(struct k8s_events *ctx, struct flb_config *config)
+{
+ int io_type = FLB_IO_TCP;
+
+ ctx->upstream = NULL;
+
+ if (ctx->api_https == FLB_TRUE) {
+ if (!ctx->tls_ca_path && !ctx->tls_ca_file) {
+ ctx->tls_ca_file = flb_strdup(K8S_EVENTS_KUBE_CA);
+ }
+
+ /* create a custom TLS context since we use user-defined certs */
+ ctx->tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
+ ctx->tls_verify,
+ ctx->tls_debug,
+ ctx->tls_vhost,
+ ctx->tls_ca_path,
+ ctx->tls_ca_file,
+ NULL, NULL, NULL);
+ if (!ctx->tls) {
+ return -1;
+ }
+
+ io_type = FLB_IO_TLS;
+ }
+
+ /* Create an Upstream context */
+ ctx->upstream = flb_upstream_create(config,
+ ctx->api_host,
+ ctx->api_port,
+ io_type,
+ ctx->tls);
+ if (!ctx->upstream) {
+ flb_plg_error(ctx->ins, "network initialization failed");
+ return -1;
+ }
+
+ return 0;
+}
+
+struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins)
+{
+ int off;
+ int ret;
+ const char *p;
+ const char *url;
+ const char *tmp;
+ struct k8s_events *ctx = NULL;
+ pthread_mutexattr_t attr;
+
+ ctx = flb_calloc(1, sizeof(struct k8s_events));
+ if (!ctx) {
+ flb_errno();
+ return NULL;
+ }
+ ctx->ins = ins;
+
+ pthread_mutexattr_init(&attr);
+ pthread_mutex_init(&ctx->lock, &attr);
+
+ /* Load the config map */
+ ret = flb_input_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ flb_free(ctx);
+ return NULL;
+ }
+ flb_input_set_context(ins, ctx);
+
+ ctx->encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT);
+ if (!ctx->encoder) {
+ flb_plg_error(ins, "could not initialize event encoder");
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+
+ /* Record accessor pattern */
+ ctx->ra_timestamp = flb_ra_create(K8S_EVENTS_RA_TIMESTAMP, FLB_TRUE);
+ if (!ctx->ra_timestamp) {
+ flb_plg_error(ctx->ins,
+ "could not create record accessor for metadata items");
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+
+ ctx->ra_resource_version = flb_ra_create(K8S_EVENTS_RA_RESOURCE_VERSION, FLB_TRUE);
+ if (!ctx->ra_resource_version) {
+ flb_plg_error(ctx->ins, "could not create record accessor for resource version");
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+
+ /* Get Kubernetes API server */
+ url = flb_input_get_property("kube_url", ins);
+ if (!url) {
+ ctx->api_host = flb_strdup(K8S_EVENTS_KUBE_API_HOST);
+ ctx->api_port = K8S_EVENTS_KUBE_API_PORT;
+ ctx->api_https = FLB_TRUE;
+ }
+ else {
+ tmp = url;
+
+ /* Check the protocol */
+ if (strncmp(tmp, "http://", 7) == 0) {
+ off = 7;
+ ctx->api_https = FLB_FALSE;
+ }
+ else if (strncmp(tmp, "https://", 8) == 0) {
+ off = 8;
+ ctx->api_https = FLB_TRUE;
+ }
+ else {
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+
+ /* Get hostname and TCP port */
+ p = url + off;
+ tmp = strchr(p, ':');
+ if (tmp) {
+ ctx->api_host = flb_strndup(p, tmp - p);
+ tmp++;
+ ctx->api_port = atoi(tmp);
+ }
+ else {
+ ctx->api_host = flb_strdup(p);
+ ctx->api_port = K8S_EVENTS_KUBE_API_PORT;
+ }
+ }
+ snprintf(ctx->kube_url, sizeof(ctx->kube_url) - 1,
+ "%s://%s:%i",
+ ctx->api_https ? "https" : "http",
+ ctx->api_host, ctx->api_port);
+
+ flb_plg_info(ctx->ins, "API server: %s", ctx->kube_url);
+
+ /* network setup */
+ ret = network_init(ctx, ins->config);
+ if (ret == -1) {
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+
+#ifdef FLB_HAVE_SQLDB
+ /* Initialize database */
+ tmp = flb_input_get_property("db", ins);
+ if (tmp) {
+ ctx->db = flb_kubernetes_event_db_open(tmp, ins, ctx, ins->config);
+ if (!ctx->db) {
+ flb_plg_error(ctx->ins, "could not open/create database");
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+ }
+
+ if (ctx->db) {
+ ret = sqlite3_prepare_v2(ctx->db->handler,
+ SQL_KUBERNETES_EVENT_EXISTS_BY_UID,
+ -1,
+ &ctx->stmt_get_kubernetes_event_exists_by_uid,
+ 0);
+ if (ret != SQLITE_OK) {
+ flb_plg_error(ctx->ins, "error preparing database SQL statement: stmt_get_kubernetes_event_exists_by_uid");
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+
+ ret = sqlite3_prepare_v2(ctx->db->handler,
+ SQL_INSERT_KUBERNETES_EVENTS,
+ -1,
+ &ctx->stmt_insert_kubernetes_event,
+ 0);
+ if (ret != SQLITE_OK) {
+ flb_plg_error(ctx->ins, "error preparing database SQL statement: stmt_insert_kubernetes_event");
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+
+ ret = sqlite3_prepare_v2(ctx->db->handler,
+ SQL_DELETE_OLD_KUBERNETES_EVENTS,
+ -1,
+ &ctx->stmt_delete_old_kubernetes_events,
+ 0);
+ if (ret != SQLITE_OK) {
+ flb_plg_error(ctx->ins, "error preparing database SQL statement: stmt_delete_old_kubernetes_events");
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+ }
+#endif
+
+ return ctx;
+}
+
+void k8s_events_conf_destroy(struct k8s_events *ctx)
+{
+ if (ctx->ra_timestamp) {
+ flb_ra_destroy(ctx->ra_timestamp);
+ }
+
+ if (ctx->ra_resource_version) {
+ flb_ra_destroy(ctx->ra_resource_version);
+ }
+
+ if (ctx->upstream) {
+ flb_upstream_destroy(ctx->upstream);
+ }
+
+ if (ctx->encoder) {
+ flb_log_event_encoder_destroy(ctx->encoder);
+ }
+
+ if (ctx->api_host) {
+ flb_free(ctx->api_host);
+ }
+ if (ctx->token) {
+ flb_free(ctx->token);
+ }
+ if (ctx->auth) {
+ flb_free(ctx->auth);
+ }
+
+#ifdef FLB_HAVE_TLS
+ if (ctx->tls) {
+ flb_tls_destroy(ctx->tls);
+ }
+#endif
+
+#ifdef FLB_HAVE_SQLDB
+ if (ctx->db) {
+ flb_kubernetes_event_db_close(ctx->db);
+ }
+#endif
+
+ flb_free(ctx);
+}
diff --git a/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.h b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.h
new file mode 100644
index 000000000..9d6b54197
--- /dev/null
+++ b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.h
@@ -0,0 +1,47 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_IN_KUBERNETES_EVENTS_CONF_H
+#define FLB_IN_KUBERNETES_EVENTS_CONF_H
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_record_accessor.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+
+#include "kubernetes_events.h"
+
+/* Kubernetes API server info */
+#define K8S_EVENTS_KUBE_API_HOST "kubernetes.default.svc"
+#define K8S_EVENTS_KUBE_API_PORT 443
+// /apis/events.k8s.io/v1/events
+// /apis/events.k8s.io/v1/namespaces/{namespace}/events
+#define K8S_EVENTS_KUBE_API_URI "/api/v1/events"
+#define K8S_EVENTS_KUBE_NAMESPACE_API_URI "/api/v1/namespaces/%s/events"
+
+/* secrets */
+#define K8S_EVENTS_KUBE_TOKEN "/var/run/secrets/kubernetes.io/serviceaccount/token"
+#define K8S_EVENTS_KUBE_CA "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
+
+#define K8S_EVENTS_RA_TIMESTAMP "$metadata['creationTimestamp']"
+#define K8S_EVENTS_RA_RESOURCE_VERSION "$metadata['resourceVersion']"
+
+struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins);
+void k8s_events_conf_destroy(struct k8s_events *ctx);
+
+#endif \ No newline at end of file
diff --git a/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_sql.h b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_sql.h
new file mode 100644
index 000000000..3076791cc
--- /dev/null
+++ b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_sql.h
@@ -0,0 +1,60 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_KUBERNETES_EVENTS_SQL_H
+#define FLB_KUBERNETES_EVENTS_SQL_H
+
+/*
+ * In Fluent Bit we try to have a common convention for table names,
+ * if the table belongs to an input/output plugin, use the plugins name
+ * with the name of the object or type.
+ *
+ * in_kubernetes_events plugin table to track kubernetes events:
+ * in_kubernetes_events
+ */
+#define SQL_CREATE_KUBERNETES_EVENTS \
+ "CREATE TABLE IF NOT EXISTS in_kubernetes_events (" \
+ " id INTEGER PRIMARY KEY," \
+ " uid TEXT NOT NULL," \
+ " resourceVersion INTEGER NOT NULL," \
+ " created INTEGER NOT NULL" \
+ ");"
+
+#define SQL_KUBERNETES_EVENT_EXISTS_BY_UID \
+ "SELECT COUNT(id) " \
+ " FROM in_kubernetes_events " \
+ " WHERE uid=@uid;"
+
+#define SQL_INSERT_KUBERNETES_EVENTS \
+ "INSERT INTO in_kubernetes_events (uid, resourceVersion, created)" \
+ " VALUES (@uid, @resourceVersion, @created);"
+
+#define SQL_DELETE_OLD_KUBERNETES_EVENTS \
+ "DELETE FROM in_kubernetes_events WHERE created <= @createdBefore;"
+
+#define SQL_PRAGMA_SYNC \
+ "PRAGMA synchronous=%i;"
+
+#define SQL_PRAGMA_JOURNAL_MODE \
+ "PRAGMA journal_mode=%s;"
+
+#define SQL_PRAGMA_LOCKING_MODE \
+ "PRAGMA locking_mode=EXCLUSIVE;"
+
+#endif