diff options
Diffstat (limited to 'fluent-bit/plugins/in_kmsg/in_kmsg.c')
-rw-r--r-- | fluent-bit/plugins/in_kmsg/in_kmsg.c | 390 |
1 files changed, 390 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_kmsg/in_kmsg.c b/fluent-bit/plugins/in_kmsg/in_kmsg.c new file mode 100644 index 000000000..0f27c67a1 --- /dev/null +++ b/fluent-bit/plugins/in_kmsg/in_kmsg.c @@ -0,0 +1,390 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_engine.h> +#include <fluent-bit/flb_time.h> + +#include <msgpack.h> + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <limits.h> +#include <fcntl.h> +#include <errno.h> +#include <unistd.h> +#include <ctype.h> +#include <sys/stat.h> +#include <sys/time.h> +#include <inttypes.h> + +#include "in_kmsg.h" + +/* + * Note: Functions timeval_diff() and in_kmsg_boot_time() are based + * on syslog-ng-3.5 source code. + */ +static inline uint64_t timeval_diff(struct timeval *t1, struct timeval *t2) +{ + return ((uint64_t) t1->tv_sec - (uint64_t) t2->tv_sec) * KMSG_USEC_PER_SEC + + ((uint64_t) t1->tv_usec - (uint64_t) t2->tv_usec); +} + +static int boot_time(struct timeval *boot_time) +{ + int fd, pos = 0; + int bytes; + uint64_t tdiff; + char buf[256]; + struct timeval curr_time; + + fd = open("/proc/uptime", O_RDONLY); + if (fd == -1) { + return -1; + } + + bytes = read(fd, buf, sizeof(buf)); + if (bytes <= 0) { + close(fd); + return -1; + } + + close(fd); + gettimeofday(&curr_time, NULL); + + /* Read the seconds part */ + while (pos < bytes && buf[pos] != '.') { + if (isdigit(buf[pos])) { + boot_time->tv_sec = boot_time->tv_sec * 10 + ((buf[pos]) - '0'); + } + else { + boot_time->tv_sec = 0; + return 0; + } + pos++; + } + pos++; + + /* Then the microsecond part */ + while (pos < bytes && buf[pos] != ' ') { + if (isdigit(buf[pos])) { + boot_time->tv_usec = boot_time->tv_usec * 10 + ((buf[pos]) - '0'); + } + else { + boot_time->tv_sec = 0; + boot_time->tv_usec = 0; + return 0; + } + pos++; + } + + tdiff = timeval_diff(&curr_time, boot_time); + boot_time->tv_sec = tdiff / KMSG_USEC_PER_SEC; + boot_time->tv_usec = tdiff % KMSG_USEC_PER_SEC; + + return 0; +} + +static inline int process_line(const char *line, + struct flb_input_instance *i_ins, + struct flb_in_kmsg_config *ctx) +{ + char priority; /* log priority */ + uint64_t sequence; /* sequence number */ + struct timeval tv; /* time value */ + int line_len; + uint64_t val; + const char *p = line; + char *end = NULL; + struct flb_time ts; + int ret; + + /* Increase buffer position */ + ctx->buffer_id++; + + errno = 0; + val = strtol(p, &end, 10); + if ((errno == ERANGE && (val == INT_MAX || val == INT_MIN)) + || (errno != 0 && val == 0)) { + goto fail; + } + + /* Priority */ + priority = FLB_KLOG_PRI(val); + + if (priority > ctx->prio_level) { + /* Drop line */ + return 0; + } + + /* Sequence */ + p = strchr(p, ','); + if (!p) { + goto fail; + } + p++; + + val = strtol(p, &end, 10); + if ((errno == ERANGE && (val == INT_MAX || val == INT_MIN)) + || (errno != 0 && val == 0)) { + goto fail; + } + + sequence = val; + p = ++end; + + /* Timestamp */ + val = strtol(p, &end, 10); + if ((errno == ERANGE && (val == INT_MAX || val == INT_MIN)) + || (errno != 0 && val == 0)) { + goto fail; + } + + tv.tv_sec = val/1000000; + tv.tv_usec = val - (tv.tv_sec * 1000000); + + flb_time_set(&ts, ctx->boot_time.tv_sec + tv.tv_sec, tv.tv_usec * 1000); + + /* Now process the human readable message */ + p = strchr(p, ';'); + if (!p) { + goto fail; + } + p++; + + line_len = strlen(p); + + ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_timestamp( + &ctx->log_encoder, + &ts); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_values( + &ctx->log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE("priority"), + FLB_LOG_EVENT_CHAR_VALUE(priority), + + FLB_LOG_EVENT_CSTRING_VALUE("sequence"), + FLB_LOG_EVENT_UINT64_VALUE(sequence), + + FLB_LOG_EVENT_CSTRING_VALUE("sec"), + FLB_LOG_EVENT_UINT64_VALUE(tv.tv_sec), + + FLB_LOG_EVENT_CSTRING_VALUE("usec"), + FLB_LOG_EVENT_UINT64_VALUE(tv.tv_usec), + + FLB_LOG_EVENT_CSTRING_VALUE("msg"), + FLB_LOG_EVENT_STRING_VALUE((char *) p, line_len - 1)); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + flb_input_log_append(ctx->ins, NULL, 0, + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + + ret = 0; + } + else { + flb_plg_error(ctx->ins, "Error encoding record : %d", ret); + + ret = -1; + } + + flb_log_event_encoder_reset(&ctx->log_encoder); + + flb_plg_debug(ctx->ins, "pri=%i seq=%" PRIu64 " sec=%ld usec=%ld msg_length=%i", + priority, + sequence, + (long int) tv.tv_sec, + (long int) tv.tv_usec, + line_len - 1); + return ret; + + fail: + ctx->buffer_id--; + return -1; +} + +/* Callback triggered when some Kernel Log buffer msgs are available */ +static int in_kmsg_collect(struct flb_input_instance *i_ins, + struct flb_config *config, void *in_context) +{ + int ret; + int bytes; + struct flb_in_kmsg_config *ctx = in_context; + + bytes = read(ctx->fd, ctx->buf_data, ctx->buf_size - 1); + if (bytes == -1) { + if (errno == -EPIPE) { + return -1; + } + return 0; + } + else if (bytes == 0) { + flb_errno(); + return 0; + } + ctx->buf_len += bytes; + + /* Always set a delimiter to avoid buffer trash */ + ctx->buf_data[ctx->buf_len] = '\0'; + + /* Check if our buffer is full */ + if (ctx->buffer_id + 1 == KMSG_BUFFER_SIZE) { + ret = flb_engine_flush(config, &in_kmsg_plugin); + if (ret == -1) { + ctx->buffer_id = 0; + } + } + + /* Process and enqueue the received line */ + process_line(ctx->buf_data, i_ins, ctx); + ctx->buf_len = 0; + + return 0; +} + +/* Init kmsg input */ +static int in_kmsg_init(struct flb_input_instance *ins, + struct flb_config *config, void *data) +{ + int fd; + int ret; + struct flb_in_kmsg_config *ctx; + (void) data; + + ctx = flb_calloc(1, sizeof(struct flb_in_kmsg_config)); + if (!ctx) { + flb_errno(); + return -1; + } + ctx->ins = ins; + ctx->buf_data = flb_malloc(FLB_KMSG_BUF_SIZE); + if (!ctx->buf_data) { + flb_errno(); + flb_free(ctx); + return -1; + } + ctx->buf_len = 0; + ctx->buf_size = FLB_KMSG_BUF_SIZE; + + /* Load the config map */ + ret = flb_input_config_map_set(ins, (void *)ctx); + if (ret == -1) { + flb_free(ctx); + return -1; + } + + /* set context */ + flb_input_set_context(ins, ctx); + + /* open device */ + fd = open(FLB_KMSG_DEV, O_RDONLY); + if (fd == -1) { + flb_errno(); + flb_free(ctx); + return -1; + } + ctx->fd = fd; + + /* get the system boot time */ + ret = boot_time(&ctx->boot_time); + if (ret == -1) { + flb_plg_error(ctx->ins, + "could not get system boot time for kmsg input plugin"); + flb_free(ctx); + return -1; + } + flb_plg_debug(ctx->ins, "prio_level is %d", ctx->prio_level); + + /* Set our collector based on a file descriptor event */ + ret = flb_input_set_collector_event(ins, + in_kmsg_collect, + ctx->fd, + config); + if (ret == -1) { + flb_plg_error(ctx->ins, + "could not set collector for kmsg input plugin"); + flb_free(ctx); + return -1; + } + + ret = flb_log_event_encoder_init(&ctx->log_encoder, + FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, "error initializing event encoder : %d", ret); + + flb_free(ctx); + + return -1; + } + + return 0; +} + +static int in_kmsg_exit(void *data, struct flb_config *config) +{ + (void)*config; + struct flb_in_kmsg_config *ctx = data; + + flb_log_event_encoder_destroy(&ctx->log_encoder); + + if (ctx->fd >= 0) { + close(ctx->fd); + } + + flb_free(ctx->buf_data); + flb_free(ctx); + return 0; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_INT, "prio_level", "8", + 0, FLB_TRUE, offsetof(struct flb_in_kmsg_config, prio_level), + "The log level to filter. The kernel log is dropped if its priority is more than prio_level. " + "Allowed values are 0-8. Default is 8." + }, + /* EOF */ + {0} +}; + +/* Plugin reference */ +struct flb_input_plugin in_kmsg_plugin = { + .name = "kmsg", + .description = "Kernel Log Buffer", + .cb_init = in_kmsg_init, + .cb_pre_run = NULL, + .cb_collect = in_kmsg_collect, + .cb_flush_buf = NULL, + .cb_exit = in_kmsg_exit, + .config_map = config_map +}; |