summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/plugins/filter_multiline/ml.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/plugins/filter_multiline/ml.c')
-rw-r--r--src/fluent-bit/plugins/filter_multiline/ml.c931
1 files changed, 931 insertions, 0 deletions
diff --git a/src/fluent-bit/plugins/filter_multiline/ml.c b/src/fluent-bit/plugins/filter_multiline/ml.c
new file mode 100644
index 000000000..b63282628
--- /dev/null
+++ b/src/fluent-bit/plugins/filter_multiline/ml.c
@@ -0,0 +1,931 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_filter_plugin.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_metrics.h>
+#include <fluent-bit/flb_storage.h>
+#include <fluent-bit/multiline/flb_ml.h>
+#include <fluent-bit/multiline/flb_ml_parser.h>
+#include <fluent-bit/flb_scheduler.h>
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+
+#include "ml.h"
+#include "ml_concat.h"
+
+static struct ml_stream *get_by_id(struct ml_ctx *ctx, uint64_t stream_id);
+
+/* Create an emitter input instance */
+static int emitter_create(struct ml_ctx *ctx)
+{
+ int ret;
+ struct flb_input_instance *ins;
+
+ ret = flb_input_name_exists(ctx->emitter_name, ctx->config);
+ if (ret == FLB_TRUE) {
+ flb_plg_error(ctx->ins, "emitter_name '%s' already exists",
+ ctx->emitter_name);
+ return -1;
+ }
+
+ ins = flb_input_new(ctx->config, "emitter", NULL, FLB_FALSE);
+ if (!ins) {
+ flb_plg_error(ctx->ins, "cannot create emitter instance");
+ return -1;
+ }
+
+ /* Set the alias name */
+ ret = flb_input_set_property(ins, "alias", ctx->emitter_name);
+ if (ret == -1) {
+ flb_plg_warn(ctx->ins,
+ "cannot set emitter_name, using fallback name '%s'",
+ ins->name);
+ }
+
+ /* Set the emitter_mem_buf_limit */
+ if(ctx->emitter_mem_buf_limit > 0) {
+ ins->mem_buf_limit = ctx->emitter_mem_buf_limit;
+ }
+
+ /* Set the storage type */
+ ret = flb_input_set_property(ins, "storage.type",
+ ctx->emitter_storage_type);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "cannot set storage.type");
+ }
+
+ /* Initialize emitter plugin */
+ ret = flb_input_instance_init(ins, ctx->config);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "cannot initialize emitter instance '%s'",
+ ins->name);
+ flb_input_instance_exit(ins, ctx->config);
+ flb_input_instance_destroy(ins);
+ return -1;
+ }
+
+#ifdef FLB_HAVE_METRICS
+ /* Override Metrics title */
+ ret = flb_metrics_title(ctx->emitter_name, ins->metrics);
+ if (ret == -1) {
+ flb_plg_warn(ctx->ins, "cannot set metrics title, using fallback name %s",
+ ins->name);
+ }
+#endif
+
+ /* Storage context */
+ ret = flb_storage_input_create(ctx->config->cio, ins);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "cannot initialize storage for stream '%s'",
+ ctx->emitter_name);
+ flb_input_instance_exit(ins, ctx->config);
+ flb_input_instance_destroy(ins);
+ return -1;
+ }
+ ctx->ins_emitter = ins;
+ return 0;
+}
+
+static int multiline_load_parsers(struct ml_ctx *ctx)
+{
+ int ret;
+ struct mk_list *head;
+ struct mk_list *head_p;
+ struct flb_config_map_val *mv;
+ struct flb_slist_entry *val = NULL;
+ struct flb_ml_parser_ins *parser_i;
+
+ if (!ctx->multiline_parsers) {
+ return -1;
+ }
+
+ /*
+ * Iterate all 'multiline.parser' entries. Every entry is considered
+ * a group which can have multiple multiline parser instances.
+ */
+ flb_config_map_foreach(head, mv, ctx->multiline_parsers) {
+ mk_list_foreach(head_p, mv->val.list) {
+ val = mk_list_entry(head_p, struct flb_slist_entry, _head);
+
+ /* Create an instance of the defined parser */
+ parser_i = flb_ml_parser_instance_create(ctx->m, val->str);
+ if (!parser_i) {
+ return -1;
+ }
+
+ /* Always override parent parser values */
+ if (ctx->key_content) {
+ ret = flb_ml_parser_instance_set(parser_i,
+ "key_content",
+ ctx->key_content);
+ if (ret == -1) {
+ flb_plg_error(ctx->ins, "could not override 'key_content'");
+ return -1;
+ }
+ }
+ }
+ }
+
+ return 0;
+}
+
+static int flush_callback(struct flb_ml_parser *parser,
+ struct flb_ml_stream *mst,
+ void *data, char *buf_data, size_t buf_size)
+{
+ int ret;
+ struct ml_ctx *ctx = data;
+ struct ml_stream *stream;
+
+ if (ctx->debug_flush) {
+ flb_ml_flush_stdout(parser, mst, data, buf_data, buf_size);
+ }
+
+ if (ctx->use_buffer == FLB_FALSE) {
+ /* Append incoming record to our msgpack context buffer */
+ msgpack_sbuffer_write(&ctx->mp_sbuf, buf_data, buf_size);
+ return 0;
+
+ } else { /* buffered mode */
+ stream = get_by_id(ctx, mst->id);
+ if (!stream) {
+ flb_plg_error(ctx->ins, "Could not find tag to re-emit from stream %s",
+ mst->name);
+ return -1;
+ }
+
+ /* Emit record with original tag */
+ flb_plg_trace(ctx->ins, "emitting from %s to %s", stream->input_name, stream->tag);
+ ret = in_emitter_add_record(stream->tag, flb_sds_len(stream->tag), buf_data, buf_size,
+ ctx->ins_emitter);
+
+ return ret;
+ }
+}
+
+static int cb_ml_init(struct flb_filter_instance *ins,
+ struct flb_config *config,
+ void *data)
+{
+ int ret;
+ struct ml_ctx *ctx;
+ flb_sds_t tmp;
+ flb_sds_t emitter_name = NULL;
+ int len;
+ uint64_t stream_id;
+ (void) config;
+
+ ctx = flb_calloc(1, sizeof(struct ml_ctx));
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+ ctx->ins = ins;
+ ctx->debug_flush = FLB_FALSE;
+ ctx->config = config;
+ ctx->timer_created = FLB_FALSE;
+
+ /*
+ * Config map is not yet set at this point in the code
+ * user must explicitly set buffer to false to turn it off
+ */
+ ctx->use_buffer = FLB_TRUE;
+ tmp = (char *) flb_filter_get_property("buffer", ins);
+ if (tmp) {
+ ctx->use_buffer = flb_utils_bool(tmp);
+ }
+ ctx->partial_mode = FLB_FALSE;
+ tmp = (char *) flb_filter_get_property("mode", ins);
+ if (tmp != NULL) {
+ if (strcasecmp(tmp, FLB_MULTILINE_MODE_PARTIAL_MESSAGE) == 0) {
+ ctx->partial_mode = FLB_TRUE;
+ } else if (strcasecmp(tmp, FLB_MULTILINE_MODE_PARSER) == 0) {
+ ctx->partial_mode = FLB_FALSE;
+ } else {
+ flb_plg_error(ins, "'Mode' must be '%s' or '%s'",
+ FLB_MULTILINE_MODE_PARTIAL_MESSAGE,
+ FLB_MULTILINE_MODE_PARSER);
+ return -1;
+ }
+ }
+
+ if (ctx->partial_mode == FLB_TRUE && ctx->use_buffer == FLB_FALSE) {
+ flb_plg_error(ins, "'%s' 'Mode' requires 'Buffer' to be 'On'",
+ FLB_MULTILINE_MODE_PARTIAL_MESSAGE);
+ }
+
+ if (ctx->use_buffer == FLB_FALSE) {
+ /* Init buffers */
+ msgpack_sbuffer_init(&ctx->mp_sbuf);
+ msgpack_packer_init(&ctx->mp_pck, &ctx->mp_sbuf, msgpack_sbuffer_write);
+ } else {
+ /*
+ * Emitter name: every buffered multiline instance needs an emitter input plugin,
+ * with that one is able to emit records. We use a unique instance so we
+ * can use the metrics interface.
+ *
+ * If not set, we define an emitter name
+ *
+ * Validate if the emitter_name has been set before to check with the
+ * config map. If is not set, do a manual set of the property, so we let the
+ * config map handle the memory allocation.
+ */
+ tmp = (char *) flb_filter_get_property("emitter_name", ins);
+ if (!tmp) {
+ emitter_name = flb_sds_create_size(64);
+ if (!emitter_name) {
+ flb_free(ctx);
+ return -1;
+ }
+
+ tmp = flb_sds_printf(&emitter_name, "emitter_for_%s",
+ flb_filter_name(ins));
+ if (!tmp) {
+ flb_plg_error(ins, "cannot compose emitter_name");
+ flb_sds_destroy(emitter_name);
+ flb_free(ctx);
+ return -1;
+ }
+
+ flb_filter_set_property(ins, "emitter_name", emitter_name);
+ flb_plg_info(ins, "created emitter: %s", emitter_name);
+ flb_sds_destroy(emitter_name);
+ }
+ }
+
+ /* Load the config map */
+ ret = flb_filter_config_map_set(ins, (void *) ctx);
+ if (ret == -1) {
+ flb_free(ctx);
+ return -1;
+ }
+
+ /* Set plugin context */
+ flb_filter_set_context(ins, ctx);
+
+ if (ctx->key_content == NULL && ctx->partial_mode == FLB_TRUE) {
+ flb_plg_error(ins, "'Mode' '%s' requires 'multiline.key_content'",
+ FLB_MULTILINE_MODE_PARTIAL_MESSAGE);
+ flb_free(ctx);
+ return -1;
+ }
+
+ if (ctx->partial_mode == FLB_FALSE && mk_list_size(ctx->multiline_parsers) == 0) {
+ flb_plg_error(ins, "The default 'Mode' '%s' requires at least one 'multiline.parser'",
+ FLB_MULTILINE_MODE_PARSER);
+ flb_free(ctx);
+ return -1;
+ }
+
+
+ if (ctx->use_buffer == FLB_TRUE) {
+ /*
+ * Emitter Storage Type: the emitter input plugin to be created by default
+ * uses memory buffer, this option allows to define a filesystem mechanism
+ * for new records created (only if the main service is also filesystem
+ * enabled).
+ *
+ * On this code we just validate the input type: 'memory' or 'filesystem'.
+ */
+ tmp = ctx->emitter_storage_type;
+ if (strcasecmp(tmp, "memory") != 0 && strcasecmp(tmp, "filesystem") != 0) {
+ flb_plg_error(ins, "invalid 'emitter_storage.type' value. Only "
+ "'memory' or 'filesystem' types are allowed");
+ flb_free(ctx);
+ return -1;
+ }
+
+ /* Create the emitter context */
+ ret = emitter_create(ctx);
+ if (ret == -1) {
+ flb_free(ctx);
+ return -1;
+ }
+
+ /* Register a metric to count the number of emitted records */
+#ifdef FLB_HAVE_METRICS
+ ctx->cmt_emitted = cmt_counter_create(ins->cmt,
+ "fluentbit", "filter", "emit_records_total",
+ "Total number of emitted records",
+ 1, (char *[]) {"name"});
+
+ /* OLD api */
+ flb_metrics_add(FLB_MULTILINE_METRIC_EMITTED,
+ "emit_records", ctx->ins->metrics);
+#endif
+ }
+
+ mk_list_init(&ctx->ml_streams);
+ mk_list_init(&ctx->split_message_packers);
+
+ if (ctx->partial_mode == FLB_FALSE) {
+ /* Create multiline context */
+ ctx->m = flb_ml_create(config, ctx->ins->name);
+ if (!ctx->m) {
+ /*
+ * we don't free the context since upon init failure, the exit
+ * callback will be triggered with our context set above.
+ */
+ return -1;
+ }
+
+ /* Load the parsers/config */
+ ret = multiline_load_parsers(ctx);
+ if (ret == -1) {
+ return -1;
+ }
+
+ if (ctx->use_buffer == FLB_TRUE) {
+
+ ctx->m->flush_ms = ctx->flush_ms;
+ ret = flb_ml_auto_flush_init(ctx->m);
+ if (ret == -1) {
+ return -1;
+ }
+ } else {
+ /* Create a stream for this file */
+ len = strlen(ins->name);
+ ret = flb_ml_stream_create(ctx->m,
+ ins->name, len,
+ flush_callback, ctx,
+ &stream_id);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "could not create multiline stream");
+ return -1;
+ }
+ ctx->stream_id = stream_id;
+ }
+ }
+
+ return 0;
+}
+
+void ml_stream_destroy(struct ml_stream *stream)
+{
+ if (!stream) {
+ return;
+ }
+
+ if (stream->input_name) {
+ flb_sds_destroy(stream->input_name);
+ }
+ if (stream->tag) {
+ flb_sds_destroy(stream->tag);
+ }
+ flb_free(stream);
+ return;
+}
+
+static struct ml_stream *get_by_id(struct ml_ctx *ctx, uint64_t stream_id)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct ml_stream *stream;
+
+ mk_list_foreach_safe(head, tmp, &ctx->ml_streams) {
+ stream = mk_list_entry(head, struct ml_stream, _head);
+ if (stream->stream_id == stream_id) {
+ return stream;
+ }
+ }
+
+ return NULL;
+}
+
+static struct ml_stream *get_or_create_stream(struct ml_ctx *ctx,
+ struct flb_input_instance *i_ins,
+ const char *tag, int tag_len)
+{
+ uint64_t stream_id;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct ml_stream *stream;
+ flb_sds_t stream_name;
+ flb_sds_t tmp_sds;
+ int name_check;
+ int tag_check;
+ int len;
+ int ret;
+
+ mk_list_foreach_safe(head, tmp, &ctx->ml_streams) {
+ stream = mk_list_entry(head, struct ml_stream, _head);
+ name_check = strcmp(stream->input_name, i_ins->name);
+ tag_check = strcmp(stream->tag, tag);
+ if (tag_check == 0 && name_check == 0) {
+ flb_plg_trace(ctx->ins, "using stream %s_%s", stream->input_name, stream->tag);
+ return stream;
+ }
+ }
+
+ /* create a new stream */
+
+ stream_name = flb_sds_create_size(64);
+
+ tmp_sds = flb_sds_printf(&stream_name, "%s_%s", i_ins->name, tag);
+ if (!tmp_sds) {
+ flb_errno();
+ flb_sds_destroy(stream_name);
+ return NULL;
+ }
+ stream_name = tmp_sds;
+
+ stream = flb_calloc(1, sizeof(struct ml_stream));
+ if (!stream) {
+ flb_errno();
+ flb_sds_destroy(stream_name);
+ return NULL;
+ }
+
+ tmp_sds = flb_sds_create(tag);
+ if (!tmp) {
+ flb_errno();
+ flb_sds_destroy(stream_name);
+ ml_stream_destroy(stream);
+ return NULL;
+ }
+ stream->tag = tmp_sds;
+
+ tmp_sds = flb_sds_create(i_ins->name);
+ if (!tmp_sds) {
+ flb_errno();
+ flb_sds_destroy(stream_name);
+ ml_stream_destroy(stream);
+ return NULL;
+ }
+ stream->input_name = tmp_sds;
+
+ /* Create an flb_ml_stream for this stream */
+ flb_plg_info(ctx->ins, "created new multiline stream for %s", stream_name);
+ len = flb_sds_len(stream_name);
+ ret = flb_ml_stream_create(ctx->m,
+ stream_name, len,
+ flush_callback, ctx,
+ &stream_id);
+ if (ret != 0) {
+ flb_plg_error(ctx->ins, "could not create multiline stream for %s",
+ stream_name);
+ flb_sds_destroy(stream_name);
+ ml_stream_destroy(stream);
+ return NULL;
+ }
+ stream->stream_id = stream_id;
+ mk_list_add(&stream->_head, &ctx->ml_streams);
+ flb_plg_debug(ctx->ins, "Created new ML stream for %s", stream_name);
+ flb_sds_destroy(stream_name);
+
+ return stream;
+}
+
+static void partial_timer_cb(struct flb_config *config, void *data)
+{
+ struct ml_ctx *ctx = data;
+ (void) config;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct split_message_packer *packer;
+ unsigned long long now;
+ unsigned long long diff;
+ int ret;
+
+ now = ml_current_timestamp();
+
+ mk_list_foreach_safe(head, tmp, &ctx->split_message_packers) {
+ packer = mk_list_entry(head, struct split_message_packer, _head);
+
+ diff = now - packer->last_write_time;
+ if (diff <= ctx->flush_ms) {
+ continue;
+ }
+
+ mk_list_del(&packer->_head);
+ ml_split_message_packer_complete(packer);
+ /* re-emit record with original tag */
+ if (packer->log_encoder.output_buffer != NULL &&
+ packer->log_encoder.output_length > 0) {
+
+ flb_plg_trace(ctx->ins, "emitting from %s to %s", packer->input_name, packer->tag);
+ ret = in_emitter_add_record(packer->tag, flb_sds_len(packer->tag),
+ packer->log_encoder.output_buffer,
+ packer->log_encoder.output_length,
+ ctx->ins_emitter);
+ if (ret < 0) {
+ /* this shouldn't happen in normal execution */
+ flb_plg_warn(ctx->ins,
+ "Couldn't send concatenated record of size %zu "
+ "bytes to in_emitter %s",
+ packer->log_encoder.output_length,
+ ctx->ins_emitter->name);
+ }
+ }
+ ml_split_message_packer_destroy(packer);
+ }
+}
+
+static int ml_filter_partial(const void *data, size_t bytes,
+ const char *tag, int tag_len,
+ void **out_buf, size_t *out_bytes,
+ struct flb_filter_instance *f_ins,
+ struct flb_input_instance *i_ins,
+ void *filter_context,
+ struct flb_config *config)
+{
+ int ret;
+ struct ml_ctx *ctx = filter_context;
+ msgpack_sbuffer tmp_sbuf;
+ msgpack_packer tmp_pck;
+ int partial_records = 0;
+ int return_records = 0;
+ int partial = FLB_FALSE;
+ int is_last_partial = FLB_FALSE;
+ struct split_message_packer *packer;
+ char *partial_id_str = NULL;
+ size_t partial_id_size = 0;
+ struct flb_sched *sched;
+ struct flb_log_event_encoder log_encoder;
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+
+ (void) f_ins;
+ (void) config;
+
+ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event decoder initialization error : %d", ret);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ ret = flb_log_event_encoder_init(&log_encoder,
+ FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event encoder initialization error : %d", ret);
+
+ flb_log_event_decoder_destroy(&log_decoder);
+
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ /*
+ * create a timer that will run periodically and check if pending buffers
+ * have expired
+ * this is created once on the first flush
+ */
+ if (ctx->timer_created == FLB_FALSE) {
+ flb_plg_debug(ctx->ins,
+ "Creating flush timer with frequency %dms",
+ ctx->flush_ms);
+
+ sched = flb_sched_ctx_get();
+
+ ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
+ ctx->flush_ms / 2, partial_timer_cb,
+ ctx, NULL);
+ if (ret < 0) {
+ flb_plg_error(ctx->ins, "Failed to create flush timer");
+ } else {
+ ctx->timer_created = FLB_TRUE;
+ }
+ }
+
+ /*
+ * Create temporary msgpack buffer
+ * for non-partial messages which are passed on as-is
+ */
+ msgpack_sbuffer_init(&tmp_sbuf);
+ msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
+
+ while ((ret = flb_log_event_decoder_next(
+ &log_decoder,
+ &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
+ partial = ml_is_partial(log_event.body);
+ if (partial == FLB_TRUE) {
+ partial_records++;
+ ret = ml_get_partial_id(log_event.body, &partial_id_str, &partial_id_size);
+ if (ret == -1) {
+ flb_plg_warn(ctx->ins, "Could not find partial_id but partial_message key is FLB_TRUE for record with tag %s", tag);
+ /* handle this record as non-partial */
+ partial_records--;
+ goto pack_non_partial;
+ }
+ packer = ml_get_packer(&ctx->split_message_packers, tag,
+ i_ins->name, partial_id_str, partial_id_size);
+ if (packer == NULL) {
+ flb_plg_trace(ctx->ins, "Found new partial record with tag %s", tag);
+ packer = ml_create_packer(tag, i_ins->name, partial_id_str, partial_id_size,
+ log_event.body, ctx->key_content, &log_event.timestamp);
+ if (packer == NULL) {
+ flb_plg_warn(ctx->ins, "Could not create packer for partial record with tag %s", tag);
+ /* handle this record as non-partial */
+ partial_records--;
+ goto pack_non_partial;
+ }
+ mk_list_add(&packer->_head, &ctx->split_message_packers);
+ }
+ ret = ml_split_message_packer_write(packer, log_event.body, ctx->key_content);
+ if (ret < 0) {
+ flb_plg_warn(ctx->ins, "Could not append content for partial record with tag %s", tag);
+ /* handle this record as non-partial */
+ partial_records--;
+ goto pack_non_partial;
+ }
+ is_last_partial = ml_is_partial_last(log_event.body);
+ if (is_last_partial == FLB_TRUE) {
+ /* emit the record in this filter invocation */
+ return_records++;
+ ml_split_message_packer_complete(packer);
+ ml_append_complete_record(packer, &log_encoder);
+ mk_list_del(&packer->_head);
+ ml_split_message_packer_destroy(packer);
+ }
+ } else {
+
+pack_non_partial:
+ return_records++;
+ /* record passed from filter as-is */
+
+ ret = flb_log_event_encoder_emit_raw_record(
+ &log_encoder,
+ log_decoder.record_base,
+ log_decoder.record_length);
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_plg_error(ctx->ins,
+ "Log event encoder initialization error : %d", ret);
+ }
+ }
+ }
+
+ if (partial_records == 0) {
+ /* if no records were partial, we didn't modify the chunk */
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_log_event_encoder_destroy(&log_encoder);
+
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+
+ return FLB_FILTER_NOTOUCH;
+ } else if (return_records > 0) {
+ /* some new records can be returned now, return a new buffer */
+ if (log_encoder.output_length > 0) {
+ *out_buf = log_encoder.output_buffer;
+ *out_bytes = log_encoder.output_length;
+
+ ret = FLB_FILTER_MODIFIED;
+
+ flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder);
+ }
+ else {
+ ret = FLB_FILTER_NOTOUCH;
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_log_event_encoder_destroy(&log_encoder);
+
+ return ret;
+ } else {
+ /* no records to return right now, free buffer */
+ flb_log_event_decoder_destroy(&log_decoder);
+ flb_log_event_encoder_destroy(&log_encoder);
+
+ msgpack_sbuffer_destroy(&tmp_sbuf);
+ }
+
+ return FLB_FILTER_MODIFIED;
+}
+
+static int cb_ml_filter(const void *data, size_t bytes,
+ const char *tag, int tag_len,
+ void **out_buf, size_t *out_bytes,
+ struct flb_filter_instance *f_ins,
+ struct flb_input_instance *i_ins,
+ void *filter_context,
+ struct flb_config *config)
+{
+ size_t tmp_size;
+ char *tmp_buf;
+ struct flb_log_event_decoder decoder;
+ struct ml_stream *stream;
+ struct flb_log_event event;
+ int ret;
+ struct ml_ctx *ctx;
+
+ (void) f_ins;
+ (void) config;
+
+ ctx = (struct ml_ctx *) filter_context;
+
+ if (i_ins == ctx->ins_emitter) {
+ flb_plg_trace(ctx->ins, "not processing records from the emitter");
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ /* 'partial_message' mode */
+ if (ctx->partial_mode == FLB_TRUE) {
+ return ml_filter_partial(data, bytes, tag, tag_len,
+ out_buf, out_bytes,
+ f_ins, i_ins,
+ filter_context, config);
+ }
+
+ /* 'parser' mode */
+ if (ctx->use_buffer == FLB_FALSE) {
+ /* reset mspgack size content */
+ ctx->mp_sbuf.size = 0;
+
+ /* process records */
+ flb_log_event_decoder_init(&decoder, (char *) data, bytes);
+
+ while (flb_log_event_decoder_next(&decoder, &event) ==
+ FLB_EVENT_DECODER_SUCCESS) {
+ ret = flb_ml_append_event(ctx->m, ctx->stream_id, &event);
+
+ if (ret != 0) {
+ flb_plg_debug(ctx->ins,
+ "could not append object from tag: %s", tag);
+ }
+ }
+
+ flb_log_event_decoder_destroy(&decoder);
+
+ /* flush all pending data (there is no auto-flush when unbuffered) */
+ flb_ml_flush_pending_now(ctx->m);
+
+ if (ctx->mp_sbuf.size > 0) {
+ /*
+ * If the filter will report a new set of records because the
+ * original data was modified, we make a copy to a new memory
+ * area, since the buffer might be invalidated in the filter
+ * chain.
+ */
+
+ tmp_buf = flb_malloc(ctx->mp_sbuf.size);
+ if (!tmp_buf) {
+ flb_errno();
+ return FLB_FILTER_NOTOUCH;
+ }
+ tmp_size = ctx->mp_sbuf.size;
+ memcpy(tmp_buf, ctx->mp_sbuf.data, tmp_size);
+ *out_buf = tmp_buf;
+ *out_bytes = tmp_size;
+ ctx->mp_sbuf.size = 0;
+
+ return FLB_FILTER_MODIFIED;
+ }
+
+ /* unlikely to happen.. but just in case */
+ return FLB_FILTER_NOTOUCH;
+
+ } else { /* buffered mode */
+ stream = get_or_create_stream(ctx, i_ins, tag, tag_len);
+
+ if (!stream) {
+ flb_plg_error(ctx->ins, "Could not find or create ML stream for %s", tag);
+ return FLB_FILTER_NOTOUCH;
+ }
+
+ /* process records */
+ flb_log_event_decoder_init(&decoder, (char *) data, bytes);
+
+ while (flb_log_event_decoder_next(&decoder, &event) ==
+ FLB_EVENT_DECODER_SUCCESS) {
+ ret = flb_ml_append_event(ctx->m, stream->stream_id, &event);
+
+ if (ret != 0) {
+ flb_plg_debug(ctx->ins,
+ "could not append object from tag: %s", tag);
+ }
+ }
+
+ flb_log_event_decoder_destroy(&decoder);
+
+ /*
+ * always returned modified, which will be 0 records, since the emitter takes
+ * all records.
+ */
+ return FLB_FILTER_MODIFIED;
+ }
+}
+
+static int cb_ml_exit(void *data, struct flb_config *config)
+{
+ struct ml_ctx *ctx = data;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct ml_stream *stream;
+
+ if (!ctx) {
+ return 0;
+ }
+
+ if (ctx->m) {
+ flb_ml_destroy(ctx->m);
+ }
+
+ mk_list_foreach_safe(head, tmp, &ctx->ml_streams) {
+ stream = mk_list_entry(head, struct ml_stream, _head);
+ mk_list_del(&stream->_head);
+ ml_stream_destroy(stream);
+ }
+
+ msgpack_sbuffer_destroy(&ctx->mp_sbuf);
+ flb_free(ctx);
+
+ return 0;
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] = {
+ {
+ FLB_CONFIG_MAP_BOOL, "debug_flush", "false",
+ 0, FLB_TRUE, offsetof(struct ml_ctx, debug_flush),
+ "enable debugging for concatenation flush to stdout"
+ },
+
+ {
+ FLB_CONFIG_MAP_BOOL, "buffer", "true",
+ 0, FLB_TRUE, offsetof(struct ml_ctx, use_buffer),
+ "Enable buffered mode. In buffered mode, the filter can concatenate "
+ "multilines from inputs that ingest records one by one (ex: Forward), "
+ "rather than in chunks, re-emitting them into the beggining of the "
+ "pipeline using the in_emitter instance. "
+ "With buffer off, this filter will not work with most inputs, except tail."
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "mode", "parser",
+ 0, FLB_TRUE, offsetof(struct ml_ctx, mode),
+ "Mode can be 'parser' for regex concat, or 'partial_message' to "
+ "concat split docker logs."
+ },
+
+ {
+ FLB_CONFIG_MAP_INT, "flush_ms", "2000",
+ 0, FLB_TRUE, offsetof(struct ml_ctx, flush_ms),
+ "Flush time for pending multiline records"
+ },
+
+ /* Multiline Core Engine based API */
+ {
+ FLB_CONFIG_MAP_CLIST, "multiline.parser", NULL,
+ FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct ml_ctx, multiline_parsers),
+ "specify one or multiple multiline parsers: docker, cri, go, java, etc."
+ },
+
+ {
+ FLB_CONFIG_MAP_STR, "multiline.key_content", NULL,
+ 0, FLB_TRUE, offsetof(struct ml_ctx, key_content),
+ "specify the key name that holds the content to process."
+ },
+
+ /* emitter config */
+ {
+ FLB_CONFIG_MAP_STR, "emitter_name", NULL,
+ FLB_FALSE, FLB_TRUE, offsetof(struct ml_ctx, emitter_name),
+ NULL
+ },
+ {
+ FLB_CONFIG_MAP_STR, "emitter_storage.type", "memory",
+ FLB_FALSE, FLB_TRUE, offsetof(struct ml_ctx, emitter_storage_type),
+ NULL
+ },
+ {
+ FLB_CONFIG_MAP_SIZE, "emitter_mem_buf_limit", FLB_MULTILINE_MEM_BUF_LIMIT_DEFAULT,
+ FLB_FALSE, FLB_TRUE, offsetof(struct ml_ctx, emitter_mem_buf_limit),
+ "set a memory buffer limit to restrict memory usage of emitter"
+ },
+
+ /* EOF */
+ {0}
+};
+
+struct flb_filter_plugin filter_multiline_plugin = {
+ .name = "multiline",
+ .description = "Concatenate multiline messages",
+ .cb_init = cb_ml_init,
+ .cb_filter = cb_ml_filter,
+ .cb_exit = cb_ml_exit,
+ .config_map = config_map,
+ .flags = 0
+};