summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.c')
-rw-r--r--src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.c326
1 files changed, 326 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.c b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.c
new file mode 100644
index 000000000..4f67d8cfc
--- /dev/null
+++ b/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events_conf.c
@@ -0,0 +1,326 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "kubernetes_events_conf.h"
+
+#ifdef FLB_HAVE_SQLDB
+#include "kubernetes_events_sql.h"
+
+
+/* Open or create database required by tail plugin */
+static struct flb_sqldb *flb_kubernetes_event_db_open(const char *path,
+ struct flb_input_instance *in,
+ struct k8s_events *ctx,
+ struct flb_config *config)
+{
+ int ret;
+ char tmp[64];
+ struct flb_sqldb *db;
+
+ /* Open/create the database */
+ db = flb_sqldb_open(path, in->name, config);
+ if (!db) {
+ return NULL;
+ }
+
+ /* Create table schema if it don't exists */
+ ret = flb_sqldb_query(db, SQL_CREATE_KUBERNETES_EVENTS, NULL, NULL);
+ if (ret != FLB_OK) {
+ flb_plg_error(ctx->ins, "db: could not create 'in_kubernetes_events' table");
+ flb_sqldb_close(db);
+ return NULL;
+ }
+
+ if (ctx->db_sync >= 0) {
+ snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_SYNC,
+ ctx->db_sync);
+ ret = flb_sqldb_query(db, tmp, NULL, NULL);
+ if (ret != FLB_OK) {
+ flb_plg_error(ctx->ins, "db could not set pragma 'sync'");
+ flb_sqldb_close(db);
+ return NULL;
+ }
+ }
+
+ if (ctx->db_locking == FLB_TRUE) {
+ ret = flb_sqldb_query(db, SQL_PRAGMA_LOCKING_MODE, NULL, NULL);
+ if (ret != FLB_OK) {
+ flb_plg_error(ctx->ins, "db: could not set pragma 'locking_mode'");
+ flb_sqldb_close(db);
+ return NULL;
+ }
+ }
+
+ if (ctx->db_journal_mode) {
+ snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_JOURNAL_MODE,
+ ctx->db_journal_mode);
+ ret = flb_sqldb_query(db, tmp, NULL, NULL);
+ if (ret != FLB_OK) {
+ flb_plg_error(ctx->ins, "db could not set pragma 'journal_mode'");
+ flb_sqldb_close(db);
+ return NULL;
+ }
+ }
+
+ return db;
+}
+
+static int flb_kubernetes_event_db_close(struct flb_sqldb *db)
+{
+ flb_sqldb_close(db);
+ return 0;
+}
+
+#endif
+
+static int network_init(struct k8s_events *ctx, struct flb_config *config)
+{
+ int io_type = FLB_IO_TCP;
+
+ ctx->upstream = NULL;
+
+ if (ctx->api_https == FLB_TRUE) {
+ if (!ctx->tls_ca_path && !ctx->tls_ca_file) {
+ ctx->tls_ca_file = flb_strdup(K8S_EVENTS_KUBE_CA);
+ }
+
+ /* create a custom TLS context since we use user-defined certs */
+ ctx->tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
+ ctx->tls_verify,
+ ctx->tls_debug,
+ ctx->tls_vhost,
+ ctx->tls_ca_path,
+ ctx->tls_ca_file,
+ NULL, NULL, NULL);
+ if (!ctx->tls) {
+ return -1;
+ }
+
+ io_type = FLB_IO_TLS;
+ }
+
+ /* Create an Upstream context */
+ ctx->upstream = flb_upstream_create(config,
+ ctx->api_host,
+ ctx->api_port,
+ io_type,
+ ctx->tls);
+ if (!ctx->upstream) {
+ flb_plg_error(ctx->ins, "network initialization failed");
+ return -1;
+ }
+
+ return 0;
+}
+
+struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins)
+{
+ int off;
+ int ret;
+ const char *p;
+ const char *url;
+ const char *tmp;
+ struct k8s_events *ctx = NULL;
+ pthread_mutexattr_t attr;
+
+ ctx = flb_calloc(1, sizeof(struct k8s_events));
+ if (!ctx) {
+ flb_errno();
+ return NULL;
+ }
+ ctx->ins = ins;
+
+ pthread_mutexattr_init(&attr);
+ pthread_mutex_init(&ctx->lock, &attr);
+
+ /* Load the config map */
+ ret = flb_input_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ flb_free(ctx);
+ return NULL;
+ }
+ flb_input_set_context(ins, ctx);
+
+ ctx->encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT);
+ if (!ctx->encoder) {
+ flb_plg_error(ins, "could not initialize event encoder");
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+
+ /* Record accessor pattern */
+ ctx->ra_timestamp = flb_ra_create(K8S_EVENTS_RA_TIMESTAMP, FLB_TRUE);
+ if (!ctx->ra_timestamp) {
+ flb_plg_error(ctx->ins,
+ "could not create record accessor for metadata items");
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+
+ ctx->ra_resource_version = flb_ra_create(K8S_EVENTS_RA_RESOURCE_VERSION, FLB_TRUE);
+ if (!ctx->ra_resource_version) {
+ flb_plg_error(ctx->ins, "could not create record accessor for resource version");
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+
+ /* Get Kubernetes API server */
+ url = flb_input_get_property("kube_url", ins);
+ if (!url) {
+ ctx->api_host = flb_strdup(K8S_EVENTS_KUBE_API_HOST);
+ ctx->api_port = K8S_EVENTS_KUBE_API_PORT;
+ ctx->api_https = FLB_TRUE;
+ }
+ else {
+ tmp = url;
+
+ /* Check the protocol */
+ if (strncmp(tmp, "http://", 7) == 0) {
+ off = 7;
+ ctx->api_https = FLB_FALSE;
+ }
+ else if (strncmp(tmp, "https://", 8) == 0) {
+ off = 8;
+ ctx->api_https = FLB_TRUE;
+ }
+ else {
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+
+ /* Get hostname and TCP port */
+ p = url + off;
+ tmp = strchr(p, ':');
+ if (tmp) {
+ ctx->api_host = flb_strndup(p, tmp - p);
+ tmp++;
+ ctx->api_port = atoi(tmp);
+ }
+ else {
+ ctx->api_host = flb_strdup(p);
+ ctx->api_port = K8S_EVENTS_KUBE_API_PORT;
+ }
+ }
+ snprintf(ctx->kube_url, sizeof(ctx->kube_url) - 1,
+ "%s://%s:%i",
+ ctx->api_https ? "https" : "http",
+ ctx->api_host, ctx->api_port);
+
+ flb_plg_info(ctx->ins, "API server: %s", ctx->kube_url);
+
+ /* network setup */
+ ret = network_init(ctx, ins->config);
+ if (ret == -1) {
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+
+#ifdef FLB_HAVE_SQLDB
+ /* Initialize database */
+ tmp = flb_input_get_property("db", ins);
+ if (tmp) {
+ ctx->db = flb_kubernetes_event_db_open(tmp, ins, ctx, ins->config);
+ if (!ctx->db) {
+ flb_plg_error(ctx->ins, "could not open/create database");
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+ }
+
+ if (ctx->db) {
+ ret = sqlite3_prepare_v2(ctx->db->handler,
+ SQL_KUBERNETES_EVENT_EXISTS_BY_UID,
+ -1,
+ &ctx->stmt_get_kubernetes_event_exists_by_uid,
+ 0);
+ if (ret != SQLITE_OK) {
+ flb_plg_error(ctx->ins, "error preparing database SQL statement: stmt_get_kubernetes_event_exists_by_uid");
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+
+ ret = sqlite3_prepare_v2(ctx->db->handler,
+ SQL_INSERT_KUBERNETES_EVENTS,
+ -1,
+ &ctx->stmt_insert_kubernetes_event,
+ 0);
+ if (ret != SQLITE_OK) {
+ flb_plg_error(ctx->ins, "error preparing database SQL statement: stmt_insert_kubernetes_event");
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+
+ ret = sqlite3_prepare_v2(ctx->db->handler,
+ SQL_DELETE_OLD_KUBERNETES_EVENTS,
+ -1,
+ &ctx->stmt_delete_old_kubernetes_events,
+ 0);
+ if (ret != SQLITE_OK) {
+ flb_plg_error(ctx->ins, "error preparing database SQL statement: stmt_delete_old_kubernetes_events");
+ k8s_events_conf_destroy(ctx);
+ return NULL;
+ }
+ }
+#endif
+
+ return ctx;
+}
+
+void k8s_events_conf_destroy(struct k8s_events *ctx)
+{
+ if (ctx->ra_timestamp) {
+ flb_ra_destroy(ctx->ra_timestamp);
+ }
+
+ if (ctx->ra_resource_version) {
+ flb_ra_destroy(ctx->ra_resource_version);
+ }
+
+ if (ctx->upstream) {
+ flb_upstream_destroy(ctx->upstream);
+ }
+
+ if (ctx->encoder) {
+ flb_log_event_encoder_destroy(ctx->encoder);
+ }
+
+ if (ctx->api_host) {
+ flb_free(ctx->api_host);
+ }
+ if (ctx->token) {
+ flb_free(ctx->token);
+ }
+ if (ctx->auth) {
+ flb_free(ctx->auth);
+ }
+
+#ifdef FLB_HAVE_TLS
+ if (ctx->tls) {
+ flb_tls_destroy(ctx->tls);
+ }
+#endif
+
+#ifdef FLB_HAVE_SQLDB
+ if (ctx->db) {
+ flb_kubernetes_event_db_close(ctx->db);
+ }
+#endif
+
+ flb_free(ctx);
+}