diff options
Diffstat (limited to 'fluent-bit/src/flb_snappy.c')
-rw-r--r-- | fluent-bit/src/flb_snappy.c | 344 |
1 files changed, 344 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_snappy.c b/fluent-bit/src/flb_snappy.c new file mode 100644 index 000000000..865e18399 --- /dev/null +++ b/fluent-bit/src/flb_snappy.c @@ -0,0 +1,344 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_log.h> + +#include <cfl/cfl.h> +#include <cfl/cfl_list.h> +#include <cfl/cfl_checksum.h> + +#include <fluent-bit/flb_snappy.h> + +#include <snappy.h> + +int flb_snappy_compress(char *in_data, size_t in_len, + char **out_data, size_t *out_len) +{ + struct snappy_env snappy_env; + char *tmp_data; + size_t tmp_len; + int result; + + tmp_len = snappy_max_compressed_length(in_len); + + tmp_data = flb_malloc(tmp_len); + + if (tmp_data == NULL) { + flb_errno(); + + return -1; + } + + result = snappy_init_env(&snappy_env); + + if (result != 0) { + flb_free(tmp_data); + + return -2; + } + + result = snappy_compress(&snappy_env, in_data, in_len, tmp_data, &tmp_len); + + if (result != 0) { + flb_free(tmp_data); + + return -3; + } + + snappy_free_env(&snappy_env); + + *out_data = tmp_data; + *out_len = tmp_len; + + return 0; +} + +int flb_snappy_uncompress(char *in_data, size_t in_len, + char **out_data, size_t *out_len) +{ + char *tmp_data; + size_t tmp_len; + int result; + + result = snappy_uncompressed_length(in_data, in_len, &tmp_len); + + if (result == 0) { + return -1; + } + + tmp_data = flb_malloc(tmp_len); + + if (tmp_data == NULL) { + flb_errno(); + + return -2; + } + + result = snappy_uncompress(in_data, in_len, tmp_data); + + if (result != 0) { + flb_free(tmp_data); + + return -3; + } + + *out_data = tmp_data; + *out_len = tmp_len; + + return 0; +} + +static uint32_t calculate_checksum(char *buffer, size_t length) +{ + uint32_t checksum; + + checksum = cfl_checksum_crc32c((unsigned char *) buffer, length); + + return ((checksum >> 15) | + (checksum << 17)) + 0xa282ead8; +} + +int flb_snappy_uncompress_framed_data(char *in_data, size_t in_len, + char **out_data, size_t *out_len) +{ + uint32_t decompressed_data_checksum; + size_t stream_identifier_length; + size_t uncompressed_chunk_count; + int stream_identifier_found; + char *aggregated_data_buffer; + size_t aggregated_data_length = 0; + size_t aggregated_data_offset; + size_t compressed_chunk_count; + struct cfl_list *iterator_backup; + uint32_t frame_checksum; + char *frame_buffer; + size_t frame_length; + char *frame_body; + unsigned char frame_type; + struct cfl_list *iterator; + int result; + size_t offset; + struct cfl_list chunks; + struct flb_snappy_data_chunk *chunk; + + if (*((uint8_t *) in_data) != FLB_SNAPPY_FRAME_TYPE_STREAM_IDENTIFIER) { + return flb_snappy_uncompress(in_data, in_len, out_data, out_len); + } + + if (out_data == NULL) { + return -1; + } + + if (out_len == NULL) { + return -1; + } + + *out_data = NULL; + *out_len = 0; + + cfl_list_init(&chunks); + + compressed_chunk_count = 0; + uncompressed_chunk_count = 0; + + stream_identifier_found = FLB_FALSE; + stream_identifier_length = strlen(FLB_SNAPPY_STREAM_IDENTIFIER_STRING); + + result = 0; + offset = 0; + + while (offset < in_len && result == 0) { + frame_buffer = &in_data[offset]; + + frame_type = *((uint8_t *) &frame_buffer[0]); + + frame_length = *((uint32_t *) &frame_buffer[1]); + frame_length &= 0x00FFFFFF; + + frame_body = &frame_buffer[4]; + + if (frame_length > FLB_SNAPPY_FRAME_SIZE_LIMIT) { + result = -2; + } + else if (frame_type == FLB_SNAPPY_FRAME_TYPE_STREAM_IDENTIFIER) { + if (!stream_identifier_found) { + if (frame_length == stream_identifier_length) { + result = strncmp(frame_body, + FLB_SNAPPY_STREAM_IDENTIFIER_STRING, + stream_identifier_length); + + if (result == 0) { + stream_identifier_found = FLB_TRUE; + } + } + } + } + else if (frame_type == FLB_SNAPPY_FRAME_TYPE_COMPRESSED_DATA) { + chunk = (struct flb_snappy_data_chunk * ) \ + flb_calloc(1, sizeof(struct flb_snappy_data_chunk)); + + if (chunk != NULL) { + /* We add the chunk to the list now because that way + * even if the process fails we can clean up in a single + * place. + */ + compressed_chunk_count++; + + chunk->dynamically_allocated_buffer = FLB_TRUE; + + cfl_list_add(&chunk->_head, &chunks); + + frame_checksum = *((uint32_t *) &frame_body[0]); + frame_body = &frame_body[4]; + + result = flb_snappy_uncompress( + frame_body, + frame_length - sizeof(uint32_t), + &chunk->buffer, + &chunk->length); + + /* decompressed data */ + if (result == 0) { + decompressed_data_checksum = calculate_checksum( + chunk->buffer, + chunk->length); + + if (decompressed_data_checksum != frame_checksum) { + result = -3; + } + else { + aggregated_data_length += chunk->length; + } + } + else { + result = -4; + } + } + } + else if (frame_type == FLB_SNAPPY_FRAME_TYPE_UNCOMPRESSED_DATA) { + chunk = (struct flb_snappy_data_chunk *) \ + flb_calloc(1, sizeof(struct flb_snappy_data_chunk)); + + if (chunk != NULL) { + /* We add the chunk to the list now because that way + * even if the process fails we can clean up in a single + * place. + */ + uncompressed_chunk_count++; + + chunk->dynamically_allocated_buffer = FLB_FALSE; + + cfl_list_add(&chunk->_head, &chunks); + + frame_checksum = *((uint32_t *) &frame_body[0]); + frame_body = &frame_body[4]; + + chunk->buffer = frame_body; + chunk->length = frame_length - sizeof(uint32_t); + + decompressed_data_checksum = calculate_checksum( + chunk->buffer, + chunk->length); + + if (decompressed_data_checksum != frame_checksum) { + result = -3; + } + else { + aggregated_data_length += chunk->length; + } + } + } + else if (frame_type == FLB_SNAPPY_FRAME_TYPE_PADDING) { + /* We just need to skip these frames */ + } + else if (frame_type >= FLB_SNAPPY_FRAME_TYPE_RESERVED_UNSKIPPABLE_BASE && + frame_type <= FLB_SNAPPY_FRAME_TYPE_RESERVED_UNSKIPPABLE_TOP) { + result = -5; + } + else if (frame_type >= FLB_SNAPPY_FRAME_TYPE_RESERVED_SKIPPABLE_BASE && + frame_type <= FLB_SNAPPY_FRAME_TYPE_RESERVED_SKIPPABLE_TOP) { + /* We just need to skip these frames */ + } + + offset += frame_length + 4; + } + + aggregated_data_buffer = NULL; + aggregated_data_length = 0; + + if (compressed_chunk_count == 1 && + uncompressed_chunk_count == 0 && + result == 0) { + /* This is a "past path" to avoid unnecessarily copying + * data whene the input is only comprised of a single + * compressed chunk. + */ + + chunk = cfl_list_entry_first(&chunks, + struct flb_snappy_data_chunk, _head); + + aggregated_data_buffer = chunk->buffer; + aggregated_data_length = chunk->length; + aggregated_data_offset = aggregated_data_length; + + flb_free(chunk); + } + else { + if (aggregated_data_length > 0) { + aggregated_data_buffer = flb_calloc(aggregated_data_length, + sizeof(char)); + + if (aggregated_data_buffer == NULL) { + result = -6; + } + } + + aggregated_data_offset = 0; + cfl_list_foreach_safe(iterator, iterator_backup, &chunks) { + chunk = cfl_list_entry(iterator, + struct flb_snappy_data_chunk, _head); + + if (chunk->buffer != NULL) { + if (aggregated_data_buffer != NULL && + result == 0) { + memcpy(&aggregated_data_buffer[aggregated_data_offset], + chunk->buffer, + chunk->length); + + aggregated_data_offset += chunk->length; + } + + if (chunk->dynamically_allocated_buffer) { + flb_free(chunk->buffer); + } + } + + cfl_list_del(&chunk->_head); + + flb_free(chunk); + } + } + + *out_data = (char *) aggregated_data_buffer; + *out_len = aggregated_data_offset; + + return result; +} |