summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_log_event_decoder.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/flb_log_event_decoder.c')
-rw-r--r--fluent-bit/src/flb_log_event_decoder.c383
1 files changed, 383 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_log_event_decoder.c b/fluent-bit/src/flb_log_event_decoder.c
new file mode 100644
index 000000000..00a778e3d
--- /dev/null
+++ b/fluent-bit/src/flb_log_event_decoder.c
@@ -0,0 +1,383 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_log_event_decoder.h>
+#include <fluent-bit/flb_byteswap.h>
+
+static int create_empty_map(struct flb_log_event_decoder *context) {
+ msgpack_packer packer;
+ msgpack_sbuffer buffer;
+ int result;
+ size_t offset;
+
+ result = FLB_EVENT_DECODER_SUCCESS;
+
+ context->empty_map = NULL;
+
+ msgpack_sbuffer_init(&buffer);
+ msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write);
+
+ result = msgpack_pack_map(&packer, 0);
+
+ if (result != 0) {
+ result = FLB_EVENT_DECODER_ERROR_INITIALIZATION_FAILURE;
+ }
+ else {
+ offset = 0;
+
+ msgpack_unpacked_init(&context->unpacked_empty_map);
+
+ result = msgpack_unpack_next(&context->unpacked_empty_map,
+ buffer.data,
+ buffer.size,
+ &offset);
+
+ if (result != MSGPACK_UNPACK_SUCCESS) {
+ result = FLB_EVENT_DECODER_ERROR_INITIALIZATION_FAILURE;
+ }
+ else {
+ context->empty_map = &context->unpacked_empty_map.data;
+
+ result = FLB_EVENT_DECODER_SUCCESS;
+ }
+ }
+
+ msgpack_sbuffer_destroy(&buffer);
+
+ return result;
+}
+
+void flb_log_event_decoder_reset(struct flb_log_event_decoder *context,
+ char *input_buffer,
+ size_t input_length)
+{
+ context->offset = 0;
+ context->buffer = input_buffer;
+ context->length = input_length;
+ context->last_result = FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA;
+
+ msgpack_unpacked_destroy(&context->unpacked_event);
+ msgpack_unpacked_init(&context->unpacked_event);
+
+}
+
+int flb_log_event_decoder_init(struct flb_log_event_decoder *context,
+ char *input_buffer,
+ size_t input_length)
+{
+ if (context == NULL) {
+ return FLB_EVENT_DECODER_ERROR_INVALID_CONTEXT;
+ }
+
+ memset(context, 0, sizeof(struct flb_log_event_decoder));
+
+ context->dynamically_allocated = FLB_FALSE;
+ context->initialized = FLB_TRUE;
+
+ flb_log_event_decoder_reset(context, input_buffer, input_length);
+
+ return create_empty_map(context);
+}
+
+struct flb_log_event_decoder *flb_log_event_decoder_create(
+ char *input_buffer,
+ size_t input_length)
+{
+ struct flb_log_event_decoder *context;
+ int result;
+
+ context = (struct flb_log_event_decoder *) \
+ flb_calloc(1, sizeof(struct flb_log_event_decoder));
+
+ result = flb_log_event_decoder_init(context,
+ input_buffer,
+ input_length);
+
+ if (context != NULL) {
+ context->dynamically_allocated = FLB_TRUE;
+
+ if (result != FLB_EVENT_DECODER_SUCCESS) {
+ flb_log_event_decoder_destroy(context);
+
+ context = NULL;
+ }
+ }
+
+ return context;
+}
+
+void flb_log_event_decoder_destroy(struct flb_log_event_decoder *context)
+{
+ int dynamically_allocated;
+
+ if (context != NULL) {
+ if (context->initialized) {
+ msgpack_unpacked_destroy(&context->unpacked_empty_map);
+ msgpack_unpacked_destroy(&context->unpacked_event);
+ }
+
+ dynamically_allocated = context->dynamically_allocated;
+
+ memset(context, 0, sizeof(struct flb_log_event_decoder));
+
+ /* This might look silly and with most of the codebase including
+ * this module as context it might be but just in case we choose
+ * to stray away from the assumption of FLB_FALSE being zero and
+ * FLB_TRUE being one in favor of explicitly comparing variables to
+ * the the constants I will leave this here.
+ */
+ context->initialized = FLB_FALSE;
+
+ if (dynamically_allocated) {
+ flb_free(context);
+ }
+ }
+}
+
+int flb_log_event_decoder_decode_timestamp(msgpack_object *input,
+ struct flb_time *output)
+{
+ flb_time_zero(output);
+
+ if (input->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
+ output->tm.tv_sec = input->via.u64;
+ }
+ else if(input->type == MSGPACK_OBJECT_FLOAT) {
+ output->tm.tv_sec = input->via.f64;
+ output->tm.tv_nsec = ((input->via.f64 - output->tm.tv_sec) * 1000000000);
+ }
+ else if(input->type == MSGPACK_OBJECT_EXT) {
+ if (input->via.ext.type != 0 || input->via.ext.size != 8) {
+ return FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE;
+ }
+
+ output->tm.tv_sec = FLB_BSWAP_32(*((uint32_t *) &input->via.ext.ptr[0]));
+ output->tm.tv_nsec = FLB_BSWAP_32(*((uint32_t *) &input->via.ext.ptr[4]));
+ }
+ else {
+ return FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE;
+ }
+
+ return FLB_EVENT_DECODER_SUCCESS;
+}
+
+int flb_event_decoder_decode_object(struct flb_log_event_decoder *context,
+ struct flb_log_event *event,
+ msgpack_object *input)
+{
+ msgpack_object *timestamp;
+ msgpack_object *metadata;
+ int result;
+ int format;
+ msgpack_object *header;
+ msgpack_object *body;
+ msgpack_object *root;
+
+ memset(event, 0, sizeof(struct flb_log_event));
+
+ /* Ensure that the root element is a 2 element array*/
+ root = input;
+
+ if (root->type != MSGPACK_OBJECT_ARRAY) {
+ return FLB_EVENT_DECODER_ERROR_WRONG_ROOT_TYPE;
+ }
+
+ if (root->via.array.size != \
+ FLB_LOG_EVENT_EXPECTED_ROOT_ELEMENT_COUNT) {
+ return FLB_EVENT_DECODER_ERROR_WRONG_ROOT_SIZE;
+ }
+
+ header = &root->via.array.ptr[0];
+
+ /* Determine if the first element is the header or
+ * a legacy timestamp (int, float or ext).
+ */
+ if (header->type == MSGPACK_OBJECT_ARRAY) {
+ if (header->via.array.size != \
+ FLB_LOG_EVENT_EXPECTED_HEADER_ELEMENT_COUNT) {
+ return FLB_EVENT_DECODER_ERROR_WRONG_HEADER_SIZE;
+ }
+
+ timestamp = &header->via.array.ptr[0];
+ metadata = &header->via.array.ptr[1];
+
+ format = FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2;
+ }
+ else {
+ header = NULL;
+ timestamp = &root->via.array.ptr[0];
+ metadata = context->empty_map;
+
+ format = FLB_LOG_EVENT_FORMAT_FORWARD;
+ }
+
+ if (timestamp->type != MSGPACK_OBJECT_POSITIVE_INTEGER &&
+ timestamp->type != MSGPACK_OBJECT_FLOAT &&
+ timestamp->type != MSGPACK_OBJECT_EXT) {
+ return FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE;
+ }
+
+ if (metadata->type != MSGPACK_OBJECT_MAP) {
+ return FLB_EVENT_DECODER_ERROR_WRONG_METADATA_TYPE;
+ }
+
+ body = &root->via.array.ptr[1];
+
+ if (body->type != MSGPACK_OBJECT_MAP) {
+ return FLB_EVENT_DECODER_ERROR_WRONG_BODY_TYPE;
+ }
+
+ result = flb_log_event_decoder_decode_timestamp(timestamp, &event->timestamp);
+
+ if (result != FLB_EVENT_DECODER_SUCCESS) {
+ return result;
+ }
+
+ event->raw_timestamp = timestamp;
+ event->metadata = metadata;
+ event->format = format;
+ event->body = body;
+ event->root = root;
+
+ context->record_base = \
+ (const char *) &context->buffer[context->previous_offset];
+ context->record_length = context->offset - context->previous_offset;
+
+ return FLB_EVENT_DECODER_SUCCESS;
+}
+
+int flb_log_event_decoder_get_last_result(struct flb_log_event_decoder *context)
+{
+ if (context->last_result == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA &&
+ context->offset == context->length) {
+ context->last_result = FLB_EVENT_DECODER_SUCCESS;
+ }
+
+ return context->last_result;
+}
+
+int flb_log_event_decoder_next(struct flb_log_event_decoder *context,
+ struct flb_log_event *event)
+{
+ size_t previous_offset;
+ int result;
+
+ if (context == NULL) {
+ return FLB_EVENT_DECODER_ERROR_INVALID_CONTEXT;
+ }
+ if (context->length == 0) {
+ context->last_result = FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA;
+ return context->last_result;
+ }
+
+ context->record_base = NULL;
+ context->record_length = 0;
+
+ if (event == NULL) {
+ context->last_result = FLB_EVENT_DECODER_ERROR_INVALID_ARGUMENT;
+ return context->last_result;
+ }
+
+ memset(event, 0, sizeof(struct flb_log_event));
+
+ previous_offset = context->offset;
+
+ result = msgpack_unpack_next(&context->unpacked_event,
+ context->buffer,
+ context->length,
+ &context->offset);
+
+ if (result == MSGPACK_UNPACK_CONTINUE) {
+ context->last_result = FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA;
+ return context->last_result;
+ }
+ else if (result != MSGPACK_UNPACK_SUCCESS) {
+ context->last_result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE;
+ return context->last_result;
+ }
+
+ context->previous_offset = previous_offset;
+ context->last_result = flb_event_decoder_decode_object(context,
+ event,
+ &context->unpacked_event.data);
+ return context->last_result;
+}
+
+const char *flb_log_event_decoder_get_error_description(int error_code)
+{
+ const char *ret;
+
+ switch (error_code) {
+ case FLB_EVENT_DECODER_SUCCESS:
+ ret = "Success";
+ break;
+
+ case FLB_EVENT_DECODER_ERROR_INITIALIZATION_FAILURE:
+ ret = "Initialization failure";
+ break;
+
+ case FLB_EVENT_DECODER_ERROR_INVALID_CONTEXT:
+ ret = "Invalid context";
+ break;
+
+ case FLB_EVENT_DECODER_ERROR_INVALID_ARGUMENT:
+ ret = "Invalid argument";
+ break;
+
+ case FLB_EVENT_DECODER_ERROR_WRONG_ROOT_TYPE:
+ ret = "Wrong root type";
+ break;
+
+ case FLB_EVENT_DECODER_ERROR_WRONG_ROOT_SIZE:
+ ret = "Wrong root size";
+ break;
+
+ case FLB_EVENT_DECODER_ERROR_WRONG_HEADER_TYPE:
+ ret = "Wrong header type";
+ break;
+
+ case FLB_EVENT_DECODER_ERROR_WRONG_HEADER_SIZE:
+ ret = "Wrong header size";
+ break;
+
+ case FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE:
+ ret = "Wrong timestamp type";
+ break;
+
+ case FLB_EVENT_DECODER_ERROR_WRONG_METADATA_TYPE:
+ ret = "Wrong metadata type";
+ break;
+
+ case FLB_EVENT_DECODER_ERROR_WRONG_BODY_TYPE:
+ ret = "Wrong body type";
+ break;
+
+ case FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE:
+ ret = "Deserialization failure";
+ break;
+
+ case FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA:
+ ret = "Insufficient data";
+ break;
+
+ default:
+ ret = "Unknown error";
+ }
+ return ret;
+}