summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/out_gelf/gelf.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/plugins/out_gelf/gelf.c')
-rw-r--r--fluent-bit/plugins/out_gelf/gelf.c556
1 files changed, 556 insertions, 0 deletions
diff --git a/fluent-bit/plugins/out_gelf/gelf.c b/fluent-bit/plugins/out_gelf/gelf.c
new file mode 100644
index 000000000..6d7284641
--- /dev/null
+++ b/fluent-bit/plugins/out_gelf/gelf.c
@@ -0,0 +1,556 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_output_plugin.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_str.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_gzip.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_network.h>
+#include <fluent-bit/flb_random.h>
+#include <fluent-bit/flb_config_map.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <msgpack.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <errno.h>
+
+#include "gelf.h"
+
+#ifndef MSG_DONTWAIT
+ #define MSG_DONTWAIT 0
+#endif
+
+#ifndef MSG_NOSIGNAL
+ #define MSG_NOSIGNAL 0
+#endif
+
+/*
+ * Version 1.1 (11/2013)
+ * A GELF message is a GZIP’d or ZLIB’d JSON string with the following fields:
+ * version string (UTF-8) GELF spec version – “1.1”; MUST be set by client
+ * library.
+ * host string (UTF-8) the name of the host, source or application that sent
+ * this message; MUST be set by client library.
+ * short_message string (UTF-8) a short descriptive message; MUST be set by
+ * client library.
+ * full_message string (UTF-8) a long message that can i.e. contain a
+ * backtrace; optional.
+ * timestamp number Seconds since UNIX epoch with optional decimal places
+ * for milliseconds; SHOULD be set by client library. Will be set to NOW
+ * by server if absent.
+ * level number the level equal to the standard syslog levels; optional,
+ * default is 1 (ALERT).
+ * facility string (UTF-8) optional, deprecated. Send as additional field i
+ * instead.
+ * line number the line in a file that caused the error (decimal); optional,
+ * deprecated. Send as additional field instead.
+ * file string (UTF-8) the file (with path if you want) that caused the error
+ * (string); optional, deprecated. Send as additional field instead.
+ * _[additional field] string (UTF-8) or number every field you send and
+ * prefix with a _ (underscore) will be treated as an additional field.
+ * Allowed characters in field names are any word character (letter,
+ * number, underscore), dashes and dots. The verifying regular expression
+ * is: ^[\w\.\-]*$
+ * Libraries SHOULD not allow to send id as additional field (_id). Graylog
+ * server nodes omit this field automatically.
+ */
+
+/*
+ * Generate a unique message ID. The upper 48-bit is milliseconds
+ * since the Epoch, the lower 16-bit is a random nonce.
+ */
+static uint64_t message_id(void)
+{
+ uint64_t now;
+ uint16_t nonce;
+ struct flb_time tm;
+
+ if (flb_time_get(&tm) != -1) {
+ now = (uint64_t) tm.tm.tv_sec * 1000 + tm.tm.tv_nsec / 1000000;
+ }
+ else {
+ now = (uint64_t) time(NULL) * 1000;
+ }
+ nonce = (uint16_t) rand();
+
+ return (now << 16) | nonce;
+}
+
+/*
+ * A GELF header is 12 bytes in size. It has the following
+ * structure:
+ *
+ * +---+---+---+---+---+---+---+---+---+---+---+---+
+ * | MAGIC | MESSAGE ID |SEQ|NUM|
+ * +---+---+---+---+---+---+---+---+---+---+---+---+
+ *
+ * NUM is the total number of packets to send. SEQ is the
+ * unique sequence number for each packet (zero-indexed).
+ */
+#define GELF_MAGIC "\x1e\x0f"
+#define GELF_HEADER_SIZE 12
+
+static void init_chunk_header(uint8_t *buf, int count)
+{
+ uint64_t msgid = message_id();
+
+ memcpy(buf, GELF_MAGIC, 2);
+ memcpy(buf + 2, &msgid, 8);
+ buf[10] = 0;
+ buf[11] = count;
+}
+
+/*
+ * Chunked GELF
+ * Prepend the following structure to your GELF message to make it chunked:
+ * Chunked GELF magic bytes 2 bytes 0x1e 0x0f
+ * Message ID 8 bytes Must be the same for every chunk of this message.
+ * Identifying the whole message and is used to reassemble the chunks later.
+ * Generate from millisecond timestamp + hostname for example.
+ * Sequence number 1 byte The sequence number of this chunk. Starting at 0
+ * and always less than the sequence count.
+ * Sequence count 1 byte Total number of chunks this message has.
+ * All chunks MUST arrive within 5 seconds or the server will discard all
+ * already arrived and still arriving chunks.
+ * A message MUST NOT consist of more than 128 chunks.
+ */
+static int gelf_send_udp_chunked(struct flb_out_gelf_config *ctx, void *msg,
+ size_t msg_size)
+{
+ int ret;
+ uint8_t n;
+ size_t chunks;
+ size_t offset;
+ size_t len;
+ uint8_t *buf = (uint8_t *) ctx->pckt_buf;
+
+ chunks = msg_size / ctx->pckt_size;
+ if (msg_size % ctx->pckt_size != 0) {
+ chunks++;
+ }
+
+ if (chunks > 128) {
+ flb_plg_error(ctx->ins, "message too big: %zd bytes", msg_size);
+ return -1;
+ }
+
+ init_chunk_header(buf, chunks);
+
+ offset = 0;
+ for (n = 0; n < chunks; n++) {
+ buf[10] = n;
+
+ len = msg_size - offset;
+ if (ctx->pckt_size < len) {
+ len = ctx->pckt_size;
+ }
+ memcpy(buf + GELF_HEADER_SIZE, (char *) msg + offset, len);
+
+ ret = send(ctx->fd, buf, len + GELF_HEADER_SIZE,
+ MSG_DONTWAIT | MSG_NOSIGNAL);
+ if (ret == -1) {
+ flb_errno();
+ }
+ offset += ctx->pckt_size;
+ }
+ return 0;
+}
+
+static int gelf_send_udp_pckt (struct flb_out_gelf_config *ctx, char *msg,
+ size_t msg_size)
+{
+ int ret;
+
+ if (msg_size > ctx->pckt_size) {
+ gelf_send_udp_chunked(ctx, msg, msg_size);
+ }
+ else {
+ ret = send(ctx->fd, msg, msg_size, MSG_DONTWAIT | MSG_NOSIGNAL);
+ if (ret == -1) {
+ flb_errno();
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static int gelf_send_udp(struct flb_out_gelf_config *ctx, char *msg,
+ size_t msg_size)
+{
+ int ret;
+ int status;
+ void *zdata;
+ size_t zdata_len;
+
+ if (ctx->compress == FLB_TRUE || (msg_size > ctx->pckt_size)) {
+ ret = flb_gzip_compress(msg, msg_size, &zdata, &zdata_len);
+ if (ret != 0) {
+ return -1;
+ }
+
+ status = gelf_send_udp_pckt (ctx, zdata, zdata_len);
+ flb_free(zdata);
+ if (status < 0) {
+ return status;
+ }
+ }
+ else {
+ status = send(ctx->fd, msg, msg_size, MSG_DONTWAIT | MSG_NOSIGNAL);
+ if (status < 0) {
+ return status;
+ }
+ }
+
+ return 0;
+}
+
+static void cb_gelf_flush(struct flb_event_chunk *event_chunk,
+ struct flb_output_flush *out_flush,
+ struct flb_input_instance *i_ins,
+ void *out_context,
+ struct flb_config *config)
+{
+ int ret;
+ flb_sds_t s;
+ flb_sds_t tmp;
+ size_t off = 0;
+ size_t prev_off = 0;
+ size_t size = 0;
+ size_t bytes_sent;
+ msgpack_object map;
+ struct flb_connection *u_conn = NULL;
+ struct flb_out_gelf_config *ctx = out_context;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+
+ if (ctx->mode != FLB_GELF_UDP) {
+ u_conn = flb_upstream_conn_get(ctx->u);
+ if (!u_conn) {
+ flb_plg_error(ctx->ins, "no upstream connections available");
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+ }
+
+ ret = flb_log_event_decoder_init(&log_decoder,
+ (char *) event_chunk->data,
+ event_chunk->size);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ if (ctx->mode != FLB_GELF_UDP) {
+ flb_upstream_conn_release(u_conn);
+ }
+
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ off = log_decoder.offset;
+ size = off - prev_off;
+ prev_off = off;
+
+ map = *log_event.body;
+
+ size = (size * 1.4);
+ s = flb_sds_create_size(size);
+ if (s == NULL) {
+ flb_log_event_decoder_destroy(&log_decoder);
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+
+ tmp = flb_msgpack_to_gelf(&s, &map, &log_event.timestamp,
+ &(ctx->fields));
+ if (tmp != NULL) {
+ s = tmp;
+ if (ctx->mode == FLB_GELF_UDP) {
+ ret = gelf_send_udp(ctx, s, flb_sds_len(s));
+ if (ret == -1) {
+ if (ctx->mode != FLB_GELF_UDP) {
+ flb_upstream_conn_release(u_conn);
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ flb_sds_destroy(s);
+
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+ }
+ else {
+ /* write gelf json plus \0 */
+ ret = flb_io_net_write(u_conn,
+ s, flb_sds_len(s) + 1, &bytes_sent);
+ if (ret == -1) {
+ flb_errno();
+
+ if (ctx->mode != FLB_GELF_UDP) {
+ flb_upstream_conn_release(u_conn);
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ flb_sds_destroy(s);
+
+ FLB_OUTPUT_RETURN(FLB_RETRY);
+ }
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "error encoding to GELF");
+ }
+
+ flb_sds_destroy(s);
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ if (ctx->mode != FLB_GELF_UDP) {
+ flb_upstream_conn_release(u_conn);
+ }
+
+ FLB_OUTPUT_RETURN(FLB_OK);
+}
+
+static int cb_gelf_init(struct flb_output_instance *ins, struct flb_config *config,
+ void *data)
+{
+ int ret;
+ const char *tmp;
+ struct flb_out_gelf_config *ctx = NULL;
+
+ /* Set default network configuration */
+ flb_output_net_default("127.0.0.1", 12201, ins);
+
+ /* Allocate plugin context */
+ ctx = flb_calloc(1, sizeof(struct flb_out_gelf_config));
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+ ctx->ins = ins;
+
+ ret = flb_output_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ flb_plg_error(ins, "flb_output_config_map_set failed");
+ flb_free(ctx);
+ return -1;
+ }
+
+ /* Config Mode */
+ tmp = flb_output_get_property("mode", ins);
+ if (tmp) {
+ if (!strcasecmp(tmp, "tcp")) {
+ ctx->mode = FLB_GELF_TCP;
+ }
+ else if (!strcasecmp(tmp, "tls")) {
+ ctx->mode = FLB_GELF_TLS;
+ }
+ else if (!strcasecmp(tmp, "udp")) {
+ ctx->mode = FLB_GELF_UDP;
+ }
+ else {
+ flb_plg_error(ctx->ins, "Unknown gelf mode %s", tmp);
+ flb_free(ctx);
+ return -1;
+ }
+ }
+ else {
+ ctx->mode = FLB_GELF_UDP;
+ }
+
+ /* Config Gelf_Timestamp_Key */
+ tmp = flb_output_get_property("gelf_timestamp_key", ins);
+ if (tmp) {
+ ctx->fields.timestamp_key = flb_sds_create(tmp);
+ }
+
+ /* Config Gelf_Host_Key */
+ tmp = flb_output_get_property("gelf_host_key", ins);
+ if (tmp) {
+ ctx->fields.host_key = flb_sds_create(tmp);
+ }
+
+ /* Config Gelf_Short_Message_Key */
+ tmp = flb_output_get_property("gelf_short_message_key", ins);
+ if (tmp) {
+ ctx->fields.short_message_key = flb_sds_create(tmp);
+ }
+
+ /* Config Gelf_Full_Message_Key */
+ tmp = flb_output_get_property("gelf_full_message_key", ins);
+ if (tmp) {
+ ctx->fields.full_message_key = flb_sds_create(tmp);
+ }
+
+ /* Config Gelf_Level_Key */
+ tmp = flb_output_get_property("gelf_level_key", ins);
+ if (tmp) {
+ ctx->fields.level_key = flb_sds_create(tmp);
+ }
+
+ /* init random seed */
+ if (flb_random_bytes((unsigned char *) &ctx->seed, sizeof(int))) {
+ ctx->seed = time(NULL);
+ }
+ srand(ctx->seed);
+
+ ctx->fd = -1;
+ ctx->pckt_buf = NULL;
+
+ if (ctx->mode == FLB_GELF_UDP) {
+ ctx->fd = flb_net_udp_connect(ins->host.name, ins->host.port,
+ ins->net_setup.source_address);
+ if (ctx->fd < 0) {
+ flb_free(ctx);
+ return -1;
+ }
+ ctx->pckt_buf = flb_malloc(GELF_HEADER_SIZE + ctx->pckt_size);
+ if (ctx->pckt_buf == NULL) {
+ flb_socket_close(ctx->fd);
+ flb_free(ctx);
+ return -1;
+ }
+ }
+ else {
+ int io_flags = FLB_IO_TCP;
+
+ if (ctx->mode == FLB_GELF_TLS) {
+ io_flags = FLB_IO_TLS;
+ }
+
+ if (ins->host.ipv6 == FLB_TRUE) {
+ io_flags |= FLB_IO_IPV6;
+ }
+
+ ctx->u = flb_upstream_create(config, ins->host.name, ins->host.port,
+ io_flags, ins->tls);
+ if (!(ctx->u)) {
+ flb_free(ctx);
+ return -1;
+ }
+ flb_output_upstream_set(ctx->u, ins);
+ }
+
+ /* Set the plugin context */
+ flb_output_set_context(ins, ctx);
+ return 0;
+}
+
+static int cb_gelf_exit(void *data, struct flb_config *config)
+{
+ struct flb_out_gelf_config *ctx = data;
+
+ if (ctx == NULL) {
+ return 0;
+ }
+
+ if (ctx->u) {
+ flb_upstream_destroy(ctx->u);
+ }
+ if (ctx->fd >= 0) {
+ close(ctx->fd);
+ }
+
+ flb_sds_destroy(ctx->fields.timestamp_key);
+ flb_sds_destroy(ctx->fields.host_key);
+ flb_sds_destroy(ctx->fields.short_message_key);
+ flb_sds_destroy(ctx->fields.full_message_key);
+ flb_sds_destroy(ctx->fields.level_key);
+
+ flb_free(ctx->pckt_buf);
+ flb_free(ctx);
+
+ return 0;
+}
+
+
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_STR, "mode", "udp",
+ 0, FLB_FALSE, 0,
+ "The protocol to use. 'tls', 'tcp' or 'udp'"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "gelf_short_message_key", NULL,
+ 0, FLB_FALSE, 0,
+ "A short descriptive message (MUST be set in GELF)"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "gelf_timestamp_key", NULL,
+ 0, FLB_FALSE, 0,
+ "Timestamp key name (SHOULD be set in GELF)"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "gelf_host_key", NULL,
+ 0, FLB_FALSE, 0,
+ "Key which its value is used as the name of the host,"
+ "source or application that sent this message. (MUST be set in GELF) "
+ },
+ {
+ FLB_CONFIG_MAP_STR, "gelf_full_message_key", NULL,
+ 0, FLB_FALSE, 0,
+ "Key to use as the long message that can i.e. contain a backtrace. "
+ "(Optional in GELF)"
+ },
+ {
+ FLB_CONFIG_MAP_STR, "gelf_level_key", NULL,
+ 0, FLB_FALSE, 0,
+ "Key to be used as the log level. "
+ "Its value must be in standard syslog levels (between 0 and 7). "
+ "(Optional in GELF)"
+ },
+ {
+ FLB_CONFIG_MAP_INT, "packet_size", "1420",
+ 0, FLB_TRUE, offsetof(struct flb_out_gelf_config, pckt_size),
+ "If transport protocol is udp, you can set the size of packets to be sent."
+ },
+ {
+ FLB_CONFIG_MAP_BOOL, "compress", "true",
+ 0, FLB_TRUE, offsetof(struct flb_out_gelf_config, compress),
+ "If transport protocol is udp, "
+ "you can set this if you want your UDP packets to be compressed."
+ },
+
+ /* EOF */
+ {0}
+};
+
+/* Plugin reference */
+struct flb_output_plugin out_gelf_plugin = {
+ .name = "gelf",
+ .description = "GELF Output",
+ .cb_init = cb_gelf_init,
+ .cb_pre_run = NULL,
+ .cb_flush = cb_gelf_flush,
+ .cb_exit = cb_gelf_exit,
+ .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
+ .config_map = config_map
+};