summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/test.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/test.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/test.c6960
1 files changed, 0 insertions, 6960 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/test.c b/fluent-bit/lib/librdkafka-2.1.0/tests/test.c
deleted file mode 100644
index 71180c8f4..000000000
--- a/fluent-bit/lib/librdkafka-2.1.0/tests/test.c
+++ /dev/null
@@ -1,6960 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2013, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-
-#define _CRT_RAND_S // rand_s() on MSVC
-#include <stdarg.h>
-#include "test.h"
-#include <signal.h>
-#include <stdlib.h>
-#include <stdio.h>
-
-#ifdef _WIN32
-#include <direct.h> /* _getcwd */
-#else
-#include <sys/wait.h> /* waitpid */
-#endif
-
-/* Typical include path would be <librdkafka/rdkafka.h>, but this program
- * is built from within the librdkafka source tree and thus differs. */
-#include "rdkafka.h"
-
-int test_level = 2;
-int test_seed = 0;
-
-char test_mode[64] = "bare";
-char test_scenario[64] = "default";
-static volatile sig_atomic_t test_exit = 0;
-static char test_topic_prefix[128] = "rdkafkatest";
-static int test_topic_random = 0;
-int tests_running_cnt = 0;
-int test_concurrent_max = 5;
-int test_assert_on_fail = 0;
-double test_timeout_multiplier = 1.0;
-static char *test_sql_cmd = NULL;
-int test_session_timeout_ms = 6000;
-int test_broker_version;
-static const char *test_broker_version_str = "2.4.0.0";
-int test_flags = 0;
-int test_neg_flags = TEST_F_KNOWN_ISSUE;
-/* run delete-test-topics.sh between each test (when concurrent_max = 1) */
-static int test_delete_topics_between = 0;
-static const char *test_git_version = "HEAD";
-static const char *test_sockem_conf = "";
-int test_on_ci = 0; /* Tests are being run on CI, be more forgiving
- * with regards to timeouts, etc. */
-int test_quick = 0; /** Run tests quickly */
-int test_idempotent_producer = 0;
-int test_rusage = 0; /**< Check resource usage */
-/**< CPU speed calibration for rusage threshold checks.
- * >1.0: CPU is slower than base line system,
- * <1.0: CPU is faster than base line system. */
-double test_rusage_cpu_calibration = 1.0;
-static const char *tests_to_run = NULL; /* all */
-static const char *subtests_to_run = NULL; /* all */
-static const char *tests_to_skip = NULL; /* none */
-int test_write_report = 0; /**< Write test report file */
-
-static int show_summary = 1;
-static int test_summary(int do_lock);
-
-/**
- * Protects shared state, such as tests[]
- */
-mtx_t test_mtx;
-cnd_t test_cnd;
-
-static const char *test_states[] = {
- "DNS", "SKIPPED", "RUNNING", "PASSED", "FAILED",
-};
-
-
-
-#define _TEST_DECL(NAME) extern int main_##NAME(int, char **)
-#define _TEST(NAME, FLAGS, ...) \
- { .name = #NAME, .mainfunc = main_##NAME, .flags = FLAGS, __VA_ARGS__ }
-
-
-/**
- * Declare all tests here
- */
-_TEST_DECL(0000_unittests);
-_TEST_DECL(0001_multiobj);
-_TEST_DECL(0002_unkpart);
-_TEST_DECL(0003_msgmaxsize);
-_TEST_DECL(0004_conf);
-_TEST_DECL(0005_order);
-_TEST_DECL(0006_symbols);
-_TEST_DECL(0007_autotopic);
-_TEST_DECL(0008_reqacks);
-_TEST_DECL(0009_mock_cluster);
-_TEST_DECL(0011_produce_batch);
-_TEST_DECL(0012_produce_consume);
-_TEST_DECL(0013_null_msgs);
-_TEST_DECL(0014_reconsume_191);
-_TEST_DECL(0015_offsets_seek);
-_TEST_DECL(0016_client_swname);
-_TEST_DECL(0017_compression);
-_TEST_DECL(0018_cgrp_term);
-_TEST_DECL(0019_list_groups);
-_TEST_DECL(0020_destroy_hang);
-_TEST_DECL(0021_rkt_destroy);
-_TEST_DECL(0022_consume_batch);
-_TEST_DECL(0022_consume_batch_local);
-_TEST_DECL(0025_timers);
-_TEST_DECL(0026_consume_pause);
-_TEST_DECL(0028_long_topicnames);
-_TEST_DECL(0029_assign_offset);
-_TEST_DECL(0030_offset_commit);
-_TEST_DECL(0031_get_offsets);
-_TEST_DECL(0033_regex_subscribe);
-_TEST_DECL(0033_regex_subscribe_local);
-_TEST_DECL(0034_offset_reset);
-_TEST_DECL(0034_offset_reset_mock);
-_TEST_DECL(0035_api_version);
-_TEST_DECL(0036_partial_fetch);
-_TEST_DECL(0037_destroy_hang_local);
-_TEST_DECL(0038_performance);
-_TEST_DECL(0039_event_dr);
-_TEST_DECL(0039_event_log);
-_TEST_DECL(0039_event);
-_TEST_DECL(0040_io_event);
-_TEST_DECL(0041_fetch_max_bytes);
-_TEST_DECL(0042_many_topics);
-_TEST_DECL(0043_no_connection);
-_TEST_DECL(0044_partition_cnt);
-_TEST_DECL(0045_subscribe_update);
-_TEST_DECL(0045_subscribe_update_topic_remove);
-_TEST_DECL(0045_subscribe_update_non_exist_and_partchange);
-_TEST_DECL(0045_subscribe_update_mock);
-_TEST_DECL(0046_rkt_cache);
-_TEST_DECL(0047_partial_buf_tmout);
-_TEST_DECL(0048_partitioner);
-_TEST_DECL(0049_consume_conn_close);
-_TEST_DECL(0050_subscribe_adds);
-_TEST_DECL(0051_assign_adds);
-_TEST_DECL(0052_msg_timestamps);
-_TEST_DECL(0053_stats_timing);
-_TEST_DECL(0053_stats);
-_TEST_DECL(0054_offset_time);
-_TEST_DECL(0055_producer_latency);
-_TEST_DECL(0056_balanced_group_mt);
-_TEST_DECL(0057_invalid_topic);
-_TEST_DECL(0058_log);
-_TEST_DECL(0059_bsearch);
-_TEST_DECL(0060_op_prio);
-_TEST_DECL(0061_consumer_lag);
-_TEST_DECL(0062_stats_event);
-_TEST_DECL(0063_clusterid);
-_TEST_DECL(0064_interceptors);
-_TEST_DECL(0065_yield);
-_TEST_DECL(0066_plugins);
-_TEST_DECL(0067_empty_topic);
-_TEST_DECL(0068_produce_timeout);
-_TEST_DECL(0069_consumer_add_parts);
-_TEST_DECL(0070_null_empty);
-_TEST_DECL(0072_headers_ut);
-_TEST_DECL(0073_headers);
-_TEST_DECL(0074_producev);
-_TEST_DECL(0075_retry);
-_TEST_DECL(0076_produce_retry);
-_TEST_DECL(0077_compaction);
-_TEST_DECL(0078_c_from_cpp);
-_TEST_DECL(0079_fork);
-_TEST_DECL(0080_admin_ut);
-_TEST_DECL(0081_admin);
-_TEST_DECL(0082_fetch_max_bytes);
-_TEST_DECL(0083_cb_event);
-_TEST_DECL(0084_destroy_flags_local);
-_TEST_DECL(0084_destroy_flags);
-_TEST_DECL(0085_headers);
-_TEST_DECL(0086_purge_local);
-_TEST_DECL(0086_purge_remote);
-_TEST_DECL(0088_produce_metadata_timeout);
-_TEST_DECL(0089_max_poll_interval);
-_TEST_DECL(0090_idempotence);
-_TEST_DECL(0091_max_poll_interval_timeout);
-_TEST_DECL(0092_mixed_msgver);
-_TEST_DECL(0093_holb_consumer);
-_TEST_DECL(0094_idempotence_msg_timeout);
-_TEST_DECL(0095_all_brokers_down);
-_TEST_DECL(0097_ssl_verify);
-_TEST_DECL(0097_ssl_verify_local);
-_TEST_DECL(0098_consumer_txn);
-_TEST_DECL(0099_commit_metadata);
-_TEST_DECL(0100_thread_interceptors);
-_TEST_DECL(0101_fetch_from_follower);
-_TEST_DECL(0102_static_group_rebalance);
-_TEST_DECL(0103_transactions_local);
-_TEST_DECL(0103_transactions);
-_TEST_DECL(0104_fetch_from_follower_mock);
-_TEST_DECL(0105_transactions_mock);
-_TEST_DECL(0106_cgrp_sess_timeout);
-_TEST_DECL(0107_topic_recreate);
-_TEST_DECL(0109_auto_create_topics);
-_TEST_DECL(0110_batch_size);
-_TEST_DECL(0111_delay_create_topics);
-_TEST_DECL(0112_assign_unknown_part);
-_TEST_DECL(0113_cooperative_rebalance_local);
-_TEST_DECL(0113_cooperative_rebalance);
-_TEST_DECL(0114_sticky_partitioning);
-_TEST_DECL(0115_producer_auth);
-_TEST_DECL(0116_kafkaconsumer_close);
-_TEST_DECL(0117_mock_errors);
-_TEST_DECL(0118_commit_rebalance);
-_TEST_DECL(0119_consumer_auth);
-_TEST_DECL(0120_asymmetric_subscription);
-_TEST_DECL(0121_clusterid);
-_TEST_DECL(0122_buffer_cleaning_after_rebalance);
-_TEST_DECL(0123_connections_max_idle);
-_TEST_DECL(0124_openssl_invalid_engine);
-_TEST_DECL(0125_immediate_flush);
-_TEST_DECL(0126_oauthbearer_oidc);
-_TEST_DECL(0128_sasl_callback_queue);
-_TEST_DECL(0129_fetch_aborted_msgs);
-_TEST_DECL(0130_store_offsets);
-_TEST_DECL(0131_connect_timeout);
-_TEST_DECL(0132_strategy_ordering);
-_TEST_DECL(0133_ssl_keys);
-_TEST_DECL(0134_ssl_provider);
-_TEST_DECL(0135_sasl_credentials);
-_TEST_DECL(0136_resolve_cb);
-_TEST_DECL(0137_barrier_batch_consume);
-_TEST_DECL(0138_admin_mock);
-
-/* Manual tests */
-_TEST_DECL(8000_idle);
-
-
-/* Define test resource usage thresholds if the default limits
- * are not tolerable.
- *
- * Fields:
- * .ucpu - Max User CPU percentage (double)
- * .scpu - Max System/Kernel CPU percentage (double)
- * .rss - Max RSS (memory) in megabytes (double)
- * .ctxsw - Max number of voluntary context switches (int)
- *
- * Also see test_rusage_check_thresholds() in rusage.c
- *
- * Make a comment in the _THRES() below why the extra thresholds are required.
- *
- * Usage:
- * _TEST(00...., ...,
- * _THRES(.ucpu = 15.0)), <-- Max 15% User CPU usage
- */
-#define _THRES(...) .rusage_thres = {__VA_ARGS__}
-
-/**
- * Define all tests here
- */
-struct test tests[] = {
- /* Special MAIN test to hold over-all timings, etc. */
- {.name = "<MAIN>", .flags = TEST_F_LOCAL},
- _TEST(0000_unittests,
- TEST_F_LOCAL,
- /* The msgq insert order tests are heavy on
- * user CPU (memory scan), RSS, and
- * system CPU (lots of allocations -> madvise(2)). */
- _THRES(.ucpu = 100.0, .scpu = 20.0, .rss = 900.0)),
- _TEST(0001_multiobj, 0),
- _TEST(0002_unkpart, 0),
- _TEST(0003_msgmaxsize, 0),
- _TEST(0004_conf, TEST_F_LOCAL),
- _TEST(0005_order, 0),
- _TEST(0006_symbols, TEST_F_LOCAL),
- _TEST(0007_autotopic, 0),
- _TEST(0008_reqacks, 0),
- _TEST(0009_mock_cluster,
- TEST_F_LOCAL,
- /* Mock cluster requires MsgVersion 2 */
- TEST_BRKVER(0, 11, 0, 0)),
- _TEST(0011_produce_batch,
- 0,
- /* Produces a lot of messages */
- _THRES(.ucpu = 40.0, .scpu = 8.0)),
- _TEST(0012_produce_consume, 0),
- _TEST(0013_null_msgs, 0),
- _TEST(0014_reconsume_191, 0),
- _TEST(0015_offsets_seek, 0),
- _TEST(0016_client_swname, 0),
- _TEST(0017_compression, 0),
- _TEST(0018_cgrp_term, 0, TEST_BRKVER(0, 9, 0, 0)),
- _TEST(0019_list_groups, 0, TEST_BRKVER(0, 9, 0, 0)),
- _TEST(0020_destroy_hang, 0, TEST_BRKVER(0, 9, 0, 0)),
- _TEST(0021_rkt_destroy, 0),
- _TEST(0022_consume_batch, 0),
- _TEST(0022_consume_batch_local, TEST_F_LOCAL),
- _TEST(0025_timers, TEST_F_LOCAL),
- _TEST(0026_consume_pause, 0, TEST_BRKVER(0, 9, 0, 0)),
- _TEST(0028_long_topicnames,
- TEST_F_KNOWN_ISSUE,
- TEST_BRKVER(0, 9, 0, 0),
- .extra = "https://github.com/edenhill/librdkafka/issues/529"),
- _TEST(0029_assign_offset, 0),
- _TEST(0030_offset_commit,
- 0,
- TEST_BRKVER(0, 9, 0, 0),
- /* Loops over committed() until timeout */
- _THRES(.ucpu = 10.0, .scpu = 5.0)),
- _TEST(0031_get_offsets, 0),
- _TEST(0033_regex_subscribe, 0, TEST_BRKVER(0, 9, 0, 0)),
- _TEST(0033_regex_subscribe_local, TEST_F_LOCAL),
- _TEST(0034_offset_reset, 0),
- _TEST(0034_offset_reset_mock, TEST_F_LOCAL),
- _TEST(0035_api_version, 0),
- _TEST(0036_partial_fetch, 0),
- _TEST(0037_destroy_hang_local, TEST_F_LOCAL),
- _TEST(0038_performance,
- 0,
- /* Produces and consumes a lot of messages */
- _THRES(.ucpu = 150.0, .scpu = 10)),
- _TEST(0039_event_dr, 0),
- _TEST(0039_event_log, TEST_F_LOCAL),
- _TEST(0039_event, TEST_F_LOCAL),
- _TEST(0040_io_event, 0, TEST_BRKVER(0, 9, 0, 0)),
- _TEST(0041_fetch_max_bytes,
- 0,
- /* Re-fetches large messages multiple times */
- _THRES(.ucpu = 20.0, .scpu = 10.0)),
- _TEST(0042_many_topics, 0),
- _TEST(0043_no_connection, TEST_F_LOCAL),
- _TEST(0044_partition_cnt,
- 0,
- TEST_BRKVER(1, 0, 0, 0),
- /* Produces a lot of messages */
- _THRES(.ucpu = 30.0)),
- _TEST(0045_subscribe_update, 0, TEST_BRKVER(0, 9, 0, 0)),
- _TEST(0045_subscribe_update_topic_remove,
- 0,
- TEST_BRKVER(0, 9, 0, 0),
- .scenario = "noautocreate"),
- _TEST(0045_subscribe_update_non_exist_and_partchange,
- 0,
- TEST_BRKVER(0, 9, 0, 0),
- .scenario = "noautocreate"),
- _TEST(0045_subscribe_update_mock, TEST_F_LOCAL),
- _TEST(0046_rkt_cache, TEST_F_LOCAL),
- _TEST(0047_partial_buf_tmout, TEST_F_KNOWN_ISSUE),
- _TEST(0048_partitioner,
- 0,
- /* Produces many small messages */
- _THRES(.ucpu = 10.0, .scpu = 5.0)),
-#if WITH_SOCKEM
- _TEST(0049_consume_conn_close, TEST_F_SOCKEM, TEST_BRKVER(0, 9, 0, 0)),
-#endif
- _TEST(0050_subscribe_adds, 0, TEST_BRKVER(0, 9, 0, 0)),
- _TEST(0051_assign_adds, 0, TEST_BRKVER(0, 9, 0, 0)),
- _TEST(0052_msg_timestamps, 0, TEST_BRKVER(0, 10, 0, 0)),
- _TEST(0053_stats_timing, TEST_F_LOCAL),
- _TEST(0053_stats, 0),
- _TEST(0054_offset_time, 0, TEST_BRKVER(0, 10, 1, 0)),
- _TEST(0055_producer_latency, TEST_F_KNOWN_ISSUE_WIN32),
- _TEST(0056_balanced_group_mt, 0, TEST_BRKVER(0, 9, 0, 0)),
- _TEST(0057_invalid_topic, 0, TEST_BRKVER(0, 9, 0, 0)),
- _TEST(0058_log, TEST_F_LOCAL),
- _TEST(0059_bsearch, 0, TEST_BRKVER(0, 10, 0, 0)),
- _TEST(0060_op_prio, 0, TEST_BRKVER(0, 9, 0, 0)),
- _TEST(0061_consumer_lag, 0),
- _TEST(0062_stats_event, TEST_F_LOCAL),
- _TEST(0063_clusterid, 0, TEST_BRKVER(0, 10, 1, 0)),
- _TEST(0064_interceptors, 0, TEST_BRKVER(0, 9, 0, 0)),
- _TEST(0065_yield, 0),
- _TEST(0066_plugins,
- TEST_F_LOCAL | TEST_F_KNOWN_ISSUE_WIN32 | TEST_F_KNOWN_ISSUE_OSX,
- .extra =
- "dynamic loading of tests might not be fixed for this platform"),
- _TEST(0067_empty_topic, 0),
-#if WITH_SOCKEM
- _TEST(0068_produce_timeout, TEST_F_SOCKEM),
-#endif
- _TEST(0069_consumer_add_parts,
- TEST_F_KNOWN_ISSUE_WIN32,
- TEST_BRKVER(1, 0, 0, 0)),
- _TEST(0070_null_empty, 0),
- _TEST(0072_headers_ut, TEST_F_LOCAL),
- _TEST(0073_headers, 0, TEST_BRKVER(0, 11, 0, 0)),
- _TEST(0074_producev, TEST_F_LOCAL),
-#if WITH_SOCKEM
- _TEST(0075_retry, TEST_F_SOCKEM),
-#endif
- _TEST(0076_produce_retry, TEST_F_SOCKEM),
- _TEST(0077_compaction,
- 0,
- /* The test itself requires message headers */
- TEST_BRKVER(0, 11, 0, 0)),
- _TEST(0078_c_from_cpp, TEST_F_LOCAL),
- _TEST(0079_fork,
- TEST_F_LOCAL | TEST_F_KNOWN_ISSUE,
- .extra = "using a fork():ed rd_kafka_t is not supported and will "
- "most likely hang"),
- _TEST(0080_admin_ut, TEST_F_LOCAL),
- _TEST(0081_admin, 0, TEST_BRKVER(0, 10, 2, 0)),
- _TEST(0082_fetch_max_bytes, 0, TEST_BRKVER(0, 10, 1, 0)),
- _TEST(0083_cb_event, 0, TEST_BRKVER(0, 9, 0, 0)),
- _TEST(0084_destroy_flags_local, TEST_F_LOCAL),
- _TEST(0084_destroy_flags, 0),
- _TEST(0085_headers, 0, TEST_BRKVER(0, 11, 0, 0)),
- _TEST(0086_purge_local, TEST_F_LOCAL),
- _TEST(0086_purge_remote, 0),
-#if WITH_SOCKEM
- _TEST(0088_produce_metadata_timeout, TEST_F_SOCKEM),
-#endif
- _TEST(0089_max_poll_interval, 0, TEST_BRKVER(0, 10, 1, 0)),
- _TEST(0090_idempotence, 0, TEST_BRKVER(0, 11, 0, 0)),
- _TEST(0091_max_poll_interval_timeout, 0, TEST_BRKVER(0, 10, 1, 0)),
- _TEST(0092_mixed_msgver, 0, TEST_BRKVER(0, 11, 0, 0)),
- _TEST(0093_holb_consumer, 0, TEST_BRKVER(0, 10, 1, 0)),
-#if WITH_SOCKEM
- _TEST(0094_idempotence_msg_timeout,
- TEST_F_SOCKEM,
- TEST_BRKVER(0, 11, 0, 0)),
-#endif
- _TEST(0095_all_brokers_down, TEST_F_LOCAL),
- _TEST(0097_ssl_verify, 0),
- _TEST(0097_ssl_verify_local, TEST_F_LOCAL),
- _TEST(0098_consumer_txn, 0, TEST_BRKVER(0, 11, 0, 0)),
- _TEST(0099_commit_metadata, 0),
- _TEST(0100_thread_interceptors, TEST_F_LOCAL),
- _TEST(0101_fetch_from_follower, 0, TEST_BRKVER(2, 4, 0, 0)),
- _TEST(0102_static_group_rebalance, 0, TEST_BRKVER(2, 3, 0, 0)),
- _TEST(0103_transactions_local, TEST_F_LOCAL),
- _TEST(0103_transactions,
- 0,
- TEST_BRKVER(0, 11, 0, 0),
- .scenario = "default,ak23"),
- _TEST(0104_fetch_from_follower_mock, TEST_F_LOCAL, TEST_BRKVER(2, 4, 0, 0)),
- _TEST(0105_transactions_mock, TEST_F_LOCAL, TEST_BRKVER(0, 11, 0, 0)),
- _TEST(0106_cgrp_sess_timeout, TEST_F_LOCAL, TEST_BRKVER(0, 11, 0, 0)),
- _TEST(0107_topic_recreate,
- 0,
- TEST_BRKVER_TOPIC_ADMINAPI,
- .scenario = "noautocreate"),
- _TEST(0109_auto_create_topics, 0),
- _TEST(0110_batch_size, 0),
- _TEST(0111_delay_create_topics,
- 0,
- TEST_BRKVER_TOPIC_ADMINAPI,
- .scenario = "noautocreate"),
- _TEST(0112_assign_unknown_part, 0),
- _TEST(0113_cooperative_rebalance_local,
- TEST_F_LOCAL,
- TEST_BRKVER(2, 4, 0, 0)),
- _TEST(0113_cooperative_rebalance, 0, TEST_BRKVER(2, 4, 0, 0)),
- _TEST(0114_sticky_partitioning, 0),
- _TEST(0115_producer_auth, 0, TEST_BRKVER(2, 1, 0, 0)),
- _TEST(0116_kafkaconsumer_close, TEST_F_LOCAL),
- _TEST(0117_mock_errors, TEST_F_LOCAL),
- _TEST(0118_commit_rebalance, 0),
- _TEST(0119_consumer_auth, 0, TEST_BRKVER(2, 1, 0, 0)),
- _TEST(0120_asymmetric_subscription, TEST_F_LOCAL),
- _TEST(0121_clusterid, TEST_F_LOCAL),
- _TEST(0122_buffer_cleaning_after_rebalance, 0, TEST_BRKVER(2, 4, 0, 0)),
- _TEST(0123_connections_max_idle, 0),
- _TEST(0124_openssl_invalid_engine, TEST_F_LOCAL),
- _TEST(0125_immediate_flush, 0),
- _TEST(0126_oauthbearer_oidc, 0, TEST_BRKVER(3, 1, 0, 0)),
- _TEST(0128_sasl_callback_queue, TEST_F_LOCAL, TEST_BRKVER(2, 0, 0, 0)),
- _TEST(0129_fetch_aborted_msgs, 0, TEST_BRKVER(0, 11, 0, 0)),
- _TEST(0130_store_offsets, 0),
- _TEST(0131_connect_timeout, TEST_F_LOCAL),
- _TEST(0132_strategy_ordering, 0, TEST_BRKVER(2, 4, 0, 0)),
- _TEST(0133_ssl_keys, TEST_F_LOCAL),
- _TEST(0134_ssl_provider, TEST_F_LOCAL),
- _TEST(0135_sasl_credentials, 0),
- _TEST(0136_resolve_cb, TEST_F_LOCAL),
- _TEST(0137_barrier_batch_consume, 0),
- _TEST(0138_admin_mock, TEST_F_LOCAL, TEST_BRKVER(2, 4, 0, 0)),
-
- /* Manual tests */
- _TEST(8000_idle, TEST_F_MANUAL),
-
- {NULL}};
-
-
-RD_TLS struct test *test_curr = &tests[0];
-
-
-
-#if WITH_SOCKEM
-/**
- * Socket network emulation with sockem
- */
-
-static void test_socket_add(struct test *test, sockem_t *skm) {
- TEST_LOCK();
- rd_list_add(&test->sockets, skm);
- TEST_UNLOCK();
-}
-
-static void test_socket_del(struct test *test, sockem_t *skm, int do_lock) {
- if (do_lock)
- TEST_LOCK();
- /* Best effort, skm might not have been added if connect_cb failed */
- rd_list_remove(&test->sockets, skm);
- if (do_lock)
- TEST_UNLOCK();
-}
-
/**
 * @brief Set sockem property \p key = \p val on all sockets tracked
 *        by the current test.
 *
 * @returns the number of sockets the property was applied to.
 */
int test_socket_sockem_set_all(const char *key, int val) {
        int i;
        sockem_t *skm;
        int cnt = 0;

        TEST_LOCK();

        cnt = rd_list_cnt(&test_curr->sockets);
        /* Highlight (red) when there are no sockets to apply to. */
        TEST_SAY("Setting sockem %s=%d on %s%d socket(s)\n", key, val,
                 cnt > 0 ? "" : _C_RED, cnt);

        RD_LIST_FOREACH(skm, &test_curr->sockets, i) {
                if (sockem_set(skm, key, val, NULL) == -1)
                        TEST_FAIL("sockem_set(%s, %d) failed", key, val);
        }

        TEST_UNLOCK();

        return cnt;
}
-
-void test_socket_sockem_set(int s, const char *key, int value) {
- sockem_t *skm;
-
- TEST_LOCK();
- skm = sockem_find(s);
- if (skm)
- sockem_set(skm, key, value, NULL);
- TEST_UNLOCK();
-}
-
/**
 * @brief Close and forget all sockem sockets tracked by \p test.
 *
 * Destroying the list invokes its free callback on each remaining
 * entry (the list is initialized with sockem_close as free_cb,
 * see run_test0()).
 *
 * @param reinit If non-zero the socket list is re-initialized so the
 *               test object can keep tracking new sockets.
 */
void test_socket_close_all(struct test *test, int reinit) {
        TEST_LOCK();
        rd_list_destroy(&test->sockets);
        if (reinit)
                rd_list_init(&test->sockets, 16, (void *)sockem_close);
        TEST_UNLOCK();
}
-
-
/**
 * @brief librdkafka connect callback: wraps the outgoing connection in
 *        a sockem instance configured from \c test_sockem_conf, then
 *        gives the test's own connect_cb (if any) a chance to
 *        reconfigure or reject it.
 *
 * @returns 0 on success, else an errno-style error code which makes
 *          librdkafka treat the connect as failed.
 */
static int test_connect_cb(int s,
                           const struct sockaddr *addr,
                           int addrlen,
                           const char *id,
                           void *opaque) {
        struct test *test = opaque;
        sockem_t *skm;
        int r;

        skm = sockem_connect(s, addr, addrlen, test_sockem_conf, 0, NULL);
        if (!skm)
                return errno;

        if (test->connect_cb) {
                r = test->connect_cb(test, skm, id);
                /* NOTE(review): on rejection the sockem is not closed or
                 * tracked here; presumably it is found and closed later by
                 * test_closesocket_cb() via sockem_find() — confirm. */
                if (r)
                        return r;
        }

        /* Track the socket so the test can tune/close it later. */
        test_socket_add(test, skm);

        return 0;
}
-
/**
 * @brief librdkafka closesocket callback: closes the sockem wrapper
 *        (if any) for fd \p s, then the underlying OS socket.
 *
 * @returns 0 (always succeeds).
 */
static int test_closesocket_cb(int s, void *opaque) {
        struct test *test = opaque;
        sockem_t *skm;

        TEST_LOCK();
        skm = sockem_find(s);
        if (skm) {
                /* Close sockem's sockets */
                sockem_close(skm);
                /* Lock is already held: skip re-locking in the helper. */
                test_socket_del(test, skm, 0 /*nolock*/);
        }
        TEST_UNLOCK();

        /* Close librdkafka's socket */
#ifdef _WIN32
        closesocket(s);
#else
        close(s);
#endif

        return 0;
}
-
-
-void test_socket_enable(rd_kafka_conf_t *conf) {
- rd_kafka_conf_set_connect_cb(conf, test_connect_cb);
- rd_kafka_conf_set_closesocket_cb(conf, test_closesocket_cb);
- rd_kafka_conf_set_opaque(conf, test_curr);
-}
-#endif /* WITH_SOCKEM */
-
-/**
- * @brief For use as the is_fatal_cb(), treating no errors as test-fatal.
- */
-int test_error_is_not_fatal_cb(rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- const char *reason) {
- return 0;
-}
-
/**
 * @brief Generic rdkafka error callback for tests.
 *
 * An error is only tolerated if the current test's is_fatal_cb says it
 * is not test-fatal; otherwise the test fails.  RD_KAFKA_RESP_ERR__FATAL
 * is unwrapped via rd_kafka_fatal_error() to get the underlying error
 * before consulting is_fatal_cb again.
 */
static void
test_error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
        if (test_curr->is_fatal_cb &&
            !test_curr->is_fatal_cb(rk, err, reason)) {
                /* Test explicitly tolerates this error: just log it. */
                TEST_SAY(_C_YEL "%s rdkafka error (non-testfatal): %s: %s\n",
                         rd_kafka_name(rk), rd_kafka_err2str(err), reason);
        } else {
                if (err == RD_KAFKA_RESP_ERR__FATAL) {
                        char errstr[512];
                        TEST_SAY(_C_RED "%s Fatal error: %s\n",
                                 rd_kafka_name(rk), reason);

                        /* Retrieve the actual fatal error code/string. */
                        err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));

                        /* Re-check with the unwrapped error code. */
                        if (test_curr->is_fatal_cb &&
                            !test_curr->is_fatal_cb(rk, err, reason))
                                TEST_SAY(_C_YEL
                                         "%s rdkafka ignored FATAL error: "
                                         "%s: %s\n",
                                         rd_kafka_name(rk),
                                         rd_kafka_err2str(err), errstr);
                        else
                                TEST_FAIL("%s rdkafka FATAL error: %s: %s",
                                          rd_kafka_name(rk),
                                          rd_kafka_err2str(err), errstr);

                } else {
                        TEST_FAIL("%s rdkafka error: %s: %s", rd_kafka_name(rk),
                                  rd_kafka_err2str(err), reason);
                }
        }
}
-
-static int
-test_stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
- struct test *test = test_curr;
- if (test->stats_fp)
- fprintf(test->stats_fp,
- "{\"test\": \"%s\", \"instance\":\"%s\", "
- "\"stats\": %s}\n",
- test->name, rd_kafka_name(rk), json);
- return 0;
-}
-
-
/**
 * @brief Limit the test run time (in seconds).
 *
 * The effective deadline is \p timeout scaled by
 * test_timeout_multiplier, measured from now.  Note that the log
 * message prints the unscaled value alongside the multiplier.
 */
void test_timeout_set(int timeout) {
        TEST_LOCK();
        TEST_SAY("Setting test timeout to %ds * %.1f\n", timeout,
                 test_timeout_multiplier);
        timeout = (int)((double)timeout * test_timeout_multiplier);
        /* Absolute deadline in microseconds. */
        test_curr->timeout = test_clock() + ((int64_t)timeout * 1000000);
        TEST_UNLOCK();
}
-
-int tmout_multip(int msecs) {
- int r;
- TEST_LOCK();
- r = (int)(((double)(msecs)) * test_timeout_multiplier);
- TEST_UNLOCK();
- return r;
-}
-
-
-
-#ifdef _WIN32
/* Windows-only console setup. */
static void test_init_win32(void) {
        /* Enable VT emulation to support colored output. */
        HANDLE hOut = GetStdHandle(STD_OUTPUT_HANDLE);
        DWORD dwMode = 0;

        /* Not a console (e.g. redirected output): leave mode alone. */
        if (hOut == INVALID_HANDLE_VALUE || !GetConsoleMode(hOut, &dwMode))
                return;

#ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING
#define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x4
#endif
        dwMode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING;
        SetConsoleMode(hOut, dwMode);
}
-#endif
-
-
/**
 * @brief One-time global test framework initialization from the
 *        environment (TEST_LEVEL, TEST_MODE, TEST_SCENARIO,
 *        TEST_SOCKEM, TEST_SEED, TEST_CPU_CALIBRATION) and seeding of
 *        the PRNG.  Idempotent: a non-zero test_seed means init is done.
 */
static void test_init(void) {
        int seed;
        const char *tmp;


        if (test_seed)
                return;

        if ((tmp = test_getenv("TEST_LEVEL", NULL)))
                test_level = atoi(tmp);
        if ((tmp = test_getenv("TEST_MODE", NULL)))
                strncpy(test_mode, tmp, sizeof(test_mode) - 1);
        if ((tmp = test_getenv("TEST_SCENARIO", NULL)))
                strncpy(test_scenario, tmp, sizeof(test_scenario) - 1);
        if ((tmp = test_getenv("TEST_SOCKEM", NULL)))
                test_sockem_conf = tmp;
        if ((tmp = test_getenv("TEST_SEED", NULL)))
                seed = atoi(tmp);
        else
                seed = test_clock() & 0xffffffff;
        if ((tmp = test_getenv("TEST_CPU_CALIBRATION", NULL))) {
                test_rusage_cpu_calibration = strtod(tmp, NULL);
                if (test_rusage_cpu_calibration < 0.00001) {
                        fprintf(stderr,
                                "%% Invalid CPU calibration "
                                "value (from TEST_CPU_CALIBRATION env): %s\n",
                                tmp);
                        exit(1);
                }
        }

#ifdef _WIN32
        test_init_win32();
        {
                /* NOTE(review): on Windows this unconditionally overrides
                 * any TEST_SEED from the environment — confirm intended. */
                LARGE_INTEGER cycl;
                QueryPerformanceCounter(&cycl);
                seed = (int)cycl.QuadPart;
        }
#endif
        srand(seed);
        test_seed = seed;
}
-
-
/**
 * @brief Construct a topic name from the configured prefix and
 *        \p suffix, optionally with a random component.
 *
 * @returns a thread-local static buffer: valid until the next call
 *          from the same thread; do not free.
 */
const char *test_mk_topic_name(const char *suffix, int randomized) {
        static RD_TLS char ret[512];

        /* Strip main_ prefix (caller is using __FUNCTION__) */
        if (!strncmp(suffix, "main_", 5))
                suffix += 5;

        if (test_topic_random || randomized)
                rd_snprintf(ret, sizeof(ret), "%s_rnd%" PRIx64 "_%s",
                            test_topic_prefix, test_id_generate(), suffix);
        else
                rd_snprintf(ret, sizeof(ret), "%s_%s", test_topic_prefix,
                            suffix);

        TEST_SAY("Using topic \"%s\"\n", ret);

        return ret;
}
-
-
/**
 * @brief Set a special test framework config property (the
 *        "test.*" namespace, e.g. from test.conf).
 *
 * @param timeoutp In/out test timeout in seconds; rescaled when
 *                 test.timeout.multiplier is set.
 *
 * @returns 1 if the property name was known (and consumed), else 0 so
 *          the caller can pass it on to librdkafka.
 */
int test_set_special_conf(const char *name, const char *val, int *timeoutp) {
        if (!strcmp(name, "test.timeout.multiplier")) {
                TEST_LOCK();
                test_timeout_multiplier = strtod(val, NULL);
                TEST_UNLOCK();
                /* Re-scale the caller's timeout with the new multiplier
                 * (round-trip through ms for tmout_multip()). */
                *timeoutp = tmout_multip((*timeoutp) * 1000) / 1000;
        } else if (!strcmp(name, "test.topic.prefix")) {
                rd_snprintf(test_topic_prefix, sizeof(test_topic_prefix), "%s",
                            val);
        } else if (!strcmp(name, "test.topic.random")) {
                if (!strcmp(val, "true") || !strcmp(val, "1"))
                        test_topic_random = 1;
                else
                        test_topic_random = 0;
        } else if (!strcmp(name, "test.concurrent.max")) {
                TEST_LOCK();
                test_concurrent_max = (int)strtod(val, NULL);
                TEST_UNLOCK();
        } else if (!strcmp(name, "test.sql.command")) {
                TEST_LOCK();
                if (test_sql_cmd)
                        rd_free(test_sql_cmd);
                test_sql_cmd = rd_strdup(val);
                TEST_UNLOCK();
        } else
                return 0;

        return 1;
}
-
/**
 * @brief Read a name=value test configuration file and apply each
 *        property: "test.*" names go to the framework, "topic.*" names
 *        to \p topic_conf, everything else to \p conf.
 *
 * A missing file is not an error; any parse or set failure fails the
 * test.  Lines starting with '#' and empty lines are skipped.
 *
 * @param timeoutp In/out test timeout, may be rescaled by
 *                 test.timeout.multiplier (see test_set_special_conf()).
 */
static void test_read_conf_file(const char *conf_path,
                                rd_kafka_conf_t *conf,
                                rd_kafka_topic_conf_t *topic_conf,
                                int *timeoutp) {
        FILE *fp;
        char buf[1024];
        int line = 0;

#ifndef _WIN32
        fp = fopen(conf_path, "r");
#else
        fp = NULL;
        errno = fopen_s(&fp, conf_path, "r");
#endif
        if (!fp) {
                if (errno == ENOENT) {
                        TEST_SAY("Test config file %s not found\n", conf_path);
                        return;
                } else
                        TEST_FAIL("Failed to read %s: %s", conf_path,
                                  strerror(errno));
        }

        while (fgets(buf, sizeof(buf) - 1, fp)) {
                char *t;
                char *b = buf;
                rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN;
                char *name, *val;
                char errstr[512];

                line++;
                /* Strip trailing newline. */
                if ((t = strchr(b, '\n')))
                        *t = '\0';

                /* Skip comments and empty lines. */
                if (*b == '#' || !*b)
                        continue;

                if (!(t = strchr(b, '=')))
                        TEST_FAIL("%s:%i: expected name=value format\n",
                                  conf_path, line);

                /* Split "name=value" in place. */
                name = b;
                *t = '\0';
                val = t + 1;

                /* Framework-level "test.*" properties. */
                if (test_set_special_conf(name, val, timeoutp))
                        continue;

                if (!strncmp(name, "topic.", strlen("topic."))) {
                        /* Topic-level property: strip the prefix, apply,
                         * then restore the pointer for error reporting. */
                        name += strlen("topic.");
                        if (topic_conf)
                                res = rd_kafka_topic_conf_set(topic_conf, name,
                                                              val, errstr,
                                                              sizeof(errstr));
                        else
                                res = RD_KAFKA_CONF_OK;
                        name -= strlen("topic.");
                }

                if (res == RD_KAFKA_CONF_UNKNOWN) {
                        if (conf)
                                res = rd_kafka_conf_set(conf, name, val, errstr,
                                                        sizeof(errstr));
                        else
                                res = RD_KAFKA_CONF_OK;
                }

                if (res != RD_KAFKA_CONF_OK)
                        TEST_FAIL("%s:%i: %s\n", conf_path, line, errstr);
        }

        fclose(fp);
}
-
/**
 * @brief Get path to test config file: $RDKAFKA_TEST_CONF if set,
 *        else "test.conf" in the current directory.
 */
const char *test_conf_get_path(void) {
        return test_getenv("RDKAFKA_TEST_CONF", "test.conf");
}
-
/**
 * @brief Get environment variable \p env, or \p def if it is not set.
 */
const char *test_getenv(const char *env, const char *def) {
        return rd_getenv(env, def);
}
-
-void test_conf_common_init(rd_kafka_conf_t *conf, int timeout) {
- if (conf) {
- const char *tmp = test_getenv("TEST_DEBUG", NULL);
- if (tmp)
- test_conf_set(conf, "debug", tmp);
- }
-
- if (timeout)
- test_timeout_set(timeout);
-}
-
-
/**
 * Creates and sets up kafka configuration objects.
 * Will read "test.conf" file if it exists.
 *
 * @param conf Out: new rd_kafka_conf_t with test defaults (error/stats
 *             callbacks, client.id, optional idempotence/sockem), or
 *             NULL to skip.
 * @param topic_conf Out: new rd_kafka_topic_conf_t, or NULL to skip.
 * @param timeout Test timeout in seconds; possibly rescaled by the
 *                config file before being applied.
 */
void test_conf_init(rd_kafka_conf_t **conf,
                    rd_kafka_topic_conf_t **topic_conf,
                    int timeout) {
        const char *test_conf = test_conf_get_path();

        if (conf) {
                *conf = rd_kafka_conf_new();
                rd_kafka_conf_set(*conf, "client.id", test_curr->name, NULL, 0);
                if (test_idempotent_producer)
                        test_conf_set(*conf, "enable.idempotence", "true");
                rd_kafka_conf_set_error_cb(*conf, test_error_cb);
                rd_kafka_conf_set_stats_cb(*conf, test_stats_cb);

                /* Allow higher request timeouts on CI */
                if (test_on_ci)
                        test_conf_set(*conf, "request.timeout.ms", "10000");

#ifdef SIGIO
                {
                        char buf[64];

                        /* Quick termination */
                        rd_snprintf(buf, sizeof(buf), "%i", SIGIO);
                        rd_kafka_conf_set(*conf, "internal.termination.signal",
                                          buf, NULL, 0);
                        signal(SIGIO, SIG_IGN);
                }
#endif
        }

#if WITH_SOCKEM
        /* Only hook sockem in when a sockem config was given (TEST_SOCKEM). */
        if (*test_sockem_conf && conf)
                test_socket_enable(*conf);
#endif

        if (topic_conf)
                *topic_conf = rd_kafka_topic_conf_new();

        /* Open and read optional local test configuration file, if any. */
        test_read_conf_file(test_conf, conf ? *conf : NULL,
                            topic_conf ? *topic_conf : NULL, &timeout);

        test_conf_common_init(conf ? *conf : NULL, timeout);
}
-
-
/**
 * @brief Portable random number: rand_s() on Windows (rand() there
 *        only yields 15 bits), plain rand() elsewhere.
 */
static RD_INLINE unsigned int test_rand(void) {
        unsigned int r;
#ifdef _WIN32
        rand_s(&r);
#else
        r = rand();
#endif
        return r;
}
/**
 * Generate a "unique" test id by combining two 32-bit random
 * values into one 64-bit integer.
 */
uint64_t test_id_generate(void) {
        uint64_t hi = (uint64_t)test_rand();
        uint64_t lo = (uint64_t)test_rand();

        return (hi << 32) | lo;
}
-
-
/**
 * Generate a "unique" string id.
 *
 * Writes a decimal test id into \p dest (NUL-terminated, truncated to
 * \p dest_size) and returns \p dest for convenience.
 */
char *test_str_id_generate(char *dest, size_t dest_size) {
        rd_snprintf(dest, dest_size, "%" PRId64, test_id_generate());
        return dest;
}
-
/**
 * Same as test_str_id_generate but returns a temporary string.
 *
 * The buffer is thread-local static: valid until the next call from
 * the same thread; do not free.
 */
const char *test_str_id_generate_tmp(void) {
        static RD_TLS char ret[64];
        return test_str_id_generate(ret, sizeof(ret));
}
-
/**
 * Format a message token.
 * Pads with '!' to dest_size (keeping the final NUL terminator).
 */
void test_msg_fmt(char *dest,
                  size_t dest_size,
                  uint64_t testid,
                  int32_t partition,
                  int msgid) {
        size_t of;

        /* The trailing '\n' is part of the token format expected by
         * test_msg_parse00(). */
        of = rd_snprintf(dest, dest_size,
                         "testid=%" PRIu64 ", partition=%" PRId32 ", msg=%i\n",
                         testid, partition, msgid);
        if (of < dest_size - 1) {
                /* Fill the remainder with '!' and re-terminate. */
                memset(dest + of, '!', dest_size - of);
                dest[dest_size - 1] = '\0';
        }
}
-
/**
 * @brief Prepare message value and key for test produce.
 *
 * The key receives the parseable message token; the value is filled by
 * repeating the key until \p val_size bytes are written.
 */
void test_prepare_msg(uint64_t testid,
                      int32_t partition,
                      int msg_id,
                      char *val,
                      size_t val_size,
                      char *key,
                      size_t key_size) {
        size_t filled;

        test_msg_fmt(key, key_size, testid, partition, msg_id);

        for (filled = 0; filled < val_size;) {
                size_t chunk = val_size - filled;
                if (chunk > key_size)
                        chunk = key_size;
                memcpy(val + filled, key, chunk);
                filled += chunk;
        }
}
-
-
-
/**
 * Parse a message token (as written by test_msg_fmt()) from \p key and
 * verify it against the expected \p testid and \p exp_partition
 * (-1 = any partition).  Fails the test on mismatch or bad format.
 *
 * @param func,line Caller location, used in failure messages.
 * @param msgidp Out: the parsed message id.
 */
void test_msg_parse00(const char *func,
                      int line,
                      uint64_t testid,
                      int32_t exp_partition,
                      int *msgidp,
                      const char *topic,
                      int32_t partition,
                      int64_t offset,
                      const char *key,
                      size_t key_size) {
        char buf[128];
        uint64_t in_testid;
        int in_part;

        if (!key)
                TEST_FAIL("%s:%i: Message (%s [%" PRId32 "] @ %" PRId64
                          ") "
                          "has empty key\n",
                          func, line, topic, partition, offset);

        /* Copy to a NUL-terminated buffer for sscanf (key may not be
         * NUL-terminated; key_size bounds it). */
        rd_snprintf(buf, sizeof(buf), "%.*s", (int)key_size, key);

        if (sscanf(buf, "testid=%" SCNu64 ", partition=%i, msg=%i\n",
                   &in_testid, &in_part, msgidp) != 3)
                TEST_FAIL("%s:%i: Incorrect key format: %s", func, line, buf);


        if (testid != in_testid ||
            (exp_partition != -1 && exp_partition != in_part))
                TEST_FAIL("%s:%i: Our testid %" PRIu64
                          ", part %i did "
                          "not match message: \"%s\"\n",
                          func, line, testid, (int)exp_partition, buf);
}
-
/**
 * @brief Convenience wrapper around test_msg_parse00() that extracts
 *        topic, partition, offset and key from \p rkmessage.
 */
void test_msg_parse0(const char *func,
                     int line,
                     uint64_t testid,
                     rd_kafka_message_t *rkmessage,
                     int32_t exp_partition,
                     int *msgidp) {
        test_msg_parse00(func, line, testid, exp_partition, msgidp,
                         rd_kafka_topic_name(rkmessage->rkt),
                         rkmessage->partition, rkmessage->offset,
                         (const char *)rkmessage->key, rkmessage->key_len);
}
-
-
/**
 * @brief Arguments bundle passed to a test run (see run_test0()).
 */
struct run_args {
        struct test *test; /* Test descriptor to run */
        int argc;          /* Argument count forwarded to mainfunc */
        char **argv;       /* Argument vector forwarded to mainfunc */
};
-
/**
 * @brief Run a single test: set up stats file and socket tracking,
 *        invoke the test's main function, record timing/rusage and
 *        final state (PASSED/FAILED/SKIPPED), then clean up.
 *
 * @returns the test mainfunc's return value (0 on success).
 */
static int run_test0(struct run_args *run_args) {
        struct test *test = run_args->test;
        test_timing_t t_run;
        int r;
        char stats_file[256];

        /* Per-run stats file; deleted afterwards if nothing was written. */
        rd_snprintf(stats_file, sizeof(stats_file), "stats_%s_%" PRIu64 ".json",
                    test->name, test_id_generate());
        if (!(test->stats_fp = fopen(stats_file, "w+")))
                TEST_SAY("=== Failed to create stats file %s: %s ===\n",
                         stats_file, strerror(errno));

        test_curr = test;

#if WITH_SOCKEM
        /* Sockets added by test_socket_add() are closed on list destroy. */
        rd_list_init(&test->sockets, 16, (void *)sockem_close);
#endif
        /* Don't check message status by default */
        test->exp_dr_status = (rd_kafka_msg_status_t)-1;

        TEST_SAY("================= Running test %s =================\n",
                 test->name);
        if (test->stats_fp)
                TEST_SAY("==== Stats written to file %s ====\n", stats_file);

        test_rusage_start(test_curr);
        TIMING_START(&t_run, "%s", test->name);
        test->start = t_run.ts_start;

        /* Run test main function */
        r = test->mainfunc(run_args->argc, run_args->argv);

        TIMING_STOP(&t_run);
        test_rusage_stop(test_curr,
                         (double)TIMING_DURATION(&t_run) / 1000000.0);

        TEST_LOCK();
        test->duration = TIMING_DURATION(&t_run);

        /* The test may have marked itself SKIPPED; otherwise the
         * mainfunc return code decides PASSED vs FAILED. */
        if (test->state == TEST_SKIPPED) {
                TEST_SAY(
                    "================= Test %s SKIPPED "
                    "=================\n",
                    run_args->test->name);
        } else if (r) {
                test->state = TEST_FAILED;
                TEST_SAY(
                    "\033[31m"
                    "================= Test %s FAILED ================="
                    "\033[0m\n",
                    run_args->test->name);
        } else {
                test->state = TEST_PASSED;
                TEST_SAY(
                    "\033[32m"
                    "================= Test %s PASSED ================="
                    "\033[0m\n",
                    run_args->test->name);
        }
        TEST_UNLOCK();

        /* Wake up any thread waiting for test completion. */
        cnd_broadcast(&test_cnd);

#if WITH_SOCKEM
        test_socket_close_all(test, 0);
#endif

        if (test->stats_fp) {
                long pos = ftell(test->stats_fp);
                fclose(test->stats_fp);
                test->stats_fp = NULL;
                /* Delete file if nothing was written */
                if (pos == 0) {
#ifndef _WIN32
                        unlink(stats_file);
#else
                        _unlink(stats_file);
#endif
                }
        }

        if (test_delete_topics_between && test_concurrent_max == 1)
                test_delete_all_test_topics(60 * 1000);

        return r;
}
-
-
-
-/**
- * Thread entry point for a test: detaches itself, runs the test,
- * decrements the running-tests counter and frees the run_args
- * (ownership of \p arg is transferred to this thread).
- */
-static int run_test_from_thread(void *arg) {
- struct run_args *run_args = arg;
-
- thrd_detach(thrd_current());
-
- run_test0(run_args);
-
- TEST_LOCK();
- tests_running_cnt--;
- TEST_UNLOCK();
-
- free(run_args);
-
- return 0;
-}
-
-
-/**
- * @brief Check running tests for timeouts.
- *
- * Any RUNNING test past its deadline is marked FAILED, a summary is
- * printed, and its thread is forcibly terminated.
- *
- * @locks TEST_LOCK MUST be held
- */
-static void check_test_timeouts(void) {
- int64_t now = test_clock();
- struct test *test;
-
- for (test = tests; test->name; test++) {
- if (test->state != TEST_RUNNING)
- continue;
-
- /* Timeout check */
- if (now > test->timeout) {
- struct test *save_test = test_curr;
- test_curr = test;
- test->state = TEST_FAILED;
- test_summary(0 /*no-locks*/);
- TEST_FAIL0(
- __FILE__, __LINE__, 0 /*nolock*/, 0 /*fail-later*/,
- "Test %s%s%s%s timed out "
- "(timeout set to %d seconds)\n",
- test->name, *test->subtest ? " (" : "",
- test->subtest, *test->subtest ? ")" : "",
- (int)(test->timeout - test->start) / 1000000);
- test_curr = save_test;
- tests_running_cnt--; /* fail-later misses this*/
- /* Forcibly kill the runner thread: it will not get a
- * chance to clean up, but the process is failing anyway. */
-#ifdef _WIN32
- TerminateThread(test->thrd, -1);
-#else
- pthread_kill(test->thrd, SIGKILL);
-#endif
- }
- }
-}
-
-
-/**
- * Start \p test on its own thread, blocking first until a concurrency
- * slot is available (tests_running_cnt < test_concurrent_max).
- * Sets a 30s (multiplier-scaled) timeout deadline for the test.
- *
- * @returns 0 (failure to spawn the thread fails the test via TEST_FAIL).
- */
-static int run_test(struct test *test, int argc, char **argv) {
- struct run_args *run_args = calloc(1, sizeof(*run_args));
- int wait_cnt = 0;
-
- run_args->test = test;
- run_args->argc = argc;
- run_args->argv = argv;
-
- TEST_LOCK();
- while (tests_running_cnt >= test_concurrent_max) {
- /* Only log every 100 iterations (~10s) to avoid spam */
- if (!(wait_cnt++ % 100))
- TEST_SAY(
- "Too many tests running (%d >= %d): "
- "postponing %s start...\n",
- tests_running_cnt, test_concurrent_max, test->name);
- cnd_timedwait_ms(&test_cnd, &test_mtx, 100);
-
- check_test_timeouts();
- }
- tests_running_cnt++;
- test->timeout = test_clock() +
- (int64_t)(30.0 * 1000000.0 * test_timeout_multiplier);
- test->state = TEST_RUNNING;
- TEST_UNLOCK();
-
- if (thrd_create(&test->thrd, run_test_from_thread, run_args) !=
- thrd_success) {
- TEST_LOCK();
- tests_running_cnt--;
- test->state = TEST_FAILED;
- TEST_UNLOCK();
-
- TEST_FAIL("Failed to start thread for test %s\n", test->name);
- }
-
- return 0;
-}
-
-/**
- * Iterate all registered tests and either start each one (run_test())
- * or mark it skipped based on flags, broker version, scenario and the
- * TESTS/TESTS_SKIP environment filters.
- */
-static void run_tests(int argc, char **argv) {
- struct test *test;
-
- for (test = tests; test->name; test++) {
- char testnum[128];
- char *t;
- const char *skip_reason = NULL;
- rd_bool_t skip_silent = rd_false;
- char tmp[128];
- const char *scenario =
- test->scenario ? test->scenario : "default";
-
- if (!test->mainfunc)
- continue;
-
- /* Extract test number, as string */
- strncpy(testnum, test->name, sizeof(testnum) - 1);
- testnum[sizeof(testnum) - 1] = '\0';
- if ((t = strchr(testnum, '_')))
- *t = '\0';
-
- if ((test_flags && (test_flags & test->flags) != test_flags)) {
- skip_reason = "filtered due to test flags";
- skip_silent = rd_true;
- }
- if ((test_neg_flags & ~test_flags) & test->flags)
- skip_reason = "Filtered due to negative test flags";
- if (test_broker_version &&
- (test->minver > test_broker_version ||
- (test->maxver && test->maxver < test_broker_version))) {
- rd_snprintf(tmp, sizeof(tmp),
- "not applicable for broker "
- "version %d.%d.%d.%d",
- TEST_BRKVER_X(test_broker_version, 0),
- TEST_BRKVER_X(test_broker_version, 1),
- TEST_BRKVER_X(test_broker_version, 2),
- TEST_BRKVER_X(test_broker_version, 3));
- skip_reason = tmp;
- }
-
- if (!strstr(scenario, test_scenario)) {
- rd_snprintf(tmp, sizeof(tmp),
- "requires test scenario %s", scenario);
- skip_silent = rd_true;
- skip_reason = tmp;
- }
-
- if (tests_to_run && !strstr(tests_to_run, testnum)) {
- skip_reason = "not included in TESTS list";
- skip_silent = rd_true;
- } else if (!tests_to_run && (test->flags & TEST_F_MANUAL)) {
- skip_reason = "manual test";
- skip_silent = rd_true;
- } else if (tests_to_skip && strstr(tests_to_skip, testnum))
- skip_reason = "included in TESTS_SKIP list";
-
- if (!skip_reason) {
- run_test(test, argc, argv);
- } else {
- if (skip_silent) {
- /* Silently skipped: only visible at
- * test_level >= 3 */
- TEST_SAYL(3,
- "================= Skipping test %s "
- "(%s) ================\n",
- test->name, skip_reason);
- TEST_LOCK();
- test->state = TEST_SKIPPED;
- TEST_UNLOCK();
- } else {
- test_curr = test;
- TEST_SKIP("%s\n", skip_reason);
- test_curr = &tests[0];
- }
- }
- }
-}
-
-/**
- * @brief Print summary for all tests.
- *
- * Optionally also writes a JSON report file (TEST_REPORT env or -r flag)
- * and pipes SQL INSERT statements to the test.sql.command process.
- *
- * @param do_lock whether to acquire TEST_LOCK around the test-table scan
- * (pass 0 when the caller already holds the lock).
- *
- * @returns the number of failed tests, excluding known-issue failures.
- */
-static int test_summary(int do_lock) {
- struct test *test;
- FILE *report_fp = NULL;
- char report_path[128];
- time_t t;
- struct tm *tm;
- char datestr[64];
- int64_t total_duration = 0;
- int tests_run = 0;
- int tests_failed = 0;
- int tests_failed_known = 0;
- int tests_passed = 0;
- FILE *sql_fp = NULL;
- const char *tmp;
-
- t = time(NULL);
- tm = localtime(&t);
- strftime(datestr, sizeof(datestr), "%Y%m%d%H%M%S", tm);
-
- /* Resolve report path: explicit env var, else auto-name if -r given */
- if ((tmp = test_getenv("TEST_REPORT", NULL)))
- rd_snprintf(report_path, sizeof(report_path), "%s", tmp);
- else if (test_write_report)
- rd_snprintf(report_path, sizeof(report_path),
- "test_report_%s.json", datestr);
- else
- report_path[0] = '\0';
-
- if (*report_path) {
- report_fp = fopen(report_path, "w+");
- if (!report_fp)
- TEST_WARN("Failed to create report file %s: %s\n",
- report_path, strerror(errno));
- else
- fprintf(report_fp,
- "{ \"id\": \"%s_%s\", \"mode\": \"%s\", "
- "\"scenario\": \"%s\", "
- "\"date\": \"%s\", "
- "\"git_version\": \"%s\", "
- "\"broker_version\": \"%s\", "
- "\"tests\": {",
- datestr, test_mode, test_mode, test_scenario,
- datestr, test_git_version,
- test_broker_version_str);
- }
-
- if (do_lock)
- TEST_LOCK();
-
- if (test_sql_cmd) {
-#ifdef _WIN32
- sql_fp = _popen(test_sql_cmd, "w");
-#else
- sql_fp = popen(test_sql_cmd, "w");
-#endif
- if (!sql_fp)
- TEST_WARN("Failed to execute test.sql.command: %s",
- test_sql_cmd);
- else
- fprintf(sql_fp,
- "CREATE TABLE IF NOT EXISTS "
- "runs(runid text PRIMARY KEY, mode text, "
- "date datetime, cnt int, passed int, "
- "failed int, duration numeric);\n"
- "CREATE TABLE IF NOT EXISTS "
- "tests(runid text, mode text, name text, "
- "state text, extra text, duration numeric);\n");
- }
-
- if (show_summary)
- printf(
- "TEST %s (%s, scenario %s) SUMMARY\n"
- "#========================================================="
- "=========#\n",
- datestr, test_mode, test_scenario);
-
- for (test = tests; test->name; test++) {
- const char *color;
- int64_t duration;
- char extra[128] = "";
- int do_count = 1;
-
- /* Still-running tests have no duration yet: use elapsed time */
- if (!(duration = test->duration) && test->start > 0)
- duration = test_clock() - test->start;
-
- if (test == tests) {
- /* <MAIN> test:
- * test accounts for total runtime.
- * dont include in passed/run/failed counts. */
- total_duration = duration;
- do_count = 0;
- }
-
- switch (test->state) {
- case TEST_PASSED:
- color = _C_GRN;
- if (do_count) {
- tests_passed++;
- tests_run++;
- }
- break;
- case TEST_FAILED:
- if (test->flags & TEST_F_KNOWN_ISSUE) {
- rd_snprintf(extra, sizeof(extra),
- " <-- known issue%s%s",
- test->extra ? ": " : "",
- test->extra ? test->extra : "");
- if (do_count)
- tests_failed_known++;
- }
- color = _C_RED;
- if (do_count) {
- tests_failed++;
- tests_run++;
- }
- break;
- case TEST_RUNNING:
- color = _C_MAG;
- if (do_count) {
- tests_failed++; /* All tests should be finished
- */
- tests_run++;
- }
- break;
- case TEST_NOT_STARTED:
- color = _C_YEL;
- if (test->extra)
- rd_snprintf(extra, sizeof(extra), " %s",
- test->extra);
- break;
- default:
- color = _C_CYA;
- break;
- }
-
- if (show_summary &&
- (test->state != TEST_SKIPPED || *test->failstr ||
- (tests_to_run && !strncmp(tests_to_run, test->name,
- strlen(tests_to_run))))) {
- printf("|%s %-40s | %10s | %7.3fs %s|", color,
- test->name, test_states[test->state],
- (double)duration / 1000000.0, _C_CLR);
- if (test->state == TEST_FAILED)
- printf(_C_RED " %s" _C_CLR, test->failstr);
- else if (test->state == TEST_SKIPPED)
- printf(_C_CYA " %s" _C_CLR, test->failstr);
- printf("%s\n", extra);
- }
-
- if (report_fp) {
- int i;
- fprintf(report_fp,
- "%s\"%s\": {"
- "\"name\": \"%s\", "
- "\"state\": \"%s\", "
- "\"known_issue\": %s, "
- "\"extra\": \"%s\", "
- "\"duration\": %.3f, "
- "\"report\": [ ",
- test == tests ? "" : ", ", test->name,
- test->name, test_states[test->state],
- test->flags & TEST_F_KNOWN_ISSUE ? "true"
- : "false",
- test->extra ? test->extra : "",
- (double)duration / 1000000.0);
-
- for (i = 0; i < test->report_cnt; i++) {
- fprintf(report_fp, "%s%s ", i == 0 ? "" : ",",
- test->report_arr[i]);
- }
-
- fprintf(report_fp, "] }");
- }
-
- if (sql_fp)
- fprintf(sql_fp,
- "INSERT INTO tests VALUES("
- "'%s_%s', '%s', '%s', '%s', '%s', %f);\n",
- datestr, test_mode, test_mode, test->name,
- test_states[test->state],
- test->extra ? test->extra : "",
- (double)duration / 1000000.0);
- }
- if (do_lock)
- TEST_UNLOCK();
-
- if (show_summary)
- printf(
- "#========================================================="
- "=========#\n");
-
- if (report_fp) {
- fprintf(report_fp,
- "}, "
- "\"tests_run\": %d, "
- "\"tests_passed\": %d, "
- "\"tests_failed\": %d, "
- "\"duration\": %.3f"
- "}\n",
- tests_run, tests_passed, tests_failed,
- (double)total_duration / 1000000.0);
-
- fclose(report_fp);
- TEST_SAY("# Test report written to %s\n", report_path);
- }
-
- if (sql_fp) {
- fprintf(sql_fp,
- "INSERT INTO runs VALUES('%s_%s', '%s', datetime(), "
- "%d, %d, %d, %f);\n",
- datestr, test_mode, test_mode, tests_run, tests_passed,
- tests_failed, (double)total_duration / 1000000.0);
- /* NOTE(review): opened with popen() but closed with fclose();
- * pclose() is conventional for popen streams -- confirm. */
- fclose(sql_fp);
- }
-
- /* Known-issue failures don't count towards the exit status */
- return tests_failed - tests_failed_known;
-}
-
-#ifndef _WIN32
-/**
- * SIGINT handler: first signal requests a graceful shutdown
- * (sets test_exit so main()'s wait loop stops), a second signal
- * exits immediately.
- */
-static void test_sig_term(int sig) {
- if (test_exit)
- exit(1);
- fprintf(stderr,
- "Exiting tests, waiting for running tests to finish.\n");
- test_exit = 1;
-}
-#endif
-
-/**
- * Wait 'timeout' seconds for rdkafka to kill all its threads and clean up.
- *
- * Fails the test run if librdkafka threads are still alive after the
- * timeout, or if internal objects remain undestroyed.
- */
-static void test_wait_exit(int timeout) {
- int r;
- time_t start = time(NULL);
-
- while ((r = rd_kafka_thread_cnt()) && timeout-- >= 0) {
- TEST_SAY("%i thread(s) in use by librdkafka, waiting...\n", r);
- rd_sleep(1);
- }
-
- TEST_SAY("%i thread(s) in use by librdkafka\n", r);
-
- if (r > 0)
- TEST_FAIL("%i thread(s) still active in librdkafka", r);
-
- /* Spend whatever remains of the timeout waiting for memory release */
- timeout -= (int)(time(NULL) - start);
- if (timeout > 0) {
- TEST_SAY(
- "Waiting %d seconds for all librdkafka memory "
- "to be released\n",
- timeout);
- if (rd_kafka_wait_destroyed(timeout * 1000) == -1)
- TEST_FAIL(
- "Not all internal librdkafka "
- "objects destroyed\n");
- }
-}
-
-
-
-/**
- * @brief Test framework cleanup before termination.
- *
- * Frees per-test report arrays and the SQL command string.
- */
-static void test_cleanup(void) {
- struct test *test;
-
- /* Free report arrays */
- for (test = tests; test->name; test++) {
- int i;
- if (!test->report_arr)
- continue;
- for (i = 0; i < test->report_cnt; i++)
- rd_free(test->report_arr[i]);
- rd_free(test->report_arr);
- test->report_arr = NULL;
- }
-
- if (test_sql_cmd)
- rd_free(test_sql_cmd);
-}
-
-
-/**
- * Test-suite entry point: parses environment variables and command-line
- * options, starts all selected tests (run_tests()), waits for them to
- * finish (enforcing timeouts), prints the summary and returns non-zero
- * if any test failed.
- */
-int main(int argc, char **argv) {
- int i, r;
- test_timing_t t_all;
- int a, b, c, d;
- const char *tmpver;
-
- mtx_init(&test_mtx, mtx_plain);
- cnd_init(&test_cnd);
-
- test_init();
-
-#ifndef _WIN32
- signal(SIGINT, test_sig_term);
-#endif
- /* Environment-based configuration */
- tests_to_run = test_getenv("TESTS", NULL);
- subtests_to_run = test_getenv("SUBTESTS", NULL);
- tests_to_skip = test_getenv("TESTS_SKIP", NULL);
- tmpver = test_getenv("TEST_KAFKA_VERSION", NULL);
- if (!tmpver)
- tmpver = test_getenv("KAFKA_VERSION", test_broker_version_str);
- test_broker_version_str = tmpver;
-
- test_git_version = test_getenv("RDKAFKA_GITVER", "HEAD");
-
- /* Are we running on CI? */
- if (test_getenv("CI", NULL)) {
- test_on_ci = 1;
- test_concurrent_max = 3;
- }
-
- test_conf_init(NULL, NULL, 10);
-
- /* Command-line option parsing */
- for (i = 1; i < argc; i++) {
- if (!strncmp(argv[i], "-p", 2) && strlen(argv[i]) > 2) {
- if (test_rusage) {
- fprintf(stderr,
- "%% %s ignored: -R takes preceedence\n",
- argv[i]);
- continue;
- }
- test_concurrent_max = (int)strtod(argv[i] + 2, NULL);
- } else if (!strcmp(argv[i], "-l"))
- test_flags |= TEST_F_LOCAL;
- else if (!strcmp(argv[i], "-L"))
- test_neg_flags |= TEST_F_LOCAL;
- else if (!strcmp(argv[i], "-a"))
- test_assert_on_fail = 1;
- else if (!strcmp(argv[i], "-k"))
- test_flags |= TEST_F_KNOWN_ISSUE;
- else if (!strcmp(argv[i], "-K"))
- test_neg_flags |= TEST_F_KNOWN_ISSUE;
- else if (!strcmp(argv[i], "-E"))
- test_neg_flags |= TEST_F_SOCKEM;
- else if (!strcmp(argv[i], "-V") && i + 1 < argc)
- test_broker_version_str = argv[++i];
- else if (!strcmp(argv[i], "-s") && i + 1 < argc)
- strncpy(test_scenario, argv[++i],
- sizeof(test_scenario) - 1);
- else if (!strcmp(argv[i], "-S"))
- show_summary = 0;
- else if (!strcmp(argv[i], "-D"))
- test_delete_topics_between = 1;
- else if (!strcmp(argv[i], "-P"))
- test_idempotent_producer = 1;
- else if (!strcmp(argv[i], "-Q"))
- test_quick = 1;
- else if (!strcmp(argv[i], "-r"))
- test_write_report = 1;
- else if (!strncmp(argv[i], "-R", 2)) {
- /* -R forces serial execution for meaningful rusage */
- test_rusage = 1;
- test_concurrent_max = 1;
- if (strlen(argv[i]) > strlen("-R")) {
- test_rusage_cpu_calibration =
- strtod(argv[i] + 2, NULL);
- if (test_rusage_cpu_calibration < 0.00001) {
- fprintf(stderr,
- "%% Invalid CPU calibration "
- "value: %s\n",
- argv[i] + 2);
- exit(1);
- }
- }
- } else if (*argv[i] != '-')
- tests_to_run = argv[i];
- else {
- printf(
- "Unknown option: %s\n"
- "\n"
- "Usage: %s [options] [<test-match-substr>]\n"
- "Options:\n"
- " -p<N> Run N tests in parallel\n"
- " -l/-L Only/dont run local tests (no broker "
- "needed)\n"
- " -k/-K Only/dont run tests with known issues\n"
- " -E Don't run sockem tests\n"
- " -a Assert on failures\n"
- " -r Write test_report_...json file.\n"
- " -S Dont show test summary\n"
- " -s <scenario> Test scenario.\n"
- " -V <N.N.N.N> Broker version.\n"
- " -D Delete all test topics between each test "
- "(-p1) or after all tests\n"
- " -P Run all tests with "
- "`enable.idempotency=true`\n"
- " -Q Run tests in quick mode: faster tests, "
- "fewer iterations, less data.\n"
- " -R Check resource usage thresholds.\n"
- " -R<C> Check resource usage thresholds but "
- "adjust CPU thresholds by C (float):\n"
- " C < 1.0: CPU is faster than base line "
- "system.\n"
- " C > 1.0: CPU is slower than base line "
- "system.\n"
- " E.g. -R2.5 = CPU is 2.5x slower than "
- "base line system.\n"
- "\n"
- "Environment variables:\n"
- " TESTS - substring matched test to run (e.g., "
- "0033)\n"
- " SUBTESTS - substring matched subtest to run "
- "(e.g., n_wildcard)\n"
- " TEST_KAFKA_VERSION - broker version (e.g., "
- "0.9.0.1)\n"
- " TEST_SCENARIO - Test scenario\n"
- " TEST_LEVEL - Test verbosity level\n"
- " TEST_MODE - bare, helgrind, valgrind\n"
- " TEST_SEED - random seed\n"
- " RDKAFKA_TEST_CONF - test config file "
- "(test.conf)\n"
- " KAFKA_PATH - Path to kafka source dir\n"
- " ZK_ADDRESS - Zookeeper address\n"
- "\n",
- argv[i], argv[0]);
- exit(1);
- }
- }
-
- TEST_SAY("Git version: %s\n", test_git_version);
-
- /* Parse broker version string into the packed TEST_BRKVER form */
- if (!strcmp(test_broker_version_str, "trunk"))
- test_broker_version_str = "9.9.9.9"; /* for now */
-
- d = 0;
- if (sscanf(test_broker_version_str, "%d.%d.%d.%d", &a, &b, &c, &d) <
- 3) {
- printf(
- "%% Expected broker version to be in format "
- "N.N.N (N=int), not %s\n",
- test_broker_version_str);
- exit(1);
- }
- test_broker_version = TEST_BRKVER(a, b, c, d);
- TEST_SAY("Broker version: %s (%d.%d.%d.%d)\n", test_broker_version_str,
- TEST_BRKVER_X(test_broker_version, 0),
- TEST_BRKVER_X(test_broker_version, 1),
- TEST_BRKVER_X(test_broker_version, 2),
- TEST_BRKVER_X(test_broker_version, 3));
-
- /* Set up fake "<MAIN>" test for all operations performed in
- * the main thread rather than the per-test threads.
- * Nice side effect is that we get timing and status for main as well.*/
- test_curr = &tests[0];
- test_curr->state = TEST_PASSED;
- test_curr->start = test_clock();
-
- /* Scale test timeouts up for slow environments */
- if (test_on_ci) {
- TEST_LOCK();
- test_timeout_multiplier += 2;
- TEST_UNLOCK();
- }
-
- if (!strcmp(test_mode, "helgrind") || !strcmp(test_mode, "drd")) {
- TEST_LOCK();
- test_timeout_multiplier += 5;
- TEST_UNLOCK();
- } else if (!strcmp(test_mode, "valgrind")) {
- TEST_LOCK();
- test_timeout_multiplier += 3;
- TEST_UNLOCK();
- }
-
- /* Broker version 0.9 and api.version.request=true (which is default)
- * will cause a 10s stall per connection. Instead of fixing
- * that for each affected API in every test we increase the timeout
- * multiplier accordingly instead. The typical consume timeout is 5
- * seconds, so a multiplier of 3 should be good. */
- if ((test_broker_version & 0xffff0000) == 0x00090000)
- test_timeout_multiplier += 3;
-
- if (test_concurrent_max > 1)
- test_timeout_multiplier += (double)test_concurrent_max / 3;
-
- TEST_SAY("Tests to run : %s\n", tests_to_run ? tests_to_run : "all");
- if (subtests_to_run)
- TEST_SAY("Sub tests : %s\n", subtests_to_run);
- if (tests_to_skip)
- TEST_SAY("Skip tests : %s\n", tests_to_skip);
- TEST_SAY("Test mode : %s%s%s\n", test_quick ? "quick, " : "",
- test_mode, test_on_ci ? ", CI" : "");
- TEST_SAY("Test scenario: %s\n", test_scenario);
- TEST_SAY("Test filter : %s\n", (test_flags & TEST_F_LOCAL)
- ? "local tests only"
- : "no filter");
- TEST_SAY("Test timeout multiplier: %.1f\n", test_timeout_multiplier);
- TEST_SAY("Action on test failure: %s\n",
- test_assert_on_fail ? "assert crash" : "continue other tests");
- if (test_rusage)
- TEST_SAY("Test rusage : yes (%.2fx CPU calibration)\n",
- test_rusage_cpu_calibration);
- if (test_idempotent_producer)
- TEST_SAY("Test Idempotent Producer: enabled\n");
-
- {
- char cwd[512], *pcwd;
-#ifdef _WIN32
- pcwd = _getcwd(cwd, sizeof(cwd) - 1);
-#else
- pcwd = getcwd(cwd, sizeof(cwd) - 1);
-#endif
- if (pcwd)
- TEST_SAY("Current directory: %s\n", cwd);
- }
-
- test_timeout_set(30);
-
- TIMING_START(&t_all, "ALL-TESTS");
-
- /* Run tests */
- run_tests(argc, argv);
-
- /* Wait for all tests to complete, periodically checking timeouts */
- TEST_LOCK();
- while (tests_running_cnt > 0 && !test_exit) {
- struct test *test;
-
- if (!test_quick && test_level >= 2) {
- TEST_SAY("%d test(s) running:", tests_running_cnt);
-
- for (test = tests; test->name; test++) {
- if (test->state != TEST_RUNNING)
- continue;
-
- TEST_SAY0(" %s", test->name);
- }
-
- TEST_SAY0("\n");
- }
-
- check_test_timeouts();
-
- TEST_UNLOCK();
-
- if (test_quick)
- rd_usleep(200 * 1000, NULL);
- else
- rd_sleep(1);
- TEST_LOCK();
- }
-
- TIMING_STOP(&t_all);
-
- test_curr = &tests[0];
- test_curr->duration = test_clock() - test_curr->start;
-
- TEST_UNLOCK();
-
- if (test_delete_topics_between)
- test_delete_all_test_topics(60 * 1000);
-
- r = test_summary(1 /*lock*/) ? 1 : 0;
-
- /* Wait for everything to be cleaned up since broker destroys are
- * handled in its own thread. */
- test_wait_exit(0);
-
- /* If we havent failed at this point then
- * there were no threads leaked */
- if (r == 0)
- TEST_SAY("\n============== ALL TESTS PASSED ==============\n");
-
- test_cleanup();
-
- if (r > 0)
- TEST_FAIL("%d test(s) failed, see previous errors", r);
-
- return r;
-}
-
-
-
-/******************************************************************************
- *
- * Helpers
- *
- ******************************************************************************/
-
-/**
- * Standard delivery report callback used by test producers.
- *
- * Verifies the delivery error and (optionally) message persistence
- * status against the current test's expectations, adds delivered
- * messages to the test's msgver, and decrements the remaining-message
- * counter passed as the per-message opaque (may be NULL).
- */
-void test_dr_msg_cb(rd_kafka_t *rk,
- const rd_kafka_message_t *rkmessage,
- void *opaque) {
- int *remainsp = rkmessage->_private;
- static const char *status_names[] = {
- [RD_KAFKA_MSG_STATUS_NOT_PERSISTED] = "NotPersisted",
- [RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED] = "PossiblyPersisted",
- [RD_KAFKA_MSG_STATUS_PERSISTED] = "Persisted"};
-
- TEST_SAYL(4,
- "Delivery report: %s (%s) to %s [%" PRId32
- "] "
- "at offset %" PRId64 " latency %.2fms\n",
- rd_kafka_err2str(rkmessage->err),
- status_names[rd_kafka_message_status(rkmessage)],
- rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition,
- rkmessage->offset,
- (float)rd_kafka_message_latency(rkmessage) / 1000.0);
-
- if (!test_curr->produce_sync) {
- if (!test_curr->ignore_dr_err &&
- rkmessage->err != test_curr->exp_dr_err)
- TEST_FAIL("Message delivery (to %s [%" PRId32
- "]) "
- "failed: expected %s, got %s",
- rd_kafka_topic_name(rkmessage->rkt),
- rkmessage->partition,
- rd_kafka_err2str(test_curr->exp_dr_err),
- rd_kafka_err2str(rkmessage->err));
-
- /* -1 means "don't check status" (see run_test0()) */
- if ((int)test_curr->exp_dr_status != -1) {
- rd_kafka_msg_status_t status =
- rd_kafka_message_status(rkmessage);
-
- TEST_ASSERT(status == test_curr->exp_dr_status,
- "Expected message status %s, not %s",
- status_names[test_curr->exp_dr_status],
- status_names[status]);
- }
-
- /* Add message to msgver */
- if (!rkmessage->err && test_curr->dr_mv)
- test_msgver_add_msg(rk, test_curr->dr_mv, rkmessage);
- }
-
- if (remainsp) {
- TEST_ASSERT(*remainsp > 0,
- "Too many messages delivered (remains %i)",
- *remainsp);
-
- (*remainsp)--;
- }
-
- if (test_curr->produce_sync)
- test_curr->produce_sync_err = rkmessage->err;
-}
-
-
-/**
- * Create an rd_kafka_t handle of type \p mode (RD_KAFKA_PRODUCER or
- * RD_KAFKA_CONSUMER). If \p conf is NULL a default test configuration
- * is created; otherwise ownership of \p conf passes to rd_kafka_new().
- * Fails the test on creation error.
- */
-rd_kafka_t *test_create_handle(int mode, rd_kafka_conf_t *conf) {
- rd_kafka_t *rk;
- char errstr[512];
-
- if (!conf) {
- test_conf_init(&conf, NULL, 0);
-#if WITH_SOCKEM
- if (*test_sockem_conf)
- test_socket_enable(conf);
-#endif
- } else {
- /* Tag the client with the test name unless the caller
- * already set a custom client.id */
- if (!strcmp(test_conf_get(conf, "client.id"), "rdkafka"))
- test_conf_set(conf, "client.id", test_curr->name);
- }
-
-
-
- /* Creat kafka instance */
- rk = rd_kafka_new(mode, conf, errstr, sizeof(errstr));
- if (!rk)
- TEST_FAIL("Failed to create rdkafka instance: %s\n", errstr);
-
- TEST_SAY("Created kafka instance %s\n", rd_kafka_name(rk));
-
- return rk;
-}
-
-
-/**
- * Create a producer handle with the default test configuration and
- * the standard test delivery report callback installed.
- */
-rd_kafka_t *test_create_producer(void) {
- rd_kafka_conf_t *conf;
-
- test_conf_init(&conf, NULL, 0);
- rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
-
- return test_create_handle(RD_KAFKA_PRODUCER, conf);
-}
-
-
-/**
- * Create topic_t object with va-arg list as key-value config pairs
- * terminated by NULL.
- *
- * Fails the test if topic creation fails. Caller owns the returned
- * topic object (rd_kafka_topic_destroy()).
- */
-rd_kafka_topic_t *
-test_create_topic_object(rd_kafka_t *rk, const char *topic, ...) {
- rd_kafka_topic_t *rkt;
- rd_kafka_topic_conf_t *topic_conf;
- va_list ap;
- const char *name, *val;
-
- test_conf_init(NULL, &topic_conf, 0);
-
- va_start(ap, topic);
- while ((name = va_arg(ap, const char *)) &&
- (val = va_arg(ap, const char *))) {
- test_topic_conf_set(topic_conf, name, val);
- }
- va_end(ap);
-
- /* rd_kafka_topic_new() takes ownership of topic_conf */
- rkt = rd_kafka_topic_new(rk, topic, topic_conf);
- if (!rkt)
- TEST_FAIL("Failed to create topic: %s\n",
- rd_kafka_err2str(rd_kafka_last_error()));
-
- return rkt;
-}
-
-
-/**
- * Create a producer topic object with va-arg key-value config pairs
- * (NULL-terminated), forcing request.required.acks=-1 so that all
- * replicas are in-sync before subsequent consume tests run.
- */
-rd_kafka_topic_t *
-test_create_producer_topic(rd_kafka_t *rk, const char *topic, ...) {
- rd_kafka_topic_t *rkt;
- rd_kafka_topic_conf_t *topic_conf;
- char errstr[512];
- va_list ap;
- const char *name, *val;
-
- test_conf_init(NULL, &topic_conf, 0);
-
- va_start(ap, topic);
- while ((name = va_arg(ap, const char *)) &&
- (val = va_arg(ap, const char *))) {
- if (rd_kafka_topic_conf_set(topic_conf, name, val, errstr,
- sizeof(errstr)) != RD_KAFKA_CONF_OK)
- TEST_FAIL("Conf failed: %s\n", errstr);
- }
- va_end(ap);
-
- /* Make sure all replicas are in-sync after producing
- * so that consume test wont fail. */
- rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1",
- errstr, sizeof(errstr));
-
-
- rkt = rd_kafka_topic_new(rk, topic, topic_conf);
- if (!rkt)
- TEST_FAIL("Failed to create topic: %s\n",
- rd_kafka_err2str(rd_kafka_last_error()));
-
- return rkt;
-}
-
-
-
-/**
- * Produces \p cnt messages and returns immediately.
- * Does not wait for delivery.
- * \p msgcounterp is incremented for each produced messages and passed
- * as \p msg_opaque which is later used in test_dr_msg_cb to decrement
- * the counter on delivery.
- *
- * If \p payload is NULL the message key and payload will be formatted
- * according to standard test format, otherwise the key will be NULL and
- * payload send as message payload.
- *
- * Default message size is 128 bytes, if \p size is non-zero and \p payload
- * is NULL the message size of \p size will be used.
- *
- * If \p msgrate > 0, production is throttled to approximately that
- * many messages per second by polling between produce calls.
- */
-void test_produce_msgs_nowait(rd_kafka_t *rk,
- rd_kafka_topic_t *rkt,
- uint64_t testid,
- int32_t partition,
- int msg_base,
- int cnt,
- const char *payload,
- size_t size,
- int msgrate,
- int *msgcounterp) {
- int msg_id;
- test_timing_t t_all, t_poll;
- char key[128];
- void *buf;
- int64_t tot_bytes = 0;
- int64_t tot_time_poll = 0;
- int64_t per_msg_wait = 0;
-
- if (msgrate > 0)
- per_msg_wait = 1000000 / (int64_t)msgrate;
-
-
- if (payload)
- buf = (void *)payload;
- else {
- if (size == 0)
- size = 128;
- buf = calloc(1, size);
- }
-
- TEST_SAY("Produce to %s [%" PRId32 "]: messages #%d..%d\n",
- rd_kafka_topic_name(rkt), partition, msg_base, msg_base + cnt);
-
- TIMING_START(&t_all, "PRODUCE");
- TIMING_START(&t_poll, "SUM(POLL)");
-
- for (msg_id = msg_base; msg_id < msg_base + cnt; msg_id++) {
- int wait_time = 0;
-
- if (!payload)
- test_prepare_msg(testid, partition, msg_id, buf, size,
- key, sizeof(key));
-
-
- /* msgcounterp doubles as the msg_opaque so the DR callback
- * can decrement it on delivery */
- if (rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buf,
- size, !payload ? key : NULL,
- !payload ? strlen(key) : 0,
- msgcounterp) == -1)
- TEST_FAIL(
- "Failed to produce message %i "
- "to partition %i: %s",
- msg_id, (int)partition,
- rd_kafka_err2str(rd_kafka_last_error()));
-
- (*msgcounterp)++;
- tot_bytes += size;
-
- /* Poll (serving delivery reports); if rate-limited, keep
- * polling until this message's time slot is used up */
- TIMING_RESTART(&t_poll);
- do {
- if (per_msg_wait) {
- wait_time = (int)(per_msg_wait -
- TIMING_DURATION(&t_poll)) /
- 1000;
- if (wait_time < 0)
- wait_time = 0;
- }
- rd_kafka_poll(rk, wait_time);
- } while (wait_time > 0);
-
- tot_time_poll = TIMING_DURATION(&t_poll);
-
- if (TIMING_EVERY(&t_all, 3 * 1000000))
- TEST_SAY(
- "produced %3d%%: %d/%d messages "
- "(%d msgs/s, %d bytes/s)\n",
- ((msg_id - msg_base) * 100) / cnt,
- msg_id - msg_base, cnt,
- (int)((msg_id - msg_base) /
- (TIMING_DURATION(&t_all) / 1000000)),
- (int)((tot_bytes) /
- (TIMING_DURATION(&t_all) / 1000000)));
- }
-
- if (!payload)
- free(buf);
-
- t_poll.duration = tot_time_poll;
- TIMING_STOP(&t_poll);
- TIMING_STOP(&t_all);
-}
-
-/**
- * Waits for the messages tracked by counter \p msgcounterp to be delivered.
- *
- * Polls \p rk until the counter reaches zero or the producer out-queue
- * drains; fails the test if any messages remain undelivered.
- */
-void test_wait_delivery(rd_kafka_t *rk, int *msgcounterp) {
- test_timing_t t_all;
- int start_cnt = *msgcounterp;
-
- TIMING_START(&t_all, "PRODUCE.DELIVERY.WAIT");
-
- /* Wait for messages to be delivered */
- while (*msgcounterp > 0 && rd_kafka_outq_len(rk) > 0) {
- rd_kafka_poll(rk, 10);
- if (TIMING_EVERY(&t_all, 3 * 1000000)) {
- int delivered = start_cnt - *msgcounterp;
- TEST_SAY(
- "wait_delivery: "
- "%d/%d messages delivered: %d msgs/s\n",
- delivered, start_cnt,
- (int)(delivered /
- (TIMING_DURATION(&t_all) / 1000000)));
- }
- }
-
- TIMING_STOP(&t_all);
-
- TEST_ASSERT(*msgcounterp == 0,
- "Not all messages delivered: msgcounter still at %d, "
- "outq_len %d",
- *msgcounterp, rd_kafka_outq_len(rk));
-}
-
-/**
- * Produces \p cnt messages and waits for succesful delivery.
- *
- * Convenience wrapper around test_produce_msgs_nowait() +
- * test_wait_delivery().
- */
-void test_produce_msgs(rd_kafka_t *rk,
- rd_kafka_topic_t *rkt,
- uint64_t testid,
- int32_t partition,
- int msg_base,
- int cnt,
- const char *payload,
- size_t size) {
- int remains = 0;
-
- test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
- payload, size, 0, &remains);
-
- test_wait_delivery(rk, &remains);
-}
-
-
-/**
- * @brief Produces \p cnt messages and waits for succesful delivery
- *
- * Same as test_produce_msgs() but takes a topic name instead of a
- * topic object; the temporary topic object is created and destroyed
- * internally.
- */
-void test_produce_msgs2(rd_kafka_t *rk,
- const char *topic,
- uint64_t testid,
- int32_t partition,
- int msg_base,
- int cnt,
- const char *payload,
- size_t size) {
- int remains = 0;
- rd_kafka_topic_t *rkt = test_create_topic_object(rk, topic, NULL);
-
- test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
- payload, size, 0, &remains);
-
- test_wait_delivery(rk, &remains);
-
- rd_kafka_topic_destroy(rkt);
-}
-
-/**
- * @brief Produces \p cnt messages without waiting for delivery.
- *
- * Takes a topic name; \p remainsp tracks outstanding deliveries and is
- * decremented by the delivery report callback.
- */
-void test_produce_msgs2_nowait(rd_kafka_t *rk,
- const char *topic,
- uint64_t testid,
- int32_t partition,
- int msg_base,
- int cnt,
- const char *payload,
- size_t size,
- int *remainsp) {
- rd_kafka_topic_t *rkt = test_create_topic_object(rk, topic, NULL);
-
- test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
- payload, size, 0, remainsp);
-
- rd_kafka_topic_destroy(rkt);
-}
-
-
-/**
- * Produces \p cnt messages at \p msgrate msgs/s, and waits for
- * succesful delivery.
- */
-void test_produce_msgs_rate(rd_kafka_t *rk,
- rd_kafka_topic_t *rkt,
- uint64_t testid,
- int32_t partition,
- int msg_base,
- int cnt,
- const char *payload,
- size_t size,
- int msgrate) {
- int remains = 0;
-
- test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
- payload, size, msgrate, &remains);
-
- test_wait_delivery(rk, &remains);
-}
-
-
-
-/**
- * Create producer, produce \p msgcnt messages to \p topic \p partition,
- * destroy producer, and returns the used testid.
- *
- * A testid of 0 causes a fresh id to be generated.
- */
-uint64_t test_produce_msgs_easy_size(const char *topic,
- uint64_t testid,
- int32_t partition,
- int msgcnt,
- size_t size) {
- rd_kafka_t *rk;
- rd_kafka_topic_t *rkt;
- test_timing_t t_produce;
-
- if (!testid)
- testid = test_id_generate();
- rk = test_create_producer();
- rkt = test_create_producer_topic(rk, topic, NULL);
-
- TIMING_START(&t_produce, "PRODUCE");
- test_produce_msgs(rk, rkt, testid, partition, 0, msgcnt, NULL, size);
- TIMING_STOP(&t_produce);
- rd_kafka_topic_destroy(rkt);
- rd_kafka_destroy(rk);
-
- return testid;
-}
-
-/**
- * Produce a single message synchronously: the delivery report error
- * is captured via test_curr->produce_sync_err (set in test_dr_msg_cb)
- * and returned to the caller instead of failing the test.
- */
-rd_kafka_resp_err_t test_produce_sync(rd_kafka_t *rk,
- rd_kafka_topic_t *rkt,
- uint64_t testid,
- int32_t partition) {
- test_curr->produce_sync = 1;
- test_produce_msgs(rk, rkt, testid, partition, 0, 1, NULL, 0);
- test_curr->produce_sync = 0;
- return test_curr->produce_sync_err;
-}
-
-
-/**
- * @brief Easy produce function.
- *
- * Creates a producer with the given config overrides, produces \p cnt
- * messages to \p topic and waits for delivery before destroying the
- * producer.
- *
- * @param ... is a NULL-terminated list of key, value config property pairs.
- */
-void test_produce_msgs_easy_v(const char *topic,
- uint64_t testid,
- int32_t partition,
- int msg_base,
- int cnt,
- size_t size,
- ...) {
- rd_kafka_conf_t *conf;
- rd_kafka_t *p;
- rd_kafka_topic_t *rkt;
- va_list ap;
- const char *key, *val;
-
- test_conf_init(&conf, NULL, 0);
-
- va_start(ap, size);
- while ((key = va_arg(ap, const char *)) &&
- (val = va_arg(ap, const char *)))
- test_conf_set(conf, key, val);
- va_end(ap);
-
- rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
-
- p = test_create_handle(RD_KAFKA_PRODUCER, conf);
-
- rkt = test_create_producer_topic(p, topic, NULL);
-
- test_produce_msgs(p, rkt, testid, partition, msg_base, cnt, NULL, size);
-
- rd_kafka_topic_destroy(rkt);
- rd_kafka_destroy(p);
-}
-
-
-/**
- * @brief Produce messages to multiple topic-partitions.
- *
- * All messages share one producer and one delivery counter;
- * test_flush() waits for all of them at the end.
- *
- * @param ...vararg is a tuple of:
- * const char *topic
- * int32_t partition (or UA)
- * int msg_base
- * int msg_cnt
- *
- * End with a NULL topic
- */
-void test_produce_msgs_easy_multi(uint64_t testid, ...) {
- rd_kafka_conf_t *conf;
- rd_kafka_t *p;
- va_list ap;
- const char *topic;
- int msgcounter = 0;
-
- test_conf_init(&conf, NULL, 0);
-
- rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
-
- p = test_create_handle(RD_KAFKA_PRODUCER, conf);
-
- va_start(ap, testid);
- while ((topic = va_arg(ap, const char *))) {
- int32_t partition = va_arg(ap, int32_t);
- int msg_base = va_arg(ap, int);
- int msg_cnt = va_arg(ap, int);
- rd_kafka_topic_t *rkt;
-
- rkt = test_create_producer_topic(p, topic, NULL);
-
- test_produce_msgs_nowait(p, rkt, testid, partition, msg_base,
- msg_cnt, NULL, 0, 0, &msgcounter);
-
- rd_kafka_topic_destroy(rkt);
- }
- va_end(ap);
-
- /* Wait for delivery of all queued messages */
- test_flush(p, tmout_multip(10 * 1000));
-
- rd_kafka_destroy(p);
-}
-
-
-
-/**
- * @brief A standard incremental rebalance callback.
- */
-void test_incremental_rebalance_cb(rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *parts,
- void *opaque) {
- TEST_SAY("%s: incremental rebalance: %s: %d partition(s)%s\n",
- rd_kafka_name(rk), rd_kafka_err2name(err), parts->cnt,
- rd_kafka_assignment_lost(rk) ? ", assignment lost" : "");
-
- switch (err) {
- case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
- test_consumer_incremental_assign("rebalance_cb", rk, parts);
- break;
- case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
- test_consumer_incremental_unassign("rebalance_cb", rk, parts);
- break;
- default:
- TEST_FAIL("Unknown rebalance event: %s",
- rd_kafka_err2name(err));
- break;
- }
-}
-
-/**
- * @brief A standard rebalance callback.
- */
-void test_rebalance_cb(rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *parts,
- void *opaque) {
-
- if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) {
- test_incremental_rebalance_cb(rk, err, parts, opaque);
- return;
- }
-
- TEST_SAY("%s: Rebalance: %s: %d partition(s)\n", rd_kafka_name(rk),
- rd_kafka_err2name(err), parts->cnt);
-
- switch (err) {
- case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
- test_consumer_assign("assign", rk, parts);
- break;
- case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
- test_consumer_unassign("unassign", rk);
- break;
- default:
- TEST_FAIL("Unknown rebalance event: %s",
- rd_kafka_err2name(err));
- break;
- }
-}
-
-
-
/**
 * @brief Create a consumer instance.
 *
 * @param group_id Consumer group id, or NULL for a group-less consumer
 *                 (legacy/simple consumer usage).
 * @param rebalance_cb Optional rebalance callback; only allowed together
 *                     with a \p group_id.
 * @param conf Optional config object (ownership passed); a default one is
 *             created if NULL.
 * @param default_topic_conf Optional default topic config (ownership passed).
 *
 * @returns a new consumer handle; fails the test on error (via
 *          test_create_handle()).
 */
rd_kafka_t *test_create_consumer(
    const char *group_id,
    void (*rebalance_cb)(rd_kafka_t *rk,
                         rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *partitions,
                         void *opaque),
    rd_kafka_conf_t *conf,
    rd_kafka_topic_conf_t *default_topic_conf) {
        rd_kafka_t *rk;
        char tmp[64];

        if (!conf)
                test_conf_init(&conf, NULL, 0);

        if (group_id) {
                test_conf_set(conf, "group.id", group_id);

                /* Use the test-suite-wide session timeout. */
                rd_snprintf(tmp, sizeof(tmp), "%d", test_session_timeout_ms);
                test_conf_set(conf, "session.timeout.ms", tmp);

                if (rebalance_cb)
                        rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
        } else {
                /* A rebalance callback makes no sense without a group. */
                TEST_ASSERT(!rebalance_cb);
        }

        if (default_topic_conf)
                rd_kafka_conf_set_default_topic_conf(conf, default_topic_conf);

        /* Create kafka instance */
        rk = test_create_handle(RD_KAFKA_CONSUMER, conf);

        /* Redirect the main queue to the consumer queue so that
         * rd_kafka_poll() serves consumer events. */
        if (group_id)
                rd_kafka_poll_set_consumer(rk);

        return rk;
}
-
-rd_kafka_topic_t *test_create_consumer_topic(rd_kafka_t *rk,
- const char *topic) {
- rd_kafka_topic_t *rkt;
- rd_kafka_topic_conf_t *topic_conf;
-
- test_conf_init(NULL, &topic_conf, 0);
-
- rkt = rd_kafka_topic_new(rk, topic, topic_conf);
- if (!rkt)
- TEST_FAIL("Failed to create topic: %s\n",
- rd_kafka_err2str(rd_kafka_last_error()));
-
- return rkt;
-}
-
-
-void test_consumer_start(const char *what,
- rd_kafka_topic_t *rkt,
- int32_t partition,
- int64_t start_offset) {
-
- TEST_SAY("%s: consumer_start: %s [%" PRId32 "] at offset %" PRId64 "\n",
- what, rd_kafka_topic_name(rkt), partition, start_offset);
-
- if (rd_kafka_consume_start(rkt, partition, start_offset) == -1)
- TEST_FAIL("%s: consume_start failed: %s\n", what,
- rd_kafka_err2str(rd_kafka_last_error()));
-}
-
-void test_consumer_stop(const char *what,
- rd_kafka_topic_t *rkt,
- int32_t partition) {
-
- TEST_SAY("%s: consumer_stop: %s [%" PRId32 "]\n", what,
- rd_kafka_topic_name(rkt), partition);
-
- if (rd_kafka_consume_stop(rkt, partition) == -1)
- TEST_FAIL("%s: consume_stop failed: %s\n", what,
- rd_kafka_err2str(rd_kafka_last_error()));
-}
-
-void test_consumer_seek(const char *what,
- rd_kafka_topic_t *rkt,
- int32_t partition,
- int64_t offset) {
- int err;
-
- TEST_SAY("%s: consumer_seek: %s [%" PRId32 "] to offset %" PRId64 "\n",
- what, rd_kafka_topic_name(rkt), partition, offset);
-
- if ((err = rd_kafka_seek(rkt, partition, offset, 2000)))
- TEST_FAIL("%s: consume_seek(%s, %" PRId32 ", %" PRId64
- ") "
- "failed: %s\n",
- what, rd_kafka_topic_name(rkt), partition, offset,
- rd_kafka_err2str(err));
-}
-
-
-
-/**
- * Returns offset of the last message consumed
- */
-int64_t test_consume_msgs(const char *what,
- rd_kafka_topic_t *rkt,
- uint64_t testid,
- int32_t partition,
- int64_t offset,
- int exp_msg_base,
- int exp_cnt,
- int parse_fmt) {
- int cnt = 0;
- int msg_next = exp_msg_base;
- int fails = 0;
- int64_t offset_last = -1;
- int64_t tot_bytes = 0;
- test_timing_t t_first, t_all;
-
- TEST_SAY("%s: consume_msgs: %s [%" PRId32
- "]: expect msg #%d..%d "
- "at offset %" PRId64 "\n",
- what, rd_kafka_topic_name(rkt), partition, exp_msg_base,
- exp_msg_base + exp_cnt, offset);
-
- if (offset != TEST_NO_SEEK) {
- rd_kafka_resp_err_t err;
- test_timing_t t_seek;
-
- TIMING_START(&t_seek, "SEEK");
- if ((err = rd_kafka_seek(rkt, partition, offset, 5000)))
- TEST_FAIL("%s: consume_msgs: %s [%" PRId32
- "]: "
- "seek to %" PRId64 " failed: %s\n",
- what, rd_kafka_topic_name(rkt), partition,
- offset, rd_kafka_err2str(err));
- TIMING_STOP(&t_seek);
- TEST_SAY("%s: seeked to offset %" PRId64 "\n", what, offset);
- }
-
- TIMING_START(&t_first, "FIRST MSG");
- TIMING_START(&t_all, "ALL MSGS");
-
- while (cnt < exp_cnt) {
- rd_kafka_message_t *rkmessage;
- int msg_id;
-
- rkmessage =
- rd_kafka_consume(rkt, partition, tmout_multip(5000));
-
- if (TIMING_EVERY(&t_all, 3 * 1000000))
- TEST_SAY(
- "%s: "
- "consumed %3d%%: %d/%d messages "
- "(%d msgs/s, %d bytes/s)\n",
- what, cnt * 100 / exp_cnt, cnt, exp_cnt,
- (int)(cnt / (TIMING_DURATION(&t_all) / 1000000)),
- (int)(tot_bytes /
- (TIMING_DURATION(&t_all) / 1000000)));
-
- if (!rkmessage)
- TEST_FAIL("%s: consume_msgs: %s [%" PRId32
- "]: "
- "expected msg #%d (%d/%d): timed out\n",
- what, rd_kafka_topic_name(rkt), partition,
- msg_next, cnt, exp_cnt);
-
- if (rkmessage->err)
- TEST_FAIL("%s: consume_msgs: %s [%" PRId32
- "]: "
- "expected msg #%d (%d/%d): got error: %s\n",
- what, rd_kafka_topic_name(rkt), partition,
- msg_next, cnt, exp_cnt,
- rd_kafka_err2str(rkmessage->err));
-
- if (cnt == 0)
- TIMING_STOP(&t_first);
-
- if (parse_fmt)
- test_msg_parse(testid, rkmessage, partition, &msg_id);
- else
- msg_id = 0;
-
- if (test_level >= 3)
- TEST_SAY("%s: consume_msgs: %s [%" PRId32
- "]: "
- "got msg #%d at offset %" PRId64
- " (expect #%d at offset %" PRId64 ")\n",
- what, rd_kafka_topic_name(rkt), partition,
- msg_id, rkmessage->offset, msg_next,
- offset >= 0 ? offset + cnt : -1);
-
- if (parse_fmt && msg_id != msg_next) {
- TEST_SAY("%s: consume_msgs: %s [%" PRId32
- "]: "
- "expected msg #%d (%d/%d): got msg #%d\n",
- what, rd_kafka_topic_name(rkt), partition,
- msg_next, cnt, exp_cnt, msg_id);
- fails++;
- }
-
- cnt++;
- tot_bytes += rkmessage->len;
- msg_next++;
- offset_last = rkmessage->offset;
-
- rd_kafka_message_destroy(rkmessage);
- }
-
- TIMING_STOP(&t_all);
-
- if (fails)
- TEST_FAIL("%s: consume_msgs: %s [%" PRId32 "]: %d failures\n",
- what, rd_kafka_topic_name(rkt), partition, fails);
-
- TEST_SAY("%s: consume_msgs: %s [%" PRId32
- "]: "
- "%d/%d messages consumed succesfully\n",
- what, rd_kafka_topic_name(rkt), partition, cnt, exp_cnt);
- return offset_last;
-}
-
-
-/**
- * Create high-level consumer subscribing to \p topic from BEGINNING
- * and expects \d exp_msgcnt with matching \p testid
- * Destroys consumer when done.
- *
- * @param txn If true, isolation.level is set to read_committed.
- * @param partition If -1 the topic will be subscribed to, otherwise the
- * single partition will be assigned immediately.
- *
- * If \p group_id is NULL a new unique group is generated
- */
void test_consume_msgs_easy_mv0(const char *group_id,
                                const char *topic,
                                rd_bool_t txn,
                                int32_t partition,
                                uint64_t testid,
                                int exp_eofcnt,
                                int exp_msgcnt,
                                rd_kafka_topic_conf_t *tconf,
                                test_msgver_t *mv) {
        rd_kafka_t *rk;
        char grpid0[64];
        rd_kafka_conf_t *conf;

        /* Only create a topic conf here if the caller did not pass one;
         * either way \p tconf is consumed by test_create_consumer() below. */
        test_conf_init(&conf, tconf ? NULL : &tconf, 0);

        if (!group_id)
                group_id = test_str_id_generate(grpid0, sizeof(grpid0));

        if (txn)
                test_conf_set(conf, "isolation.level", "read_committed");

        test_topic_conf_set(tconf, "auto.offset.reset", "smallest");
        /* EOF events are only needed when the caller expects to count them. */
        if (exp_eofcnt != -1)
                test_conf_set(conf, "enable.partition.eof", "true");
        rk = test_create_consumer(group_id, NULL, conf, tconf);

        /* NOTE(review): test_create_consumer() already calls
         * rd_kafka_poll_set_consumer() when group_id is set, so this
         * second call appears redundant (but harmless). */
        rd_kafka_poll_set_consumer(rk);

        if (partition == -1) {
                TEST_SAY(
                    "Subscribing to topic %s in group %s "
                    "(expecting %d msgs with testid %" PRIu64 ")\n",
                    topic, group_id, exp_msgcnt, testid);

                test_consumer_subscribe(rk, topic);
        } else {
                rd_kafka_topic_partition_list_t *plist;

                TEST_SAY("Assign topic %s [%" PRId32
                         "] in group %s "
                         "(expecting %d msgs with testid %" PRIu64 ")\n",
                         topic, partition, group_id, exp_msgcnt, testid);

                /* Assign the single requested partition directly instead
                 * of going through group rebalancing. */
                plist = rd_kafka_topic_partition_list_new(1);
                rd_kafka_topic_partition_list_add(plist, topic, partition);
                test_consumer_assign("consume_easy_mv", rk, plist);
                rd_kafka_topic_partition_list_destroy(plist);
        }

        /* Consume messages */
        test_consumer_poll("consume.easy", rk, testid, exp_eofcnt, -1,
                           exp_msgcnt, mv);

        test_consumer_close(rk);

        rd_kafka_destroy(rk);
}
-
-void test_consume_msgs_easy(const char *group_id,
- const char *topic,
- uint64_t testid,
- int exp_eofcnt,
- int exp_msgcnt,
- rd_kafka_topic_conf_t *tconf) {
- test_msgver_t mv;
-
- test_msgver_init(&mv, testid);
-
- test_consume_msgs_easy_mv(group_id, topic, -1, testid, exp_eofcnt,
- exp_msgcnt, tconf, &mv);
-
- test_msgver_clear(&mv);
-}
-
-
-void test_consume_txn_msgs_easy(const char *group_id,
- const char *topic,
- uint64_t testid,
- int exp_eofcnt,
- int exp_msgcnt,
- rd_kafka_topic_conf_t *tconf) {
- test_msgver_t mv;
-
- test_msgver_init(&mv, testid);
-
- test_consume_msgs_easy_mv0(group_id, topic, rd_true /*txn*/, -1, testid,
- exp_eofcnt, exp_msgcnt, tconf, &mv);
-
- test_msgver_clear(&mv);
-}
-
-
-/**
- * @brief Waits for up to \p timeout_ms for consumer to receive assignment.
- * If no assignment received without the timeout the test fails.
- *
- * @warning This method will poll the consumer and might thus read messages.
- * Set \p do_poll to false to use a sleep rather than poll.
- */
void test_consumer_wait_assignment(rd_kafka_t *rk, rd_bool_t do_poll) {
        rd_kafka_topic_partition_list_t *assignment = NULL;
        int i;

        /* NOTE(review): despite the doc comment above there is no timeout
         * parameter; this loops until an assignment arrives. */
        while (1) {
                rd_kafka_resp_err_t err;

                err = rd_kafka_assignment(rk, &assignment);
                TEST_ASSERT(!err, "rd_kafka_assignment() failed: %s",
                            rd_kafka_err2str(err));

                if (assignment->cnt > 0)
                        break;

                rd_kafka_topic_partition_list_destroy(assignment);

                /* Either poll (may consume messages) or just sleep 1s. */
                if (do_poll)
                        test_consumer_poll_once(rk, NULL, 1000);
                else
                        rd_usleep(1000 * 1000, NULL);
        }

        /* Log the received assignment. */
        TEST_SAY("%s: Assignment (%d partition(s)): ", rd_kafka_name(rk),
                 assignment->cnt);
        for (i = 0; i < assignment->cnt; i++)
                TEST_SAY0("%s%s[%" PRId32 "]", i == 0 ? "" : ", ",
                          assignment->elems[i].topic,
                          assignment->elems[i].partition);
        TEST_SAY0("\n");

        rd_kafka_topic_partition_list_destroy(assignment);
}
-
-
-/**
- * @brief Verify that the consumer's assignment matches the expected assignment.
- *
- * The va-list is a NULL-terminated list of (const char *topic, int partition)
- * tuples.
- *
- * Fails the test on mismatch, unless \p fail_immediately is false.
- */
void test_consumer_verify_assignment0(const char *func,
                                      int line,
                                      rd_kafka_t *rk,
                                      int fail_immediately,
                                      ...) {
        va_list ap;
        int cnt = 0;
        const char *topic;
        rd_kafka_topic_partition_list_t *assignment;
        rd_kafka_resp_err_t err;
        int i;

        if ((err = rd_kafka_assignment(rk, &assignment)))
                TEST_FAIL("%s:%d: Failed to get assignment for %s: %s", func,
                          line, rd_kafka_name(rk), rd_kafka_err2str(err));

        TEST_SAY("%s assignment (%d partition(s)):\n", rd_kafka_name(rk),
                 assignment->cnt);
        for (i = 0; i < assignment->cnt; i++)
                TEST_SAY(" %s [%" PRId32 "]\n", assignment->elems[i].topic,
                         assignment->elems[i].partition);

        /* Check every expected (topic, partition) tuple is assigned;
         * mismatches are recorded as deferred failures. */
        va_start(ap, fail_immediately);
        while ((topic = va_arg(ap, const char *))) {
                int partition = va_arg(ap, int);
                cnt++;

                if (!rd_kafka_topic_partition_list_find(assignment, topic,
                                                        partition))
                        TEST_FAIL_LATER(
                            "%s:%d: Expected %s [%d] not found in %s's "
                            "assignment (%d partition(s))",
                            func, line, topic, partition, rd_kafka_name(rk),
                            assignment->cnt);
        }
        va_end(ap);

        /* The assignment must not contain extra partitions either. */
        if (cnt != assignment->cnt)
                TEST_FAIL_LATER(
                    "%s:%d: "
                    "Expected %d assigned partition(s) for %s, not %d",
                    func, line, cnt, rd_kafka_name(rk), assignment->cnt);

        /* Promote any deferred failures to an immediate test failure
         * when requested by the caller. */
        if (fail_immediately)
                TEST_LATER_CHECK();

        rd_kafka_topic_partition_list_destroy(assignment);
}
-
-
-
-/**
- * @brief Start subscribing for 'topic'
- */
-void test_consumer_subscribe(rd_kafka_t *rk, const char *topic) {
- rd_kafka_topic_partition_list_t *topics;
- rd_kafka_resp_err_t err;
-
- topics = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA);
-
- err = rd_kafka_subscribe(rk, topics);
- if (err)
- TEST_FAIL("%s: Failed to subscribe to %s: %s\n",
- rd_kafka_name(rk), topic, rd_kafka_err2str(err));
-
- rd_kafka_topic_partition_list_destroy(topics);
-}
-
-
-void test_consumer_assign(const char *what,
- rd_kafka_t *rk,
- rd_kafka_topic_partition_list_t *partitions) {
- rd_kafka_resp_err_t err;
- test_timing_t timing;
-
- TIMING_START(&timing, "ASSIGN.PARTITIONS");
- err = rd_kafka_assign(rk, partitions);
- TIMING_STOP(&timing);
- if (err)
- TEST_FAIL("%s: failed to assign %d partition(s): %s\n", what,
- partitions->cnt, rd_kafka_err2str(err));
- else
- TEST_SAY("%s: assigned %d partition(s)\n", what,
- partitions->cnt);
-}
-
-
/**
 * @brief Incrementally add \p partitions to the consumer's assignment,
 *        failing the test on error.
 */
void test_consumer_incremental_assign(
    const char *what,
    rd_kafka_t *rk,
    rd_kafka_topic_partition_list_t *partitions) {
        rd_kafka_error_t *error;
        test_timing_t timing;

        TIMING_START(&timing, "INCREMENTAL.ASSIGN.PARTITIONS");
        error = rd_kafka_incremental_assign(rk, partitions);
        TIMING_STOP(&timing);
        if (error) {
                TEST_FAIL(
                    "%s: incremental assign of %d partition(s) failed: "
                    "%s",
                    what, partitions->cnt, rd_kafka_error_string(error));
                /* NOTE(review): only reached if TEST_FAIL returns
                 * (non-aborting failure mode). */
                rd_kafka_error_destroy(error);
        } else
                TEST_SAY("%s: incremental assign of %d partition(s) done\n",
                         what, partitions->cnt);
}
-
-
-void test_consumer_unassign(const char *what, rd_kafka_t *rk) {
- rd_kafka_resp_err_t err;
- test_timing_t timing;
-
- TIMING_START(&timing, "UNASSIGN.PARTITIONS");
- err = rd_kafka_assign(rk, NULL);
- TIMING_STOP(&timing);
- if (err)
- TEST_FAIL("%s: failed to unassign current partitions: %s\n",
- what, rd_kafka_err2str(err));
- else
- TEST_SAY("%s: unassigned current partitions\n", what);
-}
-
-
/**
 * @brief Incrementally remove \p partitions from the consumer's assignment,
 *        failing the test on error.
 */
void test_consumer_incremental_unassign(
    const char *what,
    rd_kafka_t *rk,
    rd_kafka_topic_partition_list_t *partitions) {
        rd_kafka_error_t *error;
        test_timing_t timing;

        TIMING_START(&timing, "INCREMENTAL.UNASSIGN.PARTITIONS");
        error = rd_kafka_incremental_unassign(rk, partitions);
        TIMING_STOP(&timing);
        if (error) {
                TEST_FAIL(
                    "%s: incremental unassign of %d partition(s) "
                    "failed: %s",
                    what, partitions->cnt, rd_kafka_error_string(error));
                /* NOTE(review): only reached if TEST_FAIL returns
                 * (non-aborting failure mode). */
                rd_kafka_error_destroy(error);
        } else
                TEST_SAY("%s: incremental unassign of %d partition(s) done\n",
                         what, partitions->cnt);
}
-
-
-/**
- * @brief Assign a single partition with an optional starting offset
- */
-void test_consumer_assign_partition(const char *what,
- rd_kafka_t *rk,
- const char *topic,
- int32_t partition,
- int64_t offset) {
- rd_kafka_topic_partition_list_t *part;
-
- part = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(part, topic, partition)->offset =
- offset;
-
- test_consumer_assign(what, rk, part);
-
- rd_kafka_topic_partition_list_destroy(part);
-}
-
-
-void test_consumer_pause_resume_partition(rd_kafka_t *rk,
- const char *topic,
- int32_t partition,
- rd_bool_t pause) {
- rd_kafka_topic_partition_list_t *part;
- rd_kafka_resp_err_t err;
-
- part = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(part, topic, partition);
-
- if (pause)
- err = rd_kafka_pause_partitions(rk, part);
- else
- err = rd_kafka_resume_partitions(rk, part);
-
- TEST_ASSERT(!err, "Failed to %s %s [%" PRId32 "]: %s",
- pause ? "pause" : "resume", topic, partition,
- rd_kafka_err2str(err));
-
- rd_kafka_topic_partition_list_destroy(part);
-}
-
-
-/**
- * Message verification services
- *
- */
-
/**
 * @brief (Re)initialize message verifier \p mv for \p testid.
 *        Zeroes all state; does not free prior allocations
 *        (use test_msgver_clear() for that).
 */
void test_msgver_init(test_msgver_t *mv, uint64_t testid) {
        memset(mv, 0, sizeof(*mv));
        mv->testid = testid;
        /* Max warning logs before suppressing. */
        mv->log_max = (test_level + 1) * 100;
}
-
/** @brief Make \p mv silently drop PARTITION_EOF events. */
void test_msgver_ignore_eof(test_msgver_t *mv) {
        mv->ignore_eof = rd_true;
}
-
/**
 * @brief Emit a test warning attributed to msgver \p mv, suppressing
 *        (but counting) further warnings once more than \c log_max
 *        have been logged.
 */
#define TEST_MV_WARN(mv, ...)                                                  \
        do {                                                                   \
                if ((mv)->log_cnt++ > (mv)->log_max)                           \
                        (mv)->log_suppr_cnt++;                                 \
                else                                                           \
                        TEST_WARN(__VA_ARGS__);                                \
        } while (0)
-
-
-
-static void test_mv_mvec_grow(struct test_mv_mvec *mvec, int tot_size) {
- if (tot_size <= mvec->size)
- return;
- mvec->size = tot_size;
- mvec->m = realloc(mvec->m, sizeof(*mvec->m) * mvec->size);
-}
-
-/**
- * Make sure there is room for at least \p cnt messages, else grow mvec.
- */
/** @brief Reserve capacity for \p cnt additional messages in \p mvec. */
static void test_mv_mvec_reserve(struct test_mv_mvec *mvec, int cnt) {
        test_mv_mvec_grow(mvec, mvec->cnt + cnt);
}
-
/**
 * @brief Initialize an (empty) message vector, pre-sizing it for
 *        \p exp_cnt messages if non-zero.
 */
void test_mv_mvec_init(struct test_mv_mvec *mvec, int exp_cnt) {
        /* Catch reuse of a vector that was not cleared first. */
        TEST_ASSERT(mvec->m == NULL, "mvec not cleared");

        if (!exp_cnt)
                return;

        test_mv_mvec_grow(mvec, exp_cnt);
}
-
-
-void test_mv_mvec_clear(struct test_mv_mvec *mvec) {
- if (mvec->m)
- free(mvec->m);
-}
-
/**
 * @brief Free all per-partition state held by \p mv and re-initialize it
 *        (keeping the same testid) so it can be reused.
 */
void test_msgver_clear(test_msgver_t *mv) {
        int i;
        for (i = 0; i < mv->p_cnt; i++) {
                struct test_mv_p *p = mv->p[i];
                free(p->topic);
                test_mv_mvec_clear(&p->mvec);
                free(p);
        }

        free(mv->p);

        /* Reset to a clean state; memset inside init clears the now
         * dangling mv->p pointer. */
        test_msgver_init(mv, mv->testid);
}
-
-struct test_mv_p *test_msgver_p_get(test_msgver_t *mv,
- const char *topic,
- int32_t partition,
- int do_create) {
- int i;
- struct test_mv_p *p;
-
- for (i = 0; i < mv->p_cnt; i++) {
- p = mv->p[i];
- if (p->partition == partition && !strcmp(p->topic, topic))
- return p;
- }
-
- if (!do_create)
- TEST_FAIL("Topic %s [%d] not found in msgver", topic,
- partition);
-
- if (mv->p_cnt == mv->p_size) {
- mv->p_size = (mv->p_size + 4) * 2;
- mv->p = realloc(mv->p, sizeof(*mv->p) * mv->p_size);
- }
-
- mv->p[mv->p_cnt++] = p = calloc(1, sizeof(*p));
-
- p->topic = rd_strdup(topic);
- p->partition = partition;
- p->eof_offset = RD_KAFKA_OFFSET_INVALID;
-
- return p;
-}
-
-
-/**
- * Add (room for) message to message vector.
- * Resizes the vector as needed.
- */
static struct test_mv_m *test_mv_mvec_add(struct test_mv_mvec *mvec) {
        /* Double the capacity (or start at 10000) when full. */
        if (mvec->cnt == mvec->size) {
                test_mv_mvec_grow(mvec, (mvec->size ? mvec->size * 2 : 10000));
        }

        mvec->cnt++;

        /* Return the newly reserved (uninitialized) slot. */
        return &mvec->m[mvec->cnt - 1];
}
-
-/**
- * Returns message at index \p mi
- */
-static RD_INLINE struct test_mv_m *test_mv_mvec_get(struct test_mv_mvec *mvec,
- int mi) {
- if (mi >= mvec->cnt)
- return NULL;
- return &mvec->m[mi];
-}
-
-/**
- * @returns the message with msgid \p msgid, or NULL.
- */
-static struct test_mv_m *test_mv_mvec_find_by_msgid(struct test_mv_mvec *mvec,
- int msgid) {
- int mi;
-
- for (mi = 0; mi < mvec->cnt; mi++)
- if (mvec->m[mi].msgid == msgid)
- return &mvec->m[mi];
-
- return NULL;
-}
-
-
-/**
- * Print message list to \p fp
- */
-static RD_UNUSED void test_mv_mvec_dump(FILE *fp,
- const struct test_mv_mvec *mvec) {
- int mi;
-
- fprintf(fp, "*** Dump mvec with %d messages (capacity %d): ***\n",
- mvec->cnt, mvec->size);
- for (mi = 0; mi < mvec->cnt; mi++)
- fprintf(fp, " msgid %d, offset %" PRId64 "\n",
- mvec->m[mi].msgid, mvec->m[mi].offset);
- fprintf(fp, "*** Done ***\n");
-}
-
/** @brief Sort \p mvec's messages in-place using comparator \p cmp. */
static void test_mv_mvec_sort(struct test_mv_mvec *mvec,
                              int (*cmp)(const void *, const void *)) {
        qsort(mvec->m, mvec->cnt, sizeof(*mvec->m), cmp);
}
-
-
-/**
- * @brief Adds a message to the msgver service.
- *
- * @returns 1 if message is from the expected testid, else 0 (not added)
- */
int test_msgver_add_msg00(const char *func,
                          int line,
                          const char *clientname,
                          test_msgver_t *mv,
                          uint64_t testid,
                          const char *topic,
                          int32_t partition,
                          int64_t offset,
                          int64_t timestamp,
                          int32_t broker_id,
                          rd_kafka_resp_err_t err,
                          int msgnum) {
        struct test_mv_p *p;
        struct test_mv_m *m;

        /* Messages from other test runs are silently skipped. */
        if (testid != mv->testid) {
                TEST_SAYL(3,
                          "%s:%d: %s: mismatching testid %" PRIu64
                          " != %" PRIu64 "\n",
                          func, line, clientname, testid, mv->testid);
                return 0; /* Ignore message */
        }

        if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF && mv->ignore_eof) {
                TEST_SAYL(3, "%s:%d: %s: ignoring EOF for %s [%" PRId32 "]\n",
                          func, line, clientname, topic, partition);
                return 0; /* Ignore message */
        }

        /* Create partition state on first sight. */
        p = test_msgver_p_get(mv, topic, partition, 1);

        /* EOF events only record the EOF offset; they are not counted
         * as messages. */
        if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                p->eof_offset = offset;
                return 1;
        }

        m = test_mv_mvec_add(&p->mvec);

        m->offset = offset;
        m->msgid = msgnum;
        m->timestamp = timestamp;
        m->broker_id = broker_id;

        if (test_level > 2) {
                TEST_SAY(
                    "%s:%d: %s: "
                    "Recv msg %s [%" PRId32 "] offset %" PRId64
                    " msgid %d "
                    "timestamp %" PRId64 " broker %" PRId32 "\n",
                    func, line, clientname, p->topic, p->partition, m->offset,
                    m->msgid, m->timestamp, m->broker_id);
        }

        mv->msgcnt++;

        return 1;
}
-
-/**
- * Adds a message to the msgver service.
- *
- * Message must be a proper message or PARTITION_EOF.
- *
- * @param override_topic if non-NULL, overrides the rkmessage's topic
- * with this one.
- *
- * @returns 1 if message is from the expected testid, else 0 (not added).
- */
-int test_msgver_add_msg0(const char *func,
- int line,
- const char *clientname,
- test_msgver_t *mv,
- const rd_kafka_message_t *rkmessage,
- const char *override_topic) {
- uint64_t in_testid;
- int in_part;
- int in_msgnum = -1;
- char buf[128];
- const void *val;
- size_t valsize;
-
- if (mv->fwd)
- test_msgver_add_msg0(func, line, clientname, mv->fwd, rkmessage,
- override_topic);
-
- if (rd_kafka_message_status(rkmessage) ==
- RD_KAFKA_MSG_STATUS_NOT_PERSISTED &&
- rkmessage->err) {
- if (rkmessage->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)
- return 0; /* Ignore error */
-
- in_testid = mv->testid;
-
- } else {
-
- if (!mv->msgid_hdr) {
- rd_snprintf(buf, sizeof(buf), "%.*s",
- (int)rkmessage->len,
- (char *)rkmessage->payload);
- val = buf;
- } else {
- /* msgid is in message header */
- rd_kafka_headers_t *hdrs;
-
- if (rd_kafka_message_headers(rkmessage, &hdrs) ||
- rd_kafka_header_get_last(hdrs, mv->msgid_hdr, &val,
- &valsize)) {
- TEST_SAYL(3,
- "%s:%d: msgid expected in header %s "
- "but %s exists for "
- "message at offset %" PRId64
- " has no headers\n",
- func, line, mv->msgid_hdr,
- hdrs ? "no such header"
- : "no headers",
- rkmessage->offset);
-
- return 0;
- }
- }
-
- if (sscanf(val, "testid=%" SCNu64 ", partition=%i, msg=%i\n",
- &in_testid, &in_part, &in_msgnum) != 3)
- TEST_FAIL(
- "%s:%d: Incorrect format at offset %" PRId64 ": %s",
- func, line, rkmessage->offset, (const char *)val);
- }
-
- return test_msgver_add_msg00(
- func, line, clientname, mv, in_testid,
- override_topic ? override_topic
- : rd_kafka_topic_name(rkmessage->rkt),
- rkmessage->partition, rkmessage->offset,
- rd_kafka_message_timestamp(rkmessage, NULL),
- rd_kafka_message_broker_id(rkmessage), rkmessage->err, in_msgnum);
- return 1;
-}
-
-
-
-/**
- * Verify that all messages were received in order.
- *
- * - Offsets need to occur without gaps
- * - msgids need to be increasing: but may have gaps, e.g., using partitioner)
- */
static int test_mv_mvec_verify_order(test_msgver_t *mv,
                                     int flags,
                                     struct test_mv_p *p,
                                     struct test_mv_mvec *mvec,
                                     struct test_mv_vs *vs) {
        int mi;
        int fails = 0;

        /* Compare each message to its predecessor in receive order:
         * offsets must be strictly consecutive, msgids monotonically
         * non-decreasing (gaps allowed). */
        for (mi = 1 /*skip first*/; mi < mvec->cnt; mi++) {
                struct test_mv_m *prev = test_mv_mvec_get(mvec, mi - 1);
                struct test_mv_m *this = test_mv_mvec_get(mvec, mi);

                if (((flags & TEST_MSGVER_BY_OFFSET) &&
                     prev->offset + 1 != this->offset) ||
                    ((flags & TEST_MSGVER_BY_MSGID) &&
                     prev->msgid > this->msgid)) {
                        TEST_MV_WARN(mv,
                                     " %s [%" PRId32
                                     "] msg rcvidx #%d/%d: "
                                     "out of order (prev vs this): "
                                     "offset %" PRId64 " vs %" PRId64
                                     ", "
                                     "msgid %d vs %d\n",
                                     p ? p->topic : "*", p ? p->partition : -1,
                                     mi, mvec->cnt, prev->offset, this->offset,
                                     prev->msgid, this->msgid);
                        fails++;
                } else if ((flags & TEST_MSGVER_BY_BROKER_ID) &&
                           this->broker_id != vs->broker_id) {
                        /* Optional check that each message came from the
                         * expected broker. */
                        TEST_MV_WARN(mv,
                                     " %s [%" PRId32
                                     "] msg rcvidx #%d/%d: "
                                     "broker id mismatch: expected %" PRId32
                                     ", not %" PRId32 "\n",
                                     p ? p->topic : "*", p ? p->partition : -1,
                                     mi, mvec->cnt, vs->broker_id,
                                     this->broker_id);
                        fails++;
                }
        }

        return fails;
}
-
-
-/**
- * @brief Verify that messages correspond to 'correct' msgver.
- */
static int test_mv_mvec_verify_corr(test_msgver_t *mv,
                                    int flags,
                                    struct test_mv_p *p,
                                    struct test_mv_mvec *mvec,
                                    struct test_mv_vs *vs) {
        int mi;
        int fails = 0;
        struct test_mv_p *corr_p = NULL;
        struct test_mv_mvec *corr_mvec;
        int verifycnt = 0;

        TEST_ASSERT(vs->corr);

        /* Get correct mvec for comparison. */
        if (p)
                corr_p = test_msgver_p_get(vs->corr, p->topic, p->partition, 0);
        if (!corr_p) {
                TEST_MV_WARN(mv,
                             " %s [%" PRId32
                             "]: "
                             "no corresponding correct partition found\n",
                             p ? p->topic : "*", p ? p->partition : -1);
                return 1;
        }

        corr_mvec = &corr_p->mvec;

        for (mi = 0; mi < mvec->cnt; mi++) {
                struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
                const struct test_mv_m *corr;


                /* SUBSET mode matches by msgid; otherwise messages must
                 * line up positionally. */
                if (flags & TEST_MSGVER_SUBSET)
                        corr =
                            test_mv_mvec_find_by_msgid(corr_mvec, this->msgid);
                else
                        corr = test_mv_mvec_get(corr_mvec, mi);

                /* Debug aid, intentionally disabled. */
                if (0)
                        TEST_MV_WARN(mv,
                                     "msg #%d: msgid %d, offset %" PRId64 "\n",
                                     mi, this->msgid, this->offset);
                if (!corr) {
                        if (!(flags & TEST_MSGVER_SUBSET)) {
                                TEST_MV_WARN(
                                    mv,
                                    " %s [%" PRId32
                                    "] msg rcvidx #%d/%d: "
                                    "out of range: correct mvec has "
                                    "%d messages: "
                                    "message offset %" PRId64 ", msgid %d\n",
                                    p ? p->topic : "*", p ? p->partition : -1,
                                    mi, mvec->cnt, corr_mvec->cnt, this->offset,
                                    this->msgid);
                                fails++;
                        }
                        continue;
                }

                /* Compare all attributes selected by \p flags. */
                if (((flags & TEST_MSGVER_BY_OFFSET) &&
                     this->offset != corr->offset) ||
                    ((flags & TEST_MSGVER_BY_MSGID) &&
                     this->msgid != corr->msgid) ||
                    ((flags & TEST_MSGVER_BY_TIMESTAMP) &&
                     this->timestamp != corr->timestamp) ||
                    ((flags & TEST_MSGVER_BY_BROKER_ID) &&
                     this->broker_id != corr->broker_id)) {
                        TEST_MV_WARN(
                            mv,
                            " %s [%" PRId32
                            "] msg rcvidx #%d/%d: "
                            "did not match correct msg: "
                            "offset %" PRId64 " vs %" PRId64
                            ", "
                            "msgid %d vs %d, "
                            "timestamp %" PRId64 " vs %" PRId64
                            ", "
                            "broker %" PRId32 " vs %" PRId32 " (fl 0x%x)\n",
                            p ? p->topic : "*", p ? p->partition : -1, mi,
                            mvec->cnt, this->offset, corr->offset, this->msgid,
                            corr->msgid, this->timestamp, corr->timestamp,
                            this->broker_id, corr->broker_id, flags);
                        fails++;
                } else {
                        verifycnt++;
                }
        }

        /* All correct messages must have been matched (unless SUBSET). */
        if (verifycnt != corr_mvec->cnt && !(flags & TEST_MSGVER_SUBSET)) {
                TEST_MV_WARN(mv,
                             " %s [%" PRId32
                             "]: of %d input messages, "
                             "only %d/%d matched correct messages\n",
                             p ? p->topic : "*", p ? p->partition : -1,
                             mvec->cnt, verifycnt, corr_mvec->cnt);
                fails++;
        }

        return fails;
}
-
-
-
-static int test_mv_m_cmp_offset(const void *_a, const void *_b) {
- const struct test_mv_m *a = _a, *b = _b;
-
- return RD_CMP(a->offset, b->offset);
-}
-
-static int test_mv_m_cmp_msgid(const void *_a, const void *_b) {
- const struct test_mv_m *a = _a, *b = _b;
-
- return RD_CMP(a->msgid, b->msgid);
-}
-
-
-/**
- * Verify that there are no duplicate message.
- *
- * - Offsets are checked
- * - msgids are checked
- *
- * * NOTE: This sorts the message (.m) array, first by offset, then by msgid
- * and leaves the message array sorted (by msgid)
- */
static int test_mv_mvec_verify_dup(test_msgver_t *mv,
                                   int flags,
                                   struct test_mv_p *p,
                                   struct test_mv_mvec *mvec,
                                   struct test_mv_vs *vs) {
        int mi;
        int fails = 0;
        enum { _P_OFFSET, _P_MSGID } pass;

        /* Two passes: first sort by offset and look for equal adjacent
         * offsets, then sort by msgid and look for equal adjacent msgids.
         * Each pass only runs if the corresponding flag is set. */
        for (pass = _P_OFFSET; pass <= _P_MSGID; pass++) {

                if (pass == _P_OFFSET) {
                        if (!(flags & TEST_MSGVER_BY_OFFSET))
                                continue;
                        test_mv_mvec_sort(mvec, test_mv_m_cmp_offset);
                } else if (pass == _P_MSGID) {
                        if (!(flags & TEST_MSGVER_BY_MSGID))
                                continue;
                        test_mv_mvec_sort(mvec, test_mv_m_cmp_msgid);
                }

                for (mi = 1 /*skip first*/; mi < mvec->cnt; mi++) {
                        struct test_mv_m *prev = test_mv_mvec_get(mvec, mi - 1);
                        struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
                        int is_dup = 0;

                        if (pass == _P_OFFSET)
                                is_dup = prev->offset == this->offset;
                        else if (pass == _P_MSGID)
                                is_dup = prev->msgid == this->msgid;

                        if (!is_dup)
                                continue;

                        TEST_MV_WARN(mv,
                                     " %s [%" PRId32
                                     "] "
                                     "duplicate msg (prev vs this): "
                                     "offset %" PRId64 " vs %" PRId64
                                     ", "
                                     "msgid %d vs %d\n",
                                     p ? p->topic : "*", p ? p->partition : -1,
                                     prev->offset, this->offset, prev->msgid,
                                     this->msgid);
                        fails++;
                }
        }

        return fails;
}
-
-/**
- * @brief Verify that all messages are from the correct broker.
- */
static int test_mv_mvec_verify_broker(test_msgver_t *mv,
                                      int flags,
                                      struct test_mv_p *p,
                                      struct test_mv_mvec *mvec,
                                      struct test_mv_vs *vs) {
        int mi;
        int fails = 0;

        /* Assume that the correct flag has been checked already. */


        rd_assert(flags & TEST_MSGVER_BY_BROKER_ID);
        /* Every message must originate from the expected broker. */
        for (mi = 0; mi < mvec->cnt; mi++) {
                struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
                if (this->broker_id != vs->broker_id) {
                        TEST_MV_WARN(
                            mv,
                            " %s [%" PRId32
                            "] broker_id check: "
                            "msgid #%d (at mi %d): "
                            "broker_id %" PRId32
                            " is not the expected broker_id %" PRId32 "\n",
                            p ? p->topic : "*", p ? p->partition : -1,
                            this->msgid, mi, this->broker_id, vs->broker_id);
                        fails++;
                }
        }
        return fails;
}
-
-
-/**
- * Verify that \p mvec contains the expected range:
- * - TEST_MSGVER_BY_MSGID: msgid within \p vs->msgid_min .. \p vs->msgid_max
- * - TEST_MSGVER_BY_TIMESTAMP: timestamp with \p vs->timestamp_min .. _max
- *
- * * NOTE: TEST_MSGVER_BY_MSGID is required
- *
- * * NOTE: This sorts the message (.m) array by msgid
- * and leaves the message array sorted (by msgid)
- */
static int test_mv_mvec_verify_range(test_msgver_t *mv,
                                     int flags,
                                     struct test_mv_p *p,
                                     struct test_mv_mvec *mvec,
                                     struct test_mv_vs *vs) {
        int mi;
        int fails = 0;
        int cnt = 0;
        int exp_cnt = vs->msgid_max - vs->msgid_min + 1;
        int skip_cnt = 0;

        if (!(flags & TEST_MSGVER_BY_MSGID))
                return 0;

        /* Sort by msgid so the range can be walked in order. */
        test_mv_mvec_sort(mvec, test_mv_m_cmp_msgid);

        // test_mv_mvec_dump(stdout, mvec);

        for (mi = 0; mi < mvec->cnt; mi++) {
                struct test_mv_m *prev =
                    mi ? test_mv_mvec_get(mvec, mi - 1) : NULL;
                struct test_mv_m *this = test_mv_mvec_get(mvec, mi);

                /* Skip messages below the range; stop past the range
                 * (the vector is sorted). */
                if (this->msgid < vs->msgid_min) {
                        skip_cnt++;
                        continue;
                } else if (this->msgid > vs->msgid_max)
                        break;

                /* Optional: timestamps must fall within the expected
                 * window. */
                if (flags & TEST_MSGVER_BY_TIMESTAMP) {
                        if (this->timestamp < vs->timestamp_min ||
                            this->timestamp > vs->timestamp_max) {
                                TEST_MV_WARN(
                                    mv,
                                    " %s [%" PRId32
                                    "] range check: "
                                    "msgid #%d (at mi %d): "
                                    "timestamp %" PRId64
                                    " outside "
                                    "expected range %" PRId64 "..%" PRId64 "\n",
                                    p ? p->topic : "*", p ? p->partition : -1,
                                    this->msgid, mi, this->timestamp,
                                    vs->timestamp_min, vs->timestamp_max);
                                fails++;
                        }
                }

                if ((flags & TEST_MSGVER_BY_BROKER_ID) &&
                    this->broker_id != vs->broker_id) {
                        TEST_MV_WARN(
                            mv,
                            " %s [%" PRId32
                            "] range check: "
                            "msgid #%d (at mi %d): "
                            "expected broker id %" PRId32 ", not %" PRId32 "\n",
                            p ? p->topic : "*", p ? p->partition : -1,
                            this->msgid, mi, vs->broker_id, this->broker_id);
                        fails++;
                }

                /* First in-range message must be exactly msgid_min;
                 * more than exp_cnt in-range messages is also an error. */
                if (cnt++ == 0) {
                        if (this->msgid != vs->msgid_min) {
                                TEST_MV_WARN(mv,
                                             " %s [%" PRId32
                                             "] range check: "
                                             "first message #%d (at mi %d) "
                                             "is not first in "
                                             "expected range %d..%d\n",
                                             p ? p->topic : "*",
                                             p ? p->partition : -1, this->msgid,
                                             mi, vs->msgid_min, vs->msgid_max);
                                fails++;
                        }
                } else if (cnt > exp_cnt) {
                        TEST_MV_WARN(mv,
                                     " %s [%" PRId32
                                     "] range check: "
                                     "too many messages received (%d/%d) at "
                                     "msgid %d for expected range %d..%d\n",
                                     p ? p->topic : "*", p ? p->partition : -1,
                                     cnt, exp_cnt, this->msgid, vs->msgid_min,
                                     vs->msgid_max);
                        fails++;
                }

                if (!prev) {
                        skip_cnt++;
                        continue;
                }

                /* In-range msgids must be strictly consecutive. */
                if (prev->msgid + 1 != this->msgid) {
                        TEST_MV_WARN(mv,
                                     " %s [%" PRId32
                                     "] range check: "
                                     " %d message(s) missing between "
                                     "msgid %d..%d in expected range %d..%d\n",
                                     p ? p->topic : "*", p ? p->partition : -1,
                                     this->msgid - prev->msgid - 1,
                                     prev->msgid + 1, this->msgid - 1,
                                     vs->msgid_min, vs->msgid_max);
                        fails++;
                }
        }

        if (cnt != exp_cnt) {
                TEST_MV_WARN(mv,
                             " %s [%" PRId32
                             "] range check: "
                             " wrong number of messages seen, wanted %d got %d "
                             "in expected range %d..%d (%d messages skipped)\n",
                             p ? p->topic : "*", p ? p->partition : -1, exp_cnt,
                             cnt, vs->msgid_min, vs->msgid_max, skip_cnt);
                fails++;
        }

        return fails;
}
-
-
-
-/**
- * Run verifier \p f for all partitions.
- */
-#define test_mv_p_verify_f(mv, flags, f, vs) \
- test_mv_p_verify_f0(mv, flags, f, #f, vs)
-static int test_mv_p_verify_f0(test_msgver_t *mv,
- int flags,
- int (*f)(test_msgver_t *mv,
- int flags,
- struct test_mv_p *p,
- struct test_mv_mvec *mvec,
- struct test_mv_vs *vs),
- const char *f_name,
- struct test_mv_vs *vs) {
- int i;
- int fails = 0;
-
- for (i = 0; i < mv->p_cnt; i++) {
- TEST_SAY("Verifying %s [%" PRId32 "] %d msgs with %s\n",
- mv->p[i]->topic, mv->p[i]->partition,
- mv->p[i]->mvec.cnt, f_name);
- fails += f(mv, flags, mv->p[i], &mv->p[i]->mvec, vs);
- }
-
- return fails;
-}
-
-
-/**
- * Collect all messages from all topics and partitions into vs->mvec
- */
-static void test_mv_collect_all_msgs(test_msgver_t *mv, struct test_mv_vs *vs) {
- int i;
-
- for (i = 0; i < mv->p_cnt; i++) {
- struct test_mv_p *p = mv->p[i];
- int mi;
-
- test_mv_mvec_reserve(&vs->mvec, p->mvec.cnt);
- for (mi = 0; mi < p->mvec.cnt; mi++) {
- struct test_mv_m *m = test_mv_mvec_get(&p->mvec, mi);
- struct test_mv_m *m_new = test_mv_mvec_add(&vs->mvec);
- *m_new = *m;
- }
- }
-}
-
-
-/**
- * Verify that all messages (by msgid) in range msg_base+exp_cnt were received
- * and received only once.
- * This works across all partitions.
- */
-static int
-test_msgver_verify_range(test_msgver_t *mv, int flags, struct test_mv_vs *vs) {
- int fails = 0;
-
- /**
- * Create temporary array to hold expected message set,
- * then traverse all topics and partitions and move matching messages
- * to that set. Then verify the message set.
- */
-
- test_mv_mvec_init(&vs->mvec, vs->exp_cnt);
-
- /* Collect all msgs into vs mvec */
- test_mv_collect_all_msgs(mv, vs);
-
- fails += test_mv_mvec_verify_range(mv, TEST_MSGVER_BY_MSGID | flags,
- NULL, &vs->mvec, vs);
- fails += test_mv_mvec_verify_dup(mv, TEST_MSGVER_BY_MSGID | flags, NULL,
- &vs->mvec, vs);
-
- test_mv_mvec_clear(&vs->mvec);
-
- return fails;
-}
-
-
-/**
- * Verify that \p exp_cnt messages were received for \p topic and \p partition
- * starting at msgid base \p msg_base.
- */
-int test_msgver_verify_part0(const char *func,
- int line,
- const char *what,
- test_msgver_t *mv,
- int flags,
- const char *topic,
- int partition,
- int msg_base,
- int exp_cnt) {
- int fails = 0;
- struct test_mv_vs vs = {.msg_base = msg_base, .exp_cnt = exp_cnt};
- struct test_mv_p *p;
-
- TEST_SAY(
- "%s:%d: %s: Verifying %d received messages (flags 0x%x) "
- "in %s [%d]: expecting msgids %d..%d (%d)\n",
- func, line, what, mv->msgcnt, flags, topic, partition, msg_base,
- msg_base + exp_cnt, exp_cnt);
-
- p = test_msgver_p_get(mv, topic, partition, 0);
-
- /* Per-partition checks */
- if (flags & TEST_MSGVER_ORDER)
- fails += test_mv_mvec_verify_order(mv, flags, p, &p->mvec, &vs);
- if (flags & TEST_MSGVER_DUP)
- fails += test_mv_mvec_verify_dup(mv, flags, p, &p->mvec, &vs);
-
- if (mv->msgcnt < vs.exp_cnt) {
- TEST_MV_WARN(mv,
- "%s:%d: "
- "%s [%" PRId32
- "] expected %d messages but only "
- "%d received\n",
- func, line, p ? p->topic : "*",
- p ? p->partition : -1, vs.exp_cnt, mv->msgcnt);
- fails++;
- }
-
-
- if (mv->log_suppr_cnt > 0)
- TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n",
- func, line, what, mv->log_suppr_cnt);
-
- if (fails)
- TEST_FAIL(
- "%s:%d: %s: Verification of %d received messages "
- "failed: "
- "expected msgids %d..%d (%d): see previous errors\n",
- func, line, what, mv->msgcnt, msg_base, msg_base + exp_cnt,
- exp_cnt);
- else
- TEST_SAY(
- "%s:%d: %s: Verification of %d received messages "
- "succeeded: "
- "expected msgids %d..%d (%d)\n",
- func, line, what, mv->msgcnt, msg_base, msg_base + exp_cnt,
- exp_cnt);
-
- return fails;
-}
-
-/**
- * Verify that \p exp_cnt messages were received starting at
- * msgid base \p msg_base.
- */
int test_msgver_verify0(const char *func,
                        int line,
                        const char *what,
                        test_msgver_t *mv,
                        int flags,
                        struct test_mv_vs vs) {
        int fails = 0;

        TEST_SAY(
            "%s:%d: %s: Verifying %d received messages (flags 0x%x): "
            "expecting msgids %d..%d (%d)\n",
            func, line, what, mv->msgcnt, flags, vs.msg_base,
            vs.msg_base + vs.exp_cnt, vs.exp_cnt);
        if (flags & TEST_MSGVER_BY_TIMESTAMP) {
                /* Timestamp verification is only meaningful when messages
                 * can be identified, so msgid matching is mandatory. */
                assert((flags & TEST_MSGVER_BY_MSGID)); /* Required */
                TEST_SAY(
                    "%s:%d: %s: "
                    " and expecting timestamps %" PRId64 "..%" PRId64 "\n",
                    func, line, what, vs.timestamp_min, vs.timestamp_max);
        }

        /* Per-partition checks */
        if (flags & TEST_MSGVER_ORDER)
                fails += test_mv_p_verify_f(mv, flags,
                                            test_mv_mvec_verify_order, &vs);
        if (flags & TEST_MSGVER_DUP)
                fails +=
                    test_mv_p_verify_f(mv, flags, test_mv_mvec_verify_dup, &vs);

        if (flags & TEST_MSGVER_BY_BROKER_ID)
                fails += test_mv_p_verify_f(mv, flags,
                                            test_mv_mvec_verify_broker, &vs);

        /* Checks across all partitions */
        if ((flags & TEST_MSGVER_RANGE) && vs.exp_cnt > 0) {
                /* Derive the inclusive msgid window from base+count. */
                vs.msgid_min = vs.msg_base;
                vs.msgid_max = vs.msgid_min + vs.exp_cnt - 1;
                fails += test_msgver_verify_range(mv, flags, &vs);
        }

        if (mv->log_suppr_cnt > 0)
                TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n",
                          func, line, what, mv->log_suppr_cnt);

        /* Exact total count is required unless the caller only expects a
         * subset of the produced messages. */
        if (vs.exp_cnt != mv->msgcnt) {
                if (!(flags & TEST_MSGVER_SUBSET)) {
                        TEST_WARN("%s:%d: %s: expected %d messages, got %d\n",
                                  func, line, what, vs.exp_cnt, mv->msgcnt);
                        fails++;
                }
        }

        if (fails)
                TEST_FAIL(
                    "%s:%d: %s: Verification of %d received messages "
                    "failed: "
                    "expected msgids %d..%d (%d): see previous errors\n",
                    func, line, what, mv->msgcnt, vs.msg_base,
                    vs.msg_base + vs.exp_cnt, vs.exp_cnt);
        else
                TEST_SAY(
                    "%s:%d: %s: Verification of %d received messages "
                    "succeeded: "
                    "expected msgids %d..%d (%d)\n",
                    func, line, what, mv->msgcnt, vs.msg_base,
                    vs.msg_base + vs.exp_cnt, vs.exp_cnt);

        return fails;
}
-
-
-
/**
 * Verify that \p rkmessage's payload matches the expected
 * "testid=..., partition=..., msg=..." format produced by the test
 * producers, and that testid/partition/msgnum agree with the expected
 * values (-1 disables the partition and msgnum checks respectively).
 * TEST_FAIL()s on mismatch or unparsable payload.
 */
void test_verify_rkmessage0(const char *func,
                            int line,
                            rd_kafka_message_t *rkmessage,
                            uint64_t testid,
                            int32_t partition,
                            int msgnum) {
        uint64_t in_testid;
        int in_part;
        int in_msgnum;
        char buf[128];

        /* Copy the (not necessarily NUL-terminated) payload into a bounded,
         * NUL-terminated buffer before parsing. */
        rd_snprintf(buf, sizeof(buf), "%.*s", (int)rkmessage->len,
                    (char *)rkmessage->payload);

        if (sscanf(buf, "testid=%" SCNu64 ", partition=%i, msg=%i\n",
                   &in_testid, &in_part, &in_msgnum) != 3)
                TEST_FAIL("Incorrect format: %s", buf);

        /* partition/msgnum of -1 means "don't care". */
        if (testid != in_testid || (partition != -1 && partition != in_part) ||
            (msgnum != -1 && msgnum != in_msgnum) || in_msgnum < 0)
                goto fail_match;

        if (test_level > 2) {
                TEST_SAY("%s:%i: Our testid %" PRIu64
                         ", part %i (%i), msg %i\n",
                         func, line, testid, (int)partition,
                         (int)rkmessage->partition, msgnum);
        }


        return;

fail_match:
        TEST_FAIL("%s:%i: Our testid %" PRIu64
                  ", part %i, msg %i did "
                  "not match message: \"%s\"\n",
                  func, line, testid, (int)partition, msgnum, buf);
}
-
-
-/**
- * @brief Verify that \p mv is identical to \p corr according to flags.
- */
-void test_msgver_verify_compare0(const char *func,
- int line,
- const char *what,
- test_msgver_t *mv,
- test_msgver_t *corr,
- int flags) {
- struct test_mv_vs vs;
- int fails = 0;
-
- memset(&vs, 0, sizeof(vs));
-
- TEST_SAY(
- "%s:%d: %s: Verifying %d received messages (flags 0x%x) by "
- "comparison to correct msgver (%d messages)\n",
- func, line, what, mv->msgcnt, flags, corr->msgcnt);
-
- vs.corr = corr;
-
- /* Per-partition checks */
- fails += test_mv_p_verify_f(mv, flags, test_mv_mvec_verify_corr, &vs);
-
- if (mv->log_suppr_cnt > 0)
- TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n",
- func, line, what, mv->log_suppr_cnt);
-
- if (corr->msgcnt != mv->msgcnt) {
- if (!(flags & TEST_MSGVER_SUBSET)) {
- TEST_WARN("%s:%d: %s: expected %d messages, got %d\n",
- func, line, what, corr->msgcnt, mv->msgcnt);
- fails++;
- }
- }
-
- if (fails)
- TEST_FAIL(
- "%s:%d: %s: Verification of %d received messages "
- "failed: expected %d messages: see previous errors\n",
- func, line, what, mv->msgcnt, corr->msgcnt);
- else
- TEST_SAY(
- "%s:%d: %s: Verification of %d received messages "
- "succeeded: matching %d messages from correct msgver\n",
- func, line, what, mv->msgcnt, corr->msgcnt);
-}
-
-
-/**
- * Consumer poll but dont expect any proper messages for \p timeout_ms.
- */
void test_consumer_poll_no_msgs(const char *what,
                                rd_kafka_t *rk,
                                uint64_t testid,
                                int timeout_ms) {
        /* Absolute deadline in microseconds. */
        int64_t tmout = test_clock() + ((int64_t)timeout_ms * 1000);
        int cnt      = 0; /* Unexpected proper messages received */
        test_timing_t t_cons;
        test_msgver_t mv;

        test_msgver_init(&mv, testid);

        if (what)
                TEST_SAY("%s: not expecting any messages for %dms\n", what,
                         timeout_ms);

        TIMING_START(&t_cons, "CONSUME");

        /* Keep polling until the deadline; EOFs are fine, proper messages
         * belonging to this testid are not. */
        do {
                rd_kafka_message_t *rkmessage;

                rkmessage = rd_kafka_consumer_poll(rk, timeout_ms);
                if (!rkmessage)
                        continue;

                if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                        TEST_SAY("%s [%" PRId32
                                 "] reached EOF at "
                                 "offset %" PRId64 "\n",
                                 rd_kafka_topic_name(rkmessage->rkt),
                                 rkmessage->partition, rkmessage->offset);
                        test_msgver_add_msg(rk, &mv, rkmessage);

                } else if (rkmessage->err) {
                        /* Any other error is fatal for the test. */
                        TEST_FAIL(
                            "%s [%" PRId32 "] error (offset %" PRId64 "): %s",
                            rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt)
                                           : "(no-topic)",
                            rkmessage->partition, rkmessage->offset,
                            rd_kafka_message_errstr(rkmessage));

                } else {
                        /* add_msg() returns non-zero only for messages
                         * belonging to this testid: those are unexpected. */
                        if (test_msgver_add_msg(rk, &mv, rkmessage)) {
                                TEST_MV_WARN(
                                    &mv,
                                    "Received unexpected message on "
                                    "%s [%" PRId32
                                    "] at offset "
                                    "%" PRId64 "\n",
                                    rd_kafka_topic_name(rkmessage->rkt),
                                    rkmessage->partition, rkmessage->offset);
                                cnt++;
                        }
                }

                rd_kafka_message_destroy(rkmessage);
        } while (test_clock() <= tmout);

        if (what)
                TIMING_STOP(&t_cons);

        /* Verify that exactly zero messages (msg_base 0, exp_cnt 0) for
         * this testid were seen. */
        test_msgver_verify(what, &mv, TEST_MSGVER_ALL, 0, 0);
        test_msgver_clear(&mv);

        TEST_ASSERT(cnt == 0, "Expected 0 messages, got %d", cnt);
}
-
-/**
- * @brief Consumer poll with expectation that a \p err will be reached
- * within \p timeout_ms.
- */
/**
 * @brief Consumer poll with expectation that a \p err will be reached
 *        within \p timeout_ms.
 */
void test_consumer_poll_expect_err(rd_kafka_t *rk,
                                   uint64_t testid,
                                   int timeout_ms,
                                   rd_kafka_resp_err_t err) {
        /* Absolute deadline in microseconds. */
        int64_t tmout = test_clock() + ((int64_t)timeout_ms * 1000);

        TEST_SAY("%s: expecting error %s within %dms\n", rd_kafka_name(rk),
                 rd_kafka_err2name(err), timeout_ms);

        do {
                rd_kafka_message_t *rkmessage;
                rkmessage = rd_kafka_consumer_poll(rk, timeout_ms);
                if (!rkmessage)
                        continue;

                if (rkmessage->err == err) {
                        /* Success: the expected error arrived in time. */
                        TEST_SAY("Got expected error: %s: %s\n",
                                 rd_kafka_err2name(rkmessage->err),
                                 rd_kafka_message_errstr(rkmessage));
                        rd_kafka_message_destroy(rkmessage);

                        return;
                } else if (rkmessage->err) {
                        /* Any other error is a test failure. */
                        TEST_FAIL("%s [%" PRId32
                                  "] unexpected error "
                                  "(offset %" PRId64 "): %s",
                                  rkmessage->rkt
                                      ? rd_kafka_topic_name(rkmessage->rkt)
                                      : "(no-topic)",
                                  rkmessage->partition, rkmessage->offset,
                                  rd_kafka_err2name(rkmessage->err));
                }

                /* Proper messages are simply discarded. */
                rd_kafka_message_destroy(rkmessage);
        } while (test_clock() <= tmout);
        TEST_FAIL("Expected error %s not seen in %dms", rd_kafka_err2name(err),
                  timeout_ms);
}
-
-/**
- * Call consumer poll once and then return.
- * Messages are handled.
- *
- * \p mv is optional
- *
- * @returns 0 on timeout, 1 if a message was received or .._PARTITION_EOF
- * if EOF was reached.
- * TEST_FAIL()s on all errors.
- */
/**
 * Call consumer poll once and then return.
 * Messages are handled.
 *
 * \p mv is optional
 *
 * @returns 0 on timeout, 1 if a message was received or .._PARTITION_EOF
 *          if EOF was reached.
 *          TEST_FAIL()s on all errors.
 */
int test_consumer_poll_once(rd_kafka_t *rk, test_msgver_t *mv, int timeout_ms) {
        rd_kafka_message_t *rkmessage;

        rkmessage = rd_kafka_consumer_poll(rk, timeout_ms);
        if (!rkmessage)
                return 0; /* Poll timed out, nothing consumed */

        if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                TEST_SAY("%s [%" PRId32
                         "] reached EOF at "
                         "offset %" PRId64 "\n",
                         rd_kafka_topic_name(rkmessage->rkt),
                         rkmessage->partition, rkmessage->offset);
                /* EOFs are also recorded in the (optional) msgver. */
                if (mv)
                        test_msgver_add_msg(rk, mv, rkmessage);
                rd_kafka_message_destroy(rkmessage);
                return RD_KAFKA_RESP_ERR__PARTITION_EOF;

        } else if (rkmessage->err) {
                /* Any other error aborts the test. */
                TEST_FAIL("%s [%" PRId32 "] error (offset %" PRId64 "): %s",
                          rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt)
                                         : "(no-topic)",
                          rkmessage->partition, rkmessage->offset,
                          rd_kafka_message_errstr(rkmessage));

        } else {
                if (mv)
                        test_msgver_add_msg(rk, mv, rkmessage);
        }

        rd_kafka_message_destroy(rkmessage);
        return 1;
}
-
-/**
- * @param exact Require exact exp_eof_cnt (unless -1) and exp_cnt (unless -1).
- * If false: poll until either one is reached.
- * @param timeout_ms Each call to poll has a timeout set by this argument. The
- * test fails if any poll times out.
- */
/**
 * @param exact Require exact exp_eof_cnt (unless -1) and exp_cnt (unless -1).
 *              If false: poll until either one is reached.
 * @param timeout_ms Each call to poll has a timeout set by this argument. The
 *                   test fails if any poll times out.
 *
 * @returns the number of proper messages consumed.
 */
int test_consumer_poll_exact_timeout(const char *what,
                                     rd_kafka_t *rk,
                                     uint64_t testid,
                                     int exp_eof_cnt,
                                     int exp_msg_base,
                                     int exp_cnt,
                                     rd_bool_t exact,
                                     test_msgver_t *mv,
                                     int timeout_ms) {
        int eof_cnt = 0; /* Partition EOF events seen */
        int cnt     = 0; /* Proper messages consumed */
        test_timing_t t_cons;

        TEST_SAY("%s: consume %s%d messages\n", what, exact ? "exactly " : "",
                 exp_cnt);

        TIMING_START(&t_cons, "CONSUME");

        /* Non-exact: stop as soon as either the EOF or message target is
         * reached (targets <= 0 never stop the loop on their own).
         * Exact: keep going until both targets are reached. */
        while ((!exact && ((exp_eof_cnt <= 0 || eof_cnt < exp_eof_cnt) &&
                           (exp_cnt <= 0 || cnt < exp_cnt))) ||
               (exact && (eof_cnt < exp_eof_cnt || cnt < exp_cnt))) {
                rd_kafka_message_t *rkmessage;

                rkmessage =
                    rd_kafka_consumer_poll(rk, tmout_multip(timeout_ms));
                if (!rkmessage) /* Shouldn't take this long to get a msg */
                        TEST_FAIL(
                            "%s: consumer_poll() timeout "
                            "(%d/%d eof, %d/%d msgs)\n",
                            what, eof_cnt, exp_eof_cnt, cnt, exp_cnt);


                if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                        TEST_SAY("%s [%" PRId32
                                 "] reached EOF at "
                                 "offset %" PRId64 "\n",
                                 rd_kafka_topic_name(rkmessage->rkt),
                                 rkmessage->partition, rkmessage->offset);
                        /* exp_eof_cnt == 0 means no EOFs are tolerated. */
                        TEST_ASSERT(exp_eof_cnt != 0, "expected no EOFs");
                        if (mv)
                                test_msgver_add_msg(rk, mv, rkmessage);
                        eof_cnt++;

                } else if (rkmessage->err) {
                        TEST_FAIL(
                            "%s [%" PRId32 "] error (offset %" PRId64 "): %s",
                            rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt)
                                           : "(no-topic)",
                            rkmessage->partition, rkmessage->offset,
                            rd_kafka_message_errstr(rkmessage));

                } else {
                        TEST_SAYL(4,
                                  "%s: consumed message on %s [%" PRId32
                                  "] "
                                  "at offset %" PRId64 " (leader epoch %" PRId32
                                  ")\n",
                                  what, rd_kafka_topic_name(rkmessage->rkt),
                                  rkmessage->partition, rkmessage->offset,
                                  rd_kafka_message_leader_epoch(rkmessage));

                        /* Without a msgver every message counts; with one,
                         * only messages belonging to this testid do. */
                        if (!mv || test_msgver_add_msg(rk, mv, rkmessage))
                                cnt++;
                }

                rd_kafka_message_destroy(rkmessage);
        }

        TIMING_STOP(&t_cons);

        TEST_SAY("%s: consumed %d/%d messages (%d/%d EOFs)\n", what, cnt,
                 exp_cnt, eof_cnt, exp_eof_cnt);

        /* In exact mode both counters must match exactly (-1 disables the
         * corresponding check). */
        TEST_ASSERT(!exact || ((exp_cnt == -1 || exp_cnt == cnt) &&
                               (exp_eof_cnt == -1 || exp_eof_cnt == eof_cnt)),
                    "%s: mismatch between exact expected counts and actual: "
                    "%d/%d EOFs, %d/%d msgs",
                    what, eof_cnt, exp_eof_cnt, cnt, exp_cnt);

        if (exp_cnt == 0)
                TEST_ASSERT(cnt == 0 && eof_cnt == exp_eof_cnt,
                            "%s: expected no messages and %d EOFs: "
                            "got %d messages and %d EOFs",
                            what, exp_eof_cnt, cnt, eof_cnt);
        return cnt;
}
-
-
-/**
- * @param exact Require exact exp_eof_cnt (unless -1) and exp_cnt (unless -1).
- * If false: poll until either one is reached.
- */
-int test_consumer_poll_exact(const char *what,
- rd_kafka_t *rk,
- uint64_t testid,
- int exp_eof_cnt,
- int exp_msg_base,
- int exp_cnt,
- rd_bool_t exact,
- test_msgver_t *mv) {
- return test_consumer_poll_exact_timeout(what, rk, testid, exp_eof_cnt,
- exp_msg_base, exp_cnt, exact,
- mv, 10 * 1000);
-}
-
-int test_consumer_poll(const char *what,
- rd_kafka_t *rk,
- uint64_t testid,
- int exp_eof_cnt,
- int exp_msg_base,
- int exp_cnt,
- test_msgver_t *mv) {
- return test_consumer_poll_exact(what, rk, testid, exp_eof_cnt,
- exp_msg_base, exp_cnt,
- rd_false /*not exact */, mv);
-}
-
-int test_consumer_poll_timeout(const char *what,
- rd_kafka_t *rk,
- uint64_t testid,
- int exp_eof_cnt,
- int exp_msg_base,
- int exp_cnt,
- test_msgver_t *mv,
- int timeout_ms) {
- return test_consumer_poll_exact_timeout(
- what, rk, testid, exp_eof_cnt, exp_msg_base, exp_cnt,
- rd_false /*not exact */, mv, timeout_ms);
-}
-
-void test_consumer_close(rd_kafka_t *rk) {
- rd_kafka_resp_err_t err;
- test_timing_t timing;
-
- TEST_SAY("Closing consumer %s\n", rd_kafka_name(rk));
-
- TIMING_START(&timing, "CONSUMER.CLOSE");
- err = rd_kafka_consumer_close(rk);
- TIMING_STOP(&timing);
- if (err)
- TEST_FAIL("Failed to close consumer: %s\n",
- rd_kafka_err2str(err));
-}
-
-
-void test_flush(rd_kafka_t *rk, int timeout_ms) {
- test_timing_t timing;
- rd_kafka_resp_err_t err;
-
- TEST_SAY("%s: Flushing %d messages\n", rd_kafka_name(rk),
- rd_kafka_outq_len(rk));
- TIMING_START(&timing, "FLUSH");
- err = rd_kafka_flush(rk, timeout_ms);
- TIMING_STOP(&timing);
- if (err)
- TEST_FAIL("Failed to flush(%s, %d): %s: len() = %d\n",
- rd_kafka_name(rk), timeout_ms, rd_kafka_err2str(err),
- rd_kafka_outq_len(rk));
-}
-
-
-void test_conf_set(rd_kafka_conf_t *conf, const char *name, const char *val) {
- char errstr[512];
- if (rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)) !=
- RD_KAFKA_CONF_OK)
- TEST_FAIL("Failed to set config \"%s\"=\"%s\": %s\n", name, val,
- errstr);
-}
-
-/**
- * @brief Get configuration value for property \p name.
- *
- * @param conf Configuration to get value from. If NULL the test.conf (if any)
- * configuration will be used.
- */
-char *test_conf_get(const rd_kafka_conf_t *conf, const char *name) {
- static RD_TLS char ret[256];
- size_t ret_sz = sizeof(ret);
- rd_kafka_conf_t *def_conf = NULL;
-
- if (!conf) /* Use the current test.conf */
- test_conf_init(&def_conf, NULL, 0);
-
- if (rd_kafka_conf_get(conf ? conf : def_conf, name, ret, &ret_sz) !=
- RD_KAFKA_CONF_OK)
- TEST_FAIL("Failed to get config \"%s\": %s\n", name,
- "unknown property");
-
- if (def_conf)
- rd_kafka_conf_destroy(def_conf);
-
- return ret;
-}
-
-
-char *test_topic_conf_get(const rd_kafka_topic_conf_t *tconf,
- const char *name) {
- static RD_TLS char ret[256];
- size_t ret_sz = sizeof(ret);
- if (rd_kafka_topic_conf_get(tconf, name, ret, &ret_sz) !=
- RD_KAFKA_CONF_OK)
- TEST_FAIL("Failed to get topic config \"%s\": %s\n", name,
- "unknown property");
- return ret;
-}
-
-
-/**
- * @brief Check if property \name matches \p val in \p conf.
- * If \p conf is NULL the test config will be used. */
-int test_conf_match(rd_kafka_conf_t *conf, const char *name, const char *val) {
- char *real;
- int free_conf = 0;
-
- if (!conf) {
- test_conf_init(&conf, NULL, 0);
- free_conf = 1;
- }
-
- real = test_conf_get(conf, name);
-
- if (free_conf)
- rd_kafka_conf_destroy(conf);
-
- return !strcmp(real, val);
-}
-
-
-void test_topic_conf_set(rd_kafka_topic_conf_t *tconf,
- const char *name,
- const char *val) {
- char errstr[512];
- if (rd_kafka_topic_conf_set(tconf, name, val, errstr, sizeof(errstr)) !=
- RD_KAFKA_CONF_OK)
- TEST_FAIL("Failed to set topic config \"%s\"=\"%s\": %s\n",
- name, val, errstr);
-}
-
-/**
- * @brief First attempt to set topic level property, then global.
- */
-void test_any_conf_set(rd_kafka_conf_t *conf,
- rd_kafka_topic_conf_t *tconf,
- const char *name,
- const char *val) {
- rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN;
- char errstr[512] = {"Missing conf_t"};
-
- if (tconf)
- res = rd_kafka_topic_conf_set(tconf, name, val, errstr,
- sizeof(errstr));
- if (res == RD_KAFKA_CONF_UNKNOWN && conf)
- res =
- rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr));
-
- if (res != RD_KAFKA_CONF_OK)
- TEST_FAIL("Failed to set any config \"%s\"=\"%s\": %s\n", name,
- val, errstr);
-}
-
-
-/**
- * @returns true if test clients need to be configured for authentication
- * or other security measures (SSL), else false for unauthed plaintext.
- */
-int test_needs_auth(void) {
- rd_kafka_conf_t *conf;
- const char *sec;
-
- test_conf_init(&conf, NULL, 0);
-
- sec = test_conf_get(conf, "security.protocol");
-
- rd_kafka_conf_destroy(conf);
-
- return strcmp(sec, "plaintext");
-}
-
-
-void test_print_partition_list(
- const rd_kafka_topic_partition_list_t *partitions) {
- int i;
- for (i = 0; i < partitions->cnt; i++) {
- TEST_SAY(" %s [%" PRId32 "] offset %" PRId64 " (epoch %" PRId32
- ") %s%s\n",
- partitions->elems[i].topic,
- partitions->elems[i].partition,
- partitions->elems[i].offset,
- rd_kafka_topic_partition_get_leader_epoch(
- &partitions->elems[i]),
- partitions->elems[i].err ? ": " : "",
- partitions->elems[i].err
- ? rd_kafka_err2str(partitions->elems[i].err)
- : "");
- }
-}
-
-/**
- * @brief Compare two lists, returning 0 if equal.
- *
- * @remark The lists may be sorted by this function.
- */
-int test_partition_list_cmp(rd_kafka_topic_partition_list_t *al,
- rd_kafka_topic_partition_list_t *bl) {
- int i;
-
- if (al->cnt < bl->cnt)
- return -1;
- else if (al->cnt > bl->cnt)
- return 1;
- else if (al->cnt == 0)
- return 0;
-
- rd_kafka_topic_partition_list_sort(al, NULL, NULL);
- rd_kafka_topic_partition_list_sort(bl, NULL, NULL);
-
- for (i = 0; i < al->cnt; i++) {
- const rd_kafka_topic_partition_t *a = &al->elems[i];
- const rd_kafka_topic_partition_t *b = &bl->elems[i];
- if (a->partition != b->partition || strcmp(a->topic, b->topic))
- return -1;
- }
-
- return 0;
-}
-
-/**
- * @brief Compare two lists and their offsets, returning 0 if equal.
- *
- * @remark The lists may be sorted by this function.
- */
-int test_partition_list_and_offsets_cmp(rd_kafka_topic_partition_list_t *al,
- rd_kafka_topic_partition_list_t *bl) {
- int i;
-
- if (al->cnt < bl->cnt)
- return -1;
- else if (al->cnt > bl->cnt)
- return 1;
- else if (al->cnt == 0)
- return 0;
-
- rd_kafka_topic_partition_list_sort(al, NULL, NULL);
- rd_kafka_topic_partition_list_sort(bl, NULL, NULL);
-
- for (i = 0; i < al->cnt; i++) {
- const rd_kafka_topic_partition_t *a = &al->elems[i];
- const rd_kafka_topic_partition_t *b = &bl->elems[i];
- if (a->partition != b->partition ||
- strcmp(a->topic, b->topic) || a->offset != b->offset ||
- rd_kafka_topic_partition_get_leader_epoch(a) !=
- rd_kafka_topic_partition_get_leader_epoch(b))
- return -1;
- }
-
- return 0;
-}
-
-/**
- * @brief Execute script from the Kafka distribution bin/ path.
- */
/**
 * @brief Execute script from the Kafka distribution bin/ path.
 */
void test_kafka_cmd(const char *fmt, ...) {
#ifdef _WIN32
        TEST_FAIL("%s not supported on Windows, yet", __FUNCTION__);
#else
        char cmd[1024];
        int r;
        va_list ap;
        test_timing_t t_cmd;
        const char *kpath;

        /* KAFKA_PATH must point at the Kafka distribution directory. */
        kpath = test_getenv("KAFKA_PATH", NULL);

        if (!kpath)
                TEST_FAIL("%s: KAFKA_PATH must be set", __FUNCTION__);

        /* Build "<KAFKA_PATH>/bin/<caller-formatted command>". */
        r = rd_snprintf(cmd, sizeof(cmd), "%s/bin/", kpath);
        TEST_ASSERT(r < (int)sizeof(cmd));

        va_start(ap, fmt);
        rd_vsnprintf(cmd + r, sizeof(cmd) - r, fmt, ap);
        va_end(ap);

        TEST_SAY("Executing: %s\n", cmd);
        TIMING_START(&t_cmd, "exec");
        r = system(cmd);
        TIMING_STOP(&t_cmd);

        /* Distinguish exec failure, death-by-signal and a non-zero exit
         * status, failing the test in all three cases. */
        if (r == -1)
                TEST_FAIL("system(\"%s\") failed: %s", cmd, strerror(errno));
        else if (WIFSIGNALED(r))
                TEST_FAIL("system(\"%s\") terminated by signal %d\n", cmd,
                          WTERMSIG(r));
        else if (WEXITSTATUS(r))
                TEST_FAIL("system(\"%s\") failed with exit status %d\n", cmd,
                          WEXITSTATUS(r));
#endif
}
-
-/**
- * @brief Execute kafka-topics.sh from the Kafka distribution.
- */
/**
 * @brief Execute kafka-topics.sh from the Kafka distribution.
 */
void test_kafka_topics(const char *fmt, ...) {
#ifdef _WIN32
        TEST_FAIL("%s not supported on Windows, yet", __FUNCTION__);
#else
        char cmd[1024];
        int r, bytes_left;
        va_list ap;
        test_timing_t t_cmd;
        const char *kpath, *bootstrap_env, *flag, *bootstrap_srvs;

        /* Kafka 3.0 dropped --zookeeper from kafka-topics.sh; use
         * --bootstrap-server (and the BROKERS env) from then on. */
        if (test_broker_version >= TEST_BRKVER(3, 0, 0, 0)) {
                bootstrap_env = "BROKERS";
                flag          = "--bootstrap-server";
        } else {
                bootstrap_env = "ZK_ADDRESS";
                flag          = "--zookeeper";
        }

        kpath          = test_getenv("KAFKA_PATH", NULL);
        bootstrap_srvs = test_getenv(bootstrap_env, NULL);

        if (!kpath || !bootstrap_srvs)
                TEST_FAIL("%s: KAFKA_PATH and %s must be set", __FUNCTION__,
                          bootstrap_env);

        /* Build the command prefix, then append the caller's formatted
         * arguments, asserting against truncation both times. */
        r = rd_snprintf(cmd, sizeof(cmd), "%s/bin/kafka-topics.sh %s %s ",
                        kpath, flag, bootstrap_srvs);
        TEST_ASSERT(r > 0 && r < (int)sizeof(cmd));

        bytes_left = sizeof(cmd) - r;

        va_start(ap, fmt);
        r = rd_vsnprintf(cmd + r, bytes_left, fmt, ap);
        va_end(ap);
        TEST_ASSERT(r > 0 && r < bytes_left);

        TEST_SAY("Executing: %s\n", cmd);
        TIMING_START(&t_cmd, "exec");
        r = system(cmd);
        TIMING_STOP(&t_cmd);

        /* Distinguish exec failure, death-by-signal and non-zero exit. */
        if (r == -1)
                TEST_FAIL("system(\"%s\") failed: %s", cmd, strerror(errno));
        else if (WIFSIGNALED(r))
                TEST_FAIL("system(\"%s\") terminated by signal %d\n", cmd,
                          WTERMSIG(r));
        else if (WEXITSTATUS(r))
                TEST_FAIL("system(\"%s\") failed with exit status %d\n", cmd,
                          WEXITSTATUS(r));
#endif
}
-
-
-
-/**
- * @brief Create topic using Topic Admin API
- *
- * @param configs is an optional key-value tuple array of
- * topic configs (or NULL).
- */
-void test_admin_create_topic(rd_kafka_t *use_rk,
- const char *topicname,
- int partition_cnt,
- int replication_factor,
- const char **configs) {
- rd_kafka_t *rk;
- rd_kafka_NewTopic_t *newt[1];
- const size_t newt_cnt = 1;
- rd_kafka_AdminOptions_t *options;
- rd_kafka_queue_t *rkqu;
- rd_kafka_event_t *rkev;
- const rd_kafka_CreateTopics_result_t *res;
- const rd_kafka_topic_result_t **terr;
- int timeout_ms = tmout_multip(10000);
- size_t res_cnt;
- rd_kafka_resp_err_t err;
- char errstr[512];
- test_timing_t t_create;
-
- if (!(rk = use_rk))
- rk = test_create_producer();
-
- rkqu = rd_kafka_queue_new(rk);
-
- newt[0] =
- rd_kafka_NewTopic_new(topicname, partition_cnt, replication_factor,
- errstr, sizeof(errstr));
- TEST_ASSERT(newt[0] != NULL, "%s", errstr);
-
- if (configs) {
- int i;
-
- for (i = 0; configs[i] && configs[i + 1]; i += 2)
- TEST_CALL_ERR__(rd_kafka_NewTopic_set_config(
- newt[0], configs[i], configs[i + 1]));
- }
-
- options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);
- err = rd_kafka_AdminOptions_set_operation_timeout(
- options, timeout_ms, errstr, sizeof(errstr));
- TEST_ASSERT(!err, "%s", errstr);
-
- TEST_SAY(
- "Creating topic \"%s\" "
- "(partitions=%d, replication_factor=%d, timeout=%d)\n",
- topicname, partition_cnt, replication_factor, timeout_ms);
-
- TIMING_START(&t_create, "CreateTopics");
- rd_kafka_CreateTopics(rk, newt, newt_cnt, options, rkqu);
-
- /* Wait for result */
- rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000);
- TEST_ASSERT(rkev, "Timed out waiting for CreateTopics result");
-
- TIMING_STOP(&t_create);
-
- TEST_ASSERT(!rd_kafka_event_error(rkev), "CreateTopics failed: %s",
- rd_kafka_event_error_string(rkev));
-
- res = rd_kafka_event_CreateTopics_result(rkev);
- TEST_ASSERT(res, "Expected CreateTopics_result, not %s",
- rd_kafka_event_name(rkev));
-
- terr = rd_kafka_CreateTopics_result_topics(res, &res_cnt);
- TEST_ASSERT(terr, "CreateTopics_result_topics returned NULL");
- TEST_ASSERT(res_cnt == newt_cnt,
- "CreateTopics_result_topics returned %" PRIusz
- " topics, "
- "not the expected %" PRIusz,
- res_cnt, newt_cnt);
-
- TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]) ||
- rd_kafka_topic_result_error(terr[0]) ==
- RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
- "Topic %s result error: %s",
- rd_kafka_topic_result_name(terr[0]),
- rd_kafka_topic_result_error_string(terr[0]));
-
- rd_kafka_event_destroy(rkev);
-
- rd_kafka_queue_destroy(rkqu);
-
- rd_kafka_AdminOptions_destroy(options);
-
- rd_kafka_NewTopic_destroy(newt[0]);
-
- if (!use_rk)
- rd_kafka_destroy(rk);
-}
-
-
-
-/**
- * @brief Create topic using kafka-topics.sh --create
- */
/**
 * @brief Create topic by shelling out to kafka-topics.sh --create.
 */
static void test_create_topic_sh(const char *topicname,
                                 int partition_cnt,
                                 int replication_factor) {
        const char *create_fmt =
            "--create --topic \"%s\" "
            "--replication-factor %d --partitions %d";

        test_kafka_topics(create_fmt, topicname, replication_factor,
                          partition_cnt);
}
-
-
-/**
- * @brief Create topic
- */
-void test_create_topic(rd_kafka_t *use_rk,
- const char *topicname,
- int partition_cnt,
- int replication_factor) {
- if (test_broker_version < TEST_BRKVER(0, 10, 2, 0))
- test_create_topic_sh(topicname, partition_cnt,
- replication_factor);
- else
- test_admin_create_topic(use_rk, topicname, partition_cnt,
- replication_factor, NULL);
-}
-
-
-/**
- * @brief Create topic using kafka-topics.sh --delete
- */
/**
 * @brief Delete topic by shelling out to kafka-topics.sh --delete.
 */
static void test_delete_topic_sh(const char *topicname) {
        const char *delete_fmt = "--delete --topic \"%s\" ";

        test_kafka_topics(delete_fmt, topicname);
}
-
-
-/**
- * @brief Delete topic using Topic Admin API
- */
-static void test_admin_delete_topic(rd_kafka_t *use_rk, const char *topicname) {
- rd_kafka_t *rk;
- rd_kafka_DeleteTopic_t *delt[1];
- const size_t delt_cnt = 1;
- rd_kafka_AdminOptions_t *options;
- rd_kafka_queue_t *rkqu;
- rd_kafka_event_t *rkev;
- const rd_kafka_DeleteTopics_result_t *res;
- const rd_kafka_topic_result_t **terr;
- int timeout_ms = tmout_multip(10000);
- size_t res_cnt;
- rd_kafka_resp_err_t err;
- char errstr[512];
- test_timing_t t_create;
-
- if (!(rk = use_rk))
- rk = test_create_producer();
-
- rkqu = rd_kafka_queue_new(rk);
-
- delt[0] = rd_kafka_DeleteTopic_new(topicname);
-
- options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
- err = rd_kafka_AdminOptions_set_operation_timeout(
- options, timeout_ms, errstr, sizeof(errstr));
- TEST_ASSERT(!err, "%s", errstr);
-
- TEST_SAY(
- "Deleting topic \"%s\" "
- "(timeout=%d)\n",
- topicname, timeout_ms);
-
- TIMING_START(&t_create, "DeleteTopics");
- rd_kafka_DeleteTopics(rk, delt, delt_cnt, options, rkqu);
-
- /* Wait for result */
- rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000);
- TEST_ASSERT(rkev, "Timed out waiting for DeleteTopics result");
-
- TIMING_STOP(&t_create);
-
- res = rd_kafka_event_DeleteTopics_result(rkev);
- TEST_ASSERT(res, "Expected DeleteTopics_result, not %s",
- rd_kafka_event_name(rkev));
-
- terr = rd_kafka_DeleteTopics_result_topics(res, &res_cnt);
- TEST_ASSERT(terr, "DeleteTopics_result_topics returned NULL");
- TEST_ASSERT(res_cnt == delt_cnt,
- "DeleteTopics_result_topics returned %" PRIusz
- " topics, "
- "not the expected %" PRIusz,
- res_cnt, delt_cnt);
-
- TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]),
- "Topic %s result error: %s",
- rd_kafka_topic_result_name(terr[0]),
- rd_kafka_topic_result_error_string(terr[0]));
-
- rd_kafka_event_destroy(rkev);
-
- rd_kafka_queue_destroy(rkqu);
-
- rd_kafka_AdminOptions_destroy(options);
-
- rd_kafka_DeleteTopic_destroy(delt[0]);
-
- if (!use_rk)
- rd_kafka_destroy(rk);
-}
-
-
-/**
- * @brief Delete a topic
- */
-void test_delete_topic(rd_kafka_t *use_rk, const char *topicname) {
- if (test_broker_version < TEST_BRKVER(0, 10, 2, 0))
- test_delete_topic_sh(topicname);
- else
- test_admin_delete_topic(use_rk, topicname);
-}
-
-
-/**
- * @brief Create additional partitions for a topic using Admin API
- */
/**
 * @brief Create additional partitions for a topic using Admin API
 */
static void test_admin_create_partitions(rd_kafka_t *use_rk,
                                         const char *topicname,
                                         int new_partition_cnt) {
        rd_kafka_t *rk;
        rd_kafka_NewPartitions_t *newp[1];
        const size_t newp_cnt = 1;
        rd_kafka_AdminOptions_t *options;
        rd_kafka_queue_t *rkqu;
        rd_kafka_event_t *rkev;
        const rd_kafka_CreatePartitions_result_t *res;
        const rd_kafka_topic_result_t **terr;
        int timeout_ms = tmout_multip(10000);
        size_t res_cnt;
        rd_kafka_resp_err_t err;
        char errstr[512];
        test_timing_t t_create;

        /* Use the caller's client, or create a temporary producer. */
        if (!(rk = use_rk))
                rk = test_create_producer();

        /* Dedicated queue for the admin result event. */
        rkqu = rd_kafka_queue_new(rk);

        /* new_partition_cnt is the new TOTAL partition count. */
        newp[0] = rd_kafka_NewPartitions_new(topicname, new_partition_cnt,
                                             errstr, sizeof(errstr));
        TEST_ASSERT(newp[0] != NULL, "%s", errstr);

        options =
            rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATEPARTITIONS);
        err = rd_kafka_AdminOptions_set_operation_timeout(
            options, timeout_ms, errstr, sizeof(errstr));
        TEST_ASSERT(!err, "%s", errstr);

        TEST_SAY("Creating %d (total) partitions for topic \"%s\"\n",
                 new_partition_cnt, topicname);

        TIMING_START(&t_create, "CreatePartitions");
        rd_kafka_CreatePartitions(rk, newp, newp_cnt, options, rkqu);

        /* Wait for result; allow some slack beyond the operation timeout. */
        rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000);
        TEST_ASSERT(rkev, "Timed out waiting for CreatePartitions result");

        TIMING_STOP(&t_create);

        res = rd_kafka_event_CreatePartitions_result(rkev);
        TEST_ASSERT(res, "Expected CreatePartitions_result, not %s",
                    rd_kafka_event_name(rkev));

        terr = rd_kafka_CreatePartitions_result_topics(res, &res_cnt);
        TEST_ASSERT(terr, "CreatePartitions_result_topics returned NULL");
        TEST_ASSERT(res_cnt == newp_cnt,
                    "CreatePartitions_result_topics returned %" PRIusz
                    " topics, not the expected %" PRIusz,
                    res_cnt, newp_cnt);

        /* Per-topic result must be error free. */
        TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]),
                    "Topic %s result error: %s",
                    rd_kafka_topic_result_name(terr[0]),
                    rd_kafka_topic_result_error_string(terr[0]));

        rd_kafka_event_destroy(rkev);

        rd_kafka_queue_destroy(rkqu);

        rd_kafka_AdminOptions_destroy(options);

        rd_kafka_NewPartitions_destroy(newp[0]);

        /* Only destroy the client if we created it ourselves. */
        if (!use_rk)
                rd_kafka_destroy(rk);
}
-
-
-/**
- * @brief Create partitions for topic
- */
-void test_create_partitions(rd_kafka_t *use_rk,
- const char *topicname,
- int new_partition_cnt) {
- if (test_broker_version < TEST_BRKVER(0, 10, 2, 0))
- test_kafka_topics("--alter --topic %s --partitions %d",
- topicname, new_partition_cnt);
- else
- test_admin_create_partitions(use_rk, topicname,
- new_partition_cnt);
-}
-
-
/**
 * @brief Look up the current partition count for \p topicname.
 *
 * Polls metadata, retrying once per second for up to \p timeout_ms,
 * until the topic is reported without error or with a non-zero
 * partition count.
 *
 * @param rk Optional client instance; a temporary producer is created
 *           (and destroyed) if NULL.
 *
 * @returns the partition count, or -1 on timeout/failure.
 */
int test_get_partition_count(rd_kafka_t *rk,
                             const char *topicname,
                             int timeout_ms) {
        rd_kafka_t *use_rk;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_t *rkt;
        int64_t abs_timeout = test_clock() + ((int64_t)timeout_ms * 1000);
        int ret = -1;

        if (!rk)
                use_rk = test_create_producer();
        else
                use_rk = rk;

        rkt = rd_kafka_topic_new(use_rk, topicname, NULL);

        do {
                const struct rd_kafka_metadata *metadata;

                /* Request metadata for this topic only. */
                err = rd_kafka_metadata(use_rk, 0, rkt, &metadata,
                                        tmout_multip(15000));
                if (err)
                        TEST_WARN("metadata() for %s failed: %s\n",
                                  rkt ? rd_kafka_topic_name(rkt)
                                      : "(all-local)",
                                  rd_kafka_err2str(err));
                else {
                        if (metadata->topic_cnt == 1) {
                                /* Accept the topic when it has no error,
                                 * or when partitions are already known
                                 * despite a (transient) error. */
                                if (metadata->topics[0].err == 0 ||
                                    metadata->topics[0].partition_cnt > 0) {
                                        int32_t cnt;
                                        cnt = metadata->topics[0].partition_cnt;
                                        rd_kafka_metadata_destroy(metadata);
                                        ret = (int)cnt;
                                        break;
                                }
                                TEST_SAY(
                                    "metadata(%s) returned %s: retrying\n",
                                    rd_kafka_topic_name(rkt),
                                    rd_kafka_err2str(metadata->topics[0].err));
                        }
                        rd_kafka_metadata_destroy(metadata);
                        rd_sleep(1);
                }
        } while (test_clock() < abs_timeout);

        rd_kafka_topic_destroy(rkt);

        if (!rk)
                rd_kafka_destroy(use_rk);

        return ret;
}
-
/**
 * @brief Let the broker auto-create the topic for us by repeatedly
 *        requesting its metadata until it shows up, or \p timeout_ms
 *        expires.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR once the topic is seen, else the
 *          error from the last metadata request.
 */
rd_kafka_resp_err_t test_auto_create_topic_rkt(rd_kafka_t *rk,
                                               rd_kafka_topic_t *rkt,
                                               int timeout_ms) {
        const struct rd_kafka_metadata *metadata;
        rd_kafka_resp_err_t err;
        test_timing_t t;
        int64_t abs_timeout = test_clock() + ((int64_t)timeout_ms * 1000);

        do {
                TIMING_START(&t, "auto_create_topic");
                err = rd_kafka_metadata(rk, 0, rkt, &metadata,
                                        tmout_multip(15000));
                TIMING_STOP(&t);
                if (err)
                        TEST_WARN("metadata() for %s failed: %s\n",
                                  rkt ? rd_kafka_topic_name(rkt)
                                      : "(all-local)",
                                  rd_kafka_err2str(err));
                else {
                        if (metadata->topic_cnt == 1) {
                                /* Topic exists when reported without error,
                                 * or when partitions are already known. */
                                if (metadata->topics[0].err == 0 ||
                                    metadata->topics[0].partition_cnt > 0) {
                                        rd_kafka_metadata_destroy(metadata);
                                        return 0;
                                }
                                TEST_SAY(
                                    "metadata(%s) returned %s: retrying\n",
                                    rd_kafka_topic_name(rkt),
                                    rd_kafka_err2str(metadata->topics[0].err));
                        }
                        rd_kafka_metadata_destroy(metadata);
                        rd_sleep(1);
                }
        } while (test_clock() < abs_timeout);

        return err;
}
-
-rd_kafka_resp_err_t
-test_auto_create_topic(rd_kafka_t *rk, const char *name, int timeout_ms) {
- rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, name, NULL);
- rd_kafka_resp_err_t err;
- if (!rkt)
- return rd_kafka_last_error();
- err = test_auto_create_topic_rkt(rk, rkt, timeout_ms);
- rd_kafka_topic_destroy(rkt);
- return err;
-}
-
-
-/**
- * @brief Check if topic auto creation works.
- * @returns 1 if it does, else 0.
- */
-int test_check_auto_create_topic(void) {
- rd_kafka_t *rk;
- rd_kafka_conf_t *conf;
- rd_kafka_resp_err_t err;
- const char *topic = test_mk_topic_name("autocreatetest", 1);
-
- test_conf_init(&conf, NULL, 0);
- rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
- err = test_auto_create_topic(rk, topic, tmout_multip(5000));
- if (err)
- TEST_SAY("Auto topic creation of \"%s\" failed: %s\n", topic,
- rd_kafka_err2str(err));
- rd_kafka_destroy(rk);
-
- return err ? 0 : 1;
-}
-
-
/**
 * @brief Builds and runs a Java application from the java/ directory.
 *
 * The application is started in the background, use
 * test_waitpid() to await its demise.
 *
 * Requires the KAFKA_PATH environment variable to be set.
 *
 * @param cls The app class to run using java/run-class.sh
 *
 * @returns -1 if the application could not be started, else the pid.
 */
int test_run_java(const char *cls, const char **argv) {
#ifdef _WIN32
        TEST_WARN("%s(%s) not supported Windows, yet", __FUNCTION__, cls);
        return -1;
#else
        int r;
        const char *kpath;
        pid_t pid;
        const char **full_argv, **p;
        int cnt;
        extern char **environ;

        kpath = test_getenv("KAFKA_PATH", NULL);

        if (!kpath) {
                TEST_WARN("%s(%s): KAFKA_PATH must be set\n", __FUNCTION__,
                          cls);
                return -1;
        }

        /* Build */
        r = system("make -s java");

        if (r == -1 || WIFSIGNALED(r) || WEXITSTATUS(r)) {
                TEST_WARN("%s(%s): failed to build java class (code %d)\n",
                          __FUNCTION__, cls, r);
                return -1;
        }

        /* Fork child process to run cls */
        pid = fork();
        if (pid == -1) {
                TEST_WARN("%s(%s): failed to fork: %s\n", __FUNCTION__, cls,
                          strerror(errno));
                return -1;
        }

        if (pid > 0)
                return (int)pid; /* In parent process */

        /* In child process */

        /* Reconstruct argv to contain run-class.sh and the cls */
        for (cnt = 0; argv[cnt]; cnt++)
                ;

        cnt += 3; /* run-class.sh, cls, .., NULL */
        full_argv = malloc(sizeof(*full_argv) * cnt);
        /* FIX: the allocation was previously used unchecked; dereferencing
         * a NULL pointer on allocation failure is undefined behavior. */
        if (!full_argv) {
                TEST_WARN("%s(%s): failed to allocate argv: %s\n",
                          __FUNCTION__, cls, strerror(errno));
                exit(2);
        }
        full_argv[0] = "java/run-class.sh";
        full_argv[1] = (const char *)cls;

        /* Copy arguments */
        for (p = &full_argv[2]; *argv; p++, argv++)
                *p = *argv;
        *p = NULL;

        /* Run: execve() only returns on failure, so its return value
         * (previously assigned to an unused variable) is not stored. */
        execve(full_argv[0], (char *const *)full_argv, environ);

        TEST_WARN("%s(%s): failed to execute run-class.sh: %s\n", __FUNCTION__,
                  cls, strerror(errno));
        exit(2);

        return -1; /* NOTREACHED */
#endif
}
-
-
/**
 * @brief Wait for child-process \p pid to exit.
 *
 * @returns 0 if the child process exited successfully, else -1
 *          (wait failure, termination by signal, or non-zero exit status).
 */
int test_waitpid(int pid) {
#ifdef _WIN32
        TEST_WARN("%s() not supported Windows, yet", __FUNCTION__);
        return -1;
#else
        pid_t r;
        int status = 0;

        r = waitpid((pid_t)pid, &status, 0);

        if (r == -1) {
                TEST_WARN("waitpid(%d) failed: %s\n", pid, strerror(errno));
                return -1;
        }

        if (WIFSIGNALED(status)) {
                TEST_WARN("Process %d terminated by signal %d\n", pid,
                          WTERMSIG(status));
                return -1;
        } else if (WEXITSTATUS(status)) {
                TEST_WARN("Process %d exited with status %d\n", pid,
                          WEXITSTATUS(status));
                return -1;
        }

        return 0;
#endif
}
-
-
-/**
- * @brief Check if \p feature is builtin to librdkafka.
- * @returns returns 1 if feature is built in, else 0.
- */
-int test_check_builtin(const char *feature) {
- rd_kafka_conf_t *conf;
- char errstr[128];
- int r;
-
- conf = rd_kafka_conf_new();
- if (rd_kafka_conf_set(conf, "builtin.features", feature, errstr,
- sizeof(errstr)) != RD_KAFKA_CONF_OK) {
- TEST_SAY("Feature \"%s\" not built-in: %s\n", feature, errstr);
- r = 0;
- } else {
- TEST_SAY("Feature \"%s\" is built-in\n", feature);
- r = 1;
- }
-
- rd_kafka_conf_destroy(conf);
- return r;
-}
-
-
/**
 * @brief printf-style formatting into a temporary per-thread buffer.
 *
 * Rotates through a thread-local ring of 8 static buffers, so up to 8
 * results can be in use at once on the same thread (e.g. multiple
 * tsprintf() calls in a single printf() argument list).
 * The returned string is overwritten by the 8th subsequent call and
 * must not be freed by the caller.
 */
char *tsprintf(const char *fmt, ...) {
        static RD_TLS char ret[8][512];
        static RD_TLS int i;
        va_list ap;


        /* Advance to the next buffer in the ring. */
        i = (i + 1) % 8;

        va_start(ap, fmt);
        rd_vsnprintf(ret[i], sizeof(ret[i]), fmt, ap);
        va_end(ap);

        return ret[i];
}
-
-
-/**
- * @brief Add a test report JSON object.
- * These will be written as a JSON array to the test report file.
- */
-void test_report_add(struct test *test, const char *fmt, ...) {
- va_list ap;
- char buf[512];
-
- va_start(ap, fmt);
- vsnprintf(buf, sizeof(buf), fmt, ap);
- va_end(ap);
-
- if (test->report_cnt == test->report_size) {
- if (test->report_size == 0)
- test->report_size = 8;
- else
- test->report_size *= 2;
-
- test->report_arr =
- realloc(test->report_arr,
- sizeof(*test->report_arr) * test->report_size);
- }
-
- test->report_arr[test->report_cnt++] = rd_strdup(buf);
-
- TEST_SAYL(1, "Report #%d: %s\n", test->report_cnt - 1, buf);
-}
-
/**
 * Returns 1 if KAFKA_PATH and BROKERS (or ZK_ADDRESS) is set so we can use
 * the kafka-topics.sh script to manually create topics.
 *
 * If \p skip is set TEST_SKIP() will be called with a helpful message.
 */
int test_can_create_topics(int skip) {
#ifndef _WIN32
        const char *bootstrap;
#endif

        /* Has AdminAPI: no shell script needed at all. */
        if (test_broker_version >= TEST_BRKVER(0, 10, 2, 0))
                return 1;

#ifdef _WIN32
        if (skip)
                TEST_SKIP("Cannot create topics on Win32\n");
        return 0;
#else

        /* Newer brokers take a bootstrap broker list, older ones are
         * reached through ZooKeeper. */
        bootstrap = test_broker_version >= TEST_BRKVER(3, 0, 0, 0)
                        ? "BROKERS"
                        : "ZK_ADDRESS";

        if (!test_getenv("KAFKA_PATH", NULL) || !test_getenv(bootstrap, NULL)) {
                if (skip)
                        TEST_SKIP(
                            "Cannot create topics "
                            "(set KAFKA_PATH and %s)\n",
                            bootstrap);
                return 0;
        }


        return 1;
#endif
}
-
-
/**
 * Wait for \p event_type, discarding all other events prior to it.
 *
 * @returns the matching event, or NULL if \p timeout_ms expired first.
 */
rd_kafka_event_t *test_wait_event(rd_kafka_queue_t *eventq,
                                  rd_kafka_event_type_t event_type,
                                  int timeout_ms) {
        test_timing_t t_w;
        int64_t abs_timeout = test_clock() + ((int64_t)timeout_ms * 1000);

        TIMING_START(&t_w, "wait_event");
        while (test_clock() < abs_timeout) {
                rd_kafka_event_t *rkev;

                /* Poll for the remaining time only. */
                rkev = rd_kafka_queue_poll(
                    eventq, (int)(abs_timeout - test_clock()) / 1000);

                /* NOTE(review): this relies on rd_kafka_event_type()
                 * accepting NULL (returning RD_KAFKA_EVENT_NONE), so a
                 * poll timeout only matches when the caller asked for
                 * EVENT_NONE. */
                if (rd_kafka_event_type(rkev) == event_type) {
                        TIMING_STOP(&t_w);
                        return rkev;
                }

                if (!rkev)
                        continue;

                /* Non-matching event: log and discard it. */
                if (rd_kafka_event_error(rkev))
                        TEST_SAY("discarding ignored event %s: %s\n",
                                 rd_kafka_event_name(rkev),
                                 rd_kafka_event_error_string(rkev));
                else
                        TEST_SAY("discarding ignored event %s\n",
                                 rd_kafka_event_name(rkev));
                rd_kafka_event_destroy(rkev);
        }
        TIMING_STOP(&t_w);

        return NULL;
}
-
-
/**
 * @brief Log helper: emit \p str at verbosity \p level.
 *        \p file and \p line are accepted for interface symmetry but
 *        are unused here.
 */
void test_SAY(const char *file, int line, int level, const char *str) {
        TEST_SAYL(level, "%s", str);
}
-
/**
 * @brief Mark the current test as skipped and record the reason.
 *
 * Only the first skip reason is stored in the test's failstr so it
 * shows up (untruncated) in the test report.
 */
void test_SKIP(const char *file, int line, const char *str) {
        TEST_WARN("SKIPPING TEST: %s", str);
        TEST_LOCK();
        test_curr->state = TEST_SKIPPED;
        if (!*test_curr->failstr) {
                rd_snprintf(test_curr->failstr, sizeof(test_curr->failstr),
                            "%s", str);
                /* Strip trailing whitespace/newline from the reason. */
                rtrim(test_curr->failstr);
        }
        TEST_UNLOCK();
}
-
/** @brief Returns the name of the currently executing test. */
const char *test_curr_name(void) {
        return test_curr->name;
}
-
-
-/**
- * @brief Dump/print message haders
- */
-void test_headers_dump(const char *what,
- int lvl,
- const rd_kafka_headers_t *hdrs) {
- size_t idx = 0;
- const char *name, *value;
- size_t size;
-
- while (!rd_kafka_header_get_all(hdrs, idx++, &name,
- (const void **)&value, &size))
- TEST_SAYL(lvl, "%s: Header #%" PRIusz ": %s='%s'\n", what,
- idx - 1, name, value ? value : "(NULL)");
-}
-
-
/**
 * @brief Retrieve and return the list of broker ids in the cluster.
 *
 * @param use_rk Optional instance to use; a temporary producer is
 *               created (and destroyed) if NULL.
 * @param cntp Will be updated to the number of brokers returned.
 *
 * @returns a malloc:ed list of int32_t broker ids; caller must free() it.
 */
int32_t *test_get_broker_ids(rd_kafka_t *use_rk, size_t *cntp) {
        int32_t *ids;
        rd_kafka_t *rk;
        const rd_kafka_metadata_t *md;
        rd_kafka_resp_err_t err;
        size_t i;

        if (!(rk = use_rk))
                rk = test_create_producer();

        err = rd_kafka_metadata(rk, 0, NULL, &md, tmout_multip(5000));
        TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
        TEST_ASSERT(md->broker_cnt > 0, "%d brokers, expected > 0",
                    md->broker_cnt);

        ids = malloc(sizeof(*ids) * md->broker_cnt);

        for (i = 0; i < (size_t)md->broker_cnt; i++)
                ids[i] = md->brokers[i].id;

        *cntp = md->broker_cnt;

        rd_kafka_metadata_destroy(md);

        if (!use_rk)
                rd_kafka_destroy(rk);

        return ids;
}
-
-
-
/**
 * @brief Verify that all topics in \p topics are reported in metadata,
 *        and that none of the topics in \p not_topics are reported.
 *
 * Where the expectation provides them, per-topic partition counts and
 * per-partition ids and replica lists are verified too.
 *
 * @param topics Expected topics. Each element's \c err field is
 *               clobbered by this function (used as a seen-marker).
 *
 * @returns the number of failures (but does not FAIL).
 */
static int verify_topics_in_metadata(rd_kafka_t *rk,
                                     rd_kafka_metadata_topic_t *topics,
                                     size_t topic_cnt,
                                     rd_kafka_metadata_topic_t *not_topics,
                                     size_t not_topic_cnt) {
        const rd_kafka_metadata_t *md;
        rd_kafka_resp_err_t err;
        int ti;
        size_t i;
        int fails = 0;

        /* Mark topics with dummy error which is overwritten
         * when topic is found in metadata, allowing us to check
         * for missed topics. */
        for (i = 0; i < topic_cnt; i++)
                topics[i].err = 12345;

        err = rd_kafka_metadata(rk, 1 /*all_topics*/, NULL, &md,
                                tmout_multip(5000));
        TEST_ASSERT(!err, "metadata failed: %s", rd_kafka_err2str(err));

        for (ti = 0; ti < md->topic_cnt; ti++) {
                const rd_kafka_metadata_topic_t *mdt = &md->topics[ti];

                /* Match this metadata topic against the expected set. */
                for (i = 0; i < topic_cnt; i++) {
                        int pi;
                        rd_kafka_metadata_topic_t *exp_mdt;

                        if (strcmp(topics[i].topic, mdt->topic))
                                continue;

                        exp_mdt = &topics[i];

                        exp_mdt->err = mdt->err; /* indicate found */
                        if (mdt->err) {
                                TEST_SAY(
                                    "metadata: "
                                    "Topic %s has error %s\n",
                                    mdt->topic, rd_kafka_err2str(mdt->err));
                                fails++;
                        }

                        /* Partition count is only verified when the
                         * expectation specifies one (> 0). */
                        if (exp_mdt->partition_cnt > 0 &&
                            mdt->partition_cnt != exp_mdt->partition_cnt) {
                                TEST_SAY(
                                    "metadata: "
                                    "Topic %s, expected %d partitions"
                                    ", not %d\n",
                                    mdt->topic, exp_mdt->partition_cnt,
                                    mdt->partition_cnt);
                                fails++;
                                continue;
                        }

                        /* Verify per-partition values */
                        for (pi = 0;
                             exp_mdt->partitions && pi < exp_mdt->partition_cnt;
                             pi++) {
                                const rd_kafka_metadata_partition_t *mdp =
                                    &mdt->partitions[pi];
                                const rd_kafka_metadata_partition_t *exp_mdp =
                                    &exp_mdt->partitions[pi];

                                if (mdp->id != exp_mdp->id) {
                                        TEST_SAY(
                                            "metadata: "
                                            "Topic %s, "
                                            "partition %d, "
                                            "partition list out of order,"
                                            " expected %d, not %d\n",
                                            mdt->topic, pi, exp_mdp->id,
                                            mdp->id);
                                        fails++;
                                        continue;
                                }

                                /* Replica lists are only verified when
                                 * the expectation provides one. */
                                if (exp_mdp->replicas) {
                                        if (mdp->replica_cnt !=
                                            exp_mdp->replica_cnt) {
                                                TEST_SAY(
                                                    "metadata: "
                                                    "Topic %s, "
                                                    "partition %d, "
                                                    "expected %d replicas,"
                                                    " not %d\n",
                                                    mdt->topic, pi,
                                                    exp_mdp->replica_cnt,
                                                    mdp->replica_cnt);
                                                fails++;
                                        } else if (
                                            memcmp(
                                                mdp->replicas,
                                                exp_mdp->replicas,
                                                mdp->replica_cnt *
                                                    sizeof(*mdp->replicas))) {
                                                int ri;

                                                TEST_SAY(
                                                    "metadata: "
                                                    "Topic %s, "
                                                    "partition %d, "
                                                    "replica mismatch:\n",
                                                    mdt->topic, pi);

                                                for (ri = 0;
                                                     ri < mdp->replica_cnt;
                                                     ri++) {
                                                        TEST_SAY(
                                                            " #%d: "
                                                            "expected "
                                                            "replica %d, "
                                                            "not %d\n",
                                                            ri,
                                                            exp_mdp
                                                                ->replicas[ri],
                                                            mdp->replicas[ri]);
                                                }

                                                fails++;
                                        }
                                }
                        }
                }

                /* None of the not_topics must be present. */
                for (i = 0; i < not_topic_cnt; i++) {
                        if (strcmp(not_topics[i].topic, mdt->topic))
                                continue;

                        TEST_SAY(
                            "metadata: "
                            "Topic %s found in metadata, unexpected\n",
                            mdt->topic);
                        fails++;
                }
        }

        /* Any expected topic still carrying the dummy marker was
         * never seen in the metadata response. */
        for (i = 0; i < topic_cnt; i++) {
                if ((int)topics[i].err == 12345) {
                        TEST_SAY(
                            "metadata: "
                            "Topic %s not seen in metadata\n",
                            topics[i].topic);
                        fails++;
                }
        }

        if (fails > 0)
                TEST_SAY("Metadata verification for %" PRIusz
                         " topics failed "
                         "with %d errors (see above)\n",
                         topic_cnt, fails);
        else
                TEST_SAY(
                    "Metadata verification succeeded: "
                    "%" PRIusz
                    " desired topics seen, "
                    "%" PRIusz " undesired topics not seen\n",
                    topic_cnt, not_topic_cnt);

        rd_kafka_metadata_destroy(md);

        return fails;
}
-
-
-
/**
 * @brief Wait up to \p tmout ms for metadata to reflect the expected
 *        (\p topics) and not-expected (\p not_topics) topics,
 *        retrying once per second. Fails the test on timeout.
 *
 * @param rk Optional instance; a temporary producer is created
 *           (and destroyed) if NULL.
 */
void test_wait_metadata_update(rd_kafka_t *rk,
                               rd_kafka_metadata_topic_t *topics,
                               size_t topic_cnt,
                               rd_kafka_metadata_topic_t *not_topics,
                               size_t not_topic_cnt,
                               int tmout) {
        int64_t abs_timeout;
        test_timing_t t_md;
        rd_kafka_t *our_rk = NULL;

        if (!rk)
                rk = our_rk = test_create_handle(RD_KAFKA_PRODUCER, NULL);

        abs_timeout = test_clock() + ((int64_t)tmout * 1000);

        TEST_SAY("Waiting for up to %dms for metadata update\n", tmout);

        TIMING_START(&t_md, "METADATA.WAIT");
        do {
                int md_fails;

                md_fails = verify_topics_in_metadata(rk, topics, topic_cnt,
                                                     not_topics, not_topic_cnt);

                if (!md_fails) {
                        TEST_SAY(
                            "All expected topics (not?) "
                            "seen in metadata\n");
                        /* abs_timeout doubles as the success flag:
                         * zeroed here so the check below passes. */
                        abs_timeout = 0;
                        break;
                }

                rd_sleep(1);
        } while (test_clock() < abs_timeout);
        TIMING_STOP(&t_md);

        if (our_rk)
                rd_kafka_destroy(our_rk);

        if (abs_timeout)
                TEST_FAIL("Expected topics not seen in given time.");
}
-
-/**
- * @brief Wait for topic to be available in metadata
- */
-void test_wait_topic_exists(rd_kafka_t *rk, const char *topic, int tmout) {
- rd_kafka_metadata_topic_t topics = {.topic = (char *)topic};
-
- test_wait_metadata_update(rk, &topics, 1, NULL, 0, tmout);
-
- /* Wait an additional second for the topic to propagate in
- * the cluster. This is not perfect but a cheap workaround for
- * the asynchronous nature of topic creations in Kafka. */
- rd_sleep(1);
-}
-
-
-
/**
 * @brief Wait for up to \p tmout for any type of admin result.
 *
 * Fails the test on poll timeout or on an unexpected (non-ERROR)
 * event type; ERROR events are logged and ignored.
 *
 * @returns the event
 */
rd_kafka_event_t *test_wait_admin_result(rd_kafka_queue_t *q,
                                         rd_kafka_event_type_t evtype,
                                         int tmout) {
        rd_kafka_event_t *rkev;

        while (1) {
                rkev = rd_kafka_queue_poll(q, tmout);
                if (!rkev)
                        TEST_FAIL("Timed out waiting for admin result (%d)\n",
                                  evtype);

                if (rd_kafka_event_type(rkev) == evtype)
                        return rkev;


                if (rd_kafka_event_type(rkev) == RD_KAFKA_EVENT_ERROR) {
                        TEST_WARN(
                            "Received error event while waiting for %d: "
                            "%s: ignoring",
                            evtype, rd_kafka_event_error_string(rkev));
                        continue;
                }


                /* Any other event type: this assertion always fails and
                 * serves to fail the test with a descriptive message. */
                TEST_ASSERT(rd_kafka_event_type(rkev) == evtype,
                            "Expected event type %d, got %d (%s)", evtype,
                            rd_kafka_event_type(rkev),
                            rd_kafka_event_name(rkev));
        }

        return NULL;
}
-
-/**
- * @brief Wait for up to \p tmout for an admin API result and return the
- * distilled error code.
- *
- * Supported APIs:
- * - AlterConfigs
- * - CreatePartitions
- * - CreateTopics
- * - DeleteGroups
- * - DeleteRecords
- * - DeleteTopics
- * - DeleteConsumerGroupOffsets
- * - DescribeConfigs
- * - CreateAcls
- */
-rd_kafka_resp_err_t test_wait_topic_admin_result(rd_kafka_queue_t *q,
- rd_kafka_event_type_t evtype,
- rd_kafka_event_t **retevent,
- int tmout) {
- rd_kafka_event_t *rkev;
- size_t i;
- const rd_kafka_topic_result_t **terr = NULL;
- size_t terr_cnt = 0;
- const rd_kafka_ConfigResource_t **cres = NULL;
- size_t cres_cnt = 0;
- const rd_kafka_acl_result_t **aclres = NULL;
- size_t aclres_cnt = 0;
- int errcnt = 0;
- rd_kafka_resp_err_t err;
- const rd_kafka_group_result_t **gres = NULL;
- size_t gres_cnt = 0;
- const rd_kafka_ConsumerGroupDescription_t **gdescs = NULL;
- size_t gdescs_cnt = 0;
- const rd_kafka_error_t **glists_errors = NULL;
- size_t glists_error_cnt = 0;
- const rd_kafka_topic_partition_list_t *offsets = NULL;
-
- rkev = test_wait_admin_result(q, evtype, tmout);
-
- if ((err = rd_kafka_event_error(rkev))) {
- TEST_WARN("%s failed: %s\n", rd_kafka_event_name(rkev),
- rd_kafka_event_error_string(rkev));
- rd_kafka_event_destroy(rkev);
- return err;
- }
-
- if (evtype == RD_KAFKA_EVENT_CREATETOPICS_RESULT) {
- const rd_kafka_CreateTopics_result_t *res;
- if (!(res = rd_kafka_event_CreateTopics_result(rkev)))
- TEST_FAIL("Expected a CreateTopics result, not %s",
- rd_kafka_event_name(rkev));
-
- terr = rd_kafka_CreateTopics_result_topics(res, &terr_cnt);
-
- } else if (evtype == RD_KAFKA_EVENT_DELETETOPICS_RESULT) {
- const rd_kafka_DeleteTopics_result_t *res;
- if (!(res = rd_kafka_event_DeleteTopics_result(rkev)))
- TEST_FAIL("Expected a DeleteTopics result, not %s",
- rd_kafka_event_name(rkev));
-
- terr = rd_kafka_DeleteTopics_result_topics(res, &terr_cnt);
-
- } else if (evtype == RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT) {
- const rd_kafka_CreatePartitions_result_t *res;
- if (!(res = rd_kafka_event_CreatePartitions_result(rkev)))
- TEST_FAIL("Expected a CreatePartitions result, not %s",
- rd_kafka_event_name(rkev));
-
- terr = rd_kafka_CreatePartitions_result_topics(res, &terr_cnt);
-
- } else if (evtype == RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT) {
- const rd_kafka_DescribeConfigs_result_t *res;
-
- if (!(res = rd_kafka_event_DescribeConfigs_result(rkev)))
- TEST_FAIL("Expected a DescribeConfigs result, not %s",
- rd_kafka_event_name(rkev));
-
- cres =
- rd_kafka_DescribeConfigs_result_resources(res, &cres_cnt);
-
- } else if (evtype == RD_KAFKA_EVENT_ALTERCONFIGS_RESULT) {
- const rd_kafka_AlterConfigs_result_t *res;
-
- if (!(res = rd_kafka_event_AlterConfigs_result(rkev)))
- TEST_FAIL("Expected a AlterConfigs result, not %s",
- rd_kafka_event_name(rkev));
-
- cres = rd_kafka_AlterConfigs_result_resources(res, &cres_cnt);
-
- } else if (evtype == RD_KAFKA_EVENT_CREATEACLS_RESULT) {
- const rd_kafka_CreateAcls_result_t *res;
-
- if (!(res = rd_kafka_event_CreateAcls_result(rkev)))
- TEST_FAIL("Expected a CreateAcls result, not %s",
- rd_kafka_event_name(rkev));
-
- aclres = rd_kafka_CreateAcls_result_acls(res, &aclres_cnt);
- } else if (evtype == RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT) {
- const rd_kafka_ListConsumerGroups_result_t *res;
- if (!(res = rd_kafka_event_ListConsumerGroups_result(rkev)))
- TEST_FAIL(
- "Expected a ListConsumerGroups result, not %s",
- rd_kafka_event_name(rkev));
-
- glists_errors = rd_kafka_ListConsumerGroups_result_errors(
- res, &glists_error_cnt);
- } else if (evtype == RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT) {
- const rd_kafka_DescribeConsumerGroups_result_t *res;
- if (!(res = rd_kafka_event_DescribeConsumerGroups_result(rkev)))
- TEST_FAIL(
- "Expected a DescribeConsumerGroups result, not %s",
- rd_kafka_event_name(rkev));
-
- gdescs = rd_kafka_DescribeConsumerGroups_result_groups(
- res, &gdescs_cnt);
- } else if (evtype == RD_KAFKA_EVENT_DELETEGROUPS_RESULT) {
- const rd_kafka_DeleteGroups_result_t *res;
- if (!(res = rd_kafka_event_DeleteGroups_result(rkev)))
- TEST_FAIL("Expected a DeleteGroups result, not %s",
- rd_kafka_event_name(rkev));
-
- gres = rd_kafka_DeleteGroups_result_groups(res, &gres_cnt);
-
- } else if (evtype == RD_KAFKA_EVENT_DELETERECORDS_RESULT) {
- const rd_kafka_DeleteRecords_result_t *res;
- if (!(res = rd_kafka_event_DeleteRecords_result(rkev)))
- TEST_FAIL("Expected a DeleteRecords result, not %s",
- rd_kafka_event_name(rkev));
-
- offsets = rd_kafka_DeleteRecords_result_offsets(res);
-
- } else if (evtype == RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT) {
- const rd_kafka_DeleteConsumerGroupOffsets_result_t *res;
- if (!(res = rd_kafka_event_DeleteConsumerGroupOffsets_result(
- rkev)))
- TEST_FAIL(
- "Expected a DeleteConsumerGroupOffsets "
- "result, not %s",
- rd_kafka_event_name(rkev));
-
- gres = rd_kafka_DeleteConsumerGroupOffsets_result_groups(
- rkev, &gres_cnt);
-
- } else {
- TEST_FAIL("Bad evtype: %d", evtype);
- RD_NOTREACHED();
- }
-
- /* Check topic errors */
- for (i = 0; i < terr_cnt; i++) {
- if (rd_kafka_topic_result_error(terr[i])) {
- TEST_WARN("..Topics result: %s: error: %s\n",
- rd_kafka_topic_result_name(terr[i]),
- rd_kafka_topic_result_error_string(terr[i]));
- if (!(errcnt++))
- err = rd_kafka_topic_result_error(terr[i]);
- }
- }
-
- /* Check resource errors */
- for (i = 0; i < cres_cnt; i++) {
- if (rd_kafka_ConfigResource_error(cres[i])) {
- TEST_WARN(
- "ConfigResource result: %d,%s: error: %s\n",
- rd_kafka_ConfigResource_type(cres[i]),
- rd_kafka_ConfigResource_name(cres[i]),
- rd_kafka_ConfigResource_error_string(cres[i]));
- if (!(errcnt++))
- err = rd_kafka_ConfigResource_error(cres[i]);
- }
- }
-
- /* Check ACL errors */
- for (i = 0; i < aclres_cnt; i++) {
- const rd_kafka_error_t *error =
- rd_kafka_acl_result_error(aclres[i]);
- if (error) {
- TEST_WARN("AclResult error: %s: %s\n",
- rd_kafka_error_name(error),
- rd_kafka_error_string(error));
- if (!(errcnt++))
- err = rd_kafka_error_code(error);
- }
- }
-
- /* Check list groups errors */
- for (i = 0; i < glists_error_cnt; i++) {
- const rd_kafka_error_t *error = glists_errors[i];
- TEST_WARN("%s error: %s\n", rd_kafka_event_name(rkev),
- rd_kafka_error_string(error));
- if (!(errcnt++))
- err = rd_kafka_error_code(error);
- }
-
- /* Check describe groups errors */
- for (i = 0; i < gdescs_cnt; i++) {
- const rd_kafka_error_t *error;
- if ((error =
- rd_kafka_ConsumerGroupDescription_error(gdescs[i]))) {
- TEST_WARN("%s result: %s: error: %s\n",
- rd_kafka_event_name(rkev),
- rd_kafka_ConsumerGroupDescription_group_id(
- gdescs[i]),
- rd_kafka_error_string(error));
- if (!(errcnt++))
- err = rd_kafka_error_code(error);
- }
- }
-
- /* Check group errors */
- for (i = 0; i < gres_cnt; i++) {
- const rd_kafka_topic_partition_list_t *parts;
-
- if (rd_kafka_group_result_error(gres[i])) {
-
- TEST_WARN("%s result: %s: error: %s\n",
- rd_kafka_event_name(rkev),
- rd_kafka_group_result_name(gres[i]),
- rd_kafka_error_string(
- rd_kafka_group_result_error(gres[i])));
- if (!(errcnt++))
- err = rd_kafka_error_code(
- rd_kafka_group_result_error(gres[i]));
- }
-
- parts = rd_kafka_group_result_partitions(gres[i]);
- if (parts) {
- int j;
- for (j = 0; j < parts->cnt; i++) {
- if (!parts->elems[j].err)
- continue;
-
- TEST_WARN(
- "%s result: %s: "
- "%s [%" PRId32 "] error: %s\n",
- rd_kafka_event_name(rkev),
- rd_kafka_group_result_name(gres[i]),
- parts->elems[j].topic,
- parts->elems[j].partition,
- rd_kafka_err2str(parts->elems[j].err));
- errcnt++;
- }
- }
- }
-
- /* Check offset errors */
- for (i = 0; (offsets && i < (size_t)offsets->cnt); i++) {
- if (offsets->elems[i].err) {
- TEST_WARN("DeleteRecords result: %s [%d]: error: %s\n",
- offsets->elems[i].topic,
- offsets->elems[i].partition,
- rd_kafka_err2str(offsets->elems[i].err));
- if (!(errcnt++))
- err = offsets->elems[i].err;
- }
- }
-
- if (!err && retevent)
- *retevent = rkev;
- else
- rd_kafka_event_destroy(rkev);
-
- return err;
-}
-
-
-
/**
 * @brief Topic Admin API helpers
 *
 * @param useq Makes the call async and posts the response in this queue.
 *             If NULL this call will be synchronous and return the error
 *             result.
 *
 * @remark Fails the current test on failure.
 */

/**
 * @brief Create \p topic_cnt topics, each with \p num_partitions
 *        partitions and replication factor 1.
 */
rd_kafka_resp_err_t test_CreateTopics_simple(rd_kafka_t *rk,
                                             rd_kafka_queue_t *useq,
                                             char **topics,
                                             size_t topic_cnt,
                                             int num_partitions,
                                             void *opaque) {
        rd_kafka_NewTopic_t **new_topics;
        rd_kafka_AdminOptions_t *options;
        rd_kafka_queue_t *q;
        size_t i;
        const int tmout = 30 * 1000;
        rd_kafka_resp_err_t err;

        new_topics = malloc(sizeof(*new_topics) * topic_cnt);

        for (i = 0; i < topic_cnt; i++) {
                char errstr[512];
                /* Replication factor hardcoded to 1. */
                new_topics[i] = rd_kafka_NewTopic_new(
                    topics[i], num_partitions, 1, errstr, sizeof(errstr));
                TEST_ASSERT(new_topics[i],
                            "Failed to NewTopic(\"%s\", %d) #%" PRIusz ": %s",
                            topics[i], num_partitions, i, errstr);
        }

        options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);
        rd_kafka_AdminOptions_set_opaque(options, opaque);

        if (!useq) {
                /* Synchronous mode: set timeouts and create a private
                 * queue to wait on below. */
                char errstr[512];

                err = rd_kafka_AdminOptions_set_request_timeout(
                    options, tmout, errstr, sizeof(errstr));
                TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
                err = rd_kafka_AdminOptions_set_operation_timeout(
                    options, tmout - 5000, errstr, sizeof(errstr));
                TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);

                q = rd_kafka_queue_new(rk);
        } else {
                q = useq;
        }

        TEST_SAY("Creating %" PRIusz " topics\n", topic_cnt);

        rd_kafka_CreateTopics(rk, new_topics, topic_cnt, options, q);

        rd_kafka_AdminOptions_destroy(options);

        rd_kafka_NewTopic_destroy_array(new_topics, topic_cnt);
        free(new_topics);

        /* Async mode: caller polls \p useq for the result. */
        if (useq)
                return RD_KAFKA_RESP_ERR_NO_ERROR;


        err = test_wait_topic_admin_result(
            q, RD_KAFKA_EVENT_CREATETOPICS_RESULT, NULL, tmout + 5000);

        rd_kafka_queue_destroy(q);

        if (err)
                TEST_FAIL("Failed to create %d topic(s): %s", (int)topic_cnt,
                          rd_kafka_err2str(err));

        return err;
}
-
-
/**
 * @brief Create partitions for \p topic up to a new total of
 *        \p total_part_cnt. See the "Topic Admin API helpers" note
 *        above for the \p useq sync/async semantics.
 */
rd_kafka_resp_err_t test_CreatePartitions_simple(rd_kafka_t *rk,
                                                 rd_kafka_queue_t *useq,
                                                 const char *topic,
                                                 size_t total_part_cnt,
                                                 void *opaque) {
        rd_kafka_NewPartitions_t *newp[1];
        rd_kafka_AdminOptions_t *options;
        rd_kafka_queue_t *q;
        const int tmout = 30 * 1000;
        rd_kafka_resp_err_t err;
        char errstr[512];

        newp[0] = rd_kafka_NewPartitions_new(topic, total_part_cnt, errstr,
                                             sizeof(errstr));
        TEST_ASSERT(newp[0], "Failed to NewPartitions(\"%s\", %" PRIusz "): %s",
                    topic, total_part_cnt, errstr);

        options =
            rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATEPARTITIONS);
        rd_kafka_AdminOptions_set_opaque(options, opaque);

        if (!useq) {
                /* Synchronous mode: set timeouts and create a private
                 * queue to wait on below. */
                err = rd_kafka_AdminOptions_set_request_timeout(
                    options, tmout, errstr, sizeof(errstr));
                TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
                err = rd_kafka_AdminOptions_set_operation_timeout(
                    options, tmout - 5000, errstr, sizeof(errstr));
                TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);

                q = rd_kafka_queue_new(rk);
        } else {
                q = useq;
        }

        TEST_SAY("Creating (up to) %" PRIusz " partitions for topic \"%s\"\n",
                 total_part_cnt, topic);

        rd_kafka_CreatePartitions(rk, newp, 1, options, q);

        rd_kafka_AdminOptions_destroy(options);

        rd_kafka_NewPartitions_destroy(newp[0]);

        /* Async mode: caller polls \p useq for the result. */
        if (useq)
                return RD_KAFKA_RESP_ERR_NO_ERROR;


        err = test_wait_topic_admin_result(
            q, RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT, NULL, tmout + 5000);

        rd_kafka_queue_destroy(q);

        if (err)
                TEST_FAIL("Failed to create partitions: %s",
                          rd_kafka_err2str(err));

        return err;
}
-
-
-rd_kafka_resp_err_t test_DeleteTopics_simple(rd_kafka_t *rk,
- rd_kafka_queue_t *useq,
- char **topics,
- size_t topic_cnt,
- void *opaque) {
- rd_kafka_queue_t *q;
- rd_kafka_DeleteTopic_t **del_topics;
- rd_kafka_AdminOptions_t *options;
- size_t i;
- rd_kafka_resp_err_t err;
- const int tmout = 30 * 1000;
-
- del_topics = malloc(sizeof(*del_topics) * topic_cnt);
-
- for (i = 0; i < topic_cnt; i++) {
- del_topics[i] = rd_kafka_DeleteTopic_new(topics[i]);
- TEST_ASSERT(del_topics[i]);
- }
-
- options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
- rd_kafka_AdminOptions_set_opaque(options, opaque);
-
- if (!useq) {
- char errstr[512];
-
- err = rd_kafka_AdminOptions_set_request_timeout(
- options, tmout, errstr, sizeof(errstr));
- TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
- err = rd_kafka_AdminOptions_set_operation_timeout(
- options, tmout - 5000, errstr, sizeof(errstr));
- TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
-
- q = rd_kafka_queue_new(rk);
- } else {
- q = useq;
- }
-
- TEST_SAY("Deleting %" PRIusz " topics\n", topic_cnt);
-
- rd_kafka_DeleteTopics(rk, del_topics, topic_cnt, options, useq);
-
- rd_kafka_AdminOptions_destroy(options);
-
- rd_kafka_DeleteTopic_destroy_array(del_topics, topic_cnt);
-
- free(del_topics);
-
- if (useq)
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-
- err = test_wait_topic_admin_result(
- q, RD_KAFKA_EVENT_DELETETOPICS_RESULT, NULL, tmout + 5000);
-
- rd_kafka_queue_destroy(q);
-
- if (err)
- TEST_FAIL("Failed to delete topics: %s", rd_kafka_err2str(err));
-
- return err;
-}
-
-rd_kafka_resp_err_t test_DeleteGroups_simple(rd_kafka_t *rk,
- rd_kafka_queue_t *useq,
- char **groups,
- size_t group_cnt,
- void *opaque) {
- rd_kafka_queue_t *q;
- rd_kafka_DeleteGroup_t **del_groups;
- rd_kafka_AdminOptions_t *options;
- size_t i;
- rd_kafka_resp_err_t err;
- const int tmout = 30 * 1000;
-
- del_groups = malloc(sizeof(*del_groups) * group_cnt);
-
- for (i = 0; i < group_cnt; i++) {
- del_groups[i] = rd_kafka_DeleteGroup_new(groups[i]);
- TEST_ASSERT(del_groups[i]);
- }
-
- options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETEGROUPS);
- rd_kafka_AdminOptions_set_opaque(options, opaque);
-
- if (!useq) {
- char errstr[512];
-
- err = rd_kafka_AdminOptions_set_request_timeout(
- options, tmout, errstr, sizeof(errstr));
- TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
-
- q = rd_kafka_queue_new(rk);
- } else {
- q = useq;
- }
-
- TEST_SAY("Deleting %" PRIusz " groups\n", group_cnt);
-
- rd_kafka_DeleteGroups(rk, del_groups, group_cnt, options, q);
-
- rd_kafka_AdminOptions_destroy(options);
-
- rd_kafka_DeleteGroup_destroy_array(del_groups, group_cnt);
- free(del_groups);
-
- if (useq)
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-
- err = test_wait_topic_admin_result(
- q, RD_KAFKA_EVENT_DELETEGROUPS_RESULT, NULL, tmout + 5000);
-
- rd_kafka_queue_destroy(q);
-
- if (err)
- TEST_FAIL("Failed to delete groups: %s", rd_kafka_err2str(err));
-
- return err;
-}
-
-rd_kafka_resp_err_t
-test_DeleteRecords_simple(rd_kafka_t *rk,
- rd_kafka_queue_t *useq,
- const rd_kafka_topic_partition_list_t *offsets,
- void *opaque) {
- rd_kafka_queue_t *q;
- rd_kafka_AdminOptions_t *options;
- rd_kafka_resp_err_t err;
- rd_kafka_DeleteRecords_t *del_records =
- rd_kafka_DeleteRecords_new(offsets);
- const int tmout = 30 * 1000;
-
- options =
- rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETERECORDS);
- rd_kafka_AdminOptions_set_opaque(options, opaque);
-
- if (!useq) {
- char errstr[512];
-
- err = rd_kafka_AdminOptions_set_request_timeout(
- options, tmout, errstr, sizeof(errstr));
- TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
- err = rd_kafka_AdminOptions_set_operation_timeout(
- options, tmout - 5000, errstr, sizeof(errstr));
- TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
-
- q = rd_kafka_queue_new(rk);
- } else {
- q = useq;
- }
-
- TEST_SAY("Deleting offsets from %d partitions\n", offsets->cnt);
-
- rd_kafka_DeleteRecords(rk, &del_records, 1, options, q);
-
- rd_kafka_DeleteRecords_destroy(del_records);
-
- rd_kafka_AdminOptions_destroy(options);
-
- if (useq)
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-
- err = test_wait_topic_admin_result(
- q, RD_KAFKA_EVENT_DELETERECORDS_RESULT, NULL, tmout + 5000);
-
- rd_kafka_queue_destroy(q);
-
- if (err)
- TEST_FAIL("Failed to delete records: %s",
- rd_kafka_err2str(err));
-
- return err;
-}
-
-rd_kafka_resp_err_t test_DeleteConsumerGroupOffsets_simple(
- rd_kafka_t *rk,
- rd_kafka_queue_t *useq,
- const char *group_id,
- const rd_kafka_topic_partition_list_t *offsets,
- void *opaque) {
- rd_kafka_queue_t *q;
- rd_kafka_AdminOptions_t *options;
- rd_kafka_resp_err_t err;
- const int tmout = 30 * 1000;
- rd_kafka_DeleteConsumerGroupOffsets_t *cgoffsets;
-
- options = rd_kafka_AdminOptions_new(
- rk, RD_KAFKA_ADMIN_OP_DELETECONSUMERGROUPOFFSETS);
- rd_kafka_AdminOptions_set_opaque(options, opaque);
-
- if (!useq) {
- char errstr[512];
-
- err = rd_kafka_AdminOptions_set_request_timeout(
- options, tmout, errstr, sizeof(errstr));
- TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
- err = rd_kafka_AdminOptions_set_operation_timeout(
- options, tmout - 5000, errstr, sizeof(errstr));
- TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
-
- q = rd_kafka_queue_new(rk);
- } else {
- q = useq;
- }
-
- if (offsets) {
- TEST_SAY(
- "Deleting committed offsets for group %s and "
- "%d partitions\n",
- group_id, offsets->cnt);
-
- cgoffsets =
- rd_kafka_DeleteConsumerGroupOffsets_new(group_id, offsets);
- } else {
- TEST_SAY("Provoking invalid DeleteConsumerGroupOffsets call\n");
- cgoffsets = NULL;
- }
-
- rd_kafka_DeleteConsumerGroupOffsets(rk, &cgoffsets, cgoffsets ? 1 : 0,
- options, useq);
-
- if (cgoffsets)
- rd_kafka_DeleteConsumerGroupOffsets_destroy(cgoffsets);
-
- rd_kafka_AdminOptions_destroy(options);
-
- if (useq)
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-
- err = test_wait_topic_admin_result(
- q, RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT, NULL,
- tmout + 5000);
-
- rd_kafka_queue_destroy(q);
-
- if (err)
- TEST_FAIL("Failed to delete committed offsets: %s",
- rd_kafka_err2str(err));
-
- return err;
-}
-
/**
 * @brief Delta Alter configuration for the given resource,
 *        overwriting/setting the configs provided in \p configs.
 *        Existing configuration remains intact.
 *
 * @param rk Client instance to perform the admin calls on.
 * @param restype Resource type (topic, broker, ...) to alter.
 * @param resname Name of the resource.
 * @param configs 'const char *name, const char *value' tuples
 * @param config_cnt is the number of tuples in \p configs
 *
 * @returns the error from DescribeConfigs or AlterConfigs, or NO_ERROR.
 *
 * @remark AlterConfigs replaces the resource's entire configuration, so to
 *         get "delta" semantics the current configuration is first read
 *         back with DescribeConfigs and re-applied alongside the overrides.
 */
rd_kafka_resp_err_t test_AlterConfigs_simple(rd_kafka_t *rk,
                                             rd_kafka_ResourceType_t restype,
                                             const char *resname,
                                             const char **configs,
                                             size_t config_cnt) {
        rd_kafka_queue_t *q;
        rd_kafka_ConfigResource_t *confres;
        rd_kafka_event_t *rkev;
        size_t i;
        rd_kafka_resp_err_t err;
        const rd_kafka_ConfigResource_t **results;
        size_t result_cnt;
        const rd_kafka_ConfigEntry_t **configents;
        size_t configent_cnt;


        q = rd_kafka_queue_new(rk);

        TEST_SAY("Getting configuration for %d %s\n", restype, resname);

        /* Step 1: read back the current configuration of the resource. */
        confres = rd_kafka_ConfigResource_new(restype, resname);
        rd_kafka_DescribeConfigs(rk, &confres, 1, NULL, q);

        err = test_wait_topic_admin_result(
            q, RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, &rkev, 15 * 1000);
        if (err) {
                rd_kafka_queue_destroy(q);
                rd_kafka_ConfigResource_destroy(confres);
                return err;
        }

        /* results/configents point into rkev, which must stay alive until
         * all entries have been copied into confres below. */
        results = rd_kafka_DescribeConfigs_result_resources(
            rd_kafka_event_DescribeConfigs_result(rkev), &result_cnt);
        TEST_ASSERT(result_cnt == 1,
                    "expected 1 DescribeConfigs result, not %" PRIusz,
                    result_cnt);

        configents =
            rd_kafka_ConfigResource_configs(results[0], &configent_cnt);
        TEST_ASSERT(configent_cnt > 0,
                    "expected > 0 ConfigEntry:s, not %" PRIusz, configent_cnt);

        TEST_SAY("Altering configuration for %d %s\n", restype, resname);

        /* Apply all existing configuration entries to resource object that
         * will later be passed to AlterConfigs. */
        for (i = 0; i < configent_cnt; i++) {
                const char *entry_name =
                    rd_kafka_ConfigEntry_name(configents[i]);

                if (test_broker_version >= TEST_BRKVER(3, 2, 0, 0)) {
                        /* Skip entries that are overwritten to
                         * avoid duplicates, that cause an error since
                         * this broker version. */
                        size_t j;
                        for (j = 0; j < config_cnt; j += 2) {
                                if (!strcmp(configs[j], entry_name)) {
                                        break;
                                }
                        }

                        /* j < config_cnt means the loop broke on a match:
                         * this entry is among the overrides, skip it. */
                        if (j < config_cnt)
                                continue;
                }

                err = rd_kafka_ConfigResource_set_config(
                    confres, entry_name,
                    rd_kafka_ConfigEntry_value(configents[i]));
                TEST_ASSERT(!err,
                            "Failed to set read-back config %s=%s "
                            "on local resource object",
                            entry_name,
                            rd_kafka_ConfigEntry_value(configents[i]));
        }

        /* All entries copied; the DescribeConfigs event may now be freed. */
        rd_kafka_event_destroy(rkev);

        /* Then apply the configuration to change. */
        for (i = 0; i < config_cnt; i += 2) {
                err = rd_kafka_ConfigResource_set_config(confres, configs[i],
                                                         configs[i + 1]);
                TEST_ASSERT(!err,
                            "Failed to set config %s=%s on "
                            "local resource object",
                            configs[i], configs[i + 1]);
        }

        /* Step 2: write the merged (existing + overrides) configuration. */
        rd_kafka_AlterConfigs(rk, &confres, 1, NULL, q);

        rd_kafka_ConfigResource_destroy(confres);

        err = test_wait_topic_admin_result(
            q, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT, NULL, 15 * 1000);

        rd_kafka_queue_destroy(q);

        return err;
}
-
/**
 * @brief ACL Admin API helpers
 *
 * @param useq Makes the call async and posts the response in this queue.
 *             If NULL this call will be synchronous and return the error
 *             result.
 *
 * @remark Fails the current test on failure.
 */
-
-rd_kafka_resp_err_t test_CreateAcls_simple(rd_kafka_t *rk,
- rd_kafka_queue_t *useq,
- rd_kafka_AclBinding_t **acls,
- size_t acl_cnt,
- void *opaque) {
- rd_kafka_AdminOptions_t *options;
- rd_kafka_queue_t *q;
- rd_kafka_resp_err_t err;
- const int tmout = 30 * 1000;
-
- options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATEACLS);
- rd_kafka_AdminOptions_set_opaque(options, opaque);
-
- if (!useq) {
- q = rd_kafka_queue_new(rk);
- } else {
- q = useq;
- }
-
- TEST_SAY("Creating %" PRIusz " acls\n", acl_cnt);
-
- rd_kafka_CreateAcls(rk, acls, acl_cnt, options, q);
-
- rd_kafka_AdminOptions_destroy(options);
-
- if (useq)
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-
- err = test_wait_topic_admin_result(q, RD_KAFKA_EVENT_CREATEACLS_RESULT,
- NULL, tmout + 5000);
-
- rd_kafka_queue_destroy(q);
-
- if (err)
- TEST_FAIL("Failed to create %d acl(s): %s", (int)acl_cnt,
- rd_kafka_err2str(err));
-
- return err;
-}
-
/**
 * @brief Free each of the \p cnt strings in \p strs, then the array itself.
 */
static void test_free_string_array(char **strs, size_t cnt) {
        while (cnt > 0)
                free(strs[--cnt]);
        free(strs);
}
-
-
/**
 * @brief Collect the names of all topics in the cluster that match the
 *        rdkafka test topic prefix.
 *
 * @param topicsp If non-NULL, receives a malloc()ed array of strdup()ed
 *                topic names on success (caller frees, e.g. with
 *                test_free_string_array()). If NULL only the count is
 *                reported.
 * @param topic_cntp Receives the number of matching topics.
 *
 * @returns an error if metadata retrieval fails, else NO_ERROR (also when
 *          zero topics match).
 */
static rd_kafka_resp_err_t
test_get_all_test_topics(rd_kafka_t *rk, char ***topicsp, size_t *topic_cntp) {
        size_t test_topic_prefix_len = strlen(test_topic_prefix);
        const rd_kafka_metadata_t *md;
        char **topics = NULL;
        size_t topic_cnt = 0;
        int i;
        rd_kafka_resp_err_t err;

        /* Initialize outputs so callers see sane values on early return. */
        *topic_cntp = 0;
        if (topicsp)
                *topicsp = NULL;

        /* Retrieve list of topics */
        err = rd_kafka_metadata(rk, 1 /*all topics*/, NULL, &md,
                                tmout_multip(10000));
        if (err) {
                TEST_WARN(
                    "%s: Failed to acquire metadata: %s: "
                    "not deleting any topics\n",
                    __FUNCTION__, rd_kafka_err2str(err));
                return err;
        }

        if (md->topic_cnt == 0) {
                TEST_WARN("%s: No topics in cluster\n", __FUNCTION__);
                rd_kafka_metadata_destroy(md);
                return RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        /* Worst case: every topic matches the prefix. */
        if (topicsp)
                topics = malloc(sizeof(*topics) * md->topic_cnt);

        for (i = 0; i < md->topic_cnt; i++) {
                /* Match topics whose name starts with the test prefix. */
                if (strlen(md->topics[i].topic) >= test_topic_prefix_len &&
                    !strncmp(md->topics[i].topic, test_topic_prefix,
                             test_topic_prefix_len)) {
                        if (topicsp)
                                topics[topic_cnt++] =
                                    rd_strdup(md->topics[i].topic);
                        else
                                topic_cnt++;
                }
        }

        if (topic_cnt == 0) {
                TEST_SAY(
                    "%s: No topics (out of %d) matching our "
                    "test prefix (%s)\n",
                    __FUNCTION__, md->topic_cnt, test_topic_prefix);
                rd_kafka_metadata_destroy(md);
                /* topic_cnt == 0: frees only the (empty) array itself. */
                if (topics)
                        test_free_string_array(topics, topic_cnt);
                return RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        rd_kafka_metadata_destroy(md);

        /* Transfer ownership of the name array to the caller. */
        if (topicsp)
                *topicsp = topics;
        *topic_cntp = topic_cnt;

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
-
/**
 * @brief Delete all test topics using the Kafka Admin API.
 *
 * Looks up all topics matching the test prefix, issues a single
 * DeleteTopics request for them, then polls metadata until the topics
 * have disappeared or \p timeout_ms has elapsed.
 *
 * @param timeout_ms Maximum time to wait for the deleted topics to
 *                   disappear from cluster metadata.
 *
 * @returns the first error encountered, or NO_ERROR.
 */
rd_kafka_resp_err_t test_delete_all_test_topics(int timeout_ms) {
        rd_kafka_t *rk;
        char **topics;
        size_t topic_cnt = 0;
        rd_kafka_resp_err_t err;
        int i;
        rd_kafka_AdminOptions_t *options;
        rd_kafka_queue_t *q;
        char errstr[256];
        /* Absolute deadline in microseconds (test_clock() resolution). */
        int64_t abs_timeout = test_clock() + ((int64_t)timeout_ms * 1000);

        rk = test_create_producer();

        err = test_get_all_test_topics(rk, &topics, &topic_cnt);
        if (err) {
                /* Error already reported by test_get_all_test_topics() */
                rd_kafka_destroy(rk);
                return err;
        }

        if (topic_cnt == 0) {
                /* Nothing to delete. */
                rd_kafka_destroy(rk);
                return RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        q = rd_kafka_queue_get_main(rk);

        options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
        if (rd_kafka_AdminOptions_set_operation_timeout(options, 2 * 60 * 1000,
                                                        errstr, sizeof(errstr)))
                TEST_SAY(_C_YEL
                         "Failed to set DeleteTopics timeout: %s: "
                         "ignoring\n",
                         errstr);

        TEST_SAY(_C_MAG
                 "====> Deleting all test topics with <===="
                 "a timeout of 2 minutes\n");

        /* NOTE(review): `options` is passed as the helper's `opaque`
         * argument (5th parameter); the helper builds its own
         * AdminOptions internally — confirm this is intentional. */
        test_DeleteTopics_simple(rk, q, topics, topic_cnt, options);

        rd_kafka_AdminOptions_destroy(options);

        /* Poll the main queue until the DeleteTopics result event arrives,
         * skipping any unrelated events. */
        while (1) {
                rd_kafka_event_t *rkev;
                const rd_kafka_DeleteTopics_result_t *res;

                rkev = rd_kafka_queue_poll(q, -1);

                res = rd_kafka_event_DeleteTopics_result(rkev);
                if (!res) {
                        TEST_SAY("%s: Ignoring event: %s: %s\n", __FUNCTION__,
                                 rd_kafka_event_name(rkev),
                                 rd_kafka_event_error_string(rkev));
                        rd_kafka_event_destroy(rkev);
                        continue;
                }

                if (rd_kafka_event_error(rkev)) {
                        /* Request-level failure. */
                        TEST_WARN("%s: DeleteTopics for %" PRIusz
                                  " topics "
                                  "failed: %s\n",
                                  __FUNCTION__, topic_cnt,
                                  rd_kafka_event_error_string(rkev));
                        err = rd_kafka_event_error(rkev);
                } else {
                        /* Per-topic results: count successes, warn on
                         * individual failures. */
                        const rd_kafka_topic_result_t **terr;
                        size_t tcnt;
                        int okcnt = 0;

                        terr = rd_kafka_DeleteTopics_result_topics(res, &tcnt);

                        for (i = 0; i < (int)tcnt; i++) {
                                if (!rd_kafka_topic_result_error(terr[i])) {
                                        okcnt++;
                                        continue;
                                }

                                TEST_WARN("%s: Failed to delete topic %s: %s\n",
                                          __FUNCTION__,
                                          rd_kafka_topic_result_name(terr[i]),
                                          rd_kafka_topic_result_error_string(
                                              terr[i]));
                        }

                        TEST_SAY(
                            "%s: DeleteTopics "
                            "succeeded for %d/%" PRIusz " topics\n",
                            __FUNCTION__, okcnt, topic_cnt);
                        err = RD_KAFKA_RESP_ERR_NO_ERROR;
                }

                rd_kafka_event_destroy(rkev);
                break;
        }

        rd_kafka_queue_destroy(q);

        test_free_string_array(topics, topic_cnt);

        /* Wait for topics to be fully deleted */
        while (1) {
                err = test_get_all_test_topics(rk, NULL, &topic_cnt);

                if (!err && topic_cnt == 0)
                        break;

                if (abs_timeout < test_clock()) {
                        TEST_WARN(
                            "%s: Timed out waiting for "
                            "remaining %" PRIusz
                            " deleted topics "
                            "to disappear from cluster metadata\n",
                            __FUNCTION__, topic_cnt);
                        break;
                }

                TEST_SAY("Waiting for remaining %" PRIusz
                         " delete topics "
                         "to disappear from cluster metadata\n",
                         topic_cnt);

                rd_sleep(1);
        }

        rd_kafka_destroy(rk);

        return err;
}
-
-
-
/**
 * @brief Common test-failure handler (backs the TEST_FAIL*() macros).
 *
 * Formats and prints the failure message with a timestamp, marks the
 * current test as failed and records the first failure string.
 *
 * @param do_lock If true, take the global test lock while mutating
 *                test_curr state.
 * @param fail_now If true, terminate execution: abort() when
 *                 test_assert_on_fail is set or we are on the main thread,
 *                 else exit just this test thread.
 */
void test_fail0(const char *file,
                int line,
                const char *function,
                int do_lock,
                int fail_now,
                const char *fmt,
                ...) {
        char buf[512];
        int is_thrd = 0;
        size_t of;
        va_list ap;
        char *t;
        char timestr[32];
        time_t tnow = time(NULL);

#ifdef __MINGW32__
        strftime(timestr, sizeof(timestr), "%a %b %d %H:%M:%S %Y",
                 localtime(&tnow));
#elif defined(_WIN32)
        ctime_s(timestr, sizeof(timestr), &tnow);
#else
        ctime_r(&tnow, timestr);
#endif
        /* ctime-style output ends with '\n': strip it. */
        t = strchr(timestr, '\n');
        if (t)
                *t = '\0';

        /* Prefix: "<subtest>: <function>():<line>: " */
        of = rd_snprintf(buf, sizeof(buf), "%s%s%s():%i: ", test_curr->subtest,
                         *test_curr->subtest ? ": " : "", function, line);
        rd_assert(of < sizeof(buf));

        va_start(ap, fmt);
        rd_vsnprintf(buf + of, sizeof(buf) - of, fmt, ap);
        va_end(ap);

        /* Remove trailing newline */
        if ((t = strchr(buf, '\n')) && !*(t + 1))
                *t = '\0';

        TEST_SAYL(0, "TEST FAILURE\n");
        fprintf(stderr,
                "\033[31m### Test \"%s%s%s%s\" failed at %s:%i:%s() at %s: "
                "###\n"
                "%s\n",
                test_curr->name, *test_curr->subtest ? " (" : "",
                test_curr->subtest, *test_curr->subtest ? ")" : "", file, line,
                function, timestr, buf + of);
        if (do_lock)
                TEST_LOCK();
        test_curr->state = TEST_FAILED;
        test_curr->failcnt += 1;
        test_curr->is_fatal_cb = NULL;

        /* Record only the first failure string for this test. */
        if (!*test_curr->failstr) {
                strncpy(test_curr->failstr, buf, sizeof(test_curr->failstr));
                /* strncpy() does not guarantee NUL-termination: enforce it. */
                test_curr->failstr[sizeof(test_curr->failstr) - 1] = '\0';
        }
        if (fail_now && test_curr->mainfunc) {
                /* Running in a spawned test thread: de-register from the
                 * running count before exiting the thread. */
                tests_running_cnt--;
                is_thrd = 1;
        }
        if (do_lock)
                TEST_UNLOCK();
        if (!fail_now)
                return;
        if (test_assert_on_fail || !is_thrd)
                assert(0);
        else
                thrd_exit(0);
}
-
-
-/**
- * @brief Destroy a mock cluster and its underlying rd_kafka_t handle
- */
-void test_mock_cluster_destroy(rd_kafka_mock_cluster_t *mcluster) {
- rd_kafka_t *rk = rd_kafka_mock_cluster_handle(mcluster);
- rd_kafka_mock_cluster_destroy(mcluster);
- rd_kafka_destroy(rk);
-}
-
-
-
-/**
- * @brief Create a standalone mock cluster that can be used by multiple
- * rd_kafka_t instances.
- */
-rd_kafka_mock_cluster_t *test_mock_cluster_new(int broker_cnt,
- const char **bootstraps) {
- rd_kafka_t *rk;
- rd_kafka_conf_t *conf = rd_kafka_conf_new();
- rd_kafka_mock_cluster_t *mcluster;
- char errstr[256];
-
- test_conf_common_init(conf, 0);
-
- test_conf_set(conf, "client.id", "MOCK");
-
- rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
- TEST_ASSERT(rk, "Failed to create mock cluster rd_kafka_t: %s", errstr);
-
- mcluster = rd_kafka_mock_cluster_new(rk, broker_cnt);
- TEST_ASSERT(mcluster, "Failed to acquire mock cluster");
-
- if (bootstraps)
- *bootstraps = rd_kafka_mock_cluster_bootstraps(mcluster);
-
- return mcluster;
-}
-
-
-
-/**
- * @name Sub-tests
- */
-
-
-/**
- * @brief Start a sub-test. \p fmt is optional and allows additional
- * sub-test info to be displayed, e.g., test parameters.
- *
- * @returns 0 if sub-test should not be run, else 1.
- */
-int test_sub_start(const char *func,
- int line,
- int is_quick,
- const char *fmt,
- ...) {
-
- if (!is_quick && test_quick)
- return 0;
-
- if (fmt && *fmt) {
- va_list ap;
- char buf[256];
-
- va_start(ap, fmt);
- rd_vsnprintf(buf, sizeof(buf), fmt, ap);
- va_end(ap);
-
- rd_snprintf(test_curr->subtest, sizeof(test_curr->subtest),
- "%s:%d: %s", func, line, buf);
- } else {
- rd_snprintf(test_curr->subtest, sizeof(test_curr->subtest),
- "%s:%d", func, line);
- }
-
- if (subtests_to_run && !strstr(test_curr->subtest, subtests_to_run)) {
- *test_curr->subtest = '\0';
- return 0;
- }
-
- test_curr->subtest_quick = is_quick;
-
- TIMING_START(&test_curr->subtest_duration, "SUBTEST");
-
- TEST_SAY(_C_MAG "[ %s ]\n", test_curr->subtest);
-
- return 1;
-}
-
-
-/**
- * @brief Reset the current subtest state.
- */
-static void test_sub_reset(void) {
- *test_curr->subtest = '\0';
- test_curr->is_fatal_cb = NULL;
- test_curr->ignore_dr_err = rd_false;
- test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
- /* Don't check msg status by default */
- test_curr->exp_dr_status = (rd_kafka_msg_status_t)-1;
- test_curr->dr_mv = NULL;
-}
-
-/**
- * @brief Sub-test has passed.
- */
-void test_sub_pass(void) {
-
- TEST_ASSERT(*test_curr->subtest);
-
- TEST_SAYL(1, _C_GRN "[ %s: PASS (%.02fs) ]\n", test_curr->subtest,
- (float)(TIMING_DURATION(&test_curr->subtest_duration) /
- 1000000.0f));
-
- if (test_curr->subtest_quick && test_quick && !test_on_ci &&
- TIMING_DURATION(&test_curr->subtest_duration) > 45 * 1000 * 1000)
- TEST_WARN(
- "Subtest %s marked as QUICK but took %.02fs to "
- "finish: either fix the test or "
- "remove the _QUICK identifier (limit is 45s)\n",
- test_curr->subtest,
- (float)(TIMING_DURATION(&test_curr->subtest_duration) /
- 1000000.0f));
-
- test_sub_reset();
-}
-
-
-/**
- * @brief Skip sub-test (must have been started with SUB_TEST*()).
- */
-void test_sub_skip(const char *fmt, ...) {
- va_list ap;
- char buf[256];
-
- TEST_ASSERT(*test_curr->subtest);
-
- va_start(ap, fmt);
- rd_vsnprintf(buf, sizeof(buf), fmt, ap);
- va_end(ap);
-
- TEST_SAYL(1, _C_YEL "[ %s: SKIP: %s ]\n", test_curr->subtest, buf);
-
- test_sub_reset();
-}