summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/avro/src/codec.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/avro/src/codec.c')
-rw-r--r--fluent-bit/lib/avro/src/codec.c620
1 files changed, 0 insertions, 620 deletions
diff --git a/fluent-bit/lib/avro/src/codec.c b/fluent-bit/lib/avro/src/codec.c
deleted file mode 100644
index 613a91437..000000000
--- a/fluent-bit/lib/avro/src/codec.c
+++ /dev/null
@@ -1,620 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-#include <string.h>
-#ifdef SNAPPY_CODEC
-#include <snappy-c.h>
-# if defined(__APPLE__)
-# include <libkern/OSByteOrder.h>
-# define __bswap_32 OSSwapInt32
-# elif defined(__FreeBSD__)
-# include <sys/endian.h>
-# define __bswap_32 bswap32
-# elif defined(_WIN32)
-# include <stdlib.h>
-# define __bswap_32 _byteswap_ulong
-# else
-# include <byteswap.h>
-# endif
-#endif
-#ifdef DEFLATE_CODEC
-#include <zlib.h>
-#endif
-#ifdef LZMA_CODEC
-#include <lzma.h>
-#endif
-#include "avro/errors.h"
-#include "avro/allocation.h"
-#include "codec.h"
-
-#define DEFAULT_BLOCK_SIZE (16 * 1024)
-
-/* NULL codec */
-
-static int
-codec_null(avro_codec_t codec)
-{
- codec->name = "null";
- codec->type = AVRO_CODEC_NULL;
- codec->block_size = 0;
- codec->used_size = 0;
- codec->block_data = NULL;
- codec->codec_data = NULL;
-
- return 0;
-}
-
-static int encode_null(avro_codec_t c, void * data, int64_t len)
-{
- c->block_data = data;
- c->block_size = len;
- c->used_size = len;
-
- return 0;
-}
-
-static int decode_null(avro_codec_t c, void * data, int64_t len)
-{
- c->block_data = data;
- c->block_size = len;
- c->used_size = len;
-
- return 0;
-}
-
-static int reset_null(avro_codec_t c)
-{
- c->block_data = NULL;
- c->block_size = 0;
- c->used_size = 0;
- c->codec_data = NULL;
-
- return 0;
-}
-
-/* Snappy codec */
-
-#ifdef SNAPPY_CODEC
-
-static int
-codec_snappy(avro_codec_t codec)
-{
- codec->name = "snappy";
- codec->type = AVRO_CODEC_SNAPPY;
- codec->block_size = 0;
- codec->used_size = 0;
- codec->block_data = NULL;
- codec->codec_data = NULL;
-
- return 0;
-}
-
-static int encode_snappy(avro_codec_t c, void * data, int64_t len)
-{
- uint32_t crc;
- size_t outlen = snappy_max_compressed_length(len);
-
- if (!c->block_data) {
- c->block_data = avro_malloc(outlen+4);
- c->block_size = outlen+4;
- } else if (c->block_size < (int64_t) (outlen+4)) {
- c->block_data = avro_realloc(c->block_data, c->block_size, (outlen+4));
- c->block_size = outlen+4;
- }
-
- if (!c->block_data) {
- avro_set_error("Cannot allocate memory for snappy");
- return 1;
- }
-
- if (snappy_compress((const char *)data, len, (char*)c->block_data, &outlen) != SNAPPY_OK)
- {
- avro_set_error("Error compressing block with Snappy");
- return 1;
- }
-
- crc = __bswap_32(crc32(0, (const Bytef *)data, len));
- memcpy((char*)c->block_data+outlen, &crc, 4);
- c->used_size = outlen+4;
-
- return 0;
-}
-
-static int decode_snappy(avro_codec_t c, void * data, int64_t len)
-{
- uint32_t crc;
- size_t outlen;
-
- if (snappy_uncompressed_length((const char*)data, len-4, &outlen) != SNAPPY_OK) {
- avro_set_error("Uncompressed length error in snappy");
- return 1;
- }
-
- if (!c->block_data) {
- c->block_data = avro_malloc(outlen);
- c->block_size = outlen;
- } else if ( (size_t)c->block_size < outlen) {
- c->block_data = avro_realloc(c->block_data, c->block_size, outlen);
- c->block_size = outlen;
- }
-
- if (!c->block_data)
- {
- avro_set_error("Cannot allocate memory for snappy");
- return 1;
- }
-
- if (snappy_uncompress((const char*)data, len-4, (char*)c->block_data, &outlen) != SNAPPY_OK)
- {
- avro_set_error("Error uncompressing block with Snappy");
- return 1;
- }
-
- crc = __bswap_32(crc32(0, (const Bytef *)c->block_data, outlen));
- if (memcmp(&crc, (char*)data+len-4, 4))
- {
- avro_set_error("CRC32 check failure uncompressing block with Snappy");
- return 1;
- }
-
- c->used_size = outlen;
-
- return 0;
-}
-
-static int reset_snappy(avro_codec_t c)
-{
- if (c->block_data) {
- avro_free(c->block_data, c->block_size);
- }
-
- c->block_data = NULL;
- c->block_size = 0;
- c->used_size = 0;
- c->codec_data = NULL;
-
- return 0;
-}
-
-#endif // SNAPPY_CODEC
-
-/* Deflate codec */
-
-#ifdef DEFLATE_CODEC
-
-struct codec_data_deflate {
- z_stream deflate;
- z_stream inflate;
-};
-#define codec_data_deflate_stream(cd) &((struct codec_data_deflate *)cd)->deflate
-#define codec_data_inflate_stream(cd) &((struct codec_data_deflate *)cd)->inflate
-
-
-static int
-codec_deflate(avro_codec_t codec)
-{
- codec->name = "deflate";
- codec->type = AVRO_CODEC_DEFLATE;
- codec->block_size = 0;
- codec->used_size = 0;
- codec->block_data = NULL;
- codec->codec_data = avro_new(struct codec_data_deflate);
-
- if (!codec->codec_data) {
- avro_set_error("Cannot allocate memory for zlib");
- return 1;
- }
-
- z_stream *ds = codec_data_deflate_stream(codec->codec_data);
- z_stream *is = codec_data_inflate_stream(codec->codec_data);
-
- memset(ds, 0, sizeof(z_stream));
- memset(is, 0, sizeof(z_stream));
-
- ds->zalloc = is->zalloc = Z_NULL;
- ds->zfree = is->zfree = Z_NULL;
- ds->opaque = is->opaque = Z_NULL;
-
- if (deflateInit2(ds, Z_BEST_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
- avro_freet(struct codec_data_deflate, codec->codec_data);
- codec->codec_data = NULL;
- avro_set_error("Cannot initialize zlib deflate");
- return 1;
- }
-
- if (inflateInit2(is, -15) != Z_OK) {
- avro_freet(struct codec_data_deflate, codec->codec_data);
- codec->codec_data = NULL;
- avro_set_error("Cannot initialize zlib inflate");
- return 1;
- }
-
- return 0;
-}
-
-static int encode_deflate(avro_codec_t c, void * data, int64_t len)
-{
- int err;
- int64_t defl_len = compressBound((uLong)len * 1.2);
-
- if (!c->block_data) {
- c->block_data = avro_malloc(defl_len);
- c->block_size = defl_len;
- } else if ( c->block_size < defl_len) {
- c->block_data = avro_realloc(c->block_data, c->block_size, defl_len);
- c->block_size = defl_len;
- }
-
- if (!c->block_data)
- {
- avro_set_error("Cannot allocate memory for deflate");
- return 1;
- }
-
- c->used_size = 0;
-
- z_stream *s = codec_data_deflate_stream(c->codec_data);
-
- s->next_in = (Bytef*)data;
- s->avail_in = (uInt)len;
-
- s->next_out = c->block_data;
- s->avail_out = (uInt)c->block_size;
-
- s->total_out = 0;
-
- err = deflate(s, Z_FINISH);
- if (err != Z_STREAM_END) {
- deflateEnd(s);
- if (err != Z_OK) {
- avro_set_error("Error compressing block with deflate (%i)", err);
- return 1;
- }
- return 0;
- }
-
- // zlib resizes the buffer?
- c->block_size = s->total_out;
- c->used_size = s->total_out;
-
- if (deflateReset(s) != Z_OK) {
- return 1;
- }
-
- return 0;
-}
-
-static int decode_deflate(avro_codec_t c, void * data, int64_t len)
-{
- int err;
- z_stream *s = codec_data_inflate_stream(c->codec_data);
-
- if (!c->block_data) {
- c->block_data = avro_malloc(DEFAULT_BLOCK_SIZE);
- c->block_size = DEFAULT_BLOCK_SIZE;
- }
-
- if (!c->block_data)
- {
- avro_set_error("Cannot allocate memory for deflate");
- return 1;
- }
-
- c->used_size = 0;
-
- s->next_in = data;
- s->avail_in = len;
-
- s->next_out = c->block_data;
- s->avail_out = c->block_size;
-
- s->total_out = 0;
-
- do
- {
- err = inflate(s, Z_FINISH);
-
- // Apparently if there is yet available space in the output then something
- // has gone wrong in decompressing the data (according to cpython zlibmodule.c)
- if (err == Z_BUF_ERROR && s->avail_out > 0) {
- inflateEnd(s);
- avro_set_error("Error decompressing block with deflate, possible data error");
- return 1;
- }
-
- // The buffer was not big enough. resize it.
- if (err == Z_BUF_ERROR)
- {
- c->block_data = avro_realloc(c->block_data, c->block_size, c->block_size * 2);
- s->next_out = c->block_data + s->total_out;
- s->avail_out += c->block_size;
- c->block_size = c->block_size * 2;
- }
- } while (err == Z_BUF_ERROR);
-
- if (err != Z_STREAM_END) {
- inflateEnd(s);
- if (err != Z_OK) {
- avro_set_error("Error decompressing block with deflate (%i)", err);
- return 1;
- }
- return 0;
- }
-
- c->used_size = s->total_out;
-
- if (inflateReset(s) != Z_OK) {
- avro_set_error("Error resetting deflate decompression");
- return 1;
- }
-
- return 0;
-}
-
-static int reset_deflate(avro_codec_t c)
-{
- if (c->block_data) {
- avro_free(c->block_data, c->block_size);
- }
- if (c->codec_data) {
- deflateEnd(codec_data_deflate_stream(c->codec_data));
- inflateEnd(codec_data_inflate_stream(c->codec_data));
- avro_freet(struct codec_data_deflate, c->codec_data);
- }
-
- c->block_data = NULL;
- c->block_size = 0;
- c->used_size = 0;
- c->codec_data = NULL;
-
- return 0;
-}
-
-#endif // DEFLATE_CODEC
-
-/* LZMA codec */
-
-#ifdef LZMA_CODEC
-
-struct codec_data_lzma {
- lzma_filter filters[2];
- lzma_options_lzma options;
-};
-#define codec_data_lzma_filters(cd) ((struct codec_data_lzma *)cd)->filters
-#define codec_data_lzma_options(cd) &((struct codec_data_lzma *)cd)->options
-
-static int
-codec_lzma(avro_codec_t codec)
-{
- codec->name = "lzma";
- codec->type = AVRO_CODEC_LZMA;
- codec->block_size = 0;
- codec->used_size = 0;
- codec->block_data = NULL;
- codec->codec_data = avro_new(struct codec_data_lzma);
-
- if (!codec->codec_data) {
- avro_set_error("Cannot allocate memory for lzma");
- return 1;
- }
-
- lzma_options_lzma* opt = codec_data_lzma_options(codec->codec_data);
- lzma_lzma_preset(opt, LZMA_PRESET_DEFAULT);
-
- lzma_filter* filters = codec_data_lzma_filters(codec->codec_data);
- filters[0].id = LZMA_FILTER_LZMA2;
- filters[0].options = opt;
- filters[1].id = LZMA_VLI_UNKNOWN;
- filters[1].options = NULL;
-
- return 0;
-}
-
-static int encode_lzma(avro_codec_t codec, void * data, int64_t len)
-{
- lzma_ret ret;
- size_t written = 0;
- lzma_filter* filters = codec_data_lzma_filters(codec->codec_data);
-
- int64_t buff_len = len + lzma_raw_encoder_memusage(filters);
-
- if (!codec->block_data) {
- codec->block_data = avro_malloc(buff_len);
- codec->block_size = buff_len;
- }
-
- if (!codec->block_data)
- {
- avro_set_error("Cannot allocate memory for lzma encoder");
- return 1;
- }
-
- ret = lzma_raw_buffer_encode(filters, NULL, data, len, codec->block_data, &written, codec->block_size);
-
- codec->used_size = written;
-
- if (ret != LZMA_OK) {
- avro_set_error("Error in lzma encoder");
- return 1;
- }
-
- return 0;
-}
-
-static int decode_lzma(avro_codec_t codec, void * data, int64_t len)
-{
- size_t read_pos = 0;
- size_t write_pos = 0;
- lzma_ret ret;
- lzma_filter* filters = codec_data_lzma_filters(codec->codec_data);
-
- if (!codec->block_data) {
- codec->block_data = avro_malloc(DEFAULT_BLOCK_SIZE);
- codec->block_size = DEFAULT_BLOCK_SIZE;
- }
-
- if (!codec->block_data) {
- avro_set_error("Cannot allocate memory for lzma decoder");
- return 1;
- }
-
- do
- {
- ret = lzma_raw_buffer_decode(filters, NULL, data,
- &read_pos, len, codec->block_data, &write_pos,
- codec->block_size);
-
- codec->used_size = write_pos;
-
- // If it ran out of space to decode, give it more!!
- // It will continue where it left off because of read_pos and write_pos.
- if (ret == LZMA_BUF_ERROR) {
- codec->block_data = avro_realloc(codec->block_data, codec->block_size, codec->block_size * 2);
- codec->block_size = codec->block_size * 2;
- }
-
- } while (ret == LZMA_BUF_ERROR);
-
- if (ret != LZMA_OK) {
- avro_set_error("Error in lzma decoder");
- return 1;
- }
-
- return 0;
-}
-
-static int reset_lzma(avro_codec_t c)
-{
- if (c->block_data) {
- avro_free(c->block_data, c->block_size);
- }
- if (c->codec_data) {
- avro_freet(struct codec_data_lzma, c->codec_data);
- }
-
- c->block_data = NULL;
- c->block_size = 0;
- c->used_size = 0;
- c->codec_data = NULL;
-
- return 0;
-}
-
-#endif // LZMA_CODEC
-
-/* Common interface */
-
-int avro_codec(avro_codec_t codec, const char *type)
-{
- if (type == NULL) {
- return codec_null(codec);
- }
-
-#ifdef SNAPPY_CODEC
- if (strcmp("snappy", type) == 0) {
- return codec_snappy(codec);
- }
-#endif
-
-#ifdef DEFLATE_CODEC
- if (strcmp("deflate", type) == 0) {
- return codec_deflate(codec);
- }
-#endif
-
-#ifdef LZMA_CODEC
- if (strcmp("lzma", type) == 0) {
- return codec_lzma(codec);
- }
-#endif
-
- if (strcmp("null", type) == 0) {
- return codec_null(codec);
- }
-
- avro_set_error("Unknown codec %s", type);
- return 1;
-}
-
-int avro_codec_encode(avro_codec_t c, void * data, int64_t len)
-{
- switch(c->type)
- {
- case AVRO_CODEC_NULL:
- return encode_null(c, data, len);
-#ifdef SNAPPY_CODEC
- case AVRO_CODEC_SNAPPY:
- return encode_snappy(c, data, len);
-#endif
-#ifdef DEFLATE_CODEC
- case AVRO_CODEC_DEFLATE:
- return encode_deflate(c, data, len);
-#endif
-#ifdef LZMA_CODEC
- case AVRO_CODEC_LZMA:
- return encode_lzma(c, data, len);
-#endif
- default:
- return 1;
- }
-}
-
-int avro_codec_decode(avro_codec_t c, void * data, int64_t len)
-{
- switch(c->type)
- {
- case AVRO_CODEC_NULL:
- return decode_null(c, data, len);
-#ifdef SNAPPY_CODEC
- case AVRO_CODEC_SNAPPY:
- return decode_snappy(c, data, len);
-#endif
-#ifdef DEFLATE_CODEC
- case AVRO_CODEC_DEFLATE:
- return decode_deflate(c, data, len);
-#endif
-#ifdef LZMA_CODEC
- case AVRO_CODEC_LZMA:
- return decode_lzma(c, data, len);
-#endif
- default:
- return 1;
- }
-}
-
-int avro_codec_reset(avro_codec_t c)
-{
- switch(c->type)
- {
- case AVRO_CODEC_NULL:
- return reset_null(c);
-#ifdef SNAPPY_CODEC
- case AVRO_CODEC_SNAPPY:
- return reset_snappy(c);
-#endif
-#ifdef DEFLATE_CODEC
- case AVRO_CODEC_DEFLATE:
- return reset_deflate(c);
-#endif
-#ifdef LZMA_CODEC
- case AVRO_CODEC_LZMA:
- return reset_lzma(c);
-#endif
- default:
- return 1;
- }
-}