summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/0120-asymmetric_subscription.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0120-asymmetric_subscription.c')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/0120-asymmetric_subscription.c183
1 files changed, 183 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0120-asymmetric_subscription.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0120-asymmetric_subscription.c
new file mode 100644
index 000000000..2031dcba1
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0120-asymmetric_subscription.c
@@ -0,0 +1,183 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2020, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+
+#define _PART_CNT 4
+
+
+/**
+ * @brief Verify proper assignment for asymmetrical subscriptions.
+ */
+static void do_test_asymmetric(const char *assignor, const char *bootstraps) {
+ rd_kafka_conf_t *conf;
+#define _C_CNT 3
+ rd_kafka_t *c[_C_CNT];
+#define _S_CNT 2 /* max subscription count per consumer */
+ const char *topics[_C_CNT][_S_CNT] = {
+ /* c0 */ {"t1", "t2"},
+ /* c1 */ {"t2", "t3"},
+ /* c2 */ {"t4"},
+ };
+ struct {
+ const char *topic;
+ const int cnt;
+ int seen;
+ } expect[_C_CNT][_S_CNT] = {
+ /* c0 */
+ {
+ {"t1", _PART_CNT},
+ {"t2", _PART_CNT / 2},
+ },
+ /* c1 */
+ {
+ {"t2", _PART_CNT / 2},
+ {"t3", _PART_CNT},
+ },
+ /* c2 */
+ {
+ {"t4", _PART_CNT},
+ },
+ };
+ const char *groupid = assignor;
+ int i;
+
+ SUB_TEST_QUICK("%s assignor", assignor);
+
+ test_conf_init(&conf, NULL, 30);
+ test_conf_set(conf, "bootstrap.servers", bootstraps);
+ test_conf_set(conf, "partition.assignment.strategy", assignor);
+
+ for (i = 0; i < _C_CNT; i++) {
+ char name[16];
+ rd_kafka_topic_partition_list_t *tlist =
+ rd_kafka_topic_partition_list_new(2);
+ int j;
+
+ rd_snprintf(name, sizeof(name), "c%d", i);
+ test_conf_set(conf, "client.id", name);
+
+ for (j = 0; j < _S_CNT && topics[i][j]; j++)
+ rd_kafka_topic_partition_list_add(
+ tlist, topics[i][j], RD_KAFKA_PARTITION_UA);
+
+ c[i] = test_create_consumer(groupid, NULL,
+ rd_kafka_conf_dup(conf), NULL);
+
+ TEST_CALL_ERR__(rd_kafka_subscribe(c[i], tlist));
+
+ rd_kafka_topic_partition_list_destroy(tlist);
+ }
+
+ rd_kafka_conf_destroy(conf);
+
+
+ /* Await assignments for all consumers */
+ for (i = 0; i < _C_CNT; i++)
+ test_consumer_wait_assignment(c[i], rd_true);
+
+ /* All have assignments, grab them. */
+ for (i = 0; i < _C_CNT; i++) {
+ int j;
+ int p;
+ rd_kafka_topic_partition_list_t *assignment;
+
+ TEST_CALL_ERR__(rd_kafka_assignment(c[i], &assignment));
+
+ TEST_ASSERT(assignment, "No assignment for %s",
+ rd_kafka_name(c[i]));
+
+ for (p = 0; p < assignment->cnt; p++) {
+ const rd_kafka_topic_partition_t *part =
+ &assignment->elems[p];
+ rd_bool_t found = rd_false;
+
+ for (j = 0; j < _S_CNT && expect[i][j].topic; j++) {
+ if (!strcmp(part->topic, expect[i][j].topic)) {
+ expect[i][j].seen++;
+ found = rd_true;
+ break;
+ }
+ }
+
+ TEST_ASSERT(found,
+ "%s was assigned unexpected topic %s",
+ rd_kafka_name(c[i]), part->topic);
+ }
+
+ for (j = 0; j < _S_CNT && expect[i][j].topic; j++) {
+ TEST_ASSERT(expect[i][j].seen == expect[i][j].cnt,
+ "%s expected %d assigned partitions "
+ "for %s, not %d",
+ rd_kafka_name(c[i]), expect[i][j].cnt,
+ expect[i][j].topic, expect[i][j].seen);
+ }
+
+ rd_kafka_topic_partition_list_destroy(assignment);
+ }
+
+
+ for (i = 0; i < _C_CNT; i++) {
+ if (strcmp(assignor, "range") && (i & 1) == 0)
+ test_consumer_close(c[i]);
+ rd_kafka_destroy(c[i]);
+ }
+
+
+ SUB_TEST_PASS();
+}
+
+
+int main_0120_asymmetric_subscription(int argc, char **argv) {
+ const char *bootstraps;
+ rd_kafka_mock_cluster_t *mcluster;
+
+ if (test_needs_auth()) {
+ TEST_SKIP("Mock cluster does not support SSL/SASL\n");
+ return 0;
+ }
+
+ mcluster = test_mock_cluster_new(3, &bootstraps);
+
+
+ /* Create topics */
+ rd_kafka_mock_topic_create(mcluster, "t1", _PART_CNT, 1);
+ rd_kafka_mock_topic_create(mcluster, "t2", _PART_CNT, 1);
+ rd_kafka_mock_topic_create(mcluster, "t3", _PART_CNT, 1);
+ rd_kafka_mock_topic_create(mcluster, "t4", _PART_CNT, 1);
+
+
+ do_test_asymmetric("roundrobin", bootstraps);
+ do_test_asymmetric("range", bootstraps);
+ do_test_asymmetric("cooperative-sticky", bootstraps);
+
+ test_mock_cluster_destroy(mcluster);
+
+ return 0;
+}