/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2018, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"

#include "rdkafka.h"

/**
 * Verify that long-processing consumer leaves the group during
 * processing, with or without a log queue.
 *
 * MO:
 *  - produce messages to a single partition topic.
 *  - create two consumers, c1 and c2.
 *  - process first message slowly (2 * max.poll.interval.ms)
 *  - verify in other consumer that group rebalances after max.poll.interval.ms
 *    and the partition is assigned to the other consumer.
 */


/**
 * @brief Poll the two consumers until the consumer that did NOT receive the
 *        first message gets a message following the rebalance.
 *
 * The consumer that receives the very first message becomes the "bad"
 * consumer: it is not polled again for 20s (twice the 10s
 * max.poll.interval.ms configured by the callers). The other consumer is
 * then expected to receive a message no earlier than 10s and no later than
 * 20s after that first message; anything else fails the test.
 *
 * @param c    Array of the two subscribed consumers.
 * @param logq Array of the two per-consumer log queues, or NULL. When
 *             non-NULL, the log queue of a "processing" consumer is polled
 *             (and any returned event destroyed) instead of skipping it.
 *
 * @returns the index into \p c of the bad (slow) consumer.
 */
static int consume_until_rebalance(rd_kafka_t **c, rd_kafka_queue_t **logq) {
        int64_t ts_next[2]    = {0, 0}; /* Time until which c[i] "processes" */
        int64_t ts_exp_msg[2] = {0, 0}; /* Earliest expected msg for c[i] */
        int cmsgcnt           = 0;
        int bad               = -1;
        int i;

        while (1) {
                for (i = 0; i < 2; i++) {
                        int64_t now;
                        rd_kafka_message_t *rkm;

                        /* Consumer is "processing": do not poll the
                         * consumer. If log queues are in use we poll the
                         * log queue instead. */
                        if (ts_next[i] > test_clock()) {
                                if (logq)
                                        rd_kafka_event_destroy(
                                            rd_kafka_queue_poll(logq[i], 100));
                                continue;
                        }

                        rkm = rd_kafka_consumer_poll(c[i], 100);
                        if (!rkm)
                                continue;

                        if (rkm->err) {
                                TEST_WARN(
                                    "Consumer %d error: %s: "
                                    "ignoring\n",
                                    i, rd_kafka_message_errstr(rkm));
                                /* The polled message is ours to destroy,
                                 * error or not: don't leak it. */
                                rd_kafka_message_destroy(rkm);
                                continue;
                        }

                        now = test_clock();

                        cmsgcnt++;

                        TEST_SAY(
                            "Consumer %d received message (#%d) "
                            "at offset %" PRId64 "\n",
                            i, cmsgcnt, rkm->offset);

                        if (ts_exp_msg[i]) {
                                /* This consumer is expecting a message
                                 * after a certain time, namely after the
                                 * rebalance following max.poll.. being
                                 * exceeded in the other consumer */
                                TEST_ASSERT(
                                    now > ts_exp_msg[i],
                                    "Consumer %d: did not expect "
                                    "message for at least %dms",
                                    i, (int)((ts_exp_msg[i] - now) / 1000));
                                TEST_ASSERT(
                                    now < ts_exp_msg[i] + 10000 * 1000,
                                    "Consumer %d: expected message "
                                    "within 10s, not after %dms",
                                    i, (int)((now - ts_exp_msg[i]) / 1000));
                                TEST_SAY(
                                    "Consumer %d: received message "
                                    "at offset %" PRId64 " after rebalance\n",
                                    i, rkm->offset);

                                rd_kafka_message_destroy(rkm);
                                return bad;

                        } else if (cmsgcnt == 1) {
                                /* Process this message for 20s */
                                ts_next[i] = now + (20000 * 1000);

                                /* Exp message on other consumer after
                                 * max.poll.interval.ms */
                                ts_exp_msg[i ^ 1] = now + (10000 * 1000);

                                /* This is the bad consumer */
                                bad = i;

                                TEST_SAY(
                                    "Consumer %d processing message at "
                                    "offset %" PRId64 "\n",
                                    i, rkm->offset);
                                rd_kafka_message_destroy(rkm);

                        } else {
                                rd_kafka_message_destroy(rkm);

                                TEST_FAIL(
                                    "Consumer %d did not expect "
                                    "a message",
                                    i);
                        }
                }
        }
}


/**
 * @brief Poll \p rk until it reports RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED.
 *
 * Every poll must return a result within 1s and that result must be an
 * error; errors other than ERR__MAX_POLL_EXCEEDED are logged and skipped.
 *
 * @param rk The bad (slow) consumer.
 */
static void expect_max_poll_exceeded(rd_kafka_t *rk) {
        while (1) {
                rd_kafka_message_t *rkm;
                rd_kafka_resp_err_t err;

                rkm = rd_kafka_consumer_poll(rk, 1000);
                TEST_ASSERT(rkm, "Expected consumer result within 1s");

                TEST_ASSERT(rkm->err, "Did not expect message on bad consumer");

                TEST_SAY("Consumer error: %s: %s\n",
                         rd_kafka_err2name(rkm->err),
                         rd_kafka_message_errstr(rkm));

                /* Remember the error code so the message can be destroyed
                 * in one place before deciding whether to stop. */
                err = rkm->err;
                rd_kafka_message_destroy(rkm);

                if (err == RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED)
                        break;
        }
}


/**
 * @brief Test max.poll.interval.ms without any additional polling.
 */
static void do_test(void) {
        const char *topic = test_mk_topic_name("0089_max_poll_interval", 1);
        uint64_t testid;
        const int msgcnt = 10;
        rd_kafka_t *c[2];
        rd_kafka_conf_t *conf;
        int i;
        int bad;

        SUB_TEST();

        testid = test_id_generate();

        test_create_topic(NULL, topic, 1, 1);

        test_produce_msgs_easy(topic, testid, -1, msgcnt);

        test_conf_init(&conf, NULL, 60);

        test_conf_set(conf, "session.timeout.ms", "6000");
        test_conf_set(conf, "max.poll.interval.ms", "10000" /*10s*/);
        test_conf_set(conf, "auto.offset.reset", "earliest");

        /* The first consumer gets a copy of the configuration,
         * the second one is handed the original object. */
        c[0] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
        c[1] = test_create_consumer(topic, NULL, conf, NULL);

        test_consumer_subscribe(c[0], topic);
        test_consumer_subscribe(c[1], topic);

        bad = consume_until_rebalance(c, NULL);

        TEST_ASSERT(bad != -1, "Bad consumer not set");

        /* Wait for error ERR__MAX_POLL_EXCEEDED on the bad consumer. */
        expect_max_poll_exceeded(c[bad]);

        for (i = 0; i < 2; i++)
                rd_kafka_destroy_flags(c[i],
                                       RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);

        SUB_TEST_PASS();
}


/**
 * @brief Test max.poll.interval.ms while polling log queue.
 */
static void do_test_with_log_queue(void) {
        const char *topic = test_mk_topic_name("0089_max_poll_interval", 1);
        uint64_t testid;
        const int msgcnt = 10;
        rd_kafka_t *c[2];
        rd_kafka_conf_t *conf;
        rd_kafka_queue_t *logq[2];
        int i;
        int bad;
        /* NOTE(review): not referenced directly in this function; presumably
         * picked up by TEST_CALL__() for error reporting - confirm against
         * test.h. Zero-initialized so it is never read uninitialized. */
        char errstr[512] = "";

        SUB_TEST();

        testid = test_id_generate();

        test_create_topic(NULL, topic, 1, 1);

        test_produce_msgs_easy(topic, testid, -1, msgcnt);

        test_conf_init(&conf, NULL, 60);

        test_conf_set(conf, "session.timeout.ms", "6000");
        test_conf_set(conf, "max.poll.interval.ms", "10000" /*10s*/);
        test_conf_set(conf, "auto.offset.reset", "earliest");
        test_conf_set(conf, "log.queue", "true");

        /* The first consumer gets a copy of the configuration,
         * the second one is handed the original object. */
        c[0] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
        c[1] = test_create_consumer(topic, NULL, conf, NULL);

        for (i = 0; i < 2; i++) {
                logq[i] = rd_kafka_queue_new(c[i]);
                TEST_CALL__(rd_kafka_set_log_queue(c[i], logq[i]));
                test_consumer_subscribe(c[i], topic);
        }

        /* While a consumer is "processing" its log queue is polled. */
        bad = consume_until_rebalance(c, logq);

        TEST_ASSERT(bad != -1, "Bad consumer not set");

        /* Wait for error ERR__MAX_POLL_EXCEEDED on the bad consumer. */
        expect_max_poll_exceeded(c[bad]);

        for (i = 0; i < 2; i++) {
                rd_kafka_destroy_flags(c[i],
                                       RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
                rd_kafka_queue_destroy(logq[i]);
        }

        SUB_TEST_PASS();
}


/**
 * @brief Test entry point: runs the plain and the log-queue variants.
 *
 * @param argc Unused.
 * @param argv Unused.
 *
 * @returns 0.
 */
int main_0089_max_poll_interval(int argc, char **argv) {
        do_test();
        do_test_with_log_queue();
        return 0;
}