summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/avro/src/avropipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/avro/src/avropipe.c')
-rw-r--r--fluent-bit/lib/avro/src/avropipe.c432
1 files changed, 432 insertions, 0 deletions
diff --git a/fluent-bit/lib/avro/src/avropipe.c b/fluent-bit/lib/avro/src/avropipe.c
new file mode 100644
index 00000000..7bda1253
--- /dev/null
+++ b/fluent-bit/lib/avro/src/avropipe.c
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#include <ctype.h>
+#include <errno.h>
+#include <getopt.h>
+#include <avro/platform.h>
+#include <avro/platform.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "avro.h"
+#include "avro_private.h"
+
+
+/* String placed between path components in the output; "/" unless overridden with the --separator option. */
+
+static const char *separator = "/";
+
+
+/*-- PROCESSING A FILE --*/
+
+/**
+ * Builds the output path of one array element: dest is set to prefix,
+ * then the separator and the decimal element index are appended.
+ */
+
+static void
+create_array_prefix(avro_raw_string_t *dest, const char *prefix, size_t index)
+{
+    /* Scratch space for the textual form of the index. */
+    static char index_text[100];
+
+    avro_raw_string_set(dest, prefix);
+    avro_raw_string_append(dest, separator);
+
+    snprintf(index_text, sizeof index_text, "%" PRIsz, index);
+    avro_raw_string_append(dest, index_text);
+}
+
+/**
+ * Builds the output path of one named member: dest is set to prefix,
+ * then the separator and key are appended.
+ *
+ * A key that itself contains the separator would make the resulting
+ * path ambiguous, so the program reports it and exits with status 1.
+ */
+static void
+create_object_prefix(avro_raw_string_t *dest, const char *prefix, const char *key)
+{
+    const char *clash = strstr(key, separator);
+
+    if (clash != NULL) {
+        fprintf(stderr,
+                "Error: Element \"%s\" in object %s "
+                "contains the separator character.\n"
+                "Please use the --separator option to choose another.\n",
+                key, prefix);
+        exit(1);
+    }
+
+    avro_raw_string_set(dest, prefix);
+    avro_raw_string_append(dest, separator);
+    avro_raw_string_append(dest, key);
+}
+
+/**
+ * Writes size bytes from buf to stdout as a double-quoted JSON string.
+ *
+ * The quote, the backslash and the backspace, form-feed, newline,
+ * carriage-return and tab characters are written as their two-character
+ * escapes.  Any other printable byte is copied verbatim, and every
+ * remaining byte becomes a JSON Unicode escape (backslash, "u00", two
+ * hex digits).
+ */
+static void
+print_bytes_value(const char *buf, size_t size)
+{
+    size_t i;
+
+    printf("\"");
+    for (i = 0; i < size; i++) {
+        /*
+         * Classify the byte as unsigned char: passing a negative plain
+         * char (any byte >= 0x80 where char is signed) to isprint() is
+         * undefined behaviour.
+         */
+        unsigned char byte = (unsigned char) buf[i];
+
+        switch (byte) {
+        case '"':
+            printf("\\\"");
+            break;
+        case '\\':
+            printf("\\\\");
+            break;
+        case '\b':
+            printf("\\b");
+            break;
+        case '\f':
+            printf("\\f");
+            break;
+        case '\n':
+            printf("\\n");
+            break;
+        case '\r':
+            printf("\\r");
+            break;
+        case '\t':
+            printf("\\t");
+            break;
+        default:
+            if (isprint(byte)) {
+                printf("%c", byte);
+            } else {
+                printf("\\u00%02x", (unsigned int) byte);
+            }
+            break;
+        }
+    }
+    printf("\"");
+}
+
+static void
+process_value(const char *prefix, avro_value_t *value); /* forward declaration: the container handlers below recurse into it */
+
+/**
+ * Prints an array: first an array marker line ("<prefix>\t[]"), then
+ * every element under the path built by create_array_prefix() from the
+ * array's prefix and the element index.
+ */
+static void
+process_array(const char *prefix, avro_value_t *value)
+{
+    size_t length;
+    size_t pos = 0;
+    avro_raw_string_t item_path;
+
+    printf("%s\t[]\n", prefix);
+    avro_value_get_size(value, &length);
+    avro_raw_string_init(&item_path);
+
+    while (pos < length) {
+        avro_value_t item;
+
+        avro_value_get_by_index(value, pos, &item, NULL);
+        create_array_prefix(&item_path, prefix, pos);
+        process_value((const char *) avro_raw_string_get(&item_path), &item);
+        pos++;
+    }
+
+    avro_raw_string_done(&item_path);
+}
+
+/**
+ * Prints an enum value as "<prefix>\t<quoted symbol name>", looking the
+ * symbol name up in the value's schema.
+ */
+static void
+process_enum(const char *prefix, avro_value_t *value)
+{
+    avro_schema_t enum_schema = avro_value_get_schema(value);
+    int ordinal;
+
+    avro_value_get_enum(value, &ordinal);
+
+    const char *symbol = avro_schema_enum_get(enum_schema, ordinal);
+
+    printf("%s\t", prefix);
+    print_bytes_value(symbol, strlen(symbol));
+    printf("\n");
+}
+
+/**
+ * Prints a map: first an object marker line ("<prefix>\t{}"), then every
+ * entry under the path built by create_object_prefix() from the map's
+ * prefix and the entry key.
+ */
+static void
+process_map(const char *prefix, avro_value_t *value)
+{
+    size_t entry_count;
+    size_t pos;
+    avro_raw_string_t entry_path;
+
+    printf("%s\t{}\n", prefix);
+    avro_value_get_size(value, &entry_count);
+    avro_raw_string_init(&entry_path);
+
+    for (pos = 0; pos < entry_count; pos++) {
+        avro_value_t entry;
+        const char *entry_key;
+
+        avro_value_get_by_index(value, pos, &entry, &entry_key);
+        create_object_prefix(&entry_path, prefix, entry_key);
+        process_value((const char *) avro_raw_string_get(&entry_path), &entry);
+    }
+
+    avro_raw_string_done(&entry_path);
+}
+
+/**
+ * Prints a record: first an object marker line ("<prefix>\t{}"), then
+ * every field under the path built by create_object_prefix() from the
+ * record's prefix and the field name.
+ */
+static void
+process_record(const char *prefix, avro_value_t *value)
+{
+    avro_raw_string_t member_path;
+    size_t member_total;
+    size_t n;
+
+    printf("%s\t{}\n", prefix);
+    avro_value_get_size(value, &member_total);
+    avro_raw_string_init(&member_path);
+
+    for (n = 0; n != member_total; n++) {
+        const char *member_name;
+        avro_value_t member;
+
+        avro_value_get_by_index(value, n, &member, &member_name);
+        create_object_prefix(&member_path, prefix, member_name);
+        process_value((const char *) avro_raw_string_get(&member_path), &member);
+    }
+
+    avro_raw_string_done(&member_path);
+}
+
+/**
+ * Prints a union value.
+ *
+ * A null branch is printed as a bare "<prefix>\tnull" line.  Any other
+ * branch is wrapped in an object: an object marker line for the union,
+ * then the branch value under a path extended with the type name of the
+ * selected branch schema.
+ */
+static void
+process_union(const char *prefix, avro_value_t *value)
+{
+    avro_value_t active;
+    avro_value_get_current_branch(value, &active);
+
+    if (avro_value_get_type(&active) != AVRO_NULL) {
+        int branch_index;
+        avro_value_get_discriminant(value, &branch_index);
+
+        avro_schema_t union_schema = avro_value_get_schema(value);
+        const char *type_name =
+            avro_schema_type_name(avro_schema_union_branch(union_schema, branch_index));
+
+        /* The path is built (and validated) before the marker line is printed. */
+        avro_raw_string_t branch_path;
+        avro_raw_string_init(&branch_path);
+        create_object_prefix(&branch_path, prefix, type_name);
+
+        printf("%s\t{}\n", prefix);
+        process_value((const char *) avro_raw_string_get(&branch_path), &active);
+
+        avro_raw_string_done(&branch_path);
+    } else {
+        /* nulls in a union aren't wrapped in a JSON object */
+        printf("%s\tnull\n", prefix);
+    }
+}
+
+/**
+ * Prints one line of the form "<prefix>\t<quoted bytes>\n", where the
+ * quoted part is produced by print_bytes_value().
+ */
+static void
+print_quoted_line(const char *prefix, const char *data, size_t length)
+{
+    printf("%s\t", prefix);
+    print_bytes_value(data, length);
+    printf("\n");
+}
+
+/**
+ * Prints the value stored at the path given by prefix.
+ *
+ * Scalars produce a single "<prefix>\t<value>" line.  Arrays, maps,
+ * records and unions are passed to their dedicated handlers, which call
+ * back into this function for the nested values.  A type that is not
+ * handled here terminates the program with status 1.
+ */
+static void
+process_value(const char *prefix, avro_value_t *value)
+{
+    switch (avro_value_get_type(value)) {
+    case AVRO_BOOLEAN: {
+        int flag;
+        avro_value_get_boolean(value, &flag);
+        printf("%s\t%s\n", prefix, flag ? "true" : "false");
+        break;
+    }
+
+    case AVRO_BYTES: {
+        const void *data;
+        size_t length;
+        avro_value_get_bytes(value, &data, &length);
+        print_quoted_line(prefix, (const char *) data, length);
+        break;
+    }
+
+    case AVRO_DOUBLE: {
+        double number;
+        avro_value_get_double(value, &number);
+        printf("%s\t%lf\n", prefix, number);
+        break;
+    }
+
+    case AVRO_FLOAT: {
+        float number;
+        avro_value_get_float(value, &number);
+        printf("%s\t%f\n", prefix, number);
+        break;
+    }
+
+    case AVRO_INT32: {
+        int32_t number;
+        avro_value_get_int(value, &number);
+        printf("%s\t%" PRId32 "\n", prefix, number);
+        break;
+    }
+
+    case AVRO_INT64: {
+        int64_t number;
+        avro_value_get_long(value, &number);
+        printf("%s\t%" PRId64 "\n", prefix, number);
+        break;
+    }
+
+    case AVRO_NULL:
+        avro_value_get_null(value);
+        printf("%s\tnull\n", prefix);
+        break;
+
+    case AVRO_STRING: {
+        /* TODO: Convert the UTF-8 to the current
+         * locale's character set */
+        const char *text;
+        size_t length;
+        avro_value_get_string(value, &text, &length);
+        /* For strings, the reported size includes the NUL terminator. */
+        print_quoted_line(prefix, text, length - 1);
+        break;
+    }
+
+    case AVRO_ARRAY:
+        process_array(prefix, value);
+        break;
+
+    case AVRO_ENUM:
+        process_enum(prefix, value);
+        break;
+
+    case AVRO_FIXED: {
+        const void *data;
+        size_t length;
+        avro_value_get_fixed(value, &data, &length);
+        print_quoted_line(prefix, (const char *) data, length);
+        break;
+    }
+
+    case AVRO_MAP:
+        process_map(prefix, value);
+        break;
+
+    case AVRO_RECORD:
+        process_record(prefix, value);
+        break;
+
+    case AVRO_UNION:
+        process_union(prefix, value);
+        break;
+
+    default:
+        fprintf(stderr, "Unknown schema type\n");
+        exit(1);
+    }
+}
+
+/**
+ * Reads every value from one Avro data file and prints it to stdout.
+ *
+ * The output starts with a root array marker line; each value read from
+ * the file is then printed by process_value() under a path made of the
+ * separator and the value's zero-based position in the file.
+ *
+ * filename names the file to read; NULL means read from stdin.
+ *
+ * Exits with status 1 if the input cannot be opened, if the holder for
+ * the decoded values cannot be created, or (after all resources have
+ * been released) if reading stops for any reason other than end of file.
+ */
+static void
+process_file(const char *filename)
+{
+    avro_file_reader_t reader;
+
+    if (filename == NULL) {
+        if (avro_file_reader_fp(stdin, "<stdin>", 0, &reader)) {
+            fprintf(stderr, "Error opening <stdin>:\n %s\n",
+                    avro_strerror());
+            exit(1);
+        }
+    } else {
+        if (avro_file_reader(filename, &reader)) {
+            fprintf(stderr, "Error opening %s:\n %s\n",
+                    filename, avro_strerror());
+            exit(1);
+        }
+    }
+
+    /* The JSON root is an array */
+    printf("%s\t[]\n", separator);
+
+    avro_raw_string_t prefix;
+    avro_raw_string_init(&prefix);
+
+    avro_schema_t wschema = avro_file_reader_get_writer_schema(reader);
+
+    /* Without a value class or a value instance nothing can be decoded. */
+    avro_value_iface_t *iface = avro_generic_class_from_schema(wschema);
+    if (iface == NULL) {
+        fprintf(stderr, "Error creating value class:\n %s\n",
+                avro_strerror());
+        exit(1);
+    }
+
+    avro_value_t value;
+    if (avro_generic_value_new(iface, &value)) {
+        fprintf(stderr, "Error creating value:\n %s\n",
+                avro_strerror());
+        exit(1);
+    }
+
+    size_t record_number = 0;
+    int rval;
+    int read_failed = 0;
+
+    while ((rval = avro_file_reader_read_value(reader, &value)) == 0) {
+        create_array_prefix(&prefix, "", record_number);
+        process_value((const char *) avro_raw_string_get(&prefix), &value);
+        avro_value_reset(&value);
+        record_number++;
+    }
+
+    /* Anything other than EOF means the input was cut short by an error. */
+    if (rval != EOF) {
+        fprintf(stderr, "Error reading value: %s\n", avro_strerror());
+        read_failed = 1;
+    }
+
+    avro_raw_string_done(&prefix);
+    avro_value_decref(&value);
+    avro_value_iface_decref(iface);
+    avro_file_reader_close(reader);
+    avro_schema_decref(wschema);
+
+    /* Report the failure to the caller of the program as well. */
+    if (read_failed) {
+        exit(1);
+    }
+}
+
+
+/*-- MAIN PROGRAM --*/
+/* Long command-line options; the table ends with an all-zero entry. */
+static struct option longopts[] = {
+    { .name = "separator", .has_arg = required_argument, .flag = NULL, .val = 's' },
+    { .name = NULL, .has_arg = 0, .flag = NULL, .val = 0 }
+};
+
+/* Prints the command-line synopsis to stderr. */
+static void usage(void)
+{
+    static const char synopsis[] =
+        "Usage: avropipe [--separator=<separator>]\n"
+        " <avro data file>\n";
+
+    fputs(synopsis, stderr);
+}
+
+
+/**
+ * Entry point: parses the command line and dumps one Avro data file.
+ *
+ * Accepts -s <separator> / --separator=<separator> and at most one data
+ * file name; with no file name the data is read from stdin.  Prints the
+ * usage text and exits with status 1 on an unknown option, an empty
+ * separator or more than one file name.
+ */
+int main(int argc, char **argv)
+{
+    char *data_filename;
+    int ch;
+
+    while ((ch = getopt_long(argc, argv, "s:", longopts, NULL)) != -1) {
+        switch (ch) {
+        case 's':
+            /*
+             * An empty separator would match inside every key
+             * (strstr(key, "") always succeeds), so every record, map
+             * and union member would be rejected with a misleading
+             * error.  Refuse it here instead.
+             */
+            if (optarg[0] == '\0') {
+                fprintf(stderr, "The separator must not be empty.\n");
+                usage();
+                exit(1);
+            }
+            separator = optarg;
+            break;
+
+        default:
+            usage();
+            exit(1);
+        }
+    }
+
+    argc -= optind;
+    argv += optind;
+
+    if (argc == 1) {
+        data_filename = argv[0];
+    } else if (argc == 0) {
+        /* No file name: process_file() reads from stdin. */
+        data_filename = NULL;
+    } else {
+        fprintf(stderr, "Can't read from multiple input files.\n");
+        usage();
+        exit(1);
+    }
+
+    /* Process the data file */
+    process_file(data_filename);
+    return 0;
+}