summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/in_stdin
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/in_stdin')
-rw-r--r--src/fluent-bit/plugins/in_stdin/CMakeLists.txt10
-rw-r--r--src/fluent-bit/plugins/in_stdin/in_stdin.c472
-rw-r--r--src/fluent-bit/plugins/in_stdin/in_stdin.h48
3 files changed, 0 insertions, 530 deletions
diff --git a/src/fluent-bit/plugins/in_stdin/CMakeLists.txt b/src/fluent-bit/plugins/in_stdin/CMakeLists.txt
deleted file mode 100644
index 3c2e2bdfe..000000000
--- a/src/fluent-bit/plugins/in_stdin/CMakeLists.txt
+++ /dev/null
@@ -1,10 +0,0 @@
-# FIXME: there is something wrong when linking objects and this
-# static plugin, I should not require to link to a specific symbol
-# if the object was already linked from fluent-bit core on src/, also
-# jsmn should not be required.
-
-set(src
- in_stdin.c
- )
-
-FLB_PLUGIN(in_stdin "${src}" "")
diff --git a/src/fluent-bit/plugins/in_stdin/in_stdin.c b/src/fluent-bit/plugins/in_stdin/in_stdin.c
deleted file mode 100644
index ff3114067..000000000
--- a/src/fluent-bit/plugins/in_stdin/in_stdin.c
+++ /dev/null
@@ -1,472 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_input_plugin.h>
-#include <fluent-bit/flb_config.h>
-#include <fluent-bit/flb_pack.h>
-#include <fluent-bit/flb_engine.h>
-#include <fluent-bit/flb_time.h>
-#include <fluent-bit/flb_parser.h>
-#include <fluent-bit/flb_error.h>
-#include <fluent-bit/flb_utils.h>
-
-#include <msgpack.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-
-#include "in_stdin.h"
-
-static inline void consume_bytes(char *buf, int bytes, int length)
-{
- memmove(buf, buf + bytes, length - bytes);
-}
-
-static inline int process_pack(struct flb_in_stdin_config *ctx,
- char *data, size_t data_size)
-{
- struct flb_log_event_decoder log_decoder;
- struct flb_log_event log_event;
- msgpack_unpacked result;
- msgpack_object entry;
- int ret;
- size_t off;
-
- ret = flb_log_event_decoder_init(&log_decoder, NULL, 0);
-
- if (ret != FLB_EVENT_ENCODER_SUCCESS) {
- return -1;
- }
-
- /* Queue the data with time field */
- msgpack_unpacked_init(&result);
-
- off = 0;
- while (msgpack_unpack_next(&result, data, data_size, &off) == MSGPACK_UNPACK_SUCCESS) {
- entry = result.data;
-
- if (entry.type == MSGPACK_OBJECT_MAP) {
- ret = flb_log_event_encoder_begin_record(ctx->log_encoder);
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_set_current_timestamp(ctx->log_encoder);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_set_body_from_msgpack_object(
- ctx->log_encoder, &entry);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_commit_record(ctx->log_encoder);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = 0;
- }
- else {
- ret = -1;
-
- break;
- }
- }
- else if (entry.type == MSGPACK_OBJECT_ARRAY) {
- ret = flb_event_decoder_decode_object(&log_decoder,
- &log_event,
- &entry);
-
- if (ret != FLB_EVENT_ENCODER_SUCCESS) {
- ret = -1;
-
- break;
- }
-
- ret = flb_log_event_encoder_begin_record(ctx->log_encoder);
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_set_timestamp(ctx->log_encoder,
- &log_event.timestamp);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_set_metadata_from_msgpack_object(
- ctx->log_encoder, log_event.metadata);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_set_body_from_msgpack_object(
- ctx->log_encoder, log_event.body);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_commit_record(ctx->log_encoder);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = 0;
- }
- else {
- ret = -1;
-
- break;
- }
- }
- else {
- /*
- * Upon exception, acknowledge the user about the problem but continue
- * working, do not discard valid JSON entries.
- */
- flb_plg_error(ctx->ins, "invalid record found, "
- "it's not a JSON map or array");
- ret = -1;
- break;
- }
- }
-
- flb_log_event_decoder_destroy(&log_decoder);
-
- msgpack_unpacked_destroy(&result);
-
- return ret;
-}
-
-static inline int pack_regex(struct flb_in_stdin_config *ctx,
- struct flb_time *t, char *data, size_t data_size)
-{
- int ret;
-
- ret = flb_log_event_encoder_begin_record(ctx->log_encoder);
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_set_timestamp(ctx->log_encoder, t);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_set_body_from_raw_msgpack(
- ctx->log_encoder, data, data_size);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_commit_record(ctx->log_encoder);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = 0;
- }
- else {
- ret = -1;
- }
-
- return ret;
-}
-
-static int in_stdin_collect(struct flb_input_instance *ins,
- struct flb_config *config, void *in_context)
-{
- int bytes = 0;
- int pack_size;
- int ret;
- char *pack;
- void *out_buf;
- size_t out_size;
- struct flb_time out_time;
- struct flb_in_stdin_config *ctx = in_context;
-
- bytes = read(ctx->fd,
- ctx->buf + ctx->buf_len,
- ctx->buf_size - ctx->buf_len - 1);
- flb_plg_trace(ctx->ins, "stdin read() = %i", bytes);
-
- if (bytes == 0) {
- flb_plg_warn(ctx->ins, "end of file (stdin closed by remote end)");
- }
-
- if (bytes <= 0) {
- flb_input_collector_pause(ctx->coll_fd, ctx->ins);
- flb_engine_exit(config);
- return -1;
- }
- ctx->buf_len += bytes;
- ctx->buf[ctx->buf_len] = '\0';
-
- while (ctx->buf_len > 0) {
- /* Try built-in JSON parser */
- if (!ctx->parser) {
- ret = flb_pack_json_state(ctx->buf, ctx->buf_len,
- &pack, &pack_size, &ctx->pack_state);
- if (ret == FLB_ERR_JSON_PART) {
- flb_plg_debug(ctx->ins, "data incomplete, waiting for more...");
- return 0;
- }
- else if (ret == FLB_ERR_JSON_INVAL) {
- flb_plg_debug(ctx->ins, "invalid JSON message, skipping");
- flb_pack_state_reset(&ctx->pack_state);
- flb_pack_state_init(&ctx->pack_state);
- ctx->pack_state.multiple = FLB_TRUE;
- ctx->buf_len = 0;
- return -1;
- }
-
- /* Process valid packaged records */
- process_pack(ctx, pack, pack_size);
-
- /* Move out processed bytes */
- consume_bytes(ctx->buf, ctx->pack_state.last_byte, ctx->buf_len);
- ctx->buf_len -= ctx->pack_state.last_byte;
- ctx->buf[ctx->buf_len] = '\0';
-
- flb_pack_state_reset(&ctx->pack_state);
- flb_pack_state_init(&ctx->pack_state);
- ctx->pack_state.multiple = FLB_TRUE;
-
- flb_free(pack);
-
- if (ctx->log_encoder->output_length > 0) {
- flb_input_log_append(ctx->ins, NULL, 0,
- ctx->log_encoder->output_buffer,
- ctx->log_encoder->output_length);
- }
-
- flb_log_event_encoder_reset(ctx->log_encoder);
-
- return 0;
- }
- else {
- /* Reset time for each line */
- flb_time_zero(&out_time);
-
- /* Use the defined parser */
- ret = flb_parser_do(ctx->parser, ctx->buf, ctx->buf_len,
- &out_buf, &out_size, &out_time);
-
- if (ret >= 0) {
- if (flb_time_to_nanosec(&out_time) == 0L) {
- flb_time_get(&out_time);
- }
- pack_regex(ctx, &out_time, out_buf, out_size);
- flb_free(out_buf);
-
- if (ctx->log_encoder->output_length > 0) {
- flb_input_log_append(ctx->ins, NULL, 0,
- ctx->log_encoder->output_buffer,
- ctx->log_encoder->output_length);
- }
-
- flb_log_event_encoder_reset(ctx->log_encoder);
- }
- else {
- /* we need more data ? */
- flb_plg_trace(ctx->ins, "data mismatch or incomplete : %d", ret);
- return 0;
- }
- }
-
- if (ret == ctx->buf_len) {
- ctx->buf_len = 0;
- break;
- }
- else if (ret >= 0) {
- /*
- * 'ret' is the last byte consumed by the regex engine, we need
- * to advance it position.
- */
- ret++;
- consume_bytes(ctx->buf, ret, ctx->buf_len);
- ctx->buf_len -= ret;
- ctx->buf[ctx->buf_len] = '\0';
- }
- }
-
- return 0;
-}
-
-/* Read stdin config*/
-static int in_stdin_config_init(struct flb_in_stdin_config *ctx,
- struct flb_input_instance *in,
- struct flb_config *config)
-{
- int ret;
-
- ctx->buf_size = DEFAULT_BUF_SIZE;
- ctx->buf = NULL;
- ctx->buf_len = 0;
- ctx->ins = in;
-
- ret = flb_input_config_map_set(in, (void *)ctx);
- if (ret == -1) {
- return -1;
- }
-
- /* parser settings */
- if (ctx->parser_name) {
- ctx->parser = flb_parser_get(ctx->parser_name, config);
- if (!ctx->parser) {
- flb_plg_error(ctx->ins, "requested parser '%s' not found", ctx->parser_name);
- return -1;
- }
- }
-
- /* buffer size setting */
- if (ctx->buf_size == -1) {
- flb_plg_error(ctx->ins, "buffer_size is invalid");
- return -1;
- }
- else if (ctx->buf_size < DEFAULT_BUF_SIZE) {
- flb_plg_error(ctx->ins, "buffer_size '%zu' must be at least %i bytes",
- ctx->buf_size, DEFAULT_BUF_SIZE);
- return -1;
- }
-
- flb_plg_debug(ctx->ins, "buf_size=%zu", ctx->buf_size);
- return 0;
-}
-
-static void in_stdin_config_destroy(struct flb_in_stdin_config *ctx)
-{
- if (!ctx) {
- return;
- }
-
- if (ctx->log_encoder != NULL) {
- flb_log_event_encoder_destroy(ctx->log_encoder);
- }
-
- /* release buffer */
- if (ctx->buf) {
- flb_free(ctx->buf);
- }
- flb_free(ctx);
-}
-
-/* Initialize plugin */
-static int in_stdin_init(struct flb_input_instance *in,
- struct flb_config *config, void *data)
-{
- int fd;
- int ret;
- struct flb_in_stdin_config *ctx;
-
- /* Allocate space for the configuration context */
- ctx = flb_calloc(1, sizeof(struct flb_in_stdin_config));
- if (!ctx) {
- return -1;
- }
-
- ctx->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT);
-
- if (ctx->log_encoder == NULL) {
- flb_plg_error(in, "could not initialize event encoder");
-
- goto init_error;
- }
-
- /* Initialize stdin config */
- ret = in_stdin_config_init(ctx, in, config);
- if (ret < 0) {
- goto init_error;
- }
-
- ctx->buf = flb_malloc(ctx->buf_size);
- if (!ctx->buf) {
- flb_errno();
- goto init_error;
- }
-
- /* Clone the standard input file descriptor */
- fd = dup(STDIN_FILENO);
- if (fd == -1) {
- flb_errno();
- flb_plg_error(ctx->ins, "Could not open standard input!");
- goto init_error;
- }
- ctx->fd = fd;
-
- /* Always initialize built-in JSON pack state */
- flb_pack_state_init(&ctx->pack_state);
- ctx->pack_state.multiple = FLB_TRUE;
-
- /* Set the context */
- flb_input_set_context(in, ctx);
-
- /* Collect upon data available on the standard input */
- ret = flb_input_set_collector_event(in,
- in_stdin_collect,
- ctx->fd,
- config);
- if (ret == -1) {
- flb_plg_error(ctx->ins, "Could not set collector for STDIN input plugin");
- goto init_error;
- }
- ctx->coll_fd = ret;
-
- return 0;
-
-init_error:
- in_stdin_config_destroy(ctx);
-
- return -1;
-}
-
-/* Cleanup serial input */
-static int in_stdin_exit(void *in_context, struct flb_config *config)
-{
- struct flb_in_stdin_config *ctx = in_context;
-
- if (!ctx) {
- return 0;
- }
-
- if (ctx->fd >= 0) {
- close(ctx->fd);
- }
- flb_pack_state_reset(&ctx->pack_state);
- in_stdin_config_destroy(ctx);
-
- return 0;
-}
-
-static struct flb_config_map config_map[] = {
- {
- FLB_CONFIG_MAP_STR, "parser", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_in_stdin_config, parser_name),
- "Set and use a fluent-bit parser"
- },
- {
- FLB_CONFIG_MAP_SIZE, "buffer_size", (char *)NULL,
- 0, FLB_TRUE, offsetof(struct flb_in_stdin_config, buf_size),
- "Set the read buffer size"
- },
- /* EOF */
- {0}
-};
-
-/* Plugin reference */
-struct flb_input_plugin in_stdin_plugin = {
- .name = "stdin",
- .description = "Standard Input",
- .cb_init = in_stdin_init,
- .cb_pre_run = NULL,
- .cb_collect = in_stdin_collect,
- .cb_flush_buf = NULL,
- .cb_exit = in_stdin_exit,
- .config_map = config_map
-};
diff --git a/src/fluent-bit/plugins/in_stdin/in_stdin.h b/src/fluent-bit/plugins/in_stdin/in_stdin.h
deleted file mode 100644
index 0c165809b..000000000
--- a/src/fluent-bit/plugins/in_stdin/in_stdin.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef FLB_IN_STDIN_H
-#define FLB_IN_STDIN_H
-
-#include <fluent-bit/flb_config.h>
-#include <fluent-bit/flb_input.h>
-#include <fluent-bit/flb_log_event_decoder.h>
-#include <fluent-bit/flb_log_event_encoder.h>
-
-#define DEFAULT_BUF_SIZE 16000
-
-/* STDIN Input configuration & context */
-struct flb_in_stdin_config {
- int fd; /* stdin file descriptor */
- int coll_fd; /* collector fd */
- size_t buf_size; /* size of a buffer */
- int buf_len; /* read buffer length */
- char *buf; /* read buffer */
- flb_sds_t parser_name; /* name of the parser */
-
- /* Parser / Format */
- struct flb_parser *parser;
- struct flb_pack_state pack_state;
- struct flb_input_instance *ins;
- struct flb_log_event_encoder *log_encoder;
-};
-
-extern struct flb_input_plugin in_stdin_plugin;
-
-#endif