summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/0040-io_event.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0040-io_event.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/0040-io_event.c251
1 files changed, 251 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0040-io_event.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0040-io_event.c
new file mode 100644
index 000000000..d47da5206
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0040-io_event.c
@@ -0,0 +1,251 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2013, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Tests the queue IO event signalling.
+ */
+
+
+#include "test.h"
+
+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
+ * is built from within the librdkafka source tree and thus differs. */
+#include "rdkafka.h" /* for Kafka driver */
+
+#include <fcntl.h>
+#ifdef _WIN32
+#include <io.h>
+#pragma comment(lib, "ws2_32.lib")
+#else
+#include <unistd.h>
+#include <poll.h>
+#endif
+
+
+
+int main_0040_io_event(int argc, char **argv) {
+ rd_kafka_conf_t *conf;
+ rd_kafka_topic_conf_t *tconf;
+ rd_kafka_t *rk_p, *rk_c;
+ const char *topic;
+ rd_kafka_topic_t *rkt_p;
+ rd_kafka_queue_t *queue;
+ uint64_t testid;
+ int msgcnt = test_quick ? 10 : 100;
+ int recvd = 0;
+ int fds[2];
+ int wait_multiplier = 1;
+ struct pollfd pfd;
+ int r;
+ rd_kafka_resp_err_t err;
+ enum { _NOPE, _YEP, _REBALANCE } expecting_io = _REBALANCE;
+
+#ifdef _WIN32
+ TEST_SKIP("WSAPoll and pipes are not reliable on Win32 (FIXME)\n");
+ return 0;
+#endif
+ testid = test_id_generate();
+ topic = test_mk_topic_name(__FUNCTION__, 1);
+
+ rk_p = test_create_producer();
+ rkt_p = test_create_producer_topic(rk_p, topic, NULL);
+ err = test_auto_create_topic_rkt(rk_p, rkt_p, tmout_multip(5000));
+ TEST_ASSERT(!err, "Topic auto creation failed: %s",
+ rd_kafka_err2str(err));
+
+ test_conf_init(&conf, &tconf, 0);
+ rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_REBALANCE);
+ test_conf_set(conf, "session.timeout.ms", "6000");
+ test_conf_set(conf, "enable.partition.eof", "false");
+ /* Speed up propagation of new topics */
+ test_conf_set(conf, "metadata.max.age.ms", "1000");
+ test_topic_conf_set(tconf, "auto.offset.reset", "earliest");
+ rk_c = test_create_consumer(topic, NULL, conf, tconf);
+
+ queue = rd_kafka_queue_get_consumer(rk_c);
+
+ test_consumer_subscribe(rk_c, topic);
+
+#ifndef _WIN32
+ r = pipe(fds);
+#else
+ r = _pipe(fds, 2, _O_BINARY);
+#endif
+ if (r == -1)
+ TEST_FAIL("pipe() failed: %s\n", strerror(errno));
+
+ rd_kafka_queue_io_event_enable(queue, fds[1], "1", 1);
+
+ pfd.fd = fds[0];
+ pfd.events = POLLIN;
+ pfd.revents = 0;
+
+ /**
+ * 1) Wait for rebalance event
+ * 2) Wait 1 interval (1s) expecting no IO (nothing produced).
+ * 3) Produce half the messages
+ * 4) Expect IO
+ * 5) Consume the available messages
+ * 6) Wait 1 interval expecting no IO.
+ * 7) Produce remaing half
+ * 8) Expect IO
+ * 9) Done.
+ */
+ while (recvd < msgcnt) {
+#ifndef _WIN32
+ r = poll(&pfd, 1, 1000 * wait_multiplier);
+#else
+ r = WSAPoll(&pfd, 1, 1000 * wait_multiplier);
+#endif
+ if (r == -1) {
+ TEST_FAIL("poll() failed: %s", strerror(errno));
+
+ } else if (r == 1) {
+ rd_kafka_event_t *rkev;
+ char b;
+ int eventcnt = 0;
+
+ if (pfd.events & POLLERR)
+ TEST_FAIL("Poll error\n");
+ if (!(pfd.events & POLLIN)) {
+ TEST_SAY("Stray event 0x%x\n", (int)pfd.events);
+ continue;
+ }
+
+ TEST_SAY("POLLIN\n");
+ /* Read signaling token to purge socket queue and
+ * eventually silence POLLIN */
+#ifndef _WIN32
+ r = read(pfd.fd, &b, 1);
+#else
+ r = _read((int)pfd.fd, &b, 1);
+#endif
+ if (r == -1)
+ TEST_FAIL("read failed: %s\n", strerror(errno));
+
+ if (!expecting_io)
+ TEST_WARN(
+ "Got unexpected IO after %d/%d msgs\n",
+ recvd, msgcnt);
+
+ while ((rkev = rd_kafka_queue_poll(queue, 0))) {
+ eventcnt++;
+ switch (rd_kafka_event_type(rkev)) {
+ case RD_KAFKA_EVENT_REBALANCE:
+ TEST_SAY(
+ "Got %s: %s\n",
+ rd_kafka_event_name(rkev),
+ rd_kafka_err2str(
+ rd_kafka_event_error(rkev)));
+ if (expecting_io != _REBALANCE)
+ TEST_FAIL(
+ "Got Rebalance when "
+ "expecting message\n");
+ if (rd_kafka_event_error(rkev) ==
+ RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
+ rd_kafka_assign(
+ rk_c,
+ rd_kafka_event_topic_partition_list(
+ rkev));
+ expecting_io = _NOPE;
+ } else
+ rd_kafka_assign(rk_c, NULL);
+ break;
+
+ case RD_KAFKA_EVENT_FETCH:
+ if (expecting_io != _YEP)
+ TEST_FAIL(
+ "Did not expect more "
+ "messages at %d/%d\n",
+ recvd, msgcnt);
+ recvd++;
+ if (recvd == (msgcnt / 2) ||
+ recvd == msgcnt)
+ expecting_io = _NOPE;
+ break;
+
+ case RD_KAFKA_EVENT_ERROR:
+ TEST_FAIL(
+ "Error: %s\n",
+ rd_kafka_event_error_string(rkev));
+ break;
+
+ default:
+ TEST_SAY("Ignoring event %s\n",
+ rd_kafka_event_name(rkev));
+ }
+
+ rd_kafka_event_destroy(rkev);
+ }
+ TEST_SAY("%d events, Consumed %d/%d messages\n",
+ eventcnt, recvd, msgcnt);
+
+ wait_multiplier = 1;
+
+ } else {
+ if (expecting_io == _REBALANCE) {
+ continue;
+ } else if (expecting_io == _YEP) {
+ TEST_FAIL(
+ "Did not see expected IO after %d/%d "
+ "msgs\n",
+ recvd, msgcnt);
+ }
+
+ TEST_SAY("IO poll timeout (good)\n");
+
+ TEST_SAY("Got idle period, producing\n");
+ test_produce_msgs(rk_p, rkt_p, testid, 0, recvd,
+ msgcnt / 2, NULL, 10);
+
+ expecting_io = _YEP;
+ /* When running slowly (e.g., valgrind) it might take
+ * some time before the first message is received
+ * after producing. */
+ wait_multiplier = 3;
+ }
+ }
+ TEST_SAY("Done\n");
+
+ rd_kafka_topic_destroy(rkt_p);
+ rd_kafka_destroy(rk_p);
+
+ rd_kafka_queue_destroy(queue);
+ rd_kafka_consumer_close(rk_c);
+ rd_kafka_destroy(rk_c);
+
+#ifndef _WIN32
+ close(fds[0]);
+ close(fds[1]);
+#else
+ _close(fds[0]);
+ _close(fds[1]);
+#endif
+
+ return 0;
+}