diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/test.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/test.c | 6960 |
1 files changed, 0 insertions, 6960 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/test.c b/fluent-bit/lib/librdkafka-2.1.0/tests/test.c deleted file mode 100644 index 71180c8f4..000000000 --- a/fluent-bit/lib/librdkafka-2.1.0/tests/test.c +++ /dev/null @@ -1,6960 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2012-2013, Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - - -#define _CRT_RAND_S // rand_s() on MSVC -#include <stdarg.h> -#include "test.h" -#include <signal.h> -#include <stdlib.h> -#include <stdio.h> - -#ifdef _WIN32 -#include <direct.h> /* _getcwd */ -#else -#include <sys/wait.h> /* waitpid */ -#endif - -/* Typical include path would be <librdkafka/rdkafka.h>, but this program - * is built from within the librdkafka source tree and thus differs. */ -#include "rdkafka.h" - -int test_level = 2; -int test_seed = 0; - -char test_mode[64] = "bare"; -char test_scenario[64] = "default"; -static volatile sig_atomic_t test_exit = 0; -static char test_topic_prefix[128] = "rdkafkatest"; -static int test_topic_random = 0; -int tests_running_cnt = 0; -int test_concurrent_max = 5; -int test_assert_on_fail = 0; -double test_timeout_multiplier = 1.0; -static char *test_sql_cmd = NULL; -int test_session_timeout_ms = 6000; -int test_broker_version; -static const char *test_broker_version_str = "2.4.0.0"; -int test_flags = 0; -int test_neg_flags = TEST_F_KNOWN_ISSUE; -/* run delete-test-topics.sh between each test (when concurrent_max = 1) */ -static int test_delete_topics_between = 0; -static const char *test_git_version = "HEAD"; -static const char *test_sockem_conf = ""; -int test_on_ci = 0; /* Tests are being run on CI, be more forgiving - * with regards to timeouts, etc. */ -int test_quick = 0; /** Run tests quickly */ -int test_idempotent_producer = 0; -int test_rusage = 0; /**< Check resource usage */ -/**< CPU speed calibration for rusage threshold checks. - * >1.0: CPU is slower than base line system, - * <1.0: CPU is faster than base line system. */ -double test_rusage_cpu_calibration = 1.0; -static const char *tests_to_run = NULL; /* all */ -static const char *subtests_to_run = NULL; /* all */ -static const char *tests_to_skip = NULL; /* none */ -int test_write_report = 0; /**< Write test report file */ - -static int show_summary = 1; -static int test_summary(int do_lock); - -/** - * Protects shared state, such as tests[] - */ -mtx_t test_mtx; -cnd_t test_cnd; - -static const char *test_states[] = { - "DNS", "SKIPPED", "RUNNING", "PASSED", "FAILED", -}; - - - -#define _TEST_DECL(NAME) extern int main_##NAME(int, char **) -#define _TEST(NAME, FLAGS, ...) \ - { .name = #NAME, .mainfunc = main_##NAME, .flags = FLAGS, __VA_ARGS__ } - - -/** - * Declare all tests here - */ -_TEST_DECL(0000_unittests); -_TEST_DECL(0001_multiobj); -_TEST_DECL(0002_unkpart); -_TEST_DECL(0003_msgmaxsize); -_TEST_DECL(0004_conf); -_TEST_DECL(0005_order); -_TEST_DECL(0006_symbols); -_TEST_DECL(0007_autotopic); -_TEST_DECL(0008_reqacks); -_TEST_DECL(0009_mock_cluster); -_TEST_DECL(0011_produce_batch); -_TEST_DECL(0012_produce_consume); -_TEST_DECL(0013_null_msgs); -_TEST_DECL(0014_reconsume_191); -_TEST_DECL(0015_offsets_seek); -_TEST_DECL(0016_client_swname); -_TEST_DECL(0017_compression); -_TEST_DECL(0018_cgrp_term); -_TEST_DECL(0019_list_groups); -_TEST_DECL(0020_destroy_hang); -_TEST_DECL(0021_rkt_destroy); -_TEST_DECL(0022_consume_batch); -_TEST_DECL(0022_consume_batch_local); -_TEST_DECL(0025_timers); -_TEST_DECL(0026_consume_pause); -_TEST_DECL(0028_long_topicnames); -_TEST_DECL(0029_assign_offset); -_TEST_DECL(0030_offset_commit); -_TEST_DECL(0031_get_offsets); -_TEST_DECL(0033_regex_subscribe); -_TEST_DECL(0033_regex_subscribe_local); -_TEST_DECL(0034_offset_reset); -_TEST_DECL(0034_offset_reset_mock); -_TEST_DECL(0035_api_version); -_TEST_DECL(0036_partial_fetch); -_TEST_DECL(0037_destroy_hang_local); -_TEST_DECL(0038_performance); -_TEST_DECL(0039_event_dr); -_TEST_DECL(0039_event_log); -_TEST_DECL(0039_event); -_TEST_DECL(0040_io_event); -_TEST_DECL(0041_fetch_max_bytes); -_TEST_DECL(0042_many_topics); -_TEST_DECL(0043_no_connection); -_TEST_DECL(0044_partition_cnt); -_TEST_DECL(0045_subscribe_update); -_TEST_DECL(0045_subscribe_update_topic_remove); -_TEST_DECL(0045_subscribe_update_non_exist_and_partchange); -_TEST_DECL(0045_subscribe_update_mock); -_TEST_DECL(0046_rkt_cache); -_TEST_DECL(0047_partial_buf_tmout); -_TEST_DECL(0048_partitioner); -_TEST_DECL(0049_consume_conn_close); -_TEST_DECL(0050_subscribe_adds); -_TEST_DECL(0051_assign_adds); -_TEST_DECL(0052_msg_timestamps); -_TEST_DECL(0053_stats_timing); -_TEST_DECL(0053_stats); -_TEST_DECL(0054_offset_time); -_TEST_DECL(0055_producer_latency); -_TEST_DECL(0056_balanced_group_mt); -_TEST_DECL(0057_invalid_topic); -_TEST_DECL(0058_log); -_TEST_DECL(0059_bsearch); -_TEST_DECL(0060_op_prio); -_TEST_DECL(0061_consumer_lag); -_TEST_DECL(0062_stats_event); -_TEST_DECL(0063_clusterid); -_TEST_DECL(0064_interceptors); -_TEST_DECL(0065_yield); -_TEST_DECL(0066_plugins); -_TEST_DECL(0067_empty_topic); -_TEST_DECL(0068_produce_timeout); -_TEST_DECL(0069_consumer_add_parts); -_TEST_DECL(0070_null_empty); -_TEST_DECL(0072_headers_ut); -_TEST_DECL(0073_headers); -_TEST_DECL(0074_producev); -_TEST_DECL(0075_retry); -_TEST_DECL(0076_produce_retry); -_TEST_DECL(0077_compaction); -_TEST_DECL(0078_c_from_cpp); -_TEST_DECL(0079_fork); -_TEST_DECL(0080_admin_ut); -_TEST_DECL(0081_admin); -_TEST_DECL(0082_fetch_max_bytes); -_TEST_DECL(0083_cb_event); -_TEST_DECL(0084_destroy_flags_local); -_TEST_DECL(0084_destroy_flags); -_TEST_DECL(0085_headers); -_TEST_DECL(0086_purge_local); -_TEST_DECL(0086_purge_remote); -_TEST_DECL(0088_produce_metadata_timeout); -_TEST_DECL(0089_max_poll_interval); -_TEST_DECL(0090_idempotence); -_TEST_DECL(0091_max_poll_interval_timeout); -_TEST_DECL(0092_mixed_msgver); -_TEST_DECL(0093_holb_consumer); -_TEST_DECL(0094_idempotence_msg_timeout); -_TEST_DECL(0095_all_brokers_down); -_TEST_DECL(0097_ssl_verify); -_TEST_DECL(0097_ssl_verify_local); -_TEST_DECL(0098_consumer_txn); -_TEST_DECL(0099_commit_metadata); -_TEST_DECL(0100_thread_interceptors); -_TEST_DECL(0101_fetch_from_follower); -_TEST_DECL(0102_static_group_rebalance); -_TEST_DECL(0103_transactions_local); -_TEST_DECL(0103_transactions); -_TEST_DECL(0104_fetch_from_follower_mock); -_TEST_DECL(0105_transactions_mock); -_TEST_DECL(0106_cgrp_sess_timeout); -_TEST_DECL(0107_topic_recreate); -_TEST_DECL(0109_auto_create_topics); -_TEST_DECL(0110_batch_size); -_TEST_DECL(0111_delay_create_topics); -_TEST_DECL(0112_assign_unknown_part); -_TEST_DECL(0113_cooperative_rebalance_local); -_TEST_DECL(0113_cooperative_rebalance); -_TEST_DECL(0114_sticky_partitioning); -_TEST_DECL(0115_producer_auth); -_TEST_DECL(0116_kafkaconsumer_close); -_TEST_DECL(0117_mock_errors); -_TEST_DECL(0118_commit_rebalance); -_TEST_DECL(0119_consumer_auth); -_TEST_DECL(0120_asymmetric_subscription); -_TEST_DECL(0121_clusterid); -_TEST_DECL(0122_buffer_cleaning_after_rebalance); -_TEST_DECL(0123_connections_max_idle); -_TEST_DECL(0124_openssl_invalid_engine); -_TEST_DECL(0125_immediate_flush); -_TEST_DECL(0126_oauthbearer_oidc); -_TEST_DECL(0128_sasl_callback_queue); -_TEST_DECL(0129_fetch_aborted_msgs); -_TEST_DECL(0130_store_offsets); -_TEST_DECL(0131_connect_timeout); -_TEST_DECL(0132_strategy_ordering); -_TEST_DECL(0133_ssl_keys); -_TEST_DECL(0134_ssl_provider); -_TEST_DECL(0135_sasl_credentials); -_TEST_DECL(0136_resolve_cb); -_TEST_DECL(0137_barrier_batch_consume); -_TEST_DECL(0138_admin_mock); - -/* Manual tests */ -_TEST_DECL(8000_idle); - - -/* Define test resource usage thresholds if the default limits - * are not tolerable. - * - * Fields: - * .ucpu - Max User CPU percentage (double) - * .scpu - Max System/Kernel CPU percentage (double) - * .rss - Max RSS (memory) in megabytes (double) - * .ctxsw - Max number of voluntary context switches (int) - * - * Also see test_rusage_check_thresholds() in rusage.c - * - * Make a comment in the _THRES() below why the extra thresholds are required. - * - * Usage: - * _TEST(00...., ..., - * _THRES(.ucpu = 15.0)), <-- Max 15% User CPU usage - */ -#define _THRES(...) .rusage_thres = {__VA_ARGS__} - -/** - * Define all tests here - */ -struct test tests[] = { - /* Special MAIN test to hold over-all timings, etc. */ - {.name = "<MAIN>", .flags = TEST_F_LOCAL}, - _TEST(0000_unittests, - TEST_F_LOCAL, - /* The msgq insert order tests are heavy on - * user CPU (memory scan), RSS, and - * system CPU (lots of allocations -> madvise(2)). */ - _THRES(.ucpu = 100.0, .scpu = 20.0, .rss = 900.0)), - _TEST(0001_multiobj, 0), - _TEST(0002_unkpart, 0), - _TEST(0003_msgmaxsize, 0), - _TEST(0004_conf, TEST_F_LOCAL), - _TEST(0005_order, 0), - _TEST(0006_symbols, TEST_F_LOCAL), - _TEST(0007_autotopic, 0), - _TEST(0008_reqacks, 0), - _TEST(0009_mock_cluster, - TEST_F_LOCAL, - /* Mock cluster requires MsgVersion 2 */ - TEST_BRKVER(0, 11, 0, 0)), - _TEST(0011_produce_batch, - 0, - /* Produces a lot of messages */ - _THRES(.ucpu = 40.0, .scpu = 8.0)), - _TEST(0012_produce_consume, 0), - _TEST(0013_null_msgs, 0), - _TEST(0014_reconsume_191, 0), - _TEST(0015_offsets_seek, 0), - _TEST(0016_client_swname, 0), - _TEST(0017_compression, 0), - _TEST(0018_cgrp_term, 0, TEST_BRKVER(0, 9, 0, 0)), - _TEST(0019_list_groups, 0, TEST_BRKVER(0, 9, 0, 0)), - _TEST(0020_destroy_hang, 0, TEST_BRKVER(0, 9, 0, 0)), - _TEST(0021_rkt_destroy, 0), - _TEST(0022_consume_batch, 0), - _TEST(0022_consume_batch_local, TEST_F_LOCAL), - _TEST(0025_timers, TEST_F_LOCAL), - _TEST(0026_consume_pause, 0, TEST_BRKVER(0, 9, 0, 0)), - _TEST(0028_long_topicnames, - TEST_F_KNOWN_ISSUE, - TEST_BRKVER(0, 9, 0, 0), - .extra = "https://github.com/edenhill/librdkafka/issues/529"), - _TEST(0029_assign_offset, 0), - _TEST(0030_offset_commit, - 0, - TEST_BRKVER(0, 9, 0, 0), - /* Loops over committed() until timeout */ - _THRES(.ucpu = 10.0, .scpu = 5.0)), - _TEST(0031_get_offsets, 0), - _TEST(0033_regex_subscribe, 0, TEST_BRKVER(0, 9, 0, 0)), - _TEST(0033_regex_subscribe_local, TEST_F_LOCAL), - _TEST(0034_offset_reset, 0), - _TEST(0034_offset_reset_mock, TEST_F_LOCAL), - _TEST(0035_api_version, 0), - _TEST(0036_partial_fetch, 0), - _TEST(0037_destroy_hang_local, TEST_F_LOCAL), - _TEST(0038_performance, - 0, - /* Produces and consumes a lot of messages */ - _THRES(.ucpu = 150.0, .scpu = 10)), - _TEST(0039_event_dr, 0), - _TEST(0039_event_log, TEST_F_LOCAL), - _TEST(0039_event, TEST_F_LOCAL), - _TEST(0040_io_event, 0, TEST_BRKVER(0, 9, 0, 0)), - _TEST(0041_fetch_max_bytes, - 0, - /* Re-fetches large messages multiple times */ - _THRES(.ucpu = 20.0, .scpu = 10.0)), - _TEST(0042_many_topics, 0), - _TEST(0043_no_connection, TEST_F_LOCAL), - _TEST(0044_partition_cnt, - 0, - TEST_BRKVER(1, 0, 0, 0), - /* Produces a lot of messages */ - _THRES(.ucpu = 30.0)), - _TEST(0045_subscribe_update, 0, TEST_BRKVER(0, 9, 0, 0)), - _TEST(0045_subscribe_update_topic_remove, - 0, - TEST_BRKVER(0, 9, 0, 0), - .scenario = "noautocreate"), - _TEST(0045_subscribe_update_non_exist_and_partchange, - 0, - TEST_BRKVER(0, 9, 0, 0), - .scenario = "noautocreate"), - _TEST(0045_subscribe_update_mock, TEST_F_LOCAL), - _TEST(0046_rkt_cache, TEST_F_LOCAL), - _TEST(0047_partial_buf_tmout, TEST_F_KNOWN_ISSUE), - _TEST(0048_partitioner, - 0, - /* Produces many small messages */ - _THRES(.ucpu = 10.0, .scpu = 5.0)), -#if WITH_SOCKEM - _TEST(0049_consume_conn_close, TEST_F_SOCKEM, TEST_BRKVER(0, 9, 0, 0)), -#endif - _TEST(0050_subscribe_adds, 0, TEST_BRKVER(0, 9, 0, 0)), - _TEST(0051_assign_adds, 0, TEST_BRKVER(0, 9, 0, 0)), - _TEST(0052_msg_timestamps, 0, TEST_BRKVER(0, 10, 0, 0)), - _TEST(0053_stats_timing, TEST_F_LOCAL), - _TEST(0053_stats, 0), - _TEST(0054_offset_time, 0, TEST_BRKVER(0, 10, 1, 0)), - _TEST(0055_producer_latency, TEST_F_KNOWN_ISSUE_WIN32), - _TEST(0056_balanced_group_mt, 0, TEST_BRKVER(0, 9, 0, 0)), - _TEST(0057_invalid_topic, 0, TEST_BRKVER(0, 9, 0, 0)), - _TEST(0058_log, TEST_F_LOCAL), - _TEST(0059_bsearch, 0, TEST_BRKVER(0, 10, 0, 0)), - _TEST(0060_op_prio, 0, TEST_BRKVER(0, 9, 0, 0)), - _TEST(0061_consumer_lag, 0), - _TEST(0062_stats_event, TEST_F_LOCAL), - _TEST(0063_clusterid, 0, TEST_BRKVER(0, 10, 1, 0)), - _TEST(0064_interceptors, 0, TEST_BRKVER(0, 9, 0, 0)), - _TEST(0065_yield, 0), - _TEST(0066_plugins, - TEST_F_LOCAL | TEST_F_KNOWN_ISSUE_WIN32 | TEST_F_KNOWN_ISSUE_OSX, - .extra = - "dynamic loading of tests might not be fixed for this platform"), - _TEST(0067_empty_topic, 0), -#if WITH_SOCKEM - _TEST(0068_produce_timeout, TEST_F_SOCKEM), -#endif - _TEST(0069_consumer_add_parts, - TEST_F_KNOWN_ISSUE_WIN32, - TEST_BRKVER(1, 0, 0, 0)), - _TEST(0070_null_empty, 0), - _TEST(0072_headers_ut, TEST_F_LOCAL), - _TEST(0073_headers, 0, TEST_BRKVER(0, 11, 0, 0)), - _TEST(0074_producev, TEST_F_LOCAL), -#if WITH_SOCKEM - _TEST(0075_retry, TEST_F_SOCKEM), -#endif - _TEST(0076_produce_retry, TEST_F_SOCKEM), - _TEST(0077_compaction, - 0, - /* The test itself requires message headers */ - TEST_BRKVER(0, 11, 0, 0)), - _TEST(0078_c_from_cpp, TEST_F_LOCAL), - _TEST(0079_fork, - TEST_F_LOCAL | TEST_F_KNOWN_ISSUE, - .extra = "using a fork():ed rd_kafka_t is not supported and will " - "most likely hang"), - _TEST(0080_admin_ut, TEST_F_LOCAL), - _TEST(0081_admin, 0, TEST_BRKVER(0, 10, 2, 0)), - _TEST(0082_fetch_max_bytes, 0, TEST_BRKVER(0, 10, 1, 0)), - _TEST(0083_cb_event, 0, TEST_BRKVER(0, 9, 0, 0)), - _TEST(0084_destroy_flags_local, TEST_F_LOCAL), - _TEST(0084_destroy_flags, 0), - _TEST(0085_headers, 0, TEST_BRKVER(0, 11, 0, 0)), - _TEST(0086_purge_local, TEST_F_LOCAL), - _TEST(0086_purge_remote, 0), -#if WITH_SOCKEM - _TEST(0088_produce_metadata_timeout, TEST_F_SOCKEM), -#endif - _TEST(0089_max_poll_interval, 0, TEST_BRKVER(0, 10, 1, 0)), - _TEST(0090_idempotence, 0, TEST_BRKVER(0, 11, 0, 0)), - _TEST(0091_max_poll_interval_timeout, 0, TEST_BRKVER(0, 10, 1, 0)), - _TEST(0092_mixed_msgver, 0, TEST_BRKVER(0, 11, 0, 0)), - _TEST(0093_holb_consumer, 0, TEST_BRKVER(0, 10, 1, 0)), -#if WITH_SOCKEM - _TEST(0094_idempotence_msg_timeout, - TEST_F_SOCKEM, - TEST_BRKVER(0, 11, 0, 0)), -#endif - _TEST(0095_all_brokers_down, TEST_F_LOCAL), - _TEST(0097_ssl_verify, 0), - _TEST(0097_ssl_verify_local, TEST_F_LOCAL), - _TEST(0098_consumer_txn, 0, TEST_BRKVER(0, 11, 0, 0)), - _TEST(0099_commit_metadata, 0), - _TEST(0100_thread_interceptors, TEST_F_LOCAL), - _TEST(0101_fetch_from_follower, 0, TEST_BRKVER(2, 4, 0, 0)), - _TEST(0102_static_group_rebalance, 0, TEST_BRKVER(2, 3, 0, 0)), - _TEST(0103_transactions_local, TEST_F_LOCAL), - _TEST(0103_transactions, - 0, - TEST_BRKVER(0, 11, 0, 0), - .scenario = "default,ak23"), - _TEST(0104_fetch_from_follower_mock, TEST_F_LOCAL, TEST_BRKVER(2, 4, 0, 0)), - _TEST(0105_transactions_mock, TEST_F_LOCAL, TEST_BRKVER(0, 11, 0, 0)), - _TEST(0106_cgrp_sess_timeout, TEST_F_LOCAL, TEST_BRKVER(0, 11, 0, 0)), - _TEST(0107_topic_recreate, - 0, - TEST_BRKVER_TOPIC_ADMINAPI, - .scenario = "noautocreate"), - _TEST(0109_auto_create_topics, 0), - _TEST(0110_batch_size, 0), - _TEST(0111_delay_create_topics, - 0, - TEST_BRKVER_TOPIC_ADMINAPI, - .scenario = "noautocreate"), - _TEST(0112_assign_unknown_part, 0), - _TEST(0113_cooperative_rebalance_local, - TEST_F_LOCAL, - TEST_BRKVER(2, 4, 0, 0)), - _TEST(0113_cooperative_rebalance, 0, TEST_BRKVER(2, 4, 0, 0)), - _TEST(0114_sticky_partitioning, 0), - _TEST(0115_producer_auth, 0, TEST_BRKVER(2, 1, 0, 0)), - _TEST(0116_kafkaconsumer_close, TEST_F_LOCAL), - _TEST(0117_mock_errors, TEST_F_LOCAL), - _TEST(0118_commit_rebalance, 0), - _TEST(0119_consumer_auth, 0, TEST_BRKVER(2, 1, 0, 0)), - _TEST(0120_asymmetric_subscription, TEST_F_LOCAL), - _TEST(0121_clusterid, TEST_F_LOCAL), - _TEST(0122_buffer_cleaning_after_rebalance, 0, TEST_BRKVER(2, 4, 0, 0)), - _TEST(0123_connections_max_idle, 0), - _TEST(0124_openssl_invalid_engine, TEST_F_LOCAL), - _TEST(0125_immediate_flush, 0), - _TEST(0126_oauthbearer_oidc, 0, TEST_BRKVER(3, 1, 0, 0)), - _TEST(0128_sasl_callback_queue, TEST_F_LOCAL, TEST_BRKVER(2, 0, 0, 0)), - _TEST(0129_fetch_aborted_msgs, 0, TEST_BRKVER(0, 11, 0, 0)), - _TEST(0130_store_offsets, 0), - _TEST(0131_connect_timeout, TEST_F_LOCAL), - _TEST(0132_strategy_ordering, 0, TEST_BRKVER(2, 4, 0, 0)), - _TEST(0133_ssl_keys, TEST_F_LOCAL), - _TEST(0134_ssl_provider, TEST_F_LOCAL), - _TEST(0135_sasl_credentials, 0), - _TEST(0136_resolve_cb, TEST_F_LOCAL), - _TEST(0137_barrier_batch_consume, 0), - _TEST(0138_admin_mock, TEST_F_LOCAL, TEST_BRKVER(2, 4, 0, 0)), - - /* Manual tests */ - _TEST(8000_idle, TEST_F_MANUAL), - - {NULL}}; - - -RD_TLS struct test *test_curr = &tests[0]; - - - -#if WITH_SOCKEM -/** - * Socket network emulation with sockem - */ - -static void test_socket_add(struct test *test, sockem_t *skm) { - TEST_LOCK(); - rd_list_add(&test->sockets, skm); - TEST_UNLOCK(); -} - -static void test_socket_del(struct test *test, sockem_t *skm, int do_lock) { - if (do_lock) - TEST_LOCK(); - /* Best effort, skm might not have been added if connect_cb failed */ - rd_list_remove(&test->sockets, skm); - if (do_lock) - TEST_UNLOCK(); -} - -int test_socket_sockem_set_all(const char *key, int val) { - int i; - sockem_t *skm; - int cnt = 0; - - TEST_LOCK(); - - cnt = rd_list_cnt(&test_curr->sockets); - TEST_SAY("Setting sockem %s=%d on %s%d socket(s)\n", key, val, - cnt > 0 ? "" : _C_RED, cnt); - - RD_LIST_FOREACH(skm, &test_curr->sockets, i) { - if (sockem_set(skm, key, val, NULL) == -1) - TEST_FAIL("sockem_set(%s, %d) failed", key, val); - } - - TEST_UNLOCK(); - - return cnt; -} - -void test_socket_sockem_set(int s, const char *key, int value) { - sockem_t *skm; - - TEST_LOCK(); - skm = sockem_find(s); - if (skm) - sockem_set(skm, key, value, NULL); - TEST_UNLOCK(); -} - -void test_socket_close_all(struct test *test, int reinit) { - TEST_LOCK(); - rd_list_destroy(&test->sockets); - if (reinit) - rd_list_init(&test->sockets, 16, (void *)sockem_close); - TEST_UNLOCK(); -} - - -static int test_connect_cb(int s, - const struct sockaddr *addr, - int addrlen, - const char *id, - void *opaque) { - struct test *test = opaque; - sockem_t *skm; - int r; - - skm = sockem_connect(s, addr, addrlen, test_sockem_conf, 0, NULL); - if (!skm) - return errno; - - if (test->connect_cb) { - r = test->connect_cb(test, skm, id); - if (r) - return r; - } - - test_socket_add(test, skm); - - return 0; -} - -static int test_closesocket_cb(int s, void *opaque) { - struct test *test = opaque; - sockem_t *skm; - - TEST_LOCK(); - skm = sockem_find(s); - if (skm) { - /* Close sockem's sockets */ - sockem_close(skm); - test_socket_del(test, skm, 0 /*nolock*/); - } - TEST_UNLOCK(); - - /* Close librdkafka's socket */ -#ifdef _WIN32 - closesocket(s); -#else - close(s); -#endif - - return 0; -} - - -void test_socket_enable(rd_kafka_conf_t *conf) { - rd_kafka_conf_set_connect_cb(conf, test_connect_cb); - rd_kafka_conf_set_closesocket_cb(conf, test_closesocket_cb); - rd_kafka_conf_set_opaque(conf, test_curr); -} -#endif /* WITH_SOCKEM */ - -/** - * @brief For use as the is_fatal_cb(), treating no errors as test-fatal. - */ -int test_error_is_not_fatal_cb(rd_kafka_t *rk, - rd_kafka_resp_err_t err, - const char *reason) { - return 0; -} - -static void -test_error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) { - if (test_curr->is_fatal_cb && - !test_curr->is_fatal_cb(rk, err, reason)) { - TEST_SAY(_C_YEL "%s rdkafka error (non-testfatal): %s: %s\n", - rd_kafka_name(rk), rd_kafka_err2str(err), reason); - } else { - if (err == RD_KAFKA_RESP_ERR__FATAL) { - char errstr[512]; - TEST_SAY(_C_RED "%s Fatal error: %s\n", - rd_kafka_name(rk), reason); - - err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr)); - - if (test_curr->is_fatal_cb && - !test_curr->is_fatal_cb(rk, err, reason)) - TEST_SAY(_C_YEL - "%s rdkafka ignored FATAL error: " - "%s: %s\n", - rd_kafka_name(rk), - rd_kafka_err2str(err), errstr); - else - TEST_FAIL("%s rdkafka FATAL error: %s: %s", - rd_kafka_name(rk), - rd_kafka_err2str(err), errstr); - - } else { - TEST_FAIL("%s rdkafka error: %s: %s", rd_kafka_name(rk), - rd_kafka_err2str(err), reason); - } - } -} - -static int -test_stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) { - struct test *test = test_curr; - if (test->stats_fp) - fprintf(test->stats_fp, - "{\"test\": \"%s\", \"instance\":\"%s\", " - "\"stats\": %s}\n", - test->name, rd_kafka_name(rk), json); - return 0; -} - - -/** - * @brief Limit the test run time (in seconds) - */ -void test_timeout_set(int timeout) { - TEST_LOCK(); - TEST_SAY("Setting test timeout to %ds * %.1f\n", timeout, - test_timeout_multiplier); - timeout = (int)((double)timeout * test_timeout_multiplier); - test_curr->timeout = test_clock() + ((int64_t)timeout * 1000000); - TEST_UNLOCK(); -} - -int tmout_multip(int msecs) { - int r; - TEST_LOCK(); - r = (int)(((double)(msecs)) * test_timeout_multiplier); - TEST_UNLOCK(); - return r; -} - - - -#ifdef _WIN32 -static void test_init_win32(void) { - /* Enable VT emulation to support colored output. */ - HANDLE hOut = GetStdHandle(STD_OUTPUT_HANDLE); - DWORD dwMode = 0; - - if (hOut == INVALID_HANDLE_VALUE || !GetConsoleMode(hOut, &dwMode)) - return; - -#ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING -#define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x4 -#endif - dwMode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING; - SetConsoleMode(hOut, dwMode); -} -#endif - - -static void test_init(void) { - int seed; - const char *tmp; - - - if (test_seed) - return; - - if ((tmp = test_getenv("TEST_LEVEL", NULL))) - test_level = atoi(tmp); - if ((tmp = test_getenv("TEST_MODE", NULL))) - strncpy(test_mode, tmp, sizeof(test_mode) - 1); - if ((tmp = test_getenv("TEST_SCENARIO", NULL))) - strncpy(test_scenario, tmp, sizeof(test_scenario) - 1); - if ((tmp = test_getenv("TEST_SOCKEM", NULL))) - test_sockem_conf = tmp; - if ((tmp = test_getenv("TEST_SEED", NULL))) - seed = atoi(tmp); - else - seed = test_clock() & 0xffffffff; - if ((tmp = test_getenv("TEST_CPU_CALIBRATION", NULL))) { - test_rusage_cpu_calibration = strtod(tmp, NULL); - if (test_rusage_cpu_calibration < 0.00001) { - fprintf(stderr, - "%% Invalid CPU calibration " - "value (from TEST_CPU_CALIBRATION env): %s\n", - tmp); - exit(1); - } - } - -#ifdef _WIN32 - test_init_win32(); - { - LARGE_INTEGER cycl; - QueryPerformanceCounter(&cycl); - seed = (int)cycl.QuadPart; - } -#endif - srand(seed); - test_seed = seed; -} - - -const char *test_mk_topic_name(const char *suffix, int randomized) { - static RD_TLS char ret[512]; - - /* Strip main_ prefix (caller is using __FUNCTION__) */ - if (!strncmp(suffix, "main_", 5)) - suffix += 5; - - if (test_topic_random || randomized) - rd_snprintf(ret, sizeof(ret), "%s_rnd%" PRIx64 "_%s", - test_topic_prefix, test_id_generate(), suffix); - else - rd_snprintf(ret, sizeof(ret), "%s_%s", test_topic_prefix, - suffix); - - TEST_SAY("Using topic \"%s\"\n", ret); - - return ret; -} - - -/** - * @brief Set special test config property - * @returns 1 if property was known, else 0. - */ -int test_set_special_conf(const char *name, const char *val, int *timeoutp) { - if (!strcmp(name, "test.timeout.multiplier")) { - TEST_LOCK(); - test_timeout_multiplier = strtod(val, NULL); - TEST_UNLOCK(); - *timeoutp = tmout_multip((*timeoutp) * 1000) / 1000; - } else if (!strcmp(name, "test.topic.prefix")) { - rd_snprintf(test_topic_prefix, sizeof(test_topic_prefix), "%s", - val); - } else if (!strcmp(name, "test.topic.random")) { - if (!strcmp(val, "true") || !strcmp(val, "1")) - test_topic_random = 1; - else - test_topic_random = 0; - } else if (!strcmp(name, "test.concurrent.max")) { - TEST_LOCK(); - test_concurrent_max = (int)strtod(val, NULL); - TEST_UNLOCK(); - } else if (!strcmp(name, "test.sql.command")) { - TEST_LOCK(); - if (test_sql_cmd) - rd_free(test_sql_cmd); - test_sql_cmd = rd_strdup(val); - TEST_UNLOCK(); - } else - return 0; - - return 1; -} - -static void test_read_conf_file(const char *conf_path, - rd_kafka_conf_t *conf, - rd_kafka_topic_conf_t *topic_conf, - int *timeoutp) { - FILE *fp; - char buf[1024]; - int line = 0; - -#ifndef _WIN32 - fp = fopen(conf_path, "r"); -#else - fp = NULL; - errno = fopen_s(&fp, conf_path, "r"); -#endif - if (!fp) { - if (errno == ENOENT) { - TEST_SAY("Test config file %s not found\n", conf_path); - return; - } else - TEST_FAIL("Failed to read %s: %s", conf_path, - strerror(errno)); - } - - while (fgets(buf, sizeof(buf) - 1, fp)) { - char *t; - char *b = buf; - rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN; - char *name, *val; - char errstr[512]; - - line++; - if ((t = strchr(b, '\n'))) - *t = '\0'; - - if (*b == '#' || !*b) - continue; - - if (!(t = strchr(b, '='))) - TEST_FAIL("%s:%i: expected name=value format\n", - conf_path, line); - - name = b; - *t = '\0'; - val = t + 1; - - if (test_set_special_conf(name, val, timeoutp)) - continue; - - if (!strncmp(name, "topic.", strlen("topic."))) { - name += strlen("topic."); - if (topic_conf) - res = rd_kafka_topic_conf_set(topic_conf, name, - val, errstr, - sizeof(errstr)); - else - res = RD_KAFKA_CONF_OK; - name -= strlen("topic."); - } - - if (res == RD_KAFKA_CONF_UNKNOWN) { - if (conf) - res = rd_kafka_conf_set(conf, name, val, errstr, - sizeof(errstr)); - else - res = RD_KAFKA_CONF_OK; - } - - if (res != RD_KAFKA_CONF_OK) - TEST_FAIL("%s:%i: %s\n", conf_path, line, errstr); - } - - fclose(fp); -} - -/** - * @brief Get path to test config file - */ -const char *test_conf_get_path(void) { - return test_getenv("RDKAFKA_TEST_CONF", "test.conf"); -} - -const char *test_getenv(const char *env, const char *def) { - return rd_getenv(env, def); -} - -void test_conf_common_init(rd_kafka_conf_t *conf, int timeout) { - if (conf) { - const char *tmp = test_getenv("TEST_DEBUG", NULL); - if (tmp) - test_conf_set(conf, "debug", tmp); - } - - if (timeout) - test_timeout_set(timeout); -} - - -/** - * Creates and sets up kafka configuration objects. - * Will read "test.conf" file if it exists. - */ -void test_conf_init(rd_kafka_conf_t **conf, - rd_kafka_topic_conf_t **topic_conf, - int timeout) { - const char *test_conf = test_conf_get_path(); - - if (conf) { - *conf = rd_kafka_conf_new(); - rd_kafka_conf_set(*conf, "client.id", test_curr->name, NULL, 0); - if (test_idempotent_producer) - test_conf_set(*conf, "enable.idempotence", "true"); - rd_kafka_conf_set_error_cb(*conf, test_error_cb); - rd_kafka_conf_set_stats_cb(*conf, test_stats_cb); - - /* Allow higher request timeouts on CI */ - if (test_on_ci) - test_conf_set(*conf, "request.timeout.ms", "10000"); - -#ifdef SIGIO - { - char buf[64]; - - /* Quick termination */ - rd_snprintf(buf, sizeof(buf), "%i", SIGIO); - rd_kafka_conf_set(*conf, "internal.termination.signal", - buf, NULL, 0); - signal(SIGIO, SIG_IGN); - } -#endif - } - -#if WITH_SOCKEM - if (*test_sockem_conf && conf) - test_socket_enable(*conf); -#endif - - if (topic_conf) - *topic_conf = rd_kafka_topic_conf_new(); - - /* Open and read optional local test configuration file, if any. */ - test_read_conf_file(test_conf, conf ? *conf : NULL, - topic_conf ? *topic_conf : NULL, &timeout); - - test_conf_common_init(conf ? *conf : NULL, timeout); -} - - -static RD_INLINE unsigned int test_rand(void) { - unsigned int r; -#ifdef _WIN32 - rand_s(&r); -#else - r = rand(); -#endif - return r; -} -/** - * Generate a "unique" test id. - */ -uint64_t test_id_generate(void) { - return (((uint64_t)test_rand()) << 32) | (uint64_t)test_rand(); -} - - -/** - * Generate a "unique" string id - */ -char *test_str_id_generate(char *dest, size_t dest_size) { - rd_snprintf(dest, dest_size, "%" PRId64, test_id_generate()); - return dest; -} - -/** - * Same as test_str_id_generate but returns a temporary string. - */ -const char *test_str_id_generate_tmp(void) { - static RD_TLS char ret[64]; - return test_str_id_generate(ret, sizeof(ret)); -} - -/** - * Format a message token. - * Pad's to dest_size. - */ -void test_msg_fmt(char *dest, - size_t dest_size, - uint64_t testid, - int32_t partition, - int msgid) { - size_t of; - - of = rd_snprintf(dest, dest_size, - "testid=%" PRIu64 ", partition=%" PRId32 ", msg=%i\n", - testid, partition, msgid); - if (of < dest_size - 1) { - memset(dest + of, '!', dest_size - of); - dest[dest_size - 1] = '\0'; - } -} - -/** - * @brief Prepare message value and key for test produce. - */ -void test_prepare_msg(uint64_t testid, - int32_t partition, - int msg_id, - char *val, - size_t val_size, - char *key, - size_t key_size) { - size_t of = 0; - - test_msg_fmt(key, key_size, testid, partition, msg_id); - - while (of < val_size) { - /* Copy-repeat key into val until val_size */ - size_t len = RD_MIN(val_size - of, key_size); - memcpy(val + of, key, len); - of += len; - } -} - - - -/** - * Parse a message token - */ -void test_msg_parse00(const char *func, - int line, - uint64_t testid, - int32_t exp_partition, - int *msgidp, - const char *topic, - int32_t partition, - int64_t offset, - const char *key, - size_t key_size) { - char buf[128]; - uint64_t in_testid; - int in_part; - - if (!key) - TEST_FAIL("%s:%i: Message (%s [%" PRId32 "] @ %" PRId64 - ") " - "has empty key\n", - func, line, topic, partition, offset); - - rd_snprintf(buf, sizeof(buf), "%.*s", (int)key_size, key); - - if (sscanf(buf, "testid=%" SCNu64 ", partition=%i, msg=%i\n", - &in_testid, &in_part, msgidp) != 3) - TEST_FAIL("%s:%i: Incorrect key format: %s", func, line, buf); - - - if (testid != in_testid || - (exp_partition != -1 && exp_partition != in_part)) - TEST_FAIL("%s:%i: Our testid %" PRIu64 - ", part %i did " - "not match message: \"%s\"\n", - func, line, testid, (int)exp_partition, buf); -} - -void test_msg_parse0(const char *func, - int line, - uint64_t testid, - rd_kafka_message_t *rkmessage, - int32_t exp_partition, - int *msgidp) { - test_msg_parse00(func, line, testid, exp_partition, msgidp, - rd_kafka_topic_name(rkmessage->rkt), - rkmessage->partition, rkmessage->offset, - (const char *)rkmessage->key, rkmessage->key_len); -} - - -struct run_args { - struct test *test; - int argc; - char **argv; -}; - -static int run_test0(struct run_args *run_args) { - struct test *test = run_args->test; - test_timing_t t_run; - int r; - char stats_file[256]; - - rd_snprintf(stats_file, sizeof(stats_file), "stats_%s_%" PRIu64 ".json", - test->name, test_id_generate()); - if (!(test->stats_fp = fopen(stats_file, "w+"))) - TEST_SAY("=== Failed to create stats file %s: %s ===\n", - stats_file, strerror(errno)); - - test_curr = test; - -#if WITH_SOCKEM - rd_list_init(&test->sockets, 16, (void *)sockem_close); -#endif - /* Don't check message status by default */ - test->exp_dr_status = (rd_kafka_msg_status_t)-1; - - TEST_SAY("================= Running test %s =================\n", - test->name); - if (test->stats_fp) - TEST_SAY("==== Stats written to file %s ====\n", stats_file); - - test_rusage_start(test_curr); - TIMING_START(&t_run, "%s", test->name); - test->start = t_run.ts_start; - - /* Run test main function */ - r = test->mainfunc(run_args->argc, run_args->argv); - - TIMING_STOP(&t_run); - test_rusage_stop(test_curr, - (double)TIMING_DURATION(&t_run) / 1000000.0); - - TEST_LOCK(); - test->duration = TIMING_DURATION(&t_run); - - if (test->state == TEST_SKIPPED) { - TEST_SAY( - "================= Test %s SKIPPED " - "=================\n", - run_args->test->name); - } else if (r) { - test->state = TEST_FAILED; - TEST_SAY( - "\033[31m" - "================= Test %s FAILED =================" - "\033[0m\n", - run_args->test->name); - } else { - test->state = TEST_PASSED; - TEST_SAY( - "\033[32m" - "================= Test %s PASSED =================" - "\033[0m\n", - run_args->test->name); - } - TEST_UNLOCK(); - - cnd_broadcast(&test_cnd); - -#if WITH_SOCKEM - test_socket_close_all(test, 0); -#endif - - if (test->stats_fp) { - long pos = ftell(test->stats_fp); - fclose(test->stats_fp); - test->stats_fp = NULL; - /* Delete file if nothing was written */ - if (pos == 0) { -#ifndef _WIN32 - unlink(stats_file); -#else - _unlink(stats_file); -#endif - } - } - - if (test_delete_topics_between && test_concurrent_max == 1) - test_delete_all_test_topics(60 * 1000); - - return r; -} - - - -static int run_test_from_thread(void *arg) { - struct run_args *run_args = arg; - - thrd_detach(thrd_current()); - - run_test0(run_args); - - TEST_LOCK(); - tests_running_cnt--; - TEST_UNLOCK(); - - free(run_args); - - return 0; -} - - -/** - * @brief Check running tests for timeouts. - * @locks TEST_LOCK MUST be held - */ -static void check_test_timeouts(void) { - int64_t now = test_clock(); - struct test *test; - - for (test = tests; test->name; test++) { - if (test->state != TEST_RUNNING) - continue; - - /* Timeout check */ - if (now > test->timeout) { - struct test *save_test = test_curr; - test_curr = test; - test->state = TEST_FAILED; - test_summary(0 /*no-locks*/); - TEST_FAIL0( - __FILE__, __LINE__, 0 /*nolock*/, 0 /*fail-later*/, - "Test %s%s%s%s timed out " - "(timeout set to %d seconds)\n", - test->name, *test->subtest ? " (" : "", - test->subtest, *test->subtest ? ")" : "", - (int)(test->timeout - test->start) / 1000000); - test_curr = save_test; - tests_running_cnt--; /* fail-later misses this*/ -#ifdef _WIN32 - TerminateThread(test->thrd, -1); -#else - pthread_kill(test->thrd, SIGKILL); -#endif - } - } -} - - -static int run_test(struct test *test, int argc, char **argv) { - struct run_args *run_args = calloc(1, sizeof(*run_args)); - int wait_cnt = 0; - - run_args->test = test; - run_args->argc = argc; - run_args->argv = argv; - - TEST_LOCK(); - while (tests_running_cnt >= test_concurrent_max) { - if (!(wait_cnt++ % 100)) - TEST_SAY( - "Too many tests running (%d >= %d): " - "postponing %s start...\n", - tests_running_cnt, test_concurrent_max, test->name); - cnd_timedwait_ms(&test_cnd, &test_mtx, 100); - - check_test_timeouts(); - } - tests_running_cnt++; - test->timeout = test_clock() + - (int64_t)(30.0 * 1000000.0 * test_timeout_multiplier); - test->state = TEST_RUNNING; - TEST_UNLOCK(); - - if (thrd_create(&test->thrd, run_test_from_thread, run_args) != - thrd_success) { - TEST_LOCK(); - tests_running_cnt--; - test->state = TEST_FAILED; - TEST_UNLOCK(); - - TEST_FAIL("Failed to start thread for test %s\n", test->name); - } - - return 0; -} - -static void run_tests(int argc, char **argv) { - struct test *test; - - for (test = tests; test->name; test++) { - char testnum[128]; - char *t; - const char *skip_reason = NULL; - rd_bool_t skip_silent = rd_false; - char tmp[128]; - const char *scenario = - test->scenario ? test->scenario : "default"; - - if (!test->mainfunc) - continue; - - /* Extract test number, as string */ - strncpy(testnum, test->name, sizeof(testnum) - 1); - testnum[sizeof(testnum) - 1] = '\0'; - if ((t = strchr(testnum, '_'))) - *t = '\0'; - - if ((test_flags && (test_flags & test->flags) != test_flags)) { - skip_reason = "filtered due to test flags"; - skip_silent = rd_true; - } - if ((test_neg_flags & ~test_flags) & test->flags) - skip_reason = "Filtered due to negative test flags"; - if (test_broker_version && - (test->minver > test_broker_version || - (test->maxver && test->maxver < test_broker_version))) { - rd_snprintf(tmp, sizeof(tmp), - "not applicable for broker " - "version %d.%d.%d.%d", - TEST_BRKVER_X(test_broker_version, 0), - TEST_BRKVER_X(test_broker_version, 1), - TEST_BRKVER_X(test_broker_version, 2), - TEST_BRKVER_X(test_broker_version, 3)); - skip_reason = tmp; - } - - if (!strstr(scenario, test_scenario)) { - rd_snprintf(tmp, sizeof(tmp), - "requires test scenario %s", scenario); - skip_silent = rd_true; - skip_reason = tmp; - } - - if (tests_to_run && !strstr(tests_to_run, testnum)) { - skip_reason = "not included in TESTS list"; - skip_silent = rd_true; - } else if (!tests_to_run && (test->flags & TEST_F_MANUAL)) { - skip_reason = "manual test"; - skip_silent = rd_true; - } else if (tests_to_skip && strstr(tests_to_skip, testnum)) - skip_reason = "included in TESTS_SKIP list"; - - if (!skip_reason) { - run_test(test, argc, argv); - } else { - if (skip_silent) { - TEST_SAYL(3, - "================= Skipping test %s " - "(%s) ================\n", - test->name, skip_reason); - TEST_LOCK(); - test->state = TEST_SKIPPED; - TEST_UNLOCK(); - } else { - test_curr = test; - TEST_SKIP("%s\n", skip_reason); - test_curr = &tests[0]; - } - } - } -} - -/** - * @brief Print summary for all tests. - * - * @returns the number of failed tests. - */ -static int test_summary(int do_lock) { - struct test *test; - FILE *report_fp = NULL; - char report_path[128]; - time_t t; - struct tm *tm; - char datestr[64]; - int64_t total_duration = 0; - int tests_run = 0; - int tests_failed = 0; - int tests_failed_known = 0; - int tests_passed = 0; - FILE *sql_fp = NULL; - const char *tmp; - - t = time(NULL); - tm = localtime(&t); - strftime(datestr, sizeof(datestr), "%Y%m%d%H%M%S", tm); - - if ((tmp = test_getenv("TEST_REPORT", NULL))) - rd_snprintf(report_path, sizeof(report_path), "%s", tmp); - else if (test_write_report) - rd_snprintf(report_path, sizeof(report_path), - "test_report_%s.json", datestr); - else - report_path[0] = '\0'; - - if (*report_path) { - report_fp = fopen(report_path, "w+"); - if (!report_fp) - TEST_WARN("Failed to create report file %s: %s\n", - report_path, strerror(errno)); - else - fprintf(report_fp, - "{ \"id\": \"%s_%s\", \"mode\": \"%s\", " - "\"scenario\": \"%s\", " - "\"date\": \"%s\", " - "\"git_version\": \"%s\", " - "\"broker_version\": \"%s\", " - "\"tests\": {", - datestr, test_mode, test_mode, test_scenario, - datestr, test_git_version, - test_broker_version_str); - } - - if (do_lock) - TEST_LOCK(); - - if (test_sql_cmd) { -#ifdef _WIN32 - sql_fp = _popen(test_sql_cmd, "w"); -#else - sql_fp = popen(test_sql_cmd, "w"); -#endif - if (!sql_fp) - TEST_WARN("Failed to execute test.sql.command: %s", - test_sql_cmd); - else - fprintf(sql_fp, - "CREATE TABLE IF NOT EXISTS " - "runs(runid text PRIMARY KEY, mode text, " - "date datetime, cnt int, passed int, " - "failed int, duration numeric);\n" - "CREATE TABLE IF NOT EXISTS " - "tests(runid text, mode text, name text, " - "state text, extra text, duration numeric);\n"); - } - - if (show_summary) - printf( - "TEST %s (%s, scenario %s) SUMMARY\n" - "#=========================================================" - "=========#\n", - datestr, test_mode, test_scenario); - - for (test = tests; test->name; test++) { - const char *color; - int64_t duration; - char extra[128] = ""; - int do_count = 1; - - if (!(duration = test->duration) && test->start > 0) - duration = test_clock() - test->start; - - if (test == tests) { - /* <MAIN> test: - * test accounts for total runtime. - * dont include in passed/run/failed counts. */ - total_duration = duration; - do_count = 0; - } - - switch (test->state) { - case TEST_PASSED: - color = _C_GRN; - if (do_count) { - tests_passed++; - tests_run++; - } - break; - case TEST_FAILED: - if (test->flags & TEST_F_KNOWN_ISSUE) { - rd_snprintf(extra, sizeof(extra), - " <-- known issue%s%s", - test->extra ? ": " : "", - test->extra ? test->extra : ""); - if (do_count) - tests_failed_known++; - } - color = _C_RED; - if (do_count) { - tests_failed++; - tests_run++; - } - break; - case TEST_RUNNING: - color = _C_MAG; - if (do_count) { - tests_failed++; /* All tests should be finished - */ - tests_run++; - } - break; - case TEST_NOT_STARTED: - color = _C_YEL; - if (test->extra) - rd_snprintf(extra, sizeof(extra), " %s", - test->extra); - break; - default: - color = _C_CYA; - break; - } - - if (show_summary && - (test->state != TEST_SKIPPED || *test->failstr || - (tests_to_run && !strncmp(tests_to_run, test->name, - strlen(tests_to_run))))) { - printf("|%s %-40s | %10s | %7.3fs %s|", color, - test->name, test_states[test->state], - (double)duration / 1000000.0, _C_CLR); - if (test->state == TEST_FAILED) - printf(_C_RED " %s" _C_CLR, test->failstr); - else if (test->state == TEST_SKIPPED) - printf(_C_CYA " %s" _C_CLR, test->failstr); - printf("%s\n", extra); - } - - if (report_fp) { - int i; - fprintf(report_fp, - "%s\"%s\": {" - "\"name\": \"%s\", " - "\"state\": \"%s\", " - "\"known_issue\": %s, " - "\"extra\": \"%s\", " - "\"duration\": %.3f, " - "\"report\": [ ", - test == tests ? "" : ", ", test->name, - test->name, test_states[test->state], - test->flags & TEST_F_KNOWN_ISSUE ? "true" - : "false", - test->extra ? test->extra : "", - (double)duration / 1000000.0); - - for (i = 0; i < test->report_cnt; i++) { - fprintf(report_fp, "%s%s ", i == 0 ? "" : ",", - test->report_arr[i]); - } - - fprintf(report_fp, "] }"); - } - - if (sql_fp) - fprintf(sql_fp, - "INSERT INTO tests VALUES(" - "'%s_%s', '%s', '%s', '%s', '%s', %f);\n", - datestr, test_mode, test_mode, test->name, - test_states[test->state], - test->extra ? test->extra : "", - (double)duration / 1000000.0); - } - if (do_lock) - TEST_UNLOCK(); - - if (show_summary) - printf( - "#=========================================================" - "=========#\n"); - - if (report_fp) { - fprintf(report_fp, - "}, " - "\"tests_run\": %d, " - "\"tests_passed\": %d, " - "\"tests_failed\": %d, " - "\"duration\": %.3f" - "}\n", - tests_run, tests_passed, tests_failed, - (double)total_duration / 1000000.0); - - fclose(report_fp); - TEST_SAY("# Test report written to %s\n", report_path); - } - - if (sql_fp) { - fprintf(sql_fp, - "INSERT INTO runs VALUES('%s_%s', '%s', datetime(), " - "%d, %d, %d, %f);\n", - datestr, test_mode, test_mode, tests_run, tests_passed, - tests_failed, (double)total_duration / 1000000.0); - fclose(sql_fp); - } - - return tests_failed - tests_failed_known; -} - -#ifndef _WIN32 -static void test_sig_term(int sig) { - if (test_exit) - exit(1); - fprintf(stderr, - "Exiting tests, waiting for running tests to finish.\n"); - test_exit = 1; -} -#endif - -/** - * Wait 'timeout' seconds for rdkafka to kill all its threads and clean up. - */ -static void test_wait_exit(int timeout) { - int r; - time_t start = time(NULL); - - while ((r = rd_kafka_thread_cnt()) && timeout-- >= 0) { - TEST_SAY("%i thread(s) in use by librdkafka, waiting...\n", r); - rd_sleep(1); - } - - TEST_SAY("%i thread(s) in use by librdkafka\n", r); - - if (r > 0) - TEST_FAIL("%i thread(s) still active in librdkafka", r); - - timeout -= (int)(time(NULL) - start); - if (timeout > 0) { - TEST_SAY( - "Waiting %d seconds for all librdkafka memory " - "to be released\n", - timeout); - if (rd_kafka_wait_destroyed(timeout * 1000) == -1) - TEST_FAIL( - "Not all internal librdkafka " - "objects destroyed\n"); - } -} - - - -/** - * @brief Test framework cleanup before termination. - */ -static void test_cleanup(void) { - struct test *test; - - /* Free report arrays */ - for (test = tests; test->name; test++) { - int i; - if (!test->report_arr) - continue; - for (i = 0; i < test->report_cnt; i++) - rd_free(test->report_arr[i]); - rd_free(test->report_arr); - test->report_arr = NULL; - } - - if (test_sql_cmd) - rd_free(test_sql_cmd); -} - - -int main(int argc, char **argv) { - int i, r; - test_timing_t t_all; - int a, b, c, d; - const char *tmpver; - - mtx_init(&test_mtx, mtx_plain); - cnd_init(&test_cnd); - - test_init(); - -#ifndef _WIN32 - signal(SIGINT, test_sig_term); -#endif - tests_to_run = test_getenv("TESTS", NULL); - subtests_to_run = test_getenv("SUBTESTS", NULL); - tests_to_skip = test_getenv("TESTS_SKIP", NULL); - tmpver = test_getenv("TEST_KAFKA_VERSION", NULL); - if (!tmpver) - tmpver = test_getenv("KAFKA_VERSION", test_broker_version_str); - test_broker_version_str = tmpver; - - test_git_version = test_getenv("RDKAFKA_GITVER", "HEAD"); - - /* Are we running on CI? */ - if (test_getenv("CI", NULL)) { - test_on_ci = 1; - test_concurrent_max = 3; - } - - test_conf_init(NULL, NULL, 10); - - for (i = 1; i < argc; i++) { - if (!strncmp(argv[i], "-p", 2) && strlen(argv[i]) > 2) { - if (test_rusage) { - fprintf(stderr, - "%% %s ignored: -R takes preceedence\n", - argv[i]); - continue; - } - test_concurrent_max = (int)strtod(argv[i] + 2, NULL); - } else if (!strcmp(argv[i], "-l")) - test_flags |= TEST_F_LOCAL; - else if (!strcmp(argv[i], "-L")) - test_neg_flags |= TEST_F_LOCAL; - else if (!strcmp(argv[i], "-a")) - test_assert_on_fail = 1; - else if (!strcmp(argv[i], "-k")) - test_flags |= TEST_F_KNOWN_ISSUE; - else if (!strcmp(argv[i], "-K")) - test_neg_flags |= TEST_F_KNOWN_ISSUE; - else if (!strcmp(argv[i], "-E")) - test_neg_flags |= TEST_F_SOCKEM; - else if (!strcmp(argv[i], "-V") && i + 1 < argc) - test_broker_version_str = argv[++i]; - else if (!strcmp(argv[i], "-s") && i + 1 < argc) - strncpy(test_scenario, argv[++i], - sizeof(test_scenario) - 1); - else if (!strcmp(argv[i], "-S")) - show_summary = 0; - else if (!strcmp(argv[i], "-D")) - test_delete_topics_between = 1; - else if (!strcmp(argv[i], "-P")) - test_idempotent_producer = 1; - else if (!strcmp(argv[i], "-Q")) - test_quick = 1; - else if (!strcmp(argv[i], "-r")) - test_write_report = 1; - else if (!strncmp(argv[i], "-R", 2)) { - test_rusage = 1; - test_concurrent_max = 1; - if (strlen(argv[i]) > strlen("-R")) { - test_rusage_cpu_calibration = - strtod(argv[i] + 2, NULL); - if (test_rusage_cpu_calibration < 0.00001) { - fprintf(stderr, - "%% Invalid CPU calibration " - "value: %s\n", - argv[i] + 2); - exit(1); - } - } - } else if (*argv[i] != '-') - tests_to_run = argv[i]; - else { - printf( - "Unknown option: %s\n" - "\n" - "Usage: %s [options] [<test-match-substr>]\n" - "Options:\n" - " -p<N> Run N tests in parallel\n" - " -l/-L Only/dont run local tests (no broker " - "needed)\n" - " -k/-K Only/dont run tests with known issues\n" - " -E Don't run sockem tests\n" - " -a Assert on failures\n" - " -r Write test_report_...json file.\n" - " -S Dont show test summary\n" - " -s <scenario> Test scenario.\n" - " -V <N.N.N.N> Broker version.\n" - " -D Delete all test topics between each test " - "(-p1) or after all tests\n" - " -P Run all tests with " - "`enable.idempotency=true`\n" - " -Q Run tests in quick mode: faster tests, " - "fewer iterations, less data.\n" - " -R Check resource usage thresholds.\n" - " -R<C> Check resource usage thresholds but " - "adjust CPU thresholds by C (float):\n" - " C < 1.0: CPU is faster than base line " - "system.\n" - " C > 1.0: CPU is slower than base line " - "system.\n" - " E.g. -R2.5 = CPU is 2.5x slower than " - "base line system.\n" - "\n" - "Environment variables:\n" - " TESTS - substring matched test to run (e.g., " - "0033)\n" - " SUBTESTS - substring matched subtest to run " - "(e.g., n_wildcard)\n" - " TEST_KAFKA_VERSION - broker version (e.g., " - "0.9.0.1)\n" - " TEST_SCENARIO - Test scenario\n" - " TEST_LEVEL - Test verbosity level\n" - " TEST_MODE - bare, helgrind, valgrind\n" - " TEST_SEED - random seed\n" - " RDKAFKA_TEST_CONF - test config file " - "(test.conf)\n" - " KAFKA_PATH - Path to kafka source dir\n" - " ZK_ADDRESS - Zookeeper address\n" - "\n", - argv[i], argv[0]); - exit(1); - } - } - - TEST_SAY("Git version: %s\n", test_git_version); - - if (!strcmp(test_broker_version_str, "trunk")) - test_broker_version_str = "9.9.9.9"; /* for now */ - - d = 0; - if (sscanf(test_broker_version_str, "%d.%d.%d.%d", &a, &b, &c, &d) < - 3) { - printf( - "%% Expected broker version to be in format " - "N.N.N (N=int), not %s\n", - test_broker_version_str); - exit(1); - } - test_broker_version = TEST_BRKVER(a, b, c, d); - TEST_SAY("Broker version: %s (%d.%d.%d.%d)\n", test_broker_version_str, - TEST_BRKVER_X(test_broker_version, 0), - TEST_BRKVER_X(test_broker_version, 1), - TEST_BRKVER_X(test_broker_version, 2), - TEST_BRKVER_X(test_broker_version, 3)); - - /* Set up fake "<MAIN>" test for all operations performed in - * the main thread rather than the per-test threads. - * Nice side effect is that we get timing and status for main as well.*/ - test_curr = &tests[0]; - test_curr->state = TEST_PASSED; - test_curr->start = test_clock(); - - if (test_on_ci) { - TEST_LOCK(); - test_timeout_multiplier += 2; - TEST_UNLOCK(); - } - - if (!strcmp(test_mode, "helgrind") || !strcmp(test_mode, "drd")) { - TEST_LOCK(); - test_timeout_multiplier += 5; - TEST_UNLOCK(); - } else if (!strcmp(test_mode, "valgrind")) { - TEST_LOCK(); - test_timeout_multiplier += 3; - TEST_UNLOCK(); - } - - /* Broker version 0.9 and api.version.request=true (which is default) - * will cause a 10s stall per connection. Instead of fixing - * that for each affected API in every test we increase the timeout - * multiplier accordingly instead. The typical consume timeout is 5 - * seconds, so a multiplier of 3 should be good. */ - if ((test_broker_version & 0xffff0000) == 0x00090000) - test_timeout_multiplier += 3; - - if (test_concurrent_max > 1) - test_timeout_multiplier += (double)test_concurrent_max / 3; - - TEST_SAY("Tests to run : %s\n", tests_to_run ? tests_to_run : "all"); - if (subtests_to_run) - TEST_SAY("Sub tests : %s\n", subtests_to_run); - if (tests_to_skip) - TEST_SAY("Skip tests : %s\n", tests_to_skip); - TEST_SAY("Test mode : %s%s%s\n", test_quick ? "quick, " : "", - test_mode, test_on_ci ? ", CI" : ""); - TEST_SAY("Test scenario: %s\n", test_scenario); - TEST_SAY("Test filter : %s\n", (test_flags & TEST_F_LOCAL) - ? "local tests only" - : "no filter"); - TEST_SAY("Test timeout multiplier: %.1f\n", test_timeout_multiplier); - TEST_SAY("Action on test failure: %s\n", - test_assert_on_fail ? "assert crash" : "continue other tests"); - if (test_rusage) - TEST_SAY("Test rusage : yes (%.2fx CPU calibration)\n", - test_rusage_cpu_calibration); - if (test_idempotent_producer) - TEST_SAY("Test Idempotent Producer: enabled\n"); - - { - char cwd[512], *pcwd; -#ifdef _WIN32 - pcwd = _getcwd(cwd, sizeof(cwd) - 1); -#else - pcwd = getcwd(cwd, sizeof(cwd) - 1); -#endif - if (pcwd) - TEST_SAY("Current directory: %s\n", cwd); - } - - test_timeout_set(30); - - TIMING_START(&t_all, "ALL-TESTS"); - - /* Run tests */ - run_tests(argc, argv); - - TEST_LOCK(); - while (tests_running_cnt > 0 && !test_exit) { - struct test *test; - - if (!test_quick && test_level >= 2) { - TEST_SAY("%d test(s) running:", tests_running_cnt); - - for (test = tests; test->name; test++) { - if (test->state != TEST_RUNNING) - continue; - - TEST_SAY0(" %s", test->name); - } - - TEST_SAY0("\n"); - } - - check_test_timeouts(); - - TEST_UNLOCK(); - - if (test_quick) - rd_usleep(200 * 1000, NULL); - else - rd_sleep(1); - TEST_LOCK(); - } - - TIMING_STOP(&t_all); - - test_curr = &tests[0]; - test_curr->duration = test_clock() - test_curr->start; - - TEST_UNLOCK(); - - if (test_delete_topics_between) - test_delete_all_test_topics(60 * 1000); - - r = test_summary(1 /*lock*/) ? 1 : 0; - - /* Wait for everything to be cleaned up since broker destroys are - * handled in its own thread. */ - test_wait_exit(0); - - /* If we havent failed at this point then - * there were no threads leaked */ - if (r == 0) - TEST_SAY("\n============== ALL TESTS PASSED ==============\n"); - - test_cleanup(); - - if (r > 0) - TEST_FAIL("%d test(s) failed, see previous errors", r); - - return r; -} - - - -/****************************************************************************** - * - * Helpers - * - ******************************************************************************/ - -void test_dr_msg_cb(rd_kafka_t *rk, - const rd_kafka_message_t *rkmessage, - void *opaque) { - int *remainsp = rkmessage->_private; - static const char *status_names[] = { - [RD_KAFKA_MSG_STATUS_NOT_PERSISTED] = "NotPersisted", - [RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED] = "PossiblyPersisted", - [RD_KAFKA_MSG_STATUS_PERSISTED] = "Persisted"}; - - TEST_SAYL(4, - "Delivery report: %s (%s) to %s [%" PRId32 - "] " - "at offset %" PRId64 " latency %.2fms\n", - rd_kafka_err2str(rkmessage->err), - status_names[rd_kafka_message_status(rkmessage)], - rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, - rkmessage->offset, - (float)rd_kafka_message_latency(rkmessage) / 1000.0); - - if (!test_curr->produce_sync) { - if (!test_curr->ignore_dr_err && - rkmessage->err != test_curr->exp_dr_err) - TEST_FAIL("Message delivery (to %s [%" PRId32 - "]) " - "failed: expected %s, got %s", - rd_kafka_topic_name(rkmessage->rkt), - rkmessage->partition, - rd_kafka_err2str(test_curr->exp_dr_err), - rd_kafka_err2str(rkmessage->err)); - - if ((int)test_curr->exp_dr_status != -1) { - rd_kafka_msg_status_t status = - rd_kafka_message_status(rkmessage); - - TEST_ASSERT(status == test_curr->exp_dr_status, - "Expected message status %s, not %s", - status_names[test_curr->exp_dr_status], - status_names[status]); - } - - /* Add message to msgver */ - if (!rkmessage->err && test_curr->dr_mv) - test_msgver_add_msg(rk, test_curr->dr_mv, rkmessage); - } - - if (remainsp) { - TEST_ASSERT(*remainsp > 0, - "Too many messages delivered (remains %i)", - *remainsp); - - (*remainsp)--; - } - - if (test_curr->produce_sync) - test_curr->produce_sync_err = rkmessage->err; -} - - -rd_kafka_t *test_create_handle(int mode, rd_kafka_conf_t *conf) { - rd_kafka_t *rk; - char errstr[512]; - - if (!conf) { - test_conf_init(&conf, NULL, 0); -#if WITH_SOCKEM - if (*test_sockem_conf) - test_socket_enable(conf); -#endif - } else { - if (!strcmp(test_conf_get(conf, "client.id"), "rdkafka")) - test_conf_set(conf, "client.id", test_curr->name); - } - - - - /* Creat kafka instance */ - rk = rd_kafka_new(mode, conf, errstr, sizeof(errstr)); - if (!rk) - TEST_FAIL("Failed to create rdkafka instance: %s\n", errstr); - - TEST_SAY("Created kafka instance %s\n", rd_kafka_name(rk)); - - return rk; -} - - -rd_kafka_t *test_create_producer(void) { - rd_kafka_conf_t *conf; - - test_conf_init(&conf, NULL, 0); - rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); - - return test_create_handle(RD_KAFKA_PRODUCER, conf); -} - - -/** - * Create topic_t object with va-arg list as key-value config pairs - * terminated by NULL. - */ -rd_kafka_topic_t * -test_create_topic_object(rd_kafka_t *rk, const char *topic, ...) { - rd_kafka_topic_t *rkt; - rd_kafka_topic_conf_t *topic_conf; - va_list ap; - const char *name, *val; - - test_conf_init(NULL, &topic_conf, 0); - - va_start(ap, topic); - while ((name = va_arg(ap, const char *)) && - (val = va_arg(ap, const char *))) { - test_topic_conf_set(topic_conf, name, val); - } - va_end(ap); - - rkt = rd_kafka_topic_new(rk, topic, topic_conf); - if (!rkt) - TEST_FAIL("Failed to create topic: %s\n", - rd_kafka_err2str(rd_kafka_last_error())); - - return rkt; -} - - -rd_kafka_topic_t * -test_create_producer_topic(rd_kafka_t *rk, const char *topic, ...) { - rd_kafka_topic_t *rkt; - rd_kafka_topic_conf_t *topic_conf; - char errstr[512]; - va_list ap; - const char *name, *val; - - test_conf_init(NULL, &topic_conf, 0); - - va_start(ap, topic); - while ((name = va_arg(ap, const char *)) && - (val = va_arg(ap, const char *))) { - if (rd_kafka_topic_conf_set(topic_conf, name, val, errstr, - sizeof(errstr)) != RD_KAFKA_CONF_OK) - TEST_FAIL("Conf failed: %s\n", errstr); - } - va_end(ap); - - /* Make sure all replicas are in-sync after producing - * so that consume test wont fail. */ - rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1", - errstr, sizeof(errstr)); - - - rkt = rd_kafka_topic_new(rk, topic, topic_conf); - if (!rkt) - TEST_FAIL("Failed to create topic: %s\n", - rd_kafka_err2str(rd_kafka_last_error())); - - return rkt; -} - - - -/** - * Produces \p cnt messages and returns immediately. - * Does not wait for delivery. - * \p msgcounterp is incremented for each produced messages and passed - * as \p msg_opaque which is later used in test_dr_msg_cb to decrement - * the counter on delivery. - * - * If \p payload is NULL the message key and payload will be formatted - * according to standard test format, otherwise the key will be NULL and - * payload send as message payload. - * - * Default message size is 128 bytes, if \p size is non-zero and \p payload - * is NULL the message size of \p size will be used. - */ -void test_produce_msgs_nowait(rd_kafka_t *rk, - rd_kafka_topic_t *rkt, - uint64_t testid, - int32_t partition, - int msg_base, - int cnt, - const char *payload, - size_t size, - int msgrate, - int *msgcounterp) { - int msg_id; - test_timing_t t_all, t_poll; - char key[128]; - void *buf; - int64_t tot_bytes = 0; - int64_t tot_time_poll = 0; - int64_t per_msg_wait = 0; - - if (msgrate > 0) - per_msg_wait = 1000000 / (int64_t)msgrate; - - - if (payload) - buf = (void *)payload; - else { - if (size == 0) - size = 128; - buf = calloc(1, size); - } - - TEST_SAY("Produce to %s [%" PRId32 "]: messages #%d..%d\n", - rd_kafka_topic_name(rkt), partition, msg_base, msg_base + cnt); - - TIMING_START(&t_all, "PRODUCE"); - TIMING_START(&t_poll, "SUM(POLL)"); - - for (msg_id = msg_base; msg_id < msg_base + cnt; msg_id++) { - int wait_time = 0; - - if (!payload) - test_prepare_msg(testid, partition, msg_id, buf, size, - key, sizeof(key)); - - - if (rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buf, - size, !payload ? key : NULL, - !payload ? strlen(key) : 0, - msgcounterp) == -1) - TEST_FAIL( - "Failed to produce message %i " - "to partition %i: %s", - msg_id, (int)partition, - rd_kafka_err2str(rd_kafka_last_error())); - - (*msgcounterp)++; - tot_bytes += size; - - TIMING_RESTART(&t_poll); - do { - if (per_msg_wait) { - wait_time = (int)(per_msg_wait - - TIMING_DURATION(&t_poll)) / - 1000; - if (wait_time < 0) - wait_time = 0; - } - rd_kafka_poll(rk, wait_time); - } while (wait_time > 0); - - tot_time_poll = TIMING_DURATION(&t_poll); - - if (TIMING_EVERY(&t_all, 3 * 1000000)) - TEST_SAY( - "produced %3d%%: %d/%d messages " - "(%d msgs/s, %d bytes/s)\n", - ((msg_id - msg_base) * 100) / cnt, - msg_id - msg_base, cnt, - (int)((msg_id - msg_base) / - (TIMING_DURATION(&t_all) / 1000000)), - (int)((tot_bytes) / - (TIMING_DURATION(&t_all) / 1000000))); - } - - if (!payload) - free(buf); - - t_poll.duration = tot_time_poll; - TIMING_STOP(&t_poll); - TIMING_STOP(&t_all); -} - -/** - * Waits for the messages tracked by counter \p msgcounterp to be delivered. - */ -void test_wait_delivery(rd_kafka_t *rk, int *msgcounterp) { - test_timing_t t_all; - int start_cnt = *msgcounterp; - - TIMING_START(&t_all, "PRODUCE.DELIVERY.WAIT"); - - /* Wait for messages to be delivered */ - while (*msgcounterp > 0 && rd_kafka_outq_len(rk) > 0) { - rd_kafka_poll(rk, 10); - if (TIMING_EVERY(&t_all, 3 * 1000000)) { - int delivered = start_cnt - *msgcounterp; - TEST_SAY( - "wait_delivery: " - "%d/%d messages delivered: %d msgs/s\n", - delivered, start_cnt, - (int)(delivered / - (TIMING_DURATION(&t_all) / 1000000))); - } - } - - TIMING_STOP(&t_all); - - TEST_ASSERT(*msgcounterp == 0, - "Not all messages delivered: msgcounter still at %d, " - "outq_len %d", - *msgcounterp, rd_kafka_outq_len(rk)); -} - -/** - * Produces \p cnt messages and waits for succesful delivery - */ -void test_produce_msgs(rd_kafka_t *rk, - rd_kafka_topic_t *rkt, - uint64_t testid, - int32_t partition, - int msg_base, - int cnt, - const char *payload, - size_t size) { - int remains = 0; - - test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt, - payload, size, 0, &remains); - - test_wait_delivery(rk, &remains); -} - - -/** - * @brief Produces \p cnt messages and waits for succesful delivery - */ -void test_produce_msgs2(rd_kafka_t *rk, - const char *topic, - uint64_t testid, - int32_t partition, - int msg_base, - int cnt, - const char *payload, - size_t size) { - int remains = 0; - rd_kafka_topic_t *rkt = test_create_topic_object(rk, topic, NULL); - - test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt, - payload, size, 0, &remains); - - test_wait_delivery(rk, &remains); - - rd_kafka_topic_destroy(rkt); -} - -/** - * @brief Produces \p cnt messages without waiting for delivery. - */ -void test_produce_msgs2_nowait(rd_kafka_t *rk, - const char *topic, - uint64_t testid, - int32_t partition, - int msg_base, - int cnt, - const char *payload, - size_t size, - int *remainsp) { - rd_kafka_topic_t *rkt = test_create_topic_object(rk, topic, NULL); - - test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt, - payload, size, 0, remainsp); - - rd_kafka_topic_destroy(rkt); -} - - -/** - * Produces \p cnt messages at \p msgs/s, and waits for succesful delivery - */ -void test_produce_msgs_rate(rd_kafka_t *rk, - rd_kafka_topic_t *rkt, - uint64_t testid, - int32_t partition, - int msg_base, - int cnt, - const char *payload, - size_t size, - int msgrate) { - int remains = 0; - - test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt, - payload, size, msgrate, &remains); - - test_wait_delivery(rk, &remains); -} - - - -/** - * Create producer, produce \p msgcnt messages to \p topic \p partition, - * destroy consumer, and returns the used testid. - */ -uint64_t test_produce_msgs_easy_size(const char *topic, - uint64_t testid, - int32_t partition, - int msgcnt, - size_t size) { - rd_kafka_t *rk; - rd_kafka_topic_t *rkt; - test_timing_t t_produce; - - if (!testid) - testid = test_id_generate(); - rk = test_create_producer(); - rkt = test_create_producer_topic(rk, topic, NULL); - - TIMING_START(&t_produce, "PRODUCE"); - test_produce_msgs(rk, rkt, testid, partition, 0, msgcnt, NULL, size); - TIMING_STOP(&t_produce); - rd_kafka_topic_destroy(rkt); - rd_kafka_destroy(rk); - - return testid; -} - -rd_kafka_resp_err_t test_produce_sync(rd_kafka_t *rk, - rd_kafka_topic_t *rkt, - uint64_t testid, - int32_t partition) { - test_curr->produce_sync = 1; - test_produce_msgs(rk, rkt, testid, partition, 0, 1, NULL, 0); - test_curr->produce_sync = 0; - return test_curr->produce_sync_err; -} - - -/** - * @brief Easy produce function. - * - * @param ... is a NULL-terminated list of key, value config property pairs. - */ -void test_produce_msgs_easy_v(const char *topic, - uint64_t testid, - int32_t partition, - int msg_base, - int cnt, - size_t size, - ...) { - rd_kafka_conf_t *conf; - rd_kafka_t *p; - rd_kafka_topic_t *rkt; - va_list ap; - const char *key, *val; - - test_conf_init(&conf, NULL, 0); - - va_start(ap, size); - while ((key = va_arg(ap, const char *)) && - (val = va_arg(ap, const char *))) - test_conf_set(conf, key, val); - va_end(ap); - - rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); - - p = test_create_handle(RD_KAFKA_PRODUCER, conf); - - rkt = test_create_producer_topic(p, topic, NULL); - - test_produce_msgs(p, rkt, testid, partition, msg_base, cnt, NULL, size); - - rd_kafka_topic_destroy(rkt); - rd_kafka_destroy(p); -} - - -/** - * @brief Produce messages to multiple topic-partitions. - * - * @param ...vararg is a tuple of: - * const char *topic - * int32_t partition (or UA) - * int msg_base - * int msg_cnt - * - * End with a NULL topic - */ -void test_produce_msgs_easy_multi(uint64_t testid, ...) { - rd_kafka_conf_t *conf; - rd_kafka_t *p; - va_list ap; - const char *topic; - int msgcounter = 0; - - test_conf_init(&conf, NULL, 0); - - rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); - - p = test_create_handle(RD_KAFKA_PRODUCER, conf); - - va_start(ap, testid); - while ((topic = va_arg(ap, const char *))) { - int32_t partition = va_arg(ap, int32_t); - int msg_base = va_arg(ap, int); - int msg_cnt = va_arg(ap, int); - rd_kafka_topic_t *rkt; - - rkt = test_create_producer_topic(p, topic, NULL); - - test_produce_msgs_nowait(p, rkt, testid, partition, msg_base, - msg_cnt, NULL, 0, 0, &msgcounter); - - rd_kafka_topic_destroy(rkt); - } - va_end(ap); - - test_flush(p, tmout_multip(10 * 1000)); - - rd_kafka_destroy(p); -} - - - -/** - * @brief A standard incremental rebalance callback. - */ -void test_incremental_rebalance_cb(rd_kafka_t *rk, - rd_kafka_resp_err_t err, - rd_kafka_topic_partition_list_t *parts, - void *opaque) { - TEST_SAY("%s: incremental rebalance: %s: %d partition(s)%s\n", - rd_kafka_name(rk), rd_kafka_err2name(err), parts->cnt, - rd_kafka_assignment_lost(rk) ? ", assignment lost" : ""); - - switch (err) { - case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: - test_consumer_incremental_assign("rebalance_cb", rk, parts); - break; - case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: - test_consumer_incremental_unassign("rebalance_cb", rk, parts); - break; - default: - TEST_FAIL("Unknown rebalance event: %s", - rd_kafka_err2name(err)); - break; - } -} - -/** - * @brief A standard rebalance callback. - */ -void test_rebalance_cb(rd_kafka_t *rk, - rd_kafka_resp_err_t err, - rd_kafka_topic_partition_list_t *parts, - void *opaque) { - - if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) { - test_incremental_rebalance_cb(rk, err, parts, opaque); - return; - } - - TEST_SAY("%s: Rebalance: %s: %d partition(s)\n", rd_kafka_name(rk), - rd_kafka_err2name(err), parts->cnt); - - switch (err) { - case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: - test_consumer_assign("assign", rk, parts); - break; - case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: - test_consumer_unassign("unassign", rk); - break; - default: - TEST_FAIL("Unknown rebalance event: %s", - rd_kafka_err2name(err)); - break; - } -} - - - -rd_kafka_t *test_create_consumer( - const char *group_id, - void (*rebalance_cb)(rd_kafka_t *rk, - rd_kafka_resp_err_t err, - rd_kafka_topic_partition_list_t *partitions, - void *opaque), - rd_kafka_conf_t *conf, - rd_kafka_topic_conf_t *default_topic_conf) { - rd_kafka_t *rk; - char tmp[64]; - - if (!conf) - test_conf_init(&conf, NULL, 0); - - if (group_id) { - test_conf_set(conf, "group.id", group_id); - - rd_snprintf(tmp, sizeof(tmp), "%d", test_session_timeout_ms); - test_conf_set(conf, "session.timeout.ms", tmp); - - if (rebalance_cb) - rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb); - } else { - TEST_ASSERT(!rebalance_cb); - } - - if (default_topic_conf) - rd_kafka_conf_set_default_topic_conf(conf, default_topic_conf); - - /* Create kafka instance */ - rk = test_create_handle(RD_KAFKA_CONSUMER, conf); - - if (group_id) - rd_kafka_poll_set_consumer(rk); - - return rk; -} - -rd_kafka_topic_t *test_create_consumer_topic(rd_kafka_t *rk, - const char *topic) { - rd_kafka_topic_t *rkt; - rd_kafka_topic_conf_t *topic_conf; - - test_conf_init(NULL, &topic_conf, 0); - - rkt = rd_kafka_topic_new(rk, topic, topic_conf); - if (!rkt) - TEST_FAIL("Failed to create topic: %s\n", - rd_kafka_err2str(rd_kafka_last_error())); - - return rkt; -} - - -void test_consumer_start(const char *what, - rd_kafka_topic_t *rkt, - int32_t partition, - int64_t start_offset) { - - TEST_SAY("%s: consumer_start: %s [%" PRId32 "] at offset %" PRId64 "\n", - what, rd_kafka_topic_name(rkt), partition, start_offset); - - if (rd_kafka_consume_start(rkt, partition, start_offset) == -1) - TEST_FAIL("%s: consume_start failed: %s\n", what, - rd_kafka_err2str(rd_kafka_last_error())); -} - -void test_consumer_stop(const char *what, - rd_kafka_topic_t *rkt, - int32_t partition) { - - TEST_SAY("%s: consumer_stop: %s [%" PRId32 "]\n", what, - rd_kafka_topic_name(rkt), partition); - - if (rd_kafka_consume_stop(rkt, partition) == -1) - TEST_FAIL("%s: consume_stop failed: %s\n", what, - rd_kafka_err2str(rd_kafka_last_error())); -} - -void test_consumer_seek(const char *what, - rd_kafka_topic_t *rkt, - int32_t partition, - int64_t offset) { - int err; - - TEST_SAY("%s: consumer_seek: %s [%" PRId32 "] to offset %" PRId64 "\n", - what, rd_kafka_topic_name(rkt), partition, offset); - - if ((err = rd_kafka_seek(rkt, partition, offset, 2000))) - TEST_FAIL("%s: consume_seek(%s, %" PRId32 ", %" PRId64 - ") " - "failed: %s\n", - what, rd_kafka_topic_name(rkt), partition, offset, - rd_kafka_err2str(err)); -} - - - -/** - * Returns offset of the last message consumed - */ -int64_t test_consume_msgs(const char *what, - rd_kafka_topic_t *rkt, - uint64_t testid, - int32_t partition, - int64_t offset, - int exp_msg_base, - int exp_cnt, - int parse_fmt) { - int cnt = 0; - int msg_next = exp_msg_base; - int fails = 0; - int64_t offset_last = -1; - int64_t tot_bytes = 0; - test_timing_t t_first, t_all; - - TEST_SAY("%s: consume_msgs: %s [%" PRId32 - "]: expect msg #%d..%d " - "at offset %" PRId64 "\n", - what, rd_kafka_topic_name(rkt), partition, exp_msg_base, - exp_msg_base + exp_cnt, offset); - - if (offset != TEST_NO_SEEK) { - rd_kafka_resp_err_t err; - test_timing_t t_seek; - - TIMING_START(&t_seek, "SEEK"); - if ((err = rd_kafka_seek(rkt, partition, offset, 5000))) - TEST_FAIL("%s: consume_msgs: %s [%" PRId32 - "]: " - "seek to %" PRId64 " failed: %s\n", - what, rd_kafka_topic_name(rkt), partition, - offset, rd_kafka_err2str(err)); - TIMING_STOP(&t_seek); - TEST_SAY("%s: seeked to offset %" PRId64 "\n", what, offset); - } - - TIMING_START(&t_first, "FIRST MSG"); - TIMING_START(&t_all, "ALL MSGS"); - - while (cnt < exp_cnt) { - rd_kafka_message_t *rkmessage; - int msg_id; - - rkmessage = - rd_kafka_consume(rkt, partition, tmout_multip(5000)); - - if (TIMING_EVERY(&t_all, 3 * 1000000)) - TEST_SAY( - "%s: " - "consumed %3d%%: %d/%d messages " - "(%d msgs/s, %d bytes/s)\n", - what, cnt * 100 / exp_cnt, cnt, exp_cnt, - (int)(cnt / (TIMING_DURATION(&t_all) / 1000000)), - (int)(tot_bytes / - (TIMING_DURATION(&t_all) / 1000000))); - - if (!rkmessage) - TEST_FAIL("%s: consume_msgs: %s [%" PRId32 - "]: " - "expected msg #%d (%d/%d): timed out\n", - what, rd_kafka_topic_name(rkt), partition, - msg_next, cnt, exp_cnt); - - if (rkmessage->err) - TEST_FAIL("%s: consume_msgs: %s [%" PRId32 - "]: " - "expected msg #%d (%d/%d): got error: %s\n", - what, rd_kafka_topic_name(rkt), partition, - msg_next, cnt, exp_cnt, - rd_kafka_err2str(rkmessage->err)); - - if (cnt == 0) - TIMING_STOP(&t_first); - - if (parse_fmt) - test_msg_parse(testid, rkmessage, partition, &msg_id); - else - msg_id = 0; - - if (test_level >= 3) - TEST_SAY("%s: consume_msgs: %s [%" PRId32 - "]: " - "got msg #%d at offset %" PRId64 - " (expect #%d at offset %" PRId64 ")\n", - what, rd_kafka_topic_name(rkt), partition, - msg_id, rkmessage->offset, msg_next, - offset >= 0 ? offset + cnt : -1); - - if (parse_fmt && msg_id != msg_next) { - TEST_SAY("%s: consume_msgs: %s [%" PRId32 - "]: " - "expected msg #%d (%d/%d): got msg #%d\n", - what, rd_kafka_topic_name(rkt), partition, - msg_next, cnt, exp_cnt, msg_id); - fails++; - } - - cnt++; - tot_bytes += rkmessage->len; - msg_next++; - offset_last = rkmessage->offset; - - rd_kafka_message_destroy(rkmessage); - } - - TIMING_STOP(&t_all); - - if (fails) - TEST_FAIL("%s: consume_msgs: %s [%" PRId32 "]: %d failures\n", - what, rd_kafka_topic_name(rkt), partition, fails); - - TEST_SAY("%s: consume_msgs: %s [%" PRId32 - "]: " - "%d/%d messages consumed succesfully\n", - what, rd_kafka_topic_name(rkt), partition, cnt, exp_cnt); - return offset_last; -} - - -/** - * Create high-level consumer subscribing to \p topic from BEGINNING - * and expects \d exp_msgcnt with matching \p testid - * Destroys consumer when done. - * - * @param txn If true, isolation.level is set to read_committed. - * @param partition If -1 the topic will be subscribed to, otherwise the - * single partition will be assigned immediately. - * - * If \p group_id is NULL a new unique group is generated - */ -void test_consume_msgs_easy_mv0(const char *group_id, - const char *topic, - rd_bool_t txn, - int32_t partition, - uint64_t testid, - int exp_eofcnt, - int exp_msgcnt, - rd_kafka_topic_conf_t *tconf, - test_msgver_t *mv) { - rd_kafka_t *rk; - char grpid0[64]; - rd_kafka_conf_t *conf; - - test_conf_init(&conf, tconf ? NULL : &tconf, 0); - - if (!group_id) - group_id = test_str_id_generate(grpid0, sizeof(grpid0)); - - if (txn) - test_conf_set(conf, "isolation.level", "read_committed"); - - test_topic_conf_set(tconf, "auto.offset.reset", "smallest"); - if (exp_eofcnt != -1) - test_conf_set(conf, "enable.partition.eof", "true"); - rk = test_create_consumer(group_id, NULL, conf, tconf); - - rd_kafka_poll_set_consumer(rk); - - if (partition == -1) { - TEST_SAY( - "Subscribing to topic %s in group %s " - "(expecting %d msgs with testid %" PRIu64 ")\n", - topic, group_id, exp_msgcnt, testid); - - test_consumer_subscribe(rk, topic); - } else { - rd_kafka_topic_partition_list_t *plist; - - TEST_SAY("Assign topic %s [%" PRId32 - "] in group %s " - "(expecting %d msgs with testid %" PRIu64 ")\n", - topic, partition, group_id, exp_msgcnt, testid); - - plist = rd_kafka_topic_partition_list_new(1); - rd_kafka_topic_partition_list_add(plist, topic, partition); - test_consumer_assign("consume_easy_mv", rk, plist); - rd_kafka_topic_partition_list_destroy(plist); - } - - /* Consume messages */ - test_consumer_poll("consume.easy", rk, testid, exp_eofcnt, -1, - exp_msgcnt, mv); - - test_consumer_close(rk); - - rd_kafka_destroy(rk); -} - -void test_consume_msgs_easy(const char *group_id, - const char *topic, - uint64_t testid, - int exp_eofcnt, - int exp_msgcnt, - rd_kafka_topic_conf_t *tconf) { - test_msgver_t mv; - - test_msgver_init(&mv, testid); - - test_consume_msgs_easy_mv(group_id, topic, -1, testid, exp_eofcnt, - exp_msgcnt, tconf, &mv); - - test_msgver_clear(&mv); -} - - -void test_consume_txn_msgs_easy(const char *group_id, - const char *topic, - uint64_t testid, - int exp_eofcnt, - int exp_msgcnt, - rd_kafka_topic_conf_t *tconf) { - test_msgver_t mv; - - test_msgver_init(&mv, testid); - - test_consume_msgs_easy_mv0(group_id, topic, rd_true /*txn*/, -1, testid, - exp_eofcnt, exp_msgcnt, tconf, &mv); - - test_msgver_clear(&mv); -} - - -/** - * @brief Waits for up to \p timeout_ms for consumer to receive assignment. - * If no assignment received without the timeout the test fails. - * - * @warning This method will poll the consumer and might thus read messages. - * Set \p do_poll to false to use a sleep rather than poll. - */ -void test_consumer_wait_assignment(rd_kafka_t *rk, rd_bool_t do_poll) { - rd_kafka_topic_partition_list_t *assignment = NULL; - int i; - - while (1) { - rd_kafka_resp_err_t err; - - err = rd_kafka_assignment(rk, &assignment); - TEST_ASSERT(!err, "rd_kafka_assignment() failed: %s", - rd_kafka_err2str(err)); - - if (assignment->cnt > 0) - break; - - rd_kafka_topic_partition_list_destroy(assignment); - - if (do_poll) - test_consumer_poll_once(rk, NULL, 1000); - else - rd_usleep(1000 * 1000, NULL); - } - - TEST_SAY("%s: Assignment (%d partition(s)): ", rd_kafka_name(rk), - assignment->cnt); - for (i = 0; i < assignment->cnt; i++) - TEST_SAY0("%s%s[%" PRId32 "]", i == 0 ? "" : ", ", - assignment->elems[i].topic, - assignment->elems[i].partition); - TEST_SAY0("\n"); - - rd_kafka_topic_partition_list_destroy(assignment); -} - - -/** - * @brief Verify that the consumer's assignment matches the expected assignment. - * - * The va-list is a NULL-terminated list of (const char *topic, int partition) - * tuples. - * - * Fails the test on mismatch, unless \p fail_immediately is false. - */ -void test_consumer_verify_assignment0(const char *func, - int line, - rd_kafka_t *rk, - int fail_immediately, - ...) { - va_list ap; - int cnt = 0; - const char *topic; - rd_kafka_topic_partition_list_t *assignment; - rd_kafka_resp_err_t err; - int i; - - if ((err = rd_kafka_assignment(rk, &assignment))) - TEST_FAIL("%s:%d: Failed to get assignment for %s: %s", func, - line, rd_kafka_name(rk), rd_kafka_err2str(err)); - - TEST_SAY("%s assignment (%d partition(s)):\n", rd_kafka_name(rk), - assignment->cnt); - for (i = 0; i < assignment->cnt; i++) - TEST_SAY(" %s [%" PRId32 "]\n", assignment->elems[i].topic, - assignment->elems[i].partition); - - va_start(ap, fail_immediately); - while ((topic = va_arg(ap, const char *))) { - int partition = va_arg(ap, int); - cnt++; - - if (!rd_kafka_topic_partition_list_find(assignment, topic, - partition)) - TEST_FAIL_LATER( - "%s:%d: Expected %s [%d] not found in %s's " - "assignment (%d partition(s))", - func, line, topic, partition, rd_kafka_name(rk), - assignment->cnt); - } - va_end(ap); - - if (cnt != assignment->cnt) - TEST_FAIL_LATER( - "%s:%d: " - "Expected %d assigned partition(s) for %s, not %d", - func, line, cnt, rd_kafka_name(rk), assignment->cnt); - - if (fail_immediately) - TEST_LATER_CHECK(); - - rd_kafka_topic_partition_list_destroy(assignment); -} - - - -/** - * @brief Start subscribing for 'topic' - */ -void test_consumer_subscribe(rd_kafka_t *rk, const char *topic) { - rd_kafka_topic_partition_list_t *topics; - rd_kafka_resp_err_t err; - - topics = rd_kafka_topic_partition_list_new(1); - rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA); - - err = rd_kafka_subscribe(rk, topics); - if (err) - TEST_FAIL("%s: Failed to subscribe to %s: %s\n", - rd_kafka_name(rk), topic, rd_kafka_err2str(err)); - - rd_kafka_topic_partition_list_destroy(topics); -} - - -void test_consumer_assign(const char *what, - rd_kafka_t *rk, - rd_kafka_topic_partition_list_t *partitions) { - rd_kafka_resp_err_t err; - test_timing_t timing; - - TIMING_START(&timing, "ASSIGN.PARTITIONS"); - err = rd_kafka_assign(rk, partitions); - TIMING_STOP(&timing); - if (err) - TEST_FAIL("%s: failed to assign %d partition(s): %s\n", what, - partitions->cnt, rd_kafka_err2str(err)); - else - TEST_SAY("%s: assigned %d partition(s)\n", what, - partitions->cnt); -} - - -void test_consumer_incremental_assign( - const char *what, - rd_kafka_t *rk, - rd_kafka_topic_partition_list_t *partitions) { - rd_kafka_error_t *error; - test_timing_t timing; - - TIMING_START(&timing, "INCREMENTAL.ASSIGN.PARTITIONS"); - error = rd_kafka_incremental_assign(rk, partitions); - TIMING_STOP(&timing); - if (error) { - TEST_FAIL( - "%s: incremental assign of %d partition(s) failed: " - "%s", - what, partitions->cnt, rd_kafka_error_string(error)); - rd_kafka_error_destroy(error); - } else - TEST_SAY("%s: incremental assign of %d partition(s) done\n", - what, partitions->cnt); -} - - -void test_consumer_unassign(const char *what, rd_kafka_t *rk) { - rd_kafka_resp_err_t err; - test_timing_t timing; - - TIMING_START(&timing, "UNASSIGN.PARTITIONS"); - err = rd_kafka_assign(rk, NULL); - TIMING_STOP(&timing); - if (err) - TEST_FAIL("%s: failed to unassign current partitions: %s\n", - what, rd_kafka_err2str(err)); - else - TEST_SAY("%s: unassigned current partitions\n", what); -} - - -void test_consumer_incremental_unassign( - const char *what, - rd_kafka_t *rk, - rd_kafka_topic_partition_list_t *partitions) { - rd_kafka_error_t *error; - test_timing_t timing; - - TIMING_START(&timing, "INCREMENTAL.UNASSIGN.PARTITIONS"); - error = rd_kafka_incremental_unassign(rk, partitions); - TIMING_STOP(&timing); - if (error) { - TEST_FAIL( - "%s: incremental unassign of %d partition(s) " - "failed: %s", - what, partitions->cnt, rd_kafka_error_string(error)); - rd_kafka_error_destroy(error); - } else - TEST_SAY("%s: incremental unassign of %d partition(s) done\n", - what, partitions->cnt); -} - - -/** - * @brief Assign a single partition with an optional starting offset - */ -void test_consumer_assign_partition(const char *what, - rd_kafka_t *rk, - const char *topic, - int32_t partition, - int64_t offset) { - rd_kafka_topic_partition_list_t *part; - - part = rd_kafka_topic_partition_list_new(1); - rd_kafka_topic_partition_list_add(part, topic, partition)->offset = - offset; - - test_consumer_assign(what, rk, part); - - rd_kafka_topic_partition_list_destroy(part); -} - - -void test_consumer_pause_resume_partition(rd_kafka_t *rk, - const char *topic, - int32_t partition, - rd_bool_t pause) { - rd_kafka_topic_partition_list_t *part; - rd_kafka_resp_err_t err; - - part = rd_kafka_topic_partition_list_new(1); - rd_kafka_topic_partition_list_add(part, topic, partition); - - if (pause) - err = rd_kafka_pause_partitions(rk, part); - else - err = rd_kafka_resume_partitions(rk, part); - - TEST_ASSERT(!err, "Failed to %s %s [%" PRId32 "]: %s", - pause ? "pause" : "resume", topic, partition, - rd_kafka_err2str(err)); - - rd_kafka_topic_partition_list_destroy(part); -} - - -/** - * Message verification services - * - */ - -void test_msgver_init(test_msgver_t *mv, uint64_t testid) { - memset(mv, 0, sizeof(*mv)); - mv->testid = testid; - /* Max warning logs before suppressing. */ - mv->log_max = (test_level + 1) * 100; -} - -void test_msgver_ignore_eof(test_msgver_t *mv) { - mv->ignore_eof = rd_true; -} - -#define TEST_MV_WARN(mv, ...) \ - do { \ - if ((mv)->log_cnt++ > (mv)->log_max) \ - (mv)->log_suppr_cnt++; \ - else \ - TEST_WARN(__VA_ARGS__); \ - } while (0) - - - -static void test_mv_mvec_grow(struct test_mv_mvec *mvec, int tot_size) { - if (tot_size <= mvec->size) - return; - mvec->size = tot_size; - mvec->m = realloc(mvec->m, sizeof(*mvec->m) * mvec->size); -} - -/** - * Make sure there is room for at least \p cnt messages, else grow mvec. - */ -static void test_mv_mvec_reserve(struct test_mv_mvec *mvec, int cnt) { - test_mv_mvec_grow(mvec, mvec->cnt + cnt); -} - -void test_mv_mvec_init(struct test_mv_mvec *mvec, int exp_cnt) { - TEST_ASSERT(mvec->m == NULL, "mvec not cleared"); - - if (!exp_cnt) - return; - - test_mv_mvec_grow(mvec, exp_cnt); -} - - -void test_mv_mvec_clear(struct test_mv_mvec *mvec) { - if (mvec->m) - free(mvec->m); -} - -void test_msgver_clear(test_msgver_t *mv) { - int i; - for (i = 0; i < mv->p_cnt; i++) { - struct test_mv_p *p = mv->p[i]; - free(p->topic); - test_mv_mvec_clear(&p->mvec); - free(p); - } - - free(mv->p); - - test_msgver_init(mv, mv->testid); -} - -struct test_mv_p *test_msgver_p_get(test_msgver_t *mv, - const char *topic, - int32_t partition, - int do_create) { - int i; - struct test_mv_p *p; - - for (i = 0; i < mv->p_cnt; i++) { - p = mv->p[i]; - if (p->partition == partition && !strcmp(p->topic, topic)) - return p; - } - - if (!do_create) - TEST_FAIL("Topic %s [%d] not found in msgver", topic, - partition); - - if (mv->p_cnt == mv->p_size) { - mv->p_size = (mv->p_size + 4) * 2; - mv->p = realloc(mv->p, sizeof(*mv->p) * mv->p_size); - } - - mv->p[mv->p_cnt++] = p = calloc(1, sizeof(*p)); - - p->topic = rd_strdup(topic); - p->partition = partition; - p->eof_offset = RD_KAFKA_OFFSET_INVALID; - - return p; -} - - -/** - * Add (room for) message to message vector. - * Resizes the vector as needed. - */ -static struct test_mv_m *test_mv_mvec_add(struct test_mv_mvec *mvec) { - if (mvec->cnt == mvec->size) { - test_mv_mvec_grow(mvec, (mvec->size ? mvec->size * 2 : 10000)); - } - - mvec->cnt++; - - return &mvec->m[mvec->cnt - 1]; -} - -/** - * Returns message at index \p mi - */ -static RD_INLINE struct test_mv_m *test_mv_mvec_get(struct test_mv_mvec *mvec, - int mi) { - if (mi >= mvec->cnt) - return NULL; - return &mvec->m[mi]; -} - -/** - * @returns the message with msgid \p msgid, or NULL. - */ -static struct test_mv_m *test_mv_mvec_find_by_msgid(struct test_mv_mvec *mvec, - int msgid) { - int mi; - - for (mi = 0; mi < mvec->cnt; mi++) - if (mvec->m[mi].msgid == msgid) - return &mvec->m[mi]; - - return NULL; -} - - -/** - * Print message list to \p fp - */ -static RD_UNUSED void test_mv_mvec_dump(FILE *fp, - const struct test_mv_mvec *mvec) { - int mi; - - fprintf(fp, "*** Dump mvec with %d messages (capacity %d): ***\n", - mvec->cnt, mvec->size); - for (mi = 0; mi < mvec->cnt; mi++) - fprintf(fp, " msgid %d, offset %" PRId64 "\n", - mvec->m[mi].msgid, mvec->m[mi].offset); - fprintf(fp, "*** Done ***\n"); -} - -static void test_mv_mvec_sort(struct test_mv_mvec *mvec, - int (*cmp)(const void *, const void *)) { - qsort(mvec->m, mvec->cnt, sizeof(*mvec->m), cmp); -} - - -/** - * @brief Adds a message to the msgver service. - * - * @returns 1 if message is from the expected testid, else 0 (not added) - */ -int test_msgver_add_msg00(const char *func, - int line, - const char *clientname, - test_msgver_t *mv, - uint64_t testid, - const char *topic, - int32_t partition, - int64_t offset, - int64_t timestamp, - int32_t broker_id, - rd_kafka_resp_err_t err, - int msgnum) { - struct test_mv_p *p; - struct test_mv_m *m; - - if (testid != mv->testid) { - TEST_SAYL(3, - "%s:%d: %s: mismatching testid %" PRIu64 - " != %" PRIu64 "\n", - func, line, clientname, testid, mv->testid); - return 0; /* Ignore message */ - } - - if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF && mv->ignore_eof) { - TEST_SAYL(3, "%s:%d: %s: ignoring EOF for %s [%" PRId32 "]\n", - func, line, clientname, topic, partition); - return 0; /* Ignore message */ - } - - p = test_msgver_p_get(mv, topic, partition, 1); - - if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { - p->eof_offset = offset; - return 1; - } - - m = test_mv_mvec_add(&p->mvec); - - m->offset = offset; - m->msgid = msgnum; - m->timestamp = timestamp; - m->broker_id = broker_id; - - if (test_level > 2) { - TEST_SAY( - "%s:%d: %s: " - "Recv msg %s [%" PRId32 "] offset %" PRId64 - " msgid %d " - "timestamp %" PRId64 " broker %" PRId32 "\n", - func, line, clientname, p->topic, p->partition, m->offset, - m->msgid, m->timestamp, m->broker_id); - } - - mv->msgcnt++; - - return 1; -} - -/** - * Adds a message to the msgver service. - * - * Message must be a proper message or PARTITION_EOF. - * - * @param override_topic if non-NULL, overrides the rkmessage's topic - * with this one. - * - * @returns 1 if message is from the expected testid, else 0 (not added). - */ -int test_msgver_add_msg0(const char *func, - int line, - const char *clientname, - test_msgver_t *mv, - const rd_kafka_message_t *rkmessage, - const char *override_topic) { - uint64_t in_testid; - int in_part; - int in_msgnum = -1; - char buf[128]; - const void *val; - size_t valsize; - - if (mv->fwd) - test_msgver_add_msg0(func, line, clientname, mv->fwd, rkmessage, - override_topic); - - if (rd_kafka_message_status(rkmessage) == - RD_KAFKA_MSG_STATUS_NOT_PERSISTED && - rkmessage->err) { - if (rkmessage->err != RD_KAFKA_RESP_ERR__PARTITION_EOF) - return 0; /* Ignore error */ - - in_testid = mv->testid; - - } else { - - if (!mv->msgid_hdr) { - rd_snprintf(buf, sizeof(buf), "%.*s", - (int)rkmessage->len, - (char *)rkmessage->payload); - val = buf; - } else { - /* msgid is in message header */ - rd_kafka_headers_t *hdrs; - - if (rd_kafka_message_headers(rkmessage, &hdrs) || - rd_kafka_header_get_last(hdrs, mv->msgid_hdr, &val, - &valsize)) { - TEST_SAYL(3, - "%s:%d: msgid expected in header %s " - "but %s exists for " - "message at offset %" PRId64 - " has no headers\n", - func, line, mv->msgid_hdr, - hdrs ? "no such header" - : "no headers", - rkmessage->offset); - - return 0; - } - } - - if (sscanf(val, "testid=%" SCNu64 ", partition=%i, msg=%i\n", - &in_testid, &in_part, &in_msgnum) != 3) - TEST_FAIL( - "%s:%d: Incorrect format at offset %" PRId64 ": %s", - func, line, rkmessage->offset, (const char *)val); - } - - return test_msgver_add_msg00( - func, line, clientname, mv, in_testid, - override_topic ? override_topic - : rd_kafka_topic_name(rkmessage->rkt), - rkmessage->partition, rkmessage->offset, - rd_kafka_message_timestamp(rkmessage, NULL), - rd_kafka_message_broker_id(rkmessage), rkmessage->err, in_msgnum); - return 1; -} - - - -/** - * Verify that all messages were received in order. - * - * - Offsets need to occur without gaps - * - msgids need to be increasing: but may have gaps, e.g., using partitioner) - */ -static int test_mv_mvec_verify_order(test_msgver_t *mv, - int flags, - struct test_mv_p *p, - struct test_mv_mvec *mvec, - struct test_mv_vs *vs) { - int mi; - int fails = 0; - - for (mi = 1 /*skip first*/; mi < mvec->cnt; mi++) { - struct test_mv_m *prev = test_mv_mvec_get(mvec, mi - 1); - struct test_mv_m *this = test_mv_mvec_get(mvec, mi); - - if (((flags & TEST_MSGVER_BY_OFFSET) && - prev->offset + 1 != this->offset) || - ((flags & TEST_MSGVER_BY_MSGID) && - prev->msgid > this->msgid)) { - TEST_MV_WARN(mv, - " %s [%" PRId32 - "] msg rcvidx #%d/%d: " - "out of order (prev vs this): " - "offset %" PRId64 " vs %" PRId64 - ", " - "msgid %d vs %d\n", - p ? p->topic : "*", p ? p->partition : -1, - mi, mvec->cnt, prev->offset, this->offset, - prev->msgid, this->msgid); - fails++; - } else if ((flags & TEST_MSGVER_BY_BROKER_ID) && - this->broker_id != vs->broker_id) { - TEST_MV_WARN(mv, - " %s [%" PRId32 - "] msg rcvidx #%d/%d: " - "broker id mismatch: expected %" PRId32 - ", not %" PRId32 "\n", - p ? p->topic : "*", p ? p->partition : -1, - mi, mvec->cnt, vs->broker_id, - this->broker_id); - fails++; - } - } - - return fails; -} - - -/** - * @brief Verify that messages correspond to 'correct' msgver. - */ -static int test_mv_mvec_verify_corr(test_msgver_t *mv, - int flags, - struct test_mv_p *p, - struct test_mv_mvec *mvec, - struct test_mv_vs *vs) { - int mi; - int fails = 0; - struct test_mv_p *corr_p = NULL; - struct test_mv_mvec *corr_mvec; - int verifycnt = 0; - - TEST_ASSERT(vs->corr); - - /* Get correct mvec for comparison. */ - if (p) - corr_p = test_msgver_p_get(vs->corr, p->topic, p->partition, 0); - if (!corr_p) { - TEST_MV_WARN(mv, - " %s [%" PRId32 - "]: " - "no corresponding correct partition found\n", - p ? p->topic : "*", p ? p->partition : -1); - return 1; - } - - corr_mvec = &corr_p->mvec; - - for (mi = 0; mi < mvec->cnt; mi++) { - struct test_mv_m *this = test_mv_mvec_get(mvec, mi); - const struct test_mv_m *corr; - - - if (flags & TEST_MSGVER_SUBSET) - corr = - test_mv_mvec_find_by_msgid(corr_mvec, this->msgid); - else - corr = test_mv_mvec_get(corr_mvec, mi); - - if (0) - TEST_MV_WARN(mv, - "msg #%d: msgid %d, offset %" PRId64 "\n", - mi, this->msgid, this->offset); - if (!corr) { - if (!(flags & TEST_MSGVER_SUBSET)) { - TEST_MV_WARN( - mv, - " %s [%" PRId32 - "] msg rcvidx #%d/%d: " - "out of range: correct mvec has " - "%d messages: " - "message offset %" PRId64 ", msgid %d\n", - p ? p->topic : "*", p ? p->partition : -1, - mi, mvec->cnt, corr_mvec->cnt, this->offset, - this->msgid); - fails++; - } - continue; - } - - if (((flags & TEST_MSGVER_BY_OFFSET) && - this->offset != corr->offset) || - ((flags & TEST_MSGVER_BY_MSGID) && - this->msgid != corr->msgid) || - ((flags & TEST_MSGVER_BY_TIMESTAMP) && - this->timestamp != corr->timestamp) || - ((flags & TEST_MSGVER_BY_BROKER_ID) && - this->broker_id != corr->broker_id)) { - TEST_MV_WARN( - mv, - " %s [%" PRId32 - "] msg rcvidx #%d/%d: " - "did not match correct msg: " - "offset %" PRId64 " vs %" PRId64 - ", " - "msgid %d vs %d, " - "timestamp %" PRId64 " vs %" PRId64 - ", " - "broker %" PRId32 " vs %" PRId32 " (fl 0x%x)\n", - p ? p->topic : "*", p ? p->partition : -1, mi, - mvec->cnt, this->offset, corr->offset, this->msgid, - corr->msgid, this->timestamp, corr->timestamp, - this->broker_id, corr->broker_id, flags); - fails++; - } else { - verifycnt++; - } - } - - if (verifycnt != corr_mvec->cnt && !(flags & TEST_MSGVER_SUBSET)) { - TEST_MV_WARN(mv, - " %s [%" PRId32 - "]: of %d input messages, " - "only %d/%d matched correct messages\n", - p ? p->topic : "*", p ? p->partition : -1, - mvec->cnt, verifycnt, corr_mvec->cnt); - fails++; - } - - return fails; -} - - - -static int test_mv_m_cmp_offset(const void *_a, const void *_b) { - const struct test_mv_m *a = _a, *b = _b; - - return RD_CMP(a->offset, b->offset); -} - -static int test_mv_m_cmp_msgid(const void *_a, const void *_b) { - const struct test_mv_m *a = _a, *b = _b; - - return RD_CMP(a->msgid, b->msgid); -} - - -/** - * Verify that there are no duplicate message. - * - * - Offsets are checked - * - msgids are checked - * - * * NOTE: This sorts the message (.m) array, first by offset, then by msgid - * and leaves the message array sorted (by msgid) - */ -static int test_mv_mvec_verify_dup(test_msgver_t *mv, - int flags, - struct test_mv_p *p, - struct test_mv_mvec *mvec, - struct test_mv_vs *vs) { - int mi; - int fails = 0; - enum { _P_OFFSET, _P_MSGID } pass; - - for (pass = _P_OFFSET; pass <= _P_MSGID; pass++) { - - if (pass == _P_OFFSET) { - if (!(flags & TEST_MSGVER_BY_OFFSET)) - continue; - test_mv_mvec_sort(mvec, test_mv_m_cmp_offset); - } else if (pass == _P_MSGID) { - if (!(flags & TEST_MSGVER_BY_MSGID)) - continue; - test_mv_mvec_sort(mvec, test_mv_m_cmp_msgid); - } - - for (mi = 1 /*skip first*/; mi < mvec->cnt; mi++) { - struct test_mv_m *prev = test_mv_mvec_get(mvec, mi - 1); - struct test_mv_m *this = test_mv_mvec_get(mvec, mi); - int is_dup = 0; - - if (pass == _P_OFFSET) - is_dup = prev->offset == this->offset; - else if (pass == _P_MSGID) - is_dup = prev->msgid == this->msgid; - - if (!is_dup) - continue; - - TEST_MV_WARN(mv, - " %s [%" PRId32 - "] " - "duplicate msg (prev vs this): " - "offset %" PRId64 " vs %" PRId64 - ", " - "msgid %d vs %d\n", - p ? p->topic : "*", p ? p->partition : -1, - prev->offset, this->offset, prev->msgid, - this->msgid); - fails++; - } - } - - return fails; -} - -/** - * @brief Verify that all messages are from the correct broker. - */ -static int test_mv_mvec_verify_broker(test_msgver_t *mv, - int flags, - struct test_mv_p *p, - struct test_mv_mvec *mvec, - struct test_mv_vs *vs) { - int mi; - int fails = 0; - - /* Assume that the correct flag has been checked already. */ - - - rd_assert(flags & TEST_MSGVER_BY_BROKER_ID); - for (mi = 0; mi < mvec->cnt; mi++) { - struct test_mv_m *this = test_mv_mvec_get(mvec, mi); - if (this->broker_id != vs->broker_id) { - TEST_MV_WARN( - mv, - " %s [%" PRId32 - "] broker_id check: " - "msgid #%d (at mi %d): " - "broker_id %" PRId32 - " is not the expected broker_id %" PRId32 "\n", - p ? p->topic : "*", p ? p->partition : -1, - this->msgid, mi, this->broker_id, vs->broker_id); - fails++; - } - } - return fails; -} - - -/** - * Verify that \p mvec contains the expected range: - * - TEST_MSGVER_BY_MSGID: msgid within \p vs->msgid_min .. \p vs->msgid_max - * - TEST_MSGVER_BY_TIMESTAMP: timestamp with \p vs->timestamp_min .. _max - * - * * NOTE: TEST_MSGVER_BY_MSGID is required - * - * * NOTE: This sorts the message (.m) array by msgid - * and leaves the message array sorted (by msgid) - */ -static int test_mv_mvec_verify_range(test_msgver_t *mv, - int flags, - struct test_mv_p *p, - struct test_mv_mvec *mvec, - struct test_mv_vs *vs) { - int mi; - int fails = 0; - int cnt = 0; - int exp_cnt = vs->msgid_max - vs->msgid_min + 1; - int skip_cnt = 0; - - if (!(flags & TEST_MSGVER_BY_MSGID)) - return 0; - - test_mv_mvec_sort(mvec, test_mv_m_cmp_msgid); - - // test_mv_mvec_dump(stdout, mvec); - - for (mi = 0; mi < mvec->cnt; mi++) { - struct test_mv_m *prev = - mi ? test_mv_mvec_get(mvec, mi - 1) : NULL; - struct test_mv_m *this = test_mv_mvec_get(mvec, mi); - - if (this->msgid < vs->msgid_min) { - skip_cnt++; - continue; - } else if (this->msgid > vs->msgid_max) - break; - - if (flags & TEST_MSGVER_BY_TIMESTAMP) { - if (this->timestamp < vs->timestamp_min || - this->timestamp > vs->timestamp_max) { - TEST_MV_WARN( - mv, - " %s [%" PRId32 - "] range check: " - "msgid #%d (at mi %d): " - "timestamp %" PRId64 - " outside " - "expected range %" PRId64 "..%" PRId64 "\n", - p ? p->topic : "*", p ? p->partition : -1, - this->msgid, mi, this->timestamp, - vs->timestamp_min, vs->timestamp_max); - fails++; - } - } - - if ((flags & TEST_MSGVER_BY_BROKER_ID) && - this->broker_id != vs->broker_id) { - TEST_MV_WARN( - mv, - " %s [%" PRId32 - "] range check: " - "msgid #%d (at mi %d): " - "expected broker id %" PRId32 ", not %" PRId32 "\n", - p ? p->topic : "*", p ? p->partition : -1, - this->msgid, mi, vs->broker_id, this->broker_id); - fails++; - } - - if (cnt++ == 0) { - if (this->msgid != vs->msgid_min) { - TEST_MV_WARN(mv, - " %s [%" PRId32 - "] range check: " - "first message #%d (at mi %d) " - "is not first in " - "expected range %d..%d\n", - p ? p->topic : "*", - p ? p->partition : -1, this->msgid, - mi, vs->msgid_min, vs->msgid_max); - fails++; - } - } else if (cnt > exp_cnt) { - TEST_MV_WARN(mv, - " %s [%" PRId32 - "] range check: " - "too many messages received (%d/%d) at " - "msgid %d for expected range %d..%d\n", - p ? p->topic : "*", p ? p->partition : -1, - cnt, exp_cnt, this->msgid, vs->msgid_min, - vs->msgid_max); - fails++; - } - - if (!prev) { - skip_cnt++; - continue; - } - - if (prev->msgid + 1 != this->msgid) { - TEST_MV_WARN(mv, - " %s [%" PRId32 - "] range check: " - " %d message(s) missing between " - "msgid %d..%d in expected range %d..%d\n", - p ? p->topic : "*", p ? p->partition : -1, - this->msgid - prev->msgid - 1, - prev->msgid + 1, this->msgid - 1, - vs->msgid_min, vs->msgid_max); - fails++; - } - } - - if (cnt != exp_cnt) { - TEST_MV_WARN(mv, - " %s [%" PRId32 - "] range check: " - " wrong number of messages seen, wanted %d got %d " - "in expected range %d..%d (%d messages skipped)\n", - p ? p->topic : "*", p ? p->partition : -1, exp_cnt, - cnt, vs->msgid_min, vs->msgid_max, skip_cnt); - fails++; - } - - return fails; -} - - - -/** - * Run verifier \p f for all partitions. - */ -#define test_mv_p_verify_f(mv, flags, f, vs) \ - test_mv_p_verify_f0(mv, flags, f, #f, vs) -static int test_mv_p_verify_f0(test_msgver_t *mv, - int flags, - int (*f)(test_msgver_t *mv, - int flags, - struct test_mv_p *p, - struct test_mv_mvec *mvec, - struct test_mv_vs *vs), - const char *f_name, - struct test_mv_vs *vs) { - int i; - int fails = 0; - - for (i = 0; i < mv->p_cnt; i++) { - TEST_SAY("Verifying %s [%" PRId32 "] %d msgs with %s\n", - mv->p[i]->topic, mv->p[i]->partition, - mv->p[i]->mvec.cnt, f_name); - fails += f(mv, flags, mv->p[i], &mv->p[i]->mvec, vs); - } - - return fails; -} - - -/** - * Collect all messages from all topics and partitions into vs->mvec - */ -static void test_mv_collect_all_msgs(test_msgver_t *mv, struct test_mv_vs *vs) { - int i; - - for (i = 0; i < mv->p_cnt; i++) { - struct test_mv_p *p = mv->p[i]; - int mi; - - test_mv_mvec_reserve(&vs->mvec, p->mvec.cnt); - for (mi = 0; mi < p->mvec.cnt; mi++) { - struct test_mv_m *m = test_mv_mvec_get(&p->mvec, mi); - struct test_mv_m *m_new = test_mv_mvec_add(&vs->mvec); - *m_new = *m; - } - } -} - - -/** - * Verify that all messages (by msgid) in range msg_base+exp_cnt were received - * and received only once. - * This works across all partitions. - */ -static int -test_msgver_verify_range(test_msgver_t *mv, int flags, struct test_mv_vs *vs) { - int fails = 0; - - /** - * Create temporary array to hold expected message set, - * then traverse all topics and partitions and move matching messages - * to that set. Then verify the message set. - */ - - test_mv_mvec_init(&vs->mvec, vs->exp_cnt); - - /* Collect all msgs into vs mvec */ - test_mv_collect_all_msgs(mv, vs); - - fails += test_mv_mvec_verify_range(mv, TEST_MSGVER_BY_MSGID | flags, - NULL, &vs->mvec, vs); - fails += test_mv_mvec_verify_dup(mv, TEST_MSGVER_BY_MSGID | flags, NULL, - &vs->mvec, vs); - - test_mv_mvec_clear(&vs->mvec); - - return fails; -} - - -/** - * Verify that \p exp_cnt messages were received for \p topic and \p partition - * starting at msgid base \p msg_base. - */ -int test_msgver_verify_part0(const char *func, - int line, - const char *what, - test_msgver_t *mv, - int flags, - const char *topic, - int partition, - int msg_base, - int exp_cnt) { - int fails = 0; - struct test_mv_vs vs = {.msg_base = msg_base, .exp_cnt = exp_cnt}; - struct test_mv_p *p; - - TEST_SAY( - "%s:%d: %s: Verifying %d received messages (flags 0x%x) " - "in %s [%d]: expecting msgids %d..%d (%d)\n", - func, line, what, mv->msgcnt, flags, topic, partition, msg_base, - msg_base + exp_cnt, exp_cnt); - - p = test_msgver_p_get(mv, topic, partition, 0); - - /* Per-partition checks */ - if (flags & TEST_MSGVER_ORDER) - fails += test_mv_mvec_verify_order(mv, flags, p, &p->mvec, &vs); - if (flags & TEST_MSGVER_DUP) - fails += test_mv_mvec_verify_dup(mv, flags, p, &p->mvec, &vs); - - if (mv->msgcnt < vs.exp_cnt) { - TEST_MV_WARN(mv, - "%s:%d: " - "%s [%" PRId32 - "] expected %d messages but only " - "%d received\n", - func, line, p ? p->topic : "*", - p ? p->partition : -1, vs.exp_cnt, mv->msgcnt); - fails++; - } - - - if (mv->log_suppr_cnt > 0) - TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n", - func, line, what, mv->log_suppr_cnt); - - if (fails) - TEST_FAIL( - "%s:%d: %s: Verification of %d received messages " - "failed: " - "expected msgids %d..%d (%d): see previous errors\n", - func, line, what, mv->msgcnt, msg_base, msg_base + exp_cnt, - exp_cnt); - else - TEST_SAY( - "%s:%d: %s: Verification of %d received messages " - "succeeded: " - "expected msgids %d..%d (%d)\n", - func, line, what, mv->msgcnt, msg_base, msg_base + exp_cnt, - exp_cnt); - - return fails; -} - -/** - * Verify that \p exp_cnt messages were received starting at - * msgid base \p msg_base. - */ -int test_msgver_verify0(const char *func, - int line, - const char *what, - test_msgver_t *mv, - int flags, - struct test_mv_vs vs) { - int fails = 0; - - TEST_SAY( - "%s:%d: %s: Verifying %d received messages (flags 0x%x): " - "expecting msgids %d..%d (%d)\n", - func, line, what, mv->msgcnt, flags, vs.msg_base, - vs.msg_base + vs.exp_cnt, vs.exp_cnt); - if (flags & TEST_MSGVER_BY_TIMESTAMP) { - assert((flags & TEST_MSGVER_BY_MSGID)); /* Required */ - TEST_SAY( - "%s:%d: %s: " - " and expecting timestamps %" PRId64 "..%" PRId64 "\n", - func, line, what, vs.timestamp_min, vs.timestamp_max); - } - - /* Per-partition checks */ - if (flags & TEST_MSGVER_ORDER) - fails += test_mv_p_verify_f(mv, flags, - test_mv_mvec_verify_order, &vs); - if (flags & TEST_MSGVER_DUP) - fails += - test_mv_p_verify_f(mv, flags, test_mv_mvec_verify_dup, &vs); - - if (flags & TEST_MSGVER_BY_BROKER_ID) - fails += test_mv_p_verify_f(mv, flags, - test_mv_mvec_verify_broker, &vs); - - /* Checks across all partitions */ - if ((flags & TEST_MSGVER_RANGE) && vs.exp_cnt > 0) { - vs.msgid_min = vs.msg_base; - vs.msgid_max = vs.msgid_min + vs.exp_cnt - 1; - fails += test_msgver_verify_range(mv, flags, &vs); - } - - if (mv->log_suppr_cnt > 0) - TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n", - func, line, what, mv->log_suppr_cnt); - - if (vs.exp_cnt != mv->msgcnt) { - if (!(flags & TEST_MSGVER_SUBSET)) { - TEST_WARN("%s:%d: %s: expected %d messages, got %d\n", - func, line, what, vs.exp_cnt, mv->msgcnt); - fails++; - } - } - - if (fails) - TEST_FAIL( - "%s:%d: %s: Verification of %d received messages " - "failed: " - "expected msgids %d..%d (%d): see previous errors\n", - func, line, what, mv->msgcnt, vs.msg_base, - vs.msg_base + vs.exp_cnt, vs.exp_cnt); - else - TEST_SAY( - "%s:%d: %s: Verification of %d received messages " - "succeeded: " - "expected msgids %d..%d (%d)\n", - func, line, what, mv->msgcnt, vs.msg_base, - vs.msg_base + vs.exp_cnt, vs.exp_cnt); - - return fails; -} - - - -void test_verify_rkmessage0(const char *func, - int line, - rd_kafka_message_t *rkmessage, - uint64_t testid, - int32_t partition, - int msgnum) { - uint64_t in_testid; - int in_part; - int in_msgnum; - char buf[128]; - - rd_snprintf(buf, sizeof(buf), "%.*s", (int)rkmessage->len, - (char *)rkmessage->payload); - - if (sscanf(buf, "testid=%" SCNu64 ", partition=%i, msg=%i\n", - &in_testid, &in_part, &in_msgnum) != 3) - TEST_FAIL("Incorrect format: %s", buf); - - if (testid != in_testid || (partition != -1 && partition != in_part) || - (msgnum != -1 && msgnum != in_msgnum) || in_msgnum < 0) - goto fail_match; - - if (test_level > 2) { - TEST_SAY("%s:%i: Our testid %" PRIu64 - ", part %i (%i), msg %i\n", - func, line, testid, (int)partition, - (int)rkmessage->partition, msgnum); - } - - - return; - -fail_match: - TEST_FAIL("%s:%i: Our testid %" PRIu64 - ", part %i, msg %i did " - "not match message: \"%s\"\n", - func, line, testid, (int)partition, msgnum, buf); -} - - -/** - * @brief Verify that \p mv is identical to \p corr according to flags. - */ -void test_msgver_verify_compare0(const char *func, - int line, - const char *what, - test_msgver_t *mv, - test_msgver_t *corr, - int flags) { - struct test_mv_vs vs; - int fails = 0; - - memset(&vs, 0, sizeof(vs)); - - TEST_SAY( - "%s:%d: %s: Verifying %d received messages (flags 0x%x) by " - "comparison to correct msgver (%d messages)\n", - func, line, what, mv->msgcnt, flags, corr->msgcnt); - - vs.corr = corr; - - /* Per-partition checks */ - fails += test_mv_p_verify_f(mv, flags, test_mv_mvec_verify_corr, &vs); - - if (mv->log_suppr_cnt > 0) - TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n", - func, line, what, mv->log_suppr_cnt); - - if (corr->msgcnt != mv->msgcnt) { - if (!(flags & TEST_MSGVER_SUBSET)) { - TEST_WARN("%s:%d: %s: expected %d messages, got %d\n", - func, line, what, corr->msgcnt, mv->msgcnt); - fails++; - } - } - - if (fails) - TEST_FAIL( - "%s:%d: %s: Verification of %d received messages " - "failed: expected %d messages: see previous errors\n", - func, line, what, mv->msgcnt, corr->msgcnt); - else - TEST_SAY( - "%s:%d: %s: Verification of %d received messages " - "succeeded: matching %d messages from correct msgver\n", - func, line, what, mv->msgcnt, corr->msgcnt); -} - - -/** - * Consumer poll but dont expect any proper messages for \p timeout_ms. - */ -void test_consumer_poll_no_msgs(const char *what, - rd_kafka_t *rk, - uint64_t testid, - int timeout_ms) { - int64_t tmout = test_clock() + ((int64_t)timeout_ms * 1000); - int cnt = 0; - test_timing_t t_cons; - test_msgver_t mv; - - test_msgver_init(&mv, testid); - - if (what) - TEST_SAY("%s: not expecting any messages for %dms\n", what, - timeout_ms); - - TIMING_START(&t_cons, "CONSUME"); - - do { - rd_kafka_message_t *rkmessage; - - rkmessage = rd_kafka_consumer_poll(rk, timeout_ms); - if (!rkmessage) - continue; - - if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { - TEST_SAY("%s [%" PRId32 - "] reached EOF at " - "offset %" PRId64 "\n", - rd_kafka_topic_name(rkmessage->rkt), - rkmessage->partition, rkmessage->offset); - test_msgver_add_msg(rk, &mv, rkmessage); - - } else if (rkmessage->err) { - TEST_FAIL( - "%s [%" PRId32 "] error (offset %" PRId64 "): %s", - rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt) - : "(no-topic)", - rkmessage->partition, rkmessage->offset, - rd_kafka_message_errstr(rkmessage)); - - } else { - if (test_msgver_add_msg(rk, &mv, rkmessage)) { - TEST_MV_WARN( - &mv, - "Received unexpected message on " - "%s [%" PRId32 - "] at offset " - "%" PRId64 "\n", - rd_kafka_topic_name(rkmessage->rkt), - rkmessage->partition, rkmessage->offset); - cnt++; - } - } - - rd_kafka_message_destroy(rkmessage); - } while (test_clock() <= tmout); - - if (what) - TIMING_STOP(&t_cons); - - test_msgver_verify(what, &mv, TEST_MSGVER_ALL, 0, 0); - test_msgver_clear(&mv); - - TEST_ASSERT(cnt == 0, "Expected 0 messages, got %d", cnt); -} - -/** - * @brief Consumer poll with expectation that a \p err will be reached - * within \p timeout_ms. - */ -void test_consumer_poll_expect_err(rd_kafka_t *rk, - uint64_t testid, - int timeout_ms, - rd_kafka_resp_err_t err) { - int64_t tmout = test_clock() + ((int64_t)timeout_ms * 1000); - - TEST_SAY("%s: expecting error %s within %dms\n", rd_kafka_name(rk), - rd_kafka_err2name(err), timeout_ms); - - do { - rd_kafka_message_t *rkmessage; - rkmessage = rd_kafka_consumer_poll(rk, timeout_ms); - if (!rkmessage) - continue; - - if (rkmessage->err == err) { - TEST_SAY("Got expected error: %s: %s\n", - rd_kafka_err2name(rkmessage->err), - rd_kafka_message_errstr(rkmessage)); - rd_kafka_message_destroy(rkmessage); - - return; - } else if (rkmessage->err) { - TEST_FAIL("%s [%" PRId32 - "] unexpected error " - "(offset %" PRId64 "): %s", - rkmessage->rkt - ? rd_kafka_topic_name(rkmessage->rkt) - : "(no-topic)", - rkmessage->partition, rkmessage->offset, - rd_kafka_err2name(rkmessage->err)); - } - - rd_kafka_message_destroy(rkmessage); - } while (test_clock() <= tmout); - TEST_FAIL("Expected error %s not seen in %dms", rd_kafka_err2name(err), - timeout_ms); -} - -/** - * Call consumer poll once and then return. - * Messages are handled. - * - * \p mv is optional - * - * @returns 0 on timeout, 1 if a message was received or .._PARTITION_EOF - * if EOF was reached. - * TEST_FAIL()s on all errors. - */ -int test_consumer_poll_once(rd_kafka_t *rk, test_msgver_t *mv, int timeout_ms) { - rd_kafka_message_t *rkmessage; - - rkmessage = rd_kafka_consumer_poll(rk, timeout_ms); - if (!rkmessage) - return 0; - - if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { - TEST_SAY("%s [%" PRId32 - "] reached EOF at " - "offset %" PRId64 "\n", - rd_kafka_topic_name(rkmessage->rkt), - rkmessage->partition, rkmessage->offset); - if (mv) - test_msgver_add_msg(rk, mv, rkmessage); - rd_kafka_message_destroy(rkmessage); - return RD_KAFKA_RESP_ERR__PARTITION_EOF; - - } else if (rkmessage->err) { - TEST_FAIL("%s [%" PRId32 "] error (offset %" PRId64 "): %s", - rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt) - : "(no-topic)", - rkmessage->partition, rkmessage->offset, - rd_kafka_message_errstr(rkmessage)); - - } else { - if (mv) - test_msgver_add_msg(rk, mv, rkmessage); - } - - rd_kafka_message_destroy(rkmessage); - return 1; -} - -/** - * @param exact Require exact exp_eof_cnt (unless -1) and exp_cnt (unless -1). - * If false: poll until either one is reached. - * @param timeout_ms Each call to poll has a timeout set by this argument. The - * test fails if any poll times out. - */ -int test_consumer_poll_exact_timeout(const char *what, - rd_kafka_t *rk, - uint64_t testid, - int exp_eof_cnt, - int exp_msg_base, - int exp_cnt, - rd_bool_t exact, - test_msgver_t *mv, - int timeout_ms) { - int eof_cnt = 0; - int cnt = 0; - test_timing_t t_cons; - - TEST_SAY("%s: consume %s%d messages\n", what, exact ? "exactly " : "", - exp_cnt); - - TIMING_START(&t_cons, "CONSUME"); - - while ((!exact && ((exp_eof_cnt <= 0 || eof_cnt < exp_eof_cnt) && - (exp_cnt <= 0 || cnt < exp_cnt))) || - (exact && (eof_cnt < exp_eof_cnt || cnt < exp_cnt))) { - rd_kafka_message_t *rkmessage; - - rkmessage = - rd_kafka_consumer_poll(rk, tmout_multip(timeout_ms)); - if (!rkmessage) /* Shouldn't take this long to get a msg */ - TEST_FAIL( - "%s: consumer_poll() timeout " - "(%d/%d eof, %d/%d msgs)\n", - what, eof_cnt, exp_eof_cnt, cnt, exp_cnt); - - - if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { - TEST_SAY("%s [%" PRId32 - "] reached EOF at " - "offset %" PRId64 "\n", - rd_kafka_topic_name(rkmessage->rkt), - rkmessage->partition, rkmessage->offset); - TEST_ASSERT(exp_eof_cnt != 0, "expected no EOFs"); - if (mv) - test_msgver_add_msg(rk, mv, rkmessage); - eof_cnt++; - - } else if (rkmessage->err) { - TEST_FAIL( - "%s [%" PRId32 "] error (offset %" PRId64 "): %s", - rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt) - : "(no-topic)", - rkmessage->partition, rkmessage->offset, - rd_kafka_message_errstr(rkmessage)); - - } else { - TEST_SAYL(4, - "%s: consumed message on %s [%" PRId32 - "] " - "at offset %" PRId64 " (leader epoch %" PRId32 - ")\n", - what, rd_kafka_topic_name(rkmessage->rkt), - rkmessage->partition, rkmessage->offset, - rd_kafka_message_leader_epoch(rkmessage)); - - if (!mv || test_msgver_add_msg(rk, mv, rkmessage)) - cnt++; - } - - rd_kafka_message_destroy(rkmessage); - } - - TIMING_STOP(&t_cons); - - TEST_SAY("%s: consumed %d/%d messages (%d/%d EOFs)\n", what, cnt, - exp_cnt, eof_cnt, exp_eof_cnt); - - TEST_ASSERT(!exact || ((exp_cnt == -1 || exp_cnt == cnt) && - (exp_eof_cnt == -1 || exp_eof_cnt == eof_cnt)), - "%s: mismatch between exact expected counts and actual: " - "%d/%d EOFs, %d/%d msgs", - what, eof_cnt, exp_eof_cnt, cnt, exp_cnt); - - if (exp_cnt == 0) - TEST_ASSERT(cnt == 0 && eof_cnt == exp_eof_cnt, - "%s: expected no messages and %d EOFs: " - "got %d messages and %d EOFs", - what, exp_eof_cnt, cnt, eof_cnt); - return cnt; -} - - -/** - * @param exact Require exact exp_eof_cnt (unless -1) and exp_cnt (unless -1). - * If false: poll until either one is reached. - */ -int test_consumer_poll_exact(const char *what, - rd_kafka_t *rk, - uint64_t testid, - int exp_eof_cnt, - int exp_msg_base, - int exp_cnt, - rd_bool_t exact, - test_msgver_t *mv) { - return test_consumer_poll_exact_timeout(what, rk, testid, exp_eof_cnt, - exp_msg_base, exp_cnt, exact, - mv, 10 * 1000); -} - -int test_consumer_poll(const char *what, - rd_kafka_t *rk, - uint64_t testid, - int exp_eof_cnt, - int exp_msg_base, - int exp_cnt, - test_msgver_t *mv) { - return test_consumer_poll_exact(what, rk, testid, exp_eof_cnt, - exp_msg_base, exp_cnt, - rd_false /*not exact */, mv); -} - -int test_consumer_poll_timeout(const char *what, - rd_kafka_t *rk, - uint64_t testid, - int exp_eof_cnt, - int exp_msg_base, - int exp_cnt, - test_msgver_t *mv, - int timeout_ms) { - return test_consumer_poll_exact_timeout( - what, rk, testid, exp_eof_cnt, exp_msg_base, exp_cnt, - rd_false /*not exact */, mv, timeout_ms); -} - -void test_consumer_close(rd_kafka_t *rk) { - rd_kafka_resp_err_t err; - test_timing_t timing; - - TEST_SAY("Closing consumer %s\n", rd_kafka_name(rk)); - - TIMING_START(&timing, "CONSUMER.CLOSE"); - err = rd_kafka_consumer_close(rk); - TIMING_STOP(&timing); - if (err) - TEST_FAIL("Failed to close consumer: %s\n", - rd_kafka_err2str(err)); -} - - -void test_flush(rd_kafka_t *rk, int timeout_ms) { - test_timing_t timing; - rd_kafka_resp_err_t err; - - TEST_SAY("%s: Flushing %d messages\n", rd_kafka_name(rk), - rd_kafka_outq_len(rk)); - TIMING_START(&timing, "FLUSH"); - err = rd_kafka_flush(rk, timeout_ms); - TIMING_STOP(&timing); - if (err) - TEST_FAIL("Failed to flush(%s, %d): %s: len() = %d\n", - rd_kafka_name(rk), timeout_ms, rd_kafka_err2str(err), - rd_kafka_outq_len(rk)); -} - - -void test_conf_set(rd_kafka_conf_t *conf, const char *name, const char *val) { - char errstr[512]; - if (rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)) != - RD_KAFKA_CONF_OK) - TEST_FAIL("Failed to set config \"%s\"=\"%s\": %s\n", name, val, - errstr); -} - -/** - * @brief Get configuration value for property \p name. - * - * @param conf Configuration to get value from. If NULL the test.conf (if any) - * configuration will be used. - */ -char *test_conf_get(const rd_kafka_conf_t *conf, const char *name) { - static RD_TLS char ret[256]; - size_t ret_sz = sizeof(ret); - rd_kafka_conf_t *def_conf = NULL; - - if (!conf) /* Use the current test.conf */ - test_conf_init(&def_conf, NULL, 0); - - if (rd_kafka_conf_get(conf ? conf : def_conf, name, ret, &ret_sz) != - RD_KAFKA_CONF_OK) - TEST_FAIL("Failed to get config \"%s\": %s\n", name, - "unknown property"); - - if (def_conf) - rd_kafka_conf_destroy(def_conf); - - return ret; -} - - -char *test_topic_conf_get(const rd_kafka_topic_conf_t *tconf, - const char *name) { - static RD_TLS char ret[256]; - size_t ret_sz = sizeof(ret); - if (rd_kafka_topic_conf_get(tconf, name, ret, &ret_sz) != - RD_KAFKA_CONF_OK) - TEST_FAIL("Failed to get topic config \"%s\": %s\n", name, - "unknown property"); - return ret; -} - - -/** - * @brief Check if property \name matches \p val in \p conf. - * If \p conf is NULL the test config will be used. */ -int test_conf_match(rd_kafka_conf_t *conf, const char *name, const char *val) { - char *real; - int free_conf = 0; - - if (!conf) { - test_conf_init(&conf, NULL, 0); - free_conf = 1; - } - - real = test_conf_get(conf, name); - - if (free_conf) - rd_kafka_conf_destroy(conf); - - return !strcmp(real, val); -} - - -void test_topic_conf_set(rd_kafka_topic_conf_t *tconf, - const char *name, - const char *val) { - char errstr[512]; - if (rd_kafka_topic_conf_set(tconf, name, val, errstr, sizeof(errstr)) != - RD_KAFKA_CONF_OK) - TEST_FAIL("Failed to set topic config \"%s\"=\"%s\": %s\n", - name, val, errstr); -} - -/** - * @brief First attempt to set topic level property, then global. - */ -void test_any_conf_set(rd_kafka_conf_t *conf, - rd_kafka_topic_conf_t *tconf, - const char *name, - const char *val) { - rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN; - char errstr[512] = {"Missing conf_t"}; - - if (tconf) - res = rd_kafka_topic_conf_set(tconf, name, val, errstr, - sizeof(errstr)); - if (res == RD_KAFKA_CONF_UNKNOWN && conf) - res = - rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)); - - if (res != RD_KAFKA_CONF_OK) - TEST_FAIL("Failed to set any config \"%s\"=\"%s\": %s\n", name, - val, errstr); -} - - -/** - * @returns true if test clients need to be configured for authentication - * or other security measures (SSL), else false for unauthed plaintext. - */ -int test_needs_auth(void) { - rd_kafka_conf_t *conf; - const char *sec; - - test_conf_init(&conf, NULL, 0); - - sec = test_conf_get(conf, "security.protocol"); - - rd_kafka_conf_destroy(conf); - - return strcmp(sec, "plaintext"); -} - - -void test_print_partition_list( - const rd_kafka_topic_partition_list_t *partitions) { - int i; - for (i = 0; i < partitions->cnt; i++) { - TEST_SAY(" %s [%" PRId32 "] offset %" PRId64 " (epoch %" PRId32 - ") %s%s\n", - partitions->elems[i].topic, - partitions->elems[i].partition, - partitions->elems[i].offset, - rd_kafka_topic_partition_get_leader_epoch( - &partitions->elems[i]), - partitions->elems[i].err ? ": " : "", - partitions->elems[i].err - ? rd_kafka_err2str(partitions->elems[i].err) - : ""); - } -} - -/** - * @brief Compare two lists, returning 0 if equal. - * - * @remark The lists may be sorted by this function. - */ -int test_partition_list_cmp(rd_kafka_topic_partition_list_t *al, - rd_kafka_topic_partition_list_t *bl) { - int i; - - if (al->cnt < bl->cnt) - return -1; - else if (al->cnt > bl->cnt) - return 1; - else if (al->cnt == 0) - return 0; - - rd_kafka_topic_partition_list_sort(al, NULL, NULL); - rd_kafka_topic_partition_list_sort(bl, NULL, NULL); - - for (i = 0; i < al->cnt; i++) { - const rd_kafka_topic_partition_t *a = &al->elems[i]; - const rd_kafka_topic_partition_t *b = &bl->elems[i]; - if (a->partition != b->partition || strcmp(a->topic, b->topic)) - return -1; - } - - return 0; -} - -/** - * @brief Compare two lists and their offsets, returning 0 if equal. - * - * @remark The lists may be sorted by this function. - */ -int test_partition_list_and_offsets_cmp(rd_kafka_topic_partition_list_t *al, - rd_kafka_topic_partition_list_t *bl) { - int i; - - if (al->cnt < bl->cnt) - return -1; - else if (al->cnt > bl->cnt) - return 1; - else if (al->cnt == 0) - return 0; - - rd_kafka_topic_partition_list_sort(al, NULL, NULL); - rd_kafka_topic_partition_list_sort(bl, NULL, NULL); - - for (i = 0; i < al->cnt; i++) { - const rd_kafka_topic_partition_t *a = &al->elems[i]; - const rd_kafka_topic_partition_t *b = &bl->elems[i]; - if (a->partition != b->partition || - strcmp(a->topic, b->topic) || a->offset != b->offset || - rd_kafka_topic_partition_get_leader_epoch(a) != - rd_kafka_topic_partition_get_leader_epoch(b)) - return -1; - } - - return 0; -} - -/** - * @brief Execute script from the Kafka distribution bin/ path. - */ -void test_kafka_cmd(const char *fmt, ...) { -#ifdef _WIN32 - TEST_FAIL("%s not supported on Windows, yet", __FUNCTION__); -#else - char cmd[1024]; - int r; - va_list ap; - test_timing_t t_cmd; - const char *kpath; - - kpath = test_getenv("KAFKA_PATH", NULL); - - if (!kpath) - TEST_FAIL("%s: KAFKA_PATH must be set", __FUNCTION__); - - r = rd_snprintf(cmd, sizeof(cmd), "%s/bin/", kpath); - TEST_ASSERT(r < (int)sizeof(cmd)); - - va_start(ap, fmt); - rd_vsnprintf(cmd + r, sizeof(cmd) - r, fmt, ap); - va_end(ap); - - TEST_SAY("Executing: %s\n", cmd); - TIMING_START(&t_cmd, "exec"); - r = system(cmd); - TIMING_STOP(&t_cmd); - - if (r == -1) - TEST_FAIL("system(\"%s\") failed: %s", cmd, strerror(errno)); - else if (WIFSIGNALED(r)) - TEST_FAIL("system(\"%s\") terminated by signal %d\n", cmd, - WTERMSIG(r)); - else if (WEXITSTATUS(r)) - TEST_FAIL("system(\"%s\") failed with exit status %d\n", cmd, - WEXITSTATUS(r)); -#endif -} - -/** - * @brief Execute kafka-topics.sh from the Kafka distribution. - */ -void test_kafka_topics(const char *fmt, ...) { -#ifdef _WIN32 - TEST_FAIL("%s not supported on Windows, yet", __FUNCTION__); -#else - char cmd[1024]; - int r, bytes_left; - va_list ap; - test_timing_t t_cmd; - const char *kpath, *bootstrap_env, *flag, *bootstrap_srvs; - - if (test_broker_version >= TEST_BRKVER(3, 0, 0, 0)) { - bootstrap_env = "BROKERS"; - flag = "--bootstrap-server"; - } else { - bootstrap_env = "ZK_ADDRESS"; - flag = "--zookeeper"; - } - - kpath = test_getenv("KAFKA_PATH", NULL); - bootstrap_srvs = test_getenv(bootstrap_env, NULL); - - if (!kpath || !bootstrap_srvs) - TEST_FAIL("%s: KAFKA_PATH and %s must be set", __FUNCTION__, - bootstrap_env); - - r = rd_snprintf(cmd, sizeof(cmd), "%s/bin/kafka-topics.sh %s %s ", - kpath, flag, bootstrap_srvs); - TEST_ASSERT(r > 0 && r < (int)sizeof(cmd)); - - bytes_left = sizeof(cmd) - r; - - va_start(ap, fmt); - r = rd_vsnprintf(cmd + r, bytes_left, fmt, ap); - va_end(ap); - TEST_ASSERT(r > 0 && r < bytes_left); - - TEST_SAY("Executing: %s\n", cmd); - TIMING_START(&t_cmd, "exec"); - r = system(cmd); - TIMING_STOP(&t_cmd); - - if (r == -1) - TEST_FAIL("system(\"%s\") failed: %s", cmd, strerror(errno)); - else if (WIFSIGNALED(r)) - TEST_FAIL("system(\"%s\") terminated by signal %d\n", cmd, - WTERMSIG(r)); - else if (WEXITSTATUS(r)) - TEST_FAIL("system(\"%s\") failed with exit status %d\n", cmd, - WEXITSTATUS(r)); -#endif -} - - - -/** - * @brief Create topic using Topic Admin API - * - * @param configs is an optional key-value tuple array of - * topic configs (or NULL). - */ -void test_admin_create_topic(rd_kafka_t *use_rk, - const char *topicname, - int partition_cnt, - int replication_factor, - const char **configs) { - rd_kafka_t *rk; - rd_kafka_NewTopic_t *newt[1]; - const size_t newt_cnt = 1; - rd_kafka_AdminOptions_t *options; - rd_kafka_queue_t *rkqu; - rd_kafka_event_t *rkev; - const rd_kafka_CreateTopics_result_t *res; - const rd_kafka_topic_result_t **terr; - int timeout_ms = tmout_multip(10000); - size_t res_cnt; - rd_kafka_resp_err_t err; - char errstr[512]; - test_timing_t t_create; - - if (!(rk = use_rk)) - rk = test_create_producer(); - - rkqu = rd_kafka_queue_new(rk); - - newt[0] = - rd_kafka_NewTopic_new(topicname, partition_cnt, replication_factor, - errstr, sizeof(errstr)); - TEST_ASSERT(newt[0] != NULL, "%s", errstr); - - if (configs) { - int i; - - for (i = 0; configs[i] && configs[i + 1]; i += 2) - TEST_CALL_ERR__(rd_kafka_NewTopic_set_config( - newt[0], configs[i], configs[i + 1])); - } - - options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS); - err = rd_kafka_AdminOptions_set_operation_timeout( - options, timeout_ms, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "%s", errstr); - - TEST_SAY( - "Creating topic \"%s\" " - "(partitions=%d, replication_factor=%d, timeout=%d)\n", - topicname, partition_cnt, replication_factor, timeout_ms); - - TIMING_START(&t_create, "CreateTopics"); - rd_kafka_CreateTopics(rk, newt, newt_cnt, options, rkqu); - - /* Wait for result */ - rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000); - TEST_ASSERT(rkev, "Timed out waiting for CreateTopics result"); - - TIMING_STOP(&t_create); - - TEST_ASSERT(!rd_kafka_event_error(rkev), "CreateTopics failed: %s", - rd_kafka_event_error_string(rkev)); - - res = rd_kafka_event_CreateTopics_result(rkev); - TEST_ASSERT(res, "Expected CreateTopics_result, not %s", - rd_kafka_event_name(rkev)); - - terr = rd_kafka_CreateTopics_result_topics(res, &res_cnt); - TEST_ASSERT(terr, "CreateTopics_result_topics returned NULL"); - TEST_ASSERT(res_cnt == newt_cnt, - "CreateTopics_result_topics returned %" PRIusz - " topics, " - "not the expected %" PRIusz, - res_cnt, newt_cnt); - - TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]) || - rd_kafka_topic_result_error(terr[0]) == - RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS, - "Topic %s result error: %s", - rd_kafka_topic_result_name(terr[0]), - rd_kafka_topic_result_error_string(terr[0])); - - rd_kafka_event_destroy(rkev); - - rd_kafka_queue_destroy(rkqu); - - rd_kafka_AdminOptions_destroy(options); - - rd_kafka_NewTopic_destroy(newt[0]); - - if (!use_rk) - rd_kafka_destroy(rk); -} - - - -/** - * @brief Create topic using kafka-topics.sh --create - */ -static void test_create_topic_sh(const char *topicname, - int partition_cnt, - int replication_factor) { - test_kafka_topics( - "--create --topic \"%s\" " - "--replication-factor %d --partitions %d", - topicname, replication_factor, partition_cnt); -} - - -/** - * @brief Create topic - */ -void test_create_topic(rd_kafka_t *use_rk, - const char *topicname, - int partition_cnt, - int replication_factor) { - if (test_broker_version < TEST_BRKVER(0, 10, 2, 0)) - test_create_topic_sh(topicname, partition_cnt, - replication_factor); - else - test_admin_create_topic(use_rk, topicname, partition_cnt, - replication_factor, NULL); -} - - -/** - * @brief Create topic using kafka-topics.sh --delete - */ -static void test_delete_topic_sh(const char *topicname) { - test_kafka_topics("--delete --topic \"%s\" ", topicname); -} - - -/** - * @brief Delete topic using Topic Admin API - */ -static void test_admin_delete_topic(rd_kafka_t *use_rk, const char *topicname) { - rd_kafka_t *rk; - rd_kafka_DeleteTopic_t *delt[1]; - const size_t delt_cnt = 1; - rd_kafka_AdminOptions_t *options; - rd_kafka_queue_t *rkqu; - rd_kafka_event_t *rkev; - const rd_kafka_DeleteTopics_result_t *res; - const rd_kafka_topic_result_t **terr; - int timeout_ms = tmout_multip(10000); - size_t res_cnt; - rd_kafka_resp_err_t err; - char errstr[512]; - test_timing_t t_create; - - if (!(rk = use_rk)) - rk = test_create_producer(); - - rkqu = rd_kafka_queue_new(rk); - - delt[0] = rd_kafka_DeleteTopic_new(topicname); - - options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS); - err = rd_kafka_AdminOptions_set_operation_timeout( - options, timeout_ms, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "%s", errstr); - - TEST_SAY( - "Deleting topic \"%s\" " - "(timeout=%d)\n", - topicname, timeout_ms); - - TIMING_START(&t_create, "DeleteTopics"); - rd_kafka_DeleteTopics(rk, delt, delt_cnt, options, rkqu); - - /* Wait for result */ - rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000); - TEST_ASSERT(rkev, "Timed out waiting for DeleteTopics result"); - - TIMING_STOP(&t_create); - - res = rd_kafka_event_DeleteTopics_result(rkev); - TEST_ASSERT(res, "Expected DeleteTopics_result, not %s", - rd_kafka_event_name(rkev)); - - terr = rd_kafka_DeleteTopics_result_topics(res, &res_cnt); - TEST_ASSERT(terr, "DeleteTopics_result_topics returned NULL"); - TEST_ASSERT(res_cnt == delt_cnt, - "DeleteTopics_result_topics returned %" PRIusz - " topics, " - "not the expected %" PRIusz, - res_cnt, delt_cnt); - - TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]), - "Topic %s result error: %s", - rd_kafka_topic_result_name(terr[0]), - rd_kafka_topic_result_error_string(terr[0])); - - rd_kafka_event_destroy(rkev); - - rd_kafka_queue_destroy(rkqu); - - rd_kafka_AdminOptions_destroy(options); - - rd_kafka_DeleteTopic_destroy(delt[0]); - - if (!use_rk) - rd_kafka_destroy(rk); -} - - -/** - * @brief Delete a topic - */ -void test_delete_topic(rd_kafka_t *use_rk, const char *topicname) { - if (test_broker_version < TEST_BRKVER(0, 10, 2, 0)) - test_delete_topic_sh(topicname); - else - test_admin_delete_topic(use_rk, topicname); -} - - -/** - * @brief Create additional partitions for a topic using Admin API - */ -static void test_admin_create_partitions(rd_kafka_t *use_rk, - const char *topicname, - int new_partition_cnt) { - rd_kafka_t *rk; - rd_kafka_NewPartitions_t *newp[1]; - const size_t newp_cnt = 1; - rd_kafka_AdminOptions_t *options; - rd_kafka_queue_t *rkqu; - rd_kafka_event_t *rkev; - const rd_kafka_CreatePartitions_result_t *res; - const rd_kafka_topic_result_t **terr; - int timeout_ms = tmout_multip(10000); - size_t res_cnt; - rd_kafka_resp_err_t err; - char errstr[512]; - test_timing_t t_create; - - if (!(rk = use_rk)) - rk = test_create_producer(); - - rkqu = rd_kafka_queue_new(rk); - - newp[0] = rd_kafka_NewPartitions_new(topicname, new_partition_cnt, - errstr, sizeof(errstr)); - TEST_ASSERT(newp[0] != NULL, "%s", errstr); - - options = - rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATEPARTITIONS); - err = rd_kafka_AdminOptions_set_operation_timeout( - options, timeout_ms, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "%s", errstr); - - TEST_SAY("Creating %d (total) partitions for topic \"%s\"\n", - new_partition_cnt, topicname); - - TIMING_START(&t_create, "CreatePartitions"); - rd_kafka_CreatePartitions(rk, newp, newp_cnt, options, rkqu); - - /* Wait for result */ - rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000); - TEST_ASSERT(rkev, "Timed out waiting for CreatePartitions result"); - - TIMING_STOP(&t_create); - - res = rd_kafka_event_CreatePartitions_result(rkev); - TEST_ASSERT(res, "Expected CreatePartitions_result, not %s", - rd_kafka_event_name(rkev)); - - terr = rd_kafka_CreatePartitions_result_topics(res, &res_cnt); - TEST_ASSERT(terr, "CreatePartitions_result_topics returned NULL"); - TEST_ASSERT(res_cnt == newp_cnt, - "CreatePartitions_result_topics returned %" PRIusz - " topics, not the expected %" PRIusz, - res_cnt, newp_cnt); - - TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]), - "Topic %s result error: %s", - rd_kafka_topic_result_name(terr[0]), - rd_kafka_topic_result_error_string(terr[0])); - - rd_kafka_event_destroy(rkev); - - rd_kafka_queue_destroy(rkqu); - - rd_kafka_AdminOptions_destroy(options); - - rd_kafka_NewPartitions_destroy(newp[0]); - - if (!use_rk) - rd_kafka_destroy(rk); -} - - -/** - * @brief Create partitions for topic - */ -void test_create_partitions(rd_kafka_t *use_rk, - const char *topicname, - int new_partition_cnt) { - if (test_broker_version < TEST_BRKVER(0, 10, 2, 0)) - test_kafka_topics("--alter --topic %s --partitions %d", - topicname, new_partition_cnt); - else - test_admin_create_partitions(use_rk, topicname, - new_partition_cnt); -} - - -int test_get_partition_count(rd_kafka_t *rk, - const char *topicname, - int timeout_ms) { - rd_kafka_t *use_rk; - rd_kafka_resp_err_t err; - rd_kafka_topic_t *rkt; - int64_t abs_timeout = test_clock() + ((int64_t)timeout_ms * 1000); - int ret = -1; - - if (!rk) - use_rk = test_create_producer(); - else - use_rk = rk; - - rkt = rd_kafka_topic_new(use_rk, topicname, NULL); - - do { - const struct rd_kafka_metadata *metadata; - - err = rd_kafka_metadata(use_rk, 0, rkt, &metadata, - tmout_multip(15000)); - if (err) - TEST_WARN("metadata() for %s failed: %s\n", - rkt ? rd_kafka_topic_name(rkt) - : "(all-local)", - rd_kafka_err2str(err)); - else { - if (metadata->topic_cnt == 1) { - if (metadata->topics[0].err == 0 || - metadata->topics[0].partition_cnt > 0) { - int32_t cnt; - cnt = metadata->topics[0].partition_cnt; - rd_kafka_metadata_destroy(metadata); - ret = (int)cnt; - break; - } - TEST_SAY( - "metadata(%s) returned %s: retrying\n", - rd_kafka_topic_name(rkt), - rd_kafka_err2str(metadata->topics[0].err)); - } - rd_kafka_metadata_destroy(metadata); - rd_sleep(1); - } - } while (test_clock() < abs_timeout); - - rd_kafka_topic_destroy(rkt); - - if (!rk) - rd_kafka_destroy(use_rk); - - return ret; -} - -/** - * @brief Let the broker auto-create the topic for us. - */ -rd_kafka_resp_err_t test_auto_create_topic_rkt(rd_kafka_t *rk, - rd_kafka_topic_t *rkt, - int timeout_ms) { - const struct rd_kafka_metadata *metadata; - rd_kafka_resp_err_t err; - test_timing_t t; - int64_t abs_timeout = test_clock() + ((int64_t)timeout_ms * 1000); - - do { - TIMING_START(&t, "auto_create_topic"); - err = rd_kafka_metadata(rk, 0, rkt, &metadata, - tmout_multip(15000)); - TIMING_STOP(&t); - if (err) - TEST_WARN("metadata() for %s failed: %s\n", - rkt ? rd_kafka_topic_name(rkt) - : "(all-local)", - rd_kafka_err2str(err)); - else { - if (metadata->topic_cnt == 1) { - if (metadata->topics[0].err == 0 || - metadata->topics[0].partition_cnt > 0) { - rd_kafka_metadata_destroy(metadata); - return 0; - } - TEST_SAY( - "metadata(%s) returned %s: retrying\n", - rd_kafka_topic_name(rkt), - rd_kafka_err2str(metadata->topics[0].err)); - } - rd_kafka_metadata_destroy(metadata); - rd_sleep(1); - } - } while (test_clock() < abs_timeout); - - return err; -} - -rd_kafka_resp_err_t -test_auto_create_topic(rd_kafka_t *rk, const char *name, int timeout_ms) { - rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, name, NULL); - rd_kafka_resp_err_t err; - if (!rkt) - return rd_kafka_last_error(); - err = test_auto_create_topic_rkt(rk, rkt, timeout_ms); - rd_kafka_topic_destroy(rkt); - return err; -} - - -/** - * @brief Check if topic auto creation works. - * @returns 1 if it does, else 0. - */ -int test_check_auto_create_topic(void) { - rd_kafka_t *rk; - rd_kafka_conf_t *conf; - rd_kafka_resp_err_t err; - const char *topic = test_mk_topic_name("autocreatetest", 1); - - test_conf_init(&conf, NULL, 0); - rk = test_create_handle(RD_KAFKA_PRODUCER, conf); - err = test_auto_create_topic(rk, topic, tmout_multip(5000)); - if (err) - TEST_SAY("Auto topic creation of \"%s\" failed: %s\n", topic, - rd_kafka_err2str(err)); - rd_kafka_destroy(rk); - - return err ? 0 : 1; -} - - -/** - * @brief Builds and runs a Java application from the java/ directory. - * - * The application is started in the background, use - * test_waitpid() to await its demise. - * - * @param cls The app class to run using java/run-class.sh - * - * @returns -1 if the application could not be started, else the pid. - */ -int test_run_java(const char *cls, const char **argv) { -#ifdef _WIN32 - TEST_WARN("%s(%s) not supported Windows, yet", __FUNCTION__, cls); - return -1; -#else - int r; - const char *kpath; - pid_t pid; - const char **full_argv, **p; - int cnt; - extern char **environ; - - kpath = test_getenv("KAFKA_PATH", NULL); - - if (!kpath) { - TEST_WARN("%s(%s): KAFKA_PATH must be set\n", __FUNCTION__, - cls); - return -1; - } - - /* Build */ - r = system("make -s java"); - - if (r == -1 || WIFSIGNALED(r) || WEXITSTATUS(r)) { - TEST_WARN("%s(%s): failed to build java class (code %d)\n", - __FUNCTION__, cls, r); - return -1; - } - - /* For child process and run cls */ - pid = fork(); - if (pid == -1) { - TEST_WARN("%s(%s): failed to fork: %s\n", __FUNCTION__, cls, - strerror(errno)); - return -1; - } - - if (pid > 0) - return (int)pid; /* In parent process */ - - /* In child process */ - - /* Reconstruct argv to contain run-class.sh and the cls */ - for (cnt = 0; argv[cnt]; cnt++) - ; - - cnt += 3; /* run-class.sh, cls, .., NULL */ - full_argv = malloc(sizeof(*full_argv) * cnt); - full_argv[0] = "java/run-class.sh"; - full_argv[1] = (const char *)cls; - - /* Copy arguments */ - for (p = &full_argv[2]; *argv; p++, argv++) - *p = *argv; - *p = NULL; - - /* Run */ - r = execve(full_argv[0], (char *const *)full_argv, environ); - - TEST_WARN("%s(%s): failed to execute run-class.sh: %s\n", __FUNCTION__, - cls, strerror(errno)); - exit(2); - - return -1; /* NOTREACHED */ -#endif -} - - -/** - * @brief Wait for child-process \p pid to exit. - * - * @returns -1 if the child process exited successfully, else -1. - */ -int test_waitpid(int pid) { -#ifdef _WIN32 - TEST_WARN("%s() not supported Windows, yet", __FUNCTION__); - return -1; -#else - pid_t r; - int status = 0; - - r = waitpid((pid_t)pid, &status, 0); - - if (r == -1) { - TEST_WARN("waitpid(%d) failed: %s\n", pid, strerror(errno)); - return -1; - } - - if (WIFSIGNALED(status)) { - TEST_WARN("Process %d terminated by signal %d\n", pid, - WTERMSIG(status)); - return -1; - } else if (WEXITSTATUS(status)) { - TEST_WARN("Process %d exited with status %d\n", pid, - WEXITSTATUS(status)); - return -1; - } - - return 0; -#endif -} - - -/** - * @brief Check if \p feature is builtin to librdkafka. - * @returns returns 1 if feature is built in, else 0. - */ -int test_check_builtin(const char *feature) { - rd_kafka_conf_t *conf; - char errstr[128]; - int r; - - conf = rd_kafka_conf_new(); - if (rd_kafka_conf_set(conf, "builtin.features", feature, errstr, - sizeof(errstr)) != RD_KAFKA_CONF_OK) { - TEST_SAY("Feature \"%s\" not built-in: %s\n", feature, errstr); - r = 0; - } else { - TEST_SAY("Feature \"%s\" is built-in\n", feature); - r = 1; - } - - rd_kafka_conf_destroy(conf); - return r; -} - - -char *tsprintf(const char *fmt, ...) { - static RD_TLS char ret[8][512]; - static RD_TLS int i; - va_list ap; - - - i = (i + 1) % 8; - - va_start(ap, fmt); - rd_vsnprintf(ret[i], sizeof(ret[i]), fmt, ap); - va_end(ap); - - return ret[i]; -} - - -/** - * @brief Add a test report JSON object. - * These will be written as a JSON array to the test report file. - */ -void test_report_add(struct test *test, const char *fmt, ...) { - va_list ap; - char buf[512]; - - va_start(ap, fmt); - vsnprintf(buf, sizeof(buf), fmt, ap); - va_end(ap); - - if (test->report_cnt == test->report_size) { - if (test->report_size == 0) - test->report_size = 8; - else - test->report_size *= 2; - - test->report_arr = - realloc(test->report_arr, - sizeof(*test->report_arr) * test->report_size); - } - - test->report_arr[test->report_cnt++] = rd_strdup(buf); - - TEST_SAYL(1, "Report #%d: %s\n", test->report_cnt - 1, buf); -} - -/** - * Returns 1 if KAFKA_PATH and BROKERS (or ZK_ADDRESS) is set to se we can use - * the kafka-topics.sh script to manually create topics. - * - * If \p skip is set TEST_SKIP() will be called with a helpful message. - */ -int test_can_create_topics(int skip) { -#ifndef _WIN32 - const char *bootstrap; -#endif - - /* Has AdminAPI */ - if (test_broker_version >= TEST_BRKVER(0, 10, 2, 0)) - return 1; - -#ifdef _WIN32 - if (skip) - TEST_SKIP("Cannot create topics on Win32\n"); - return 0; -#else - - bootstrap = test_broker_version >= TEST_BRKVER(3, 0, 0, 0) - ? "BROKERS" - : "ZK_ADDRESS"; - - if (!test_getenv("KAFKA_PATH", NULL) || !test_getenv(bootstrap, NULL)) { - if (skip) - TEST_SKIP( - "Cannot create topics " - "(set KAFKA_PATH and %s)\n", - bootstrap); - return 0; - } - - - return 1; -#endif -} - - -/** - * Wait for \p event_type, discarding all other events prior to it. - */ -rd_kafka_event_t *test_wait_event(rd_kafka_queue_t *eventq, - rd_kafka_event_type_t event_type, - int timeout_ms) { - test_timing_t t_w; - int64_t abs_timeout = test_clock() + ((int64_t)timeout_ms * 1000); - - TIMING_START(&t_w, "wait_event"); - while (test_clock() < abs_timeout) { - rd_kafka_event_t *rkev; - - rkev = rd_kafka_queue_poll( - eventq, (int)(abs_timeout - test_clock()) / 1000); - - if (rd_kafka_event_type(rkev) == event_type) { - TIMING_STOP(&t_w); - return rkev; - } - - if (!rkev) - continue; - - if (rd_kafka_event_error(rkev)) - TEST_SAY("discarding ignored event %s: %s\n", - rd_kafka_event_name(rkev), - rd_kafka_event_error_string(rkev)); - else - TEST_SAY("discarding ignored event %s\n", - rd_kafka_event_name(rkev)); - rd_kafka_event_destroy(rkev); - } - TIMING_STOP(&t_w); - - return NULL; -} - - -void test_SAY(const char *file, int line, int level, const char *str) { - TEST_SAYL(level, "%s", str); -} - -void test_SKIP(const char *file, int line, const char *str) { - TEST_WARN("SKIPPING TEST: %s", str); - TEST_LOCK(); - test_curr->state = TEST_SKIPPED; - if (!*test_curr->failstr) { - rd_snprintf(test_curr->failstr, sizeof(test_curr->failstr), - "%s", str); - rtrim(test_curr->failstr); - } - TEST_UNLOCK(); -} - -const char *test_curr_name(void) { - return test_curr->name; -} - - -/** - * @brief Dump/print message haders - */ -void test_headers_dump(const char *what, - int lvl, - const rd_kafka_headers_t *hdrs) { - size_t idx = 0; - const char *name, *value; - size_t size; - - while (!rd_kafka_header_get_all(hdrs, idx++, &name, - (const void **)&value, &size)) - TEST_SAYL(lvl, "%s: Header #%" PRIusz ": %s='%s'\n", what, - idx - 1, name, value ? value : "(NULL)"); -} - - -/** - * @brief Retrieve and return the list of broker ids in the cluster. - * - * @param rk Optional instance to use. - * @param cntp Will be updated to the number of brokers returned. - * - * @returns a malloc:ed list of int32_t broker ids. - */ -int32_t *test_get_broker_ids(rd_kafka_t *use_rk, size_t *cntp) { - int32_t *ids; - rd_kafka_t *rk; - const rd_kafka_metadata_t *md; - rd_kafka_resp_err_t err; - size_t i; - - if (!(rk = use_rk)) - rk = test_create_producer(); - - err = rd_kafka_metadata(rk, 0, NULL, &md, tmout_multip(5000)); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - TEST_ASSERT(md->broker_cnt > 0, "%d brokers, expected > 0", - md->broker_cnt); - - ids = malloc(sizeof(*ids) * md->broker_cnt); - - for (i = 0; i < (size_t)md->broker_cnt; i++) - ids[i] = md->brokers[i].id; - - *cntp = md->broker_cnt; - - rd_kafka_metadata_destroy(md); - - if (!use_rk) - rd_kafka_destroy(rk); - - return ids; -} - - - -/** - * @brief Verify that all topics in \p topics are reported in metadata, - * and that none of the topics in \p not_topics are reported. - * - * @returns the number of failures (but does not FAIL). - */ -static int verify_topics_in_metadata(rd_kafka_t *rk, - rd_kafka_metadata_topic_t *topics, - size_t topic_cnt, - rd_kafka_metadata_topic_t *not_topics, - size_t not_topic_cnt) { - const rd_kafka_metadata_t *md; - rd_kafka_resp_err_t err; - int ti; - size_t i; - int fails = 0; - - /* Mark topics with dummy error which is overwritten - * when topic is found in metadata, allowing us to check - * for missed topics. */ - for (i = 0; i < topic_cnt; i++) - topics[i].err = 12345; - - err = rd_kafka_metadata(rk, 1 /*all_topics*/, NULL, &md, - tmout_multip(5000)); - TEST_ASSERT(!err, "metadata failed: %s", rd_kafka_err2str(err)); - - for (ti = 0; ti < md->topic_cnt; ti++) { - const rd_kafka_metadata_topic_t *mdt = &md->topics[ti]; - - for (i = 0; i < topic_cnt; i++) { - int pi; - rd_kafka_metadata_topic_t *exp_mdt; - - if (strcmp(topics[i].topic, mdt->topic)) - continue; - - exp_mdt = &topics[i]; - - exp_mdt->err = mdt->err; /* indicate found */ - if (mdt->err) { - TEST_SAY( - "metadata: " - "Topic %s has error %s\n", - mdt->topic, rd_kafka_err2str(mdt->err)); - fails++; - } - - if (exp_mdt->partition_cnt > 0 && - mdt->partition_cnt != exp_mdt->partition_cnt) { - TEST_SAY( - "metadata: " - "Topic %s, expected %d partitions" - ", not %d\n", - mdt->topic, exp_mdt->partition_cnt, - mdt->partition_cnt); - fails++; - continue; - } - - /* Verify per-partition values */ - for (pi = 0; - exp_mdt->partitions && pi < exp_mdt->partition_cnt; - pi++) { - const rd_kafka_metadata_partition_t *mdp = - &mdt->partitions[pi]; - const rd_kafka_metadata_partition_t *exp_mdp = - &exp_mdt->partitions[pi]; - - if (mdp->id != exp_mdp->id) { - TEST_SAY( - "metadata: " - "Topic %s, " - "partition %d, " - "partition list out of order," - " expected %d, not %d\n", - mdt->topic, pi, exp_mdp->id, - mdp->id); - fails++; - continue; - } - - if (exp_mdp->replicas) { - if (mdp->replica_cnt != - exp_mdp->replica_cnt) { - TEST_SAY( - "metadata: " - "Topic %s, " - "partition %d, " - "expected %d replicas," - " not %d\n", - mdt->topic, pi, - exp_mdp->replica_cnt, - mdp->replica_cnt); - fails++; - } else if ( - memcmp( - mdp->replicas, - exp_mdp->replicas, - mdp->replica_cnt * - sizeof(*mdp->replicas))) { - int ri; - - TEST_SAY( - "metadata: " - "Topic %s, " - "partition %d, " - "replica mismatch:\n", - mdt->topic, pi); - - for (ri = 0; - ri < mdp->replica_cnt; - ri++) { - TEST_SAY( - " #%d: " - "expected " - "replica %d, " - "not %d\n", - ri, - exp_mdp - ->replicas[ri], - mdp->replicas[ri]); - } - - fails++; - } - } - } - } - - for (i = 0; i < not_topic_cnt; i++) { - if (strcmp(not_topics[i].topic, mdt->topic)) - continue; - - TEST_SAY( - "metadata: " - "Topic %s found in metadata, unexpected\n", - mdt->topic); - fails++; - } - } - - for (i = 0; i < topic_cnt; i++) { - if ((int)topics[i].err == 12345) { - TEST_SAY( - "metadata: " - "Topic %s not seen in metadata\n", - topics[i].topic); - fails++; - } - } - - if (fails > 0) - TEST_SAY("Metadata verification for %" PRIusz - " topics failed " - "with %d errors (see above)\n", - topic_cnt, fails); - else - TEST_SAY( - "Metadata verification succeeded: " - "%" PRIusz - " desired topics seen, " - "%" PRIusz " undesired topics not seen\n", - topic_cnt, not_topic_cnt); - - rd_kafka_metadata_destroy(md); - - return fails; -} - - - -/** - * @brief Wait for metadata to reflect expected and not expected topics - */ -void test_wait_metadata_update(rd_kafka_t *rk, - rd_kafka_metadata_topic_t *topics, - size_t topic_cnt, - rd_kafka_metadata_topic_t *not_topics, - size_t not_topic_cnt, - int tmout) { - int64_t abs_timeout; - test_timing_t t_md; - rd_kafka_t *our_rk = NULL; - - if (!rk) - rk = our_rk = test_create_handle(RD_KAFKA_PRODUCER, NULL); - - abs_timeout = test_clock() + ((int64_t)tmout * 1000); - - TEST_SAY("Waiting for up to %dms for metadata update\n", tmout); - - TIMING_START(&t_md, "METADATA.WAIT"); - do { - int md_fails; - - md_fails = verify_topics_in_metadata(rk, topics, topic_cnt, - not_topics, not_topic_cnt); - - if (!md_fails) { - TEST_SAY( - "All expected topics (not?) " - "seen in metadata\n"); - abs_timeout = 0; - break; - } - - rd_sleep(1); - } while (test_clock() < abs_timeout); - TIMING_STOP(&t_md); - - if (our_rk) - rd_kafka_destroy(our_rk); - - if (abs_timeout) - TEST_FAIL("Expected topics not seen in given time."); -} - -/** - * @brief Wait for topic to be available in metadata - */ -void test_wait_topic_exists(rd_kafka_t *rk, const char *topic, int tmout) { - rd_kafka_metadata_topic_t topics = {.topic = (char *)topic}; - - test_wait_metadata_update(rk, &topics, 1, NULL, 0, tmout); - - /* Wait an additional second for the topic to propagate in - * the cluster. This is not perfect but a cheap workaround for - * the asynchronous nature of topic creations in Kafka. */ - rd_sleep(1); -} - - - -/** - * @brief Wait for up to \p tmout for any type of admin result. - * @returns the event - */ -rd_kafka_event_t *test_wait_admin_result(rd_kafka_queue_t *q, - rd_kafka_event_type_t evtype, - int tmout) { - rd_kafka_event_t *rkev; - - while (1) { - rkev = rd_kafka_queue_poll(q, tmout); - if (!rkev) - TEST_FAIL("Timed out waiting for admin result (%d)\n", - evtype); - - if (rd_kafka_event_type(rkev) == evtype) - return rkev; - - - if (rd_kafka_event_type(rkev) == RD_KAFKA_EVENT_ERROR) { - TEST_WARN( - "Received error event while waiting for %d: " - "%s: ignoring", - evtype, rd_kafka_event_error_string(rkev)); - continue; - } - - - TEST_ASSERT(rd_kafka_event_type(rkev) == evtype, - "Expected event type %d, got %d (%s)", evtype, - rd_kafka_event_type(rkev), - rd_kafka_event_name(rkev)); - } - - return NULL; -} - -/** - * @brief Wait for up to \p tmout for an admin API result and return the - * distilled error code. - * - * Supported APIs: - * - AlterConfigs - * - CreatePartitions - * - CreateTopics - * - DeleteGroups - * - DeleteRecords - * - DeleteTopics - * - DeleteConsumerGroupOffsets - * - DescribeConfigs - * - CreateAcls - */ -rd_kafka_resp_err_t test_wait_topic_admin_result(rd_kafka_queue_t *q, - rd_kafka_event_type_t evtype, - rd_kafka_event_t **retevent, - int tmout) { - rd_kafka_event_t *rkev; - size_t i; - const rd_kafka_topic_result_t **terr = NULL; - size_t terr_cnt = 0; - const rd_kafka_ConfigResource_t **cres = NULL; - size_t cres_cnt = 0; - const rd_kafka_acl_result_t **aclres = NULL; - size_t aclres_cnt = 0; - int errcnt = 0; - rd_kafka_resp_err_t err; - const rd_kafka_group_result_t **gres = NULL; - size_t gres_cnt = 0; - const rd_kafka_ConsumerGroupDescription_t **gdescs = NULL; - size_t gdescs_cnt = 0; - const rd_kafka_error_t **glists_errors = NULL; - size_t glists_error_cnt = 0; - const rd_kafka_topic_partition_list_t *offsets = NULL; - - rkev = test_wait_admin_result(q, evtype, tmout); - - if ((err = rd_kafka_event_error(rkev))) { - TEST_WARN("%s failed: %s\n", rd_kafka_event_name(rkev), - rd_kafka_event_error_string(rkev)); - rd_kafka_event_destroy(rkev); - return err; - } - - if (evtype == RD_KAFKA_EVENT_CREATETOPICS_RESULT) { - const rd_kafka_CreateTopics_result_t *res; - if (!(res = rd_kafka_event_CreateTopics_result(rkev))) - TEST_FAIL("Expected a CreateTopics result, not %s", - rd_kafka_event_name(rkev)); - - terr = rd_kafka_CreateTopics_result_topics(res, &terr_cnt); - - } else if (evtype == RD_KAFKA_EVENT_DELETETOPICS_RESULT) { - const rd_kafka_DeleteTopics_result_t *res; - if (!(res = rd_kafka_event_DeleteTopics_result(rkev))) - TEST_FAIL("Expected a DeleteTopics result, not %s", - rd_kafka_event_name(rkev)); - - terr = rd_kafka_DeleteTopics_result_topics(res, &terr_cnt); - - } else if (evtype == RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT) { - const rd_kafka_CreatePartitions_result_t *res; - if (!(res = rd_kafka_event_CreatePartitions_result(rkev))) - TEST_FAIL("Expected a CreatePartitions result, not %s", - rd_kafka_event_name(rkev)); - - terr = rd_kafka_CreatePartitions_result_topics(res, &terr_cnt); - - } else if (evtype == RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT) { - const rd_kafka_DescribeConfigs_result_t *res; - - if (!(res = rd_kafka_event_DescribeConfigs_result(rkev))) - TEST_FAIL("Expected a DescribeConfigs result, not %s", - rd_kafka_event_name(rkev)); - - cres = - rd_kafka_DescribeConfigs_result_resources(res, &cres_cnt); - - } else if (evtype == RD_KAFKA_EVENT_ALTERCONFIGS_RESULT) { - const rd_kafka_AlterConfigs_result_t *res; - - if (!(res = rd_kafka_event_AlterConfigs_result(rkev))) - TEST_FAIL("Expected a AlterConfigs result, not %s", - rd_kafka_event_name(rkev)); - - cres = rd_kafka_AlterConfigs_result_resources(res, &cres_cnt); - - } else if (evtype == RD_KAFKA_EVENT_CREATEACLS_RESULT) { - const rd_kafka_CreateAcls_result_t *res; - - if (!(res = rd_kafka_event_CreateAcls_result(rkev))) - TEST_FAIL("Expected a CreateAcls result, not %s", - rd_kafka_event_name(rkev)); - - aclres = rd_kafka_CreateAcls_result_acls(res, &aclres_cnt); - } else if (evtype == RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT) { - const rd_kafka_ListConsumerGroups_result_t *res; - if (!(res = rd_kafka_event_ListConsumerGroups_result(rkev))) - TEST_FAIL( - "Expected a ListConsumerGroups result, not %s", - rd_kafka_event_name(rkev)); - - glists_errors = rd_kafka_ListConsumerGroups_result_errors( - res, &glists_error_cnt); - } else if (evtype == RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT) { - const rd_kafka_DescribeConsumerGroups_result_t *res; - if (!(res = rd_kafka_event_DescribeConsumerGroups_result(rkev))) - TEST_FAIL( - "Expected a DescribeConsumerGroups result, not %s", - rd_kafka_event_name(rkev)); - - gdescs = rd_kafka_DescribeConsumerGroups_result_groups( - res, &gdescs_cnt); - } else if (evtype == RD_KAFKA_EVENT_DELETEGROUPS_RESULT) { - const rd_kafka_DeleteGroups_result_t *res; - if (!(res = rd_kafka_event_DeleteGroups_result(rkev))) - TEST_FAIL("Expected a DeleteGroups result, not %s", - rd_kafka_event_name(rkev)); - - gres = rd_kafka_DeleteGroups_result_groups(res, &gres_cnt); - - } else if (evtype == RD_KAFKA_EVENT_DELETERECORDS_RESULT) { - const rd_kafka_DeleteRecords_result_t *res; - if (!(res = rd_kafka_event_DeleteRecords_result(rkev))) - TEST_FAIL("Expected a DeleteRecords result, not %s", - rd_kafka_event_name(rkev)); - - offsets = rd_kafka_DeleteRecords_result_offsets(res); - - } else if (evtype == RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT) { - const rd_kafka_DeleteConsumerGroupOffsets_result_t *res; - if (!(res = rd_kafka_event_DeleteConsumerGroupOffsets_result( - rkev))) - TEST_FAIL( - "Expected a DeleteConsumerGroupOffsets " - "result, not %s", - rd_kafka_event_name(rkev)); - - gres = rd_kafka_DeleteConsumerGroupOffsets_result_groups( - rkev, &gres_cnt); - - } else { - TEST_FAIL("Bad evtype: %d", evtype); - RD_NOTREACHED(); - } - - /* Check topic errors */ - for (i = 0; i < terr_cnt; i++) { - if (rd_kafka_topic_result_error(terr[i])) { - TEST_WARN("..Topics result: %s: error: %s\n", - rd_kafka_topic_result_name(terr[i]), - rd_kafka_topic_result_error_string(terr[i])); - if (!(errcnt++)) - err = rd_kafka_topic_result_error(terr[i]); - } - } - - /* Check resource errors */ - for (i = 0; i < cres_cnt; i++) { - if (rd_kafka_ConfigResource_error(cres[i])) { - TEST_WARN( - "ConfigResource result: %d,%s: error: %s\n", - rd_kafka_ConfigResource_type(cres[i]), - rd_kafka_ConfigResource_name(cres[i]), - rd_kafka_ConfigResource_error_string(cres[i])); - if (!(errcnt++)) - err = rd_kafka_ConfigResource_error(cres[i]); - } - } - - /* Check ACL errors */ - for (i = 0; i < aclres_cnt; i++) { - const rd_kafka_error_t *error = - rd_kafka_acl_result_error(aclres[i]); - if (error) { - TEST_WARN("AclResult error: %s: %s\n", - rd_kafka_error_name(error), - rd_kafka_error_string(error)); - if (!(errcnt++)) - err = rd_kafka_error_code(error); - } - } - - /* Check list groups errors */ - for (i = 0; i < glists_error_cnt; i++) { - const rd_kafka_error_t *error = glists_errors[i]; - TEST_WARN("%s error: %s\n", rd_kafka_event_name(rkev), - rd_kafka_error_string(error)); - if (!(errcnt++)) - err = rd_kafka_error_code(error); - } - - /* Check describe groups errors */ - for (i = 0; i < gdescs_cnt; i++) { - const rd_kafka_error_t *error; - if ((error = - rd_kafka_ConsumerGroupDescription_error(gdescs[i]))) { - TEST_WARN("%s result: %s: error: %s\n", - rd_kafka_event_name(rkev), - rd_kafka_ConsumerGroupDescription_group_id( - gdescs[i]), - rd_kafka_error_string(error)); - if (!(errcnt++)) - err = rd_kafka_error_code(error); - } - } - - /* Check group errors */ - for (i = 0; i < gres_cnt; i++) { - const rd_kafka_topic_partition_list_t *parts; - - if (rd_kafka_group_result_error(gres[i])) { - - TEST_WARN("%s result: %s: error: %s\n", - rd_kafka_event_name(rkev), - rd_kafka_group_result_name(gres[i]), - rd_kafka_error_string( - rd_kafka_group_result_error(gres[i]))); - if (!(errcnt++)) - err = rd_kafka_error_code( - rd_kafka_group_result_error(gres[i])); - } - - parts = rd_kafka_group_result_partitions(gres[i]); - if (parts) { - int j; - for (j = 0; j < parts->cnt; i++) { - if (!parts->elems[j].err) - continue; - - TEST_WARN( - "%s result: %s: " - "%s [%" PRId32 "] error: %s\n", - rd_kafka_event_name(rkev), - rd_kafka_group_result_name(gres[i]), - parts->elems[j].topic, - parts->elems[j].partition, - rd_kafka_err2str(parts->elems[j].err)); - errcnt++; - } - } - } - - /* Check offset errors */ - for (i = 0; (offsets && i < (size_t)offsets->cnt); i++) { - if (offsets->elems[i].err) { - TEST_WARN("DeleteRecords result: %s [%d]: error: %s\n", - offsets->elems[i].topic, - offsets->elems[i].partition, - rd_kafka_err2str(offsets->elems[i].err)); - if (!(errcnt++)) - err = offsets->elems[i].err; - } - } - - if (!err && retevent) - *retevent = rkev; - else - rd_kafka_event_destroy(rkev); - - return err; -} - - - -/** - * @brief Topic Admin API helpers - * - * @param useq Makes the call async and posts the response in this queue. - * If NULL this call will be synchronous and return the error - * result. - * - * @remark Fails the current test on failure. - */ - -rd_kafka_resp_err_t test_CreateTopics_simple(rd_kafka_t *rk, - rd_kafka_queue_t *useq, - char **topics, - size_t topic_cnt, - int num_partitions, - void *opaque) { - rd_kafka_NewTopic_t **new_topics; - rd_kafka_AdminOptions_t *options; - rd_kafka_queue_t *q; - size_t i; - const int tmout = 30 * 1000; - rd_kafka_resp_err_t err; - - new_topics = malloc(sizeof(*new_topics) * topic_cnt); - - for (i = 0; i < topic_cnt; i++) { - char errstr[512]; - new_topics[i] = rd_kafka_NewTopic_new( - topics[i], num_partitions, 1, errstr, sizeof(errstr)); - TEST_ASSERT(new_topics[i], - "Failed to NewTopic(\"%s\", %d) #%" PRIusz ": %s", - topics[i], num_partitions, i, errstr); - } - - options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS); - rd_kafka_AdminOptions_set_opaque(options, opaque); - - if (!useq) { - char errstr[512]; - - err = rd_kafka_AdminOptions_set_request_timeout( - options, tmout, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "set_request_timeout: %s", errstr); - err = rd_kafka_AdminOptions_set_operation_timeout( - options, tmout - 5000, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "set_operation_timeout: %s", errstr); - - q = rd_kafka_queue_new(rk); - } else { - q = useq; - } - - TEST_SAY("Creating %" PRIusz " topics\n", topic_cnt); - - rd_kafka_CreateTopics(rk, new_topics, topic_cnt, options, q); - - rd_kafka_AdminOptions_destroy(options); - - rd_kafka_NewTopic_destroy_array(new_topics, topic_cnt); - free(new_topics); - - if (useq) - return RD_KAFKA_RESP_ERR_NO_ERROR; - - - err = test_wait_topic_admin_result( - q, RD_KAFKA_EVENT_CREATETOPICS_RESULT, NULL, tmout + 5000); - - rd_kafka_queue_destroy(q); - - if (err) - TEST_FAIL("Failed to create %d topic(s): %s", (int)topic_cnt, - rd_kafka_err2str(err)); - - return err; -} - - -rd_kafka_resp_err_t test_CreatePartitions_simple(rd_kafka_t *rk, - rd_kafka_queue_t *useq, - const char *topic, - size_t total_part_cnt, - void *opaque) { - rd_kafka_NewPartitions_t *newp[1]; - rd_kafka_AdminOptions_t *options; - rd_kafka_queue_t *q; - const int tmout = 30 * 1000; - rd_kafka_resp_err_t err; - char errstr[512]; - - newp[0] = rd_kafka_NewPartitions_new(topic, total_part_cnt, errstr, - sizeof(errstr)); - TEST_ASSERT(newp[0], "Failed to NewPartitions(\"%s\", %" PRIusz "): %s", - topic, total_part_cnt, errstr); - - options = - rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATEPARTITIONS); - rd_kafka_AdminOptions_set_opaque(options, opaque); - - if (!useq) { - err = rd_kafka_AdminOptions_set_request_timeout( - options, tmout, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "set_request_timeout: %s", errstr); - err = rd_kafka_AdminOptions_set_operation_timeout( - options, tmout - 5000, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "set_operation_timeout: %s", errstr); - - q = rd_kafka_queue_new(rk); - } else { - q = useq; - } - - TEST_SAY("Creating (up to) %" PRIusz " partitions for topic \"%s\"\n", - total_part_cnt, topic); - - rd_kafka_CreatePartitions(rk, newp, 1, options, q); - - rd_kafka_AdminOptions_destroy(options); - - rd_kafka_NewPartitions_destroy(newp[0]); - - if (useq) - return RD_KAFKA_RESP_ERR_NO_ERROR; - - - err = test_wait_topic_admin_result( - q, RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT, NULL, tmout + 5000); - - rd_kafka_queue_destroy(q); - - if (err) - TEST_FAIL("Failed to create partitions: %s", - rd_kafka_err2str(err)); - - return err; -} - - -rd_kafka_resp_err_t test_DeleteTopics_simple(rd_kafka_t *rk, - rd_kafka_queue_t *useq, - char **topics, - size_t topic_cnt, - void *opaque) { - rd_kafka_queue_t *q; - rd_kafka_DeleteTopic_t **del_topics; - rd_kafka_AdminOptions_t *options; - size_t i; - rd_kafka_resp_err_t err; - const int tmout = 30 * 1000; - - del_topics = malloc(sizeof(*del_topics) * topic_cnt); - - for (i = 0; i < topic_cnt; i++) { - del_topics[i] = rd_kafka_DeleteTopic_new(topics[i]); - TEST_ASSERT(del_topics[i]); - } - - options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS); - rd_kafka_AdminOptions_set_opaque(options, opaque); - - if (!useq) { - char errstr[512]; - - err = rd_kafka_AdminOptions_set_request_timeout( - options, tmout, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "set_request_timeout: %s", errstr); - err = rd_kafka_AdminOptions_set_operation_timeout( - options, tmout - 5000, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "set_operation_timeout: %s", errstr); - - q = rd_kafka_queue_new(rk); - } else { - q = useq; - } - - TEST_SAY("Deleting %" PRIusz " topics\n", topic_cnt); - - rd_kafka_DeleteTopics(rk, del_topics, topic_cnt, options, useq); - - rd_kafka_AdminOptions_destroy(options); - - rd_kafka_DeleteTopic_destroy_array(del_topics, topic_cnt); - - free(del_topics); - - if (useq) - return RD_KAFKA_RESP_ERR_NO_ERROR; - - err = test_wait_topic_admin_result( - q, RD_KAFKA_EVENT_DELETETOPICS_RESULT, NULL, tmout + 5000); - - rd_kafka_queue_destroy(q); - - if (err) - TEST_FAIL("Failed to delete topics: %s", rd_kafka_err2str(err)); - - return err; -} - -rd_kafka_resp_err_t test_DeleteGroups_simple(rd_kafka_t *rk, - rd_kafka_queue_t *useq, - char **groups, - size_t group_cnt, - void *opaque) { - rd_kafka_queue_t *q; - rd_kafka_DeleteGroup_t **del_groups; - rd_kafka_AdminOptions_t *options; - size_t i; - rd_kafka_resp_err_t err; - const int tmout = 30 * 1000; - - del_groups = malloc(sizeof(*del_groups) * group_cnt); - - for (i = 0; i < group_cnt; i++) { - del_groups[i] = rd_kafka_DeleteGroup_new(groups[i]); - TEST_ASSERT(del_groups[i]); - } - - options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETEGROUPS); - rd_kafka_AdminOptions_set_opaque(options, opaque); - - if (!useq) { - char errstr[512]; - - err = rd_kafka_AdminOptions_set_request_timeout( - options, tmout, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "set_request_timeout: %s", errstr); - - q = rd_kafka_queue_new(rk); - } else { - q = useq; - } - - TEST_SAY("Deleting %" PRIusz " groups\n", group_cnt); - - rd_kafka_DeleteGroups(rk, del_groups, group_cnt, options, q); - - rd_kafka_AdminOptions_destroy(options); - - rd_kafka_DeleteGroup_destroy_array(del_groups, group_cnt); - free(del_groups); - - if (useq) - return RD_KAFKA_RESP_ERR_NO_ERROR; - - err = test_wait_topic_admin_result( - q, RD_KAFKA_EVENT_DELETEGROUPS_RESULT, NULL, tmout + 5000); - - rd_kafka_queue_destroy(q); - - if (err) - TEST_FAIL("Failed to delete groups: %s", rd_kafka_err2str(err)); - - return err; -} - -rd_kafka_resp_err_t -test_DeleteRecords_simple(rd_kafka_t *rk, - rd_kafka_queue_t *useq, - const rd_kafka_topic_partition_list_t *offsets, - void *opaque) { - rd_kafka_queue_t *q; - rd_kafka_AdminOptions_t *options; - rd_kafka_resp_err_t err; - rd_kafka_DeleteRecords_t *del_records = - rd_kafka_DeleteRecords_new(offsets); - const int tmout = 30 * 1000; - - options = - rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETERECORDS); - rd_kafka_AdminOptions_set_opaque(options, opaque); - - if (!useq) { - char errstr[512]; - - err = rd_kafka_AdminOptions_set_request_timeout( - options, tmout, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "set_request_timeout: %s", errstr); - err = rd_kafka_AdminOptions_set_operation_timeout( - options, tmout - 5000, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "set_operation_timeout: %s", errstr); - - q = rd_kafka_queue_new(rk); - } else { - q = useq; - } - - TEST_SAY("Deleting offsets from %d partitions\n", offsets->cnt); - - rd_kafka_DeleteRecords(rk, &del_records, 1, options, q); - - rd_kafka_DeleteRecords_destroy(del_records); - - rd_kafka_AdminOptions_destroy(options); - - if (useq) - return RD_KAFKA_RESP_ERR_NO_ERROR; - - err = test_wait_topic_admin_result( - q, RD_KAFKA_EVENT_DELETERECORDS_RESULT, NULL, tmout + 5000); - - rd_kafka_queue_destroy(q); - - if (err) - TEST_FAIL("Failed to delete records: %s", - rd_kafka_err2str(err)); - - return err; -} - -rd_kafka_resp_err_t test_DeleteConsumerGroupOffsets_simple( - rd_kafka_t *rk, - rd_kafka_queue_t *useq, - const char *group_id, - const rd_kafka_topic_partition_list_t *offsets, - void *opaque) { - rd_kafka_queue_t *q; - rd_kafka_AdminOptions_t *options; - rd_kafka_resp_err_t err; - const int tmout = 30 * 1000; - rd_kafka_DeleteConsumerGroupOffsets_t *cgoffsets; - - options = rd_kafka_AdminOptions_new( - rk, RD_KAFKA_ADMIN_OP_DELETECONSUMERGROUPOFFSETS); - rd_kafka_AdminOptions_set_opaque(options, opaque); - - if (!useq) { - char errstr[512]; - - err = rd_kafka_AdminOptions_set_request_timeout( - options, tmout, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "set_request_timeout: %s", errstr); - err = rd_kafka_AdminOptions_set_operation_timeout( - options, tmout - 5000, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "set_operation_timeout: %s", errstr); - - q = rd_kafka_queue_new(rk); - } else { - q = useq; - } - - if (offsets) { - TEST_SAY( - "Deleting committed offsets for group %s and " - "%d partitions\n", - group_id, offsets->cnt); - - cgoffsets = - rd_kafka_DeleteConsumerGroupOffsets_new(group_id, offsets); - } else { - TEST_SAY("Provoking invalid DeleteConsumerGroupOffsets call\n"); - cgoffsets = NULL; - } - - rd_kafka_DeleteConsumerGroupOffsets(rk, &cgoffsets, cgoffsets ? 1 : 0, - options, useq); - - if (cgoffsets) - rd_kafka_DeleteConsumerGroupOffsets_destroy(cgoffsets); - - rd_kafka_AdminOptions_destroy(options); - - if (useq) - return RD_KAFKA_RESP_ERR_NO_ERROR; - - err = test_wait_topic_admin_result( - q, RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT, NULL, - tmout + 5000); - - rd_kafka_queue_destroy(q); - - if (err) - TEST_FAIL("Failed to delete committed offsets: %s", - rd_kafka_err2str(err)); - - return err; -} - -/** - * @brief Delta Alter configuration for the given resource, - * overwriting/setting the configs provided in \p configs. - * Existing configuration remains intact. - * - * @param configs 'const char *name, const char *value' tuples - * @param config_cnt is the number of tuples in \p configs - */ -rd_kafka_resp_err_t test_AlterConfigs_simple(rd_kafka_t *rk, - rd_kafka_ResourceType_t restype, - const char *resname, - const char **configs, - size_t config_cnt) { - rd_kafka_queue_t *q; - rd_kafka_ConfigResource_t *confres; - rd_kafka_event_t *rkev; - size_t i; - rd_kafka_resp_err_t err; - const rd_kafka_ConfigResource_t **results; - size_t result_cnt; - const rd_kafka_ConfigEntry_t **configents; - size_t configent_cnt; - - - q = rd_kafka_queue_new(rk); - - TEST_SAY("Getting configuration for %d %s\n", restype, resname); - - confres = rd_kafka_ConfigResource_new(restype, resname); - rd_kafka_DescribeConfigs(rk, &confres, 1, NULL, q); - - err = test_wait_topic_admin_result( - q, RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, &rkev, 15 * 1000); - if (err) { - rd_kafka_queue_destroy(q); - rd_kafka_ConfigResource_destroy(confres); - return err; - } - - results = rd_kafka_DescribeConfigs_result_resources( - rd_kafka_event_DescribeConfigs_result(rkev), &result_cnt); - TEST_ASSERT(result_cnt == 1, - "expected 1 DescribeConfigs result, not %" PRIusz, - result_cnt); - - configents = - rd_kafka_ConfigResource_configs(results[0], &configent_cnt); - TEST_ASSERT(configent_cnt > 0, - "expected > 0 ConfigEntry:s, not %" PRIusz, configent_cnt); - - TEST_SAY("Altering configuration for %d %s\n", restype, resname); - - /* Apply all existing configuration entries to resource object that - * will later be passed to AlterConfigs. */ - for (i = 0; i < configent_cnt; i++) { - const char *entry_name = - rd_kafka_ConfigEntry_name(configents[i]); - - if (test_broker_version >= TEST_BRKVER(3, 2, 0, 0)) { - /* Skip entries that are overwritten to - * avoid duplicates, that cause an error since - * this broker version. */ - size_t j; - for (j = 0; j < config_cnt; j += 2) { - if (!strcmp(configs[j], entry_name)) { - break; - } - } - - if (j < config_cnt) - continue; - } - - err = rd_kafka_ConfigResource_set_config( - confres, entry_name, - rd_kafka_ConfigEntry_value(configents[i])); - TEST_ASSERT(!err, - "Failed to set read-back config %s=%s " - "on local resource object", - entry_name, - rd_kafka_ConfigEntry_value(configents[i])); - } - - rd_kafka_event_destroy(rkev); - - /* Then apply the configuration to change. */ - for (i = 0; i < config_cnt; i += 2) { - err = rd_kafka_ConfigResource_set_config(confres, configs[i], - configs[i + 1]); - TEST_ASSERT(!err, - "Failed to set config %s=%s on " - "local resource object", - configs[i], configs[i + 1]); - } - - rd_kafka_AlterConfigs(rk, &confres, 1, NULL, q); - - rd_kafka_ConfigResource_destroy(confres); - - err = test_wait_topic_admin_result( - q, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT, NULL, 15 * 1000); - - rd_kafka_queue_destroy(q); - - return err; -} - -/** - * @brief Topic Admin API helpers - * - * @param useq Makes the call async and posts the response in this queue. - * If NULL this call will be synchronous and return the error - * result. - * - * @remark Fails the current test on failure. - */ - -rd_kafka_resp_err_t test_CreateAcls_simple(rd_kafka_t *rk, - rd_kafka_queue_t *useq, - rd_kafka_AclBinding_t **acls, - size_t acl_cnt, - void *opaque) { - rd_kafka_AdminOptions_t *options; - rd_kafka_queue_t *q; - rd_kafka_resp_err_t err; - const int tmout = 30 * 1000; - - options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATEACLS); - rd_kafka_AdminOptions_set_opaque(options, opaque); - - if (!useq) { - q = rd_kafka_queue_new(rk); - } else { - q = useq; - } - - TEST_SAY("Creating %" PRIusz " acls\n", acl_cnt); - - rd_kafka_CreateAcls(rk, acls, acl_cnt, options, q); - - rd_kafka_AdminOptions_destroy(options); - - if (useq) - return RD_KAFKA_RESP_ERR_NO_ERROR; - - err = test_wait_topic_admin_result(q, RD_KAFKA_EVENT_CREATEACLS_RESULT, - NULL, tmout + 5000); - - rd_kafka_queue_destroy(q); - - if (err) - TEST_FAIL("Failed to create %d acl(s): %s", (int)acl_cnt, - rd_kafka_err2str(err)); - - return err; -} - -static void test_free_string_array(char **strs, size_t cnt) { - size_t i; - for (i = 0; i < cnt; i++) - free(strs[i]); - free(strs); -} - - -/** - * @return an array of all topics in the cluster matching our the - * rdkafka test prefix. - */ -static rd_kafka_resp_err_t -test_get_all_test_topics(rd_kafka_t *rk, char ***topicsp, size_t *topic_cntp) { - size_t test_topic_prefix_len = strlen(test_topic_prefix); - const rd_kafka_metadata_t *md; - char **topics = NULL; - size_t topic_cnt = 0; - int i; - rd_kafka_resp_err_t err; - - *topic_cntp = 0; - if (topicsp) - *topicsp = NULL; - - /* Retrieve list of topics */ - err = rd_kafka_metadata(rk, 1 /*all topics*/, NULL, &md, - tmout_multip(10000)); - if (err) { - TEST_WARN( - "%s: Failed to acquire metadata: %s: " - "not deleting any topics\n", - __FUNCTION__, rd_kafka_err2str(err)); - return err; - } - - if (md->topic_cnt == 0) { - TEST_WARN("%s: No topics in cluster\n", __FUNCTION__); - rd_kafka_metadata_destroy(md); - return RD_KAFKA_RESP_ERR_NO_ERROR; - } - - if (topicsp) - topics = malloc(sizeof(*topics) * md->topic_cnt); - - for (i = 0; i < md->topic_cnt; i++) { - if (strlen(md->topics[i].topic) >= test_topic_prefix_len && - !strncmp(md->topics[i].topic, test_topic_prefix, - test_topic_prefix_len)) { - if (topicsp) - topics[topic_cnt++] = - rd_strdup(md->topics[i].topic); - else - topic_cnt++; - } - } - - if (topic_cnt == 0) { - TEST_SAY( - "%s: No topics (out of %d) matching our " - "test prefix (%s)\n", - __FUNCTION__, md->topic_cnt, test_topic_prefix); - rd_kafka_metadata_destroy(md); - if (topics) - test_free_string_array(topics, topic_cnt); - return RD_KAFKA_RESP_ERR_NO_ERROR; - } - - rd_kafka_metadata_destroy(md); - - if (topicsp) - *topicsp = topics; - *topic_cntp = topic_cnt; - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - -/** - * @brief Delete all test topics using the Kafka Admin API. - */ -rd_kafka_resp_err_t test_delete_all_test_topics(int timeout_ms) { - rd_kafka_t *rk; - char **topics; - size_t topic_cnt = 0; - rd_kafka_resp_err_t err; - int i; - rd_kafka_AdminOptions_t *options; - rd_kafka_queue_t *q; - char errstr[256]; - int64_t abs_timeout = test_clock() + ((int64_t)timeout_ms * 1000); - - rk = test_create_producer(); - - err = test_get_all_test_topics(rk, &topics, &topic_cnt); - if (err) { - /* Error already reported by test_get_all_test_topics() */ - rd_kafka_destroy(rk); - return err; - } - - if (topic_cnt == 0) { - rd_kafka_destroy(rk); - return RD_KAFKA_RESP_ERR_NO_ERROR; - } - - q = rd_kafka_queue_get_main(rk); - - options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS); - if (rd_kafka_AdminOptions_set_operation_timeout(options, 2 * 60 * 1000, - errstr, sizeof(errstr))) - TEST_SAY(_C_YEL - "Failed to set DeleteTopics timeout: %s: " - "ignoring\n", - errstr); - - TEST_SAY(_C_MAG - "====> Deleting all test topics with <====" - "a timeout of 2 minutes\n"); - - test_DeleteTopics_simple(rk, q, topics, topic_cnt, options); - - rd_kafka_AdminOptions_destroy(options); - - while (1) { - rd_kafka_event_t *rkev; - const rd_kafka_DeleteTopics_result_t *res; - - rkev = rd_kafka_queue_poll(q, -1); - - res = rd_kafka_event_DeleteTopics_result(rkev); - if (!res) { - TEST_SAY("%s: Ignoring event: %s: %s\n", __FUNCTION__, - rd_kafka_event_name(rkev), - rd_kafka_event_error_string(rkev)); - rd_kafka_event_destroy(rkev); - continue; - } - - if (rd_kafka_event_error(rkev)) { - TEST_WARN("%s: DeleteTopics for %" PRIusz - " topics " - "failed: %s\n", - __FUNCTION__, topic_cnt, - rd_kafka_event_error_string(rkev)); - err = rd_kafka_event_error(rkev); - } else { - const rd_kafka_topic_result_t **terr; - size_t tcnt; - int okcnt = 0; - - terr = rd_kafka_DeleteTopics_result_topics(res, &tcnt); - - for (i = 0; i < (int)tcnt; i++) { - if (!rd_kafka_topic_result_error(terr[i])) { - okcnt++; - continue; - } - - TEST_WARN("%s: Failed to delete topic %s: %s\n", - __FUNCTION__, - rd_kafka_topic_result_name(terr[i]), - rd_kafka_topic_result_error_string( - terr[i])); - } - - TEST_SAY( - "%s: DeleteTopics " - "succeeded for %d/%" PRIusz " topics\n", - __FUNCTION__, okcnt, topic_cnt); - err = RD_KAFKA_RESP_ERR_NO_ERROR; - } - - rd_kafka_event_destroy(rkev); - break; - } - - rd_kafka_queue_destroy(q); - - test_free_string_array(topics, topic_cnt); - - /* Wait for topics to be fully deleted */ - while (1) { - err = test_get_all_test_topics(rk, NULL, &topic_cnt); - - if (!err && topic_cnt == 0) - break; - - if (abs_timeout < test_clock()) { - TEST_WARN( - "%s: Timed out waiting for " - "remaining %" PRIusz - " deleted topics " - "to disappear from cluster metadata\n", - __FUNCTION__, topic_cnt); - break; - } - - TEST_SAY("Waiting for remaining %" PRIusz - " delete topics " - "to disappear from cluster metadata\n", - topic_cnt); - - rd_sleep(1); - } - - rd_kafka_destroy(rk); - - return err; -} - - - -void test_fail0(const char *file, - int line, - const char *function, - int do_lock, - int fail_now, - const char *fmt, - ...) { - char buf[512]; - int is_thrd = 0; - size_t of; - va_list ap; - char *t; - char timestr[32]; - time_t tnow = time(NULL); - -#ifdef __MINGW32__ - strftime(timestr, sizeof(timestr), "%a %b %d %H:%M:%S %Y", - localtime(&tnow)); -#elif defined(_WIN32) - ctime_s(timestr, sizeof(timestr), &tnow); -#else - ctime_r(&tnow, timestr); -#endif - t = strchr(timestr, '\n'); - if (t) - *t = '\0'; - - of = rd_snprintf(buf, sizeof(buf), "%s%s%s():%i: ", test_curr->subtest, - *test_curr->subtest ? ": " : "", function, line); - rd_assert(of < sizeof(buf)); - - va_start(ap, fmt); - rd_vsnprintf(buf + of, sizeof(buf) - of, fmt, ap); - va_end(ap); - - /* Remove trailing newline */ - if ((t = strchr(buf, '\n')) && !*(t + 1)) - *t = '\0'; - - TEST_SAYL(0, "TEST FAILURE\n"); - fprintf(stderr, - "\033[31m### Test \"%s%s%s%s\" failed at %s:%i:%s() at %s: " - "###\n" - "%s\n", - test_curr->name, *test_curr->subtest ? " (" : "", - test_curr->subtest, *test_curr->subtest ? ")" : "", file, line, - function, timestr, buf + of); - if (do_lock) - TEST_LOCK(); - test_curr->state = TEST_FAILED; - test_curr->failcnt += 1; - test_curr->is_fatal_cb = NULL; - - if (!*test_curr->failstr) { - strncpy(test_curr->failstr, buf, sizeof(test_curr->failstr)); - test_curr->failstr[sizeof(test_curr->failstr) - 1] = '\0'; - } - if (fail_now && test_curr->mainfunc) { - tests_running_cnt--; - is_thrd = 1; - } - if (do_lock) - TEST_UNLOCK(); - if (!fail_now) - return; - if (test_assert_on_fail || !is_thrd) - assert(0); - else - thrd_exit(0); -} - - -/** - * @brief Destroy a mock cluster and its underlying rd_kafka_t handle - */ -void test_mock_cluster_destroy(rd_kafka_mock_cluster_t *mcluster) { - rd_kafka_t *rk = rd_kafka_mock_cluster_handle(mcluster); - rd_kafka_mock_cluster_destroy(mcluster); - rd_kafka_destroy(rk); -} - - - -/** - * @brief Create a standalone mock cluster that can be used by multiple - * rd_kafka_t instances. - */ -rd_kafka_mock_cluster_t *test_mock_cluster_new(int broker_cnt, - const char **bootstraps) { - rd_kafka_t *rk; - rd_kafka_conf_t *conf = rd_kafka_conf_new(); - rd_kafka_mock_cluster_t *mcluster; - char errstr[256]; - - test_conf_common_init(conf, 0); - - test_conf_set(conf, "client.id", "MOCK"); - - rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); - TEST_ASSERT(rk, "Failed to create mock cluster rd_kafka_t: %s", errstr); - - mcluster = rd_kafka_mock_cluster_new(rk, broker_cnt); - TEST_ASSERT(mcluster, "Failed to acquire mock cluster"); - - if (bootstraps) - *bootstraps = rd_kafka_mock_cluster_bootstraps(mcluster); - - return mcluster; -} - - - -/** - * @name Sub-tests - */ - - -/** - * @brief Start a sub-test. \p fmt is optional and allows additional - * sub-test info to be displayed, e.g., test parameters. - * - * @returns 0 if sub-test should not be run, else 1. - */ -int test_sub_start(const char *func, - int line, - int is_quick, - const char *fmt, - ...) { - - if (!is_quick && test_quick) - return 0; - - if (fmt && *fmt) { - va_list ap; - char buf[256]; - - va_start(ap, fmt); - rd_vsnprintf(buf, sizeof(buf), fmt, ap); - va_end(ap); - - rd_snprintf(test_curr->subtest, sizeof(test_curr->subtest), - "%s:%d: %s", func, line, buf); - } else { - rd_snprintf(test_curr->subtest, sizeof(test_curr->subtest), - "%s:%d", func, line); - } - - if (subtests_to_run && !strstr(test_curr->subtest, subtests_to_run)) { - *test_curr->subtest = '\0'; - return 0; - } - - test_curr->subtest_quick = is_quick; - - TIMING_START(&test_curr->subtest_duration, "SUBTEST"); - - TEST_SAY(_C_MAG "[ %s ]\n", test_curr->subtest); - - return 1; -} - - -/** - * @brief Reset the current subtest state. - */ -static void test_sub_reset(void) { - *test_curr->subtest = '\0'; - test_curr->is_fatal_cb = NULL; - test_curr->ignore_dr_err = rd_false; - test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR; - /* Don't check msg status by default */ - test_curr->exp_dr_status = (rd_kafka_msg_status_t)-1; - test_curr->dr_mv = NULL; -} - -/** - * @brief Sub-test has passed. - */ -void test_sub_pass(void) { - - TEST_ASSERT(*test_curr->subtest); - - TEST_SAYL(1, _C_GRN "[ %s: PASS (%.02fs) ]\n", test_curr->subtest, - (float)(TIMING_DURATION(&test_curr->subtest_duration) / - 1000000.0f)); - - if (test_curr->subtest_quick && test_quick && !test_on_ci && - TIMING_DURATION(&test_curr->subtest_duration) > 45 * 1000 * 1000) - TEST_WARN( - "Subtest %s marked as QUICK but took %.02fs to " - "finish: either fix the test or " - "remove the _QUICK identifier (limit is 45s)\n", - test_curr->subtest, - (float)(TIMING_DURATION(&test_curr->subtest_duration) / - 1000000.0f)); - - test_sub_reset(); -} - - -/** - * @brief Skip sub-test (must have been started with SUB_TEST*()). - */ -void test_sub_skip(const char *fmt, ...) { - va_list ap; - char buf[256]; - - TEST_ASSERT(*test_curr->subtest); - - va_start(ap, fmt); - rd_vsnprintf(buf, sizeof(buf), fmt, ap); - va_end(ap); - - TEST_SAYL(1, _C_YEL "[ %s: SKIP: %s ]\n", test_curr->subtest, buf); - - test_sub_reset(); -} |