diff options
Diffstat (limited to 'fluent-bit/tests/runtime/in_tail.c')
-rw-r--r-- | fluent-bit/tests/runtime/in_tail.c | 1583 |
1 files changed, 1583 insertions, 0 deletions
diff --git a/fluent-bit/tests/runtime/in_tail.c b/fluent-bit/tests/runtime/in_tail.c new file mode 100644 index 00000000..ee5fba88 --- /dev/null +++ b/fluent-bit/tests/runtime/in_tail.c @@ -0,0 +1,1583 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2022 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* +Approach for this tests is basing on filter_kubernetes tests +*/ + +#include <fluent-bit.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_pthread.h> +#include <fluent-bit/flb_compat.h> +#include <stdlib.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <fcntl.h> +#include <string.h> +#include "flb_tests_runtime.h" + +#define NEW_LINE "\n" +#define PATH_SEPARATOR "/" + +#define DPATH_COMMON FLB_TESTS_DATA_PATH "/data/common" + +#ifdef _WIN32 + #define TIME_EPSILON_MS 30 +#else + #define TIME_EPSILON_MS 10 +#endif + +struct test_tail_ctx { + flb_ctx_t *flb; /* Fluent Bit library context */ + int i_ffd; /* Input fd */ + int o_ffd; /* Output fd */ + char **filepaths; + int *fds; + int fd_num; +}; + +pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER; +int num_output = 0; +static int get_output_num() +{ + int ret; + pthread_mutex_lock(&result_mutex); + ret = num_output; + pthread_mutex_unlock(&result_mutex); + + return ret; +} + +static void set_output_num(int num) +{ + pthread_mutex_lock(&result_mutex); + num_output = num; + pthread_mutex_unlock(&result_mutex); +} + +static void clear_output_num() +{ + set_output_num(0); +} + +static int cb_count_msgpack(void *record, size_t size, void *data) +{ + msgpack_unpacked result; + size_t off = 0; + + /* Iterate each item array and apply rules */ + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, record, size, &off) == MSGPACK_UNPACK_SUCCESS) { + pthread_mutex_lock(&result_mutex); + num_output++; + /* + msgpack_object_print(stdout, result.data); + puts(NEW_LINE); + */ + pthread_mutex_unlock(&result_mutex); + } + msgpack_unpacked_destroy(&result); + + flb_free(record); + return 0; +} + +struct str_list { + size_t size; + char **lists; +}; + +/* Callback to check expected results */ +static int cb_check_json_str_list(void *record, size_t size, void *data) +{ + char *p; + char *result; + int num = get_output_num(); + size_t i; + struct str_list *l = (struct str_list*)data; + + if (!TEST_CHECK(l != NULL)) { + TEST_MSG("Data is NULL"); + flb_free(record); + return 0; + } + + + set_output_num(num+1); + + result = (char *) record; + + for (i=0; i<l->size; i++) { + p = strstr(result, l->lists[i]); + if(!TEST_CHECK(p != NULL)) { + TEST_MSG("Expected to find: '%s' in result '%s'", + l->lists[i], result); + } + } + + flb_free(record); + return 0; +} + +static struct test_tail_ctx *test_tail_ctx_create(struct flb_lib_out_cb *data, + char **paths, int path_num, int override) +{ + int i_ffd; + int o_ffd; + int i; + int j; + int fd; + int o_flags; + struct test_tail_ctx *ctx = NULL; + + if (!TEST_CHECK(data != NULL)){ + TEST_MSG("data is NULL"); + return NULL; + } + + ctx = flb_malloc(sizeof(struct test_tail_ctx)); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("malloc failed"); + flb_errno(); + return NULL; + } + ctx->fds = NULL; + ctx->filepaths = NULL; + ctx->fd_num = path_num; + + /* Service config */ + ctx->flb = flb_create(); + flb_service_set(ctx->flb, + "Flush", "0.200000000", + "Grace", "1", + "Log_Level", "info", + "Parsers_File", DPATH_COMMON "/parsers.conf", + NULL); + + /* Input */ + i_ffd = flb_input(ctx->flb, (char *) "tail", NULL); + TEST_CHECK(i_ffd >= 0); + ctx->i_ffd = i_ffd; + + /* Output */ + o_ffd = flb_output(ctx->flb, (char *) "lib", (void *) data); + ctx->o_ffd = o_ffd; + + /* open() flags */ + o_flags = O_RDWR | O_CREAT; + + if (paths != NULL) { + ctx->fds = flb_malloc(sizeof(int) * path_num); + ctx->filepaths = paths; + if (!TEST_CHECK(ctx->fds != NULL)) { + TEST_MSG("malloc failed"); + flb_destroy(ctx->flb); + flb_free(ctx); + flb_errno(); + return NULL; + } + + for (i=0; i<path_num; i++) { + if (override) { + unlink(paths[i]); + } + else { + o_flags |= O_APPEND; + } + + fd = open(paths[i], o_flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); + if (!TEST_CHECK(fd >= 0)) { + TEST_MSG("open failed. errno=%d path[%d]=%s", errno, i, paths[i]); + flb_destroy(ctx->flb); + for (j=0; j<i; j++) { + close(ctx->fds[j]); + } + flb_free(ctx->fds); + flb_free(ctx); + flb_errno(); + return NULL; + } + ctx->fds[i] = fd; + } + } + + return ctx; +} + +static void test_tail_ctx_destroy(struct test_tail_ctx *ctx) +{ + int i; + TEST_CHECK(ctx != NULL); + + if (ctx->fds != NULL) { + for (i=0; i <ctx->fd_num; i++) { + close(ctx->fds[i]); + unlink(ctx->filepaths[i]); + } + flb_free(ctx->fds); + } + + sleep(1); + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); +} + +static ssize_t write_msg(struct test_tail_ctx *ctx, char *msg, size_t msg_len) +{ + int i; + ssize_t w_byte; + + for (i = 0; i <ctx->fd_num; i++) { + flb_time_msleep(100); + w_byte = write(ctx->fds[i], msg, msg_len); + if (!TEST_CHECK(w_byte == msg_len)) { + TEST_MSG("write failed ret=%ld", w_byte); + return -1; + } + /* new line */ + w_byte = write(ctx->fds[i], NEW_LINE, strlen(NEW_LINE)); + if (!TEST_CHECK(w_byte == strlen(NEW_LINE))) { + TEST_MSG("write failed ret=%ld", w_byte); + return -1; + } + fsync(ctx->fds[i]); + flb_time_msleep(100); + } + return w_byte; +} + + +#define DPATH FLB_TESTS_DATA_PATH "/data/tail" +#define MAX_LINES 32 + +int64_t result_time; +struct tail_test_result { + const char *target; + int nMatched; + int nNotMatched; + int nLines; +}; + +struct tail_file_lines { + char *lines[MAX_LINES]; + int lines_c; +}; + +void wait_with_timeout(uint32_t timeout_ms, struct tail_test_result *result, int nExpected) +{ + struct flb_time start_time; + struct flb_time end_time; + struct flb_time diff_time; + uint64_t elapsed_time_flb = 0; + + flb_time_get(&start_time); + + while (true) { + if (result->nMatched == nExpected) { + break; + } + + flb_time_msleep(100); + flb_time_get(&end_time); + flb_time_diff(&end_time, &start_time, &diff_time); + elapsed_time_flb = flb_time_to_nanosec(&diff_time) / 1000000; + + if (elapsed_time_flb > timeout_ms - TIME_EPSILON_MS) { + flb_warn("[timeout] elapsed_time: %ld", elapsed_time_flb); + // Reached timeout. + break; + } + } +} + +static inline int64_t set_result(int64_t v) +{ + int64_t old = __sync_lock_test_and_set(&result_time, v); + return old; +} + + +static int file_to_buf(const char *path, char **out_buf, size_t *out_size) +{ + int ret; + long bytes; + char *buf; + FILE *fp; + struct stat st; + + ret = stat(path, &st); + if (ret == -1) { + return -1; + } + + fp = fopen(path, "r"); + if (!fp) { + return -1; + } + + buf = flb_malloc(st.st_size+1); + if (!buf) { + flb_errno(); + fclose(fp); + return -1; + } + + bytes = fread(buf, st.st_size, 1, fp); + if (bytes != 1) { + flb_errno(); + flb_free(buf); + fclose(fp); + return -1; + } + + fclose(fp); + buf[st.st_size] = '\0'; + *out_buf = buf; + *out_size = st.st_size; + + return 0; +} + +/* Given a target, lookup the .out file and return it content in a tail_file_lines structure */ +static struct tail_file_lines *get_out_file_content(const char *target) +{ + int ret; + char file[PATH_MAX]; + char *p; + char *out_buf; + size_t out_size; + struct tail_file_lines *file_lines = flb_malloc(sizeof (struct tail_file_lines)); + file_lines->lines_c = 0; + + snprintf(file, sizeof(file) - 1, DPATH "/out/%s.out", target); + + ret = file_to_buf(file, &out_buf, &out_size); + TEST_CHECK_(ret == 0, "getting output file content: %s", file); + if (ret != 0) { + file_lines->lines_c = 0; + return file_lines; + } + + file_lines->lines[file_lines->lines_c++] = out_buf; + + int i; + for (i=0; i<out_size; i++) { + // Nullify \n and \r characters + p = (char *)(out_buf + i); + if (*p == '\n' || *p == '\r') { + *p = '\0'; + + if (i == out_size - 1) { + break; + } + + if (*++p != '\0' && *p != '\n' && *p != '\r' && file_lines->lines_c < MAX_LINES) { + file_lines->lines[file_lines->lines_c++] = p; + } + } + } + + return file_lines; +} + +static int cb_check_result(void *record, size_t size, void *data) +{ + struct tail_test_result *result; + struct tail_file_lines *out; + + result = (struct tail_test_result *) data; + + char *check; + + out = get_out_file_content(result->target); + if (!out->lines_c) { + goto exit; + } + /* + * Our validation is: check that the one of the output lines + * in the output record. + */ + int i; + result->nLines = out->lines_c; + for (i=0; i<out->lines_c; i++) { + check = strstr(record, out->lines[i]); + if (check != NULL) { + result->nMatched++; + goto exit; + } + } + result->nNotMatched++; +exit: + if (size > 0) { + flb_free(record); + } + if (out->lines_c) { + flb_free(out->lines[0]); + flb_free(out); + } + return 0; +} + +void do_test(char *system, const char *target, int tExpected, int nExpected, ...) +{ + int64_t ret; + flb_ctx_t *ctx = NULL; + int in_ffd; + int out_ffd; + va_list va; + char *key; + char *value; + char path[PATH_MAX]; + struct tail_test_result result = {0}; + + result.nMatched = 0; + result.target = target; + + struct flb_lib_out_cb cb; + cb.cb = cb_check_result; + cb.data = &result; + + /* initialize */ + set_result(0); + + ctx = flb_create(); + + ret = flb_service_set(ctx, + "Log_Level", "error", + "Parsers_File", DPATH "/parsers.conf", + NULL); + TEST_CHECK_(ret == 0, "setting service options"); + + in_ffd = flb_input(ctx, (char *) system, NULL); + TEST_CHECK(in_ffd >= 0); + TEST_CHECK(flb_input_set(ctx, in_ffd, "tag", "test", NULL) == 0); + + /* Compose path based on target */ + snprintf(path, sizeof(path) - 1, DPATH "/log/%s.log", target); + TEST_CHECK_(access(path, R_OK) == 0, "accessing log file: %s", path); + + TEST_CHECK(flb_input_set(ctx, in_ffd, + "path" , path, + "docker_mode" , "on", + "parser" , "docker", + "read_from_head", "true", + NULL) == 0); + + va_start(va, nExpected); + while ((key = va_arg(va, char *))) { + value = va_arg(va, char *); + TEST_CHECK(value != NULL); + TEST_CHECK(flb_input_set(ctx, in_ffd, key, value, NULL) == 0); + } + va_end(va); + + out_ffd = flb_output(ctx, (char *) "lib", &cb); + TEST_CHECK(out_ffd >= 0); + TEST_CHECK(flb_output_set(ctx, out_ffd, + "match", "test", + "format", "json", + NULL) == 0); + + TEST_CHECK(flb_service_set(ctx, "Flush", "0.5", + "Grace", "1", + NULL) == 0); + + /* Start test */ + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK_(ret == 0, "starting engine"); + + /* Poll for up to 5 seconds or until we got a match */ + for (ret = 0; ret < tExpected && result.nMatched < nExpected; ret++) { + usleep(1000); + } + + /* Wait until matching nExpected results */ + wait_with_timeout(5000, &result, nExpected); + + TEST_CHECK(result.nMatched == nExpected); + TEST_MSG("result.nMatched: %i\nnExpected: %i", result.nMatched, nExpected); + + ret = flb_stop(ctx); + TEST_CHECK_(ret == 0, "stopping engine"); + + if (ctx) { + flb_destroy(ctx); + } +} + +void flb_test_in_tail_dockermode() +{ + do_test("tail", "dockermode", 20000, 3, + NULL); +} + +void flb_test_in_tail_dockermode_splitted_line() +{ + do_test("tail", "dockermode_splitted_line", 20000, 2, + NULL); +} + +void flb_test_in_tail_dockermode_multiple_lines() +{ + do_test("tail", "dockermode_multiple_lines", 20000, 2, + "Docker_Mode_Parser", "docker_multiline", + NULL); +} + +void flb_test_in_tail_dockermode_splitted_multiple_lines() +{ + do_test("tail", "dockermode_splitted_multiple_lines", 20000, 2, + "Docker_Mode_Parser", "docker_multiline", + NULL); +} + +void flb_test_in_tail_dockermode_firstline_detection() +{ + do_test("tail", "dockermode_firstline_detection", 20000, 5, + "Docker_Mode_Parser", "docker_multiline", + NULL); +} + +int write_long_lines(int fd) { + ssize_t ret; + int i; + const char* data = "0123456789abcdef" "0123456789abcdef"; + size_t len = strlen(data); + + for (i=0; i<1024; i++) { + ret = write(fd, data, strlen(data)); + if (ret < 0) { + flb_errno(); + return -1; + } + else if(ret != len) { + write(fd, &data[ret], len-ret); + } + } + + write(fd, "\n", 1); + return 0; +} + +void flb_test_in_tail_skip_long_lines() +{ + int64_t ret; + flb_ctx_t *ctx = NULL; + int in_ffd; + int out_ffd; + char path[PATH_MAX]; + struct tail_test_result result = {0}; + int fd; + + char *target = "skip_long_lines"; + int nExpected = 2; + int nExpectedNotMatched = 0; + int nExpectedLines = 2; + + result.nMatched = 0; + result.target = target; + + struct flb_lib_out_cb cb; + cb.cb = cb_check_result; + cb.data = &result; + + /* initialize */ + set_result(0); + + ctx = flb_create(); + + ret = flb_service_set(ctx, + "Log_Level", "error", + NULL); + TEST_CHECK_(ret == 0, "setting service options"); + + in_ffd = flb_input(ctx, "tail", NULL); + TEST_CHECK(in_ffd >= 0); + TEST_CHECK(flb_input_set(ctx, in_ffd, "tag", "test", NULL) == 0); + + /* Compose path based on target */ + snprintf(path, sizeof(path) - 1, DPATH "/log/%s.log", target); + fd = creat(path, S_IRWXU | S_IRGRP); + TEST_CHECK(fd >= 0); + + /* Write log + ======= + before_long_line + (long line which should be skipped) + after_long_line + ======= + + Output should be "before_long_line" and "after_long_line" + */ + write(fd, "before_long_line\n", strlen("before_long_line\n")); + write_long_lines(fd); + write(fd, "after_long_line\n", strlen("after_long_line\n")); + close(fd); + + TEST_CHECK_(access(path, R_OK) == 0, "accessing log file: %s", path); + + TEST_CHECK(flb_input_set(ctx, in_ffd, + "path" , path, + "read_from_head", "true", + "skip_long_lines", "on", + NULL) == 0); + + out_ffd = flb_output(ctx, (char *) "lib", &cb); + TEST_CHECK(out_ffd >= 0); + TEST_CHECK(flb_output_set(ctx, out_ffd, + "match", "test", + "format", "json", + NULL) == 0); + + TEST_CHECK(flb_service_set(ctx, "Flush", "0.5", + "Grace", "1", + NULL) == 0); + + /* Start test */ + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK_(ret == 0, "starting engine"); + + wait_with_timeout(5000, &result, nExpected); + + TEST_CHECK(result.nMatched == nExpected); + TEST_MSG("result.nMatched: %i\nnExpected: %i", result.nMatched, nExpected); + TEST_CHECK(result.nNotMatched == nExpectedNotMatched); + TEST_MSG("result.nNotMatched: %i\nnExpectedNotMatched: %i", result.nNotMatched, nExpectedNotMatched); + TEST_CHECK(result.nLines == nExpectedLines); + TEST_MSG("result.nLines: %i\nnExpectedLines: %i", result.nLines, nExpectedLines); + + ret = flb_stop(ctx); + TEST_CHECK_(ret == 0, "stopping engine"); + + if (ctx) { + flb_destroy(ctx); + } + + unlink(path); +} + +/* + * test case for https://github.com/fluent/fluent-bit/issues/3943 + * + * test to read the lines "CRLF + empty_line + LF" + */ +void flb_test_in_tail_issue_3943() +{ + int64_t ret; + flb_ctx_t *ctx = NULL; + int in_ffd; + int out_ffd; + char path[PATH_MAX]; + struct tail_test_result result = {0}; + + char *target = "3943"; + int nExpected = 2; + int nExpectedNotMatched = 0; + int nExpectedLines = 2; + + result.nMatched = 0; + result.target = target; + + struct flb_lib_out_cb cb; + cb.cb = cb_check_result; + cb.data = &result; + + /* initialize */ + set_result(0); + + ctx = flb_create(); + + ret = flb_service_set(ctx, + "Log_Level", "error", + NULL); + TEST_CHECK_(ret == 0, "setting service options"); + + in_ffd = flb_input(ctx, "tail", NULL); + TEST_CHECK(in_ffd >= 0); + TEST_CHECK(flb_input_set(ctx, in_ffd, "tag", "test", NULL) == 0); + + snprintf(path, sizeof(path) - 1, DPATH "/log/%s.log", target); + TEST_CHECK_(access(path, R_OK) == 0, "accessing log file: %s", path); + + TEST_CHECK(flb_input_set(ctx, in_ffd, + "path" , path, + "read_from_head", "true", + NULL) == 0); + + out_ffd = flb_output(ctx, (char *) "lib", &cb); + TEST_CHECK(out_ffd >= 0); + TEST_CHECK(flb_output_set(ctx, out_ffd, + "match", "test", + "format", "json", + NULL) == 0); + + TEST_CHECK(flb_service_set(ctx, "Flush", "0.5", + "Grace", "1", + NULL) == 0); + + /* Start test */ + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK_(ret == 0, "starting engine"); + + wait_with_timeout(3000, &result, nExpected); + + TEST_CHECK(result.nMatched == nExpected); + TEST_MSG("result.nMatched: %i\nnExpected: %i", result.nMatched, nExpected); + TEST_CHECK(result.nNotMatched == nExpectedNotMatched); + TEST_MSG("result.nNotMatched: %i\nnExpectedNotMatched: %i", result.nNotMatched, nExpectedNotMatched); + TEST_CHECK(result.nLines == nExpectedLines); + TEST_MSG("result.nLines: %i\nnExpectedLines: %i", result.nLines, nExpectedLines); + + ret = flb_stop(ctx); + TEST_CHECK_(ret == 0, "stopping engine"); + + if (ctx) { + flb_destroy(ctx); + } +} + +void flb_test_in_tail_multiline_json_and_regex() +{ + int64_t ret; + int in_ffd; + int out_ffd; + int n_expected; + int t_expected; + char *target; + char path[PATH_MAX]; + struct tail_test_result result = {0}; + flb_ctx_t *ctx; + + target = "multiline_001"; + result.nMatched = 0; + result.target = target; + + struct flb_lib_out_cb cb; + cb.cb = cb_check_result; + cb.data = &result; + + /* initialize */ + set_result(0); + + ctx = flb_create(); + + TEST_CHECK(flb_service_set(ctx, "Flush", "0.5", + "Grace", "5", + NULL) == 0); + + ret = flb_service_set(ctx, + "Log_Level", "info", + "Parsers_File", DPATH "/parsers_multiline_json.conf", + NULL); + TEST_CHECK_(ret == 0, "setting service options"); + + in_ffd = flb_input(ctx, (char *) "tail", NULL); + TEST_CHECK(in_ffd >= 0); + TEST_CHECK(flb_input_set(ctx, in_ffd, "tag", "test", NULL) == 0); + + /* Compose path based on target */ + snprintf(path, sizeof(path) - 1, DPATH "/log/%s.log", target); + TEST_CHECK_(access(path, R_OK) == 0, "accessing log file: %s", path); + + TEST_CHECK(flb_input_set(ctx, in_ffd, + "path" , path, + "read_from_head" , "true", + "multiline.parser", "multiline-json-regex", + NULL) == 0); + + + out_ffd = flb_output(ctx, (char *) "lib", &cb); + TEST_CHECK(out_ffd >= 0); + TEST_CHECK(flb_output_set(ctx, out_ffd, + "match", "test", + "format", "json", + NULL) == 0); + + /* Start test */ + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK_(ret == 0, "starting engine"); + + /* Expect 1 final record */ + n_expected = 1; + t_expected = 5000; + + /* Poll for up to 5 seconds or until we got a match */ + for (ret = 0; ret < t_expected && result.nMatched < n_expected; ret++) { + usleep(1000); + } + wait_with_timeout(5000, &result, n_expected); + + TEST_CHECK(result.nMatched == n_expected); + TEST_MSG("result.nMatched: %i\nnExpected: %i", result.nMatched, n_expected); + + ret = flb_stop(ctx); + TEST_CHECK_(ret == 0, "stopping engine"); + + if (ctx) { + flb_destroy(ctx); + } +} + +void flb_test_path_comma() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *file[] = {"a.log", "b.log", "c.log", "d.log"}; + char *path = "a.log, b.log, c.log, d.log"; + char *msg = "hello world"; + int ret; + int num; + int unused; + + clear_output_num(); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char*), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", path, + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ret = write_msg(ctx, msg, strlen(msg)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num == sizeof(file)/sizeof(char*))) { + TEST_MSG("output num error. expect=%lu got=%d", sizeof(file)/sizeof(char*), num); + } + + test_tail_ctx_destroy(ctx); +} + +void flb_test_path_key() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *file[] = {"path_key.log"}; + char *path_key = "path_key_is"; + char *msg = "hello world"; + int ret; + int num; + + char *expected_strs[] = {path_key, msg, file[0]}; + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + }; + + clear_output_num(); + + cb_data.cb = cb_check_json_str_list; + cb_data.data = &expected; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char*), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", file[0], + "path_key", path_key, + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ret = write_msg(ctx, msg, strlen(msg)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + + test_tail_ctx_destroy(ctx); +} + +void flb_test_exclude_path() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *exclude_path = "ep_ignore*.txt"; + char *path = "ep_*.txt"; + char *file[] = {"ep_ignore_1.txt", "ep_ignore_2.txt", "ep_file1.txt", "ep_file2.txt", "ep_file3.txt"}; + char *msg = "hello world"; + int unused; + int ret; + int num; + + clear_output_num(); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char*), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", path, + "exclude_path", exclude_path, + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ret = write_msg(ctx, msg, strlen(msg)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num == 3 /* 3files. "ep_file1.txt", "ep_file2.txt", "ep_file3.txt" */)) { + TEST_MSG("output num error. expect=3 got=%d", num); + } + + test_tail_ctx_destroy(ctx); +} + +void flb_test_offset_key() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *file[] = {"offset_key.log"}; + char *offset_key = "OffsetKey"; + char *msg_before_tail = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + char *msg_after_tail = "test test"; + char expected_msg[1024] = {0}; + int ret; + int num; + + char *expected_strs[] = {msg_after_tail, &expected_msg[0]}; + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + }; + + clear_output_num(); + + cb_data.cb = cb_check_json_str_list; + cb_data.data = &expected; + + ret = snprintf(&expected_msg[0], sizeof(expected_msg), "\"%s\":%ld", offset_key, strlen(msg_before_tail)+strlen(NEW_LINE)); + if(!TEST_CHECK(ret >= 0)) { + TEST_MSG("snprintf failed"); + exit(EXIT_FAILURE); + } + + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", file[0], + "offset_key", offset_key, + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = write_msg(ctx, msg_before_tail, strlen(msg_before_tail)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ret = write_msg(ctx, msg_after_tail, strlen(msg_after_tail)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + + test_tail_ctx_destroy(ctx); +} + +void flb_test_skip_empty_lines() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *file[] = {"skip_empty_lines.log"}; + char *empty_lines[] = {NEW_LINE, NEW_LINE}; + char *msg = "lalala"; + int ret; + int num; + int i; + + char *expected_strs[] = {msg}; + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + }; + + clear_output_num(); + + cb_data.cb = cb_check_json_str_list; + cb_data.data = &expected; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", file[0], + "skip_empty_lines", "true", + "Read_From_Head", "true", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = write_msg(ctx, msg, strlen(msg)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + for (i=0; i<sizeof(empty_lines)/sizeof(char*); i++) { + ret = write_msg(ctx, empty_lines[i], strlen(empty_lines[i])); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + } + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num == 1)) { + TEST_MSG("output error: expect=1 got=%d", num); + } + + test_tail_ctx_destroy(ctx); +} + +static int ignore_older(int expected, char *ignore_older) +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + struct timespec times[2]; + struct flb_time tm; + char *file[] = {"time_now.log", "time_30m.log", "time_3h.log", "time_3d.log"}; + char *path = "time_*.log"; + char *msg = "hello world"; + int ret; + int num; + int unused; + + clear_output_num(); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + return -1; + } + + times[0].tv_nsec = 0; + times[1].tv_nsec = 0; + + flb_time_get(&tm); + times[0].tv_sec = tm.tm.tv_sec - 3 * 24 * 60 * 60; + times[1].tv_sec = tm.tm.tv_sec - 3 * 24 * 60 * 60; + ret = utimensat(AT_FDCWD, file[3], times, 0); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("utimensat failed. errno=%d file=%s", errno, file[3]); + return -1; + } + + times[0].tv_sec = tm.tm.tv_sec - 3 * 60 * 60; + times[1].tv_sec = tm.tm.tv_sec - 3 * 60 * 60; + ret = utimensat(AT_FDCWD, file[2], times, 0); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("utimensat failed. errno=%d file=%s", errno, file[2]); + return -1; + } + + times[0].tv_sec = tm.tm.tv_sec - 30 * 60; + times[1].tv_sec = tm.tm.tv_sec - 30 * 60; + ret = utimensat(AT_FDCWD, file[1], times, 0); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("utimensat failed. errno=%d file=%s", errno, file[1]); + return -1; + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", path, + "ignore_older", ignore_older, + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ret = write_msg(ctx, msg, strlen(msg)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + return -1; + } + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num == expected)) { + TEST_MSG("output num error. expect=%d got=%d", expected, num); + return -1; + } + + test_tail_ctx_destroy(ctx); + return 0; +} + +void flb_test_ignore_older() +{ + int ret; + char *ignore_olders[] = {"10m", "40m", "4h", "4d"}; + int expecteds[] = {1/*10m*/, 2/*10m, 40m*/, 3/*10m, 40m, 4h*/, 4 /*all*/}; + int i; + + TEST_CHECK(sizeof(ignore_olders)/sizeof(char*) == sizeof(expecteds)/sizeof(int)); + + for (i=0; i<sizeof(expecteds)/sizeof(int); i++) { + ret = ignore_older(expecteds[i], ignore_olders[i]); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("case %d failed. ignore_older=%s", i, ignore_olders[i]); + exit(EXIT_FAILURE); + } + } +} + +void flb_test_inotify_watcher_false() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *file[] = {"inotify_watcher_false.log"}; + char *msg = "hello world"; + int ret; + int num; + + char *expected_strs[] = {msg}; + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + }; + + clear_output_num(); + + cb_data.cb = cb_check_json_str_list; + cb_data.data = &expected; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", file[0], + "inotify_watcher", "false", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ret = write_msg(ctx, msg, strlen(msg)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no output"); + } + + test_tail_ctx_destroy(ctx); +} + +#ifdef FLB_HAVE_REGEX +void flb_test_parser() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *file[] = {"parser.log"}; + /* https://httpd.apache.org/docs/2.4/en/logs.html */ + char *msg = "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326"; + int ret; + int num; + + char *expected_strs[] = {"\"method\":\"GET\"", "\"host\":\"127.0.0.1\"","\"user\":\"frank\"", + "\"path\":\"/apache_pb.gif\"","\"code\":\"200\"","\"size\":\"2326\""}; + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + }; + + clear_output_num(); + + cb_data.cb = cb_check_json_str_list; + cb_data.data = &expected; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", file[0], + "parser", "apache2", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ret = write_msg(ctx, msg, strlen(msg)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + + test_tail_ctx_destroy(ctx); +} + +void flb_test_tag_regex() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *file[] = {"aa_bb_cc.log"}; + char *tag_regex = "(?<first>[a-z]+)_(?<second>[a-z]+)_(?<third>[a-z]+)\\.log"; + char *tag = "<first>.<second>.<third>"; /* tag will be "aa.bb.cc" */ + char *msg = "hello world"; + int ret; + int num; + + char *expected_strs[] = {msg}; + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + }; + + clear_output_num(); + + cb_data.cb = cb_check_json_str_list; + cb_data.data = &expected; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", file[0], + "tag", tag, + "tag_regex", tag_regex, + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "aa.bb.cc", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ret = write_msg(ctx, msg, strlen(msg)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + + test_tail_ctx_destroy(ctx); +} +#endif /* FLB_HAVE_REGEX */ + +#ifdef FLB_HAVE_SQLDB +void flb_test_db() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *file[] = {"test_db.log"}; + char *db = "test_db.db"; + char *msg_init = "hello world"; + char *msg = "hello db"; + char *msg_end = "hello db end"; + int i; + int ret; + int num; + int unused; + + unlink(db); + + clear_output_num(); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_FALSE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", file[0], + "db", db, + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ret = write_msg(ctx, msg_init, strlen(msg_init)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + unlink(db); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no output"); + } + + if (ctx->fds != NULL) { + for (i=0; i<ctx->fd_num; i++) { + close(ctx->fds[i]); + } + flb_free(ctx->fds); + } + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); + + /* re-init to use db */ + clear_output_num(); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_FALSE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + unlink(db); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", file[0], + "db", db, + "db.sync", "full", + NULL); + TEST_CHECK(ret == 0); + + ret = write_msg(ctx, msg, strlen(msg)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + unlink(db); + exit(EXIT_FAILURE); + } + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* waiting to flush */ + flb_time_msleep(500); + + ret = write_msg(ctx, msg_end, strlen(msg_end)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + unlink(db); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num == 2)) { + /* 2 = msg + msg_end */ + TEST_MSG("num error. expect=2 got=%d", num); + } + + test_tail_ctx_destroy(ctx); + unlink(db); +} +#endif /* FLB_HAVE_SQLDB */ + +/* Test list */ +TEST_LIST = { + {"issue_3943", flb_test_in_tail_issue_3943}, + /* Properties */ + {"skip_long_lines", flb_test_in_tail_skip_long_lines}, + {"path_comma", flb_test_path_comma}, + {"path_key", flb_test_path_key}, + {"exclude_path", flb_test_exclude_path}, + {"offset_key", flb_test_offset_key}, + {"skip_empty_lines", flb_test_skip_empty_lines}, + {"ignore_older", flb_test_ignore_older}, +#ifdef FLB_HAVE_INOTIFY + {"inotify_watcher_false", flb_test_inotify_watcher_false}, +#endif /* FLB_HAVE_INOTIFY */ + +#ifdef FLB_HAVE_REGEX + {"parser", flb_test_parser}, + {"tag_regex", flb_test_tag_regex}, +#endif /* FLB_HAVE_INOTIFY */ + +#ifdef FLB_HAVE_SQLDB + {"db", flb_test_db}, +#endif + +#ifdef in_tail + {"in_tail_dockermode", flb_test_in_tail_dockermode}, + {"in_tail_dockermode_splitted_line", flb_test_in_tail_dockermode_splitted_line}, + {"in_tail_dockermode_multiple_lines", flb_test_in_tail_dockermode_multiple_lines}, + {"in_tail_dockermode_splitted_multiple_lines", flb_test_in_tail_dockermode_splitted_multiple_lines}, + {"in_tail_dockermode_firstline_detection", flb_test_in_tail_dockermode_firstline_detection}, + {"in_tail_multiline_json_and_regex", flb_test_in_tail_multiline_json_and_regex}, +#endif + {NULL, NULL} +}; |