diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/test.h')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/test.h | 936 |
1 files changed, 0 insertions, 936 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/test.h b/fluent-bit/lib/librdkafka-2.1.0/tests/test.h deleted file mode 100644 index a431f9a25..000000000 --- a/fluent-bit/lib/librdkafka-2.1.0/tests/test.h +++ /dev/null @@ -1,936 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2012-2015, Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ -#ifndef _TEST_H_ -#define _TEST_H_ - -#include "../src/rd.h" - -#include <stdio.h> -#include <string.h> -#include <stdlib.h> -#ifndef _WIN32 -#include <unistd.h> -#endif -#include <errno.h> -#include <assert.h> -#include <time.h> -#include <ctype.h> - -#if HAVE_GETRUSAGE -#include <sys/time.h> -#include <sys/resource.h> -#endif - -#include "rdkafka.h" -#include "rdkafka_mock.h" -#include "tinycthread.h" -#include "rdlist.h" - -#if WITH_SOCKEM -#include "sockem.h" -#endif - -#include "testshared.h" -#ifdef _WIN32 -#define sscanf(...) sscanf_s(__VA_ARGS__) -#endif - -/** - * Test output is controlled through "TEST_LEVEL=N" environemnt variable. - * N < 2: TEST_SAY() is quiet. - */ - -extern int test_seed; -extern char test_mode[64]; -extern RD_TLS struct test *test_curr; -extern int test_assert_on_fail; -extern int tests_running_cnt; -extern int test_concurrent_max; -extern int test_rusage; -extern double test_rusage_cpu_calibration; -extern double test_timeout_multiplier; -extern int test_session_timeout_ms; /* Group session timeout */ -extern int test_flags; -extern int test_neg_flags; -extern int test_idempotent_producer; - -extern mtx_t test_mtx; - -#define TEST_LOCK() mtx_lock(&test_mtx) -#define TEST_UNLOCK() mtx_unlock(&test_mtx) - - -/* Forward decl */ -typedef struct test_msgver_s test_msgver_t; - - -/** @struct Resource usage thresholds */ -struct rusage_thres { - double ucpu; /**< Max User CPU in percentage */ - double scpu; /**< Max Sys CPU in percentage */ - double rss; /**< Max RSS (memory) increase in MB */ - int ctxsw; /**< Max number of voluntary context switches, i.e. - * syscalls. */ -}; - -typedef enum { - TEST_NOT_STARTED, - TEST_SKIPPED, - TEST_RUNNING, - TEST_PASSED, - TEST_FAILED, -} test_state_t; - -struct test { - /** - * Setup - */ - const char *name; /**< e.g. Same as filename minus extension */ - int (*mainfunc)(int argc, char **argv); /**< test's main func */ - const int flags; /**< Test flags */ -#define TEST_F_LOCAL 0x1 /**< Test is local, no broker requirement */ -#define TEST_F_KNOWN_ISSUE \ - 0x2 /**< Known issue, can fail without affecting \ - * total test run status. */ -#define TEST_F_MANUAL \ - 0x4 /**< Manual test, only started when specifically \ - * stated */ -#define TEST_F_SOCKEM 0x8 /**< Test requires socket emulation. */ - int minver; /**< Limit tests to broker version range. */ - int maxver; - - const char *extra; /**< Extra information to print in test_summary. */ - - const char *scenario; /**< Test scenario */ - - char * - *report_arr; /**< Test-specific reporting, JSON array of objects. */ - int report_cnt; - int report_size; - - rd_bool_t ignore_dr_err; /**< Ignore delivery report errors */ - rd_kafka_resp_err_t exp_dr_err; /* Expected error in test_dr_cb */ - rd_kafka_msg_status_t exp_dr_status; /**< Expected delivery status, - * or -1 for not checking. */ - int produce_sync; /**< test_produce_sync() call in action */ - rd_kafka_resp_err_t produce_sync_err; /**< DR error */ - test_msgver_t *dr_mv; /**< MsgVer that delivered messages will be - * added to (if not NULL). - * Must be set and freed by test. */ - - /** - * Runtime - */ - thrd_t thrd; - int64_t start; - int64_t duration; - FILE *stats_fp; - int64_t timeout; - test_state_t state; - int failcnt; /**< Number of failures, useful with FAIL_LATER */ - char failstr[512 + 1]; /**< First test failure reason */ - char subtest[400]; /**< Current subtest, if any */ - test_timing_t subtest_duration; /**< Subtest duration timing */ - rd_bool_t subtest_quick; /**< Subtest is marked as QUICK */ - -#if WITH_SOCKEM - rd_list_t sockets; - int (*connect_cb)(struct test *test, sockem_t *skm, const char *id); -#endif - int (*is_fatal_cb)(rd_kafka_t *rk, - rd_kafka_resp_err_t err, - const char *reason); - - /**< Resource usage thresholds */ - struct rusage_thres rusage_thres; /**< Usage thresholds */ -#if HAVE_GETRUSAGE - struct rusage rusage; /**< Monitored process CPU/mem usage */ -#endif -}; - - -#ifdef _WIN32 -#define TEST_F_KNOWN_ISSUE_WIN32 TEST_F_KNOWN_ISSUE -#else -#define TEST_F_KNOWN_ISSUE_WIN32 0 -#endif - -#ifdef __APPLE__ -#define TEST_F_KNOWN_ISSUE_OSX TEST_F_KNOWN_ISSUE -#else -#define TEST_F_KNOWN_ISSUE_OSX 0 -#endif - - -#define TEST_SAY0(...) fprintf(stderr, __VA_ARGS__) -#define TEST_SAYL(LVL, ...) \ - do { \ - if (test_level >= LVL) { \ - fprintf( \ - stderr, "\033[36m[%-28s/%7.3fs] ", \ - test_curr->name, \ - test_curr->start \ - ? ((float)(test_clock() - test_curr->start) / \ - 1000000.0f) \ - : 0); \ - fprintf(stderr, __VA_ARGS__); \ - fprintf(stderr, "\033[0m"); \ - } \ - } while (0) -#define TEST_SAY(...) TEST_SAYL(2, __VA_ARGS__) - -/** - * Append JSON object (as string) to this tests' report array. - */ -#define TEST_REPORT(...) test_report_add(test_curr, __VA_ARGS__) - - - -static RD_INLINE RD_UNUSED void rtrim(char *str) { - size_t len = strlen(str); - char *s; - - if (len == 0) - return; - - s = str + len - 1; - while (isspace((int)*s)) { - *s = '\0'; - s--; - } -} - -/* Skip the current test. Argument is textual reason (printf format) */ -#define TEST_SKIP(...) \ - do { \ - TEST_WARN("SKIPPING TEST: " __VA_ARGS__); \ - TEST_LOCK(); \ - test_curr->state = TEST_SKIPPED; \ - if (!*test_curr->failstr) { \ - rd_snprintf(test_curr->failstr, \ - sizeof(test_curr->failstr), __VA_ARGS__); \ - rtrim(test_curr->failstr); \ - } \ - TEST_UNLOCK(); \ - } while (0) - - -void test_conf_init(rd_kafka_conf_t **conf, - rd_kafka_topic_conf_t **topic_conf, - int timeout); - - - -void test_msg_fmt(char *dest, - size_t dest_size, - uint64_t testid, - int32_t partition, - int msgid); -void test_msg_parse0(const char *func, - int line, - uint64_t testid, - rd_kafka_message_t *rkmessage, - int32_t exp_partition, - int *msgidp); -#define test_msg_parse(testid, rkmessage, exp_partition, msgidp) \ - test_msg_parse0(__FUNCTION__, __LINE__, testid, rkmessage, \ - exp_partition, msgidp) - - -static RD_INLINE int jitter(int low, int high) RD_UNUSED; -static RD_INLINE int jitter(int low, int high) { - return (low + (rand() % ((high - low) + 1))); -} - - - -/****************************************************************************** - * - * Helpers - * - ******************************************************************************/ - - - -/**************************************************************** - * Message verification services * - * * - * * - * * - ****************************************************************/ - - -/** - * A test_msgver_t is first fed with messages from any number of - * topics and partitions, it is then checked for expected messages, such as: - * - all messages received, based on message payload information. - * - messages received in order - * - EOF - */ -struct test_msgver_s { - struct test_mv_p **p; /* Partitions array */ - int p_cnt; /* Partition count */ - int p_size; /* p size */ - int msgcnt; /* Total message count */ - uint64_t testid; /* Only accept messages for this testid */ - rd_bool_t ignore_eof; /* Don't end PARTITION_EOF messages */ - - struct test_msgver_s *fwd; /* Also forward add_msg() to this mv */ - - int log_cnt; /* Current number of warning logs */ - int log_max; /* Max warning logs before suppressing. */ - int log_suppr_cnt; /* Number of suppressed log messages. */ - - const char *msgid_hdr; /**< msgid string is in header by this name, - * rather than in the payload (default). */ -}; /* test_msgver_t; */ - -/* Message */ -struct test_mv_m { - int64_t offset; /* Message offset */ - int msgid; /* Message id */ - int64_t timestamp; /* Message timestamp */ - int32_t broker_id; /* Message broker id */ -}; - - -/* Message vector */ -struct test_mv_mvec { - struct test_mv_m *m; - int cnt; - int size; /* m[] size */ -}; - -/* Partition */ -struct test_mv_p { - char *topic; - int32_t partition; - struct test_mv_mvec mvec; - int64_t eof_offset; -}; - -/* Verification state */ -struct test_mv_vs { - int msg_base; - int exp_cnt; - - /* used by verify_range */ - int msgid_min; - int msgid_max; - int64_t timestamp_min; - int64_t timestamp_max; - - /* used by verify_broker_id */ - int32_t broker_id; - - struct test_mv_mvec mvec; - - /* Correct msgver for comparison */ - test_msgver_t *corr; -}; - - -void test_msgver_init(test_msgver_t *mv, uint64_t testid); -void test_msgver_clear(test_msgver_t *mv); -void test_msgver_ignore_eof(test_msgver_t *mv); -int test_msgver_add_msg00(const char *func, - int line, - const char *clientname, - test_msgver_t *mv, - uint64_t testid, - const char *topic, - int32_t partition, - int64_t offset, - int64_t timestamp, - int32_t broker_id, - rd_kafka_resp_err_t err, - int msgnum); -int test_msgver_add_msg0(const char *func, - int line, - const char *clientname, - test_msgver_t *mv, - const rd_kafka_message_t *rkmessage, - const char *override_topic); -#define test_msgver_add_msg(rk, mv, rkm) \ - test_msgver_add_msg0(__FUNCTION__, __LINE__, rd_kafka_name(rk), mv, \ - rkm, NULL) - -/** - * Flags to indicate what to verify. - */ -#define TEST_MSGVER_ORDER 0x1 /* Order */ -#define TEST_MSGVER_DUP 0x2 /* Duplicates */ -#define TEST_MSGVER_RANGE 0x4 /* Range of messages */ - -#define TEST_MSGVER_ALL 0xf /* All verifiers */ - -#define TEST_MSGVER_BY_MSGID 0x10000 /* Verify by msgid (unique in testid) */ -#define TEST_MSGVER_BY_OFFSET \ - 0x20000 /* Verify by offset (unique in partition)*/ -#define TEST_MSGVER_BY_TIMESTAMP 0x40000 /* Verify by timestamp range */ -#define TEST_MSGVER_BY_BROKER_ID 0x80000 /* Verify by broker id */ - -#define TEST_MSGVER_SUBSET \ - 0x100000 /* verify_compare: allow correct mv to be \ - * a subset of mv. */ - -/* Only test per partition, not across all messages received on all partitions. - * This is useful when doing incremental verifications with multiple partitions - * and the total number of messages has not been received yet. - * Can't do range check here since messages may be spread out on multiple - * partitions and we might just have read a few partitions. */ -#define TEST_MSGVER_PER_PART \ - ((TEST_MSGVER_ALL & ~TEST_MSGVER_RANGE) | TEST_MSGVER_BY_MSGID | \ - TEST_MSGVER_BY_OFFSET) - -/* Test on all messages across all partitions. - * This can only be used to check with msgid, not offset since that - * is partition local. */ -#define TEST_MSGVER_ALL_PART (TEST_MSGVER_ALL | TEST_MSGVER_BY_MSGID) - - -int test_msgver_verify_part0(const char *func, - int line, - const char *what, - test_msgver_t *mv, - int flags, - const char *topic, - int partition, - int msg_base, - int exp_cnt); -#define test_msgver_verify_part(what, mv, flags, topic, partition, msg_base, \ - exp_cnt) \ - test_msgver_verify_part0(__FUNCTION__, __LINE__, what, mv, flags, \ - topic, partition, msg_base, exp_cnt) - -int test_msgver_verify0(const char *func, - int line, - const char *what, - test_msgver_t *mv, - int flags, - struct test_mv_vs vs); -#define test_msgver_verify(what, mv, flags, msgbase, expcnt) \ - test_msgver_verify0( \ - __FUNCTION__, __LINE__, what, mv, flags, \ - (struct test_mv_vs) {.msg_base = msgbase, .exp_cnt = expcnt}) - - -void test_msgver_verify_compare0(const char *func, - int line, - const char *what, - test_msgver_t *mv, - test_msgver_t *corr, - int flags); -#define test_msgver_verify_compare(what, mv, corr, flags) \ - test_msgver_verify_compare0(__FUNCTION__, __LINE__, what, mv, corr, \ - flags) - -rd_kafka_t *test_create_handle(int mode, rd_kafka_conf_t *conf); - -/** - * Delivery reported callback. - * Called for each message once to signal its delivery status. - */ -void test_dr_msg_cb(rd_kafka_t *rk, - const rd_kafka_message_t *rkmessage, - void *opaque); - -rd_kafka_t *test_create_producer(void); -rd_kafka_topic_t * -test_create_producer_topic(rd_kafka_t *rk, const char *topic, ...); -void test_wait_delivery(rd_kafka_t *rk, int *msgcounterp); -void test_produce_msgs_nowait(rd_kafka_t *rk, - rd_kafka_topic_t *rkt, - uint64_t testid, - int32_t partition, - int msg_base, - int cnt, - const char *payload, - size_t size, - int msgrate, - int *msgcounterp); -void test_produce_msgs(rd_kafka_t *rk, - rd_kafka_topic_t *rkt, - uint64_t testid, - int32_t partition, - int msg_base, - int cnt, - const char *payload, - size_t size); -void test_produce_msgs2(rd_kafka_t *rk, - const char *topic, - uint64_t testid, - int32_t partition, - int msg_base, - int cnt, - const char *payload, - size_t size); -void test_produce_msgs2_nowait(rd_kafka_t *rk, - const char *topic, - uint64_t testid, - int32_t partition, - int msg_base, - int cnt, - const char *payload, - size_t size, - int *remainsp); -void test_produce_msgs_rate(rd_kafka_t *rk, - rd_kafka_topic_t *rkt, - uint64_t testid, - int32_t partition, - int msg_base, - int cnt, - const char *payload, - size_t size, - int msgrate); -rd_kafka_resp_err_t test_produce_sync(rd_kafka_t *rk, - rd_kafka_topic_t *rkt, - uint64_t testid, - int32_t partition); - -void test_produce_msgs_easy_v(const char *topic, - uint64_t testid, - int32_t partition, - int msg_base, - int cnt, - size_t size, - ...); -void test_produce_msgs_easy_multi(uint64_t testid, ...); - -void test_incremental_rebalance_cb(rd_kafka_t *rk, - rd_kafka_resp_err_t err, - rd_kafka_topic_partition_list_t *parts, - void *opaque); -void test_rebalance_cb(rd_kafka_t *rk, - rd_kafka_resp_err_t err, - rd_kafka_topic_partition_list_t *parts, - void *opaque); - -rd_kafka_t *test_create_consumer( - const char *group_id, - void (*rebalance_cb)(rd_kafka_t *rk, - rd_kafka_resp_err_t err, - rd_kafka_topic_partition_list_t *partitions, - void *opaque), - rd_kafka_conf_t *conf, - rd_kafka_topic_conf_t *default_topic_conf); -rd_kafka_topic_t *test_create_consumer_topic(rd_kafka_t *rk, const char *topic); -rd_kafka_topic_t * -test_create_topic_object(rd_kafka_t *rk, const char *topic, ...); -void test_consumer_start(const char *what, - rd_kafka_topic_t *rkt, - int32_t partition, - int64_t start_offset); -void test_consumer_stop(const char *what, - rd_kafka_topic_t *rkt, - int32_t partition); -void test_consumer_seek(const char *what, - rd_kafka_topic_t *rkt, - int32_t partition, - int64_t offset); - -#define TEST_NO_SEEK -1 -int64_t test_consume_msgs(const char *what, - rd_kafka_topic_t *rkt, - uint64_t testid, - int32_t partition, - int64_t offset, - int exp_msg_base, - int exp_cnt, - int parse_fmt); - - -void test_verify_rkmessage0(const char *func, - int line, - rd_kafka_message_t *rkmessage, - uint64_t testid, - int32_t partition, - int msgnum); -#define test_verify_rkmessage(rkmessage, testid, partition, msgnum) \ - test_verify_rkmessage0(__FUNCTION__, __LINE__, rkmessage, testid, \ - partition, msgnum) - -void test_consumer_subscribe(rd_kafka_t *rk, const char *topic); - -void test_consume_msgs_easy_mv0(const char *group_id, - const char *topic, - rd_bool_t txn, - int32_t partition, - uint64_t testid, - int exp_eofcnt, - int exp_msgcnt, - rd_kafka_topic_conf_t *tconf, - test_msgver_t *mv); - -#define test_consume_msgs_easy_mv(group_id, topic, partition, testid, \ - exp_eofcnt, exp_msgcnt, tconf, mv) \ - test_consume_msgs_easy_mv0(group_id, topic, rd_false /*not-txn*/, \ - partition, testid, exp_eofcnt, exp_msgcnt, \ - tconf, mv) - -void test_consume_msgs_easy(const char *group_id, - const char *topic, - uint64_t testid, - int exp_eofcnt, - int exp_msgcnt, - rd_kafka_topic_conf_t *tconf); - -void test_consume_txn_msgs_easy(const char *group_id, - const char *topic, - uint64_t testid, - int exp_eofcnt, - int exp_msgcnt, - rd_kafka_topic_conf_t *tconf); - -void test_consumer_poll_no_msgs(const char *what, - rd_kafka_t *rk, - uint64_t testid, - int timeout_ms); -void test_consumer_poll_expect_err(rd_kafka_t *rk, - uint64_t testid, - int timeout_ms, - rd_kafka_resp_err_t err); -int test_consumer_poll_once(rd_kafka_t *rk, test_msgver_t *mv, int timeout_ms); -int test_consumer_poll_exact_timeout(const char *what, - rd_kafka_t *rk, - uint64_t testid, - int exp_eof_cnt, - int exp_msg_base, - int exp_cnt, - rd_bool_t exact, - test_msgver_t *mv, - int timeout_ms); -int test_consumer_poll_exact(const char *what, - rd_kafka_t *rk, - uint64_t testid, - int exp_eof_cnt, - int exp_msg_base, - int exp_cnt, - rd_bool_t exact, - test_msgver_t *mv); -int test_consumer_poll(const char *what, - rd_kafka_t *rk, - uint64_t testid, - int exp_eof_cnt, - int exp_msg_base, - int exp_cnt, - test_msgver_t *mv); -int test_consumer_poll_timeout(const char *what, - rd_kafka_t *rk, - uint64_t testid, - int exp_eof_cnt, - int exp_msg_base, - int exp_cnt, - test_msgver_t *mv, - int timeout_ms); - -void test_consumer_wait_assignment(rd_kafka_t *rk, rd_bool_t do_poll); -void test_consumer_verify_assignment0(const char *func, - int line, - rd_kafka_t *rk, - int fail_immediately, - ...); -#define test_consumer_verify_assignment(rk, fail_immediately, ...) \ - test_consumer_verify_assignment0(__FUNCTION__, __LINE__, rk, \ - fail_immediately, __VA_ARGS__) - -void test_consumer_assign(const char *what, - rd_kafka_t *rk, - rd_kafka_topic_partition_list_t *parts); -void test_consumer_incremental_assign(const char *what, - rd_kafka_t *rk, - rd_kafka_topic_partition_list_t *parts); -void test_consumer_unassign(const char *what, rd_kafka_t *rk); -void test_consumer_incremental_unassign(const char *what, - rd_kafka_t *rk, - rd_kafka_topic_partition_list_t *parts); -void test_consumer_assign_partition(const char *what, - rd_kafka_t *rk, - const char *topic, - int32_t partition, - int64_t offset); -void test_consumer_pause_resume_partition(rd_kafka_t *rk, - const char *topic, - int32_t partition, - rd_bool_t pause); - -void test_consumer_close(rd_kafka_t *rk); - -void test_flush(rd_kafka_t *rk, int timeout_ms); - -void test_conf_set(rd_kafka_conf_t *conf, const char *name, const char *val); -char *test_topic_conf_get(const rd_kafka_topic_conf_t *tconf, const char *name); -int test_conf_match(rd_kafka_conf_t *conf, const char *name, const char *val); -void test_topic_conf_set(rd_kafka_topic_conf_t *tconf, - const char *name, - const char *val); -void test_any_conf_set(rd_kafka_conf_t *conf, - rd_kafka_topic_conf_t *tconf, - const char *name, - const char *val); - -void test_print_partition_list( - const rd_kafka_topic_partition_list_t *partitions); -int test_partition_list_cmp(rd_kafka_topic_partition_list_t *al, - rd_kafka_topic_partition_list_t *bl); -int test_partition_list_and_offsets_cmp(rd_kafka_topic_partition_list_t *al, - rd_kafka_topic_partition_list_t *bl); - -void test_kafka_topics(const char *fmt, ...); -void test_admin_create_topic(rd_kafka_t *use_rk, - const char *topicname, - int partition_cnt, - int replication_factor, - const char **configs); -void test_create_topic(rd_kafka_t *use_rk, - const char *topicname, - int partition_cnt, - int replication_factor); -rd_kafka_resp_err_t test_auto_create_topic_rkt(rd_kafka_t *rk, - rd_kafka_topic_t *rkt, - int timeout_ms); -rd_kafka_resp_err_t -test_auto_create_topic(rd_kafka_t *rk, const char *name, int timeout_ms); -int test_check_auto_create_topic(void); - -void test_create_partitions(rd_kafka_t *use_rk, - const char *topicname, - int new_partition_cnt); - -int test_get_partition_count(rd_kafka_t *rk, - const char *topicname, - int timeout_ms); - -char *tsprintf(const char *fmt, ...) RD_FORMAT(printf, 1, 2); - -void test_report_add(struct test *test, const char *fmt, ...); -int test_can_create_topics(int skip); - -rd_kafka_event_t *test_wait_event(rd_kafka_queue_t *eventq, - rd_kafka_event_type_t event_type, - int timeout_ms); - -void test_prepare_msg(uint64_t testid, - int32_t partition, - int msg_id, - char *val, - size_t val_size, - char *key, - size_t key_size); - -#if WITH_SOCKEM -void test_socket_enable(rd_kafka_conf_t *conf); -void test_socket_close_all(struct test *test, int reinit); -int test_socket_sockem_set_all(const char *key, int val); -void test_socket_sockem_set(int s, const char *key, int value); -#endif - -void test_headers_dump(const char *what, - int lvl, - const rd_kafka_headers_t *hdrs); - -int32_t *test_get_broker_ids(rd_kafka_t *use_rk, size_t *cntp); - -void test_wait_metadata_update(rd_kafka_t *rk, - rd_kafka_metadata_topic_t *topics, - size_t topic_cnt, - rd_kafka_metadata_topic_t *not_topics, - size_t not_topic_cnt, - int tmout); - -rd_kafka_event_t *test_wait_admin_result(rd_kafka_queue_t *q, - rd_kafka_event_type_t evtype, - int tmout); - -rd_kafka_resp_err_t test_wait_topic_admin_result(rd_kafka_queue_t *q, - rd_kafka_event_type_t evtype, - rd_kafka_event_t **retevent, - int tmout); - -rd_kafka_resp_err_t test_CreateTopics_simple(rd_kafka_t *rk, - rd_kafka_queue_t *useq, - char **topics, - size_t topic_cnt, - int num_partitions, - void *opaque); -rd_kafka_resp_err_t test_CreatePartitions_simple(rd_kafka_t *rk, - rd_kafka_queue_t *useq, - const char *topic, - size_t total_part_cnt, - void *opaque); - -rd_kafka_resp_err_t test_DeleteTopics_simple(rd_kafka_t *rk, - rd_kafka_queue_t *useq, - char **topics, - size_t topic_cnt, - void *opaque); - -rd_kafka_resp_err_t test_AlterConfigs_simple(rd_kafka_t *rk, - rd_kafka_ResourceType_t restype, - const char *resname, - const char **configs, - size_t config_cnt); - -rd_kafka_resp_err_t test_DeleteGroups_simple(rd_kafka_t *rk, - rd_kafka_queue_t *useq, - char **groups, - size_t group_cnt, - void *opaque); - -rd_kafka_resp_err_t -test_DeleteRecords_simple(rd_kafka_t *rk, - rd_kafka_queue_t *useq, - const rd_kafka_topic_partition_list_t *offsets, - void *opaque); - -rd_kafka_resp_err_t test_DeleteConsumerGroupOffsets_simple( - rd_kafka_t *rk, - rd_kafka_queue_t *useq, - const char *group_id, - const rd_kafka_topic_partition_list_t *offsets, - void *opaque); - -rd_kafka_resp_err_t test_CreateAcls_simple(rd_kafka_t *rk, - rd_kafka_queue_t *useq, - rd_kafka_AclBinding_t **acls, - size_t acl_cnt, - void *opaque); - -rd_kafka_resp_err_t test_delete_all_test_topics(int timeout_ms); - -void test_mock_cluster_destroy(rd_kafka_mock_cluster_t *mcluster); -rd_kafka_mock_cluster_t *test_mock_cluster_new(int broker_cnt, - const char **bootstraps); - - - -int test_error_is_not_fatal_cb(rd_kafka_t *rk, - rd_kafka_resp_err_t err, - const char *reason); - - -/** - * @brief Calls rdkafka function (with arguments) - * and checks its return value (must be rd_kafka_resp_err_t) for - * error, in which case the test fails. - * Also times the call. - * - * @remark The trailing __ makes calling code easier to read. - */ -#define TEST_CALL__(FUNC_W_ARGS) \ - do { \ - test_timing_t _timing; \ - const char *_desc = RD_STRINGIFY(FUNC_W_ARGS); \ - rd_kafka_resp_err_t _err; \ - TIMING_START(&_timing, "%s", _desc); \ - TEST_SAYL(3, "Begin call %s\n", _desc); \ - _err = FUNC_W_ARGS; \ - TIMING_STOP(&_timing); \ - if (!_err) \ - break; \ - if (strstr(_desc, "errstr")) \ - TEST_FAIL("%s failed: %s: %s\n", _desc, \ - rd_kafka_err2name(_err), errstr); \ - else \ - TEST_FAIL("%s failed: %s\n", _desc, \ - rd_kafka_err2str(_err)); \ - } while (0) - - -/** - * @brief Same as TEST_CALL__() but expects an rd_kafka_error_t * return type. - */ -#define TEST_CALL_ERROR__(FUNC_W_ARGS) \ - do { \ - test_timing_t _timing; \ - const char *_desc = RD_STRINGIFY(FUNC_W_ARGS); \ - const rd_kafka_error_t *_error; \ - TIMING_START(&_timing, "%s", _desc); \ - TEST_SAYL(3, "Begin call %s\n", _desc); \ - _error = FUNC_W_ARGS; \ - TIMING_STOP(&_timing); \ - if (!_error) \ - break; \ - TEST_FAIL("%s failed: %s\n", _desc, \ - rd_kafka_error_string(_error)); \ - } while (0) - -/** - * @brief Same as TEST_CALL__() but expects an rd_kafka_resp_err_t return type - * without errstr. - */ -#define TEST_CALL_ERR__(FUNC_W_ARGS) \ - do { \ - test_timing_t _timing; \ - const char *_desc = RD_STRINGIFY(FUNC_W_ARGS); \ - rd_kafka_resp_err_t _err; \ - TIMING_START(&_timing, "%s", _desc); \ - TEST_SAYL(3, "Begin call %s\n", _desc); \ - _err = FUNC_W_ARGS; \ - TIMING_STOP(&_timing); \ - if (!_err) \ - break; \ - TEST_FAIL("%s failed: %s\n", _desc, rd_kafka_err2str(_err)); \ - } while (0) - - -/** - * @brief Print a rich error_t object in all its glory. NULL is ok. - * - * @param ... Is a prefix format-string+args that is printed with TEST_SAY() - * prior to the error details. E.g., "commit() returned: ". - * A newline is automatically appended. - */ -#define TEST_SAY_ERROR(ERROR, ...) \ - do { \ - rd_kafka_error_t *_e = (ERROR); \ - TEST_SAY(__VA_ARGS__); \ - if (!_e) { \ - TEST_SAY0("No error" _C_CLR "\n"); \ - break; \ - } \ - if (rd_kafka_error_is_fatal(_e)) \ - TEST_SAY0(_C_RED "FATAL "); \ - if (rd_kafka_error_is_retriable(_e)) \ - TEST_SAY0("Retriable "); \ - if (rd_kafka_error_txn_requires_abort(_e)) \ - TEST_SAY0("TxnRequiresAbort "); \ - TEST_SAY0("Error: %s: %s" _C_CLR "\n", \ - rd_kafka_error_name(_e), rd_kafka_error_string(_e)); \ - } while (0) - -/** - * @name rusage.c - * @{ - */ -void test_rusage_start(struct test *test); -int test_rusage_stop(struct test *test, double duration); - -/**@}*/ - -#endif /* _TEST_H_ */ |