diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0084-destroy_flags.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/0084-destroy_flags.c | 211 |
1 files changed, 211 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0084-destroy_flags.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0084-destroy_flags.c new file mode 100644 index 00000000..cd8bbf7d --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0084-destroy_flags.c @@ -0,0 +1,211 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2018, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * @name Test rd_kafka_destroy_flags() + */ + + +#include "test.h" + + +static RD_TLS int rebalance_cnt = 0; + +static void destroy_flags_rebalance_cb(rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *parts, + void *opaque) { + rebalance_cnt++; + + TEST_SAY("rebalance_cb: %s with %d partition(s)\n", + rd_kafka_err2str(err), parts->cnt); + + switch (err) { + case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: + test_consumer_assign("rebalance", rk, parts); + break; + + case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: + test_consumer_unassign("rebalance", rk); + break; + + default: + TEST_FAIL("rebalance_cb: error: %s", rd_kafka_err2str(err)); + } +} + +struct df_args { + rd_kafka_type_t client_type; + int produce_cnt; + int consumer_subscribe; + int consumer_unsubscribe; +}; + +static void do_test_destroy_flags(const char *topic, + int destroy_flags, + int local_mode, + const struct df_args *args) { + rd_kafka_t *rk; + rd_kafka_conf_t *conf; + test_timing_t t_destroy; + + TEST_SAY(_C_MAG + "[ test destroy_flags 0x%x for client_type %d, " + "produce_cnt %d, subscribe %d, unsubscribe %d, " + "%s mode ]\n" _C_CLR, + destroy_flags, args->client_type, args->produce_cnt, + args->consumer_subscribe, args->consumer_unsubscribe, + local_mode ? "local" : "broker"); + + test_conf_init(&conf, NULL, 20); + + if (local_mode) + test_conf_set(conf, "bootstrap.servers", ""); + + if (args->client_type == RD_KAFKA_PRODUCER) { + + rk = test_create_handle(args->client_type, conf); + + if (args->produce_cnt > 0) { + rd_kafka_topic_t *rkt; + int msgcounter = 0; + + rkt = test_create_producer_topic(rk, topic, NULL); + test_produce_msgs_nowait( + rk, rkt, 0, RD_KAFKA_PARTITION_UA, 0, + args->produce_cnt, NULL, 100, 0, &msgcounter); + rd_kafka_topic_destroy(rkt); + } + + } else { + int i; + + TEST_ASSERT(args->client_type == RD_KAFKA_CONSUMER); + + rk = test_create_consumer(topic, destroy_flags_rebalance_cb, + conf, NULL); + + if (args->consumer_subscribe) { + test_consumer_subscribe(rk, topic); + + if (!local_mode) { + TEST_SAY("Waiting for assignment\n"); + while (rebalance_cnt == 0) + test_consumer_poll_once(rk, NULL, 1000); + } + } + + for (i = 0; i < 5; i++) + test_consumer_poll_once(rk, NULL, 100); + + if (args->consumer_unsubscribe) { + /* Test that calling rd_kafka_unsubscribe immediately + * prior to rd_kafka_destroy_flags doesn't cause the + * latter to hang. */ + TEST_SAY(_C_YEL "Calling rd_kafka_unsubscribe\n"_C_CLR); + rd_kafka_unsubscribe(rk); + } + } + + rebalance_cnt = 0; + TEST_SAY(_C_YEL "Calling rd_kafka_destroy_flags(0x%x)\n" _C_CLR, + destroy_flags); + TIMING_START(&t_destroy, "rd_kafka_destroy_flags(0x%x)", destroy_flags); + rd_kafka_destroy_flags(rk, destroy_flags); + TIMING_STOP(&t_destroy); + + if (destroy_flags & RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE) + TIMING_ASSERT_LATER(&t_destroy, 0, 200); + else + TIMING_ASSERT_LATER(&t_destroy, 0, 1000); + + if (args->consumer_subscribe && + !(destroy_flags & RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE)) { + if (!local_mode) + TEST_ASSERT(rebalance_cnt > 0, + "expected final rebalance callback"); + } else + TEST_ASSERT(rebalance_cnt == 0, + "expected no rebalance callbacks, got %d", + rebalance_cnt); + + TEST_SAY(_C_GRN + "[ test destroy_flags 0x%x for client_type %d, " + "produce_cnt %d, subscribe %d, unsubscribe %d, " + "%s mode: PASS ]\n" _C_CLR, + destroy_flags, args->client_type, args->produce_cnt, + args->consumer_subscribe, args->consumer_unsubscribe, + local_mode ? "local" : "broker"); +} + + +/** + * @brief Destroy with flags + */ +static void destroy_flags(int local_mode) { + const struct df_args args[] = { + {RD_KAFKA_PRODUCER, 0, 0, 0}, + {RD_KAFKA_PRODUCER, test_quick ? 100 : 10000, 0, 0}, + {RD_KAFKA_CONSUMER, 0, 1, 0}, + {RD_KAFKA_CONSUMER, 0, 1, 1}, + {RD_KAFKA_CONSUMER, 0, 0, 0}}; + const int flag_combos[] = {0, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE}; + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + const rd_bool_t can_subscribe = + test_broker_version >= TEST_BRKVER(0, 9, 0, 0); + int i, j; + + /* Create the topic to avoid not-yet-auto-created-topics being + * subscribed to (and thus raising an error). */ + if (!local_mode) { + test_create_topic(NULL, topic, 3, 1); + test_wait_topic_exists(NULL, topic, 5000); + } + + for (i = 0; i < (int)RD_ARRAYSIZE(args); i++) { + for (j = 0; j < (int)RD_ARRAYSIZE(flag_combos); j++) { + if (!can_subscribe && (args[i].consumer_subscribe || + args[i].consumer_unsubscribe)) + continue; + do_test_destroy_flags(topic, flag_combos[j], local_mode, + &args[i]); + } + } +} + + + +int main_0084_destroy_flags_local(int argc, char **argv) { + destroy_flags(1 /*no brokers*/); + return 0; +} + +int main_0084_destroy_flags(int argc, char **argv) { + destroy_flags(0 /*with brokers*/); + return 0; +} |