/* * librdkafka - Apache Kafka C library * * Copyright (c) 2012-2015, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include "test.h" #include "rdkafka.h" #include /** * Verify that subscription is updated on metadata changes: * - topic additions * - topic deletions * - partition count changes */ /** * Wait for REBALANCE ASSIGN event and perform assignment * * Va-args are \p topic_cnt tuples of the expected assignment: * { const char *topic, int partition_cnt } */ static void await_assignment(const char *pfx, rd_kafka_t *rk, rd_kafka_queue_t *queue, int topic_cnt, ...) { rd_kafka_event_t *rkev; rd_kafka_topic_partition_list_t *tps; int i; va_list ap; int fails = 0; int exp_part_cnt = 0; TEST_SAY("%s: waiting for assignment\n", pfx); rkev = test_wait_event(queue, RD_KAFKA_EVENT_REBALANCE, 30000); if (!rkev) TEST_FAIL("timed out waiting for assignment"); TEST_ASSERT(rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, "expected ASSIGN, got %s", rd_kafka_err2str(rd_kafka_event_error(rkev))); tps = rd_kafka_event_topic_partition_list(rkev); TEST_SAY("%s: assignment:\n", pfx); test_print_partition_list(tps); va_start(ap, topic_cnt); for (i = 0; i < topic_cnt; i++) { const char *topic = va_arg(ap, const char *); int partition_cnt = va_arg(ap, int); int p; TEST_SAY("%s: expecting %s with %d partitions\n", pfx, topic, partition_cnt); for (p = 0; p < partition_cnt; p++) { if (!rd_kafka_topic_partition_list_find(tps, topic, p)) { TEST_FAIL_LATER( "%s: expected partition %s [%d] " "not found in assginment", pfx, topic, p); fails++; } } exp_part_cnt += partition_cnt; } va_end(ap); TEST_ASSERT(exp_part_cnt == tps->cnt, "expected assignment of %d partitions, got %d", exp_part_cnt, tps->cnt); if (fails > 0) TEST_FAIL("%s: assignment mismatch: see above", pfx); rd_kafka_assign(rk, tps); rd_kafka_event_destroy(rkev); } /** * Wait for REBALANCE REVOKE event and perform unassignment. */ static void await_revoke(const char *pfx, rd_kafka_t *rk, rd_kafka_queue_t *queue) { rd_kafka_event_t *rkev; TEST_SAY("%s: waiting for revoke\n", pfx); rkev = test_wait_event(queue, RD_KAFKA_EVENT_REBALANCE, 30000); if (!rkev) TEST_FAIL("timed out waiting for revoke"); TEST_ASSERT(rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, "expected REVOKE, got %s", rd_kafka_err2str(rd_kafka_event_error(rkev))); rd_kafka_assign(rk, NULL); rd_kafka_event_destroy(rkev); } /** * Wait \p timeout_ms to make sure no rebalance was triggered. */ static void await_no_rebalance(const char *pfx, rd_kafka_t *rk, rd_kafka_queue_t *queue, int timeout_ms) { rd_kafka_event_t *rkev; TEST_SAY("%s: waiting for %d ms to not see rebalance\n", pfx, timeout_ms); rkev = test_wait_event(queue, RD_KAFKA_EVENT_REBALANCE, timeout_ms); if (!rkev) return; TEST_ASSERT(rkev, "did not expect %s: %s", rd_kafka_event_name(rkev), rd_kafka_err2str(rd_kafka_event_error(rkev))); rd_kafka_event_destroy(rkev); } static void do_test_non_exist_and_partchange(void) { char *topic_a = rd_strdup(test_mk_topic_name("topic_a", 1)); rd_kafka_t *rk; rd_kafka_conf_t *conf; rd_kafka_queue_t *queue; /** * Test #1: * - Subscribe to non-existing topic. * - Verify empty assignment * - Create topic * - Verify new assignment containing topic */ SUB_TEST(); test_conf_init(&conf, NULL, 60); /* Decrease metadata interval to speed up topic change discovery. */ test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000"); rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_REBALANCE); rk = test_create_consumer(test_str_id_generate_tmp(), NULL, conf, NULL); queue = rd_kafka_queue_get_consumer(rk); TEST_SAY("#1: Subscribing to %s\n", topic_a); test_consumer_subscribe(rk, topic_a); /* Should not see a rebalance since no topics are matched. */ await_no_rebalance("#1: empty", rk, queue, 10000); TEST_SAY("#1: creating topic %s\n", topic_a); test_create_topic(NULL, topic_a, 2, 1); await_assignment("#1: proper", rk, queue, 1, topic_a, 2); /** * Test #2 (continue with #1 consumer) * - Increase the partition count * - Verify updated assignment */ test_kafka_topics("--alter --topic %s --partitions 4", topic_a); await_revoke("#2", rk, queue); await_assignment("#2: more partitions", rk, queue, 1, topic_a, 4); test_consumer_close(rk); rd_kafka_queue_destroy(queue); rd_kafka_destroy(rk); rd_free(topic_a); SUB_TEST_PASS(); } static void do_test_regex(void) { char *base_topic = rd_strdup(test_mk_topic_name("topic", 1)); char *topic_b = rd_strdup(tsprintf("%s_b", base_topic)); char *topic_c = rd_strdup(tsprintf("%s_c", base_topic)); char *topic_d = rd_strdup(tsprintf("%s_d", base_topic)); char *topic_e = rd_strdup(tsprintf("%s_e", base_topic)); rd_kafka_t *rk; rd_kafka_conf_t *conf; rd_kafka_queue_t *queue; /** * Regex test: * - Create topic b * - Subscribe to b & d & e * - Verify b assignment * - Create topic c * - Verify no rebalance * - Create topic d * - Verify b & d assignment */ SUB_TEST(); test_conf_init(&conf, NULL, 60); /* Decrease metadata interval to speed up topic change discovery. */ test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000"); rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_REBALANCE); rk = test_create_consumer(test_str_id_generate_tmp(), NULL, conf, NULL); queue = rd_kafka_queue_get_consumer(rk); TEST_SAY("Regex: creating topic %s (subscribed)\n", topic_b); test_create_topic(NULL, topic_b, 2, 1); rd_sleep(1); // FIXME: do check&wait loop instead TEST_SAY("Regex: Subscribing to %s & %s & %s\n", topic_b, topic_d, topic_e); test_consumer_subscribe(rk, tsprintf("^%s_[bde]$", base_topic)); await_assignment("Regex: just one topic exists", rk, queue, 1, topic_b, 2); TEST_SAY("Regex: creating topic %s (not subscribed)\n", topic_c); test_create_topic(NULL, topic_c, 4, 1); /* Should not see a rebalance since no topics are matched. */ await_no_rebalance("Regex: empty", rk, queue, 10000); TEST_SAY("Regex: creating topic %s (subscribed)\n", topic_d); test_create_topic(NULL, topic_d, 1, 1); await_revoke("Regex: rebalance after topic creation", rk, queue); await_assignment("Regex: two topics exist", rk, queue, 2, topic_b, 2, topic_d, 1); test_consumer_close(rk); rd_kafka_queue_destroy(queue); rd_kafka_destroy(rk); rd_free(base_topic); rd_free(topic_b); rd_free(topic_c); rd_free(topic_d); rd_free(topic_e); SUB_TEST_PASS(); } /** * @remark Requires scenario=noautocreate. */ static void do_test_topic_remove(void) { char *topic_f = rd_strdup(test_mk_topic_name("topic_f", 1)); char *topic_g = rd_strdup(test_mk_topic_name("topic_g", 1)); int parts_f = 5; int parts_g = 9; rd_kafka_t *rk; rd_kafka_conf_t *conf; rd_kafka_queue_t *queue; rd_kafka_topic_partition_list_t *topics; rd_kafka_resp_err_t err; /** * Topic removal test: * - Create topic f & g * - Subscribe to f & g * - Verify f & g assignment * - Remove topic f * - Verify g assignment * - Remove topic g * - Verify empty assignment */ SUB_TEST("Topic removal testing"); test_conf_init(&conf, NULL, 60); /* Decrease metadata interval to speed up topic change discovery. */ test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000"); rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_REBALANCE); rk = test_create_consumer(test_str_id_generate_tmp(), NULL, conf, NULL); queue = rd_kafka_queue_get_consumer(rk); TEST_SAY("Topic removal: creating topic %s (subscribed)\n", topic_f); test_create_topic(NULL, topic_f, parts_f, 1); TEST_SAY("Topic removal: creating topic %s (subscribed)\n", topic_g); test_create_topic(NULL, topic_g, parts_g, 1); rd_sleep(1); // FIXME: do check&wait loop instead TEST_SAY("Topic removal: Subscribing to %s & %s\n", topic_f, topic_g); topics = rd_kafka_topic_partition_list_new(2); rd_kafka_topic_partition_list_add(topics, topic_f, RD_KAFKA_PARTITION_UA); rd_kafka_topic_partition_list_add(topics, topic_g, RD_KAFKA_PARTITION_UA); err = rd_kafka_subscribe(rk, topics); TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, "%s", rd_kafka_err2str(err)); rd_kafka_topic_partition_list_destroy(topics); await_assignment("Topic removal: both topics exist", rk, queue, 2, topic_f, parts_f, topic_g, parts_g); TEST_SAY("Topic removal: removing %s\n", topic_f); test_kafka_topics("--delete --topic %s", topic_f); await_revoke("Topic removal: rebalance after topic removal", rk, queue); await_assignment("Topic removal: one topic exists", rk, queue, 1, topic_g, parts_g); TEST_SAY("Topic removal: removing %s\n", topic_g); test_kafka_topics("--delete --topic %s", topic_g); await_revoke("Topic removal: rebalance after 2nd topic removal", rk, queue); /* Should not see another rebalance since all topics now removed */ await_no_rebalance("Topic removal: empty", rk, queue, 10000); test_consumer_close(rk); rd_kafka_queue_destroy(queue); rd_kafka_destroy(rk); rd_free(topic_f); rd_free(topic_g); SUB_TEST_PASS(); } /** * @brief Subscribe to a regex and continually create a lot of matching topics, * triggering many rebalances. * * This is using the mock cluster. * */ static void do_test_regex_many_mock(const char *assignment_strategy, rd_bool_t lots_of_topics) { const char *base_topic = "topic"; rd_kafka_t *rk; rd_kafka_conf_t *conf; rd_kafka_mock_cluster_t *mcluster; const char *bootstraps; int topic_cnt = lots_of_topics ? 300 : 50; int await_assignment_every = lots_of_topics ? 150 : 15; int i; SUB_TEST("%s with %d topics", assignment_strategy, topic_cnt); mcluster = test_mock_cluster_new(3, &bootstraps); test_conf_init(&conf, NULL, 60 * 5); test_conf_set(conf, "security.protocol", "plaintext"); test_conf_set(conf, "bootstrap.servers", bootstraps); test_conf_set(conf, "partition.assignment.strategy", assignment_strategy); /* Decrease metadata interval to speed up topic change discovery. */ test_conf_set(conf, "topic.metadata.refresh.interval.ms", "3000"); rk = test_create_consumer("mygroup", test_rebalance_cb, conf, NULL); test_consumer_subscribe(rk, tsprintf("^%s_.*", base_topic)); for (i = 0; i < topic_cnt; i++) { char topic[256]; rd_snprintf(topic, sizeof(topic), "%s_%d", base_topic, i); TEST_SAY("Creating topic %s\n", topic); TEST_CALL_ERR__(rd_kafka_mock_topic_create(mcluster, topic, 1 + (i % 8), 1)); test_consumer_poll_no_msgs("POLL", rk, 0, lots_of_topics ? 100 : 300); /* Wait for an assignment to let the consumer catch up on * all rebalancing. */ if (i % await_assignment_every == await_assignment_every - 1) test_consumer_wait_assignment(rk, rd_true /*poll*/); else if (!lots_of_topics) rd_usleep(100 * 1000, NULL); } test_consumer_close(rk); rd_kafka_destroy(rk); test_mock_cluster_destroy(mcluster); SUB_TEST_PASS(); } int main_0045_subscribe_update(int argc, char **argv) { if (!test_can_create_topics(1)) return 0; do_test_regex(); return 0; } int main_0045_subscribe_update_non_exist_and_partchange(int argc, char **argv) { do_test_non_exist_and_partchange(); return 0; } int main_0045_subscribe_update_topic_remove(int argc, char **argv) { if (!test_can_create_topics(1)) return 0; do_test_topic_remove(); return 0; } int main_0045_subscribe_update_mock(int argc, char **argv) { do_test_regex_many_mock("range", rd_false); do_test_regex_many_mock("cooperative-sticky", rd_false); do_test_regex_many_mock("cooperative-sticky", rd_true); return 0; }