summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/0033-regex_subscribe.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 11:19:16 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:07:37 +0000
commitb485aab7e71c1625cfc27e0f92c9509f42378458 (patch)
treeae9abe108601079d1679194de237c9a435ae5b55 /src/fluent-bit/lib/librdkafka-2.1.0/tests/0033-regex_subscribe.c
parentAdding upstream version 1.44.3. (diff)
downloadnetdata-b485aab7e71c1625cfc27e0f92c9509f42378458.tar.xz
netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.zip
Adding upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0033-regex_subscribe.c')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/0033-regex_subscribe.c509
1 files changed, 509 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0033-regex_subscribe.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0033-regex_subscribe.c
new file mode 100644
index 000000000..f31d33ebc
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0033-regex_subscribe.c
@@ -0,0 +1,509 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2016, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
+ * is built from within the librdkafka source tree and thus differs. */
+#include "rdkafka.h" /* for Kafka driver */
+
+
+/**
+ * KafkaConsumer: regex topic subscriptions
+ */
+
+
+
+struct expect {
+ char *name; /* sub-test name */
+ const char *sub[4]; /* subscriptions */
+ const char *exp[4]; /* expected topics */
+ int exp_err; /* expected error from subscribe() */
+ int stat[4]; /* per exp status */
+ int fails;
+ enum { _EXP_NONE,
+ _EXP_FAIL,
+ _EXP_OK,
+ _EXP_ASSIGN,
+ _EXP_REVOKE,
+ _EXP_ASSIGNED,
+ _EXP_REVOKED,
+ } result;
+};
+
+static struct expect *exp_curr;
+
+static uint64_t testid;
+
+static void expect_match(struct expect *exp,
+ const rd_kafka_topic_partition_list_t *parts) {
+ int i;
+ int e = 0;
+ int fails = 0;
+
+ memset(exp->stat, 0, sizeof(exp->stat));
+
+ for (i = 0; i < parts->cnt; i++) {
+ int found = 0;
+ e = 0;
+ while (exp->exp[e]) {
+ if (!strcmp(parts->elems[i].topic, exp->exp[e])) {
+ exp->stat[e]++;
+ found++;
+ }
+ e++;
+ }
+
+ if (!found) {
+ TEST_WARN("%s: got unexpected topic match: %s\n",
+ exp->name, parts->elems[i].topic);
+ fails++;
+ }
+ }
+
+
+ e = 0;
+ while (exp->exp[e]) {
+ if (!exp->stat[e]) {
+ TEST_WARN(
+ "%s: expected topic not "
+ "found in assignment: %s\n",
+ exp->name, exp->exp[e]);
+ fails++;
+ } else {
+ TEST_SAY("%s: expected topic %s seen in assignment\n",
+ exp->name, exp->exp[e]);
+ }
+ e++;
+ }
+
+ exp->fails += fails;
+ if (fails) {
+ TEST_WARN("%s: see %d previous failures\n", exp->name, fails);
+ exp->result = _EXP_FAIL;
+ } else {
+ TEST_SAY(_C_MAG "[ %s: assignment matched ]\n", exp->name);
+ exp->result = _EXP_OK;
+ }
+}
+
+static void rebalance_cb(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *parts,
+ void *opaque) {
+ struct expect *exp = exp_curr;
+
+ TEST_ASSERT(exp_curr, "exp_curr not set");
+
+ TEST_SAY("rebalance_cb: %s with %d partition(s)\n",
+ rd_kafka_err2str(err), parts->cnt);
+ test_print_partition_list(parts);
+
+ switch (err) {
+ case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
+ /* Check that provided partitions match our expectations */
+ if (exp->result != _EXP_ASSIGN) {
+ TEST_WARN(
+ "%s: rebalance called while expecting %d: "
+ "too many or undesired assignment(s?\n",
+ exp->name, exp->result);
+ }
+ expect_match(exp, parts);
+ test_consumer_assign("rebalance", rk, parts);
+ exp->result = _EXP_ASSIGNED;
+ break;
+
+ case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
+ if (exp->result != _EXP_REVOKE) {
+ TEST_WARN(
+ "%s: rebalance called while expecting %d: "
+ "too many or undesired assignment(s?\n",
+ exp->name, exp->result);
+ }
+
+ test_consumer_unassign("rebalance", rk);
+ exp->result = _EXP_REVOKED;
+ break;
+
+ default:
+ TEST_FAIL("rebalance_cb: error: %s", rd_kafka_err2str(err));
+ }
+}
+
+
+/**
+ * @brief Poll the consumer once.
+ */
+static void consumer_poll_once(rd_kafka_t *rk) {
+ rd_kafka_message_t *rkmessage;
+
+ rkmessage = rd_kafka_consumer_poll(rk, 1000);
+ if (!rkmessage)
+ return;
+
+ if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
+ TEST_SAY("%s [%" PRId32
+ "] reached EOF at "
+ "offset %" PRId64 "\n",
+ rd_kafka_topic_name(rkmessage->rkt),
+ rkmessage->partition, rkmessage->offset);
+
+ } else if (rkmessage->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) {
+ if (strstr(rd_kafka_topic_name(rkmessage->rkt), "NONEXIST"))
+ TEST_SAY("%s: %s: error is expected for this topic\n",
+ rd_kafka_topic_name(rkmessage->rkt),
+ rd_kafka_message_errstr(rkmessage));
+ else
+ TEST_FAIL(
+ "%s [%" PRId32 "] error (offset %" PRId64 "): %s",
+ rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt)
+ : "(no-topic)",
+ rkmessage->partition, rkmessage->offset,
+ rd_kafka_message_errstr(rkmessage));
+ }
+
+ rd_kafka_message_destroy(rkmessage);
+}
+
+
+
+static int test_subscribe(rd_kafka_t *rk, struct expect *exp) {
+ rd_kafka_resp_err_t err;
+ rd_kafka_topic_partition_list_t *tlist;
+ int i;
+ test_timing_t t_sub, t_assign, t_unsub;
+
+ exp_curr = exp;
+
+ test_timeout_set((test_session_timeout_ms / 1000) * 3);
+
+ tlist = rd_kafka_topic_partition_list_new(4);
+ TEST_SAY(_C_MAG "[ %s: begin ]\n", exp->name);
+ i = 0;
+ TEST_SAY("Topic subscription:\n");
+ while (exp->sub[i]) {
+ TEST_SAY("%s: %s\n", exp->name, exp->sub[i]);
+ rd_kafka_topic_partition_list_add(tlist, exp->sub[i],
+ RD_KAFKA_PARTITION_UA);
+ i++;
+ }
+
+ /* Subscribe */
+ TIMING_START(&t_sub, "subscribe");
+ err = rd_kafka_subscribe(rk, tlist);
+ TIMING_STOP(&t_sub);
+ TEST_ASSERT(err == exp->exp_err, "subscribe() failed: %s (expected %s)",
+ rd_kafka_err2str(err), rd_kafka_err2str(exp->exp_err));
+
+ if (exp->exp[0]) {
+ /* Wait for assignment, actual messages are ignored. */
+ exp->result = _EXP_ASSIGN;
+ TEST_SAY("%s: waiting for assignment\n", exp->name);
+ TIMING_START(&t_assign, "assignment");
+ while (exp->result == _EXP_ASSIGN)
+ consumer_poll_once(rk);
+ TIMING_STOP(&t_assign);
+ TEST_ASSERT(exp->result == _EXP_ASSIGNED,
+ "got %d instead of assignment", exp->result);
+
+ } else {
+ /* Not expecting any assignment */
+ int64_t ts_end = test_clock() + 5000;
+ exp->result = _EXP_NONE; /* Not expecting a rebalance */
+ while (exp->result == _EXP_NONE && test_clock() < ts_end)
+ consumer_poll_once(rk);
+ TEST_ASSERT(exp->result == _EXP_NONE);
+ }
+
+ /* Unsubscribe */
+ TIMING_START(&t_unsub, "unsubscribe");
+ err = rd_kafka_unsubscribe(rk);
+ TIMING_STOP(&t_unsub);
+ TEST_ASSERT(!err, "unsubscribe() failed: %s", rd_kafka_err2str(err));
+
+ rd_kafka_topic_partition_list_destroy(tlist);
+
+ if (exp->exp[0]) {
+ /* Wait for revoke, actual messages are ignored. */
+ TEST_SAY("%s: waiting for revoke\n", exp->name);
+ exp->result = _EXP_REVOKE;
+ TIMING_START(&t_assign, "revoke");
+ while (exp->result != _EXP_REVOKED)
+ consumer_poll_once(rk);
+ TIMING_STOP(&t_assign);
+ TEST_ASSERT(exp->result == _EXP_REVOKED,
+ "got %d instead of revoke", exp->result);
+ } else {
+ /* Not expecting any revoke */
+ int64_t ts_end = test_clock() + 5000;
+ exp->result = _EXP_NONE; /* Not expecting a rebalance */
+ while (exp->result == _EXP_NONE && test_clock() < ts_end)
+ consumer_poll_once(rk);
+ TEST_ASSERT(exp->result == _EXP_NONE);
+ }
+
+ TEST_SAY(_C_MAG "[ %s: done with %d failures ]\n", exp->name,
+ exp->fails);
+
+ return exp->fails;
+}
+
+
+static int do_test(const char *assignor) {
+ static char topics[3][128];
+ static char nonexist_topic[128];
+ const int topic_cnt = 3;
+ rd_kafka_t *rk;
+ const int msgcnt = 10;
+ int i;
+ char groupid[64];
+ int fails = 0;
+ rd_kafka_conf_t *conf;
+
+ if (!test_check_builtin("regex")) {
+ TEST_SKIP("regex support not built in\n");
+ return 0;
+ }
+
+ testid = test_id_generate();
+ test_str_id_generate(groupid, sizeof(groupid));
+
+ rd_snprintf(topics[0], sizeof(topics[0]), "%s_%s",
+ test_mk_topic_name("regex_subscribe_TOPIC_0001_UNO", 0),
+ groupid);
+ rd_snprintf(topics[1], sizeof(topics[1]), "%s_%s",
+ test_mk_topic_name("regex_subscribe_topic_0002_dup", 0),
+ groupid);
+ rd_snprintf(topics[2], sizeof(topics[2]), "%s_%s",
+ test_mk_topic_name("regex_subscribe_TOOTHPIC_0003_3", 0),
+ groupid);
+
+ /* To avoid auto topic creation to kick in we use
+ * an invalid topic name. */
+ rd_snprintf(
+ nonexist_topic, sizeof(nonexist_topic), "%s_%s",
+ test_mk_topic_name("regex_subscribe_NONEXISTENT_0004_IV#!", 0),
+ groupid);
+
+ /* Produce messages to topics to ensure creation. */
+ for (i = 0; i < topic_cnt; i++)
+ test_produce_msgs_easy(topics[i], testid, RD_KAFKA_PARTITION_UA,
+ msgcnt);
+
+ test_conf_init(&conf, NULL, 20);
+ test_conf_set(conf, "partition.assignment.strategy", assignor);
+ /* Speed up propagation of new topics */
+ test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");
+ test_conf_set(conf, "allow.auto.create.topics", "true");
+
+ /* Create a single consumer to handle all subscriptions.
+ * Has the nice side affect of testing multiple subscriptions. */
+ rk = test_create_consumer(groupid, rebalance_cb, conf, NULL);
+
+ /*
+ * Test cases
+ */
+ {
+ struct expect expect = {.name = rd_strdup(tsprintf(
+ "%s: no regexps (0&1)", assignor)),
+ .sub = {topics[0], topics[1], NULL},
+ .exp = {topics[0], topics[1], NULL}};
+
+ fails += test_subscribe(rk, &expect);
+ rd_free(expect.name);
+ }
+
+ {
+ struct expect expect = {.name =
+ rd_strdup(tsprintf("%s: no regexps "
+ "(no matches)",
+ assignor)),
+ .sub = {nonexist_topic, NULL},
+ .exp = {NULL}};
+
+ fails += test_subscribe(rk, &expect);
+ rd_free(expect.name);
+ }
+
+ {
+ struct expect expect = {
+ .name = rd_strdup(tsprintf("%s: regex all", assignor)),
+ .sub = {rd_strdup(tsprintf("^.*_%s", groupid)), NULL},
+ .exp = {topics[0], topics[1], topics[2], NULL}};
+
+ fails += test_subscribe(rk, &expect);
+ rd_free(expect.name);
+ rd_free((void *)expect.sub[0]);
+ }
+
+ {
+ struct expect expect = {
+ .name = rd_strdup(tsprintf("%s: regex 0&1", assignor)),
+ .sub = {rd_strdup(tsprintf(
+ "^.*[tToOpPiIcC]_0+[12]_[^_]+_%s", groupid)),
+ NULL},
+ .exp = {topics[0], topics[1], NULL}};
+
+ fails += test_subscribe(rk, &expect);
+ rd_free(expect.name);
+ rd_free((void *)expect.sub[0]);
+ }
+
+ {
+ struct expect expect = {
+ .name = rd_strdup(tsprintf("%s: regex 2", assignor)),
+ .sub = {rd_strdup(
+ tsprintf("^.*TOOTHPIC_000._._%s", groupid)),
+ NULL},
+ .exp = {topics[2], NULL}};
+
+ fails += test_subscribe(rk, &expect);
+ rd_free(expect.name);
+ rd_free((void *)expect.sub[0]);
+ }
+
+ {
+ struct expect expect = {
+ .name = rd_strdup(tsprintf("%s: regex 2 and "
+ "nonexistent(not seen)",
+ assignor)),
+ .sub = {rd_strdup(tsprintf("^.*_000[34]_..?_%s", groupid)),
+ NULL},
+ .exp = {topics[2], NULL}};
+
+ fails += test_subscribe(rk, &expect);
+ rd_free(expect.name);
+ rd_free((void *)expect.sub[0]);
+ }
+
+ {
+ struct expect expect = {
+ .name = rd_strdup(
+ tsprintf("%s: broken regex (no matches)", assignor)),
+ .sub = {"^.*[0", NULL},
+ .exp = {NULL},
+ .exp_err = RD_KAFKA_RESP_ERR__INVALID_ARG};
+
+ fails += test_subscribe(rk, &expect);
+ rd_free(expect.name);
+ }
+
+
+ test_consumer_close(rk);
+
+ rd_kafka_destroy(rk);
+
+ if (fails)
+ TEST_FAIL("See %d previous failures", fails);
+
+ return 0;
+}
+
+
+int main_0033_regex_subscribe(int argc, char **argv) {
+ do_test("range");
+ do_test("roundrobin");
+ return 0;
+}
+
+
+/**
+ * @brief Subscription API tests that dont require a broker
+ */
+int main_0033_regex_subscribe_local(int argc, char **argv) {
+ rd_kafka_topic_partition_list_t *valids, *invalids, *none, *empty,
+ *alot;
+ rd_kafka_t *rk;
+ rd_kafka_conf_t *conf;
+ rd_kafka_resp_err_t err;
+ char errstr[256];
+ int i;
+
+ valids = rd_kafka_topic_partition_list_new(0);
+ invalids = rd_kafka_topic_partition_list_new(100);
+ none = rd_kafka_topic_partition_list_new(1000);
+ empty = rd_kafka_topic_partition_list_new(5);
+ alot = rd_kafka_topic_partition_list_new(1);
+
+ rd_kafka_topic_partition_list_add(valids, "not_a_regex", 0);
+ rd_kafka_topic_partition_list_add(valids, "^My[vV]alid..regex+", 0);
+ rd_kafka_topic_partition_list_add(valids, "^another_one$", 55);
+
+ rd_kafka_topic_partition_list_add(invalids, "not_a_regex", 0);
+ rd_kafka_topic_partition_list_add(invalids, "^My[vV]alid..regex+", 0);
+ rd_kafka_topic_partition_list_add(invalids, "^a[b", 99);
+
+ rd_kafka_topic_partition_list_add(empty, "not_a_regex", 0);
+ rd_kafka_topic_partition_list_add(empty, "", 0);
+ rd_kafka_topic_partition_list_add(empty, "^ok", 0);
+
+ for (i = 0; i < 10000; i++) {
+ char topic[32];
+ rd_snprintf(topic, sizeof(topic), "^Va[lLid]_regex_%d$", i);
+ rd_kafka_topic_partition_list_add(alot, topic, i);
+ }
+
+ conf = rd_kafka_conf_new();
+ test_conf_set(conf, "group.id", "group");
+ test_conf_set(conf, "client.id", test_curr->name);
+
+ rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
+ if (!rk)
+ TEST_FAIL("Failed to create consumer: %s", errstr);
+
+ err = rd_kafka_subscribe(rk, valids);
+ TEST_ASSERT(!err, "valids failed: %s", rd_kafka_err2str(err));
+
+ err = rd_kafka_subscribe(rk, invalids);
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "invalids failed with wrong return: %s",
+ rd_kafka_err2str(err));
+
+ err = rd_kafka_subscribe(rk, none);
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "none failed with wrong return: %s", rd_kafka_err2str(err));
+
+ err = rd_kafka_subscribe(rk, empty);
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "empty failed with wrong return: %s",
+ rd_kafka_err2str(err));
+
+ err = rd_kafka_subscribe(rk, alot);
+ TEST_ASSERT(!err, "alot failed: %s", rd_kafka_err2str(err));
+
+ rd_kafka_consumer_close(rk);
+ rd_kafka_destroy(rk);
+
+ rd_kafka_topic_partition_list_destroy(valids);
+ rd_kafka_topic_partition_list_destroy(invalids);
+ rd_kafka_topic_partition_list_destroy(none);
+ rd_kafka_topic_partition_list_destroy(empty);
+ rd_kafka_topic_partition_list_destroy(alot);
+
+ return 0;
+}