diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/src/flb_chunk_trace.c | |
parent | Initial commit. (diff) | |
download | netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.tar.xz netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.zip |
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/src/flb_chunk_trace.c')
-rw-r--r-- | fluent-bit/src/flb_chunk_trace.c | 692 |
1 files changed, 692 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_chunk_trace.c b/fluent-bit/src/flb_chunk_trace.c new file mode 100644 index 00000000..adacb73f --- /dev/null +++ b/fluent-bit/src/flb_chunk_trace.c @@ -0,0 +1,692 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fcntl.h> + +#include <msgpack.h> +#include <chunkio/chunkio.h> + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input_chunk.h> +#include <fluent-bit/flb_input.h> +#include <fluent-bit/flb_output.h> +#include <fluent-bit/flb_chunk_trace.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_base64.h> +#include <fluent-bit/flb_storage.h> +#include <fluent-bit/flb_router.h> +#include <fluent-bit/flb_kv.h> + + +/* Register external function to emit records, check 'plugins/in_emitter' */ +int in_emitter_add_record(const char *tag, int tag_len, + const char *buf_data, size_t buf_size, + struct flb_input_instance *in); + +/****************************************************************************/ +/* To avoid double frees when enabling and disabling tracing as well */ +/* as avoiding race conditions when stopping fluent-bit while someone is */ +/* toggling tracing via the HTTP API this set of APIS with a mutex lock */ +/* is used: */ +/* * flb_chunk_trace_to_be_destroyed - query to see if the trace context */ +/* is slated to be freed */ +/* * flb_chunk_trace_set_destroy - set the trace context to be destroyed */ +/* once all chunks are freed (executed in flb_chunk_trace_destroy). */ +/* * flb_chunk_trace_has_chunks - see if there are still chunks using */ +/* using the tracing context */ +/* * flb_chunk_trace_add - increment the traces chunk count */ +/* * flb_chunk_trace_sub - decrement the traces chunk count */ +/****************************************************************************/ +static inline int flb_chunk_trace_to_be_destroyed(struct flb_chunk_trace_context *ctxt) +{ + int ret = FLB_FALSE; + + ret = (ctxt->to_destroy == 1 ? FLB_TRUE : FLB_FALSE); + return ret; +} + +static inline int flb_chunk_trace_has_chunks(struct flb_chunk_trace_context *ctxt) +{ + int ret = FLB_FALSE; + + ret = ((ctxt->chunks > 0) ? FLB_TRUE : FLB_FALSE); + return ret; +} + +static inline void flb_chunk_trace_add(struct flb_chunk_trace_context *ctxt) +{ + ctxt->chunks++; +} + +static inline void flb_chunk_trace_sub(struct flb_chunk_trace_context *ctxt) +{ + ctxt->chunks--; +} + +static inline void flb_chunk_trace_set_destroy(struct flb_chunk_trace_context *ctxt) +{ + ctxt->to_destroy = 1; +} + +static struct flb_output_instance *find_calyptia_output_instance(struct flb_config *config) +{ + struct mk_list *head = NULL; + struct flb_output_instance *output = NULL; + + mk_list_foreach(head, &config->outputs) { + output = mk_list_entry(head, struct flb_output_instance, _head); + if (strcmp(output->p->name, "calyptia") == 0) { + return output; + } + } + return NULL; +} + +static void trace_chunk_context_destroy(struct flb_chunk_trace_context *ctxt) +{ + int i; + + + if (flb_chunk_trace_has_chunks(ctxt) == FLB_TRUE) { + flb_chunk_trace_set_destroy(ctxt); + flb_input_pause_all(ctxt->flb->config); + return; + } + + /* pause all inputs, then destroy the input storage. */ + flb_input_pause_all(ctxt->flb->config); + /* waiting for all tasks to end is key to safely stopping and destroying */ + /* the fluent-bit pipeline. */ + for (i = 0; i < 5 && flb_task_running_count(ctxt->flb->config) > 0; i++) { + usleep(10 * 1000); + } + + flb_sds_destroy(ctxt->trace_prefix); + flb_stop(ctxt->flb); + flb_destroy(ctxt->flb); + flb_free(ctxt); +} + +void flb_chunk_trace_context_destroy(void *input) +{ + struct flb_input_instance *in = (struct flb_input_instance *)input; + pthread_mutex_lock(&in->chunk_trace_lock); + if (in->chunk_trace_ctxt != NULL) { + trace_chunk_context_destroy(in->chunk_trace_ctxt); + in->chunk_trace_ctxt = NULL; + } + pthread_mutex_unlock(&in->chunk_trace_lock); +} + +struct flb_chunk_trace_context *flb_chunk_trace_context_new(void *trace_input, + const char *output_name, + const char *trace_prefix, + void *data, struct mk_list *props) +{ + struct flb_input_instance *in = (struct flb_input_instance *)trace_input; + struct flb_config *config = in->config; + struct flb_input_instance *input = NULL; + struct flb_output_instance *output = NULL; + struct flb_output_instance *calyptia = NULL; + struct flb_chunk_trace_context *ctx = NULL; + struct mk_list *head = NULL; + struct flb_kv *prop = NULL; + int ret; + + if (config->enable_chunk_trace == FLB_FALSE) { + flb_warn("[chunk trace] enable chunk tracing via the configuration or " + " command line to be able to activate tracing."); + return NULL; + } + + pthread_mutex_lock(&in->chunk_trace_lock); + + if (in->chunk_trace_ctxt) { + trace_chunk_context_destroy(in->chunk_trace_ctxt); + } + + ctx = flb_calloc(1, sizeof(struct flb_chunk_trace_context)); + if (ctx == NULL) { + flb_errno(); + pthread_mutex_unlock(&in->chunk_trace_lock); + return NULL; + } + + ctx->flb = flb_create(); + if (ctx->flb == NULL) { + flb_errno(); + goto error_ctxt; + } + + flb_service_set(ctx->flb, "flush", "1", "grace", "1", NULL); + + input = (void *)flb_input_new(ctx->flb->config, "emitter", NULL, FLB_FALSE); + if (input == NULL) { + flb_error("could not load trace emitter"); + goto error_flb; + } + + ret = flb_input_set_property(input, "alias", "trace-emitter"); + if (ret != 0) { + flb_error("unable to set alias for trace emitter"); + goto error_input; + } + + ret = flb_input_set_property(input, "ring_buffer_size", "4096"); + if (ret != 0) { + flb_error("unable to set ring buffer size for trace emitter"); + goto error_input; + } + + output = flb_output_new(ctx->flb->config, output_name, data, 1); + if (output == NULL) { + flb_error("could not create trace output"); + goto error_input; + } + + /* special handling for the calyptia plugin so we can copy the API */ + /* key and other configuration properties. */ + if (strcmp(output_name, "calyptia") == 0) { + calyptia = find_calyptia_output_instance(config); + if (calyptia == NULL) { + flb_error("unable to find calyptia output instance"); + goto error_output; + } + mk_list_foreach(head, &calyptia->properties) { + prop = mk_list_entry(head, struct flb_kv, _head); + flb_output_set_property(output, prop->key, prop->val); + } + } + else if (props != NULL) { + mk_list_foreach(head, props) { + prop = mk_list_entry(head, struct flb_kv, _head); + flb_output_set_property(output, prop->key, prop->val); + } + } + + ret = flb_router_connect_direct(input, output); + if (ret != 0) { + flb_error("unable to route traces"); + goto error_output; + } + + ctx->output = (void *)output; + ctx->input = (void *)input; + ctx->trace_prefix = flb_sds_create(trace_prefix); + + flb_start_trace(ctx->flb); + + in->chunk_trace_ctxt = ctx; + pthread_mutex_unlock(&in->chunk_trace_lock); + return ctx; + +error_output: + flb_output_instance_destroy(output); +error_input: + if (ctx->cio) { + cio_destroy(ctx->cio); + } + flb_input_instance_destroy(input); +error_flb: + flb_destroy(ctx->flb); +error_ctxt: + flb_free(ctx); + pthread_mutex_unlock(&in->chunk_trace_lock); + return NULL; +} + +struct flb_chunk_trace *flb_chunk_trace_new(struct flb_input_chunk *chunk) +{ + struct flb_chunk_trace *trace = NULL; + struct flb_input_instance *f_ins = (struct flb_input_instance *)chunk->in; + + pthread_mutex_lock(&f_ins->chunk_trace_lock); + + if (flb_chunk_trace_to_be_destroyed(f_ins->chunk_trace_ctxt) == FLB_TRUE) { + pthread_mutex_unlock(&f_ins->chunk_trace_lock); + return NULL; + } + + trace = flb_calloc(1, sizeof(struct flb_chunk_trace)); + if (trace == NULL) { + flb_errno(); + pthread_mutex_unlock(&f_ins->chunk_trace_lock); + return NULL; + } + + trace->ctxt = f_ins->chunk_trace_ctxt; + flb_chunk_trace_add(trace->ctxt); + + trace->trace_id = flb_sds_create(""); + if (flb_sds_printf(&trace->trace_id, "%s%d", trace->ctxt->trace_prefix, + trace->ctxt->trace_count++) == NULL) { + pthread_mutex_unlock(&f_ins->chunk_trace_lock); + flb_sds_destroy(trace->trace_id); + flb_free(trace); + return NULL; + } + + trace->ic = chunk; + + pthread_mutex_unlock(&f_ins->chunk_trace_lock); + return trace; +} + +void flb_chunk_trace_destroy(struct flb_chunk_trace *trace) +{ + pthread_mutex_lock(&trace->ic->in->chunk_trace_lock); + flb_chunk_trace_sub(trace->ctxt); + + /* check to see if we need to free the trace context. */ + if (flb_chunk_trace_has_chunks(trace->ctxt) == FLB_FALSE && + flb_chunk_trace_to_be_destroyed(trace->ctxt) == FLB_TRUE) { + trace_chunk_context_destroy(trace->ctxt); + } + else if (flb_chunk_trace_has_chunks(trace->ctxt) == FLB_TRUE && + flb_chunk_trace_to_be_destroyed(trace->ctxt) == FLB_TRUE) { + } + pthread_mutex_unlock(&trace->ic->in->chunk_trace_lock); + + flb_sds_destroy(trace->trace_id); + flb_free(trace); +} + +int flb_chunk_trace_context_set_limit(void *input, int limit_type, int limit_arg) +{ + struct flb_input_instance *in = (struct flb_input_instance *)input; + struct flb_chunk_trace_context *ctxt = NULL; + struct flb_time tm; + + pthread_mutex_lock(&in->chunk_trace_lock); + + ctxt = in->chunk_trace_ctxt; + if (ctxt == NULL) { + pthread_mutex_unlock(&in->chunk_trace_lock); + return -1; + } + + switch(limit_type) { + case FLB_CHUNK_TRACE_LIMIT_TIME: + flb_time_get(&tm); + ctxt->limit.type = FLB_CHUNK_TRACE_LIMIT_TIME; + ctxt->limit.seconds_started = tm.tm.tv_sec; + ctxt->limit.seconds = limit_arg; + + pthread_mutex_unlock(&in->chunk_trace_lock); + return 0; + case FLB_CHUNK_TRACE_LIMIT_COUNT: + ctxt->limit.type = FLB_CHUNK_TRACE_LIMIT_COUNT; + ctxt->limit.count = limit_arg; + + pthread_mutex_unlock(&in->chunk_trace_lock); + return 0; + } + + pthread_mutex_unlock(&in->chunk_trace_lock); + return -1; +} + +int flb_chunk_trace_context_hit_limit(void *input) +{ + struct flb_input_instance *in = (struct flb_input_instance *)input; + struct flb_time tm; + struct flb_chunk_trace_context *ctxt = NULL; + + pthread_mutex_lock(&in->chunk_trace_lock); + + ctxt = in->chunk_trace_ctxt; + if (ctxt == NULL) { + pthread_mutex_unlock(&in->chunk_trace_lock); + return FLB_FALSE; + } + + switch(ctxt->limit.type) { + case FLB_CHUNK_TRACE_LIMIT_TIME: + flb_time_get(&tm); + if ((tm.tm.tv_sec - ctxt->limit.seconds_started) > ctxt->limit.seconds) { + pthread_mutex_unlock(&in->chunk_trace_lock); + return FLB_TRUE; + } + return FLB_FALSE; + case FLB_CHUNK_TRACE_LIMIT_COUNT: + if (ctxt->limit.count <= ctxt->trace_count) { + pthread_mutex_unlock(&in->chunk_trace_lock); + return FLB_TRUE; + } + pthread_mutex_unlock(&in->chunk_trace_lock); + return FLB_FALSE; + } + pthread_mutex_unlock(&in->chunk_trace_lock); + return FLB_FALSE; +} + +void flb_chunk_trace_do_input(struct flb_input_chunk *ic) +{ + pthread_mutex_lock(&ic->in->chunk_trace_lock); + if (ic->in->chunk_trace_ctxt == NULL) { + pthread_mutex_unlock(&ic->in->chunk_trace_lock); + return; + } + pthread_mutex_unlock(&ic->in->chunk_trace_lock); + + if (ic->trace == NULL) { + ic->trace = flb_chunk_trace_new(ic); + } + + if (ic->trace) { + flb_chunk_trace_input(ic->trace); + if (flb_chunk_trace_context_hit_limit(ic->in) == FLB_TRUE) { + flb_chunk_trace_context_destroy(ic->in); + } + } +} + +int flb_chunk_trace_input(struct flb_chunk_trace *trace) +{ + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + msgpack_unpacked result; + msgpack_object *record = NULL; + char *buf = NULL; + size_t buf_size; + struct flb_time tm; + struct flb_time tm_end; + struct flb_input_instance *input = (struct flb_input_instance *)trace->ic->in; + int rc = -1; + size_t off = 0; + flb_sds_t tag = flb_sds_create("trace"); + int records = 0; + + + /* initiailize start time */ + flb_time_get(&tm); + flb_time_get(&tm_end); + + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + msgpack_unpacked_init(&result); + + cio_chunk_get_content(trace->ic->chunk, &buf, &buf_size); + + msgpack_pack_array(&mp_pck, 2); + flb_pack_time_now(&mp_pck); + if (input->alias != NULL) { + msgpack_pack_map(&mp_pck, 7); + } + else { + msgpack_pack_map(&mp_pck, 6); + } + + msgpack_pack_str_with_body(&mp_pck, "type", 4); + msgpack_pack_int(&mp_pck, FLB_CHUNK_TRACE_TYPE_INPUT); + + msgpack_pack_str_with_body(&mp_pck, "trace_id", strlen("trace_id")); + msgpack_pack_str_with_body(&mp_pck, trace->trace_id, strlen(trace->trace_id)); + + msgpack_pack_str_with_body(&mp_pck, "plugin_instance", strlen("plugin_instance")); + msgpack_pack_str_with_body(&mp_pck, input->name, strlen(input->name)); + + if (input->alias != NULL) { + msgpack_pack_str_with_body(&mp_pck, "plugin_alias", strlen("plugin_alias")); + msgpack_pack_str_with_body(&mp_pck, input->alias, strlen(input->alias)); + } + + msgpack_pack_str_with_body(&mp_pck, "records", strlen("records")); + + if (buf_size > 0) { + do { + rc = msgpack_unpack_next(&result, buf, buf_size, &off); + if (rc != MSGPACK_UNPACK_SUCCESS) { + flb_error("unable to unpack record"); + goto sbuffer_error; + } + records++; + } while (rc == MSGPACK_UNPACK_SUCCESS && off < buf_size); + + msgpack_pack_array(&mp_pck, records); + + off = 0; + do { + rc = msgpack_unpack_next(&result, buf, buf_size, &off); + if (rc != MSGPACK_UNPACK_SUCCESS) { + flb_error("unable to unpack record"); + goto sbuffer_error; + } + flb_time_pop_from_msgpack(&tm, &result, &record); + + msgpack_pack_map(&mp_pck, 2); + msgpack_pack_str_with_body(&mp_pck, "timestamp", strlen("timestamp")); + flb_time_append_to_msgpack(&tm, &mp_pck, FLB_TIME_ETFMT_INT); + msgpack_pack_str_with_body(&mp_pck, "record", strlen("record")); + msgpack_pack_object(&mp_pck, *record); + + } while (rc == MSGPACK_UNPACK_SUCCESS && off < buf_size); + } + + msgpack_pack_str_with_body(&mp_pck, "start_time", strlen("start_time")); + flb_time_append_to_msgpack(&tm, &mp_pck, FLB_TIME_ETFMT_INT); + msgpack_pack_str_with_body(&mp_pck, "end_time", strlen("end_time")); + flb_time_append_to_msgpack(&tm_end, &mp_pck, FLB_TIME_ETFMT_INT); + in_emitter_add_record(tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size, + trace->ctxt->input); +sbuffer_error: + flb_sds_destroy(tag); + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&mp_sbuf); + return rc; +} + +int flb_chunk_trace_pre_output(struct flb_chunk_trace *trace) +{ + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + msgpack_unpacked result; + msgpack_object *record = NULL; + char *buf = NULL; + size_t buf_size; + struct flb_time tm; + struct flb_time tm_end; + struct flb_input_instance *input = (struct flb_input_instance *)trace->ic->in; + int rc = -1; + size_t off = 0; + flb_sds_t tag = flb_sds_create("trace"); + int records = 0; + + + /* initiailize start time */ + flb_time_get(&tm); + flb_time_get(&tm_end); + + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + msgpack_unpacked_init(&result); + + cio_chunk_get_content(trace->ic->chunk, &buf, &buf_size); + + msgpack_pack_array(&mp_pck, 2); + flb_pack_time_now(&mp_pck); + if (input->alias != NULL) { + msgpack_pack_map(&mp_pck, 7); + } + else { + msgpack_pack_map(&mp_pck, 6); + } + + msgpack_pack_str_with_body(&mp_pck, "type", 4); + msgpack_pack_int(&mp_pck, FLB_CHUNK_TRACE_TYPE_PRE_OUTPUT); + + msgpack_pack_str_with_body(&mp_pck, "trace_id", strlen("trace_id")); + msgpack_pack_str_with_body(&mp_pck, trace->trace_id, strlen(trace->trace_id)); + + msgpack_pack_str_with_body(&mp_pck, "plugin_instance", strlen("plugin_instance")); + msgpack_pack_str_with_body(&mp_pck, input->name, strlen(input->name)); + + if (input->alias != NULL) { + msgpack_pack_str_with_body(&mp_pck, "plugin_alias", strlen("plugin_alias")); + msgpack_pack_str_with_body(&mp_pck, input->alias, strlen(input->alias)); + } + + msgpack_pack_str_with_body(&mp_pck, "records", strlen("records")); + + if (buf_size > 0) { + do { + rc = msgpack_unpack_next(&result, buf, buf_size, &off); + if (rc != MSGPACK_UNPACK_SUCCESS) { + flb_error("unable to unpack record"); + goto sbuffer_error; + } + records++; + } while (rc == MSGPACK_UNPACK_SUCCESS && off < buf_size); + + msgpack_pack_array(&mp_pck, records); + off = 0; + do { + rc = msgpack_unpack_next(&result, buf, buf_size, &off); + if (rc != MSGPACK_UNPACK_SUCCESS) { + flb_error("unable to unpack record"); + goto sbuffer_error; + } + flb_time_pop_from_msgpack(&tm, &result, &record); + + msgpack_pack_map(&mp_pck, 2); + msgpack_pack_str_with_body(&mp_pck, "timestamp", strlen("timestamp")); + flb_time_append_to_msgpack(&tm, &mp_pck, FLB_TIME_ETFMT_INT); + msgpack_pack_str_with_body(&mp_pck, "record", strlen("record")); + msgpack_pack_object(&mp_pck, *record); + + } while (rc == MSGPACK_UNPACK_SUCCESS && off < buf_size); + } + + msgpack_pack_str_with_body(&mp_pck, "start_time", strlen("start_time")); + flb_time_append_to_msgpack(&tm, &mp_pck, FLB_TIME_ETFMT_INT); + msgpack_pack_str_with_body(&mp_pck, "end_time", strlen("end_time")); + flb_time_append_to_msgpack(&tm_end, &mp_pck, FLB_TIME_ETFMT_INT); + in_emitter_add_record(tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size, + trace->ctxt->input); +sbuffer_error: + flb_sds_destroy(tag); + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&mp_sbuf); + return rc; +} + +int flb_chunk_trace_filter(struct flb_chunk_trace *tracer, void *pfilter, struct flb_time *tm_start, struct flb_time *tm_end, char *buf, size_t buf_size) +{ + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + msgpack_unpacked result; + msgpack_object *record = NULL; + int rc = -1; + struct flb_filter_instance *filter = (struct flb_filter_instance *)pfilter; + flb_sds_t tag = flb_sds_create("trace"); + struct flb_time tm; + size_t off = 0; + int records = 0; + + + if (tracer == NULL) { + goto tracer_error; + } + + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + msgpack_pack_array(&mp_pck, 2); + flb_pack_time_now(&mp_pck); + if (filter->alias == NULL) { + msgpack_pack_map(&mp_pck, 6); + } + else { + msgpack_pack_map(&mp_pck, 7); + } + + msgpack_pack_str_with_body(&mp_pck, "type", strlen("type")); + rc = msgpack_pack_int(&mp_pck, FLB_CHUNK_TRACE_TYPE_FILTER); + if (rc == -1) { + goto sbuffer_error; + } + + msgpack_pack_str_with_body(&mp_pck, "start_time", strlen("start_time")); + //msgpack_pack_double(&mp_pck, flb_time_to_double(tm_start)); + flb_time_append_to_msgpack(tm_start, &mp_pck, FLB_TIME_ETFMT_INT); + msgpack_pack_str_with_body(&mp_pck, "end_time", strlen("end_time")); + //msgpack_pack_double(&mp_pck, flb_time_to_double(tm_end)); + flb_time_append_to_msgpack(tm_end, &mp_pck, FLB_TIME_ETFMT_INT); + + msgpack_pack_str_with_body(&mp_pck, "trace_id", strlen("trace_id")); + msgpack_pack_str_with_body(&mp_pck, tracer->trace_id, strlen(tracer->trace_id)); + + + msgpack_pack_str_with_body(&mp_pck, "plugin_instance", strlen("plugin_instance")); + rc = msgpack_pack_str_with_body(&mp_pck, filter->name, strlen(filter->name)); + if (rc == -1) { + goto sbuffer_error; + } + + if (filter->alias != NULL) { + msgpack_pack_str_with_body(&mp_pck, "plugin_alias", strlen("plugin_alias")); + msgpack_pack_str_with_body(&mp_pck, filter->alias, strlen(filter->alias)); + } + + msgpack_pack_str_with_body(&mp_pck, "records", strlen("records")); + + msgpack_unpacked_init(&result); + + if (buf_size > 0) { + do { + rc = msgpack_unpack_next(&result, buf, buf_size, &off); + if (rc != MSGPACK_UNPACK_SUCCESS) { + flb_error("unable to unpack record"); + goto unpack_error; + } + records++; + } while (rc == MSGPACK_UNPACK_SUCCESS && off < buf_size); + + msgpack_pack_array(&mp_pck, records); + off = 0; + do { + rc = msgpack_unpack_next(&result, buf, buf_size, &off); + if (rc != MSGPACK_UNPACK_SUCCESS) { + flb_error("unable to unpack record"); + goto unpack_error; + } + flb_time_pop_from_msgpack(&tm, &result, &record); + + msgpack_pack_map(&mp_pck, 2); + msgpack_pack_str_with_body(&mp_pck, "timestamp", strlen("timestamp")); + flb_time_append_to_msgpack(&tm, &mp_pck, FLB_TIME_ETFMT_INT); + msgpack_pack_str_with_body(&mp_pck, "record", strlen("record")); + msgpack_pack_object(&mp_pck, *record); + + } while (rc == MSGPACK_UNPACK_SUCCESS && off < buf_size); + } + + in_emitter_add_record(tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size, + tracer->ctxt->input); + + rc = 0; + +unpack_error: + msgpack_unpacked_destroy(&result); +sbuffer_error: + msgpack_sbuffer_destroy(&mp_sbuf); +tracer_error: + flb_sds_destroy(tag); + return rc; +} |