Diffstat (limited to 'fluent-bit/src/aws/compression')
-rw-r--r-- | fluent-bit/src/aws/compression/CMakeLists.txt       |   6
-rw-r--r-- | fluent-bit/src/aws/compression/arrow/CMakeLists.txt |   7
-rw-r--r-- | fluent-bit/src/aws/compression/arrow/compress.c     | 147
-rw-r--r-- | fluent-bit/src/aws/compression/arrow/compress.h     |  13
4 files changed, 173 insertions, 0 deletions
diff --git a/fluent-bit/src/aws/compression/CMakeLists.txt b/fluent-bit/src/aws/compression/CMakeLists.txt
new file mode 100644
index 00000000..02a1ba3a
--- /dev/null
+++ b/fluent-bit/src/aws/compression/CMakeLists.txt
@@ -0,0 +1,6 @@
+add_library(flb-aws-compress INTERFACE)
+
+if(FLB_ARROW)
+  add_subdirectory(arrow EXCLUDE_FROM_ALL)
+  target_link_libraries(flb-aws-compress INTERFACE flb-aws-arrow)
+endif()
diff --git a/fluent-bit/src/aws/compression/arrow/CMakeLists.txt b/fluent-bit/src/aws/compression/arrow/CMakeLists.txt
new file mode 100644
index 00000000..846f6544
--- /dev/null
+++ b/fluent-bit/src/aws/compression/arrow/CMakeLists.txt
@@ -0,0 +1,7 @@
+set(src
+  compress.c)
+
+add_library(flb-aws-arrow STATIC ${src})
+
+target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS})
+target_link_libraries(flb-aws-arrow ${ARROW_GLIB_LDFLAGS})
diff --git a/fluent-bit/src/aws/compression/arrow/compress.c b/fluent-bit/src/aws/compression/arrow/compress.c
new file mode 100644
index 00000000..a48b34f8
--- /dev/null
+++ b/fluent-bit/src/aws/compression/arrow/compress.c
@@ -0,0 +1,147 @@
+/*
+ * This converts S3 plugin's request buffer into Apache Arrow format.
+ *
+ * We use GLib binding to call Arrow functions (which is implemented
+ * in C++) from Fluent Bit.
+ *
+ * https://github.com/apache/arrow/tree/master/c_glib
+ */
+
+#include <arrow-glib/arrow-glib.h>
+#include <inttypes.h>
+
+/*
+ * GArrowTable is the central structure that represents "table" (a.k.a.
+ * data frame).
+ */
+static GArrowTable* parse_json(uint8_t *json, int size)
+{
+    GArrowJSONReader *reader;
+    GArrowBuffer *buffer;
+    GArrowBufferInputStream *input;
+    GArrowJSONReadOptions *options;
+    GArrowTable *table;
+    GError *error = NULL;
+
+    buffer = garrow_buffer_new(json, size);
+    if (buffer == NULL) {
+        return NULL;
+    }
+
+    input = garrow_buffer_input_stream_new(buffer);
+    if (input == NULL) {
+        g_object_unref(buffer);
+        return NULL;
+    }
+
+    options = garrow_json_read_options_new();
+    if (options == NULL) {
+        g_object_unref(buffer);
+        g_object_unref(input);
+        return NULL;
+    }
+
+    reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error);
+    if (reader == NULL) {
+        g_error_free(error);
+        g_object_unref(buffer);
+        g_object_unref(input);
+        g_object_unref(options);
+        return NULL;
+    }
+
+    table = garrow_json_reader_read(reader, &error);
+    if (table == NULL) {
+        g_error_free(error);
+        g_object_unref(buffer);
+        g_object_unref(input);
+        g_object_unref(options);
+        g_object_unref(reader);
+        return NULL;
+    }
+    g_object_unref(buffer);
+    g_object_unref(input);
+    g_object_unref(options);
+    g_object_unref(reader);
+    return table;
+}
+
+static GArrowResizableBuffer* table_to_buffer(GArrowTable *table)
+{
+    GArrowResizableBuffer *buffer;
+    GArrowBufferOutputStream *sink;
+    GError *error = NULL;
+    gboolean success;
+
+    buffer = garrow_resizable_buffer_new(0, &error);
+    if (buffer == NULL) {
+        g_error_free(error);
+        return NULL;
+    }
+
+    sink = garrow_buffer_output_stream_new(buffer);
+    if (sink == NULL) {
+        g_object_unref(buffer);
+        return NULL;
+    }
+
+    success = garrow_table_write_as_feather(
+            table, GARROW_OUTPUT_STREAM(sink),
+            NULL, &error);
+    if (!success) {
+        g_error_free(error);
+        g_object_unref(buffer);
+        g_object_unref(sink);
+        return NULL;
+    }
+    g_object_unref(sink);
+    return buffer;
+}
+
+int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_size)
+{
+    GArrowTable *table;
+    GArrowResizableBuffer *buffer;
+    GBytes *bytes;
+    gconstpointer ptr;
+    gsize len;
+    uint8_t *buf;
+
+    table = parse_json((uint8_t *) json, size);
+    if (table == NULL) {
+        return -1;
+    }
+
+    buffer = table_to_buffer(table);
+    g_object_unref(table);
+    if (buffer == NULL) {
+        return -1;
+    }
+
+    bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer));
+    if (bytes == NULL) {
+        g_object_unref(buffer);
+        return -1;
+    }
+
+    ptr = g_bytes_get_data(bytes, &len);
+    if (ptr == NULL) {
+        g_object_unref(buffer);
+        g_bytes_unref(bytes);
+        return -1;
+    }
+
+    buf = malloc(len);
+    if (buf == NULL) {
+        g_object_unref(buffer);
+        g_bytes_unref(bytes);
+        return -1;
+    }
+    memcpy(buf, ptr, len);
+    *out_buf = (void *) buf;
+    *out_size = len;
+
+    g_object_unref(buffer);
+    g_bytes_unref(bytes);
+    return 0;
+}
diff --git a/fluent-bit/src/aws/compression/arrow/compress.h b/fluent-bit/src/aws/compression/arrow/compress.h
new file mode 100644
index 00000000..82e94f43
--- /dev/null
+++ b/fluent-bit/src/aws/compression/arrow/compress.h
@@ -0,0 +1,13 @@
+/*
+ * This function converts out_s3 buffer into Apache Arrow format.
+ *
+ * `json` is a string that contain (concatenated) JSON objects.
+ *
+ * `size` is the length of the json data (excluding the trailing
+ * null-terminator character).
+ *
+ * Return 0 on success (with `out_buf` and `out_size` updated),
+ * and -1 on failure
+ */
+
+int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_size);
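
For reference, here is a minimal sketch of how a caller might exercise out_s3_compress_arrow(). This usage example is not part of the commit; it assumes a program linked against flb-aws-arrow and the Arrow GLib libraries wired up by the CMakeLists.txt files above, and it feeds newline-delimited JSON records (compress.h only promises "concatenated" JSON objects, so the exact framing here is an assumption):

/* Hypothetical caller, not part of this commit. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "compress.h"   /* declares out_s3_compress_arrow() */

int main(void)
{
    /* Two JSON records, one per line, as the S3 plugin would buffer them. */
    char json[] =
        "{\"log\": \"hello\", \"status\": 200}\n"
        "{\"log\": \"world\", \"status\": 404}\n";
    void *out_buf = NULL;
    size_t out_size = 0;

    /* size excludes the trailing null terminator, per compress.h */
    if (out_s3_compress_arrow(json, strlen(json), &out_buf, &out_size) != 0) {
        fprintf(stderr, "arrow conversion failed\n");
        return 1;
    }

    printf("feather output: %zu bytes\n", out_size);

    /* out_buf is allocated with malloc() inside out_s3_compress_arrow(),
     * so the caller owns it and must free() it. */
    free(out_buf);
    return 0;
}

The only contract visible in this diff is the one documented in compress.h: on success the function returns 0 and hands back a heap-allocated Feather buffer through out_buf/out_size; on failure it returns -1 and leaves the output parameters untouched.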