summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/out_pgsql
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/out_pgsql')
-rw-r--r--src/fluent-bit/plugins/out_pgsql/CMakeLists.txt8
-rw-r--r--src/fluent-bit/plugins/out_pgsql/pgsql.c389
-rw-r--r--src/fluent-bit/plugins/out_pgsql/pgsql.h91
-rw-r--r--src/fluent-bit/plugins/out_pgsql/pgsql_connections.c193
-rw-r--r--src/fluent-bit/plugins/out_pgsql/pgsql_connections.h27
5 files changed, 708 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/out_pgsql/CMakeLists.txt b/src/fluent-bit/plugins/out_pgsql/CMakeLists.txt
new file mode 100644
index 000000000..6206c02f9
--- /dev/null
+++ b/src/fluent-bit/plugins/out_pgsql/CMakeLists.txt
@@ -0,0 +1,8 @@
+set(src
+ pgsql.c
+ pgsql_connections.c
+ )
+
+FLB_PLUGIN(out_pgsql "${src}" "")
+target_include_directories(flb-plugin-out_pgsql PRIVATE ${PostgreSQL_INCLUDE_DIRS})
+target_link_libraries(flb-plugin-out_pgsql -lpq)
diff --git a/src/fluent-bit/plugins/out_pgsql/pgsql.c b/src/fluent-bit/plugins/out_pgsql/pgsql.c
new file mode 100644
index 000000000..a01090c1a
--- /dev/null
+++ b/src/fluent-bit/plugins/out_pgsql/pgsql.c
@@ -0,0 +1,389 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_output.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_pack.h>
+
+#include "pgsql.h"
+#include "pgsql_connections.h"
+
+void pgsql_conf_destroy(struct flb_pgsql_config *ctx)
+{
+ pgsql_destroy_connections(ctx);
+
+ flb_free(ctx->db_hostname);
+
+ if (ctx->db_table != NULL) {
+ flb_sds_destroy(ctx->db_table);
+ }
+
+ if (ctx->timestamp_key != NULL) {
+ flb_sds_destroy(ctx->timestamp_key);
+ }
+
+ flb_free(ctx);
+ ctx = NULL;
+}
+
+static int cb_pgsql_init(struct flb_output_instance *ins,
+ struct flb_config *config, void *data)
+{
+
+ struct flb_pgsql_config *ctx;
+ size_t str_len;
+ PGresult *res;
+ char *query = NULL;
+ char *temp = NULL;
+ const char *tmp = NULL;
+ int ret;
+
+ /* set default network configuration */
+ flb_output_net_default(FLB_PGSQL_HOST, FLB_PGSQL_PORT, ins);
+
+ ctx = flb_calloc(1, sizeof(struct flb_pgsql_config));
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+
+ ctx->ins = ins;
+
+ /* Database host */
+ ctx->db_hostname = flb_strdup(ins->host.name);
+ if (!ctx->db_hostname) {
+ flb_errno();
+ pgsql_conf_destroy(ctx);
+ return -1;
+ }
+
+ /* Database port */
+ snprintf(ctx->db_port, sizeof(ctx->db_port), "%d", ins->host.port);
+
+ /* Database name */
+ ctx->db_name = flb_output_get_property("database", ins);
+ if (!ctx->db_name) {
+ ctx->db_name = FLB_PGSQL_DBNAME;
+ }
+
+ /* db table */
+ tmp = flb_output_get_property("table", ins);
+ if (tmp) {
+ ctx->db_table = flb_sds_create(tmp);
+ }
+ else {
+ ctx->db_table = flb_sds_create(FLB_PGSQL_TABLE);
+ }
+
+ /* connection options */
+ ctx->conn_options = flb_output_get_property("connection_options", ins);
+
+ if (!ctx->db_table) {
+ flb_errno();
+ pgsql_conf_destroy(ctx);
+ return -1;
+ }
+
+ /* db user */
+ ctx->db_user = flb_output_get_property("user", ins);
+ if (!ctx->db_user) {
+ flb_plg_warn(ctx->ins,
+ "You didn't supply a valid user to connect,"
+ "your current unix user will be used");
+ }
+
+ /* db user password */
+ ctx->db_passwd = flb_output_get_property("password", ins);
+
+ /* timestamp key */
+ tmp = flb_output_get_property("timestamp_key", ins);
+ if (tmp) {
+ ctx->timestamp_key = flb_sds_create(tmp);
+ }
+ else {
+ ctx->timestamp_key = flb_sds_create(FLB_PGSQL_TIMESTAMP_KEY);
+ }
+
+ if (!ctx->timestamp_key) {
+ flb_errno();
+ pgsql_conf_destroy(ctx);
+ return -1;
+ }
+
+ /* Pool size */
+ tmp = flb_output_get_property("max_pool_size", ins);
+ if (tmp) {
+ ctx->max_pool_size = strtol(tmp, NULL, 0);
+ if (ctx->max_pool_size < 1)
+ ctx->max_pool_size = 1;
+ }
+ else {
+ ctx->max_pool_size = FLB_PGSQL_POOL_SIZE;
+ }
+
+ tmp = flb_output_get_property("min_pool_size", ins);
+ if (tmp) {
+ ctx->min_pool_size = strtol(tmp, NULL, 0);
+ if (ctx->min_pool_size < 1 || ctx->min_pool_size > ctx->max_pool_size)
+ ctx->min_pool_size = ctx->max_pool_size;
+ }
+ else {
+ ctx->min_pool_size = FLB_PGSQL_MIN_POOL_SIZE;
+ }
+
+ /* Sync Mode */
+ tmp = flb_output_get_property("async", ins);
+ if (tmp && flb_utils_bool(tmp)) {
+ ctx->async = FLB_TRUE;
+ }
+ else {
+ ctx->async = FLB_FALSE;
+ }
+
+ if (!ctx->async) {
+ ctx->min_pool_size = 1;
+ ctx->max_pool_size = 1;
+ }
+
+ /* CockroachDB Support */
+ tmp = flb_output_get_property("cockroachdb", ins);
+ if (tmp && flb_utils_bool(tmp)) {
+ ctx->cockroachdb = FLB_TRUE;
+ }
+ else {
+ ctx->cockroachdb = FLB_FALSE;
+ }
+
+ ret = pgsql_start_connections(ctx);
+ if (ret) {
+ return -1;
+ }
+
+ flb_plg_info(ctx->ins, "host=%s port=%s dbname=%s OK",
+ ctx->db_hostname, ctx->db_port, ctx->db_name);
+ flb_output_set_context(ins, ctx);
+
+ temp = PQescapeIdentifier(ctx->conn_current->conn, ctx->db_table,
+ flb_sds_len(ctx->db_table));
+
+ if (temp == NULL) {
+ flb_plg_error(ctx->ins, "failed to parse table name: %s",
+ PQerrorMessage(ctx->conn_current->conn));
+ pgsql_conf_destroy(ctx);
+ return -1;
+ }
+
+ flb_sds_destroy(ctx->db_table);
+ ctx->db_table = flb_sds_create(temp);
+ PQfreemem(temp);
+
+ if (!ctx->db_table) {
+ flb_errno();
+ pgsql_conf_destroy(ctx);
+ return -1;
+ }
+
+ flb_plg_info(ctx->ins, "we check that the table %s "
+ "exists, if not we create it", ctx->db_table);
+
+ str_len = 72 + flb_sds_len(ctx->db_table);
+
+ query = flb_malloc(str_len);
+ if (query == NULL) {
+ flb_errno();
+ pgsql_conf_destroy(ctx);
+ return -1;
+ }
+
+ /* Maybe use the timestamp with the TZ specified */
+ /* in the postgresql connection? */
+ snprintf(query, str_len,
+ "CREATE TABLE IF NOT EXISTS %s "
+ "(tag varchar, time timestamp, data jsonb);",
+ ctx->db_table);
+ flb_plg_trace(ctx->ins, "%s", query);
+ res = PQexec(ctx->conn_current->conn, query);
+
+ flb_free(query);
+
+ if (PQresultStatus(res) != PGRES_COMMAND_OK) {
+ flb_plg_error(ctx->ins, "%s",
+ PQerrorMessage(ctx->conn_current->conn));
+ pgsql_conf_destroy(ctx);
+ return -1;
+ }
+
+ PQclear(res);
+
+ return 0;
+}
+
+static void cb_pgsql_flush(struct flb_event_chunk *event_chunk,
+ struct flb_output_flush *out_flush,
+ struct flb_input_instance *i_ins,
+ void *out_context,
+ struct flb_config *config)
+{
+ struct flb_pgsql_config *ctx = out_context;
+ flb_sds_t json;
+ char *tmp = NULL;
+ char *query = NULL;
+ PGresult *res = NULL;
+ int send_res;
+ flb_sds_t tag_escaped = NULL;
+ size_t str_len;
+
+
+ if (pgsql_next_connection(ctx) == 1) {
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ /*
+ * PQreset()
+ * This function will close the connection to the server and attempt to
+ * reestablish a new connection to the same server, using all the same
+ * parameters previously used. This might be useful for error recovery
+ * if a working connection is lost.
+ */
+ if (PQstatus(ctx->conn_current->conn) != CONNECTION_OK) {
+ PQreset(ctx->conn_current->conn);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ json = flb_pack_msgpack_to_json_format(event_chunk->data,
+ event_chunk->size,
+ FLB_PACK_JSON_FORMAT_JSON,
+ FLB_PACK_JSON_DATE_DOUBLE,
+ ctx->timestamp_key);
+ if (json == NULL) {
+ flb_errno();
+ flb_plg_error(ctx->ins,
+ "Can't parse the msgpack into json");
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ tmp = PQescapeLiteral(ctx->conn_current->conn, json, flb_sds_len(json));
+ flb_sds_destroy(json);
+ if (!tmp) {
+ flb_errno();
+ PQfreemem(tmp);
+ flb_plg_error(ctx->ins, "Can't escape json string");
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ json = flb_sds_create(tmp);
+ PQfreemem(tmp);
+ if (!json) {
+ flb_errno();
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ tmp = PQescapeLiteral(ctx->conn_current->conn,
+ event_chunk->tag,
+ flb_sds_len(event_chunk->tag));
+ if (!tmp) {
+ flb_errno();
+ flb_sds_destroy(json);
+ PQfreemem(tmp);
+ flb_plg_error(ctx->ins, "Can't escape tag string: %s",
+ event_chunk->tag);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ tag_escaped = flb_sds_create(tmp);
+ PQfreemem(tmp);
+ if (!tag_escaped) {
+ flb_errno();
+ flb_sds_destroy(json);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ str_len = 100 + flb_sds_len(json)
+ + flb_sds_len(tag_escaped)
+ + flb_sds_len(ctx->db_table)
+ + flb_sds_len(ctx->timestamp_key);
+ query = flb_malloc(str_len);
+
+ if (query == NULL) {
+ flb_errno();
+ flb_sds_destroy(json);
+ flb_sds_destroy(tag_escaped);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+
+ snprintf(query, str_len,
+ ctx->cockroachdb ? FLB_PGSQL_INSERT_COCKROACH : FLB_PGSQL_INSERT,
+ ctx->db_table, tag_escaped, ctx->timestamp_key, json);
+ flb_plg_trace(ctx->ins, "query: %s", query);
+
+ if (ctx->async) {
+ send_res = PQsendQuery(ctx->conn_current->conn, query);
+ flb_free(query);
+ flb_sds_destroy(json);
+ flb_sds_destroy(tag_escaped);
+
+ if (send_res == 0) {
+ flb_plg_error(ctx->ins, "%s",
+ PQerrorMessage(ctx->conn_current->conn));
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ PQflush(ctx->conn_current->conn);
+ }
+ else {
+ res = PQexec(ctx->conn_current->conn, query);
+ flb_free(query);
+ flb_sds_destroy(json);
+ flb_sds_destroy(tag_escaped);
+
+ if (PQresultStatus(res) != PGRES_COMMAND_OK) {
+ flb_plg_error(ctx->ins, "%s",
+ PQerrorMessage(ctx->conn_current->conn));
+ PQclear(res);
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+ PQclear(res);
+ }
+
+ FLB_OUTPUT_RETURN(FLB_OK);
+}
+
+static int cb_pgsql_exit(void *data, struct flb_config *config)
+{
+ struct flb_pgsql_config *ctx = data;
+
+ if (!ctx){
+ return 0;
+ }
+
+ pgsql_conf_destroy(ctx);
+
+ return 0;
+}
+
+struct flb_output_plugin out_pgsql_plugin = {
+ .name = "pgsql",
+ .description = "PostgreSQL",
+ .cb_init = cb_pgsql_init,
+ .cb_flush = cb_pgsql_flush,
+ .cb_exit = cb_pgsql_exit,
+ .flags = 0,
+};
diff --git a/src/fluent-bit/plugins/out_pgsql/pgsql.h b/src/fluent-bit/plugins/out_pgsql/pgsql.h
new file mode 100644
index 000000000..5190a5a54
--- /dev/null
+++ b/src/fluent-bit/plugins/out_pgsql/pgsql.h
@@ -0,0 +1,91 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_OUT_PGSQL_H
+#define FLB_OUT_PGSQL_H
+
+#include <fluent-bit/flb_output.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_output_plugin.h>
+
+#include <libpq-fe.h>
+
+#define FLB_PGSQL_HOST "127.0.0.1"
+#define FLB_PGSQL_PORT 5432
+#define FLB_PGSQL_DBNAME "fluentbit"
+#define FLB_PGSQL_TABLE "fluentbit"
+#define FLB_PGSQL_TIMESTAMP_KEY "date"
+#define FLB_PGSQL_POOL_SIZE 4
+#define FLB_PGSQL_MIN_POOL_SIZE 1
+#define FLB_PGSQL_SYNC FLB_FALSE
+#define FLB_PGSQL_COCKROACH FLB_FALSE
+
+#define FLB_PGSQL_INSERT "INSERT INTO %s SELECT %s, " \
+ "to_timestamp(CAST(value->>'%s' as FLOAT))," \
+ " * FROM json_array_elements(%s);"
+#define FLB_PGSQL_INSERT_COCKROACH "INSERT INTO %s SELECT %s," \
+ "CAST(value->>'%s' AS INTERVAL) + DATE'1970-01-01'," \
+ " * FROM json_array_elements(%s);"
+
+struct flb_pgsql_conn {
+ struct mk_list _head;
+ PGconn *conn;
+ int number;
+};
+
+struct flb_pgsql_config {
+
+ /* database */
+ char *db_hostname;
+ char db_port[8];
+ const char *db_name;
+ flb_sds_t db_table;
+
+ /* auth */
+ const char *db_user;
+ const char *db_passwd;
+
+ /* time key */
+ flb_sds_t timestamp_key;
+
+ /* instance reference */
+ struct flb_output_instance *ins;
+
+ /* connections options */
+ const char *conn_options;
+
+ /* connections pool */
+ struct mk_list conn_queue;
+ struct mk_list _head;
+
+ struct flb_pgsql_conn *conn_current;
+ int max_pool_size;
+ int min_pool_size;
+ int active_conn;
+
+ /* async mode or sync mode */
+ int async;
+
+ /* cockroachdb */
+ int cockroachdb;
+};
+
+void pgsql_conf_destroy(struct flb_pgsql_config *ctx);
+
+#endif
diff --git a/src/fluent-bit/plugins/out_pgsql/pgsql_connections.c b/src/fluent-bit/plugins/out_pgsql/pgsql_connections.c
new file mode 100644
index 000000000..9c4ccfba2
--- /dev/null
+++ b/src/fluent-bit/plugins/out_pgsql/pgsql_connections.c
@@ -0,0 +1,193 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+
+#include "pgsql.h"
+
+void pgsql_destroy_connections(struct flb_pgsql_config *ctx)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_pgsql_conn *conn;
+ PGresult *res = NULL;
+
+ mk_list_foreach_safe(head, tmp, &ctx->conn_queue) {
+ conn = mk_list_entry(head, struct flb_pgsql_conn, _head);
+ if (PQstatus(conn->conn) == CONNECTION_OK) {
+ while(PQconsumeInput(conn->conn) == 0) {
+ res = PQgetResult(conn->conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK) {
+ flb_plg_warn(ctx->ins, "%s",
+ PQerrorMessage(conn->conn));
+ }
+ PQclear(res);
+ }
+ }
+ PQfinish(conn->conn);
+ flb_free(conn);
+ }
+}
+
+void *pgsql_create_connection(struct flb_pgsql_config *ctx)
+{
+ struct flb_pgsql_conn *conn;
+
+ conn = flb_calloc(1, sizeof(struct flb_pgsql_conn));
+ if (!conn) {
+ flb_errno();
+ return NULL;
+ }
+
+ conn->conn = PQsetdbLogin(ctx->db_hostname,
+ ctx->db_port,
+ ctx->conn_options,
+ NULL,
+ ctx->db_name,
+ ctx->db_user,
+ ctx->db_passwd);
+
+ if (PQstatus(conn->conn) != CONNECTION_OK) {
+ flb_plg_error(ctx->ins,
+ "failed connecting to host=%s with error: %s",
+ ctx->db_hostname, PQerrorMessage(conn->conn));
+ PQfinish(conn->conn);
+ flb_free(conn);
+ return NULL;
+ }
+
+ flb_plg_info(ctx->ins, "switching postgresql connection "
+ "to non-blocking mode");
+
+ if (PQsetnonblocking(conn->conn, 1) != 0) {
+ flb_plg_error(ctx->ins, "non-blocking mode not set");
+ PQfinish(conn->conn);
+ flb_free(conn);
+ return NULL;
+ }
+
+ return conn;
+}
+
+int pgsql_start_connections(struct flb_pgsql_config *ctx)
+{
+ int i;
+ struct flb_pgsql_conn *conn = NULL;
+
+ mk_list_init(&ctx->conn_queue);
+ ctx->active_conn = 0;
+
+ for(i = 0; i < ctx->min_pool_size; i++) {
+ flb_plg_info(ctx->ins, "Opening connection: #%d", i);
+
+ conn = (struct flb_pgsql_conn *)pgsql_create_connection(ctx);
+ if (conn == NULL) {
+ pgsql_conf_destroy(ctx);
+ return -1;
+ }
+
+ conn->number = i;
+ ctx->active_conn++;
+ mk_list_add(&conn->_head, &ctx->conn_queue);
+ }
+
+ ctx->conn_current = mk_list_entry_last(&ctx->conn_queue,
+ struct flb_pgsql_conn,
+ _head);
+
+ return 0;
+}
+
+int pgsql_new_connection(struct flb_pgsql_config *ctx)
+{
+ struct flb_pgsql_conn *conn = NULL;
+
+ if (ctx->active_conn >= ctx->max_pool_size) {
+ return -1;
+ }
+
+ conn = (struct flb_pgsql_conn *)pgsql_create_connection(ctx);
+ if (conn == NULL) {
+ pgsql_conf_destroy(ctx);
+ return -1;
+ }
+
+ conn->number = ctx->active_conn + 1;
+ ctx->active_conn++;
+
+ mk_list_add(&conn->_head, &ctx->conn_queue);
+
+ return 0;
+}
+
+int pgsql_next_connection(struct flb_pgsql_config *ctx)
+{
+ struct flb_pgsql_conn *tmp = NULL;
+ PGresult *res = NULL;
+ struct mk_list *head;
+ int ret_conn = 1;
+
+ if (PQconsumeInput(ctx->conn_current->conn) == 1) {
+ if (PQisBusy(ctx->conn_current->conn) == 0) {
+ res = PQgetResult(ctx->conn_current->conn);
+ PQclear(res);
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "%s",
+ PQerrorMessage(ctx->conn_current->conn));
+ }
+
+ mk_list_foreach(head, &ctx->conn_queue) {
+ tmp = mk_list_entry(head, struct flb_pgsql_conn, _head);
+ if (ctx->conn_current == NULL) {
+ ctx->conn_current = tmp;
+ break;
+ }
+
+ res = PQgetResult(tmp->conn);
+
+ if (res == NULL) {
+ flb_plg_debug(ctx->ins, "Connection number %d",
+ tmp->number);
+ ctx->conn_current = tmp;
+ PQclear(res);
+ return 0;
+ }
+
+ if (PQresultStatus(res) == PGRES_FATAL_ERROR) {
+ flb_plg_info(ctx->ins, "%s",
+ PQerrorMessage(tmp->conn));
+ }
+
+ PQclear(res);
+ }
+
+ if (pgsql_new_connection(ctx) == -1) {
+ flb_plg_warn(ctx->ins,
+ "No more free connections."
+ " Increase max connections");
+ }
+ else {
+ flb_plg_warn(ctx->ins, "Added new connection");
+ ret_conn = pgsql_next_connection(ctx);
+ }
+
+ return ret_conn;
+}
diff --git a/src/fluent-bit/plugins/out_pgsql/pgsql_connections.h b/src/fluent-bit/plugins/out_pgsql/pgsql_connections.h
new file mode 100644
index 000000000..cd8730618
--- /dev/null
+++ b/src/fluent-bit/plugins/out_pgsql/pgsql_connections.h
@@ -0,0 +1,27 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_OUT_PGSQL_CONNECTIONS_H
+#define FLB_OUT_PGSQL_CONNECTIONS_H
+
+void pgsql_destroy_connections(struct flb_pgsql_config *ctx);
+int pgsql_start_connections(struct flb_pgsql_config *ctx);
+int pgsql_next_connection(struct flb_pgsql_config *ctx);
+
+#endif