/* * librdkafka - Apache Kafka C library * * Copyright (c) 2018, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ /** * @name Test rd_kafka_destroy_flags() */ #include "test.h" static RD_TLS int rebalance_cnt = 0; static void destroy_flags_rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *parts, void *opaque) { rebalance_cnt++; TEST_SAY("rebalance_cb: %s with %d partition(s)\n", rd_kafka_err2str(err), parts->cnt); switch (err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: test_consumer_assign("rebalance", rk, parts); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: test_consumer_unassign("rebalance", rk); break; default: TEST_FAIL("rebalance_cb: error: %s", rd_kafka_err2str(err)); } } struct df_args { rd_kafka_type_t client_type; int produce_cnt; int consumer_subscribe; int consumer_unsubscribe; }; static void do_test_destroy_flags(const char *topic, int destroy_flags, int local_mode, const struct df_args *args) { rd_kafka_t *rk; rd_kafka_conf_t *conf; test_timing_t t_destroy; TEST_SAY(_C_MAG "[ test destroy_flags 0x%x for client_type %d, " "produce_cnt %d, subscribe %d, unsubscribe %d, " "%s mode ]\n" _C_CLR, destroy_flags, args->client_type, args->produce_cnt, args->consumer_subscribe, args->consumer_unsubscribe, local_mode ? "local" : "broker"); test_conf_init(&conf, NULL, 20); if (local_mode) test_conf_set(conf, "bootstrap.servers", ""); if (args->client_type == RD_KAFKA_PRODUCER) { rk = test_create_handle(args->client_type, conf); if (args->produce_cnt > 0) { rd_kafka_topic_t *rkt; int msgcounter = 0; rkt = test_create_producer_topic(rk, topic, NULL); test_produce_msgs_nowait( rk, rkt, 0, RD_KAFKA_PARTITION_UA, 0, args->produce_cnt, NULL, 100, 0, &msgcounter); rd_kafka_topic_destroy(rkt); } } else { int i; TEST_ASSERT(args->client_type == RD_KAFKA_CONSUMER); rk = test_create_consumer(topic, destroy_flags_rebalance_cb, conf, NULL); if (args->consumer_subscribe) { test_consumer_subscribe(rk, topic); if (!local_mode) { TEST_SAY("Waiting for assignment\n"); while (rebalance_cnt == 0) test_consumer_poll_once(rk, NULL, 1000); } } for (i = 0; i < 5; i++) test_consumer_poll_once(rk, NULL, 100); if (args->consumer_unsubscribe) { /* Test that calling rd_kafka_unsubscribe immediately * prior to rd_kafka_destroy_flags doesn't cause the * latter to hang. */ TEST_SAY(_C_YEL "Calling rd_kafka_unsubscribe\n"_C_CLR); rd_kafka_unsubscribe(rk); } } rebalance_cnt = 0; TEST_SAY(_C_YEL "Calling rd_kafka_destroy_flags(0x%x)\n" _C_CLR, destroy_flags); TIMING_START(&t_destroy, "rd_kafka_destroy_flags(0x%x)", destroy_flags); rd_kafka_destroy_flags(rk, destroy_flags); TIMING_STOP(&t_destroy); if (destroy_flags & RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE) TIMING_ASSERT_LATER(&t_destroy, 0, 200); else TIMING_ASSERT_LATER(&t_destroy, 0, 1000); if (args->consumer_subscribe && !(destroy_flags & RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE)) { if (!local_mode) TEST_ASSERT(rebalance_cnt > 0, "expected final rebalance callback"); } else TEST_ASSERT(rebalance_cnt == 0, "expected no rebalance callbacks, got %d", rebalance_cnt); TEST_SAY(_C_GRN "[ test destroy_flags 0x%x for client_type %d, " "produce_cnt %d, subscribe %d, unsubscribe %d, " "%s mode: PASS ]\n" _C_CLR, destroy_flags, args->client_type, args->produce_cnt, args->consumer_subscribe, args->consumer_unsubscribe, local_mode ? "local" : "broker"); } /** * @brief Destroy with flags */ static void destroy_flags(int local_mode) { const struct df_args args[] = { {RD_KAFKA_PRODUCER, 0, 0, 0}, {RD_KAFKA_PRODUCER, test_quick ? 100 : 10000, 0, 0}, {RD_KAFKA_CONSUMER, 0, 1, 0}, {RD_KAFKA_CONSUMER, 0, 1, 1}, {RD_KAFKA_CONSUMER, 0, 0, 0}}; const int flag_combos[] = {0, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE}; const char *topic = test_mk_topic_name(__FUNCTION__, 1); const rd_bool_t can_subscribe = test_broker_version >= TEST_BRKVER(0, 9, 0, 0); int i, j; /* Create the topic to avoid not-yet-auto-created-topics being * subscribed to (and thus raising an error). */ if (!local_mode) { test_create_topic(NULL, topic, 3, 1); test_wait_topic_exists(NULL, topic, 5000); } for (i = 0; i < (int)RD_ARRAYSIZE(args); i++) { for (j = 0; j < (int)RD_ARRAYSIZE(flag_combos); j++) { if (!can_subscribe && (args[i].consumer_subscribe || args[i].consumer_unsubscribe)) continue; do_test_destroy_flags(topic, flag_combos[j], local_mode, &args[i]); } } } int main_0084_destroy_flags_local(int argc, char **argv) { destroy_flags(1 /*no brokers*/); return 0; } int main_0084_destroy_flags(int argc, char **argv) { destroy_flags(0 /*with brokers*/); return 0; }