diff options
Diffstat (limited to 'fluent-bit/plugins/out_pgsql')
-rw-r--r-- | fluent-bit/plugins/out_pgsql/CMakeLists.txt | 8 | ||||
-rw-r--r-- | fluent-bit/plugins/out_pgsql/pgsql.c | 389 | ||||
-rw-r--r-- | fluent-bit/plugins/out_pgsql/pgsql.h | 91 | ||||
-rw-r--r-- | fluent-bit/plugins/out_pgsql/pgsql_connections.c | 193 | ||||
-rw-r--r-- | fluent-bit/plugins/out_pgsql/pgsql_connections.h | 27 |
5 files changed, 708 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_pgsql/CMakeLists.txt b/fluent-bit/plugins/out_pgsql/CMakeLists.txt new file mode 100644 index 000000000..6206c02f9 --- /dev/null +++ b/fluent-bit/plugins/out_pgsql/CMakeLists.txt @@ -0,0 +1,8 @@ +set(src + pgsql.c + pgsql_connections.c + ) + +FLB_PLUGIN(out_pgsql "${src}" "") +target_include_directories(flb-plugin-out_pgsql PRIVATE ${PostgreSQL_INCLUDE_DIRS}) +target_link_libraries(flb-plugin-out_pgsql -lpq) diff --git a/fluent-bit/plugins/out_pgsql/pgsql.c b/fluent-bit/plugins/out_pgsql/pgsql.c new file mode 100644 index 000000000..a01090c1a --- /dev/null +++ b/fluent-bit/plugins/out_pgsql/pgsql.c @@ -0,0 +1,389 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_output.h> +#include <fluent-bit/flb_utils.h> +#include <fluent-bit/flb_pack.h> + +#include "pgsql.h" +#include "pgsql_connections.h" + +void pgsql_conf_destroy(struct flb_pgsql_config *ctx) +{ + pgsql_destroy_connections(ctx); + + flb_free(ctx->db_hostname); + + if (ctx->db_table != NULL) { + flb_sds_destroy(ctx->db_table); + } + + if (ctx->timestamp_key != NULL) { + flb_sds_destroy(ctx->timestamp_key); + } + + flb_free(ctx); + ctx = NULL; +} + +static int cb_pgsql_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) +{ + + struct flb_pgsql_config *ctx; + size_t str_len; + PGresult *res; + char *query = NULL; + char *temp = NULL; + const char *tmp = NULL; + int ret; + + /* set default network configuration */ + flb_output_net_default(FLB_PGSQL_HOST, FLB_PGSQL_PORT, ins); + + ctx = flb_calloc(1, sizeof(struct flb_pgsql_config)); + if (!ctx) { + flb_errno(); + return -1; + } + + ctx->ins = ins; + + /* Database host */ + ctx->db_hostname = flb_strdup(ins->host.name); + if (!ctx->db_hostname) { + flb_errno(); + pgsql_conf_destroy(ctx); + return -1; + } + + /* Database port */ + snprintf(ctx->db_port, sizeof(ctx->db_port), "%d", ins->host.port); + + /* Database name */ + ctx->db_name = flb_output_get_property("database", ins); + if (!ctx->db_name) { + ctx->db_name = FLB_PGSQL_DBNAME; + } + + /* db table */ + tmp = flb_output_get_property("table", ins); + if (tmp) { + ctx->db_table = flb_sds_create(tmp); + } + else { + ctx->db_table = flb_sds_create(FLB_PGSQL_TABLE); + } + + /* connection options */ + ctx->conn_options = flb_output_get_property("connection_options", ins); + + if (!ctx->db_table) { + flb_errno(); + pgsql_conf_destroy(ctx); + return -1; + } + + /* db user */ + ctx->db_user = flb_output_get_property("user", ins); + if (!ctx->db_user) { + flb_plg_warn(ctx->ins, + "You didn't supply a valid user to connect," + "your current unix user will be used"); + } + + /* db user password */ + ctx->db_passwd = flb_output_get_property("password", ins); + + /* timestamp key */ + tmp = flb_output_get_property("timestamp_key", ins); + if (tmp) { + ctx->timestamp_key = flb_sds_create(tmp); + } + else { + ctx->timestamp_key = flb_sds_create(FLB_PGSQL_TIMESTAMP_KEY); + } + + if (!ctx->timestamp_key) { + flb_errno(); + pgsql_conf_destroy(ctx); + return -1; + } + + /* Pool size */ + tmp = flb_output_get_property("max_pool_size", ins); + if (tmp) { + ctx->max_pool_size = strtol(tmp, NULL, 0); + if (ctx->max_pool_size < 1) + ctx->max_pool_size = 1; + } + else { + ctx->max_pool_size = FLB_PGSQL_POOL_SIZE; + } + + tmp = flb_output_get_property("min_pool_size", ins); + if (tmp) { + ctx->min_pool_size = strtol(tmp, NULL, 0); + if (ctx->min_pool_size < 1 || ctx->min_pool_size > ctx->max_pool_size) + ctx->min_pool_size = ctx->max_pool_size; + } + else { + ctx->min_pool_size = FLB_PGSQL_MIN_POOL_SIZE; + } + + /* Sync Mode */ + tmp = flb_output_get_property("async", ins); + if (tmp && flb_utils_bool(tmp)) { + ctx->async = FLB_TRUE; + } + else { + ctx->async = FLB_FALSE; + } + + if (!ctx->async) { + ctx->min_pool_size = 1; + ctx->max_pool_size = 1; + } + + /* CockroachDB Support */ + tmp = flb_output_get_property("cockroachdb", ins); + if (tmp && flb_utils_bool(tmp)) { + ctx->cockroachdb = FLB_TRUE; + } + else { + ctx->cockroachdb = FLB_FALSE; + } + + ret = pgsql_start_connections(ctx); + if (ret) { + return -1; + } + + flb_plg_info(ctx->ins, "host=%s port=%s dbname=%s OK", + ctx->db_hostname, ctx->db_port, ctx->db_name); + flb_output_set_context(ins, ctx); + + temp = PQescapeIdentifier(ctx->conn_current->conn, ctx->db_table, + flb_sds_len(ctx->db_table)); + + if (temp == NULL) { + flb_plg_error(ctx->ins, "failed to parse table name: %s", + PQerrorMessage(ctx->conn_current->conn)); + pgsql_conf_destroy(ctx); + return -1; + } + + flb_sds_destroy(ctx->db_table); + ctx->db_table = flb_sds_create(temp); + PQfreemem(temp); + + if (!ctx->db_table) { + flb_errno(); + pgsql_conf_destroy(ctx); + return -1; + } + + flb_plg_info(ctx->ins, "we check that the table %s " + "exists, if not we create it", ctx->db_table); + + str_len = 72 + flb_sds_len(ctx->db_table); + + query = flb_malloc(str_len); + if (query == NULL) { + flb_errno(); + pgsql_conf_destroy(ctx); + return -1; + } + + /* Maybe use the timestamp with the TZ specified */ + /* in the postgresql connection? */ + snprintf(query, str_len, + "CREATE TABLE IF NOT EXISTS %s " + "(tag varchar, time timestamp, data jsonb);", + ctx->db_table); + flb_plg_trace(ctx->ins, "%s", query); + res = PQexec(ctx->conn_current->conn, query); + + flb_free(query); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + flb_plg_error(ctx->ins, "%s", + PQerrorMessage(ctx->conn_current->conn)); + pgsql_conf_destroy(ctx); + return -1; + } + + PQclear(res); + + return 0; +} + +static void cb_pgsql_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + struct flb_pgsql_config *ctx = out_context; + flb_sds_t json; + char *tmp = NULL; + char *query = NULL; + PGresult *res = NULL; + int send_res; + flb_sds_t tag_escaped = NULL; + size_t str_len; + + + if (pgsql_next_connection(ctx) == 1) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* + * PQreset() + * This function will close the connection to the server and attempt to + * reestablish a new connection to the same server, using all the same + * parameters previously used. This might be useful for error recovery + * if a working connection is lost. + */ + if (PQstatus(ctx->conn_current->conn) != CONNECTION_OK) { + PQreset(ctx->conn_current->conn); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + json = flb_pack_msgpack_to_json_format(event_chunk->data, + event_chunk->size, + FLB_PACK_JSON_FORMAT_JSON, + FLB_PACK_JSON_DATE_DOUBLE, + ctx->timestamp_key); + if (json == NULL) { + flb_errno(); + flb_plg_error(ctx->ins, + "Can't parse the msgpack into json"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + tmp = PQescapeLiteral(ctx->conn_current->conn, json, flb_sds_len(json)); + flb_sds_destroy(json); + if (!tmp) { + flb_errno(); + PQfreemem(tmp); + flb_plg_error(ctx->ins, "Can't escape json string"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + json = flb_sds_create(tmp); + PQfreemem(tmp); + if (!json) { + flb_errno(); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + tmp = PQescapeLiteral(ctx->conn_current->conn, + event_chunk->tag, + flb_sds_len(event_chunk->tag)); + if (!tmp) { + flb_errno(); + flb_sds_destroy(json); + PQfreemem(tmp); + flb_plg_error(ctx->ins, "Can't escape tag string: %s", + event_chunk->tag); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + tag_escaped = flb_sds_create(tmp); + PQfreemem(tmp); + if (!tag_escaped) { + flb_errno(); + flb_sds_destroy(json); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + str_len = 100 + flb_sds_len(json) + + flb_sds_len(tag_escaped) + + flb_sds_len(ctx->db_table) + + flb_sds_len(ctx->timestamp_key); + query = flb_malloc(str_len); + + if (query == NULL) { + flb_errno(); + flb_sds_destroy(json); + flb_sds_destroy(tag_escaped); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + + snprintf(query, str_len, + ctx->cockroachdb ? FLB_PGSQL_INSERT_COCKROACH : FLB_PGSQL_INSERT, + ctx->db_table, tag_escaped, ctx->timestamp_key, json); + flb_plg_trace(ctx->ins, "query: %s", query); + + if (ctx->async) { + send_res = PQsendQuery(ctx->conn_current->conn, query); + flb_free(query); + flb_sds_destroy(json); + flb_sds_destroy(tag_escaped); + + if (send_res == 0) { + flb_plg_error(ctx->ins, "%s", + PQerrorMessage(ctx->conn_current->conn)); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + PQflush(ctx->conn_current->conn); + } + else { + res = PQexec(ctx->conn_current->conn, query); + flb_free(query); + flb_sds_destroy(json); + flb_sds_destroy(tag_escaped); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + flb_plg_error(ctx->ins, "%s", + PQerrorMessage(ctx->conn_current->conn)); + PQclear(res); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + PQclear(res); + } + + FLB_OUTPUT_RETURN(FLB_OK); +} + +static int cb_pgsql_exit(void *data, struct flb_config *config) +{ + struct flb_pgsql_config *ctx = data; + + if (!ctx){ + return 0; + } + + pgsql_conf_destroy(ctx); + + return 0; +} + +struct flb_output_plugin out_pgsql_plugin = { + .name = "pgsql", + .description = "PostgreSQL", + .cb_init = cb_pgsql_init, + .cb_flush = cb_pgsql_flush, + .cb_exit = cb_pgsql_exit, + .flags = 0, +}; diff --git a/fluent-bit/plugins/out_pgsql/pgsql.h b/fluent-bit/plugins/out_pgsql/pgsql.h new file mode 100644 index 000000000..5190a5a54 --- /dev/null +++ b/fluent-bit/plugins/out_pgsql/pgsql.h @@ -0,0 +1,91 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_PGSQL_H +#define FLB_OUT_PGSQL_H + +#include <fluent-bit/flb_output.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_output_plugin.h> + +#include <libpq-fe.h> + +#define FLB_PGSQL_HOST "127.0.0.1" +#define FLB_PGSQL_PORT 5432 +#define FLB_PGSQL_DBNAME "fluentbit" +#define FLB_PGSQL_TABLE "fluentbit" +#define FLB_PGSQL_TIMESTAMP_KEY "date" +#define FLB_PGSQL_POOL_SIZE 4 +#define FLB_PGSQL_MIN_POOL_SIZE 1 +#define FLB_PGSQL_SYNC FLB_FALSE +#define FLB_PGSQL_COCKROACH FLB_FALSE + +#define FLB_PGSQL_INSERT "INSERT INTO %s SELECT %s, " \ + "to_timestamp(CAST(value->>'%s' as FLOAT))," \ + " * FROM json_array_elements(%s);" +#define FLB_PGSQL_INSERT_COCKROACH "INSERT INTO %s SELECT %s," \ + "CAST(value->>'%s' AS INTERVAL) + DATE'1970-01-01'," \ + " * FROM json_array_elements(%s);" + +struct flb_pgsql_conn { + struct mk_list _head; + PGconn *conn; + int number; +}; + +struct flb_pgsql_config { + + /* database */ + char *db_hostname; + char db_port[8]; + const char *db_name; + flb_sds_t db_table; + + /* auth */ + const char *db_user; + const char *db_passwd; + + /* time key */ + flb_sds_t timestamp_key; + + /* instance reference */ + struct flb_output_instance *ins; + + /* connections options */ + const char *conn_options; + + /* connections pool */ + struct mk_list conn_queue; + struct mk_list _head; + + struct flb_pgsql_conn *conn_current; + int max_pool_size; + int min_pool_size; + int active_conn; + + /* async mode or sync mode */ + int async; + + /* cockroachdb */ + int cockroachdb; +}; + +void pgsql_conf_destroy(struct flb_pgsql_config *ctx); + +#endif diff --git a/fluent-bit/plugins/out_pgsql/pgsql_connections.c b/fluent-bit/plugins/out_pgsql/pgsql_connections.c new file mode 100644 index 000000000..9c4ccfba2 --- /dev/null +++ b/fluent-bit/plugins/out_pgsql/pgsql_connections.c @@ -0,0 +1,193 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_output_plugin.h> + +#include "pgsql.h" + +void pgsql_destroy_connections(struct flb_pgsql_config *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct flb_pgsql_conn *conn; + PGresult *res = NULL; + + mk_list_foreach_safe(head, tmp, &ctx->conn_queue) { + conn = mk_list_entry(head, struct flb_pgsql_conn, _head); + if (PQstatus(conn->conn) == CONNECTION_OK) { + while(PQconsumeInput(conn->conn) == 0) { + res = PQgetResult(conn->conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + flb_plg_warn(ctx->ins, "%s", + PQerrorMessage(conn->conn)); + } + PQclear(res); + } + } + PQfinish(conn->conn); + flb_free(conn); + } +} + +void *pgsql_create_connection(struct flb_pgsql_config *ctx) +{ + struct flb_pgsql_conn *conn; + + conn = flb_calloc(1, sizeof(struct flb_pgsql_conn)); + if (!conn) { + flb_errno(); + return NULL; + } + + conn->conn = PQsetdbLogin(ctx->db_hostname, + ctx->db_port, + ctx->conn_options, + NULL, + ctx->db_name, + ctx->db_user, + ctx->db_passwd); + + if (PQstatus(conn->conn) != CONNECTION_OK) { + flb_plg_error(ctx->ins, + "failed connecting to host=%s with error: %s", + ctx->db_hostname, PQerrorMessage(conn->conn)); + PQfinish(conn->conn); + flb_free(conn); + return NULL; + } + + flb_plg_info(ctx->ins, "switching postgresql connection " + "to non-blocking mode"); + + if (PQsetnonblocking(conn->conn, 1) != 0) { + flb_plg_error(ctx->ins, "non-blocking mode not set"); + PQfinish(conn->conn); + flb_free(conn); + return NULL; + } + + return conn; +} + +int pgsql_start_connections(struct flb_pgsql_config *ctx) +{ + int i; + struct flb_pgsql_conn *conn = NULL; + + mk_list_init(&ctx->conn_queue); + ctx->active_conn = 0; + + for(i = 0; i < ctx->min_pool_size; i++) { + flb_plg_info(ctx->ins, "Opening connection: #%d", i); + + conn = (struct flb_pgsql_conn *)pgsql_create_connection(ctx); + if (conn == NULL) { + pgsql_conf_destroy(ctx); + return -1; + } + + conn->number = i; + ctx->active_conn++; + mk_list_add(&conn->_head, &ctx->conn_queue); + } + + ctx->conn_current = mk_list_entry_last(&ctx->conn_queue, + struct flb_pgsql_conn, + _head); + + return 0; +} + +int pgsql_new_connection(struct flb_pgsql_config *ctx) +{ + struct flb_pgsql_conn *conn = NULL; + + if (ctx->active_conn >= ctx->max_pool_size) { + return -1; + } + + conn = (struct flb_pgsql_conn *)pgsql_create_connection(ctx); + if (conn == NULL) { + pgsql_conf_destroy(ctx); + return -1; + } + + conn->number = ctx->active_conn + 1; + ctx->active_conn++; + + mk_list_add(&conn->_head, &ctx->conn_queue); + + return 0; +} + +int pgsql_next_connection(struct flb_pgsql_config *ctx) +{ + struct flb_pgsql_conn *tmp = NULL; + PGresult *res = NULL; + struct mk_list *head; + int ret_conn = 1; + + if (PQconsumeInput(ctx->conn_current->conn) == 1) { + if (PQisBusy(ctx->conn_current->conn) == 0) { + res = PQgetResult(ctx->conn_current->conn); + PQclear(res); + } + } + else { + flb_plg_error(ctx->ins, "%s", + PQerrorMessage(ctx->conn_current->conn)); + } + + mk_list_foreach(head, &ctx->conn_queue) { + tmp = mk_list_entry(head, struct flb_pgsql_conn, _head); + if (ctx->conn_current == NULL) { + ctx->conn_current = tmp; + break; + } + + res = PQgetResult(tmp->conn); + + if (res == NULL) { + flb_plg_debug(ctx->ins, "Connection number %d", + tmp->number); + ctx->conn_current = tmp; + PQclear(res); + return 0; + } + + if (PQresultStatus(res) == PGRES_FATAL_ERROR) { + flb_plg_info(ctx->ins, "%s", + PQerrorMessage(tmp->conn)); + } + + PQclear(res); + } + + if (pgsql_new_connection(ctx) == -1) { + flb_plg_warn(ctx->ins, + "No more free connections." + " Increase max connections"); + } + else { + flb_plg_warn(ctx->ins, "Added new connection"); + ret_conn = pgsql_next_connection(ctx); + } + + return ret_conn; +} diff --git a/fluent-bit/plugins/out_pgsql/pgsql_connections.h b/fluent-bit/plugins/out_pgsql/pgsql_connections.h new file mode 100644 index 000000000..cd8730618 --- /dev/null +++ b/fluent-bit/plugins/out_pgsql/pgsql_connections.h @@ -0,0 +1,27 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_PGSQL_CONNECTIONS_H +#define FLB_OUT_PGSQL_CONNECTIONS_H + +void pgsql_destroy_connections(struct flb_pgsql_config *ctx); +int pgsql_start_connections(struct flb_pgsql_config *ctx); +int pgsql_next_connection(struct flb_pgsql_config *ctx); + +#endif |