diff options
Diffstat (limited to 'fluent-bit/lib/avro/src/codec.c')
-rw-r--r-- | fluent-bit/lib/avro/src/codec.c | 620 |
1 files changed, 620 insertions, 0 deletions
diff --git a/fluent-bit/lib/avro/src/codec.c b/fluent-bit/lib/avro/src/codec.c new file mode 100644 index 000000000..613a91437 --- /dev/null +++ b/fluent-bit/lib/avro/src/codec.c @@ -0,0 +1,620 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#include <string.h> +#ifdef SNAPPY_CODEC +#include <snappy-c.h> +# if defined(__APPLE__) +# include <libkern/OSByteOrder.h> +# define __bswap_32 OSSwapInt32 +# elif defined(__FreeBSD__) +# include <sys/endian.h> +# define __bswap_32 bswap32 +# elif defined(_WIN32) +# include <stdlib.h> +# define __bswap_32 _byteswap_ulong +# else +# include <byteswap.h> +# endif +#endif +#ifdef DEFLATE_CODEC +#include <zlib.h> +#endif +#ifdef LZMA_CODEC +#include <lzma.h> +#endif +#include "avro/errors.h" +#include "avro/allocation.h" +#include "codec.h" + +#define DEFAULT_BLOCK_SIZE (16 * 1024) + +/* NULL codec */ + +static int +codec_null(avro_codec_t codec) +{ + codec->name = "null"; + codec->type = AVRO_CODEC_NULL; + codec->block_size = 0; + codec->used_size = 0; + codec->block_data = NULL; + codec->codec_data = NULL; + + return 0; +} + +static int encode_null(avro_codec_t c, void * data, int64_t len) +{ + c->block_data = data; + c->block_size = len; + c->used_size = len; + + return 0; +} + +static int decode_null(avro_codec_t c, void * data, int64_t len) +{ + c->block_data = data; + c->block_size = len; + c->used_size = len; + + return 0; +} + +static int reset_null(avro_codec_t c) +{ + c->block_data = NULL; + c->block_size = 0; + c->used_size = 0; + c->codec_data = NULL; + + return 0; +} + +/* Snappy codec */ + +#ifdef SNAPPY_CODEC + +static int +codec_snappy(avro_codec_t codec) +{ + codec->name = "snappy"; + codec->type = AVRO_CODEC_SNAPPY; + codec->block_size = 0; + codec->used_size = 0; + codec->block_data = NULL; + codec->codec_data = NULL; + + return 0; +} + +static int encode_snappy(avro_codec_t c, void * data, int64_t len) +{ + uint32_t crc; + size_t outlen = snappy_max_compressed_length(len); + + if (!c->block_data) { + c->block_data = avro_malloc(outlen+4); + c->block_size = outlen+4; + } else if (c->block_size < (int64_t) (outlen+4)) { + c->block_data = avro_realloc(c->block_data, c->block_size, (outlen+4)); + c->block_size = outlen+4; + } + + if (!c->block_data) { + avro_set_error("Cannot allocate memory for snappy"); + return 1; + } + + if (snappy_compress((const char *)data, len, (char*)c->block_data, &outlen) != SNAPPY_OK) + { + avro_set_error("Error compressing block with Snappy"); + return 1; + } + + crc = __bswap_32(crc32(0, (const Bytef *)data, len)); + memcpy((char*)c->block_data+outlen, &crc, 4); + c->used_size = outlen+4; + + return 0; +} + +static int decode_snappy(avro_codec_t c, void * data, int64_t len) +{ + uint32_t crc; + size_t outlen; + + if (snappy_uncompressed_length((const char*)data, len-4, &outlen) != SNAPPY_OK) { + avro_set_error("Uncompressed length error in snappy"); + return 1; + } + + if (!c->block_data) { + c->block_data = avro_malloc(outlen); + c->block_size = outlen; + } else if ( (size_t)c->block_size < outlen) { + c->block_data = avro_realloc(c->block_data, c->block_size, outlen); + c->block_size = outlen; + } + + if (!c->block_data) + { + avro_set_error("Cannot allocate memory for snappy"); + return 1; + } + + if (snappy_uncompress((const char*)data, len-4, (char*)c->block_data, &outlen) != SNAPPY_OK) + { + avro_set_error("Error uncompressing block with Snappy"); + return 1; + } + + crc = __bswap_32(crc32(0, (const Bytef *)c->block_data, outlen)); + if (memcmp(&crc, (char*)data+len-4, 4)) + { + avro_set_error("CRC32 check failure uncompressing block with Snappy"); + return 1; + } + + c->used_size = outlen; + + return 0; +} + +static int reset_snappy(avro_codec_t c) +{ + if (c->block_data) { + avro_free(c->block_data, c->block_size); + } + + c->block_data = NULL; + c->block_size = 0; + c->used_size = 0; + c->codec_data = NULL; + + return 0; +} + +#endif // SNAPPY_CODEC + +/* Deflate codec */ + +#ifdef DEFLATE_CODEC + +struct codec_data_deflate { + z_stream deflate; + z_stream inflate; +}; +#define codec_data_deflate_stream(cd) &((struct codec_data_deflate *)cd)->deflate +#define codec_data_inflate_stream(cd) &((struct codec_data_deflate *)cd)->inflate + + +static int +codec_deflate(avro_codec_t codec) +{ + codec->name = "deflate"; + codec->type = AVRO_CODEC_DEFLATE; + codec->block_size = 0; + codec->used_size = 0; + codec->block_data = NULL; + codec->codec_data = avro_new(struct codec_data_deflate); + + if (!codec->codec_data) { + avro_set_error("Cannot allocate memory for zlib"); + return 1; + } + + z_stream *ds = codec_data_deflate_stream(codec->codec_data); + z_stream *is = codec_data_inflate_stream(codec->codec_data); + + memset(ds, 0, sizeof(z_stream)); + memset(is, 0, sizeof(z_stream)); + + ds->zalloc = is->zalloc = Z_NULL; + ds->zfree = is->zfree = Z_NULL; + ds->opaque = is->opaque = Z_NULL; + + if (deflateInit2(ds, Z_BEST_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY) != Z_OK) { + avro_freet(struct codec_data_deflate, codec->codec_data); + codec->codec_data = NULL; + avro_set_error("Cannot initialize zlib deflate"); + return 1; + } + + if (inflateInit2(is, -15) != Z_OK) { + avro_freet(struct codec_data_deflate, codec->codec_data); + codec->codec_data = NULL; + avro_set_error("Cannot initialize zlib inflate"); + return 1; + } + + return 0; +} + +static int encode_deflate(avro_codec_t c, void * data, int64_t len) +{ + int err; + int64_t defl_len = compressBound((uLong)len * 1.2); + + if (!c->block_data) { + c->block_data = avro_malloc(defl_len); + c->block_size = defl_len; + } else if ( c->block_size < defl_len) { + c->block_data = avro_realloc(c->block_data, c->block_size, defl_len); + c->block_size = defl_len; + } + + if (!c->block_data) + { + avro_set_error("Cannot allocate memory for deflate"); + return 1; + } + + c->used_size = 0; + + z_stream *s = codec_data_deflate_stream(c->codec_data); + + s->next_in = (Bytef*)data; + s->avail_in = (uInt)len; + + s->next_out = c->block_data; + s->avail_out = (uInt)c->block_size; + + s->total_out = 0; + + err = deflate(s, Z_FINISH); + if (err != Z_STREAM_END) { + deflateEnd(s); + if (err != Z_OK) { + avro_set_error("Error compressing block with deflate (%i)", err); + return 1; + } + return 0; + } + + // zlib resizes the buffer? + c->block_size = s->total_out; + c->used_size = s->total_out; + + if (deflateReset(s) != Z_OK) { + return 1; + } + + return 0; +} + +static int decode_deflate(avro_codec_t c, void * data, int64_t len) +{ + int err; + z_stream *s = codec_data_inflate_stream(c->codec_data); + + if (!c->block_data) { + c->block_data = avro_malloc(DEFAULT_BLOCK_SIZE); + c->block_size = DEFAULT_BLOCK_SIZE; + } + + if (!c->block_data) + { + avro_set_error("Cannot allocate memory for deflate"); + return 1; + } + + c->used_size = 0; + + s->next_in = data; + s->avail_in = len; + + s->next_out = c->block_data; + s->avail_out = c->block_size; + + s->total_out = 0; + + do + { + err = inflate(s, Z_FINISH); + + // Apparently if there is yet available space in the output then something + // has gone wrong in decompressing the data (according to cpython zlibmodule.c) + if (err == Z_BUF_ERROR && s->avail_out > 0) { + inflateEnd(s); + avro_set_error("Error decompressing block with deflate, possible data error"); + return 1; + } + + // The buffer was not big enough. resize it. + if (err == Z_BUF_ERROR) + { + c->block_data = avro_realloc(c->block_data, c->block_size, c->block_size * 2); + s->next_out = c->block_data + s->total_out; + s->avail_out += c->block_size; + c->block_size = c->block_size * 2; + } + } while (err == Z_BUF_ERROR); + + if (err != Z_STREAM_END) { + inflateEnd(s); + if (err != Z_OK) { + avro_set_error("Error decompressing block with deflate (%i)", err); + return 1; + } + return 0; + } + + c->used_size = s->total_out; + + if (inflateReset(s) != Z_OK) { + avro_set_error("Error resetting deflate decompression"); + return 1; + } + + return 0; +} + +static int reset_deflate(avro_codec_t c) +{ + if (c->block_data) { + avro_free(c->block_data, c->block_size); + } + if (c->codec_data) { + deflateEnd(codec_data_deflate_stream(c->codec_data)); + inflateEnd(codec_data_inflate_stream(c->codec_data)); + avro_freet(struct codec_data_deflate, c->codec_data); + } + + c->block_data = NULL; + c->block_size = 0; + c->used_size = 0; + c->codec_data = NULL; + + return 0; +} + +#endif // DEFLATE_CODEC + +/* LZMA codec */ + +#ifdef LZMA_CODEC + +struct codec_data_lzma { + lzma_filter filters[2]; + lzma_options_lzma options; +}; +#define codec_data_lzma_filters(cd) ((struct codec_data_lzma *)cd)->filters +#define codec_data_lzma_options(cd) &((struct codec_data_lzma *)cd)->options + +static int +codec_lzma(avro_codec_t codec) +{ + codec->name = "lzma"; + codec->type = AVRO_CODEC_LZMA; + codec->block_size = 0; + codec->used_size = 0; + codec->block_data = NULL; + codec->codec_data = avro_new(struct codec_data_lzma); + + if (!codec->codec_data) { + avro_set_error("Cannot allocate memory for lzma"); + return 1; + } + + lzma_options_lzma* opt = codec_data_lzma_options(codec->codec_data); + lzma_lzma_preset(opt, LZMA_PRESET_DEFAULT); + + lzma_filter* filters = codec_data_lzma_filters(codec->codec_data); + filters[0].id = LZMA_FILTER_LZMA2; + filters[0].options = opt; + filters[1].id = LZMA_VLI_UNKNOWN; + filters[1].options = NULL; + + return 0; +} + +static int encode_lzma(avro_codec_t codec, void * data, int64_t len) +{ + lzma_ret ret; + size_t written = 0; + lzma_filter* filters = codec_data_lzma_filters(codec->codec_data); + + int64_t buff_len = len + lzma_raw_encoder_memusage(filters); + + if (!codec->block_data) { + codec->block_data = avro_malloc(buff_len); + codec->block_size = buff_len; + } + + if (!codec->block_data) + { + avro_set_error("Cannot allocate memory for lzma encoder"); + return 1; + } + + ret = lzma_raw_buffer_encode(filters, NULL, data, len, codec->block_data, &written, codec->block_size); + + codec->used_size = written; + + if (ret != LZMA_OK) { + avro_set_error("Error in lzma encoder"); + return 1; + } + + return 0; +} + +static int decode_lzma(avro_codec_t codec, void * data, int64_t len) +{ + size_t read_pos = 0; + size_t write_pos = 0; + lzma_ret ret; + lzma_filter* filters = codec_data_lzma_filters(codec->codec_data); + + if (!codec->block_data) { + codec->block_data = avro_malloc(DEFAULT_BLOCK_SIZE); + codec->block_size = DEFAULT_BLOCK_SIZE; + } + + if (!codec->block_data) { + avro_set_error("Cannot allocate memory for lzma decoder"); + return 1; + } + + do + { + ret = lzma_raw_buffer_decode(filters, NULL, data, + &read_pos, len, codec->block_data, &write_pos, + codec->block_size); + + codec->used_size = write_pos; + + // If it ran out of space to decode, give it more!! + // It will continue where it left off because of read_pos and write_pos. + if (ret == LZMA_BUF_ERROR) { + codec->block_data = avro_realloc(codec->block_data, codec->block_size, codec->block_size * 2); + codec->block_size = codec->block_size * 2; + } + + } while (ret == LZMA_BUF_ERROR); + + if (ret != LZMA_OK) { + avro_set_error("Error in lzma decoder"); + return 1; + } + + return 0; +} + +static int reset_lzma(avro_codec_t c) +{ + if (c->block_data) { + avro_free(c->block_data, c->block_size); + } + if (c->codec_data) { + avro_freet(struct codec_data_lzma, c->codec_data); + } + + c->block_data = NULL; + c->block_size = 0; + c->used_size = 0; + c->codec_data = NULL; + + return 0; +} + +#endif // LZMA_CODEC + +/* Common interface */ + +int avro_codec(avro_codec_t codec, const char *type) +{ + if (type == NULL) { + return codec_null(codec); + } + +#ifdef SNAPPY_CODEC + if (strcmp("snappy", type) == 0) { + return codec_snappy(codec); + } +#endif + +#ifdef DEFLATE_CODEC + if (strcmp("deflate", type) == 0) { + return codec_deflate(codec); + } +#endif + +#ifdef LZMA_CODEC + if (strcmp("lzma", type) == 0) { + return codec_lzma(codec); + } +#endif + + if (strcmp("null", type) == 0) { + return codec_null(codec); + } + + avro_set_error("Unknown codec %s", type); + return 1; +} + +int avro_codec_encode(avro_codec_t c, void * data, int64_t len) +{ + switch(c->type) + { + case AVRO_CODEC_NULL: + return encode_null(c, data, len); +#ifdef SNAPPY_CODEC + case AVRO_CODEC_SNAPPY: + return encode_snappy(c, data, len); +#endif +#ifdef DEFLATE_CODEC + case AVRO_CODEC_DEFLATE: + return encode_deflate(c, data, len); +#endif +#ifdef LZMA_CODEC + case AVRO_CODEC_LZMA: + return encode_lzma(c, data, len); +#endif + default: + return 1; + } +} + +int avro_codec_decode(avro_codec_t c, void * data, int64_t len) +{ + switch(c->type) + { + case AVRO_CODEC_NULL: + return decode_null(c, data, len); +#ifdef SNAPPY_CODEC + case AVRO_CODEC_SNAPPY: + return decode_snappy(c, data, len); +#endif +#ifdef DEFLATE_CODEC + case AVRO_CODEC_DEFLATE: + return decode_deflate(c, data, len); +#endif +#ifdef LZMA_CODEC + case AVRO_CODEC_LZMA: + return decode_lzma(c, data, len); +#endif + default: + return 1; + } +} + +int avro_codec_reset(avro_codec_t c) +{ + switch(c->type) + { + case AVRO_CODEC_NULL: + return reset_null(c); +#ifdef SNAPPY_CODEC + case AVRO_CODEC_SNAPPY: + return reset_snappy(c); +#endif +#ifdef DEFLATE_CODEC + case AVRO_CODEC_DEFLATE: + return reset_deflate(c); +#endif +#ifdef LZMA_CODEC + case AVRO_CODEC_LZMA: + return reset_lzma(c); +#endif + default: + return 1; + } +} |