summaryrefslogtreecommitdiffstats
path: root/fluent-bit/tests/internal/avro.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--fluent-bit/tests/internal/avro.c382
1 files changed, 382 insertions, 0 deletions
diff --git a/fluent-bit/tests/internal/avro.c b/fluent-bit/tests/internal/avro.c
new file mode 100644
index 00000000..45f07343
--- /dev/null
+++ b/fluent-bit/tests/internal/avro.c
@@ -0,0 +1,382 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+#include <errno.h>
+#include <string.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_avro.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_pack.h>
+#include <msgpack.h>
+
+#include "flb_tests_internal.h"
+
+/* AVRO iteration tests */
+#define AVRO_SINGLE_MAP1 FLB_TESTS_DATA_PATH "/data/avro/json_single_map_001.json"
+#define AVRO_MULTILINE_JSON FLB_TESTS_DATA_PATH "/data/avro/live-sample.json"
+
+const char JSON_SINGLE_MAP_001_SCHEMA[] = /* Avro record schema "Map001" that
+ * test_unpack_to_avro passes to test_init. Fields: key001 (int), key002 (float),
+ * key003 (string) and key004 (array whose items are maps with int values). */
+"{\"type\":\"record\",\
+ \"name\":\"Map001\",\
+ \"fields\":[\
+ {\"name\": \"key001\", \"type\": \"int\"},\
+ {\"name\": \"key002\", \"type\": \"float\"},\
+ {\"name\": \"key003\", \"type\": \"string\"},\
+ {\"name\": \"key004\", \"type\":\
+ {\"type\": \"array\", \"items\":\
+ {\"type\": \"map\",\"values\": \"int\"}}}]}";
+
+/* Zone finalizer adapter: frees the packed msgpack buffer handed over by test_init(). */
+static void test_init_release_buffer(void *buf)
+{
+    flb_free(buf);
+}
+
+/*
+ * Shared fixture for the Avro tests.
+ *
+ * Initialises *aobject / *aschema from json_schema through flb_avro_init(),
+ * loads the JSON file at path json_data, packs it to msgpack with
+ * flb_pack_json() and unpacks the first object.
+ *
+ * Returns the msgpack_unpacked holding that object. The callers in this file
+ * release it with msgpack_unpacked_destroy() and drop their references to
+ * *aobject and *aschema themselves.
+ *
+ * Every step is reported through TEST_CHECK. When a step fails the remaining
+ * steps that depend on it are skipped and an initialised (possibly empty)
+ * msgpack_unpacked is still returned, so the fixture itself never dereferences
+ * a NULL or uninitialised pointer.
+ */
+msgpack_unpacked test_init(avro_value_t *aobject, avro_schema_t *aschema, const char *json_schema, const char *json_data) {
+    char *out_buf = NULL;
+    size_t out_size = 0;
+    int root_type;
+    msgpack_unpacked msg;
+
+    /* Initialise first so that every exit path returns a destroyable object. */
+    msgpack_unpacked_init(&msg);
+
+    avro_value_iface_t *aclass = flb_avro_init(aobject, (char *)json_schema, strlen(json_schema), aschema);
+    TEST_CHECK(aclass != NULL);
+
+    char *data = mk_file_to_buffer(json_data);
+    TEST_CHECK(data != NULL);
+
+    if (data != NULL) {
+        size_t len = strlen(data);
+        int ret = flb_pack_json(data, len, &out_buf, &out_size, &root_type, NULL);
+        TEST_CHECK(ret == 0);
+
+        if (ret == 0) {
+            msgpack_unpack_return rc = msgpack_unpack_next(&msg, out_buf, out_size, NULL);
+            TEST_CHECK(rc == MSGPACK_UNPACK_SUCCESS);
+
+            /*
+             * Unpacked str/bin objects point into out_buf instead of owning a
+             * copy, so the buffer has to outlive msg. Hand it to the zone: the
+             * finalizer runs when the caller destroys msg.
+             */
+            if (rc == MSGPACK_UNPACK_SUCCESS && msg.zone != NULL) {
+                int owned = msgpack_zone_push_finalizer(msg.zone, test_init_release_buffer, out_buf);
+                TEST_CHECK(owned);
+                if (owned) {
+                    out_buf = NULL;
+                }
+            }
+        }
+    }
+
+    if (aclass != NULL) {
+        avro_value_iface_decref(aclass);
+    }
+    flb_free(data);
+    /* Only reached with a non-NULL pointer when ownership was not transferred. */
+    flb_free(out_buf);
+
+    return msg;
+}
+/*
+ * Unpack msgpack per avro schema.
+ *
+ * Converts the AVRO_SINGLE_MAP1 fixture using JSON_SINGLE_MAP_001_SCHEMA and
+ * checks every field of the resulting Avro record: key001 (int), key002
+ * (float), key003 (string) and key004 (array of two maps, two entries each).
+ */
+void test_unpack_to_avro()
+{
+    avro_value_t aobject;
+    avro_schema_t aschema;
+
+    msgpack_unpacked mp = test_init(&aobject, &aschema, JSON_SINGLE_MAP_001_SCHEMA, AVRO_SINGLE_MAP1);
+
+    msgpack_object_print(stderr, mp.data);
+    flb_msgpack_to_avro(&aobject, &mp.data);
+
+    avro_value_t test_value;
+    TEST_CHECK(avro_value_get_by_name(&aobject, "key001", &test_value, NULL) == 0);
+
+    int val001 = 0;
+    TEST_CHECK(avro_value_get_int(&test_value, &val001) == 0);
+    TEST_CHECK(val001 == 123456789);
+
+    TEST_CHECK(avro_value_get_by_name(&aobject, "key002", &test_value, NULL) == 0);
+
+    float val002 = 0.0f;
+    // for some reason its rounding to this value
+    float val002_actual = 0.999888f;
+    TEST_CHECK(avro_value_get_float(&test_value, &val002) == 0);
+    /* Compare the "%f" renderings so both floats are rounded the same way. */
+    char str1[80];
+    char str2[80];
+    snprintf(str1, sizeof(str1), "%f", val002);
+    snprintf(str2, sizeof(str2), "%f", val002_actual);
+    flb_info("val002:%s:\n", str1);
+    flb_info("val002_actual:%s:\n", str2);
+    TEST_CHECK((strcmp(str1, str2) == 0));
+
+    TEST_CHECK(avro_value_get_by_name(&aobject, "key003", &test_value, NULL) == 0);
+    const char *val003 = NULL;
+    size_t val003_size = 0;
+    TEST_CHECK(avro_value_get_string(&test_value, &val003, &val003_size) == 0);
+    flb_info("val003_size:%zu:\n", val003_size);
+
+    /* TEST_CHECK records a failure but keeps going, so guard the dereferences. */
+    TEST_CHECK(val003 != NULL);
+    if (val003 != NULL) {
+        TEST_CHECK(val003[val003_size] == '\0');
+        TEST_CHECK((strcmp(val003, "abcdefghijk") == 0));
+    }
+    // avro_value_get_string returns the string length plus the NUL
+    TEST_CHECK(val003_size == 12);
+
+    TEST_CHECK(avro_value_get_by_name(&aobject, "key004", &test_value, NULL) == 0);
+
+    size_t asize = 0;
+    TEST_CHECK(avro_value_get_size(&test_value, &asize) == 0);
+    flb_info("asize:%zu:\n", asize);
+
+    TEST_CHECK(asize == 2);
+
+    // check the first map
+    avro_value_t k8sRecord;
+    TEST_CHECK(avro_value_get_by_index(&test_value, 0, &k8sRecord, NULL) == 0);
+
+    size_t msize = 0;
+    TEST_CHECK(avro_value_get_size(&k8sRecord, &msize) == 0);
+    flb_info("msize:%zu:\n", msize);
+
+    TEST_CHECK(msize == 2);
+
+    avro_value_t obj_test;
+    const char *actual_key = NULL;
+    int actual = 0;
+
+    // check the first item in the map
+    TEST_CHECK(avro_value_get_by_index(&k8sRecord, 0, &obj_test, &actual_key) == 0);
+    TEST_CHECK(actual_key != NULL);
+    if (actual_key != NULL) {
+        flb_info("actual_key:%s:\n", actual_key);
+        TEST_CHECK(strcmp(actual_key, "a") == 0);
+    }
+    TEST_CHECK(avro_value_get_int(&obj_test, &actual) == 0);
+    TEST_CHECK(actual == 1);
+
+    // check the second item in the map
+    actual_key = NULL;
+    TEST_CHECK(avro_value_get_by_index(&k8sRecord, 1, &obj_test, &actual_key) == 0);
+    TEST_CHECK(actual_key != NULL);
+    if (actual_key != NULL) {
+        flb_info("actual_key:%s:\n", actual_key);
+        TEST_CHECK(strcmp(actual_key, "b") == 0);
+    }
+    TEST_CHECK(avro_value_get_int(&obj_test, &actual) == 0);
+    TEST_CHECK(actual == 2);
+
+    // check the second map
+    TEST_CHECK(avro_value_get_by_index(&test_value, 1, &k8sRecord, NULL) == 0);
+
+    TEST_CHECK(avro_value_get_size(&k8sRecord, &msize) == 0);
+    flb_info("msize:%zu:\n", msize);
+
+    TEST_CHECK(msize == 2);
+
+    avro_value_decref(&aobject);
+    avro_schema_decref(aschema);
+    msgpack_unpacked_destroy(&mp);
+}
+
+/*
+ * Converts the AVRO_MULTILINE_JSON fixture with several schemas that declare
+ * the same record but list the fields in a different order (ts3 additionally
+ * declares a "newnovalue" field), and checks that the same values are found
+ * by name after every conversion.
+ */
+void test_parse_reordered_schema()
+{
+    // test same schema but different order of fields
+    const char *ts1 = "{\"name\":\"qavrov2_record\",\"type\":\"record\",\"fields\":[{\"name\":\"log\",\"type\":\"string\"},{\"name\":\"capture\",\"type\":\"string\"},{\"name\":\"kubernetes\",\"type\":{\"name\":\"krec\",\"type\":\"record\",\"fields\":[{\"name\":\"pod_name\",\"type\":\"string\"},{\"name\":\"namespace_name\",\"type\":\"string\"},{\"name\":\"pod_id\",\"type\":\"string\"},{\"name\":\"labels\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"annotations\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"host\",\"type\":\"string\"},{\"name\":\"container_name\",\"type\":\"string\"},{\"name\":\"docker_id\",\"type\":\"string\"},{\"name\":\"container_hash\",\"type\":\"string\"},{\"name\":\"container_image\",\"type\":\"string\"}]}}]}";
+    const char *ts2 = "{\"name\":\"qavrov2_record\",\"type\":\"record\",\"fields\":[{\"name\":\"capture\",\"type\":\"string\"},{\"name\":\"log\",\"type\":\"string\"},{\"name\":\"kubernetes\",\"type\":{\"name\":\"krec\",\"type\":\"record\",\"fields\":[{\"name\":\"namespace_name\",\"type\":\"string\"},{\"name\":\"pod_name\",\"type\":\"string\"},{\"name\":\"pod_id\",\"type\":\"string\"},{\"name\":\"annotations\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"labels\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"host\",\"type\":\"string\"},{\"name\":\"container_name\",\"type\":\"string\"},{\"name\":\"docker_id\",\"type\":\"string\"},{\"name\":\"container_hash\",\"type\":\"string\"},{\"name\":\"container_image\",\"type\":\"string\"}]}}]}";
+    const char *ts3 = "{\"name\":\"qavrov2_record\",\"type\":\"record\",\"fields\":[{\"name\":\"newnovalue\",\"type\":\"string\"},{\"name\":\"capture\",\"type\":\"string\"},{\"name\":\"log\",\"type\":\"string\"},{\"name\":\"kubernetes\",\"type\":{\"name\":\"krec\",\"type\":\"record\",\"fields\":[{\"name\":\"namespace_name\",\"type\":\"string\"},{\"name\":\"pod_name\",\"type\":\"string\"},{\"name\":\"pod_id\",\"type\":\"string\"},{\"name\":\"annotations\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"labels\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"host\",\"type\":\"string\"},{\"name\":\"container_name\",\"type\":\"string\"},{\"name\":\"docker_id\",\"type\":\"string\"},{\"name\":\"container_hash\",\"type\":\"string\"},{\"name\":\"container_image\",\"type\":\"string\"}]}}]}";
+
+    /* NULL-terminated; schemas are revisited to exercise switching back and forth. */
+    const char *schemas[] = {ts1, ts2, ts3, ts2, ts1, NULL};
+
+    int i=0;
+    for (i=0; schemas[i] != NULL ; i++) {
+
+        avro_value_t aobject = {0};
+        avro_schema_t aschema = {0};
+
+        msgpack_unpacked msg = test_init(&aobject, &aschema, schemas[i], AVRO_MULTILINE_JSON);
+
+        msgpack_object_print(stderr, msg.data);
+
+        flb_msgpack_to_avro(&aobject, &msg.data);
+
+        avro_value_t log0;
+        TEST_CHECK(avro_value_get_by_name(&aobject, "log", &log0, NULL) == 0);
+
+        size_t size1 = 0;
+        const char *log_line = NULL;
+        TEST_CHECK(avro_value_get_string(&log0, &log_line, &size1) == 0);
+        const char *pre = "2020-08-21T15:49:48.154291375Z";
+        /* TEST_CHECK records a failure but keeps going, so guard the dereferences. */
+        TEST_CHECK(log_line != NULL);
+        if (log_line != NULL) {
+            TEST_CHECK((strncmp(pre, log_line, strlen(pre)) == 0));
+            flb_info("log_line len:%zu:\n", strlen(log_line));
+        }
+
+        avro_value_t kubernetes0;
+        TEST_CHECK(avro_value_get_by_name(&aobject, "kubernetes", &kubernetes0, NULL) == 0);
+
+        TEST_CHECK(avro_value_get_size(&kubernetes0, &size1) == 0);
+        flb_info("asize:%zu:\n", size1);
+        TEST_CHECK(size1 == 10);
+
+        avro_value_t pn;
+        TEST_CHECK(avro_value_get_by_name(&kubernetes0, "pod_name", &pn, NULL) == 0);
+
+        const char *pod_name = NULL;
+        size_t pod_name_size = 0;
+        TEST_CHECK(avro_value_get_string(&pn, &pod_name, &pod_name_size) == 0);
+        TEST_CHECK(pod_name != NULL);
+        if (pod_name != NULL) {
+            TEST_CHECK(strcmp(pod_name, "rrrr-bert-completion-tb1-6786c9c8-wj25m") == 0);
+            TEST_CHECK(pod_name[pod_name_size] == '\0');
+            TEST_CHECK(strlen(pod_name) == (pod_name_size-1));
+        }
+
+        avro_value_t nn;
+        TEST_CHECK(avro_value_get_by_name(&kubernetes0, "namespace_name", &nn, NULL) == 0);
+
+        const char *namespace_name = NULL;
+        size_t namespace_name_size = 0;
+        TEST_CHECK(avro_value_get_string(&nn, &namespace_name, &namespace_name_size) == 0);
+        TEST_CHECK(namespace_name != NULL);
+        if (namespace_name != NULL) {
+            TEST_CHECK(strcmp(namespace_name, "k8s-fgg") == 0);
+        }
+
+        avro_value_t mapX;
+        TEST_CHECK(avro_value_get_by_name(&kubernetes0, "annotations", &mapX, NULL) == 0);
+
+        TEST_CHECK(avro_value_get_size(&mapX, &size1) == 0);
+        flb_info("asize:%zu:\n", size1);
+
+        TEST_CHECK(size1 == 5);
+
+        // check the "doAs" annotation
+        avro_value_t doas;
+        TEST_CHECK(avro_value_get_by_name(&mapX, "doAs", &doas, NULL) == 0);
+        const char *doaser = NULL;
+        size_t doaser_size = 0;
+        TEST_CHECK(avro_value_get_string(&doas, &doaser, &doaser_size) == 0);
+        TEST_CHECK(doaser != NULL);
+        if (doaser != NULL) {
+            TEST_CHECK((strcmp(doaser, "weeb") == 0));
+        }
+
+        // check the "iddecorator.dkdk.username" annotation
+        avro_value_t iddecorator;
+        TEST_CHECK(avro_value_get_by_name(&mapX, "iddecorator.dkdk.username", &iddecorator, NULL) == 0);
+        const char *idder = NULL;
+        size_t idder_size = 0;
+        TEST_CHECK(avro_value_get_string(&iddecorator, &idder, &idder_size) == 0);
+        TEST_CHECK(idder != NULL);
+        if (idder != NULL) {
+            TEST_CHECK((strcmp(idder, "rrrr") == 0));
+        }
+
+        avro_schema_decref(aschema);
+        msgpack_unpacked_destroy(&msg);
+        avro_value_decref(&aobject);
+    }
+
+}
+
+// int msgpack2avro(avro_value_t *val, msgpack_object *o)
+// get a schema for a type like this:
+// http://avro.apache.org/docs/current/api/c/index.html#_examples
+// ../lib/msgpack-3.2.0/include/msgpack/pack.h
+// static int msgpack_pack_nil(msgpack_packer* pk);
+//
+// Packs a single msgpack nil, unpacks it again and expects msgpack2avro() to
+// return FLB_TRUE when converting it into a generic Avro value created from
+// the "null" schema. Every Avro and msgpack resource acquired here is released
+// before returning.
+void test_msgpack2avro()
+{
+    avro_value_t aobject;
+    avro_schema_t schema = avro_schema_null();
+    avro_value_iface_t *aclass = avro_generic_class_from_schema(schema);
+    int have_value = 0;
+
+    TEST_CHECK(aclass != NULL);
+    if (aclass != NULL) {
+        have_value = (avro_generic_value_new(aclass, &aobject) == 0);
+        TEST_CHECK(have_value);
+    }
+
+    msgpack_sbuffer sbuf;
+    msgpack_packer pk;
+    msgpack_zone mempool;
+    msgpack_object deserialized;
+
+    /* msgpack::sbuffer is a simple buffer implementation. */
+    msgpack_sbuffer_init(&sbuf);
+
+    /* serialize values into the buffer using msgpack_sbuffer_write callback function. */
+    msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
+
+    msgpack_pack_nil(&pk);
+
+    /* deserialize the buffer into msgpack_object instance. */
+    /* deserialized object is valid during the msgpack_zone instance alive. */
+    msgpack_zone_init(&mempool, 2048);
+    msgpack_unpack_return rc = msgpack_unpack(sbuf.data, sbuf.size, NULL, &mempool, &deserialized);
+    TEST_CHECK(rc == MSGPACK_UNPACK_SUCCESS);
+
+    /* Only convert when both sides were set up; otherwise they are uninitialised. */
+    if (have_value && rc == MSGPACK_UNPACK_SUCCESS) {
+        TEST_CHECK((msgpack2avro(&aobject, &deserialized) == FLB_TRUE));
+    }
+
+    msgpack_zone_destroy(&mempool);
+    msgpack_sbuffer_destroy(&sbuf);
+
+    /* Drop the Avro references taken above. */
+    if (have_value) {
+        avro_value_decref(&aobject);
+    }
+    if (aclass != NULL) {
+        avro_value_iface_decref(aclass);
+    }
+    avro_schema_decref(schema);
+}
+const char JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION[] = /* Avro record schema
+ * "Map001" with the fields key001 (int), key002 (float), key003 (string) and
+ * key004 (array of maps with int values), plus a "status" field whose type is
+ * the union ["null", "string"] with default null. Passed to test_init by
+ * test_union_type_sanity and test_union_type_branches. */
+"{\"type\":\"record\",\
+ \"name\":\"Map001\",\
+ \"fields\":[\
+ {\"name\": \"key001\", \"type\": \"int\"},\
+ {\"name\": \"key002\", \"type\": \"float\"},\
+ {\"name\": \"key003\", \"type\": \"string\"},\
+ { \
+ \"name\": \"status\", \
+ \"default\": null, \
+ \"type\": [\"null\", \"string\"] \
+ }, \
+ {\"name\": \"key004\", \"type\":\
+ {\"type\": \"array\", \"items\":\
+ {\"type\": \"map\",\"values\": \"int\"}}}]}";
+/*
+ * Converts the AVRO_SINGLE_MAP1 fixture with the schema that adds the
+ * union-typed "status" field and checks that the record reports five fields,
+ * that key001..key004 carry the expected values and that "status" can be
+ * looked up by name.
+ */
+void test_union_type_sanity()
+{
+    avro_value_t aobject;
+    avro_schema_t aschema;
+
+    msgpack_unpacked msg = test_init(&aobject, &aschema, JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION, AVRO_SINGLE_MAP1);
+
+    msgpack_object_print(stderr, msg.data);
+    flb_msgpack_to_avro(&aobject, &msg.data);
+
+    size_t totalSize = 0;
+    TEST_CHECK(avro_value_get_size(&aobject, &totalSize) == 0);
+    flb_info("totalSize:%zu:\n", totalSize);
+    // this is key001,2,3,4 and the status field which is the union type
+    TEST_CHECK(totalSize == 5);
+
+    avro_value_t test_value;
+    TEST_CHECK(avro_value_get_by_name(&aobject, "key001", &test_value, NULL) == 0);
+
+    int val001 = 0;
+    TEST_CHECK(avro_value_get_int(&test_value, &val001) == 0);
+    TEST_CHECK(val001 == 123456789);
+
+    TEST_CHECK(avro_value_get_by_name(&aobject, "key002", &test_value, NULL) == 0);
+
+    float val002 = 0.0f;
+    // for some reason its rounding to this value
+    float val002_actual = 0.999888f;
+    TEST_CHECK(avro_value_get_float(&test_value, &val002) == 0);
+    /* Compare the "%f" renderings so both floats are rounded the same way. */
+    char str1[80];
+    char str2[80];
+    snprintf(str1, sizeof(str1), "%f", val002);
+    snprintf(str2, sizeof(str2), "%f", val002_actual);
+    flb_info("val002:%s:\n", str1);
+    flb_info("val002_actual:%s:\n", str2);
+    TEST_CHECK((strcmp(str1, str2) == 0));
+
+    TEST_CHECK(avro_value_get_by_name(&aobject, "key003", &test_value, NULL) == 0);
+    const char *val003 = NULL;
+    size_t val003_size = 0;
+    TEST_CHECK(avro_value_get_string(&test_value, &val003, &val003_size) == 0);
+    flb_info("val003_size:%zu:\n", val003_size);
+
+    /* TEST_CHECK records a failure but keeps going, so guard the dereferences. */
+    TEST_CHECK(val003 != NULL);
+    if (val003 != NULL) {
+        TEST_CHECK(val003[val003_size] == '\0');
+        TEST_CHECK((strcmp(val003, "abcdefghijk") == 0));
+    }
+    // avro_value_get_string returns the string length plus the NUL
+    TEST_CHECK(val003_size == 12);
+
+    TEST_CHECK(avro_value_get_by_name(&aobject, "key004", &test_value, NULL) == 0);
+
+    size_t asize = 0;
+    TEST_CHECK(avro_value_get_size(&test_value, &asize) == 0);
+    flb_info("asize:%zu:\n", asize);
+
+    TEST_CHECK(asize == 2);
+
+    TEST_CHECK(avro_value_get_by_name(&aobject, "status", &test_value, NULL) == 0);
+
+    avro_value_decref(&aobject);
+    avro_schema_decref(aschema);
+    msgpack_unpacked_destroy(&msg);
+}
+
+/*
+ * Exercises the union-typed "status" field of
+ * JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION after converting AVRO_SINGLE_MAP1:
+ * the test expects no branch to be selected at first (discriminant -1 and
+ * a failing avro_value_get_current_branch), then selects branch 0 and
+ * stores/reads a null through it.
+ */
+void test_union_type_branches()
+{
+    avro_schema_t aschema;
+    avro_value_t aobject;
+    avro_value_t test_value;
+    avro_value_t branch;
+    int discriminant = 0;
+    msgpack_unpacked unpacked;
+
+    unpacked = test_init(&aobject, &aschema, JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION, AVRO_SINGLE_MAP1);
+    flb_msgpack_to_avro(&aobject, &unpacked.data);
+
+    /* The record must expose "status" and report it as a union. */
+    TEST_CHECK(avro_value_get_by_name(&aobject, "status", &test_value, NULL) == 0);
+    TEST_CHECK(avro_value_get_type(&test_value) == AVRO_UNION);
+
+    /* Before any branch is set: discriminant is -1 and there is no current branch. */
+    TEST_CHECK(avro_value_get_discriminant(&test_value, &discriminant) == 0);
+    TEST_CHECK(discriminant == -1);
+    TEST_CHECK(avro_value_get_current_branch(&test_value, &branch) != 0);
+
+    /* Select branch 0, then write and read back a null through it. */
+    TEST_CHECK(avro_value_set_branch(&test_value, 0, &branch) == 0);
+    TEST_CHECK(avro_value_set_null(&branch) == 0);
+    TEST_CHECK(avro_value_get_null(&branch) == 0);
+
+    /* Release in the same order as the other tests in this file. */
+    avro_value_decref(&aobject);
+    avro_schema_decref(aschema);
+    msgpack_unpacked_destroy(&unpacked);
+}
+TEST_LIST = {
+ /* Avro test cases: each entry pairs a test name with the function that
+  * implements it, and the { 0 } entry is the last element of the table.
+  * NOTE(review): test_msgpack2avro is defined in this file but is not
+  * listed here - confirm whether that is intentional. */
+ { "msgpack_to_avro_basic", test_unpack_to_avro},
+ { "test_parse_reordered_schema", test_parse_reordered_schema},
+ { "test_union_type_sanity", test_union_type_sanity},
+ { "test_union_type_branches", test_union_type_branches},
+ { 0 }
+};