diff options
Diffstat (limited to 'src/fluent-bit/plugins/out_gelf/gelf.c')
-rw-r--r-- | src/fluent-bit/plugins/out_gelf/gelf.c | 556 |
1 files changed, 0 insertions, 556 deletions
diff --git a/src/fluent-bit/plugins/out_gelf/gelf.c b/src/fluent-bit/plugins/out_gelf/gelf.c deleted file mode 100644 index 6d7284641..000000000 --- a/src/fluent-bit/plugins/out_gelf/gelf.c +++ /dev/null @@ -1,556 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_output_plugin.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/flb_str.h> -#include <fluent-bit/flb_time.h> -#include <fluent-bit/flb_gzip.h> -#include <fluent-bit/flb_utils.h> -#include <fluent-bit/flb_network.h> -#include <fluent-bit/flb_random.h> -#include <fluent-bit/flb_config_map.h> -#include <fluent-bit/flb_log_event_decoder.h> -#include <msgpack.h> - -#include <stdio.h> -#include <stdlib.h> -#include <assert.h> -#include <sys/types.h> -#include <sys/stat.h> -#include <fcntl.h> -#include <errno.h> - -#include "gelf.h" - -#ifndef MSG_DONTWAIT - #define MSG_DONTWAIT 0 -#endif - -#ifndef MSG_NOSIGNAL - #define MSG_NOSIGNAL 0 -#endif - -/* - * Version 1.1 (11/2013) - * A GELF message is a GZIP’d or ZLIB’d JSON string with the following fields: - * version string (UTF-8) GELF spec version – “1.1”; MUST be set by client - * library. - * host string (UTF-8) the name of the host, source or application that sent - * this message; MUST be set by client library. - * short_message string (UTF-8) a short descriptive message; MUST be set by - * client library. - * full_message string (UTF-8) a long message that can i.e. contain a - * backtrace; optional. - * timestamp number Seconds since UNIX epoch with optional decimal places - * for milliseconds; SHOULD be set by client library. Will be set to NOW - * by server if absent. - * level number the level equal to the standard syslog levels; optional, - * default is 1 (ALERT). - * facility string (UTF-8) optional, deprecated. Send as additional field i - * instead. - * line number the line in a file that caused the error (decimal); optional, - * deprecated. Send as additional field instead. - * file string (UTF-8) the file (with path if you want) that caused the error - * (string); optional, deprecated. Send as additional field instead. - * _[additional field] string (UTF-8) or number every field you send and - * prefix with a _ (underscore) will be treated as an additional field. - * Allowed characters in field names are any word character (letter, - * number, underscore), dashes and dots. The verifying regular expression - * is: ^[\w\.\-]*$ - * Libraries SHOULD not allow to send id as additional field (_id). Graylog - * server nodes omit this field automatically. - */ - -/* - * Generate a unique message ID. The upper 48-bit is milliseconds - * since the Epoch, the lower 16-bit is a random nonce. - */ -static uint64_t message_id(void) -{ - uint64_t now; - uint16_t nonce; - struct flb_time tm; - - if (flb_time_get(&tm) != -1) { - now = (uint64_t) tm.tm.tv_sec * 1000 + tm.tm.tv_nsec / 1000000; - } - else { - now = (uint64_t) time(NULL) * 1000; - } - nonce = (uint16_t) rand(); - - return (now << 16) | nonce; -} - -/* - * A GELF header is 12 bytes in size. It has the following - * structure: - * - * +---+---+---+---+---+---+---+---+---+---+---+---+ - * | MAGIC | MESSAGE ID |SEQ|NUM| - * +---+---+---+---+---+---+---+---+---+---+---+---+ - * - * NUM is the total number of packets to send. SEQ is the - * unique sequence number for each packet (zero-indexed). - */ -#define GELF_MAGIC "\x1e\x0f" -#define GELF_HEADER_SIZE 12 - -static void init_chunk_header(uint8_t *buf, int count) -{ - uint64_t msgid = message_id(); - - memcpy(buf, GELF_MAGIC, 2); - memcpy(buf + 2, &msgid, 8); - buf[10] = 0; - buf[11] = count; -} - -/* - * Chunked GELF - * Prepend the following structure to your GELF message to make it chunked: - * Chunked GELF magic bytes 2 bytes 0x1e 0x0f - * Message ID 8 bytes Must be the same for every chunk of this message. - * Identifying the whole message and is used to reassemble the chunks later. - * Generate from millisecond timestamp + hostname for example. - * Sequence number 1 byte The sequence number of this chunk. Starting at 0 - * and always less than the sequence count. - * Sequence count 1 byte Total number of chunks this message has. - * All chunks MUST arrive within 5 seconds or the server will discard all - * already arrived and still arriving chunks. - * A message MUST NOT consist of more than 128 chunks. - */ -static int gelf_send_udp_chunked(struct flb_out_gelf_config *ctx, void *msg, - size_t msg_size) -{ - int ret; - uint8_t n; - size_t chunks; - size_t offset; - size_t len; - uint8_t *buf = (uint8_t *) ctx->pckt_buf; - - chunks = msg_size / ctx->pckt_size; - if (msg_size % ctx->pckt_size != 0) { - chunks++; - } - - if (chunks > 128) { - flb_plg_error(ctx->ins, "message too big: %zd bytes", msg_size); - return -1; - } - - init_chunk_header(buf, chunks); - - offset = 0; - for (n = 0; n < chunks; n++) { - buf[10] = n; - - len = msg_size - offset; - if (ctx->pckt_size < len) { - len = ctx->pckt_size; - } - memcpy(buf + GELF_HEADER_SIZE, (char *) msg + offset, len); - - ret = send(ctx->fd, buf, len + GELF_HEADER_SIZE, - MSG_DONTWAIT | MSG_NOSIGNAL); - if (ret == -1) { - flb_errno(); - } - offset += ctx->pckt_size; - } - return 0; -} - -static int gelf_send_udp_pckt (struct flb_out_gelf_config *ctx, char *msg, - size_t msg_size) -{ - int ret; - - if (msg_size > ctx->pckt_size) { - gelf_send_udp_chunked(ctx, msg, msg_size); - } - else { - ret = send(ctx->fd, msg, msg_size, MSG_DONTWAIT | MSG_NOSIGNAL); - if (ret == -1) { - flb_errno(); - return -1; - } - } - - return 0; -} - -static int gelf_send_udp(struct flb_out_gelf_config *ctx, char *msg, - size_t msg_size) -{ - int ret; - int status; - void *zdata; - size_t zdata_len; - - if (ctx->compress == FLB_TRUE || (msg_size > ctx->pckt_size)) { - ret = flb_gzip_compress(msg, msg_size, &zdata, &zdata_len); - if (ret != 0) { - return -1; - } - - status = gelf_send_udp_pckt (ctx, zdata, zdata_len); - flb_free(zdata); - if (status < 0) { - return status; - } - } - else { - status = send(ctx->fd, msg, msg_size, MSG_DONTWAIT | MSG_NOSIGNAL); - if (status < 0) { - return status; - } - } - - return 0; -} - -static void cb_gelf_flush(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *i_ins, - void *out_context, - struct flb_config *config) -{ - int ret; - flb_sds_t s; - flb_sds_t tmp; - size_t off = 0; - size_t prev_off = 0; - size_t size = 0; - size_t bytes_sent; - msgpack_object map; - struct flb_connection *u_conn = NULL; - struct flb_out_gelf_config *ctx = out_context; - struct flb_log_event_decoder log_decoder; - struct flb_log_event log_event; - - if (ctx->mode != FLB_GELF_UDP) { - u_conn = flb_upstream_conn_get(ctx->u); - if (!u_conn) { - flb_plg_error(ctx->ins, "no upstream connections available"); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - } - - ret = flb_log_event_decoder_init(&log_decoder, - (char *) event_chunk->data, - event_chunk->size); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event decoder initialization error : %d", ret); - - if (ctx->mode != FLB_GELF_UDP) { - flb_upstream_conn_release(u_conn); - } - - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - while ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - off = log_decoder.offset; - size = off - prev_off; - prev_off = off; - - map = *log_event.body; - - size = (size * 1.4); - s = flb_sds_create_size(size); - if (s == NULL) { - flb_log_event_decoder_destroy(&log_decoder); - FLB_OUTPUT_RETURN(FLB_ERROR); - } - - tmp = flb_msgpack_to_gelf(&s, &map, &log_event.timestamp, - &(ctx->fields)); - if (tmp != NULL) { - s = tmp; - if (ctx->mode == FLB_GELF_UDP) { - ret = gelf_send_udp(ctx, s, flb_sds_len(s)); - if (ret == -1) { - if (ctx->mode != FLB_GELF_UDP) { - flb_upstream_conn_release(u_conn); - } - - flb_log_event_decoder_destroy(&log_decoder); - - flb_sds_destroy(s); - - FLB_OUTPUT_RETURN(FLB_RETRY); - } - } - else { - /* write gelf json plus \0 */ - ret = flb_io_net_write(u_conn, - s, flb_sds_len(s) + 1, &bytes_sent); - if (ret == -1) { - flb_errno(); - - if (ctx->mode != FLB_GELF_UDP) { - flb_upstream_conn_release(u_conn); - } - - flb_log_event_decoder_destroy(&log_decoder); - - flb_sds_destroy(s); - - FLB_OUTPUT_RETURN(FLB_RETRY); - } - } - } - else { - flb_plg_error(ctx->ins, "error encoding to GELF"); - } - - flb_sds_destroy(s); - } - - flb_log_event_decoder_destroy(&log_decoder); - - if (ctx->mode != FLB_GELF_UDP) { - flb_upstream_conn_release(u_conn); - } - - FLB_OUTPUT_RETURN(FLB_OK); -} - -static int cb_gelf_init(struct flb_output_instance *ins, struct flb_config *config, - void *data) -{ - int ret; - const char *tmp; - struct flb_out_gelf_config *ctx = NULL; - - /* Set default network configuration */ - flb_output_net_default("127.0.0.1", 12201, ins); - - /* Allocate plugin context */ - ctx = flb_calloc(1, sizeof(struct flb_out_gelf_config)); - if (!ctx) { - flb_errno(); - return -1; - } - ctx->ins = ins; - - ret = flb_output_config_map_set(ins, (void *) ctx); - if (ret == -1) { - flb_plg_error(ins, "flb_output_config_map_set failed"); - flb_free(ctx); - return -1; - } - - /* Config Mode */ - tmp = flb_output_get_property("mode", ins); - if (tmp) { - if (!strcasecmp(tmp, "tcp")) { - ctx->mode = FLB_GELF_TCP; - } - else if (!strcasecmp(tmp, "tls")) { - ctx->mode = FLB_GELF_TLS; - } - else if (!strcasecmp(tmp, "udp")) { - ctx->mode = FLB_GELF_UDP; - } - else { - flb_plg_error(ctx->ins, "Unknown gelf mode %s", tmp); - flb_free(ctx); - return -1; - } - } - else { - ctx->mode = FLB_GELF_UDP; - } - - /* Config Gelf_Timestamp_Key */ - tmp = flb_output_get_property("gelf_timestamp_key", ins); - if (tmp) { - ctx->fields.timestamp_key = flb_sds_create(tmp); - } - - /* Config Gelf_Host_Key */ - tmp = flb_output_get_property("gelf_host_key", ins); - if (tmp) { - ctx->fields.host_key = flb_sds_create(tmp); - } - - /* Config Gelf_Short_Message_Key */ - tmp = flb_output_get_property("gelf_short_message_key", ins); - if (tmp) { - ctx->fields.short_message_key = flb_sds_create(tmp); - } - - /* Config Gelf_Full_Message_Key */ - tmp = flb_output_get_property("gelf_full_message_key", ins); - if (tmp) { - ctx->fields.full_message_key = flb_sds_create(tmp); - } - - /* Config Gelf_Level_Key */ - tmp = flb_output_get_property("gelf_level_key", ins); - if (tmp) { - ctx->fields.level_key = flb_sds_create(tmp); - } - - /* init random seed */ - if (flb_random_bytes((unsigned char *) &ctx->seed, sizeof(int))) { - ctx->seed = time(NULL); - } - srand(ctx->seed); - - ctx->fd = -1; - ctx->pckt_buf = NULL; - - if (ctx->mode == FLB_GELF_UDP) { - ctx->fd = flb_net_udp_connect(ins->host.name, ins->host.port, - ins->net_setup.source_address); - if (ctx->fd < 0) { - flb_free(ctx); - return -1; - } - ctx->pckt_buf = flb_malloc(GELF_HEADER_SIZE + ctx->pckt_size); - if (ctx->pckt_buf == NULL) { - flb_socket_close(ctx->fd); - flb_free(ctx); - return -1; - } - } - else { - int io_flags = FLB_IO_TCP; - - if (ctx->mode == FLB_GELF_TLS) { - io_flags = FLB_IO_TLS; - } - - if (ins->host.ipv6 == FLB_TRUE) { - io_flags |= FLB_IO_IPV6; - } - - ctx->u = flb_upstream_create(config, ins->host.name, ins->host.port, - io_flags, ins->tls); - if (!(ctx->u)) { - flb_free(ctx); - return -1; - } - flb_output_upstream_set(ctx->u, ins); - } - - /* Set the plugin context */ - flb_output_set_context(ins, ctx); - return 0; -} - -static int cb_gelf_exit(void *data, struct flb_config *config) -{ - struct flb_out_gelf_config *ctx = data; - - if (ctx == NULL) { - return 0; - } - - if (ctx->u) { - flb_upstream_destroy(ctx->u); - } - if (ctx->fd >= 0) { - close(ctx->fd); - } - - flb_sds_destroy(ctx->fields.timestamp_key); - flb_sds_destroy(ctx->fields.host_key); - flb_sds_destroy(ctx->fields.short_message_key); - flb_sds_destroy(ctx->fields.full_message_key); - flb_sds_destroy(ctx->fields.level_key); - - flb_free(ctx->pckt_buf); - flb_free(ctx); - - return 0; -} - - -static struct flb_config_map config_map[] = { - { - FLB_CONFIG_MAP_STR, "mode", "udp", - 0, FLB_FALSE, 0, - "The protocol to use. 'tls', 'tcp' or 'udp'" - }, - { - FLB_CONFIG_MAP_STR, "gelf_short_message_key", NULL, - 0, FLB_FALSE, 0, - "A short descriptive message (MUST be set in GELF)" - }, - { - FLB_CONFIG_MAP_STR, "gelf_timestamp_key", NULL, - 0, FLB_FALSE, 0, - "Timestamp key name (SHOULD be set in GELF)" - }, - { - FLB_CONFIG_MAP_STR, "gelf_host_key", NULL, - 0, FLB_FALSE, 0, - "Key which its value is used as the name of the host," - "source or application that sent this message. (MUST be set in GELF) " - }, - { - FLB_CONFIG_MAP_STR, "gelf_full_message_key", NULL, - 0, FLB_FALSE, 0, - "Key to use as the long message that can i.e. contain a backtrace. " - "(Optional in GELF)" - }, - { - FLB_CONFIG_MAP_STR, "gelf_level_key", NULL, - 0, FLB_FALSE, 0, - "Key to be used as the log level. " - "Its value must be in standard syslog levels (between 0 and 7). " - "(Optional in GELF)" - }, - { - FLB_CONFIG_MAP_INT, "packet_size", "1420", - 0, FLB_TRUE, offsetof(struct flb_out_gelf_config, pckt_size), - "If transport protocol is udp, you can set the size of packets to be sent." - }, - { - FLB_CONFIG_MAP_BOOL, "compress", "true", - 0, FLB_TRUE, offsetof(struct flb_out_gelf_config, compress), - "If transport protocol is udp, " - "you can set this if you want your UDP packets to be compressed." - }, - - /* EOF */ - {0} -}; - -/* Plugin reference */ -struct flb_output_plugin out_gelf_plugin = { - .name = "gelf", - .description = "GELF Output", - .cb_init = cb_gelf_init, - .cb_pre_run = NULL, - .cb_flush = cb_gelf_flush, - .cb_exit = cb_gelf_exit, - .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, - .config_map = config_map -}; |