diff options
Diffstat (limited to 'src/fluent-bit/plugins/out_pgsql/pgsql.c')
-rw-r--r-- | src/fluent-bit/plugins/out_pgsql/pgsql.c | 389 |
1 files changed, 0 insertions, 389 deletions
diff --git a/src/fluent-bit/plugins/out_pgsql/pgsql.c b/src/fluent-bit/plugins/out_pgsql/pgsql.c deleted file mode 100644 index a01090c1a..000000000 --- a/src/fluent-bit/plugins/out_pgsql/pgsql.c +++ /dev/null @@ -1,389 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_output.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_pack.h> - -#include "pgsql.h" -#include "pgsql_connections.h" - -void pgsql_conf_destroy(struct flb_pgsql_config *ctx) -{ - pgsql_destroy_connections(ctx); - - flb_free(ctx->db_hostname); - - if (ctx->db_table != NULL) { - flb_sds_destroy(ctx->db_table); - } - - if (ctx->timestamp_key != NULL) { - flb_sds_destroy(ctx->timestamp_key); - } - - flb_free(ctx); - ctx = NULL; -} - -static int cb_pgsql_init(struct flb_output_instance *ins, - struct flb_config *config, void *data) -{ - - struct flb_pgsql_config *ctx; - size_t str_len; - PGresult *res; - char *query = NULL; - char *temp = NULL; - const char *tmp = NULL; - int ret; - - /* set default network configuration */ - flb_output_net_default(FLB_PGSQL_HOST, FLB_PGSQL_PORT, ins); - - ctx = flb_calloc(1, sizeof(struct flb_pgsql_config)); - if (!ctx) { - flb_errno(); - return -1; - } - - ctx->ins = ins; - - /* Database host */ - ctx->db_hostname = flb_strdup(ins->host.name); - if (!ctx->db_hostname) { - flb_errno(); - pgsql_conf_destroy(ctx); - return -1; - } - - /* Database port */ - snprintf(ctx->db_port, sizeof(ctx->db_port), "%d", ins->host.port); - - /* Database name */ - ctx->db_name = flb_output_get_property("database", ins); - if (!ctx->db_name) { - ctx->db_name = FLB_PGSQL_DBNAME; - } - - /* db table */ - tmp = flb_output_get_property("table", ins); - if (tmp) { - ctx->db_table = flb_sds_create(tmp); - } - else { - ctx->db_table = flb_sds_create(FLB_PGSQL_TABLE); - } - - /* connection options */ - ctx->conn_options = flb_output_get_property("connection_options", ins); - - if (!ctx->db_table) { - flb_errno(); - pgsql_conf_destroy(ctx); - return -1; - } - - /* db user */ - ctx->db_user = flb_output_get_property("user", ins); - if (!ctx->db_user) { - flb_plg_warn(ctx->ins, - "You didn't supply a valid user to connect," - "your current unix user will be used"); - } - - /* db user password */ - ctx->db_passwd = flb_output_get_property("password", ins); - - /* timestamp key */ - tmp = flb_output_get_property("timestamp_key", ins); - if (tmp) { - ctx->timestamp_key = flb_sds_create(tmp); - } - else { - ctx->timestamp_key = flb_sds_create(FLB_PGSQL_TIMESTAMP_KEY); - } - - if (!ctx->timestamp_key) { - flb_errno(); - pgsql_conf_destroy(ctx); - return -1; - } - - /* Pool size */ - tmp = flb_output_get_property("max_pool_size", ins); - if (tmp) { - ctx->max_pool_size = strtol(tmp, NULL, 0); - if (ctx->max_pool_size < 1) - ctx->max_pool_size = 1; - } - else { - ctx->max_pool_size = FLB_PGSQL_POOL_SIZE; - } - - tmp = flb_output_get_property("min_pool_size", ins); - if (tmp) { - ctx->min_pool_size = strtol(tmp, NULL, 0); - if (ctx->min_pool_size < 1 || ctx->min_pool_size > ctx->max_pool_size) - ctx->min_pool_size = ctx->max_pool_size; - } - else { - ctx->min_pool_size = FLB_PGSQL_MIN_POOL_SIZE; - } - - /* Sync Mode */ - tmp = flb_output_get_property("async", ins); - if (tmp && flb_utils_bool(tmp)) { - ctx->async = FLB_TRUE; - } - else { - ctx->async = FLB_FALSE; - } - - if (!ctx->async) { - ctx->min_pool_size = 1; - ctx->max_pool_size = 1; - } - - /* CockroachDB Support */ - tmp = flb_output_get_property("cockroachdb", ins); - if (tmp && flb_utils_bool(tmp)) { - ctx->cockroachdb = FLB_TRUE; - } - else { - ctx->cockroachdb = FLB_FALSE; - } - - ret = pgsql_start_connections(ctx); - if (ret) { - return -1; - } - - flb_plg_info(ctx->ins, "host=%s port=%s dbname=%s OK", - ctx->db_hostname, ctx->db_port, ctx->db_name); - flb_output_set_context(ins, ctx); - - temp = PQescapeIdentifier(ctx->conn_current->conn, ctx->db_table, - flb_sds_len(ctx->db_table)); - - if (temp == NULL) { - flb_plg_error(ctx->ins, "failed to parse table name: %s", - PQerrorMessage(ctx->conn_current->conn)); - pgsql_conf_destroy(ctx); - return -1; - } - - flb_sds_destroy(ctx->db_table); - ctx->db_table = flb_sds_create(temp); - PQfreemem(temp); - - if (!ctx->db_table) { - flb_errno(); - pgsql_conf_destroy(ctx); - return -1; - } - - flb_plg_info(ctx->ins, "we check that the table %s " - "exists, if not we create it", ctx->db_table); - - str_len = 72 + flb_sds_len(ctx->db_table); - - query = flb_malloc(str_len); - if (query == NULL) { - flb_errno(); - pgsql_conf_destroy(ctx); - return -1; - } - - /* Maybe use the timestamp with the TZ specified */ - /* in the postgresql connection? */ - snprintf(query, str_len, - "CREATE TABLE IF NOT EXISTS %s " - "(tag varchar, time timestamp, data jsonb);", - ctx->db_table); - flb_plg_trace(ctx->ins, "%s", query); - res = PQexec(ctx->conn_current->conn, query); - - flb_free(query); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - flb_plg_error(ctx->ins, "%s", - PQerrorMessage(ctx->conn_current->conn)); - pgsql_conf_destroy(ctx); - return -1; - } - - PQclear(res); - - return 0; -} - -static void cb_pgsql_flush(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *i_ins, - void *out_context, - struct flb_config *config) -{ - struct flb_pgsql_config *ctx = out_context; - flb_sds_t json; - char *tmp = NULL; - char *query = NULL; - PGresult *res = NULL; - int send_res; - flb_sds_t tag_escaped = NULL; - size_t str_len; - - - if (pgsql_next_connection(ctx) == 1) { - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - /* - * PQreset() - * This function will close the connection to the server and attempt to - * reestablish a new connection to the same server, using all the same - * parameters previously used. This might be useful for error recovery - * if a working connection is lost. - */ - if (PQstatus(ctx->conn_current->conn) != CONNECTION_OK) { - PQreset(ctx->conn_current->conn); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - json = flb_pack_msgpack_to_json_format(event_chunk->data, - event_chunk->size, - FLB_PACK_JSON_FORMAT_JSON, - FLB_PACK_JSON_DATE_DOUBLE, - ctx->timestamp_key); - if (json == NULL) { - flb_errno(); - flb_plg_error(ctx->ins, - "Can't parse the msgpack into json"); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - tmp = PQescapeLiteral(ctx->conn_current->conn, json, flb_sds_len(json)); - flb_sds_destroy(json); - if (!tmp) { - flb_errno(); - PQfreemem(tmp); - flb_plg_error(ctx->ins, "Can't escape json string"); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - json = flb_sds_create(tmp); - PQfreemem(tmp); - if (!json) { - flb_errno(); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - tmp = PQescapeLiteral(ctx->conn_current->conn, - event_chunk->tag, - flb_sds_len(event_chunk->tag)); - if (!tmp) { - flb_errno(); - flb_sds_destroy(json); - PQfreemem(tmp); - flb_plg_error(ctx->ins, "Can't escape tag string: %s", - event_chunk->tag); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - tag_escaped = flb_sds_create(tmp); - PQfreemem(tmp); - if (!tag_escaped) { - flb_errno(); - flb_sds_destroy(json); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - str_len = 100 + flb_sds_len(json) - + flb_sds_len(tag_escaped) - + flb_sds_len(ctx->db_table) - + flb_sds_len(ctx->timestamp_key); - query = flb_malloc(str_len); - - if (query == NULL) { - flb_errno(); - flb_sds_destroy(json); - flb_sds_destroy(tag_escaped); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - - snprintf(query, str_len, - ctx->cockroachdb ? FLB_PGSQL_INSERT_COCKROACH : FLB_PGSQL_INSERT, - ctx->db_table, tag_escaped, ctx->timestamp_key, json); - flb_plg_trace(ctx->ins, "query: %s", query); - - if (ctx->async) { - send_res = PQsendQuery(ctx->conn_current->conn, query); - flb_free(query); - flb_sds_destroy(json); - flb_sds_destroy(tag_escaped); - - if (send_res == 0) { - flb_plg_error(ctx->ins, "%s", - PQerrorMessage(ctx->conn_current->conn)); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - PQflush(ctx->conn_current->conn); - } - else { - res = PQexec(ctx->conn_current->conn, query); - flb_free(query); - flb_sds_destroy(json); - flb_sds_destroy(tag_escaped); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - flb_plg_error(ctx->ins, "%s", - PQerrorMessage(ctx->conn_current->conn)); - PQclear(res); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - PQclear(res); - } - - FLB_OUTPUT_RETURN(FLB_OK); -} - -static int cb_pgsql_exit(void *data, struct flb_config *config) -{ - struct flb_pgsql_config *ctx = data; - - if (!ctx){ - return 0; - } - - pgsql_conf_destroy(ctx); - - return 0; -} - -struct flb_output_plugin out_pgsql_plugin = { - .name = "pgsql", - .description = "PostgreSQL", - .cb_init = cb_pgsql_init, - .cb_flush = cb_pgsql_flush, - .cb_exit = cb_pgsql_exit, - .flags = 0, -}; |