summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/0030-offset_commit.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0030-offset_commit.c')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/0030-offset_commit.c589
1 files changed, 589 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0030-offset_commit.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0030-offset_commit.c
new file mode 100644
index 000000000..9b05cb420
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0030-offset_commit.c
@@ -0,0 +1,589 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
+ * is built from within the librdkafka source tree and thus differs. */
+#include "rdkafka.h" /* for Kafka driver */
+
+
+/**
+ * Consumer: various offset commit constellations, matrix:
+ * enable.auto.commit, enable.auto.offset.store, async
+ */
+
+static char *topic;
+static const int msgcnt = 100;
+static const int partition = 0;
+static uint64_t testid;
+
+static int64_t expected_offset = 0;
+static int64_t committed_offset = -1;
+
+
+static void offset_commit_cb(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *offsets,
+ void *opaque) {
+ rd_kafka_topic_partition_t *rktpar;
+
+ TEST_SAYL(3, "Offset committed: %s:\n", rd_kafka_err2str(err));
+ if (err == RD_KAFKA_RESP_ERR__NO_OFFSET)
+ return;
+
+ test_print_partition_list(offsets);
+ if (err)
+ TEST_FAIL("Offset commit failed: %s", rd_kafka_err2str(err));
+ if (offsets->cnt == 0)
+ TEST_FAIL(
+ "Expected at least one partition in offset_commit_cb");
+
+ /* Find correct partition */
+ if (!(rktpar = rd_kafka_topic_partition_list_find(offsets, topic,
+ partition)))
+ return;
+
+ if (rktpar->err)
+ TEST_FAIL("Offset commit failed for partitioń : %s",
+ rd_kafka_err2str(rktpar->err));
+
+ if (rktpar->offset > expected_offset)
+ TEST_FAIL("Offset committed %" PRId64
+ " > expected offset %" PRId64,
+ rktpar->offset, expected_offset);
+
+ if (rktpar->offset < committed_offset)
+ TEST_FAIL("Old offset %" PRId64
+ " (re)committed: "
+ "should be above committed_offset %" PRId64,
+ rktpar->offset, committed_offset);
+ else if (rktpar->offset == committed_offset)
+ TEST_SAYL(1, "Current offset re-committed: %" PRId64 "\n",
+ rktpar->offset);
+ else
+ committed_offset = rktpar->offset;
+
+ if (rktpar->offset < expected_offset) {
+ TEST_SAYL(3,
+ "Offset committed %" PRId64
+ " < expected offset %" PRId64 "\n",
+ rktpar->offset, expected_offset);
+ return;
+ }
+
+ TEST_SAYL(3, "Expected offset committed: %" PRId64 "\n",
+ rktpar->offset);
+}
+
+
+static void do_offset_test(const char *what,
+ int auto_commit,
+ int auto_store,
+ int async,
+ int subscribe) {
+ test_timing_t t_all;
+ char groupid[64];
+ rd_kafka_t *rk;
+ rd_kafka_conf_t *conf;
+ rd_kafka_topic_conf_t *tconf;
+ int cnt = 0;
+ const int extra_cnt = 5;
+ rd_kafka_resp_err_t err;
+ rd_kafka_topic_partition_list_t *parts;
+ rd_kafka_topic_partition_t *rktpar;
+ int64_t next_offset = -1;
+
+ SUB_TEST_QUICK("%s", what);
+
+ test_conf_init(&conf, &tconf, subscribe ? 30 : 10);
+ test_conf_set(conf, "session.timeout.ms", "6000");
+ test_conf_set(conf, "enable.auto.commit",
+ auto_commit ? "true" : "false");
+ test_conf_set(conf, "enable.auto.offset.store",
+ auto_store ? "true" : "false");
+ test_conf_set(conf, "auto.commit.interval.ms", "500");
+ rd_kafka_conf_set_offset_commit_cb(conf, offset_commit_cb);
+ test_topic_conf_set(tconf, "auto.offset.reset", "smallest");
+ test_str_id_generate(groupid, sizeof(groupid));
+ test_conf_set(conf, "group.id", groupid);
+ rd_kafka_conf_set_default_topic_conf(conf, tconf);
+
+ TIMING_START(&t_all, "%s", what);
+
+ expected_offset = 0;
+ committed_offset = -1;
+
+ /* MO:
+ * - Create consumer.
+ * - Start consuming from beginning
+ * - Perform store & commits according to settings
+ * - Stop storing&committing when half of the messages are consumed,
+ * - but consume 5 more to check against.
+ * - Query position.
+ * - Destroy consumer.
+ * - Create new consumer with same group.id using stored offsets
+ * - Should consume the expected message.
+ */
+
+ /* Create kafka instance */
+ rk = test_create_handle(RD_KAFKA_CONSUMER, rd_kafka_conf_dup(conf));
+
+ rd_kafka_poll_set_consumer(rk);
+
+ if (subscribe) {
+ test_consumer_subscribe(rk, topic);
+ } else {
+ parts = rd_kafka_topic_partition_list_new(1);
+ rd_kafka_topic_partition_list_add(parts, topic, partition);
+ test_consumer_assign("ASSIGN", rk, parts);
+ rd_kafka_topic_partition_list_destroy(parts);
+ }
+
+ while (cnt - extra_cnt < msgcnt / 2) {
+ rd_kafka_message_t *rkm;
+
+ rkm = rd_kafka_consumer_poll(rk, 10 * 1000);
+ if (!rkm)
+ continue;
+
+ if (rkm->err == RD_KAFKA_RESP_ERR__TIMED_OUT)
+ TEST_FAIL("%s: Timed out waiting for message %d", what,
+ cnt);
+ else if (rkm->err)
+ TEST_FAIL("%s: Consumer error: %s", what,
+ rd_kafka_message_errstr(rkm));
+
+ /* Offset of next message. */
+ next_offset = rkm->offset + 1;
+
+ if (cnt < msgcnt / 2) {
+ if (!auto_store) {
+ err = rd_kafka_offset_store(
+ rkm->rkt, rkm->partition, rkm->offset);
+ if (err)
+ TEST_FAIL(
+ "%s: offset_store failed: %s\n",
+ what, rd_kafka_err2str(err));
+ }
+ expected_offset = rkm->offset + 1;
+ if (!auto_commit) {
+ test_timing_t t_commit;
+ TIMING_START(&t_commit, "%s @ %" PRId64,
+ async ? "commit.async"
+ : "commit.sync",
+ rkm->offset + 1);
+ err = rd_kafka_commit_message(rk, rkm, async);
+ TIMING_STOP(&t_commit);
+ if (err)
+ TEST_FAIL("%s: commit failed: %s\n",
+ what, rd_kafka_err2str(err));
+ }
+
+ } else if (auto_store && auto_commit)
+ expected_offset = rkm->offset + 1;
+
+ rd_kafka_message_destroy(rkm);
+ cnt++;
+ }
+
+ TEST_SAY("%s: done consuming after %d messages, at offset %" PRId64
+ ", next_offset %" PRId64 "\n",
+ what, cnt, expected_offset, next_offset);
+
+ if ((err = rd_kafka_assignment(rk, &parts)))
+ TEST_FAIL("%s: failed to get assignment(): %s\n", what,
+ rd_kafka_err2str(err));
+
+ /* Verify position */
+ if ((err = rd_kafka_position(rk, parts)))
+ TEST_FAIL("%s: failed to get position(): %s\n", what,
+ rd_kafka_err2str(err));
+ if (!(rktpar =
+ rd_kafka_topic_partition_list_find(parts, topic, partition)))
+ TEST_FAIL("%s: position(): topic lost\n", what);
+ if (rktpar->offset != next_offset)
+ TEST_FAIL("%s: Expected position() offset %" PRId64
+ ", got %" PRId64,
+ what, next_offset, rktpar->offset);
+ TEST_SAY("%s: Position is at %" PRId64 ", good!\n", what,
+ rktpar->offset);
+
+ /* Pause messages while waiting so we can serve callbacks
+ * without having more messages received. */
+ if ((err = rd_kafka_pause_partitions(rk, parts)))
+ TEST_FAIL("%s: failed to pause partitions: %s\n", what,
+ rd_kafka_err2str(err));
+ rd_kafka_topic_partition_list_destroy(parts);
+
+ /* Fire off any enqueued offset_commit_cb */
+ test_consumer_poll_no_msgs(what, rk, testid, 0);
+
+ TEST_SAY("%s: committed_offset %" PRId64 ", expected_offset %" PRId64
+ "\n",
+ what, committed_offset, expected_offset);
+
+ if (!auto_commit && !async) {
+ /* Sync commits should be up to date at this point. */
+ if (committed_offset != expected_offset)
+ TEST_FAIL("%s: Sync commit: committed offset %" PRId64
+ " should be same as expected offset "
+ "%" PRId64,
+ what, committed_offset, expected_offset);
+ } else {
+
+ /* Wait for offset commits to catch up */
+ while (committed_offset < expected_offset) {
+ TEST_SAYL(2,
+ "%s: Wait for committed offset %" PRId64
+ " to reach expected offset %" PRId64 "\n",
+ what, committed_offset, expected_offset);
+ test_consumer_poll_no_msgs(what, rk, testid, 1000);
+ }
+ }
+
+ TEST_SAY(
+ "%s: phase 1 complete, %d messages consumed, "
+ "next expected offset is %" PRId64 "\n",
+ what, cnt, expected_offset);
+
+ /* Issue #827: cause committed() to return prematurely by specifying
+ * low timeout. The bug (use after free) will only
+ * be catched by valgrind.
+ *
+ * rusage: this triggers a bunch of protocol requests which
+ * increase .ucpu, .scpu, .ctxsw.
+ */
+ do {
+ parts = rd_kafka_topic_partition_list_new(1);
+ rd_kafka_topic_partition_list_add(parts, topic, partition);
+ err = rd_kafka_committed(rk, parts, 1);
+ rd_kafka_topic_partition_list_destroy(parts);
+ if (err)
+ TEST_SAY("Issue #827: committed() returned %s\n",
+ rd_kafka_err2str(err));
+ } while (err != RD_KAFKA_RESP_ERR__TIMED_OUT);
+
+ /* Query position */
+ parts = rd_kafka_topic_partition_list_new(1);
+ rd_kafka_topic_partition_list_add(parts, topic, partition);
+
+ err = rd_kafka_committed(rk, parts, tmout_multip(5 * 1000));
+ if (err)
+ TEST_FAIL("%s: committed() failed: %s", what,
+ rd_kafka_err2str(err));
+ if (!(rktpar =
+ rd_kafka_topic_partition_list_find(parts, topic, partition)))
+ TEST_FAIL("%s: committed(): topic lost\n", what);
+ if (rktpar->offset != expected_offset)
+ TEST_FAIL("%s: Expected committed() offset %" PRId64
+ ", got %" PRId64,
+ what, expected_offset, rktpar->offset);
+ TEST_SAY("%s: Committed offset is at %" PRId64 ", good!\n", what,
+ rktpar->offset);
+
+ rd_kafka_topic_partition_list_destroy(parts);
+ test_consumer_close(rk);
+ rd_kafka_destroy(rk);
+
+
+
+ /* Fire up a new consumer and continue from where we left off. */
+ TEST_SAY("%s: phase 2: starting new consumer to resume consumption\n",
+ what);
+ rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
+ rd_kafka_poll_set_consumer(rk);
+
+ if (subscribe) {
+ test_consumer_subscribe(rk, topic);
+ } else {
+ parts = rd_kafka_topic_partition_list_new(1);
+ rd_kafka_topic_partition_list_add(parts, topic, partition);
+ test_consumer_assign("ASSIGN", rk, parts);
+ rd_kafka_topic_partition_list_destroy(parts);
+ }
+
+ while (cnt < msgcnt) {
+ rd_kafka_message_t *rkm;
+
+ rkm = rd_kafka_consumer_poll(rk, 10 * 1000);
+ if (!rkm)
+ continue;
+
+ if (rkm->err == RD_KAFKA_RESP_ERR__TIMED_OUT)
+ TEST_FAIL("%s: Timed out waiting for message %d", what,
+ cnt);
+ else if (rkm->err)
+ TEST_FAIL("%s: Consumer error: %s", what,
+ rd_kafka_message_errstr(rkm));
+
+ if (rkm->offset != expected_offset)
+ TEST_FAIL("%s: Received message offset %" PRId64
+ ", expected %" PRId64 " at msgcnt %d/%d\n",
+ what, rkm->offset, expected_offset, cnt,
+ msgcnt);
+
+ rd_kafka_message_destroy(rkm);
+ expected_offset++;
+ cnt++;
+ }
+
+
+ TEST_SAY("%s: phase 2: complete\n", what);
+ test_consumer_close(rk);
+ rd_kafka_destroy(rk);
+
+ TIMING_STOP(&t_all);
+
+ SUB_TEST_PASS();
+}
+
+
+static void empty_offset_commit_cb(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *offsets,
+ void *opaque) {
+ rd_kafka_resp_err_t expected = *(rd_kafka_resp_err_t *)opaque;
+ int valid_offsets = 0;
+ int i;
+
+ TEST_SAY(
+ "Offset commit callback for %d partitions: %s (expecting %s)\n",
+ offsets ? offsets->cnt : 0, rd_kafka_err2str(err),
+ rd_kafka_err2str(expected));
+
+ if (expected != err)
+ TEST_FAIL("Offset commit cb: expected %s, got %s",
+ rd_kafka_err2str(expected), rd_kafka_err2str(err));
+
+ for (i = 0; i < offsets->cnt; i++) {
+ TEST_SAY("committed: %s [%" PRId32 "] offset %" PRId64 ": %s\n",
+ offsets->elems[i].topic, offsets->elems[i].partition,
+ offsets->elems[i].offset,
+ rd_kafka_err2str(offsets->elems[i].err));
+
+ if (expected == RD_KAFKA_RESP_ERR_NO_ERROR)
+ TEST_ASSERT(offsets->elems[i].err == expected);
+ if (offsets->elems[i].offset > 0)
+ valid_offsets++;
+ }
+
+ if (expected == RD_KAFKA_RESP_ERR_NO_ERROR) {
+ /* If no error is expected we instead expect one proper offset
+ * to have been committed. */
+ TEST_ASSERT(valid_offsets > 0);
+ }
+}
+
+
+/**
+ * Trigger an empty cgrp commit (issue #803)
+ */
+static void do_empty_commit(void) {
+ rd_kafka_t *rk;
+ char group_id[64];
+ rd_kafka_conf_t *conf;
+ rd_kafka_topic_conf_t *tconf;
+ rd_kafka_resp_err_t err, expect;
+
+ SUB_TEST_QUICK();
+
+ test_conf_init(&conf, &tconf, 20);
+ test_conf_set(conf, "enable.auto.commit", "false");
+ test_topic_conf_set(tconf, "auto.offset.reset", "earliest");
+ test_str_id_generate(group_id, sizeof(group_id));
+
+ TEST_SAY(_C_MAG "[ do_empty_commit group.id %s ]\n", group_id);
+
+ rk = test_create_consumer(group_id, NULL, conf, tconf);
+
+ test_consumer_subscribe(rk, topic);
+
+ test_consumer_poll("consume", rk, testid, -1, -1, 100, NULL);
+
+ TEST_SAY("First commit\n");
+ expect = RD_KAFKA_RESP_ERR_NO_ERROR;
+ err = rd_kafka_commit_queue(rk, NULL, NULL, empty_offset_commit_cb,
+ &expect);
+ if (err != expect)
+ TEST_FAIL("commit failed: %s", rd_kafka_err2str(err));
+ else
+ TEST_SAY("First commit returned %s\n", rd_kafka_err2str(err));
+
+ TEST_SAY("Second commit, should be empty\n");
+ expect = RD_KAFKA_RESP_ERR__NO_OFFSET;
+ err = rd_kafka_commit_queue(rk, NULL, NULL, empty_offset_commit_cb,
+ &expect);
+ if (err != RD_KAFKA_RESP_ERR__NO_OFFSET)
+ TEST_FAIL("unexpected commit result, wanted NO_OFFSET, got: %s",
+ rd_kafka_err2str(err));
+ else
+ TEST_SAY("Second commit returned %s\n", rd_kafka_err2str(err));
+
+ test_consumer_close(rk);
+
+ rd_kafka_destroy(rk);
+
+ SUB_TEST_PASS();
+}
+
+
+/**
+ * Commit non-existent topic (issue #704)
+ */
+static void nonexist_offset_commit_cb(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *offsets,
+ void *opaque) {
+ int i;
+ int failed_offsets = 0;
+
+ TEST_SAY("Offset commit callback for %d partitions: %s\n",
+ offsets ? offsets->cnt : 0, rd_kafka_err2str(err));
+
+ TEST_ASSERT(offsets != NULL);
+
+ for (i = 0; i < offsets->cnt; i++) {
+ TEST_SAY("committed: %s [%" PRId32 "] offset %" PRId64 ": %s\n",
+ offsets->elems[i].topic, offsets->elems[i].partition,
+ offsets->elems[i].offset,
+ rd_kafka_err2str(offsets->elems[i].err));
+ failed_offsets += offsets->elems[i].err ? 1 : 0;
+ }
+
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
+ "expected unknown Topic or partition, not %s",
+ rd_kafka_err2str(err));
+ TEST_ASSERT(offsets->cnt == 2, "expected %d offsets", offsets->cnt);
+ TEST_ASSERT(failed_offsets == offsets->cnt,
+ "expected %d offsets to have failed, got %d", offsets->cnt,
+ failed_offsets);
+}
+
+static void do_nonexist_commit(void) {
+ rd_kafka_t *rk;
+ char group_id[64];
+ rd_kafka_conf_t *conf;
+ rd_kafka_topic_conf_t *tconf;
+ rd_kafka_topic_partition_list_t *offsets;
+ const char *unk_topic = test_mk_topic_name(__FUNCTION__, 1);
+ rd_kafka_resp_err_t err;
+
+ SUB_TEST_QUICK();
+
+ test_conf_init(&conf, &tconf, 20);
+ /* Offset commit deferrals when the broker is down is limited to
+ * session.timeout.ms. With 0.9 brokers and api.version.request=true
+ * the initial connect to all brokers will take 10*2 seconds
+ * and the commit_queue() below will time out too quickly.
+ * Set the session timeout high here to avoid it. */
+ test_conf_set(conf, "session.timeout.ms", "60000");
+
+ test_str_id_generate(group_id, sizeof(group_id));
+ test_conf_set(conf, "group.id", group_id);
+
+ rd_kafka_conf_set_default_topic_conf(conf, tconf);
+
+ TEST_SAY(_C_MAG "[ do_nonexist_commit group.id %s ]\n", group_id);
+
+ rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
+ rd_kafka_poll_set_consumer(rk);
+
+ TEST_SAY("Try nonexist commit\n");
+ offsets = rd_kafka_topic_partition_list_new(2);
+ rd_kafka_topic_partition_list_add(offsets, unk_topic, 0)->offset = 123;
+ rd_kafka_topic_partition_list_add(offsets, unk_topic, 1)->offset = 456;
+
+ err = rd_kafka_commit_queue(rk, offsets, NULL,
+ nonexist_offset_commit_cb, NULL);
+ TEST_SAY("nonexist commit returned %s\n", rd_kafka_err2str(err));
+ if (err != RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
+ TEST_FAIL("commit() should give UnknownTopicOrPart, not: %s",
+ rd_kafka_err2str(err));
+
+ rd_kafka_topic_partition_list_destroy(offsets);
+
+ test_consumer_close(rk);
+
+ rd_kafka_destroy(rk);
+
+ SUB_TEST_PASS();
+}
+
+
+int main_0030_offset_commit(int argc, char **argv) {
+
+ topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
+ testid = test_produce_msgs_easy(topic, 0, partition, msgcnt);
+
+ do_empty_commit();
+
+ do_nonexist_commit();
+
+ do_offset_test("AUTO.COMMIT & AUTO.STORE", 1 /* enable.auto.commit */,
+ 1 /* enable.auto.offset.store */, 0 /* not used. */,
+ 1 /* use subscribe */);
+
+ do_offset_test("MANUAL.COMMIT.ASYNC & AUTO.STORE",
+ 0 /* enable.auto.commit */,
+ 1 /* enable.auto.offset.store */, 1 /* async */,
+ 1 /* use subscribe */);
+
+ do_offset_test("AUTO.COMMIT.ASYNC & AUTO.STORE & ASSIGN",
+ 1 /* enable.auto.commit */,
+ 1 /* enable.auto.offset.store */, 0 /* not used. */,
+ 0 /* use assign */);
+
+ if (test_quick) {
+ rd_free(topic);
+ return 0;
+ }
+
+ do_offset_test("AUTO.COMMIT & MANUAL.STORE", 1 /* enable.auto.commit */,
+ 0 /* enable.auto.offset.store */, 0 /* not used */,
+ 1 /* use subscribe */);
+
+ do_offset_test("MANUAL.COMMIT.SYNC & AUTO.STORE",
+ 0 /* enable.auto.commit */,
+ 1 /* enable.auto.offset.store */, 0 /* async */,
+ 1 /* use subscribe */);
+
+ do_offset_test("MANUAL.COMMIT.ASYNC & MANUAL.STORE",
+ 0 /* enable.auto.commit */,
+ 0 /* enable.auto.offset.store */, 1 /* sync */,
+ 1 /* use subscribe */);
+
+ do_offset_test("MANUAL.COMMIT.SYNC & MANUAL.STORE",
+ 0 /* enable.auto.commit */,
+ 0 /* enable.auto.offset.store */, 0 /* sync */,
+ 1 /* use subscribe */);
+
+ rd_free(topic);
+
+ return 0;
+}