summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/0045-subscribe_update.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0045-subscribe_update.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/0045-subscribe_update.c459
1 files changed, 459 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0045-subscribe_update.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0045-subscribe_update.c
new file mode 100644
index 00000000..f804613d
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0045-subscribe_update.c
@@ -0,0 +1,459 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+#include "rdkafka.h"
+
+#include <stdarg.h>
+
+/**
+ * Verify that subscription is updated on metadata changes:
+ * - topic additions
+ * - topic deletions
+ * - partition count changes
+ */
+
+
+
+/**
+ * Wait for REBALANCE ASSIGN event and perform assignment
+ *
+ * Va-args are \p topic_cnt tuples of the expected assignment:
+ * { const char *topic, int partition_cnt }
+ */
+static void await_assignment(const char *pfx,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *queue,
+ int topic_cnt,
+ ...) {
+ rd_kafka_event_t *rkev;
+ rd_kafka_topic_partition_list_t *tps;
+ int i;
+ va_list ap;
+ int fails = 0;
+ int exp_part_cnt = 0;
+
+ TEST_SAY("%s: waiting for assignment\n", pfx);
+ rkev = test_wait_event(queue, RD_KAFKA_EVENT_REBALANCE, 30000);
+ if (!rkev)
+ TEST_FAIL("timed out waiting for assignment");
+ TEST_ASSERT(rd_kafka_event_error(rkev) ==
+ RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+ "expected ASSIGN, got %s",
+ rd_kafka_err2str(rd_kafka_event_error(rkev)));
+ tps = rd_kafka_event_topic_partition_list(rkev);
+
+ TEST_SAY("%s: assignment:\n", pfx);
+ test_print_partition_list(tps);
+
+ va_start(ap, topic_cnt);
+ for (i = 0; i < topic_cnt; i++) {
+ const char *topic = va_arg(ap, const char *);
+ int partition_cnt = va_arg(ap, int);
+ int p;
+ TEST_SAY("%s: expecting %s with %d partitions\n", pfx, topic,
+ partition_cnt);
+ for (p = 0; p < partition_cnt; p++) {
+ if (!rd_kafka_topic_partition_list_find(tps, topic,
+ p)) {
+ TEST_FAIL_LATER(
+ "%s: expected partition %s [%d] "
+ "not found in assginment",
+ pfx, topic, p);
+ fails++;
+ }
+ }
+ exp_part_cnt += partition_cnt;
+ }
+ va_end(ap);
+
+ TEST_ASSERT(exp_part_cnt == tps->cnt,
+ "expected assignment of %d partitions, got %d",
+ exp_part_cnt, tps->cnt);
+
+ if (fails > 0)
+ TEST_FAIL("%s: assignment mismatch: see above", pfx);
+
+ rd_kafka_assign(rk, tps);
+ rd_kafka_event_destroy(rkev);
+}
+
+
+/**
+ * Wait for REBALANCE REVOKE event and perform unassignment.
+ */
+static void
+await_revoke(const char *pfx, rd_kafka_t *rk, rd_kafka_queue_t *queue) {
+ rd_kafka_event_t *rkev;
+
+ TEST_SAY("%s: waiting for revoke\n", pfx);
+ rkev = test_wait_event(queue, RD_KAFKA_EVENT_REBALANCE, 30000);
+ if (!rkev)
+ TEST_FAIL("timed out waiting for revoke");
+ TEST_ASSERT(rd_kafka_event_error(rkev) ==
+ RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
+ "expected REVOKE, got %s",
+ rd_kafka_err2str(rd_kafka_event_error(rkev)));
+ rd_kafka_assign(rk, NULL);
+ rd_kafka_event_destroy(rkev);
+}
+
+/**
+ * Wait \p timeout_ms to make sure no rebalance was triggered.
+ */
+static void await_no_rebalance(const char *pfx,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *queue,
+ int timeout_ms) {
+ rd_kafka_event_t *rkev;
+
+ TEST_SAY("%s: waiting for %d ms to not see rebalance\n", pfx,
+ timeout_ms);
+ rkev = test_wait_event(queue, RD_KAFKA_EVENT_REBALANCE, timeout_ms);
+ if (!rkev)
+ return;
+ TEST_ASSERT(rkev, "did not expect %s: %s", rd_kafka_event_name(rkev),
+ rd_kafka_err2str(rd_kafka_event_error(rkev)));
+ rd_kafka_event_destroy(rkev);
+}
+
/**
 * @brief Tests #1 and #2: subscription to a non-existent topic followed by
 *        topic creation, then a partition count increase.
 *
 * Requires a live broker with topic-creation privileges (uses
 * test_create_topic() and test_kafka_topics --alter).
 */
static void do_test_non_exist_and_partchange(void) {
        char *topic_a = rd_strdup(test_mk_topic_name("topic_a", 1));
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        rd_kafka_queue_t *queue;

        /**
         * Test #1:
         * - Subscribe to non-existing topic.
         * - Verify empty assignment
         * - Create topic
         * - Verify new assignment containing topic
         */

        SUB_TEST();

        test_conf_init(&conf, NULL, 60);

        /* Decrease metadata interval to speed up topic change discovery. */
        test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");

        /* Route rebalance events to the consumer queue so the await_*
         * helpers can consume them directly. */
        rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_REBALANCE);
        rk = test_create_consumer(test_str_id_generate_tmp(), NULL, conf, NULL);
        queue = rd_kafka_queue_get_consumer(rk);

        TEST_SAY("#1: Subscribing to %s\n", topic_a);
        test_consumer_subscribe(rk, topic_a);

        /* Should not see a rebalance since no topics are matched. */
        await_no_rebalance("#1: empty", rk, queue, 10000);

        TEST_SAY("#1: creating topic %s\n", topic_a);
        test_create_topic(NULL, topic_a, 2, 1);

        await_assignment("#1: proper", rk, queue, 1, topic_a, 2);


        /**
         * Test #2 (continue with #1 consumer)
         * - Increase the partition count
         * - Verify updated assignment
         */
        test_kafka_topics("--alter --topic %s --partitions 4", topic_a);
        /* Partition count change triggers revoke followed by re-assign. */
        await_revoke("#2", rk, queue);

        await_assignment("#2: more partitions", rk, queue, 1, topic_a, 4);

        test_consumer_close(rk);
        rd_kafka_queue_destroy(queue);
        rd_kafka_destroy(rk);

        rd_free(topic_a);

        SUB_TEST_PASS();
}
+
/**
 * @brief Regex subscription test: verify that topic creations matching
 *        (and not matching) a regex subscription update the assignment
 *        accordingly.
 */
static void do_test_regex(void) {
        char *base_topic = rd_strdup(test_mk_topic_name("topic", 1));
        char *topic_b    = rd_strdup(tsprintf("%s_b", base_topic));
        char *topic_c    = rd_strdup(tsprintf("%s_c", base_topic));
        char *topic_d    = rd_strdup(tsprintf("%s_d", base_topic));
        char *topic_e    = rd_strdup(tsprintf("%s_e", base_topic));
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        rd_kafka_queue_t *queue;

        /**
         * Regex test:
         * - Create topic b
         * - Subscribe to b & d & e
         * - Verify b assignment
         * - Create topic c
         * - Verify no rebalance
         * - Create topic d
         * - Verify b & d assignment
         */

        SUB_TEST();

        test_conf_init(&conf, NULL, 60);

        /* Decrease metadata interval to speed up topic change discovery. */
        test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");

        /* Route rebalance events to the consumer queue for await_* helpers. */
        rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_REBALANCE);
        rk = test_create_consumer(test_str_id_generate_tmp(), NULL, conf, NULL);
        queue = rd_kafka_queue_get_consumer(rk);

        TEST_SAY("Regex: creating topic %s (subscribed)\n", topic_b);
        test_create_topic(NULL, topic_b, 2, 1);
        rd_sleep(1); // FIXME: do check&wait loop instead

        TEST_SAY("Regex: Subscribing to %s & %s & %s\n", topic_b, topic_d,
                 topic_e);
        /* Regex matches the _b, _d and _e suffixed topics only. */
        test_consumer_subscribe(rk, tsprintf("^%s_[bde]$", base_topic));

        await_assignment("Regex: just one topic exists", rk, queue, 1, topic_b,
                         2);

        TEST_SAY("Regex: creating topic %s (not subscribed)\n", topic_c);
        test_create_topic(NULL, topic_c, 4, 1);

        /* Should not see a rebalance since no topics are matched. */
        await_no_rebalance("Regex: empty", rk, queue, 10000);

        TEST_SAY("Regex: creating topic %s (subscribed)\n", topic_d);
        test_create_topic(NULL, topic_d, 1, 1);

        /* New matching topic: old assignment is revoked, then the new
         * assignment includes both matching topics. */
        await_revoke("Regex: rebalance after topic creation", rk, queue);

        await_assignment("Regex: two topics exist", rk, queue, 2, topic_b, 2,
                         topic_d, 1);

        test_consumer_close(rk);
        rd_kafka_queue_destroy(queue);
        rd_kafka_destroy(rk);

        rd_free(base_topic);
        rd_free(topic_b);
        rd_free(topic_c);
        rd_free(topic_d);
        rd_free(topic_e);

        SUB_TEST_PASS();
}
+
/**
 * @brief Topic removal test: verify that deleting subscribed topics
 *        shrinks the assignment and eventually leaves it empty.
 *
 * @remark Requires scenario=noautocreate (deleted topics must not be
 *         re-created automatically by the broker).
 */
static void do_test_topic_remove(void) {
        char *topic_f = rd_strdup(test_mk_topic_name("topic_f", 1));
        char *topic_g = rd_strdup(test_mk_topic_name("topic_g", 1));
        int parts_f   = 5;
        int parts_g   = 9;
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        rd_kafka_queue_t *queue;
        rd_kafka_topic_partition_list_t *topics;
        rd_kafka_resp_err_t err;

        /**
         * Topic removal test:
         * - Create topic f & g
         * - Subscribe to f & g
         * - Verify f & g assignment
         * - Remove topic f
         * - Verify g assignment
         * - Remove topic g
         * - Verify empty assignment
         */

        SUB_TEST("Topic removal testing");

        test_conf_init(&conf, NULL, 60);

        /* Decrease metadata interval to speed up topic change discovery. */
        test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");

        /* Route rebalance events to the consumer queue for await_* helpers. */
        rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_REBALANCE);
        rk = test_create_consumer(test_str_id_generate_tmp(), NULL, conf, NULL);
        queue = rd_kafka_queue_get_consumer(rk);

        TEST_SAY("Topic removal: creating topic %s (subscribed)\n", topic_f);
        test_create_topic(NULL, topic_f, parts_f, 1);

        TEST_SAY("Topic removal: creating topic %s (subscribed)\n", topic_g);
        test_create_topic(NULL, topic_g, parts_g, 1);

        rd_sleep(1); // FIXME: do check&wait loop instead

        TEST_SAY("Topic removal: Subscribing to %s & %s\n", topic_f, topic_g);
        /* Explicit (non-regex) two-topic subscription. */
        topics = rd_kafka_topic_partition_list_new(2);
        rd_kafka_topic_partition_list_add(topics, topic_f,
                                          RD_KAFKA_PARTITION_UA);
        rd_kafka_topic_partition_list_add(topics, topic_g,
                                          RD_KAFKA_PARTITION_UA);
        err = rd_kafka_subscribe(rk, topics);
        TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, "%s",
                    rd_kafka_err2str(err));
        rd_kafka_topic_partition_list_destroy(topics);

        await_assignment("Topic removal: both topics exist", rk, queue, 2,
                         topic_f, parts_f, topic_g, parts_g);

        TEST_SAY("Topic removal: removing %s\n", topic_f);
        test_kafka_topics("--delete --topic %s", topic_f);

        await_revoke("Topic removal: rebalance after topic removal", rk, queue);

        await_assignment("Topic removal: one topic exists", rk, queue, 1,
                         topic_g, parts_g);

        TEST_SAY("Topic removal: removing %s\n", topic_g);
        test_kafka_topics("--delete --topic %s", topic_g);

        await_revoke("Topic removal: rebalance after 2nd topic removal", rk,
                     queue);

        /* Should not see another rebalance since all topics now removed */
        await_no_rebalance("Topic removal: empty", rk, queue, 10000);

        test_consumer_close(rk);
        rd_kafka_queue_destroy(queue);
        rd_kafka_destroy(rk);

        rd_free(topic_f);
        rd_free(topic_g);

        SUB_TEST_PASS();
}
+
+
+
/**
 * @brief Subscribe to a regex and continually create a lot of matching topics,
 *        triggering many rebalances.
 *
 * This is using the mock cluster.
 *
 * @param assignment_strategy partition.assignment.strategy config value
 *                            (e.g. "range" or "cooperative-sticky").
 * @param lots_of_topics if true, create 300 topics instead of 50 and use
 *                       shorter per-topic poll timeouts.
 */
static void do_test_regex_many_mock(const char *assignment_strategy,
                                    rd_bool_t lots_of_topics) {
        const char *base_topic = "topic";
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        rd_kafka_mock_cluster_t *mcluster;
        const char *bootstraps;
        int topic_cnt             = lots_of_topics ? 300 : 50;
        int await_assignment_every = lots_of_topics ? 150 : 15;
        int i;

        SUB_TEST("%s with %d topics", assignment_strategy, topic_cnt);

        /* 3-broker mock cluster; bootstraps receives its address list. */
        mcluster = test_mock_cluster_new(3, &bootstraps);
        test_conf_init(&conf, NULL, 60 * 5);

        test_conf_set(conf, "security.protocol", "plaintext");
        test_conf_set(conf, "bootstrap.servers", bootstraps);
        test_conf_set(conf, "partition.assignment.strategy",
                      assignment_strategy);
        /* Decrease metadata interval to speed up topic change discovery. */
        test_conf_set(conf, "topic.metadata.refresh.interval.ms", "3000");

        rk = test_create_consumer("mygroup", test_rebalance_cb, conf, NULL);

        /* Match every topic named "<base>_...". */
        test_consumer_subscribe(rk, tsprintf("^%s_.*", base_topic));

        for (i = 0; i < topic_cnt; i++) {
                char topic[256];

                rd_snprintf(topic, sizeof(topic), "%s_%d", base_topic, i);


                TEST_SAY("Creating topic %s\n", topic);
                /* Varying partition counts (1..8) per topic. */
                TEST_CALL_ERR__(rd_kafka_mock_topic_create(mcluster, topic,
                                                           1 + (i % 8), 1));

                /* Serve rebalance callbacks; no messages are expected. */
                test_consumer_poll_no_msgs("POLL", rk, 0,
                                           lots_of_topics ? 100 : 300);

                /* Wait for an assignment to let the consumer catch up on
                 * all rebalancing. */
                if (i % await_assignment_every == await_assignment_every - 1)
                        test_consumer_wait_assignment(rk, rd_true /*poll*/);
                else if (!lots_of_topics)
                        rd_usleep(100 * 1000, NULL);
        }

        test_consumer_close(rk);
        rd_kafka_destroy(rk);

        test_mock_cluster_destroy(mcluster);

        SUB_TEST_PASS();
}
+
+
+
/**
 * @brief Test entry point: regex subscription updates.
 *        Skipped when topics cannot be created on this cluster.
 */
int main_0045_subscribe_update(int argc, char **argv) {
        /* Topic creation privileges are required for this test. */
        if (test_can_create_topics(1))
                do_test_regex();

        return 0;
}
+
/**
 * @brief Test entry point: non-existent topic subscription followed by
 *        topic creation and partition count change.
 */
int main_0045_subscribe_update_non_exist_and_partchange(int argc, char **argv) {
        do_test_non_exist_and_partchange();
        return 0;
}
+
/**
 * @brief Test entry point: topic removal handling.
 *        Skipped when topics cannot be created on this cluster.
 */
int main_0045_subscribe_update_topic_remove(int argc, char **argv) {
        /* Topic creation privileges are required for this test. */
        if (test_can_create_topics(1))
                do_test_topic_remove();

        return 0;
}
+
+
+int main_0045_subscribe_update_mock(int argc, char **argv) {
+ do_test_regex_many_mock("range", rd_false);
+ do_test_regex_many_mock("cooperative-sticky", rd_false);
+ do_test_regex_many_mock("cooperative-sticky", rd_true);
+
+ return 0;
+}