summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/test.h
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/test.h')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/test.h936
1 files changed, 0 insertions, 936 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/test.h b/fluent-bit/lib/librdkafka-2.1.0/tests/test.h
deleted file mode 100644
index a431f9a25..000000000
--- a/fluent-bit/lib/librdkafka-2.1.0/tests/test.h
+++ /dev/null
@@ -1,936 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-#ifndef _TEST_H_
-#define _TEST_H_
-
-#include "../src/rd.h"
-
-#include <stdio.h>
-#include <string.h>
-#include <stdlib.h>
-#ifndef _WIN32
-#include <unistd.h>
-#endif
-#include <errno.h>
-#include <assert.h>
-#include <time.h>
-#include <ctype.h>
-
-#if HAVE_GETRUSAGE
-#include <sys/time.h>
-#include <sys/resource.h>
-#endif
-
-#include "rdkafka.h"
-#include "rdkafka_mock.h"
-#include "tinycthread.h"
-#include "rdlist.h"
-
-#if WITH_SOCKEM
-#include "sockem.h"
-#endif
-
-#include "testshared.h"
-#ifdef _WIN32
-#define sscanf(...) sscanf_s(__VA_ARGS__)
-#endif
-
-/**
- * Test output is controlled through "TEST_LEVEL=N" environemnt variable.
- * N < 2: TEST_SAY() is quiet.
- */
-
-extern int test_seed;
-extern char test_mode[64];
-extern RD_TLS struct test *test_curr;
-extern int test_assert_on_fail;
-extern int tests_running_cnt;
-extern int test_concurrent_max;
-extern int test_rusage;
-extern double test_rusage_cpu_calibration;
-extern double test_timeout_multiplier;
-extern int test_session_timeout_ms; /* Group session timeout */
-extern int test_flags;
-extern int test_neg_flags;
-extern int test_idempotent_producer;
-
-extern mtx_t test_mtx;
-
-#define TEST_LOCK() mtx_lock(&test_mtx)
-#define TEST_UNLOCK() mtx_unlock(&test_mtx)
-
-
-/* Forward decl */
-typedef struct test_msgver_s test_msgver_t;
-
-
-/** @struct Resource usage thresholds */
-struct rusage_thres {
- double ucpu; /**< Max User CPU in percentage */
- double scpu; /**< Max Sys CPU in percentage */
- double rss; /**< Max RSS (memory) increase in MB */
- int ctxsw; /**< Max number of voluntary context switches, i.e.
- * syscalls. */
-};
-
-typedef enum {
- TEST_NOT_STARTED,
- TEST_SKIPPED,
- TEST_RUNNING,
- TEST_PASSED,
- TEST_FAILED,
-} test_state_t;
-
-struct test {
- /**
- * Setup
- */
- const char *name; /**< e.g. Same as filename minus extension */
- int (*mainfunc)(int argc, char **argv); /**< test's main func */
- const int flags; /**< Test flags */
-#define TEST_F_LOCAL 0x1 /**< Test is local, no broker requirement */
-#define TEST_F_KNOWN_ISSUE \
- 0x2 /**< Known issue, can fail without affecting \
- * total test run status. */
-#define TEST_F_MANUAL \
- 0x4 /**< Manual test, only started when specifically \
- * stated */
-#define TEST_F_SOCKEM 0x8 /**< Test requires socket emulation. */
- int minver; /**< Limit tests to broker version range. */
- int maxver;
-
- const char *extra; /**< Extra information to print in test_summary. */
-
- const char *scenario; /**< Test scenario */
-
- char *
- *report_arr; /**< Test-specific reporting, JSON array of objects. */
- int report_cnt;
- int report_size;
-
- rd_bool_t ignore_dr_err; /**< Ignore delivery report errors */
- rd_kafka_resp_err_t exp_dr_err; /* Expected error in test_dr_cb */
- rd_kafka_msg_status_t exp_dr_status; /**< Expected delivery status,
- * or -1 for not checking. */
- int produce_sync; /**< test_produce_sync() call in action */
- rd_kafka_resp_err_t produce_sync_err; /**< DR error */
- test_msgver_t *dr_mv; /**< MsgVer that delivered messages will be
- * added to (if not NULL).
- * Must be set and freed by test. */
-
- /**
- * Runtime
- */
- thrd_t thrd;
- int64_t start;
- int64_t duration;
- FILE *stats_fp;
- int64_t timeout;
- test_state_t state;
- int failcnt; /**< Number of failures, useful with FAIL_LATER */
- char failstr[512 + 1]; /**< First test failure reason */
- char subtest[400]; /**< Current subtest, if any */
- test_timing_t subtest_duration; /**< Subtest duration timing */
- rd_bool_t subtest_quick; /**< Subtest is marked as QUICK */
-
-#if WITH_SOCKEM
- rd_list_t sockets;
- int (*connect_cb)(struct test *test, sockem_t *skm, const char *id);
-#endif
- int (*is_fatal_cb)(rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- const char *reason);
-
- /**< Resource usage thresholds */
- struct rusage_thres rusage_thres; /**< Usage thresholds */
-#if HAVE_GETRUSAGE
- struct rusage rusage; /**< Monitored process CPU/mem usage */
-#endif
-};
-
-
-#ifdef _WIN32
-#define TEST_F_KNOWN_ISSUE_WIN32 TEST_F_KNOWN_ISSUE
-#else
-#define TEST_F_KNOWN_ISSUE_WIN32 0
-#endif
-
-#ifdef __APPLE__
-#define TEST_F_KNOWN_ISSUE_OSX TEST_F_KNOWN_ISSUE
-#else
-#define TEST_F_KNOWN_ISSUE_OSX 0
-#endif
-
-
-#define TEST_SAY0(...) fprintf(stderr, __VA_ARGS__)
-#define TEST_SAYL(LVL, ...) \
- do { \
- if (test_level >= LVL) { \
- fprintf( \
- stderr, "\033[36m[%-28s/%7.3fs] ", \
- test_curr->name, \
- test_curr->start \
- ? ((float)(test_clock() - test_curr->start) / \
- 1000000.0f) \
- : 0); \
- fprintf(stderr, __VA_ARGS__); \
- fprintf(stderr, "\033[0m"); \
- } \
- } while (0)
-#define TEST_SAY(...) TEST_SAYL(2, __VA_ARGS__)
-
-/**
- * Append JSON object (as string) to this tests' report array.
- */
-#define TEST_REPORT(...) test_report_add(test_curr, __VA_ARGS__)
-
-
-
-static RD_INLINE RD_UNUSED void rtrim(char *str) {
- size_t len = strlen(str);
- char *s;
-
- if (len == 0)
- return;
-
- s = str + len - 1;
- while (isspace((int)*s)) {
- *s = '\0';
- s--;
- }
-}
-
-/* Skip the current test. Argument is textual reason (printf format) */
-#define TEST_SKIP(...) \
- do { \
- TEST_WARN("SKIPPING TEST: " __VA_ARGS__); \
- TEST_LOCK(); \
- test_curr->state = TEST_SKIPPED; \
- if (!*test_curr->failstr) { \
- rd_snprintf(test_curr->failstr, \
- sizeof(test_curr->failstr), __VA_ARGS__); \
- rtrim(test_curr->failstr); \
- } \
- TEST_UNLOCK(); \
- } while (0)
-
-
-void test_conf_init(rd_kafka_conf_t **conf,
- rd_kafka_topic_conf_t **topic_conf,
- int timeout);
-
-
-
-void test_msg_fmt(char *dest,
- size_t dest_size,
- uint64_t testid,
- int32_t partition,
- int msgid);
-void test_msg_parse0(const char *func,
- int line,
- uint64_t testid,
- rd_kafka_message_t *rkmessage,
- int32_t exp_partition,
- int *msgidp);
-#define test_msg_parse(testid, rkmessage, exp_partition, msgidp) \
- test_msg_parse0(__FUNCTION__, __LINE__, testid, rkmessage, \
- exp_partition, msgidp)
-
-
-static RD_INLINE int jitter(int low, int high) RD_UNUSED;
-static RD_INLINE int jitter(int low, int high) {
- return (low + (rand() % ((high - low) + 1)));
-}
-
-
-
-/******************************************************************************
- *
- * Helpers
- *
- ******************************************************************************/
-
-
-
-/****************************************************************
- * Message verification services *
- * *
- * *
- * *
- ****************************************************************/
-
-
-/**
- * A test_msgver_t is first fed with messages from any number of
- * topics and partitions, it is then checked for expected messages, such as:
- * - all messages received, based on message payload information.
- * - messages received in order
- * - EOF
- */
-struct test_msgver_s {
- struct test_mv_p **p; /* Partitions array */
- int p_cnt; /* Partition count */
- int p_size; /* p size */
- int msgcnt; /* Total message count */
- uint64_t testid; /* Only accept messages for this testid */
- rd_bool_t ignore_eof; /* Don't end PARTITION_EOF messages */
-
- struct test_msgver_s *fwd; /* Also forward add_msg() to this mv */
-
- int log_cnt; /* Current number of warning logs */
- int log_max; /* Max warning logs before suppressing. */
- int log_suppr_cnt; /* Number of suppressed log messages. */
-
- const char *msgid_hdr; /**< msgid string is in header by this name,
- * rather than in the payload (default). */
-}; /* test_msgver_t; */
-
-/* Message */
-struct test_mv_m {
- int64_t offset; /* Message offset */
- int msgid; /* Message id */
- int64_t timestamp; /* Message timestamp */
- int32_t broker_id; /* Message broker id */
-};
-
-
-/* Message vector */
-struct test_mv_mvec {
- struct test_mv_m *m;
- int cnt;
- int size; /* m[] size */
-};
-
-/* Partition */
-struct test_mv_p {
- char *topic;
- int32_t partition;
- struct test_mv_mvec mvec;
- int64_t eof_offset;
-};
-
-/* Verification state */
-struct test_mv_vs {
- int msg_base;
- int exp_cnt;
-
- /* used by verify_range */
- int msgid_min;
- int msgid_max;
- int64_t timestamp_min;
- int64_t timestamp_max;
-
- /* used by verify_broker_id */
- int32_t broker_id;
-
- struct test_mv_mvec mvec;
-
- /* Correct msgver for comparison */
- test_msgver_t *corr;
-};
-
-
-void test_msgver_init(test_msgver_t *mv, uint64_t testid);
-void test_msgver_clear(test_msgver_t *mv);
-void test_msgver_ignore_eof(test_msgver_t *mv);
-int test_msgver_add_msg00(const char *func,
- int line,
- const char *clientname,
- test_msgver_t *mv,
- uint64_t testid,
- const char *topic,
- int32_t partition,
- int64_t offset,
- int64_t timestamp,
- int32_t broker_id,
- rd_kafka_resp_err_t err,
- int msgnum);
-int test_msgver_add_msg0(const char *func,
- int line,
- const char *clientname,
- test_msgver_t *mv,
- const rd_kafka_message_t *rkmessage,
- const char *override_topic);
-#define test_msgver_add_msg(rk, mv, rkm) \
- test_msgver_add_msg0(__FUNCTION__, __LINE__, rd_kafka_name(rk), mv, \
- rkm, NULL)
-
-/**
- * Flags to indicate what to verify.
- */
-#define TEST_MSGVER_ORDER 0x1 /* Order */
-#define TEST_MSGVER_DUP 0x2 /* Duplicates */
-#define TEST_MSGVER_RANGE 0x4 /* Range of messages */
-
-#define TEST_MSGVER_ALL 0xf /* All verifiers */
-
-#define TEST_MSGVER_BY_MSGID 0x10000 /* Verify by msgid (unique in testid) */
-#define TEST_MSGVER_BY_OFFSET \
- 0x20000 /* Verify by offset (unique in partition)*/
-#define TEST_MSGVER_BY_TIMESTAMP 0x40000 /* Verify by timestamp range */
-#define TEST_MSGVER_BY_BROKER_ID 0x80000 /* Verify by broker id */
-
-#define TEST_MSGVER_SUBSET \
- 0x100000 /* verify_compare: allow correct mv to be \
- * a subset of mv. */
-
-/* Only test per partition, not across all messages received on all partitions.
- * This is useful when doing incremental verifications with multiple partitions
- * and the total number of messages has not been received yet.
- * Can't do range check here since messages may be spread out on multiple
- * partitions and we might just have read a few partitions. */
-#define TEST_MSGVER_PER_PART \
- ((TEST_MSGVER_ALL & ~TEST_MSGVER_RANGE) | TEST_MSGVER_BY_MSGID | \
- TEST_MSGVER_BY_OFFSET)
-
-/* Test on all messages across all partitions.
- * This can only be used to check with msgid, not offset since that
- * is partition local. */
-#define TEST_MSGVER_ALL_PART (TEST_MSGVER_ALL | TEST_MSGVER_BY_MSGID)
-
-
-int test_msgver_verify_part0(const char *func,
- int line,
- const char *what,
- test_msgver_t *mv,
- int flags,
- const char *topic,
- int partition,
- int msg_base,
- int exp_cnt);
-#define test_msgver_verify_part(what, mv, flags, topic, partition, msg_base, \
- exp_cnt) \
- test_msgver_verify_part0(__FUNCTION__, __LINE__, what, mv, flags, \
- topic, partition, msg_base, exp_cnt)
-
-int test_msgver_verify0(const char *func,
- int line,
- const char *what,
- test_msgver_t *mv,
- int flags,
- struct test_mv_vs vs);
-#define test_msgver_verify(what, mv, flags, msgbase, expcnt) \
- test_msgver_verify0( \
- __FUNCTION__, __LINE__, what, mv, flags, \
- (struct test_mv_vs) {.msg_base = msgbase, .exp_cnt = expcnt})
-
-
-void test_msgver_verify_compare0(const char *func,
- int line,
- const char *what,
- test_msgver_t *mv,
- test_msgver_t *corr,
- int flags);
-#define test_msgver_verify_compare(what, mv, corr, flags) \
- test_msgver_verify_compare0(__FUNCTION__, __LINE__, what, mv, corr, \
- flags)
-
-rd_kafka_t *test_create_handle(int mode, rd_kafka_conf_t *conf);
-
-/**
- * Delivery reported callback.
- * Called for each message once to signal its delivery status.
- */
-void test_dr_msg_cb(rd_kafka_t *rk,
- const rd_kafka_message_t *rkmessage,
- void *opaque);
-
-rd_kafka_t *test_create_producer(void);
-rd_kafka_topic_t *
-test_create_producer_topic(rd_kafka_t *rk, const char *topic, ...);
-void test_wait_delivery(rd_kafka_t *rk, int *msgcounterp);
-void test_produce_msgs_nowait(rd_kafka_t *rk,
- rd_kafka_topic_t *rkt,
- uint64_t testid,
- int32_t partition,
- int msg_base,
- int cnt,
- const char *payload,
- size_t size,
- int msgrate,
- int *msgcounterp);
-void test_produce_msgs(rd_kafka_t *rk,
- rd_kafka_topic_t *rkt,
- uint64_t testid,
- int32_t partition,
- int msg_base,
- int cnt,
- const char *payload,
- size_t size);
-void test_produce_msgs2(rd_kafka_t *rk,
- const char *topic,
- uint64_t testid,
- int32_t partition,
- int msg_base,
- int cnt,
- const char *payload,
- size_t size);
-void test_produce_msgs2_nowait(rd_kafka_t *rk,
- const char *topic,
- uint64_t testid,
- int32_t partition,
- int msg_base,
- int cnt,
- const char *payload,
- size_t size,
- int *remainsp);
-void test_produce_msgs_rate(rd_kafka_t *rk,
- rd_kafka_topic_t *rkt,
- uint64_t testid,
- int32_t partition,
- int msg_base,
- int cnt,
- const char *payload,
- size_t size,
- int msgrate);
-rd_kafka_resp_err_t test_produce_sync(rd_kafka_t *rk,
- rd_kafka_topic_t *rkt,
- uint64_t testid,
- int32_t partition);
-
-void test_produce_msgs_easy_v(const char *topic,
- uint64_t testid,
- int32_t partition,
- int msg_base,
- int cnt,
- size_t size,
- ...);
-void test_produce_msgs_easy_multi(uint64_t testid, ...);
-
-void test_incremental_rebalance_cb(rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *parts,
- void *opaque);
-void test_rebalance_cb(rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *parts,
- void *opaque);
-
-rd_kafka_t *test_create_consumer(
- const char *group_id,
- void (*rebalance_cb)(rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *partitions,
- void *opaque),
- rd_kafka_conf_t *conf,
- rd_kafka_topic_conf_t *default_topic_conf);
-rd_kafka_topic_t *test_create_consumer_topic(rd_kafka_t *rk, const char *topic);
-rd_kafka_topic_t *
-test_create_topic_object(rd_kafka_t *rk, const char *topic, ...);
-void test_consumer_start(const char *what,
- rd_kafka_topic_t *rkt,
- int32_t partition,
- int64_t start_offset);
-void test_consumer_stop(const char *what,
- rd_kafka_topic_t *rkt,
- int32_t partition);
-void test_consumer_seek(const char *what,
- rd_kafka_topic_t *rkt,
- int32_t partition,
- int64_t offset);
-
-#define TEST_NO_SEEK -1
-int64_t test_consume_msgs(const char *what,
- rd_kafka_topic_t *rkt,
- uint64_t testid,
- int32_t partition,
- int64_t offset,
- int exp_msg_base,
- int exp_cnt,
- int parse_fmt);
-
-
-void test_verify_rkmessage0(const char *func,
- int line,
- rd_kafka_message_t *rkmessage,
- uint64_t testid,
- int32_t partition,
- int msgnum);
-#define test_verify_rkmessage(rkmessage, testid, partition, msgnum) \
- test_verify_rkmessage0(__FUNCTION__, __LINE__, rkmessage, testid, \
- partition, msgnum)
-
-void test_consumer_subscribe(rd_kafka_t *rk, const char *topic);
-
-void test_consume_msgs_easy_mv0(const char *group_id,
- const char *topic,
- rd_bool_t txn,
- int32_t partition,
- uint64_t testid,
- int exp_eofcnt,
- int exp_msgcnt,
- rd_kafka_topic_conf_t *tconf,
- test_msgver_t *mv);
-
-#define test_consume_msgs_easy_mv(group_id, topic, partition, testid, \
- exp_eofcnt, exp_msgcnt, tconf, mv) \
- test_consume_msgs_easy_mv0(group_id, topic, rd_false /*not-txn*/, \
- partition, testid, exp_eofcnt, exp_msgcnt, \
- tconf, mv)
-
-void test_consume_msgs_easy(const char *group_id,
- const char *topic,
- uint64_t testid,
- int exp_eofcnt,
- int exp_msgcnt,
- rd_kafka_topic_conf_t *tconf);
-
-void test_consume_txn_msgs_easy(const char *group_id,
- const char *topic,
- uint64_t testid,
- int exp_eofcnt,
- int exp_msgcnt,
- rd_kafka_topic_conf_t *tconf);
-
-void test_consumer_poll_no_msgs(const char *what,
- rd_kafka_t *rk,
- uint64_t testid,
- int timeout_ms);
-void test_consumer_poll_expect_err(rd_kafka_t *rk,
- uint64_t testid,
- int timeout_ms,
- rd_kafka_resp_err_t err);
-int test_consumer_poll_once(rd_kafka_t *rk, test_msgver_t *mv, int timeout_ms);
-int test_consumer_poll_exact_timeout(const char *what,
- rd_kafka_t *rk,
- uint64_t testid,
- int exp_eof_cnt,
- int exp_msg_base,
- int exp_cnt,
- rd_bool_t exact,
- test_msgver_t *mv,
- int timeout_ms);
-int test_consumer_poll_exact(const char *what,
- rd_kafka_t *rk,
- uint64_t testid,
- int exp_eof_cnt,
- int exp_msg_base,
- int exp_cnt,
- rd_bool_t exact,
- test_msgver_t *mv);
-int test_consumer_poll(const char *what,
- rd_kafka_t *rk,
- uint64_t testid,
- int exp_eof_cnt,
- int exp_msg_base,
- int exp_cnt,
- test_msgver_t *mv);
-int test_consumer_poll_timeout(const char *what,
- rd_kafka_t *rk,
- uint64_t testid,
- int exp_eof_cnt,
- int exp_msg_base,
- int exp_cnt,
- test_msgver_t *mv,
- int timeout_ms);
-
-void test_consumer_wait_assignment(rd_kafka_t *rk, rd_bool_t do_poll);
-void test_consumer_verify_assignment0(const char *func,
- int line,
- rd_kafka_t *rk,
- int fail_immediately,
- ...);
-#define test_consumer_verify_assignment(rk, fail_immediately, ...) \
- test_consumer_verify_assignment0(__FUNCTION__, __LINE__, rk, \
- fail_immediately, __VA_ARGS__)
-
-void test_consumer_assign(const char *what,
- rd_kafka_t *rk,
- rd_kafka_topic_partition_list_t *parts);
-void test_consumer_incremental_assign(const char *what,
- rd_kafka_t *rk,
- rd_kafka_topic_partition_list_t *parts);
-void test_consumer_unassign(const char *what, rd_kafka_t *rk);
-void test_consumer_incremental_unassign(const char *what,
- rd_kafka_t *rk,
- rd_kafka_topic_partition_list_t *parts);
-void test_consumer_assign_partition(const char *what,
- rd_kafka_t *rk,
- const char *topic,
- int32_t partition,
- int64_t offset);
-void test_consumer_pause_resume_partition(rd_kafka_t *rk,
- const char *topic,
- int32_t partition,
- rd_bool_t pause);
-
-void test_consumer_close(rd_kafka_t *rk);
-
-void test_flush(rd_kafka_t *rk, int timeout_ms);
-
-void test_conf_set(rd_kafka_conf_t *conf, const char *name, const char *val);
-char *test_topic_conf_get(const rd_kafka_topic_conf_t *tconf, const char *name);
-int test_conf_match(rd_kafka_conf_t *conf, const char *name, const char *val);
-void test_topic_conf_set(rd_kafka_topic_conf_t *tconf,
- const char *name,
- const char *val);
-void test_any_conf_set(rd_kafka_conf_t *conf,
- rd_kafka_topic_conf_t *tconf,
- const char *name,
- const char *val);
-
-void test_print_partition_list(
- const rd_kafka_topic_partition_list_t *partitions);
-int test_partition_list_cmp(rd_kafka_topic_partition_list_t *al,
- rd_kafka_topic_partition_list_t *bl);
-int test_partition_list_and_offsets_cmp(rd_kafka_topic_partition_list_t *al,
- rd_kafka_topic_partition_list_t *bl);
-
-void test_kafka_topics(const char *fmt, ...);
-void test_admin_create_topic(rd_kafka_t *use_rk,
- const char *topicname,
- int partition_cnt,
- int replication_factor,
- const char **configs);
-void test_create_topic(rd_kafka_t *use_rk,
- const char *topicname,
- int partition_cnt,
- int replication_factor);
-rd_kafka_resp_err_t test_auto_create_topic_rkt(rd_kafka_t *rk,
- rd_kafka_topic_t *rkt,
- int timeout_ms);
-rd_kafka_resp_err_t
-test_auto_create_topic(rd_kafka_t *rk, const char *name, int timeout_ms);
-int test_check_auto_create_topic(void);
-
-void test_create_partitions(rd_kafka_t *use_rk,
- const char *topicname,
- int new_partition_cnt);
-
-int test_get_partition_count(rd_kafka_t *rk,
- const char *topicname,
- int timeout_ms);
-
-char *tsprintf(const char *fmt, ...) RD_FORMAT(printf, 1, 2);
-
-void test_report_add(struct test *test, const char *fmt, ...);
-int test_can_create_topics(int skip);
-
-rd_kafka_event_t *test_wait_event(rd_kafka_queue_t *eventq,
- rd_kafka_event_type_t event_type,
- int timeout_ms);
-
-void test_prepare_msg(uint64_t testid,
- int32_t partition,
- int msg_id,
- char *val,
- size_t val_size,
- char *key,
- size_t key_size);
-
-#if WITH_SOCKEM
-void test_socket_enable(rd_kafka_conf_t *conf);
-void test_socket_close_all(struct test *test, int reinit);
-int test_socket_sockem_set_all(const char *key, int val);
-void test_socket_sockem_set(int s, const char *key, int value);
-#endif
-
-void test_headers_dump(const char *what,
- int lvl,
- const rd_kafka_headers_t *hdrs);
-
-int32_t *test_get_broker_ids(rd_kafka_t *use_rk, size_t *cntp);
-
-void test_wait_metadata_update(rd_kafka_t *rk,
- rd_kafka_metadata_topic_t *topics,
- size_t topic_cnt,
- rd_kafka_metadata_topic_t *not_topics,
- size_t not_topic_cnt,
- int tmout);
-
-rd_kafka_event_t *test_wait_admin_result(rd_kafka_queue_t *q,
- rd_kafka_event_type_t evtype,
- int tmout);
-
-rd_kafka_resp_err_t test_wait_topic_admin_result(rd_kafka_queue_t *q,
- rd_kafka_event_type_t evtype,
- rd_kafka_event_t **retevent,
- int tmout);
-
-rd_kafka_resp_err_t test_CreateTopics_simple(rd_kafka_t *rk,
- rd_kafka_queue_t *useq,
- char **topics,
- size_t topic_cnt,
- int num_partitions,
- void *opaque);
-rd_kafka_resp_err_t test_CreatePartitions_simple(rd_kafka_t *rk,
- rd_kafka_queue_t *useq,
- const char *topic,
- size_t total_part_cnt,
- void *opaque);
-
-rd_kafka_resp_err_t test_DeleteTopics_simple(rd_kafka_t *rk,
- rd_kafka_queue_t *useq,
- char **topics,
- size_t topic_cnt,
- void *opaque);
-
-rd_kafka_resp_err_t test_AlterConfigs_simple(rd_kafka_t *rk,
- rd_kafka_ResourceType_t restype,
- const char *resname,
- const char **configs,
- size_t config_cnt);
-
-rd_kafka_resp_err_t test_DeleteGroups_simple(rd_kafka_t *rk,
- rd_kafka_queue_t *useq,
- char **groups,
- size_t group_cnt,
- void *opaque);
-
-rd_kafka_resp_err_t
-test_DeleteRecords_simple(rd_kafka_t *rk,
- rd_kafka_queue_t *useq,
- const rd_kafka_topic_partition_list_t *offsets,
- void *opaque);
-
-rd_kafka_resp_err_t test_DeleteConsumerGroupOffsets_simple(
- rd_kafka_t *rk,
- rd_kafka_queue_t *useq,
- const char *group_id,
- const rd_kafka_topic_partition_list_t *offsets,
- void *opaque);
-
-rd_kafka_resp_err_t test_CreateAcls_simple(rd_kafka_t *rk,
- rd_kafka_queue_t *useq,
- rd_kafka_AclBinding_t **acls,
- size_t acl_cnt,
- void *opaque);
-
-rd_kafka_resp_err_t test_delete_all_test_topics(int timeout_ms);
-
-void test_mock_cluster_destroy(rd_kafka_mock_cluster_t *mcluster);
-rd_kafka_mock_cluster_t *test_mock_cluster_new(int broker_cnt,
- const char **bootstraps);
-
-
-
-int test_error_is_not_fatal_cb(rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- const char *reason);
-
-
-/**
- * @brief Calls rdkafka function (with arguments)
- * and checks its return value (must be rd_kafka_resp_err_t) for
- * error, in which case the test fails.
- * Also times the call.
- *
- * @remark The trailing __ makes calling code easier to read.
- */
-#define TEST_CALL__(FUNC_W_ARGS) \
- do { \
- test_timing_t _timing; \
- const char *_desc = RD_STRINGIFY(FUNC_W_ARGS); \
- rd_kafka_resp_err_t _err; \
- TIMING_START(&_timing, "%s", _desc); \
- TEST_SAYL(3, "Begin call %s\n", _desc); \
- _err = FUNC_W_ARGS; \
- TIMING_STOP(&_timing); \
- if (!_err) \
- break; \
- if (strstr(_desc, "errstr")) \
- TEST_FAIL("%s failed: %s: %s\n", _desc, \
- rd_kafka_err2name(_err), errstr); \
- else \
- TEST_FAIL("%s failed: %s\n", _desc, \
- rd_kafka_err2str(_err)); \
- } while (0)
-
-
-/**
- * @brief Same as TEST_CALL__() but expects an rd_kafka_error_t * return type.
- */
-#define TEST_CALL_ERROR__(FUNC_W_ARGS) \
- do { \
- test_timing_t _timing; \
- const char *_desc = RD_STRINGIFY(FUNC_W_ARGS); \
- const rd_kafka_error_t *_error; \
- TIMING_START(&_timing, "%s", _desc); \
- TEST_SAYL(3, "Begin call %s\n", _desc); \
- _error = FUNC_W_ARGS; \
- TIMING_STOP(&_timing); \
- if (!_error) \
- break; \
- TEST_FAIL("%s failed: %s\n", _desc, \
- rd_kafka_error_string(_error)); \
- } while (0)
-
-/**
- * @brief Same as TEST_CALL__() but expects an rd_kafka_resp_err_t return type
- * without errstr.
- */
-#define TEST_CALL_ERR__(FUNC_W_ARGS) \
- do { \
- test_timing_t _timing; \
- const char *_desc = RD_STRINGIFY(FUNC_W_ARGS); \
- rd_kafka_resp_err_t _err; \
- TIMING_START(&_timing, "%s", _desc); \
- TEST_SAYL(3, "Begin call %s\n", _desc); \
- _err = FUNC_W_ARGS; \
- TIMING_STOP(&_timing); \
- if (!_err) \
- break; \
- TEST_FAIL("%s failed: %s\n", _desc, rd_kafka_err2str(_err)); \
- } while (0)
-
-
-/**
- * @brief Print a rich error_t object in all its glory. NULL is ok.
- *
- * @param ... Is a prefix format-string+args that is printed with TEST_SAY()
- * prior to the error details. E.g., "commit() returned: ".
- * A newline is automatically appended.
- */
-#define TEST_SAY_ERROR(ERROR, ...) \
- do { \
- rd_kafka_error_t *_e = (ERROR); \
- TEST_SAY(__VA_ARGS__); \
- if (!_e) { \
- TEST_SAY0("No error" _C_CLR "\n"); \
- break; \
- } \
- if (rd_kafka_error_is_fatal(_e)) \
- TEST_SAY0(_C_RED "FATAL "); \
- if (rd_kafka_error_is_retriable(_e)) \
- TEST_SAY0("Retriable "); \
- if (rd_kafka_error_txn_requires_abort(_e)) \
- TEST_SAY0("TxnRequiresAbort "); \
- TEST_SAY0("Error: %s: %s" _C_CLR "\n", \
- rd_kafka_error_name(_e), rd_kafka_error_string(_e)); \
- } while (0)
-
-/**
- * @name rusage.c
- * @{
- */
-void test_rusage_start(struct test *test);
-int test_rusage_stop(struct test *test, double duration);
-
-/**@}*/
-
-#endif /* _TEST_H_ */