summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/test.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 11:19:16 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:07:37 +0000
commitb485aab7e71c1625cfc27e0f92c9509f42378458 (patch)
treeae9abe108601079d1679194de237c9a435ae5b55 /src/fluent-bit/lib/librdkafka-2.1.0/tests/test.h
parentAdding upstream version 1.44.3. (diff)
downloadnetdata-b485aab7e71c1625cfc27e0f92c9509f42378458.tar.xz
netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.zip
Adding upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/test.h')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/test.h936
1 files changed, 936 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/test.h b/src/fluent-bit/lib/librdkafka-2.1.0/tests/test.h
new file mode 100644
index 000000000..a431f9a25
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/test.h
@@ -0,0 +1,936 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifndef _TEST_H_
+#define _TEST_H_
+
+#include "../src/rd.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#ifndef _WIN32
+#include <unistd.h>
+#endif
+#include <errno.h>
+#include <assert.h>
+#include <time.h>
+#include <ctype.h>
+
+#if HAVE_GETRUSAGE
+#include <sys/time.h>
+#include <sys/resource.h>
+#endif
+
+#include "rdkafka.h"
+#include "rdkafka_mock.h"
+#include "tinycthread.h"
+#include "rdlist.h"
+
+#if WITH_SOCKEM
+#include "sockem.h"
+#endif
+
+#include "testshared.h"
+#ifdef _WIN32
+#define sscanf(...) sscanf_s(__VA_ARGS__)
+#endif
+
+/**
+ * Test output is controlled through "TEST_LEVEL=N" environemnt variable.
+ * N < 2: TEST_SAY() is quiet.
+ */
+
+extern int test_seed;
+extern char test_mode[64];
+extern RD_TLS struct test *test_curr;
+extern int test_assert_on_fail;
+extern int tests_running_cnt;
+extern int test_concurrent_max;
+extern int test_rusage;
+extern double test_rusage_cpu_calibration;
+extern double test_timeout_multiplier;
+extern int test_session_timeout_ms; /* Group session timeout */
+extern int test_flags;
+extern int test_neg_flags;
+extern int test_idempotent_producer;
+
+extern mtx_t test_mtx;
+
+#define TEST_LOCK() mtx_lock(&test_mtx)
+#define TEST_UNLOCK() mtx_unlock(&test_mtx)
+
+
+/* Forward decl */
+typedef struct test_msgver_s test_msgver_t;
+
+
+/** @struct Resource usage thresholds */
+struct rusage_thres {
+ double ucpu; /**< Max User CPU in percentage */
+ double scpu; /**< Max Sys CPU in percentage */
+ double rss; /**< Max RSS (memory) increase in MB */
+ int ctxsw; /**< Max number of voluntary context switches, i.e.
+ * syscalls. */
+};
+
+typedef enum {
+ TEST_NOT_STARTED,
+ TEST_SKIPPED,
+ TEST_RUNNING,
+ TEST_PASSED,
+ TEST_FAILED,
+} test_state_t;
+
+struct test {
+ /**
+ * Setup
+ */
+ const char *name; /**< e.g. Same as filename minus extension */
+ int (*mainfunc)(int argc, char **argv); /**< test's main func */
+ const int flags; /**< Test flags */
+#define TEST_F_LOCAL 0x1 /**< Test is local, no broker requirement */
+#define TEST_F_KNOWN_ISSUE \
+ 0x2 /**< Known issue, can fail without affecting \
+ * total test run status. */
+#define TEST_F_MANUAL \
+ 0x4 /**< Manual test, only started when specifically \
+ * stated */
+#define TEST_F_SOCKEM 0x8 /**< Test requires socket emulation. */
+ int minver; /**< Limit tests to broker version range. */
+ int maxver;
+
+ const char *extra; /**< Extra information to print in test_summary. */
+
+ const char *scenario; /**< Test scenario */
+
+ char *
+ *report_arr; /**< Test-specific reporting, JSON array of objects. */
+ int report_cnt;
+ int report_size;
+
+ rd_bool_t ignore_dr_err; /**< Ignore delivery report errors */
+ rd_kafka_resp_err_t exp_dr_err; /* Expected error in test_dr_cb */
+ rd_kafka_msg_status_t exp_dr_status; /**< Expected delivery status,
+ * or -1 for not checking. */
+ int produce_sync; /**< test_produce_sync() call in action */
+ rd_kafka_resp_err_t produce_sync_err; /**< DR error */
+ test_msgver_t *dr_mv; /**< MsgVer that delivered messages will be
+ * added to (if not NULL).
+ * Must be set and freed by test. */
+
+ /**
+ * Runtime
+ */
+ thrd_t thrd;
+ int64_t start;
+ int64_t duration;
+ FILE *stats_fp;
+ int64_t timeout;
+ test_state_t state;
+ int failcnt; /**< Number of failures, useful with FAIL_LATER */
+ char failstr[512 + 1]; /**< First test failure reason */
+ char subtest[400]; /**< Current subtest, if any */
+ test_timing_t subtest_duration; /**< Subtest duration timing */
+ rd_bool_t subtest_quick; /**< Subtest is marked as QUICK */
+
+#if WITH_SOCKEM
+ rd_list_t sockets;
+ int (*connect_cb)(struct test *test, sockem_t *skm, const char *id);
+#endif
+ int (*is_fatal_cb)(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ const char *reason);
+
+ /**< Resource usage thresholds */
+ struct rusage_thres rusage_thres; /**< Usage thresholds */
+#if HAVE_GETRUSAGE
+ struct rusage rusage; /**< Monitored process CPU/mem usage */
+#endif
+};
+
+
+#ifdef _WIN32
+#define TEST_F_KNOWN_ISSUE_WIN32 TEST_F_KNOWN_ISSUE
+#else
+#define TEST_F_KNOWN_ISSUE_WIN32 0
+#endif
+
+#ifdef __APPLE__
+#define TEST_F_KNOWN_ISSUE_OSX TEST_F_KNOWN_ISSUE
+#else
+#define TEST_F_KNOWN_ISSUE_OSX 0
+#endif
+
+
+#define TEST_SAY0(...) fprintf(stderr, __VA_ARGS__)
+#define TEST_SAYL(LVL, ...) \
+ do { \
+ if (test_level >= LVL) { \
+ fprintf( \
+ stderr, "\033[36m[%-28s/%7.3fs] ", \
+ test_curr->name, \
+ test_curr->start \
+ ? ((float)(test_clock() - test_curr->start) / \
+ 1000000.0f) \
+ : 0); \
+ fprintf(stderr, __VA_ARGS__); \
+ fprintf(stderr, "\033[0m"); \
+ } \
+ } while (0)
+#define TEST_SAY(...) TEST_SAYL(2, __VA_ARGS__)
+
+/**
+ * Append JSON object (as string) to this tests' report array.
+ */
+#define TEST_REPORT(...) test_report_add(test_curr, __VA_ARGS__)
+
+
+
+static RD_INLINE RD_UNUSED void rtrim(char *str) {
+ size_t len = strlen(str);
+ char *s;
+
+ if (len == 0)
+ return;
+
+ s = str + len - 1;
+ while (isspace((int)*s)) {
+ *s = '\0';
+ s--;
+ }
+}
+
+/* Skip the current test. Argument is textual reason (printf format) */
+#define TEST_SKIP(...) \
+ do { \
+ TEST_WARN("SKIPPING TEST: " __VA_ARGS__); \
+ TEST_LOCK(); \
+ test_curr->state = TEST_SKIPPED; \
+ if (!*test_curr->failstr) { \
+ rd_snprintf(test_curr->failstr, \
+ sizeof(test_curr->failstr), __VA_ARGS__); \
+ rtrim(test_curr->failstr); \
+ } \
+ TEST_UNLOCK(); \
+ } while (0)
+
+
+void test_conf_init(rd_kafka_conf_t **conf,
+ rd_kafka_topic_conf_t **topic_conf,
+ int timeout);
+
+
+
+void test_msg_fmt(char *dest,
+ size_t dest_size,
+ uint64_t testid,
+ int32_t partition,
+ int msgid);
+void test_msg_parse0(const char *func,
+ int line,
+ uint64_t testid,
+ rd_kafka_message_t *rkmessage,
+ int32_t exp_partition,
+ int *msgidp);
+#define test_msg_parse(testid, rkmessage, exp_partition, msgidp) \
+ test_msg_parse0(__FUNCTION__, __LINE__, testid, rkmessage, \
+ exp_partition, msgidp)
+
+
+static RD_INLINE int jitter(int low, int high) RD_UNUSED;
+static RD_INLINE int jitter(int low, int high) {
+ return (low + (rand() % ((high - low) + 1)));
+}
+
+
+
+/******************************************************************************
+ *
+ * Helpers
+ *
+ ******************************************************************************/
+
+
+
+/****************************************************************
+ * Message verification services *
+ * *
+ * *
+ * *
+ ****************************************************************/
+
+
+/**
+ * A test_msgver_t is first fed with messages from any number of
+ * topics and partitions, it is then checked for expected messages, such as:
+ * - all messages received, based on message payload information.
+ * - messages received in order
+ * - EOF
+ */
+struct test_msgver_s {
+ struct test_mv_p **p; /* Partitions array */
+ int p_cnt; /* Partition count */
+ int p_size; /* p size */
+ int msgcnt; /* Total message count */
+ uint64_t testid; /* Only accept messages for this testid */
+ rd_bool_t ignore_eof; /* Don't end PARTITION_EOF messages */
+
+ struct test_msgver_s *fwd; /* Also forward add_msg() to this mv */
+
+ int log_cnt; /* Current number of warning logs */
+ int log_max; /* Max warning logs before suppressing. */
+ int log_suppr_cnt; /* Number of suppressed log messages. */
+
+ const char *msgid_hdr; /**< msgid string is in header by this name,
+ * rather than in the payload (default). */
+}; /* test_msgver_t; */
+
+/* Message */
+struct test_mv_m {
+ int64_t offset; /* Message offset */
+ int msgid; /* Message id */
+ int64_t timestamp; /* Message timestamp */
+ int32_t broker_id; /* Message broker id */
+};
+
+
+/* Message vector */
+struct test_mv_mvec {
+ struct test_mv_m *m;
+ int cnt;
+ int size; /* m[] size */
+};
+
+/* Partition */
+struct test_mv_p {
+ char *topic;
+ int32_t partition;
+ struct test_mv_mvec mvec;
+ int64_t eof_offset;
+};
+
+/* Verification state */
+struct test_mv_vs {
+ int msg_base;
+ int exp_cnt;
+
+ /* used by verify_range */
+ int msgid_min;
+ int msgid_max;
+ int64_t timestamp_min;
+ int64_t timestamp_max;
+
+ /* used by verify_broker_id */
+ int32_t broker_id;
+
+ struct test_mv_mvec mvec;
+
+ /* Correct msgver for comparison */
+ test_msgver_t *corr;
+};
+
+
+void test_msgver_init(test_msgver_t *mv, uint64_t testid);
+void test_msgver_clear(test_msgver_t *mv);
+void test_msgver_ignore_eof(test_msgver_t *mv);
+int test_msgver_add_msg00(const char *func,
+ int line,
+ const char *clientname,
+ test_msgver_t *mv,
+ uint64_t testid,
+ const char *topic,
+ int32_t partition,
+ int64_t offset,
+ int64_t timestamp,
+ int32_t broker_id,
+ rd_kafka_resp_err_t err,
+ int msgnum);
+int test_msgver_add_msg0(const char *func,
+ int line,
+ const char *clientname,
+ test_msgver_t *mv,
+ const rd_kafka_message_t *rkmessage,
+ const char *override_topic);
+#define test_msgver_add_msg(rk, mv, rkm) \
+ test_msgver_add_msg0(__FUNCTION__, __LINE__, rd_kafka_name(rk), mv, \
+ rkm, NULL)
+
+/**
+ * Flags to indicate what to verify.
+ */
+#define TEST_MSGVER_ORDER 0x1 /* Order */
+#define TEST_MSGVER_DUP 0x2 /* Duplicates */
+#define TEST_MSGVER_RANGE 0x4 /* Range of messages */
+
+#define TEST_MSGVER_ALL 0xf /* All verifiers */
+
+#define TEST_MSGVER_BY_MSGID 0x10000 /* Verify by msgid (unique in testid) */
+#define TEST_MSGVER_BY_OFFSET \
+ 0x20000 /* Verify by offset (unique in partition)*/
+#define TEST_MSGVER_BY_TIMESTAMP 0x40000 /* Verify by timestamp range */
+#define TEST_MSGVER_BY_BROKER_ID 0x80000 /* Verify by broker id */
+
+#define TEST_MSGVER_SUBSET \
+ 0x100000 /* verify_compare: allow correct mv to be \
+ * a subset of mv. */
+
+/* Only test per partition, not across all messages received on all partitions.
+ * This is useful when doing incremental verifications with multiple partitions
+ * and the total number of messages has not been received yet.
+ * Can't do range check here since messages may be spread out on multiple
+ * partitions and we might just have read a few partitions. */
+#define TEST_MSGVER_PER_PART \
+ ((TEST_MSGVER_ALL & ~TEST_MSGVER_RANGE) | TEST_MSGVER_BY_MSGID | \
+ TEST_MSGVER_BY_OFFSET)
+
+/* Test on all messages across all partitions.
+ * This can only be used to check with msgid, not offset since that
+ * is partition local. */
+#define TEST_MSGVER_ALL_PART (TEST_MSGVER_ALL | TEST_MSGVER_BY_MSGID)
+
+
+int test_msgver_verify_part0(const char *func,
+ int line,
+ const char *what,
+ test_msgver_t *mv,
+ int flags,
+ const char *topic,
+ int partition,
+ int msg_base,
+ int exp_cnt);
+#define test_msgver_verify_part(what, mv, flags, topic, partition, msg_base, \
+ exp_cnt) \
+ test_msgver_verify_part0(__FUNCTION__, __LINE__, what, mv, flags, \
+ topic, partition, msg_base, exp_cnt)
+
+int test_msgver_verify0(const char *func,
+ int line,
+ const char *what,
+ test_msgver_t *mv,
+ int flags,
+ struct test_mv_vs vs);
+#define test_msgver_verify(what, mv, flags, msgbase, expcnt) \
+ test_msgver_verify0( \
+ __FUNCTION__, __LINE__, what, mv, flags, \
+ (struct test_mv_vs) {.msg_base = msgbase, .exp_cnt = expcnt})
+
+
+void test_msgver_verify_compare0(const char *func,
+ int line,
+ const char *what,
+ test_msgver_t *mv,
+ test_msgver_t *corr,
+ int flags);
+#define test_msgver_verify_compare(what, mv, corr, flags) \
+ test_msgver_verify_compare0(__FUNCTION__, __LINE__, what, mv, corr, \
+ flags)
+
+rd_kafka_t *test_create_handle(int mode, rd_kafka_conf_t *conf);
+
+/**
+ * Delivery reported callback.
+ * Called for each message once to signal its delivery status.
+ */
+void test_dr_msg_cb(rd_kafka_t *rk,
+ const rd_kafka_message_t *rkmessage,
+ void *opaque);
+
+rd_kafka_t *test_create_producer(void);
+rd_kafka_topic_t *
+test_create_producer_topic(rd_kafka_t *rk, const char *topic, ...);
+void test_wait_delivery(rd_kafka_t *rk, int *msgcounterp);
+void test_produce_msgs_nowait(rd_kafka_t *rk,
+ rd_kafka_topic_t *rkt,
+ uint64_t testid,
+ int32_t partition,
+ int msg_base,
+ int cnt,
+ const char *payload,
+ size_t size,
+ int msgrate,
+ int *msgcounterp);
+void test_produce_msgs(rd_kafka_t *rk,
+ rd_kafka_topic_t *rkt,
+ uint64_t testid,
+ int32_t partition,
+ int msg_base,
+ int cnt,
+ const char *payload,
+ size_t size);
+void test_produce_msgs2(rd_kafka_t *rk,
+ const char *topic,
+ uint64_t testid,
+ int32_t partition,
+ int msg_base,
+ int cnt,
+ const char *payload,
+ size_t size);
+void test_produce_msgs2_nowait(rd_kafka_t *rk,
+ const char *topic,
+ uint64_t testid,
+ int32_t partition,
+ int msg_base,
+ int cnt,
+ const char *payload,
+ size_t size,
+ int *remainsp);
+void test_produce_msgs_rate(rd_kafka_t *rk,
+ rd_kafka_topic_t *rkt,
+ uint64_t testid,
+ int32_t partition,
+ int msg_base,
+ int cnt,
+ const char *payload,
+ size_t size,
+ int msgrate);
+rd_kafka_resp_err_t test_produce_sync(rd_kafka_t *rk,
+ rd_kafka_topic_t *rkt,
+ uint64_t testid,
+ int32_t partition);
+
+void test_produce_msgs_easy_v(const char *topic,
+ uint64_t testid,
+ int32_t partition,
+ int msg_base,
+ int cnt,
+ size_t size,
+ ...);
+void test_produce_msgs_easy_multi(uint64_t testid, ...);
+
+void test_incremental_rebalance_cb(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *parts,
+ void *opaque);
+void test_rebalance_cb(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *parts,
+ void *opaque);
+
+rd_kafka_t *test_create_consumer(
+ const char *group_id,
+ void (*rebalance_cb)(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *partitions,
+ void *opaque),
+ rd_kafka_conf_t *conf,
+ rd_kafka_topic_conf_t *default_topic_conf);
+rd_kafka_topic_t *test_create_consumer_topic(rd_kafka_t *rk, const char *topic);
+rd_kafka_topic_t *
+test_create_topic_object(rd_kafka_t *rk, const char *topic, ...);
+void test_consumer_start(const char *what,
+ rd_kafka_topic_t *rkt,
+ int32_t partition,
+ int64_t start_offset);
+void test_consumer_stop(const char *what,
+ rd_kafka_topic_t *rkt,
+ int32_t partition);
+void test_consumer_seek(const char *what,
+ rd_kafka_topic_t *rkt,
+ int32_t partition,
+ int64_t offset);
+
+#define TEST_NO_SEEK -1
+int64_t test_consume_msgs(const char *what,
+ rd_kafka_topic_t *rkt,
+ uint64_t testid,
+ int32_t partition,
+ int64_t offset,
+ int exp_msg_base,
+ int exp_cnt,
+ int parse_fmt);
+
+
+void test_verify_rkmessage0(const char *func,
+ int line,
+ rd_kafka_message_t *rkmessage,
+ uint64_t testid,
+ int32_t partition,
+ int msgnum);
+#define test_verify_rkmessage(rkmessage, testid, partition, msgnum) \
+ test_verify_rkmessage0(__FUNCTION__, __LINE__, rkmessage, testid, \
+ partition, msgnum)
+
+void test_consumer_subscribe(rd_kafka_t *rk, const char *topic);
+
+void test_consume_msgs_easy_mv0(const char *group_id,
+ const char *topic,
+ rd_bool_t txn,
+ int32_t partition,
+ uint64_t testid,
+ int exp_eofcnt,
+ int exp_msgcnt,
+ rd_kafka_topic_conf_t *tconf,
+ test_msgver_t *mv);
+
+#define test_consume_msgs_easy_mv(group_id, topic, partition, testid, \
+ exp_eofcnt, exp_msgcnt, tconf, mv) \
+ test_consume_msgs_easy_mv0(group_id, topic, rd_false /*not-txn*/, \
+ partition, testid, exp_eofcnt, exp_msgcnt, \
+ tconf, mv)
+
+void test_consume_msgs_easy(const char *group_id,
+ const char *topic,
+ uint64_t testid,
+ int exp_eofcnt,
+ int exp_msgcnt,
+ rd_kafka_topic_conf_t *tconf);
+
+void test_consume_txn_msgs_easy(const char *group_id,
+ const char *topic,
+ uint64_t testid,
+ int exp_eofcnt,
+ int exp_msgcnt,
+ rd_kafka_topic_conf_t *tconf);
+
+void test_consumer_poll_no_msgs(const char *what,
+ rd_kafka_t *rk,
+ uint64_t testid,
+ int timeout_ms);
+void test_consumer_poll_expect_err(rd_kafka_t *rk,
+ uint64_t testid,
+ int timeout_ms,
+ rd_kafka_resp_err_t err);
+int test_consumer_poll_once(rd_kafka_t *rk, test_msgver_t *mv, int timeout_ms);
+int test_consumer_poll_exact_timeout(const char *what,
+ rd_kafka_t *rk,
+ uint64_t testid,
+ int exp_eof_cnt,
+ int exp_msg_base,
+ int exp_cnt,
+ rd_bool_t exact,
+ test_msgver_t *mv,
+ int timeout_ms);
+int test_consumer_poll_exact(const char *what,
+ rd_kafka_t *rk,
+ uint64_t testid,
+ int exp_eof_cnt,
+ int exp_msg_base,
+ int exp_cnt,
+ rd_bool_t exact,
+ test_msgver_t *mv);
+int test_consumer_poll(const char *what,
+ rd_kafka_t *rk,
+ uint64_t testid,
+ int exp_eof_cnt,
+ int exp_msg_base,
+ int exp_cnt,
+ test_msgver_t *mv);
+int test_consumer_poll_timeout(const char *what,
+ rd_kafka_t *rk,
+ uint64_t testid,
+ int exp_eof_cnt,
+ int exp_msg_base,
+ int exp_cnt,
+ test_msgver_t *mv,
+ int timeout_ms);
+
+void test_consumer_wait_assignment(rd_kafka_t *rk, rd_bool_t do_poll);
+void test_consumer_verify_assignment0(const char *func,
+ int line,
+ rd_kafka_t *rk,
+ int fail_immediately,
+ ...);
+#define test_consumer_verify_assignment(rk, fail_immediately, ...) \
+ test_consumer_verify_assignment0(__FUNCTION__, __LINE__, rk, \
+ fail_immediately, __VA_ARGS__)
+
+void test_consumer_assign(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *parts);
+void test_consumer_incremental_assign(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *parts);
+void test_consumer_unassign(const char *what, rd_kafka_t *rk);
+void test_consumer_incremental_unassign(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *parts);
+void test_consumer_assign_partition(const char *what,
+ rd_kafka_t *rk,
+ const char *topic,
+ int32_t partition,
+ int64_t offset);
+void test_consumer_pause_resume_partition(rd_kafka_t *rk,
+ const char *topic,
+ int32_t partition,
+ rd_bool_t pause);
+
+void test_consumer_close(rd_kafka_t *rk);
+
+void test_flush(rd_kafka_t *rk, int timeout_ms);
+
+void test_conf_set(rd_kafka_conf_t *conf, const char *name, const char *val);
+char *test_topic_conf_get(const rd_kafka_topic_conf_t *tconf, const char *name);
+int test_conf_match(rd_kafka_conf_t *conf, const char *name, const char *val);
+void test_topic_conf_set(rd_kafka_topic_conf_t *tconf,
+ const char *name,
+ const char *val);
+void test_any_conf_set(rd_kafka_conf_t *conf,
+ rd_kafka_topic_conf_t *tconf,
+ const char *name,
+ const char *val);
+
+void test_print_partition_list(
+ const rd_kafka_topic_partition_list_t *partitions);
+int test_partition_list_cmp(rd_kafka_topic_partition_list_t *al,
+ rd_kafka_topic_partition_list_t *bl);
+int test_partition_list_and_offsets_cmp(rd_kafka_topic_partition_list_t *al,
+ rd_kafka_topic_partition_list_t *bl);
+
+void test_kafka_topics(const char *fmt, ...);
+void test_admin_create_topic(rd_kafka_t *use_rk,
+ const char *topicname,
+ int partition_cnt,
+ int replication_factor,
+ const char **configs);
+void test_create_topic(rd_kafka_t *use_rk,
+ const char *topicname,
+ int partition_cnt,
+ int replication_factor);
+rd_kafka_resp_err_t test_auto_create_topic_rkt(rd_kafka_t *rk,
+ rd_kafka_topic_t *rkt,
+ int timeout_ms);
+rd_kafka_resp_err_t
+test_auto_create_topic(rd_kafka_t *rk, const char *name, int timeout_ms);
+int test_check_auto_create_topic(void);
+
+void test_create_partitions(rd_kafka_t *use_rk,
+ const char *topicname,
+ int new_partition_cnt);
+
+int test_get_partition_count(rd_kafka_t *rk,
+ const char *topicname,
+ int timeout_ms);
+
+char *tsprintf(const char *fmt, ...) RD_FORMAT(printf, 1, 2);
+
+void test_report_add(struct test *test, const char *fmt, ...);
+int test_can_create_topics(int skip);
+
+rd_kafka_event_t *test_wait_event(rd_kafka_queue_t *eventq,
+ rd_kafka_event_type_t event_type,
+ int timeout_ms);
+
+void test_prepare_msg(uint64_t testid,
+ int32_t partition,
+ int msg_id,
+ char *val,
+ size_t val_size,
+ char *key,
+ size_t key_size);
+
+#if WITH_SOCKEM
+void test_socket_enable(rd_kafka_conf_t *conf);
+void test_socket_close_all(struct test *test, int reinit);
+int test_socket_sockem_set_all(const char *key, int val);
+void test_socket_sockem_set(int s, const char *key, int value);
+#endif
+
+void test_headers_dump(const char *what,
+ int lvl,
+ const rd_kafka_headers_t *hdrs);
+
+int32_t *test_get_broker_ids(rd_kafka_t *use_rk, size_t *cntp);
+
+void test_wait_metadata_update(rd_kafka_t *rk,
+ rd_kafka_metadata_topic_t *topics,
+ size_t topic_cnt,
+ rd_kafka_metadata_topic_t *not_topics,
+ size_t not_topic_cnt,
+ int tmout);
+
+rd_kafka_event_t *test_wait_admin_result(rd_kafka_queue_t *q,
+ rd_kafka_event_type_t evtype,
+ int tmout);
+
+rd_kafka_resp_err_t test_wait_topic_admin_result(rd_kafka_queue_t *q,
+ rd_kafka_event_type_t evtype,
+ rd_kafka_event_t **retevent,
+ int tmout);
+
+rd_kafka_resp_err_t test_CreateTopics_simple(rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ char **topics,
+ size_t topic_cnt,
+ int num_partitions,
+ void *opaque);
+rd_kafka_resp_err_t test_CreatePartitions_simple(rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ const char *topic,
+ size_t total_part_cnt,
+ void *opaque);
+
+rd_kafka_resp_err_t test_DeleteTopics_simple(rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ char **topics,
+ size_t topic_cnt,
+ void *opaque);
+
+rd_kafka_resp_err_t test_AlterConfigs_simple(rd_kafka_t *rk,
+ rd_kafka_ResourceType_t restype,
+ const char *resname,
+ const char **configs,
+ size_t config_cnt);
+
+rd_kafka_resp_err_t test_DeleteGroups_simple(rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ char **groups,
+ size_t group_cnt,
+ void *opaque);
+
+rd_kafka_resp_err_t
+test_DeleteRecords_simple(rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ const rd_kafka_topic_partition_list_t *offsets,
+ void *opaque);
+
+rd_kafka_resp_err_t test_DeleteConsumerGroupOffsets_simple(
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ const char *group_id,
+ const rd_kafka_topic_partition_list_t *offsets,
+ void *opaque);
+
+rd_kafka_resp_err_t test_CreateAcls_simple(rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ rd_kafka_AclBinding_t **acls,
+ size_t acl_cnt,
+ void *opaque);
+
+rd_kafka_resp_err_t test_delete_all_test_topics(int timeout_ms);
+
+void test_mock_cluster_destroy(rd_kafka_mock_cluster_t *mcluster);
+rd_kafka_mock_cluster_t *test_mock_cluster_new(int broker_cnt,
+ const char **bootstraps);
+
+
+
+int test_error_is_not_fatal_cb(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ const char *reason);
+
+
+/**
+ * @brief Calls rdkafka function (with arguments)
+ * and checks its return value (must be rd_kafka_resp_err_t) for
+ * error, in which case the test fails.
+ * Also times the call.
+ *
+ * @remark The trailing __ makes calling code easier to read.
+ */
+#define TEST_CALL__(FUNC_W_ARGS) \
+ do { \
+ test_timing_t _timing; \
+ const char *_desc = RD_STRINGIFY(FUNC_W_ARGS); \
+ rd_kafka_resp_err_t _err; \
+ TIMING_START(&_timing, "%s", _desc); \
+ TEST_SAYL(3, "Begin call %s\n", _desc); \
+ _err = FUNC_W_ARGS; \
+ TIMING_STOP(&_timing); \
+ if (!_err) \
+ break; \
+ if (strstr(_desc, "errstr")) \
+ TEST_FAIL("%s failed: %s: %s\n", _desc, \
+ rd_kafka_err2name(_err), errstr); \
+ else \
+ TEST_FAIL("%s failed: %s\n", _desc, \
+ rd_kafka_err2str(_err)); \
+ } while (0)
+
+
+/**
+ * @brief Same as TEST_CALL__() but expects an rd_kafka_error_t * return type.
+ */
+#define TEST_CALL_ERROR__(FUNC_W_ARGS) \
+ do { \
+ test_timing_t _timing; \
+ const char *_desc = RD_STRINGIFY(FUNC_W_ARGS); \
+ const rd_kafka_error_t *_error; \
+ TIMING_START(&_timing, "%s", _desc); \
+ TEST_SAYL(3, "Begin call %s\n", _desc); \
+ _error = FUNC_W_ARGS; \
+ TIMING_STOP(&_timing); \
+ if (!_error) \
+ break; \
+ TEST_FAIL("%s failed: %s\n", _desc, \
+ rd_kafka_error_string(_error)); \
+ } while (0)
+
+/**
+ * @brief Same as TEST_CALL__() but expects an rd_kafka_resp_err_t return type
+ * without errstr.
+ */
+#define TEST_CALL_ERR__(FUNC_W_ARGS) \
+ do { \
+ test_timing_t _timing; \
+ const char *_desc = RD_STRINGIFY(FUNC_W_ARGS); \
+ rd_kafka_resp_err_t _err; \
+ TIMING_START(&_timing, "%s", _desc); \
+ TEST_SAYL(3, "Begin call %s\n", _desc); \
+ _err = FUNC_W_ARGS; \
+ TIMING_STOP(&_timing); \
+ if (!_err) \
+ break; \
+ TEST_FAIL("%s failed: %s\n", _desc, rd_kafka_err2str(_err)); \
+ } while (0)
+
+
+/**
+ * @brief Print a rich error_t object in all its glory. NULL is ok.
+ *
+ * @param ... Is a prefix format-string+args that is printed with TEST_SAY()
+ * prior to the error details. E.g., "commit() returned: ".
+ * A newline is automatically appended.
+ */
+#define TEST_SAY_ERROR(ERROR, ...) \
+ do { \
+ rd_kafka_error_t *_e = (ERROR); \
+ TEST_SAY(__VA_ARGS__); \
+ if (!_e) { \
+ TEST_SAY0("No error" _C_CLR "\n"); \
+ break; \
+ } \
+ if (rd_kafka_error_is_fatal(_e)) \
+ TEST_SAY0(_C_RED "FATAL "); \
+ if (rd_kafka_error_is_retriable(_e)) \
+ TEST_SAY0("Retriable "); \
+ if (rd_kafka_error_txn_requires_abort(_e)) \
+ TEST_SAY0("TxnRequiresAbort "); \
+ TEST_SAY0("Error: %s: %s" _C_CLR "\n", \
+ rd_kafka_error_name(_e), rd_kafka_error_string(_e)); \
+ } while (0)
+
+/**
+ * @name rusage.c
+ * @{
+ */
+void test_rusage_start(struct test *test);
+int test_rusage_stop(struct test *test, double duration);
+
+/**@}*/
+
+#endif /* _TEST_H_ */