summaryrefslogtreecommitdiffstats
path: root/fluent-bit/tests/internal/stream_processor.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/tests/internal/stream_processor.c')
-rw-r--r--fluent-bit/tests/internal/stream_processor.c934
1 files changed, 934 insertions, 0 deletions
diff --git a/fluent-bit/tests/internal/stream_processor.c b/fluent-bit/tests/internal/stream_processor.c
new file mode 100644
index 00000000..9afda0bf
--- /dev/null
+++ b/fluent-bit/tests/internal/stream_processor.c
@@ -0,0 +1,934 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2019-2020 The Fluent Bit Authors
+ * Copyright (C) 2015-2018 Treasure Data Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_error.h>
+#include <fluent-bit/flb_router.h>
+#include <fluent-bit/flb_storage.h>
+#include <fluent-bit/stream_processor/flb_sp.h>
+#include <fluent-bit/stream_processor/flb_sp_parser.h>
+#include <fluent-bit/stream_processor/flb_sp_stream.h>
+#include <fluent-bit/stream_processor/flb_sp_window.h>
+#include <msgpack.h>
+
+#include "flb_tests_internal.h"
+#include "include/sp_invalid_queries.h"
+#include "include/sp_select_keys.h"
+#include "include/sp_select_subkeys.h"
+#include "include/sp_window.h"
+#include "include/sp_snapshot.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#ifdef _WIN32
+#include <fluent-bit/flb_compat.h>
+#else
+#include <unistd.h>
+#endif
+
+#define DATA_SAMPLES \
+ FLB_TESTS_DATA_PATH "/data/stream_processor/samples.mp"
+
+#define DATA_SAMPLES_SUBKEYS \
+ FLB_TESTS_DATA_PATH "/data/stream_processor/samples-subkeys.mp"
+
+#define DATA_SAMPLES_HOPPING_WINDOW_PATH \
+ FLB_TESTS_DATA_PATH "/data/stream_processor/samples-hw/"
+
+#define MP_UOK MSGPACK_UNPACK_SUCCESS
+
+int flb_sp_fd_event_test(int fd, struct flb_sp_task *task, struct sp_buffer *out_buf)
+{
+ char *tag = NULL;
+ int tag_len = 0;
+
+ if (task->window.type != FLB_SP_WINDOW_DEFAULT) {
+ if (fd == task->window.fd) {
+ if (task->window.records > 0) {
+ /* find input tag from task source */
+ package_results(tag, tag_len, &out_buf->buffer, &out_buf->size, task);
+ if (task->stream) {
+ flb_sp_stream_append_data(out_buf->buffer, out_buf->size, task->stream);
+ }
+ else {
+ flb_pack_print(out_buf->buffer, out_buf->size);
+ }
+ }
+
+ flb_sp_window_prune(task);
+ }
+ else if (fd == task->window.fd_hop) {
+ sp_process_hopping_slot(tag, tag_len, task);
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * Do data processing for internal unit tests, no engine required, set
+ * results on out_data/out_size variables.
+ */
+int flb_sp_do_test(struct flb_sp *sp, struct flb_sp_task *task,
+ const char *tag, int tag_len,
+ struct sp_buffer *data_buf, struct sp_buffer *out_buf)
+{
+ int ret;
+ int records;
+ struct flb_sp_cmd *cmd;
+
+ cmd = task->cmd;
+ if (cmd->source_type == FLB_SP_TAG) {
+ ret = flb_router_match(tag, tag_len, cmd->source_name, NULL);
+ if (ret == FLB_FALSE) {
+ out_buf->buffer = NULL;
+ out_buf->size = 0;
+ return 0;
+ }
+ }
+
+ if (task->aggregate_keys == FLB_TRUE) {
+ ret = sp_process_data_aggr(data_buf->buffer, data_buf->size,
+ tag, tag_len,
+ task, sp, FLB_TRUE);
+ if (ret == -1) {
+ flb_error("[sp] error error processing records for '%s'",
+ task->name);
+ return -1;
+ }
+
+ if (flb_sp_window_populate(task, data_buf->buffer, data_buf->size) == -1) {
+ flb_error("[sp] error populating window for '%s'",
+ task->name);
+ return -1;
+ }
+ if (task->window.type == FLB_SP_WINDOW_DEFAULT || task->window.type == FLB_SP_WINDOW_TUMBLING) {
+ package_results(tag, tag_len, &out_buf->buffer, &out_buf->size, task);
+ }
+
+ records = task->window.records;
+ }
+ else {
+ ret = sp_process_data(tag, tag_len,
+ data_buf->buffer, data_buf->size,
+ &out_buf->buffer, &out_buf->size,
+ task, sp);
+ if (ret == -1) {
+ flb_error("[sp] error processing records for '%s'",
+ task->name);
+ return -1;
+ }
+ records = ret;
+ }
+
+ if (records == 0) {
+ out_buf->buffer = NULL;
+ out_buf->size = 0;
+ return 0;
+ }
+
+ return 0;
+}
+
+/* this function reads the content of a file containing MessagePack data
+ into an input buffer
+*/
+static int file_to_buf(char *path, struct sp_buffer *out_buf)
+{
+ char *buf;
+ int ret;
+ long bytes;
+ FILE *fp;
+ struct stat st;
+
+ ret = stat(path, &st);
+ if (ret == -1) {
+ return -1;
+ }
+
+ fp = fopen(path, "r");
+ if (!fp) {
+ return -1;
+ }
+
+ buf = flb_malloc(st.st_size);
+ if (!buf) {
+ flb_errno();
+ fclose(fp);
+ return -1;
+ }
+
+ bytes = fread(buf, st.st_size, 1, fp);
+ if (bytes != 1) {
+ flb_errno();
+ flb_free(buf);
+ fclose(fp);
+ return -1;
+ }
+
+ fclose(fp);
+ out_buf->buffer = buf;
+ out_buf->size = st.st_size;
+
+ return 0;
+}
+
+static void invalid_queries()
+{
+ int i;
+ int checks;
+ struct flb_config *config;
+ struct flb_sp *sp;
+ struct flb_sp_task *task;
+
+ flb_init_env();
+
+ /* Total number of checks for invalid queries */
+ checks = sizeof(invalid_query_checks) / sizeof(char *);
+
+ config = flb_calloc(1, sizeof(struct flb_config));
+ if (!config) {
+ flb_errno();
+ return;
+ }
+
+ mk_list_init(&config->inputs);
+ mk_list_init(&config->stream_processor_tasks);
+
+ /* Create a stream processor context */
+ sp = flb_sp_create(config);
+ if (!sp) {
+ flb_error("[sp test] cannot create stream processor context");
+ flb_free(config);
+ return;
+ }
+
+ for (i = 0; i < checks; i++) {
+ task = flb_sp_task_create(sp, "invalid_query", invalid_query_checks[i]);
+ TEST_CHECK(task == NULL);
+ }
+
+ flb_sp_destroy(sp);
+ flb_free(config);
+}
+
+static void test_select_keys()
+{
+ int i;
+ int checks;
+ int ret;
+ struct sp_buffer data_buf;
+ struct sp_buffer out_buf;
+ struct flb_config *config;
+ struct flb_sp *sp;
+ struct task_check *check;
+ struct flb_sp_task *task;
+#ifdef _WIN32
+ WSADATA wsa_data;
+#endif
+
+ flb_init_env();
+
+ config = flb_calloc(1, sizeof(struct flb_config));
+ if (!config) {
+ flb_errno();
+ return;
+ }
+#ifdef _WIN32
+ WSAStartup(0x0201, &wsa_data);
+#endif
+ mk_list_init(&config->inputs);
+ mk_list_init(&config->stream_processor_tasks);
+
+ /* Create event loop */
+ config->evl = mk_event_loop_create(256);
+
+ /* Create a stream processor context */
+ sp = flb_sp_create(config);
+ if (!sp) {
+ flb_error("[sp test] cannot create stream processor context");
+ flb_free(config);
+ return;
+ }
+
+ ret = file_to_buf(DATA_SAMPLES, &data_buf);
+ if (ret == -1) {
+ flb_error("[sp test] cannot open DATA_SAMPLES file %s", DATA_SAMPLES);
+ flb_free(config);
+ return;
+ }
+
+ /* Total number of checks for select_keys */
+ checks = (sizeof(select_keys_checks) / sizeof(struct task_check));
+
+ /* Run every test */
+ for (i = 0; i < checks; i++) {
+ check = (struct task_check *) &select_keys_checks[i];
+
+ task = flb_sp_task_create(sp, check->name, check->exec);
+ if (!task) {
+ flb_error("[sp test] wrong check '%s', fix it!", check->name);
+ continue;
+ }
+
+ out_buf.buffer = NULL;
+
+ ret = flb_sp_do_test(sp, task,
+ "samples", strlen("samples"),
+ &data_buf, &out_buf);
+ if (ret == -1) {
+ flb_error("[sp test] error processing check '%s'", check->name);
+ flb_sp_task_destroy(task);
+ continue;
+ }
+
+ /* */
+ flb_sp_fd_event_test(task->window.fd, task, &out_buf);
+
+ flb_info("[sp test] id=%i, SQL => '%s'", check->id, check->exec);
+ check->cb_check(check->id, check, out_buf.buffer, out_buf.size);
+ flb_pack_print(out_buf.buffer, out_buf.size);
+ flb_free(out_buf.buffer);
+ }
+
+ flb_free(data_buf.buffer);
+ flb_sp_destroy(sp);
+ mk_event_loop_destroy(config->evl);
+ flb_free(config);
+#ifdef _WIN32
+ WSACleanup();
+#endif
+}
+
+static void test_select_subkeys()
+{
+ int i;
+ int checks;
+ int ret;
+ struct sp_buffer out_buf;
+ struct sp_buffer data_buf;
+ struct task_check *check;
+ struct flb_config *config;
+ struct flb_sp *sp;
+ struct flb_sp_task *task;
+#ifdef _WIN32
+ WSADATA wsa_data;
+#endif
+
+ flb_init_env();
+
+ config = flb_calloc(1, sizeof(struct flb_config));
+ if (!config) {
+ flb_errno();
+ return;
+ }
+#ifdef _WIN32
+ WSAStartup(0x0201, &wsa_data);
+#endif
+ mk_list_init(&config->inputs);
+ mk_list_init(&config->stream_processor_tasks);
+
+ config->evl = mk_event_loop_create(256);
+
+ sp = flb_sp_create(config);
+ if (!sp) {
+ flb_error("[sp test] cannot create stream processor context");
+ flb_free(config);
+ return;
+ }
+
+ ret = file_to_buf(DATA_SAMPLES_SUBKEYS, &data_buf);
+ if (ret == -1) {
+ flb_error("[sp test] cannot open DATA_SAMPLES file %s",
+ DATA_SAMPLES_SUBKEYS);
+ flb_free(config);
+ return;
+ }
+
+ /* Total number of checks for select_subkeys */
+ checks = (sizeof(select_subkeys_checks) / sizeof(struct task_check));
+
+ /* Run every test */
+ for (i = 0; i < checks; i++) {
+ check = (struct task_check *) &select_subkeys_checks[i];
+
+ task = flb_sp_task_create(sp, check->name, check->exec);
+ if (!task) {
+ flb_error("[sp test] wrong check '%s', fix it!", check->name);
+ continue;
+ }
+
+ out_buf.buffer = NULL;
+ out_buf.size = 0;
+
+ ret = flb_sp_do_test(sp, task,
+ "samples", strlen("samples"),
+ &data_buf, &out_buf);
+ if (ret == -1) {
+ flb_error("[sp test] error processing check '%s'", check->name);
+ flb_sp_task_destroy(task);
+ continue;
+ }
+
+ flb_sp_fd_event_test(task->window.fd, task, &out_buf);
+
+ flb_info("[sp test] id=%i, SQL => '%s'", check->id, check->exec);
+ check->cb_check(check->id, check, out_buf.buffer, out_buf.size);
+ flb_pack_print(out_buf.buffer, out_buf.size);
+ flb_free(out_buf.buffer);
+ }
+
+ flb_free(data_buf.buffer);
+ flb_sp_destroy(sp);
+ mk_event_loop_destroy(config->evl);
+ flb_free(config);
+#ifdef _WIN32
+ WSACleanup();
+#endif
+}
+
+void set_record_timestamps(struct sp_buffer *data_buf, double *record_timestamp)
+{
+ /* unpacker variables */
+ int ok;
+ size_t off = 0;
+ msgpack_object root;
+ msgpack_object map;
+ msgpack_unpacked result;
+ struct flb_time tm;
+
+ /* packer variables */
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+
+ ok = MSGPACK_UNPACK_SUCCESS;
+ msgpack_unpacked_init(&result);
+
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ /* Iterate incoming records */
+ while (msgpack_unpack_next(&result, data_buf->buffer, data_buf->size, &off) == ok) {
+ root = result.data;
+
+ map = root.via.array.ptr[1];
+
+ msgpack_pack_array(&mp_pck, 2);
+ flb_time_set(&tm, *record_timestamp, 0);
+ flb_time_append_to_msgpack(&tm, &mp_pck, 0);
+ msgpack_pack_object(&mp_pck, map);
+
+ *record_timestamp = *record_timestamp + 1;
+ }
+
+ msgpack_unpacked_destroy(&result);
+ flb_free(data_buf->buffer);
+
+ data_buf->buffer = mp_sbuf.data;
+ data_buf->size = mp_sbuf.size;
+}
+
+static void test_window()
+{
+ int i;
+ int t;
+ int checks;
+ int ret;
+ char datafile[PATH_MAX];
+ struct sp_buffer data_buf;
+ struct sp_buffer out_buf;
+ struct task_check *check;
+ struct flb_config *config;
+ struct flb_sp *sp;
+ struct flb_sp_task *task;
+#ifdef _WIN32
+ WSADATA wsa_data;
+#endif
+
+ flb_init_env();
+
+ config = flb_calloc(1, sizeof(struct flb_config));
+ if (!config) {
+ flb_errno();
+ return;
+ }
+#ifdef _WIN32
+ WSAStartup(0x0201, &wsa_data);
+#endif
+ mk_list_init(&config->inputs);
+ mk_list_init(&config->stream_processor_tasks);
+ config->evl = mk_event_loop_create(256);
+
+ sp = flb_sp_create(config);
+ if (!sp) {
+ flb_error("[sp test] cannot create stream processor context");
+ flb_free(config);
+ return;
+ }
+
+ /* Total number of checks for select_keys */
+ checks = (sizeof(window_checks) / sizeof(struct task_check));
+
+ /* Run every test */
+ for (i = 0; i < checks; i++) {
+ check = (struct task_check *) &window_checks[i];
+
+ task = flb_sp_task_create(sp, check->name, check->exec);
+ TEST_CHECK(task != NULL);
+
+ out_buf.buffer = NULL;
+ out_buf.size = 0;
+
+ double record_timestamp = 1.0;
+ if (check->window_type == FLB_SP_WINDOW_TUMBLING) {
+ ret = file_to_buf(DATA_SAMPLES, &data_buf);
+ if (ret == -1) {
+ flb_error("[sp test] cannot open DATA_SAMPLES file %s", DATA_SAMPLES);
+ flb_free(config);
+ return;
+ }
+
+ set_record_timestamps(&data_buf, &record_timestamp);
+
+ /* We ingest the buffer every second */
+ for (t = 0; t < check->window_size_sec; t++) {
+ if (out_buf.buffer != NULL) {
+ flb_free(out_buf.buffer);
+
+ out_buf.buffer = NULL;
+ out_buf.size = 0;
+ }
+
+ ret = flb_sp_do_test(sp, task,
+ "samples", strlen("samples"),
+ &data_buf, &out_buf);
+ if (ret == -1) {
+ flb_error("[sp test] error processing check '%s'",
+ check->name);
+ flb_sp_task_destroy(task);
+ return;
+ }
+
+ /* Sleep for 0.8 seconds, give some delta to the engine */
+ usleep(800000);
+ }
+
+ if (out_buf.buffer != NULL) {
+ flb_free(out_buf.buffer);
+
+ out_buf.buffer = NULL;
+ out_buf.size = 0;
+ }
+
+ flb_sp_fd_event_test(task->window.fd, task, &out_buf);
+
+ flb_info("[sp test] id=%i, SQL => '%s'", check->id, check->exec);
+ check->cb_check(check->id, check, out_buf.buffer, out_buf.size);
+ flb_pack_print(out_buf.buffer, out_buf.size);
+ flb_free(out_buf.buffer);
+ }
+ else if (check->window_type == FLB_SP_WINDOW_HOPPING) {
+ /* Ingest the buffer every second */
+ task->window.fd = 0;
+ task->window.fd_hop = 1;
+ double record_timestamp = 1.0;
+ for (t = 0; t < check->window_size_sec + check->window_hop_sec; t++) {
+ ret = snprintf(datafile, sizeof(datafile)-1, "%s%d.mp",
+ DATA_SAMPLES_HOPPING_WINDOW_PATH, t + 1);
+ if (!TEST_CHECK(ret <= sizeof(datafile)-1)) {
+ exit(1);
+ }
+ ret = file_to_buf(datafile, &data_buf);
+ if (ret == -1) {
+ flb_error("[sp test] cannot open DATA_SAMPLES file %s", datafile);
+ flb_free(config);
+ return;
+ }
+
+ /* Replace record timestamps with test timestamps */
+ set_record_timestamps(&data_buf, &record_timestamp);
+
+ ret = flb_sp_do_test(sp, task,
+ "samples", strlen("samples"),
+ &data_buf, &out_buf);
+ if (ret == -1) {
+ flb_error("[sp test] error processing check '%s'",
+ check->name);
+ flb_sp_task_destroy(task);
+ return;
+ }
+
+ /* Sleep for 0.8 seconds, give some delta to the engine */
+ usleep(800000);
+
+ /* Hopping event */
+ if ((t + 1) % check->window_hop_sec == 0) {
+ if (out_buf.buffer != NULL) {
+ flb_free(out_buf.buffer);
+
+ out_buf.buffer = NULL;
+ out_buf.size = 0;
+ }
+
+ flb_sp_fd_event_test(task->window.fd_hop, task, &out_buf);
+ }
+
+ /* Window event */
+ if ((t + 1) % check->window_size_sec == 0 ||
+ (t + 1 > check->window_size_sec && (t + 1 - check->window_size_sec) % check->window_hop_sec == 0)) {
+ if (out_buf.buffer != NULL) {
+ flb_free(out_buf.buffer);
+
+ out_buf.buffer = NULL;
+ out_buf.size = 0;
+ }
+
+ flb_sp_fd_event_test(task->window.fd, task, &out_buf);
+ }
+ flb_free(data_buf.buffer);
+ data_buf.buffer = NULL;
+ }
+
+ flb_info("[sp test] id=%i, SQL => '%s'", check->id, check->exec);
+ check->cb_check(check->id, check, out_buf.buffer, out_buf.size);
+ flb_pack_print(out_buf.buffer, out_buf.size);
+ flb_free(out_buf.buffer);
+ }
+
+ flb_free(data_buf.buffer);
+ }
+
+ flb_sp_destroy(sp);
+ mk_event_loop_destroy(config->evl);
+ flb_free(config);
+#ifdef _WIN32
+ WSACleanup();
+#endif
+}
+
+static void test_snapshot()
+{
+ int i;
+ int t;
+ int checks;
+ int ret;
+ char datafile[PATH_MAX];
+ char stream_name[100];
+ char window_val[3];
+ struct sp_buffer data_buf;
+ struct sp_buffer out_buf;
+ struct task_check *check;
+ struct task_check *check_flush;
+ struct flb_config *config;
+ struct flb_sp *sp;
+ struct flb_sp_task *task;
+ struct flb_sp_task *task_flush;
+
+#ifdef _WIN32
+ WSADATA wsa_data;
+#endif
+
+ flb_init_env();
+
+ config = flb_calloc(1, sizeof(struct flb_config));
+ if (!config) {
+ flb_errno();
+ return;
+ }
+#ifdef _WIN32
+ WSAStartup(0x0201, &wsa_data);
+#endif
+ mk_list_init(&config->inputs);
+ mk_list_init(&config->stream_processor_tasks);
+ config->evl = mk_event_loop_create(256);
+
+ sp = flb_sp_create(config);
+ if (!sp) {
+ flb_error("[sp test] cannot create stream processor context");
+ flb_free(config);
+ return;
+ }
+
+ ret = file_to_buf(DATA_SAMPLES, &data_buf);
+ if (ret == -1) {
+ flb_error("[sp test] cannot open DATA_SAMPLES file %s", DATA_SAMPLES);
+ flb_free(config);
+ return;
+ }
+
+ /* Total number of checks for select_keys */
+ checks = (sizeof(snapshot_checks) / (sizeof(struct task_check) * 2));
+
+ /* Run every test */
+ for (i = 0; i < checks; i++) {
+ /* Snapshot Create */
+ check = (struct task_check *) &snapshot_checks[i][0];
+
+ task = flb_sp_task_create(sp, check->name, check->exec);
+ if (!task) {
+ flb_error("[sp test] wrong check '%s', fix it!", check->name);
+ continue;
+ }
+
+ snprintf(stream_name, 100, "%s-%d", "SNAPSHOT", i);
+ task->cmd->stream_name = flb_sds_create(stream_name);
+ task->cmd->type = FLB_SP_CREATE_SNAPSHOT;
+ if (check->window_size_sec > 0) {
+ snprintf(window_val, 3, "%d", check->window_size_sec);
+ flb_sp_cmd_stream_prop_add(task->cmd, "seconds", window_val);
+ }
+
+ if (flb_sp_snapshot_create(task) == -1) {
+ flb_error("[sp test] error initializing snapshot for check '%s'!", check->name);
+ continue;
+ }
+
+ out_buf.buffer = NULL;
+ out_buf.size = 0;
+
+ /* Read 1.mp -> 5.mp message pack buffers created for window tests */
+ for (t = 0; t < 5; t++) {
+
+ ret = snprintf(datafile, sizeof(datafile)-1, "%s%d.mp",
+ DATA_SAMPLES_HOPPING_WINDOW_PATH, t + 1);
+ if (!TEST_CHECK(ret <= sizeof(datafile)-1)) {
+ exit(1);
+ }
+
+ if (data_buf.buffer) {
+ flb_free(data_buf.buffer);
+ data_buf.buffer = NULL;
+ }
+
+ ret = file_to_buf(datafile, &data_buf);
+ if (ret == -1) {
+ flb_error("[sp test] cannot open DATA_SAMPLES file %s", datafile);
+ flb_free(config);
+ return;
+ }
+
+ ret = flb_sp_do_test(sp, task,
+ "samples", strlen("samples"),
+ &data_buf, &out_buf);
+
+ if (ret == -1) {
+ flb_error("[sp test] error processing check '%s'", check->name);
+ flb_sp_task_destroy(task);
+ continue;
+ }
+ }
+
+ flb_sp_fd_event_test(task->window.fd, task, &out_buf);
+
+ flb_info("[sp test] id=%i, SQL => '%s'", check->id, check->exec);
+ check->cb_check(check->id, check, out_buf.buffer, out_buf.size);
+ flb_pack_print(out_buf.buffer, out_buf.size);
+ flb_free(out_buf.buffer);
+
+ /* Snapshot flush */
+ check_flush = (struct task_check *) &snapshot_checks[i][1];
+
+ task_flush = flb_sp_task_create(sp, check_flush->name, check_flush->exec);
+ if (!task_flush) {
+ flb_error("[sp test] wrong check '%s', fix it!", check_flush->name);
+ continue;
+ }
+
+ snprintf(stream_name, 100, "%s-%d", "__flush_SNAPSHOT", i);
+ task_flush->cmd->stream_name = flb_sds_create(stream_name);
+ task_flush->cmd->type = FLB_SP_FLUSH_SNAPSHOT;
+
+ out_buf.buffer = NULL;
+ out_buf.size = 0;
+
+ ret = flb_sp_do_test(sp, task_flush,
+ "samples", strlen("samples"),
+ &data_buf, &out_buf);
+ if (ret == -1) {
+ flb_error("[sp test] error processing check '%s'", check_flush->name);
+ flb_sp_task_destroy(task_flush);
+ continue;
+ }
+
+ flb_sp_fd_event_test(task->window.fd, task_flush, &out_buf);
+
+ flb_info("[sp test] id=%i, SQL => '%s'", check_flush->id, check_flush->exec);
+ check_flush->cb_check(check_flush->id, check_flush, out_buf.buffer, out_buf.size);
+ flb_pack_print(out_buf.buffer, out_buf.size);
+ flb_free(out_buf.buffer);
+
+ flb_free(data_buf.buffer);
+ data_buf.buffer = NULL;
+ }
+
+ flb_free(data_buf.buffer);
+ flb_sp_destroy(sp);
+ mk_event_loop_destroy(config->evl);
+ flb_free(config);
+#ifdef _WIN32
+ WSACleanup();
+#endif
+}
+
+static void test_conv_from_str_to_num()
+{
+ struct flb_config *config = NULL;
+ struct flb_sp *sp = NULL;
+ struct flb_sp_task *task = NULL;
+ struct sp_buffer out_buf;
+ struct sp_buffer data_buf;
+ msgpack_sbuffer sbuf;
+ msgpack_packer pck;
+ msgpack_unpacked result;
+ size_t off = 0;
+ char json[4096] = {0};
+ int ret;
+
+#ifdef _WIN32
+ WSADATA wsa_data;
+
+ WSAStartup(0x0201, &wsa_data);
+#endif
+ out_buf.buffer = NULL;
+
+ flb_init_env();
+
+ config = flb_config_init();
+ config->evl = mk_event_loop_create(256);
+
+ ret = flb_storage_create(config);
+ if (!TEST_CHECK(ret == 0)) {
+ TEST_MSG("flb_storage_create failed");
+ flb_config_exit(config);
+ return;
+ }
+
+ sp = flb_sp_create(config);
+ if (!TEST_CHECK(sp != NULL)) {
+ TEST_MSG("[sp test] cannot create stream processor context");
+ goto test_conv_from_str_to_num_end;
+ }
+
+ task = flb_sp_task_create(sp, "tail.0", "CREATE STREAM test WITH (tag=\'test\') AS SELECT word, num, COUNT(*) FROM STREAM:tail.0 WINDOW TUMBLING (1 SECOND) GROUP BY word, num;");
+ if (!TEST_CHECK(task != NULL)) {
+ TEST_MSG("[sp test] wrong check 'conv', fix it!");
+ goto test_conv_from_str_to_num_end;
+ }
+
+ /* Create input data */
+ msgpack_sbuffer_init(&sbuf);
+ msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);
+ msgpack_pack_array(&pck, 2);
+ flb_pack_time_now(&pck);
+ msgpack_pack_map(&pck, 2);
+
+ msgpack_pack_str(&pck, 4);
+ msgpack_pack_str_body(&pck, "word", 4);
+ msgpack_pack_str(&pck, 4);
+ msgpack_pack_str_body(&pck, "hoge", 4);
+
+ msgpack_pack_str(&pck, 3);
+ msgpack_pack_str_body(&pck, "num", 3);
+ msgpack_pack_str(&pck, 6);
+ msgpack_pack_str_body(&pck, "123456", 6);
+
+ data_buf.buffer = sbuf.data;
+ data_buf.size = sbuf.size;
+
+ out_buf.buffer = NULL;
+ out_buf.size = 0;
+
+ /* Exec stream processor */
+ ret = flb_sp_do_test(sp, task, "tail.0", strlen("tail.0"), &data_buf, &out_buf);
+ if (!TEST_CHECK(ret == 0)) {
+ TEST_MSG("flb_sp_do_test failed");
+ msgpack_sbuffer_destroy(&sbuf);
+ goto test_conv_from_str_to_num_end;
+ }
+
+ if (!TEST_CHECK(out_buf.size > 0)) {
+ TEST_MSG("out_buf size is 0");
+ msgpack_sbuffer_destroy(&sbuf);
+ goto test_conv_from_str_to_num_end;
+ }
+
+
+ /* Check output buffer. It should contain a number 123456 not a string "123456" */
+
+ msgpack_unpacked_init(&result);
+ ret = msgpack_unpack_next(&result, out_buf.buffer, out_buf.size, &off);
+ if (!TEST_CHECK(ret == MSGPACK_UNPACK_SUCCESS)) {
+ TEST_MSG("failed to unpack ret=%d", ret);
+ msgpack_unpacked_destroy(&result);
+ msgpack_sbuffer_destroy(&sbuf);
+ goto test_conv_from_str_to_num_end;
+ }
+
+ ret = flb_msgpack_to_json(&json[0], sizeof(json), &result.data);
+ if (!TEST_CHECK(ret > 0)) {
+ TEST_MSG("flb_msgpack_to_json failed");
+ msgpack_unpacked_destroy(&result);
+ msgpack_sbuffer_destroy(&sbuf);
+ goto test_conv_from_str_to_num_end;
+ }
+
+ if (!TEST_CHECK(strstr(json,"123456") != NULL)) {
+ TEST_MSG("number not found");
+ msgpack_unpacked_destroy(&result);
+ msgpack_sbuffer_destroy(&sbuf);
+ goto test_conv_from_str_to_num_end;
+ }
+ if (!TEST_CHECK(strstr(json,"\"123456\"") == NULL)) {
+ TEST_MSG("output should be number type");
+ msgpack_unpacked_destroy(&result);
+ msgpack_sbuffer_destroy(&sbuf);
+ goto test_conv_from_str_to_num_end;
+ }
+
+ msgpack_unpacked_destroy(&result);
+ msgpack_sbuffer_destroy(&sbuf);
+
+ test_conv_from_str_to_num_end:
+ if (out_buf.buffer != NULL) {
+ flb_free(out_buf.buffer);
+ }
+
+#ifdef _WIN32
+ WSACleanup();
+#endif
+ if (sp != NULL) {
+ flb_sp_destroy(sp);
+ }
+ flb_storage_destroy(config);
+ flb_config_exit(config);
+}
+
+TEST_LIST = {
+ { "invalid_queries", invalid_queries},
+ { "select_keys", test_select_keys},
+ { "select_subkeys", test_select_subkeys},
+ { "window", test_window},
+ { "snapshot", test_snapshot},
+ { "conv_from_str_to_num", test_conv_from_str_to_num},
+ { NULL }
+};