diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0033-regex_subscribe.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/0033-regex_subscribe.c | 509 |
1 files changed, 0 insertions, 509 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0033-regex_subscribe.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0033-regex_subscribe.c deleted file mode 100644 index f31d33ebc..000000000 --- a/fluent-bit/lib/librdkafka-2.1.0/tests/0033-regex_subscribe.c +++ /dev/null @@ -1,509 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2016, Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "test.h" - -/* Typical include path would be <librdkafka/rdkafka.h>, but this program - * is built from within the librdkafka source tree and thus differs. */ -#include "rdkafka.h" /* for Kafka driver */ - - -/** - * KafkaConsumer: regex topic subscriptions - */ - - - -struct expect { - char *name; /* sub-test name */ - const char *sub[4]; /* subscriptions */ - const char *exp[4]; /* expected topics */ - int exp_err; /* expected error from subscribe() */ - int stat[4]; /* per exp status */ - int fails; - enum { _EXP_NONE, - _EXP_FAIL, - _EXP_OK, - _EXP_ASSIGN, - _EXP_REVOKE, - _EXP_ASSIGNED, - _EXP_REVOKED, - } result; -}; - -static struct expect *exp_curr; - -static uint64_t testid; - -static void expect_match(struct expect *exp, - const rd_kafka_topic_partition_list_t *parts) { - int i; - int e = 0; - int fails = 0; - - memset(exp->stat, 0, sizeof(exp->stat)); - - for (i = 0; i < parts->cnt; i++) { - int found = 0; - e = 0; - while (exp->exp[e]) { - if (!strcmp(parts->elems[i].topic, exp->exp[e])) { - exp->stat[e]++; - found++; - } - e++; - } - - if (!found) { - TEST_WARN("%s: got unexpected topic match: %s\n", - exp->name, parts->elems[i].topic); - fails++; - } - } - - - e = 0; - while (exp->exp[e]) { - if (!exp->stat[e]) { - TEST_WARN( - "%s: expected topic not " - "found in assignment: %s\n", - exp->name, exp->exp[e]); - fails++; - } else { - TEST_SAY("%s: expected topic %s seen in assignment\n", - exp->name, exp->exp[e]); - } - e++; - } - - exp->fails += fails; - if (fails) { - TEST_WARN("%s: see %d previous failures\n", exp->name, fails); - exp->result = _EXP_FAIL; - } else { - TEST_SAY(_C_MAG "[ %s: assignment matched ]\n", exp->name); - exp->result = _EXP_OK; - } -} - -static void rebalance_cb(rd_kafka_t *rk, - rd_kafka_resp_err_t err, - rd_kafka_topic_partition_list_t *parts, - void *opaque) { - struct expect *exp = exp_curr; - - TEST_ASSERT(exp_curr, "exp_curr not set"); - - TEST_SAY("rebalance_cb: %s with %d partition(s)\n", - rd_kafka_err2str(err), parts->cnt); - test_print_partition_list(parts); - - switch (err) { - case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: - /* Check that provided partitions match our expectations */ - if (exp->result != _EXP_ASSIGN) { - TEST_WARN( - "%s: rebalance called while expecting %d: " - "too many or undesired assignment(s?\n", - exp->name, exp->result); - } - expect_match(exp, parts); - test_consumer_assign("rebalance", rk, parts); - exp->result = _EXP_ASSIGNED; - break; - - case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: - if (exp->result != _EXP_REVOKE) { - TEST_WARN( - "%s: rebalance called while expecting %d: " - "too many or undesired assignment(s?\n", - exp->name, exp->result); - } - - test_consumer_unassign("rebalance", rk); - exp->result = _EXP_REVOKED; - break; - - default: - TEST_FAIL("rebalance_cb: error: %s", rd_kafka_err2str(err)); - } -} - - -/** - * @brief Poll the consumer once. - */ -static void consumer_poll_once(rd_kafka_t *rk) { - rd_kafka_message_t *rkmessage; - - rkmessage = rd_kafka_consumer_poll(rk, 1000); - if (!rkmessage) - return; - - if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { - TEST_SAY("%s [%" PRId32 - "] reached EOF at " - "offset %" PRId64 "\n", - rd_kafka_topic_name(rkmessage->rkt), - rkmessage->partition, rkmessage->offset); - - } else if (rkmessage->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) { - if (strstr(rd_kafka_topic_name(rkmessage->rkt), "NONEXIST")) - TEST_SAY("%s: %s: error is expected for this topic\n", - rd_kafka_topic_name(rkmessage->rkt), - rd_kafka_message_errstr(rkmessage)); - else - TEST_FAIL( - "%s [%" PRId32 "] error (offset %" PRId64 "): %s", - rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt) - : "(no-topic)", - rkmessage->partition, rkmessage->offset, - rd_kafka_message_errstr(rkmessage)); - } - - rd_kafka_message_destroy(rkmessage); -} - - - -static int test_subscribe(rd_kafka_t *rk, struct expect *exp) { - rd_kafka_resp_err_t err; - rd_kafka_topic_partition_list_t *tlist; - int i; - test_timing_t t_sub, t_assign, t_unsub; - - exp_curr = exp; - - test_timeout_set((test_session_timeout_ms / 1000) * 3); - - tlist = rd_kafka_topic_partition_list_new(4); - TEST_SAY(_C_MAG "[ %s: begin ]\n", exp->name); - i = 0; - TEST_SAY("Topic subscription:\n"); - while (exp->sub[i]) { - TEST_SAY("%s: %s\n", exp->name, exp->sub[i]); - rd_kafka_topic_partition_list_add(tlist, exp->sub[i], - RD_KAFKA_PARTITION_UA); - i++; - } - - /* Subscribe */ - TIMING_START(&t_sub, "subscribe"); - err = rd_kafka_subscribe(rk, tlist); - TIMING_STOP(&t_sub); - TEST_ASSERT(err == exp->exp_err, "subscribe() failed: %s (expected %s)", - rd_kafka_err2str(err), rd_kafka_err2str(exp->exp_err)); - - if (exp->exp[0]) { - /* Wait for assignment, actual messages are ignored. */ - exp->result = _EXP_ASSIGN; - TEST_SAY("%s: waiting for assignment\n", exp->name); - TIMING_START(&t_assign, "assignment"); - while (exp->result == _EXP_ASSIGN) - consumer_poll_once(rk); - TIMING_STOP(&t_assign); - TEST_ASSERT(exp->result == _EXP_ASSIGNED, - "got %d instead of assignment", exp->result); - - } else { - /* Not expecting any assignment */ - int64_t ts_end = test_clock() + 5000; - exp->result = _EXP_NONE; /* Not expecting a rebalance */ - while (exp->result == _EXP_NONE && test_clock() < ts_end) - consumer_poll_once(rk); - TEST_ASSERT(exp->result == _EXP_NONE); - } - - /* Unsubscribe */ - TIMING_START(&t_unsub, "unsubscribe"); - err = rd_kafka_unsubscribe(rk); - TIMING_STOP(&t_unsub); - TEST_ASSERT(!err, "unsubscribe() failed: %s", rd_kafka_err2str(err)); - - rd_kafka_topic_partition_list_destroy(tlist); - - if (exp->exp[0]) { - /* Wait for revoke, actual messages are ignored. */ - TEST_SAY("%s: waiting for revoke\n", exp->name); - exp->result = _EXP_REVOKE; - TIMING_START(&t_assign, "revoke"); - while (exp->result != _EXP_REVOKED) - consumer_poll_once(rk); - TIMING_STOP(&t_assign); - TEST_ASSERT(exp->result == _EXP_REVOKED, - "got %d instead of revoke", exp->result); - } else { - /* Not expecting any revoke */ - int64_t ts_end = test_clock() + 5000; - exp->result = _EXP_NONE; /* Not expecting a rebalance */ - while (exp->result == _EXP_NONE && test_clock() < ts_end) - consumer_poll_once(rk); - TEST_ASSERT(exp->result == _EXP_NONE); - } - - TEST_SAY(_C_MAG "[ %s: done with %d failures ]\n", exp->name, - exp->fails); - - return exp->fails; -} - - -static int do_test(const char *assignor) { - static char topics[3][128]; - static char nonexist_topic[128]; - const int topic_cnt = 3; - rd_kafka_t *rk; - const int msgcnt = 10; - int i; - char groupid[64]; - int fails = 0; - rd_kafka_conf_t *conf; - - if (!test_check_builtin("regex")) { - TEST_SKIP("regex support not built in\n"); - return 0; - } - - testid = test_id_generate(); - test_str_id_generate(groupid, sizeof(groupid)); - - rd_snprintf(topics[0], sizeof(topics[0]), "%s_%s", - test_mk_topic_name("regex_subscribe_TOPIC_0001_UNO", 0), - groupid); - rd_snprintf(topics[1], sizeof(topics[1]), "%s_%s", - test_mk_topic_name("regex_subscribe_topic_0002_dup", 0), - groupid); - rd_snprintf(topics[2], sizeof(topics[2]), "%s_%s", - test_mk_topic_name("regex_subscribe_TOOTHPIC_0003_3", 0), - groupid); - - /* To avoid auto topic creation to kick in we use - * an invalid topic name. */ - rd_snprintf( - nonexist_topic, sizeof(nonexist_topic), "%s_%s", - test_mk_topic_name("regex_subscribe_NONEXISTENT_0004_IV#!", 0), - groupid); - - /* Produce messages to topics to ensure creation. */ - for (i = 0; i < topic_cnt; i++) - test_produce_msgs_easy(topics[i], testid, RD_KAFKA_PARTITION_UA, - msgcnt); - - test_conf_init(&conf, NULL, 20); - test_conf_set(conf, "partition.assignment.strategy", assignor); - /* Speed up propagation of new topics */ - test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000"); - test_conf_set(conf, "allow.auto.create.topics", "true"); - - /* Create a single consumer to handle all subscriptions. - * Has the nice side affect of testing multiple subscriptions. */ - rk = test_create_consumer(groupid, rebalance_cb, conf, NULL); - - /* - * Test cases - */ - { - struct expect expect = {.name = rd_strdup(tsprintf( - "%s: no regexps (0&1)", assignor)), - .sub = {topics[0], topics[1], NULL}, - .exp = {topics[0], topics[1], NULL}}; - - fails += test_subscribe(rk, &expect); - rd_free(expect.name); - } - - { - struct expect expect = {.name = - rd_strdup(tsprintf("%s: no regexps " - "(no matches)", - assignor)), - .sub = {nonexist_topic, NULL}, - .exp = {NULL}}; - - fails += test_subscribe(rk, &expect); - rd_free(expect.name); - } - - { - struct expect expect = { - .name = rd_strdup(tsprintf("%s: regex all", assignor)), - .sub = {rd_strdup(tsprintf("^.*_%s", groupid)), NULL}, - .exp = {topics[0], topics[1], topics[2], NULL}}; - - fails += test_subscribe(rk, &expect); - rd_free(expect.name); - rd_free((void *)expect.sub[0]); - } - - { - struct expect expect = { - .name = rd_strdup(tsprintf("%s: regex 0&1", assignor)), - .sub = {rd_strdup(tsprintf( - "^.*[tToOpPiIcC]_0+[12]_[^_]+_%s", groupid)), - NULL}, - .exp = {topics[0], topics[1], NULL}}; - - fails += test_subscribe(rk, &expect); - rd_free(expect.name); - rd_free((void *)expect.sub[0]); - } - - { - struct expect expect = { - .name = rd_strdup(tsprintf("%s: regex 2", assignor)), - .sub = {rd_strdup( - tsprintf("^.*TOOTHPIC_000._._%s", groupid)), - NULL}, - .exp = {topics[2], NULL}}; - - fails += test_subscribe(rk, &expect); - rd_free(expect.name); - rd_free((void *)expect.sub[0]); - } - - { - struct expect expect = { - .name = rd_strdup(tsprintf("%s: regex 2 and " - "nonexistent(not seen)", - assignor)), - .sub = {rd_strdup(tsprintf("^.*_000[34]_..?_%s", groupid)), - NULL}, - .exp = {topics[2], NULL}}; - - fails += test_subscribe(rk, &expect); - rd_free(expect.name); - rd_free((void *)expect.sub[0]); - } - - { - struct expect expect = { - .name = rd_strdup( - tsprintf("%s: broken regex (no matches)", assignor)), - .sub = {"^.*[0", NULL}, - .exp = {NULL}, - .exp_err = RD_KAFKA_RESP_ERR__INVALID_ARG}; - - fails += test_subscribe(rk, &expect); - rd_free(expect.name); - } - - - test_consumer_close(rk); - - rd_kafka_destroy(rk); - - if (fails) - TEST_FAIL("See %d previous failures", fails); - - return 0; -} - - -int main_0033_regex_subscribe(int argc, char **argv) { - do_test("range"); - do_test("roundrobin"); - return 0; -} - - -/** - * @brief Subscription API tests that dont require a broker - */ -int main_0033_regex_subscribe_local(int argc, char **argv) { - rd_kafka_topic_partition_list_t *valids, *invalids, *none, *empty, - *alot; - rd_kafka_t *rk; - rd_kafka_conf_t *conf; - rd_kafka_resp_err_t err; - char errstr[256]; - int i; - - valids = rd_kafka_topic_partition_list_new(0); - invalids = rd_kafka_topic_partition_list_new(100); - none = rd_kafka_topic_partition_list_new(1000); - empty = rd_kafka_topic_partition_list_new(5); - alot = rd_kafka_topic_partition_list_new(1); - - rd_kafka_topic_partition_list_add(valids, "not_a_regex", 0); - rd_kafka_topic_partition_list_add(valids, "^My[vV]alid..regex+", 0); - rd_kafka_topic_partition_list_add(valids, "^another_one$", 55); - - rd_kafka_topic_partition_list_add(invalids, "not_a_regex", 0); - rd_kafka_topic_partition_list_add(invalids, "^My[vV]alid..regex+", 0); - rd_kafka_topic_partition_list_add(invalids, "^a[b", 99); - - rd_kafka_topic_partition_list_add(empty, "not_a_regex", 0); - rd_kafka_topic_partition_list_add(empty, "", 0); - rd_kafka_topic_partition_list_add(empty, "^ok", 0); - - for (i = 0; i < 10000; i++) { - char topic[32]; - rd_snprintf(topic, sizeof(topic), "^Va[lLid]_regex_%d$", i); - rd_kafka_topic_partition_list_add(alot, topic, i); - } - - conf = rd_kafka_conf_new(); - test_conf_set(conf, "group.id", "group"); - test_conf_set(conf, "client.id", test_curr->name); - - rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); - if (!rk) - TEST_FAIL("Failed to create consumer: %s", errstr); - - err = rd_kafka_subscribe(rk, valids); - TEST_ASSERT(!err, "valids failed: %s", rd_kafka_err2str(err)); - - err = rd_kafka_subscribe(rk, invalids); - TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG, - "invalids failed with wrong return: %s", - rd_kafka_err2str(err)); - - err = rd_kafka_subscribe(rk, none); - TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG, - "none failed with wrong return: %s", rd_kafka_err2str(err)); - - err = rd_kafka_subscribe(rk, empty); - TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG, - "empty failed with wrong return: %s", - rd_kafka_err2str(err)); - - err = rd_kafka_subscribe(rk, alot); - TEST_ASSERT(!err, "alot failed: %s", rd_kafka_err2str(err)); - - rd_kafka_consumer_close(rk); - rd_kafka_destroy(rk); - - rd_kafka_topic_partition_list_destroy(valids); - rd_kafka_topic_partition_list_destroy(invalids); - rd_kafka_topic_partition_list_destroy(none); - rd_kafka_topic_partition_list_destroy(empty); - rd_kafka_topic_partition_list_destroy(alot); - - return 0; -} |