/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ #include #include "flb_tests_runtime.h" /* Test data */ #include "data/es/json_es.h" /* JSON_ES */ static void cb_check_write_op_index(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) { char *p; char *out_js = res_data; char *index_line = "{\"index\":{"; p = strstr(out_js, index_line); TEST_CHECK(p == out_js); flb_free(res_data); } static void cb_check_write_op_create(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) { char *p; char *out_js = res_data; char *index_line = "{\"create\":{"; p = strstr(out_js, index_line); TEST_CHECK(p == out_js); flb_free(res_data); } static void cb_check_write_op_update(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) { char *p; char *b; char *out_js = res_data; char *index_line = "{\"update\":{"; char *body = "{\"doc\":"; p = strstr(out_js, index_line); TEST_CHECK(p == out_js); b = strstr(out_js, body); TEST_CHECK(b != NULL); flb_free(res_data); } static void cb_check_write_op_upsert(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) { char *p; char *b; char *out_js = res_data; char *index_line = "{\"update\":{"; char *body = "{\"doc_as_upsert\":true,\"doc\":"; p = strstr(out_js, index_line); TEST_CHECK(p == out_js); b = strstr(out_js, body); TEST_CHECK(b != NULL); flb_free(res_data); } static void cb_check_index_type(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) { char *p; char *out_js = res_data; char *index_line = "{\"create\":{\"_index\":\"index_test\",\"_type\":\"type_test\"}"; p = strstr(out_js, index_line); TEST_CHECK(p != NULL); flb_free(res_data); } static void cb_check_logstash_format(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) { char *p; char *out_js = res_data; char *index_line = "{\"create\":{\"_index\":\"prefix-2015-11-24\",\"_type\":\"_doc\"}"; p = strstr(out_js, index_line); if(!TEST_CHECK(p != NULL)) { TEST_MSG("Got: %s", out_js); } flb_free(res_data); } static void cb_check_logstash_prefix_separator(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) { char *p; char *out_js = res_data; char *index_line = "{\"create\":{\"_index\":\"prefixSEP2015-11-24\",\"_type\":\"_doc\"}"; p = strstr(out_js, index_line); if(!TEST_CHECK(p != NULL)) { TEST_MSG("Got: %s", out_js); } flb_free(res_data); } static void cb_check_logstash_format_nanos(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) { char *p; char *out_js = res_data; char *index_line = "\"@timestamp\":\"2015-11-24T22:15:40.000000000Z\""; p = strstr(out_js, index_line); TEST_CHECK(p != NULL); flb_free(res_data); } static void cb_check_tag_key(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) { char *p; char *out_js = res_data; char *record = "\"mytag\":\"test\""; p = strstr(out_js, record); TEST_CHECK(p != NULL); flb_free(res_data); } static void cb_check_replace_dots(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) { char *p; char *out_js = res_data; char *record = "\"_o_k\":[{\"_b_ar\""; p = strstr(out_js, record); TEST_CHECK(p != NULL); flb_free(res_data); } static void cb_check_id_key(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) { char *p; char *out_js = res_data; char *record = "\"_id\":\"some string\""; // see data/es/json_es.h p = strstr(out_js, record); TEST_CHECK(p != NULL); flb_free(res_data); } void flb_test_write_operation_index() { int ret; int size = sizeof(JSON_ES) - 1; flb_ctx_t *ctx; int in_ffd; int out_ffd; /* Create context, flush every second (some checks omitted here) */ ctx = flb_create(); flb_service_set(ctx, "flush", "1", "grace", "1", NULL); /* Lib input mode */ in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); /* Elasticsearch output */ out_ffd = flb_output(ctx, (char *) "es", NULL); flb_output_set(ctx, out_ffd, "match", "test", NULL); /* Override defaults of index and type */ flb_output_set(ctx, out_ffd, "write_operation", "index", NULL); /* Enable test mode */ ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_write_op_index, NULL, NULL); /* Start */ ret = flb_start(ctx); TEST_CHECK(ret == 0); /* Ingest data sample */ flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); sleep(2); flb_stop(ctx); flb_destroy(ctx); } void flb_test_write_operation_create() { int ret; int size = sizeof(JSON_ES) - 1; flb_ctx_t *ctx; int in_ffd; int out_ffd; /* Create context, flush every second (some checks omitted here) */ ctx = flb_create(); flb_service_set(ctx, "flush", "1", "grace", "1", NULL); /* Lib input mode */ in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); /* Elasticsearch output */ out_ffd = flb_output(ctx, (char *) "es", NULL); flb_output_set(ctx, out_ffd, "match", "test", NULL); /* Override defaults of index and type */ flb_output_set(ctx, out_ffd, "write_operation", "create", NULL); /* Enable test mode */ ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_write_op_create, NULL, NULL); /* Start */ ret = flb_start(ctx); TEST_CHECK(ret == 0); /* Ingest data sample */ flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); sleep(2); flb_stop(ctx); flb_destroy(ctx); } void flb_test_write_operation_update() { int ret; int size = sizeof(JSON_ES) - 1; flb_ctx_t *ctx; int in_ffd; int out_ffd; /* Create context, flush every second (some checks omitted here) */ ctx = flb_create(); flb_service_set(ctx, "flush", "1", "grace", "1", NULL); /* Lib input mode */ in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); /* Elasticsearch output */ out_ffd = flb_output(ctx, (char *) "es", NULL); flb_output_set(ctx, out_ffd, "match", "test", NULL); /* Override defaults of index and type */ flb_output_set(ctx, out_ffd, "Write_Operation", "Update", "Generate_Id", "True", NULL); /* Enable test mode */ ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_write_op_update, NULL, NULL); /* Start */ ret = flb_start(ctx); TEST_CHECK(ret == 0); /* Ingest data sample */ flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); sleep(2); flb_stop(ctx); flb_destroy(ctx); } void flb_test_write_operation_upsert() { int ret; int size = sizeof(JSON_ES) - 1; flb_ctx_t *ctx; int in_ffd; int out_ffd; /* Create context, flush every second (some checks omitted here) */ ctx = flb_create(); flb_service_set(ctx, "flush", "1", "grace", "1", NULL); /* Lib input mode */ in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); /* Elasticsearch output */ out_ffd = flb_output(ctx, (char *) "es", NULL); flb_output_set(ctx, out_ffd, "match", "test", NULL); /* Override defaults of index and type */ flb_output_set(ctx, out_ffd, "Write_Operation", "Upsert", "Generate_Id", "True", NULL); /* Enable test mode */ ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_write_op_upsert, NULL, NULL); /* Start */ ret = flb_start(ctx); TEST_CHECK(ret == 0); /* Ingest data sample */ flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); sleep(2); flb_stop(ctx); flb_destroy(ctx); } void flb_test_index_type() { int ret; int size = sizeof(JSON_ES) - 1; flb_ctx_t *ctx; int in_ffd; int out_ffd; /* Create context, flush every second (some checks omitted here) */ ctx = flb_create(); flb_service_set(ctx, "flush", "1", "grace", "1", NULL); /* Lib input mode */ in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); /* Elasticsearch output */ out_ffd = flb_output(ctx, (char *) "es", NULL); flb_output_set(ctx, out_ffd, "match", "test", NULL); /* Override defaults of index and type */ flb_output_set(ctx, out_ffd, "index", "index_test", "type", "type_test", NULL); /* Enable test mode */ ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_index_type, NULL, NULL); /* Start */ ret = flb_start(ctx); TEST_CHECK(ret == 0); /* Ingest data sample */ flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); sleep(2); flb_stop(ctx); flb_destroy(ctx); } void flb_test_logstash_format() { int ret; int size = sizeof(JSON_ES) - 1; flb_ctx_t *ctx; int in_ffd; int out_ffd; /* Create context, flush every second (some checks omitted here) */ ctx = flb_create(); flb_service_set(ctx, "flush", "1", "grace", "1", NULL); /* Lib input mode */ in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); /* Elasticsearch output */ out_ffd = flb_output(ctx, (char *) "es", NULL); flb_output_set(ctx, out_ffd, "match", "test", NULL); /* Override defaults of index and type */ flb_output_set(ctx, out_ffd, "logstash_format", "on", "logstash_prefix", "prefix", "logstash_dateformat", "%Y-%m-%d", NULL); /* Enable test mode */ ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_logstash_format, NULL, NULL); /* Start */ ret = flb_start(ctx); TEST_CHECK(ret == 0); /* Ingest data sample */ flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); sleep(2); flb_stop(ctx); flb_destroy(ctx); } void flb_test_logstash_format_nanos() { int ret; int size = sizeof(JSON_ES) - 1; flb_ctx_t *ctx; int in_ffd; int out_ffd; /* Create context, flush every second (some checks omitted here) */ ctx = flb_create(); flb_service_set(ctx, "flush", "1", "grace", "1", NULL); /* Lib input mode */ in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); /* Elasticsearch output */ out_ffd = flb_output(ctx, (char *) "es", NULL); flb_output_set(ctx, out_ffd, "match", "test", NULL); /* Override defaults of index and type */ flb_output_set(ctx, out_ffd, "logstash_format", "on", "logstash_prefix", "prefix", "logstash_dateformat", "%Y-%m-%d", "time_key_nanos", "on", NULL); /* Enable test mode */ ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_logstash_format_nanos, NULL, NULL); /* Start */ ret = flb_start(ctx); TEST_CHECK(ret == 0); /* Ingest data sample */ flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); sleep(2); flb_stop(ctx); flb_destroy(ctx); } void flb_test_tag_key() { int ret; int size = sizeof(JSON_ES) - 1; flb_ctx_t *ctx; int in_ffd; int out_ffd; /* Create context, flush every second (some checks omitted here) */ ctx = flb_create(); flb_service_set(ctx, "flush", "1", "grace", "1", NULL); /* Lib input mode */ in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); /* Elasticsearch output */ out_ffd = flb_output(ctx, (char *) "es", NULL); flb_output_set(ctx, out_ffd, "match", "test", NULL); /* Override defaults of index and type */ flb_output_set(ctx, out_ffd, "include_tag_key", "on", "tag_key", "mytag", NULL); /* Enable test mode */ ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_tag_key, NULL, NULL); /* Start */ ret = flb_start(ctx); TEST_CHECK(ret == 0); /* Ingest data sample */ flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); sleep(2); flb_stop(ctx); flb_destroy(ctx); } void flb_test_replace_dots() { int ret; int size = sizeof(JSON_DOTS) - 1; flb_ctx_t *ctx; int in_ffd; int out_ffd; /* Create context, flush every second (some checks omitted here) */ ctx = flb_create(); flb_service_set(ctx, "flush", "1", "grace", "1", NULL); /* Lib input mode */ in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); /* Elasticsearch output */ out_ffd = flb_output(ctx, (char *) "es", NULL); flb_output_set(ctx, out_ffd, "match", "test", NULL); /* Override defaults of index and type */ flb_output_set(ctx, out_ffd, "replace_dots", "on", NULL); /* Enable test mode */ ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_replace_dots, NULL, NULL); /* Start */ ret = flb_start(ctx); TEST_CHECK(ret == 0); /* Ingest data sample */ flb_lib_push(ctx, in_ffd, (char *) JSON_DOTS, size); sleep(2); flb_stop(ctx); flb_destroy(ctx); } void flb_test_id_key() { int ret; int size = sizeof(JSON_ES) - 1; flb_ctx_t *ctx; int in_ffd; int out_ffd; /* Create context, flush every second (some checks omitted here) */ ctx = flb_create(); flb_service_set(ctx, "flush", "1", "grace", "1", NULL); /* Lib input mode */ in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); /* Elasticsearch output */ out_ffd = flb_output(ctx, (char *) "es", NULL); flb_output_set(ctx, out_ffd, "match", "test", NULL); /* Override defaults of index and type */ flb_output_set(ctx, out_ffd, "id_key", "key_2", NULL); /* Enable test mode */ ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_id_key, NULL, NULL); /* Start */ ret = flb_start(ctx); TEST_CHECK(ret == 0); /* Ingest data sample */ flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); sleep(2); flb_stop(ctx); flb_destroy(ctx); } /* no check */ static void cb_check_nothing(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) { flb_free(res_data); } /* https://github.com/fluent/fluent-bit/issues/3905 */ void flb_test_div0() { int ret; char record[8000]; char record_header[] = "[1448403340,{\"key\":\""; flb_ctx_t *ctx; int in_ffd; int out_ffd; int i; /* Create context, flush every second (some checks omitted here) */ ctx = flb_create(); flb_service_set(ctx, "flush", "1", "grace", "1", NULL); /* Lib input mode */ in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); /* Elasticsearch output */ out_ffd = flb_output(ctx, (char *) "es", NULL); flb_output_set(ctx, out_ffd, "match", "test", NULL); /* Override defaults of index and type */ flb_output_set(ctx, out_ffd, NULL); /* Enable test mode */ ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_nothing, NULL, NULL); /* Start */ ret = flb_start(ctx); TEST_CHECK(ret == 0); /* create json */ strncpy(&record[0], &record_header[0], strlen(record_header)); for(i=strlen(record_header); i