summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/0098-consumer-txn.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0098-consumer-txn.cpp')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/0098-consumer-txn.cpp1218
1 files changed, 0 insertions, 1218 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0098-consumer-txn.cpp b/fluent-bit/lib/librdkafka-2.1.0/tests/0098-consumer-txn.cpp
deleted file mode 100644
index 1bdb46d0b..000000000
--- a/fluent-bit/lib/librdkafka-2.1.0/tests/0098-consumer-txn.cpp
+++ /dev/null
@@ -1,1218 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2016, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "testcpp.h"
-
-#if WITH_RAPIDJSON
-
-#include <iostream>
-#include <cstring>
-#include <cstdlib>
-#include <assert.h>
-#include <sstream>
-#include <string>
-#include <map>
-
-#include <rapidjson/document.h>
-#include <rapidjson/schema.h>
-#include <rapidjson/filereadstream.h>
-#include <rapidjson/stringbuffer.h>
-#include <rapidjson/error/en.h>
-#include <rapidjson/prettywriter.h>
-
-
-/**
- * @name Consumer Transactions.
- *
- * - Uses the TransactionProducerCli Java application to produce messages
- * that are part of abort and commit transactions in various combinations
- * and tests that librdkafka consumes them as expected. Refer to
- * TransactionProducerCli.java for scenarios covered.
- */
-
-
-class TestEventCb : public RdKafka::EventCb {
- public:
- static bool should_capture_stats;
- static bool has_captured_stats;
- static int64_t partition_0_hi_offset;
- static int64_t partition_0_ls_offset;
- static std::string topic;
-
- void event_cb(RdKafka::Event &event) {
- switch (event.type()) {
- case RdKafka::Event::EVENT_STATS:
- if (should_capture_stats) {
- partition_0_hi_offset = -1;
- partition_0_ls_offset = -1;
-
- has_captured_stats = true;
- should_capture_stats = false;
- char path[256];
-
- /* Parse JSON to validate */
- rapidjson::Document d;
- if (d.Parse(event.str().c_str()).HasParseError())
- Test::Fail(tostr() << "Failed to parse stats JSON: "
- << rapidjson::GetParseError_En(d.GetParseError())
- << " at " << d.GetErrorOffset());
-
- rd_snprintf(path, sizeof(path), "/topics/%s/partitions/0",
- topic.c_str());
-
- rapidjson::Pointer jpath((const char *)path);
- rapidjson::Value *pp = rapidjson::GetValueByPointer(d, jpath);
- if (pp == NULL)
- return;
-
- TEST_ASSERT(pp->HasMember("hi_offset"), "hi_offset not found in stats");
- TEST_ASSERT(pp->HasMember("ls_offset"), "ls_offset not found in stats");
-
- partition_0_hi_offset = (*pp)["hi_offset"].GetInt();
- partition_0_ls_offset = (*pp)["ls_offset"].GetInt();
- }
- break;
-
- case RdKafka::Event::EVENT_LOG:
- std::cerr << event.str() << "\n";
- break;
-
- default:
- break;
- }
- }
-};
-
-bool TestEventCb::should_capture_stats;
-bool TestEventCb::has_captured_stats;
-int64_t TestEventCb::partition_0_hi_offset;
-int64_t TestEventCb::partition_0_ls_offset;
-std::string TestEventCb::topic;
-
-static TestEventCb ex_event_cb;
-
-
-static void execute_java_produce_cli(std::string &bootstrapServers,
- const std::string &topic,
- const std::string &testidstr,
- const char **cmds,
- size_t cmd_cnt) {
- const std::string topicCmd = "topic," + topic;
- const std::string testidCmd = "testid," + testidstr;
- const char **argv;
- size_t i = 0;
-
- argv = (const char **)rd_alloca(sizeof(*argv) * (1 + 1 + 1 + cmd_cnt + 1));
- argv[i++] = bootstrapServers.c_str();
- argv[i++] = topicCmd.c_str();
- argv[i++] = testidCmd.c_str();
-
- for (size_t j = 0; j < cmd_cnt; j++)
- argv[i++] = cmds[j];
-
- argv[i] = NULL;
-
- int pid = test_run_java("TransactionProducerCli", (const char **)argv);
- test_waitpid(pid);
-}
-
-static std::vector<RdKafka::Message *>
-consume_messages(RdKafka::KafkaConsumer *c, std::string topic, int partition) {
- RdKafka::ErrorCode err;
-
- /* Assign partitions */
- std::vector<RdKafka::TopicPartition *> parts;
- parts.push_back(RdKafka::TopicPartition::create(topic, partition));
- if ((err = c->assign(parts)))
- Test::Fail("assign failed: " + RdKafka::err2str(err));
- RdKafka::TopicPartition::destroy(parts);
-
- Test::Say(tostr() << "Consuming from topic " << topic << " partition "
- << partition << "\n");
- std::vector<RdKafka::Message *> result = std::vector<RdKafka::Message *>();
-
- while (true) {
- RdKafka::Message *msg = c->consume(tmout_multip(1000));
- switch (msg->err()) {
- case RdKafka::ERR__TIMED_OUT:
- delete msg;
- continue;
- case RdKafka::ERR__PARTITION_EOF:
- delete msg;
- break;
- case RdKafka::ERR_NO_ERROR:
- result.push_back(msg);
- continue;
- default:
- Test::Fail("Error consuming from topic " + topic + ": " + msg->errstr());
- delete msg;
- break;
- }
- break;
- }
-
- Test::Say("Read all messages from topic: " + topic + "\n");
-
- TestEventCb::should_capture_stats = true;
-
- /* rely on the test timeout to prevent an infinite loop in
- * the (unlikely) event that the statistics callback isn't
- * called. */
- while (!TestEventCb::has_captured_stats) {
- RdKafka::Message *msg = c->consume(tmout_multip(500));
- delete msg;
- }
-
- Test::Say("Captured consumer statistics event\n");
-
- return result;
-}
-
-
-static void delete_messages(std::vector<RdKafka::Message *> &messages) {
- for (size_t i = 0; i < messages.size(); ++i)
- delete messages[i];
-}
-
-
-static std::string get_bootstrap_servers() {
- RdKafka::Conf *conf;
- std::string bootstrap_servers;
- Test::conf_init(&conf, NULL, 40);
- conf->get("bootstrap.servers", bootstrap_servers);
- delete conf;
- return bootstrap_servers;
-}
-
-
-static RdKafka::KafkaConsumer *create_consumer(std::string &topic_name,
- const char *isolation_level) {
- RdKafka::Conf *conf;
- std::string errstr;
-
- Test::conf_init(&conf, NULL, 40);
- Test::conf_set(conf, "group.id", topic_name);
- Test::conf_set(conf, "enable.auto.commit", "false");
- Test::conf_set(conf, "auto.offset.reset", "earliest");
- Test::conf_set(conf, "enable.partition.eof", "true");
- Test::conf_set(conf, "isolation.level", isolation_level);
- Test::conf_set(conf, "statistics.interval.ms", "1000");
- conf->set("event_cb", &ex_event_cb, errstr);
- TestEventCb::should_capture_stats = false;
- TestEventCb::has_captured_stats = false;
-
- RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
- if (!c)
- Test::Fail("Failed to create KafkaConsumer: " + errstr);
-
- delete conf;
-
- return c;
-}
-
-
-static std::vector<std::string> csv_split(const std::string &input) {
- std::stringstream ss(input);
- std::vector<std::string> res;
-
- while (ss.good()) {
- std::string substr;
- std::getline(ss, substr, ',');
- /* Trim */
- substr.erase(0, substr.find_first_not_of(' '));
- substr.erase(substr.find_last_not_of(' ') + 1);
- res.push_back(substr);
- }
-
- return res;
-}
-
-
-
-enum TransactionType {
- TransactionType_None,
- TransactionType_BeginAbort,
- TransactionType_BeginCommit,
- TransactionType_BeginOpen,
- TransactionType_ContinueAbort,
- TransactionType_ContinueCommit,
- TransactionType_ContinueOpen
-};
-
-static TransactionType TransactionType_from_string(std::string str) {
-#define _CHKRET(NAME) \
- if (!str.compare(#NAME)) \
- return TransactionType_##NAME
-
- _CHKRET(None);
- _CHKRET(BeginAbort);
- _CHKRET(BeginCommit);
- _CHKRET(BeginOpen);
- _CHKRET(ContinueAbort);
- _CHKRET(ContinueCommit);
- _CHKRET(ContinueOpen);
-
- Test::Fail("Unknown TransactionType: " + str);
-
- return TransactionType_None; /* NOTREACHED */
-}
-
-
-static void txn_producer_makeTestMessages(RdKafka::Producer *producer,
- const std::string &topic,
- const std::string &testidstr,
- int partition,
- int idStart,
- int msgcount,
- TransactionType tt,
- bool do_flush) {
- RdKafka::Error *error;
-
- if (tt != TransactionType_None && tt != TransactionType_ContinueOpen &&
- tt != TransactionType_ContinueCommit &&
- tt != TransactionType_ContinueAbort) {
- error = producer->begin_transaction();
- if (error) {
- Test::Fail("begin_transaction() failed: " + error->str());
- delete error;
- }
- }
-
- for (int i = 0; i < msgcount; i++) {
- char key[] = {(char)((i + idStart) & 0xff)};
- char payload[] = {0x10, 0x20, 0x30, 0x40};
- RdKafka::ErrorCode err;
-
- err = producer->produce(topic, partition, producer->RK_MSG_COPY, payload,
- sizeof(payload), key, sizeof(key), 0, NULL);
- if (err)
- Test::Fail("produce() failed: " + RdKafka::err2str(err));
- }
-
- if (do_flush)
- producer->flush(-1);
-
- switch (tt) {
- case TransactionType_BeginAbort:
- case TransactionType_ContinueAbort:
- error = producer->abort_transaction(30 * 1000);
- if (error) {
- Test::Fail("abort_transaction() failed: " + error->str());
- delete error;
- }
- break;
-
- case TransactionType_BeginCommit:
- case TransactionType_ContinueCommit:
- error = producer->commit_transaction(30 * 1000);
- if (error) {
- Test::Fail("commit_transaction() failed: " + error->str());
- delete error;
- }
- break;
-
- default:
- break;
- }
-}
-
-
-class txnDeliveryReportCb : public RdKafka::DeliveryReportCb {
- public:
- void dr_cb(RdKafka::Message &msg) {
- switch (msg.err()) {
- case RdKafka::ERR__PURGE_QUEUE:
- case RdKafka::ERR__PURGE_INFLIGHT:
- /* These are expected when transactions are aborted */
- break;
-
- case RdKafka::ERR_NO_ERROR:
- break;
-
- default:
- Test::Fail("Delivery failed: " + msg.errstr());
- break;
- }
- }
-};
-
-
-/**
- * @brief Transactional producer, performing the commands in \p cmds.
- * This is the librdkafka counterpart of
- * java/TransactionProducerCli.java
- */
-static void txn_producer(const std::string &brokers,
- const std::string &topic,
- const std::string &testidstr,
- const char **cmds,
- size_t cmd_cnt) {
- RdKafka::Conf *conf;
- txnDeliveryReportCb txn_dr;
-
- Test::conf_init(&conf, NULL, 0);
- Test::conf_set(conf, "bootstrap.servers", brokers);
-
-
- std::map<std::string, RdKafka::Producer *> producers;
-
- for (size_t i = 0; i < cmd_cnt; i++) {
- std::string cmdstr = std::string(cmds[i]);
-
- Test::Say(_C_CLR "rdkafka txn producer command: " + cmdstr + "\n");
-
- std::vector<std::string> cmd = csv_split(cmdstr);
-
- if (!cmd[0].compare("sleep")) {
- rd_usleep(atoi(cmd[1].c_str()) * 1000, NULL);
-
- } else if (!cmd[0].compare("exit")) {
- break; /* We can't really simulate the Java exit behaviour
- * from in-process. */
-
- } else if (cmd[0].find("producer") == 0) {
- TransactionType txntype = TransactionType_from_string(cmd[4]);
-
- std::map<std::string, RdKafka::Producer *>::iterator it =
- producers.find(cmd[0]);
-
- RdKafka::Producer *producer;
-
- if (it == producers.end()) {
- /* Create producer if it doesn't exist */
- std::string errstr;
-
- Test::Say(tostr() << "Creating producer " << cmd[0]
- << " with transactiontype " << txntype << " '"
- << cmd[4] << "'\n");
-
- /* Config */
- Test::conf_set(conf, "enable.idempotence", "true");
- if (txntype != TransactionType_None)
- Test::conf_set(conf, "transactional.id",
- "test-transactional-id-c-" + testidstr + "-" + cmd[0]);
- else
- Test::conf_set(conf, "transactional.id", "");
- Test::conf_set(conf, "linger.ms", "5"); /* ensure batching */
- conf->set("dr_cb", &txn_dr, errstr);
-
- /* Create producer */
- producer = RdKafka::Producer::create(conf, errstr);
- if (!producer)
- Test::Fail("Failed to create producer " + cmd[0] + ": " + errstr);
-
- /* Init transactions if producer is transactional */
- if (txntype != TransactionType_None) {
- RdKafka::Error *error = producer->init_transactions(20 * 1000);
- if (error) {
- Test::Fail("init_transactions() failed: " + error->str());
- delete error;
- }
- }
-
-
- producers[cmd[0]] = producer;
- } else {
- producer = it->second;
- }
-
- txn_producer_makeTestMessages(
- producer, /* producer */
- topic, /* topic */
- testidstr, /* testid */
- atoi(cmd[1].c_str()), /* partition */
- (int)strtol(cmd[2].c_str(), NULL, 0), /* idStart */
- atoi(cmd[3].c_str()), /* msg count */
- txntype, /* TransactionType */
- !cmd[5].compare("DoFlush") /* Flush */);
-
- } else {
- Test::Fail("Unknown command: " + cmd[0]);
- }
- }
-
- delete conf;
-
- for (std::map<std::string, RdKafka::Producer *>::iterator it =
- producers.begin();
- it != producers.end(); it++)
- delete it->second;
-}
-
-
-
-static void do_test_consumer_txn_test(bool use_java_producer) {
- std::string errstr;
- std::string topic_name;
- RdKafka::KafkaConsumer *c;
- std::vector<RdKafka::Message *> msgs;
- std::string testidstr = test_str_id_generate_tmp();
-
- std::string bootstrap_servers = get_bootstrap_servers();
-
- Test::Say(tostr() << _C_BLU "[ Consumer transaction tests using "
- << (use_java_producer ? "java" : "librdkafka")
- << " producer with testid " << testidstr << "]\n" _C_CLR);
-
-#define run_producer(CMDS...) \
- do { \
- const char *_cmds[] = {CMDS}; \
- size_t _cmd_cnt = sizeof(_cmds) / sizeof(*_cmds); \
- if (use_java_producer) \
- execute_java_produce_cli(bootstrap_servers, topic_name, testidstr, \
- _cmds, _cmd_cnt); \
- else \
- txn_producer(bootstrap_servers, topic_name, testidstr, _cmds, _cmd_cnt); \
- } while (0)
-
- if (test_quick) {
- Test::Say("Skipping consumer_txn tests 0->4 due to quick mode\n");
- goto test5;
- }
-
-
- Test::Say(_C_BLU "Test 0 - basic commit + abort\n" _C_CLR);
-
- topic_name = Test::mk_topic_name("0098-consumer_txn-0", 1);
- c = create_consumer(topic_name, "READ_COMMITTED");
- Test::create_topic(c, topic_name.c_str(), 1, 3);
-
- run_producer("producer1, -1, 0x0, 5, BeginCommit, DoFlush",
- "producer1, -1, 0x10, 5, BeginAbort, DoFlush");
-
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 5,
- "Consumed unexpected number of messages. "
- "Expected 5, got: %d",
- (int)msgs.size());
- TEST_ASSERT(msgs[0]->key_len() >= 1 && 0 == msgs[0]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[4]->key_len() >= 1 && 4 == msgs[4]->key()->c_str()[0],
- "Unexpected key");
- delete_messages(msgs);
- c->close();
- delete c;
-
-#define expect_msgcnt(msgcnt) \
- TEST_ASSERT(msgs.size() == msgcnt, "Expected %d messages, got %d", \
- (int)msgs.size(), msgcnt)
-
-#define expect_key(msgidx, value) \
- do { \
- TEST_ASSERT(msgs.size() > msgidx, \
- "Expected at least %d message(s), only got %d", msgidx + 1, \
- (int)msgs.size()); \
- TEST_ASSERT(msgs[msgidx]->key_len() == 1, \
- "Expected msg #%d key to be of size 1, not %d\n", msgidx, \
- (int)msgs[msgidx]->key_len()); \
- TEST_ASSERT(value == (int)msgs[msgidx]->key()->c_str()[0], \
- "Expected msg #%d key 0x%x, not 0x%x", msgidx, value, \
- (int)msgs[msgidx]->key()->c_str()[0]); \
- } while (0)
-
- c = create_consumer(topic_name, "READ_UNCOMMITTED");
- msgs = consume_messages(c, topic_name, 0);
- expect_msgcnt(10);
- expect_key(0, 0x0);
- expect_key(4, 0x4);
- expect_key(5, 0x10);
- expect_key(9, 0x14);
- delete_messages(msgs);
-
- Test::delete_topic(c, topic_name.c_str());
-
- c->close();
- delete c;
-
-
- Test::Say(_C_BLU "Test 0.1\n" _C_CLR);
-
- topic_name = Test::mk_topic_name("0098-consumer_txn-0.1", 1);
- c = create_consumer(topic_name, "READ_COMMITTED");
- Test::create_topic(c, topic_name.c_str(), 1, 3);
-
- run_producer("producer1, -1, 0x0, 5, BeginCommit, DontFlush",
- "producer1, -1, 0x10, 5, BeginAbort, DoFlush");
-
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 5,
- "Consumed unexpected number of messages. "
- "Expected 5, got: %d",
- (int)msgs.size());
- TEST_ASSERT(msgs[0]->key_len() >= 1 && 0 == msgs[0]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[4]->key_len() >= 1 && 4 == msgs[4]->key()->c_str()[0],
- "Unexpected key");
- delete_messages(msgs);
- c->close();
- delete c;
-
- c = create_consumer(topic_name, "READ_UNCOMMITTED");
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 10,
- "Consumed unexpected number of messages. "
- "Expected 10, got: %d",
- (int)msgs.size());
- TEST_ASSERT(msgs[0]->key_len() >= 1 && 0 == msgs[0]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[4]->key_len() >= 1 && 4 == msgs[4]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x10 == msgs[5]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x14 == msgs[9]->key()->c_str()[0],
- "Unexpected key");
- delete_messages(msgs);
-
- Test::delete_topic(c, topic_name.c_str());
-
- c->close();
- delete c;
-
-
- Test::Say(_C_BLU "Test 0.2\n" _C_CLR);
-
- topic_name = Test::mk_topic_name("0098-consumer_txn-0.2", 1);
- c = create_consumer(topic_name, "READ_COMMITTED");
- Test::create_topic(c, topic_name.c_str(), 1, 3);
-
- run_producer("producer1, -1, 0x10, 5, BeginAbort, DoFlush",
- "producer1, -1, 0x30, 5, BeginCommit, DoFlush");
-
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 5,
- "Consumed unexpected number of messages. "
- "Expected 5, got: %d",
- (int)msgs.size());
- TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x30 == msgs[0]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x34 == msgs[4]->key()->c_str()[0],
- "Unexpected key");
- delete_messages(msgs);
- c->close();
- delete c;
-
- c = create_consumer(topic_name, "READ_UNCOMMITTED");
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 10,
- "Consumed unexpected number of messages. "
- "Expected 10, got: %d",
- (int)msgs.size());
- TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x10 == msgs[0]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x14 == msgs[4]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x30 == msgs[5]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x34 == msgs[9]->key()->c_str()[0],
- "Unexpected key");
- delete_messages(msgs);
-
- Test::delete_topic(c, topic_name.c_str());
-
- c->close();
- delete c;
-
-
- Test::Say(_C_BLU "Test 1 - mixed with non-transactional.\n" _C_CLR);
-
- topic_name = Test::mk_topic_name("0098-consumer_txn-1", 1);
- c = create_consumer(topic_name, "READ_COMMITTED");
- Test::create_topic(c, topic_name.c_str(), 1, 3);
- TestEventCb::topic = topic_name;
-
- run_producer("producer3, -1, 0x10, 5, None, DoFlush",
- "producer1, -1, 0x50, 5, BeginCommit, DoFlush",
- "producer1, -1, 0x80, 5, BeginAbort, DoFlush");
-
- msgs = consume_messages(c, topic_name, 0);
-
- TEST_ASSERT(TestEventCb::partition_0_ls_offset != -1 &&
- TestEventCb::partition_0_ls_offset ==
- TestEventCb::partition_0_hi_offset,
- "Expected hi_offset to equal ls_offset but "
- "got hi_offset: %" PRId64 ", ls_offset: %" PRId64,
- TestEventCb::partition_0_hi_offset,
- TestEventCb::partition_0_ls_offset);
-
- TEST_ASSERT(msgs.size() == 10,
- "Consumed unexpected number of messages. "
- "Expected 10, got: %d",
- (int)msgs.size());
- TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x10 == msgs[0]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x14 == msgs[4]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x50 == msgs[5]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x54 == msgs[9]->key()->c_str()[0],
- "Unexpected key");
- delete_messages(msgs);
-
- Test::delete_topic(c, topic_name.c_str());
-
- c->close();
- delete c;
-
- Test::Say(_C_BLU "Test 1.1\n" _C_CLR);
-
- topic_name = Test::mk_topic_name("0098-consumer_txn-1.1", 1);
- c = create_consumer(topic_name, "READ_COMMITTED");
- Test::create_topic(c, topic_name.c_str(), 1, 3);
-
- run_producer("producer1, -1, 0x30, 5, BeginAbort, DoFlush",
- "producer3, -1, 0x40, 5, None, DoFlush",
- "producer1, -1, 0x60, 5, BeginCommit, DoFlush");
-
-
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 10,
- "Consumed unexpected number of messages. "
- "Expected 10, got: %d",
- (int)msgs.size());
- TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x40 == msgs[0]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x44 == msgs[4]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x60 == msgs[5]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x64 == msgs[9]->key()->c_str()[0],
- "Unexpected key");
- delete_messages(msgs);
-
- Test::delete_topic(c, topic_name.c_str());
-
- c->close();
- delete c;
-
-
- Test::Say(_C_BLU "Test 1.2\n" _C_CLR);
-
- topic_name = Test::mk_topic_name("0098-consumer_txn-1.2", 1);
- c = create_consumer(topic_name, "READ_COMMITTED");
- Test::create_topic(c, topic_name.c_str(), 1, 3);
-
- run_producer("producer1, -1, 0x10, 5, BeginCommit, DoFlush",
- "producer1, -1, 0x20, 5, BeginAbort, DoFlush",
- "producer3, -1, 0x30, 5, None, DoFlush");
-
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 10,
- "Consumed unexpected number of messages. "
- "Expected 10, got: %d",
- (int)msgs.size());
- TEST_ASSERT(msgs[0]->key_len() >= 1 && 0x10 == msgs[0]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[4]->key_len() >= 1 && 0x14 == msgs[4]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[5]->key_len() >= 1 && 0x30 == msgs[5]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[9]->key_len() >= 1 && 0x34 == msgs[9]->key()->c_str()[0],
- "Unexpected key");
- delete_messages(msgs);
-
- Test::delete_topic(c, topic_name.c_str());
-
- c->close();
- delete c;
-
-
- Test::Say(_C_BLU "Test 2 - rapid abort / committing.\n" _C_CLR);
- // note: aborted records never seem to make it to the broker when not flushed.
-
- topic_name = Test::mk_topic_name("0098-consumer_txn-2", 1);
- c = create_consumer(topic_name, "READ_COMMITTED");
- Test::create_topic(c, topic_name.c_str(), 1, 3);
-
- run_producer("producer1, -1, 0x10, 1, BeginAbort, DontFlush",
- "producer1, -1, 0x20, 1, BeginCommit, DontFlush",
- "producer1, -1, 0x30, 1, BeginAbort, DontFlush",
- "producer1, -1, 0x40, 1, BeginCommit, DontFlush",
- "producer1, -1, 0x50, 1, BeginAbort, DontFlush",
- "producer1, -1, 0x60, 1, BeginCommit, DontFlush",
- "producer1, -1, 0x70, 1, BeginAbort, DontFlush",
- "producer1, -1, 0x80, 1, BeginCommit, DontFlush",
- "producer1, -1, 0x90, 1, BeginAbort, DontFlush",
- "producer1, -1, 0xa0, 1, BeginCommit, DoFlush",
- "producer3, -1, 0xb0, 1, None, DontFlush",
- "producer3, -1, 0xc0, 1, None, DoFlush");
-
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 7,
- "Consumed unexpected number of messages. "
- "Expected 7, got: %d",
- (int)msgs.size());
- TEST_ASSERT(msgs[0]->key_len() >= 1 &&
- 0x20 == (unsigned char)msgs[0]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[1]->key_len() >= 1 &&
- 0x40 == (unsigned char)msgs[1]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[2]->key_len() >= 1 &&
- 0x60 == (unsigned char)msgs[2]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[3]->key_len() >= 1 &&
- 0x80 == (unsigned char)msgs[3]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[4]->key_len() >= 1 &&
- 0xa0 == (unsigned char)msgs[4]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[5]->key_len() >= 1 &&
- 0xb0 == (unsigned char)msgs[5]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[6]->key_len() >= 1 &&
- 0xc0 == (unsigned char)msgs[6]->key()->c_str()[0],
- "Unexpected key");
- delete_messages(msgs);
-
- Test::delete_topic(c, topic_name.c_str());
-
- c->close();
- delete c;
-
-
- Test::Say(_C_BLU "Test 2.1\n" _C_CLR);
-
- topic_name = Test::mk_topic_name("0098-consumer_txn-2.1", 1);
- c = create_consumer(topic_name, "READ_COMMITTED");
- Test::create_topic(c, topic_name.c_str(), 1, 3);
-
- run_producer("producer1, -1, 0x10, 1, BeginAbort, DoFlush",
- "producer1, -1, 0x20, 1, BeginCommit, DoFlush",
- "producer1, -1, 0x30, 1, BeginAbort, DoFlush",
- "producer1, -1, 0x40, 1, BeginCommit, DoFlush",
- "producer1, -1, 0x50, 1, BeginAbort, DoFlush",
- "producer1, -1, 0x60, 1, BeginCommit, DoFlush",
- "producer1, -1, 0x70, 1, BeginAbort, DoFlush",
- "producer1, -1, 0x80, 1, BeginCommit, DoFlush",
- "producer1, -1, 0x90, 1, BeginAbort, DoFlush",
- "producer1, -1, 0xa0, 1, BeginCommit, DoFlush",
- "producer3, -1, 0xb0, 1, None, DoFlush",
- "producer3, -1, 0xc0, 1, None, DoFlush");
-
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 7,
- "Consumed unexpected number of messages. "
- "Expected 7, got: %d",
- (int)msgs.size());
- TEST_ASSERT(msgs[0]->key_len() >= 1 &&
- 0x20 == (unsigned char)msgs[0]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[1]->key_len() >= 1 &&
- 0x40 == (unsigned char)msgs[1]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[2]->key_len() >= 1 &&
- 0x60 == (unsigned char)msgs[2]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[3]->key_len() >= 1 &&
- 0x80 == (unsigned char)msgs[3]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[4]->key_len() >= 1 &&
- 0xa0 == (unsigned char)msgs[4]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[5]->key_len() >= 1 &&
- 0xb0 == (unsigned char)msgs[5]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[6]->key_len() >= 1 &&
- 0xc0 == (unsigned char)msgs[6]->key()->c_str()[0],
- "Unexpected key");
- delete_messages(msgs);
- c->close();
- delete c;
-
- c = create_consumer(topic_name, "READ_UNCOMMITTED");
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 12,
- "Consumed unexpected number of messages. "
- "Expected 12, got: %d",
- (int)msgs.size());
- TEST_ASSERT(msgs[0]->key_len() >= 1 &&
- 0x10 == (unsigned char)msgs[0]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[1]->key_len() >= 1 &&
- 0x20 == (unsigned char)msgs[1]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[2]->key_len() >= 1 &&
- 0x30 == (unsigned char)msgs[2]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[3]->key_len() >= 1 &&
- 0x40 == (unsigned char)msgs[3]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[4]->key_len() >= 1 &&
- 0x50 == (unsigned char)msgs[4]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[5]->key_len() >= 1 &&
- 0x60 == (unsigned char)msgs[5]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[6]->key_len() >= 1 &&
- 0x70 == (unsigned char)msgs[6]->key()->c_str()[0],
- "Unexpected key");
- delete_messages(msgs);
-
- Test::delete_topic(c, topic_name.c_str());
-
- c->close();
- delete c;
-
-
- Test::Say(_C_BLU "Test 3 - cross partition (simple).\n" _C_CLR);
-
- topic_name = Test::mk_topic_name("0098-consumer_txn-3", 1);
- c = create_consumer(topic_name, "READ_COMMITTED");
- Test::create_topic(c, topic_name.c_str(), 2, 3);
-
- run_producer("producer1, 0, 0x10, 3, BeginOpen, DoFlush",
- "producer1, 1, 0x20, 3, ContinueOpen, DoFlush",
- "producer1, 0, 0x30, 3, ContinueCommit, DoFlush");
-
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 6,
- "Consumed unexpected number of messages. "
- "Expected 6, got: %d",
- (int)msgs.size());
- delete_messages(msgs);
- msgs = consume_messages(c, topic_name, 1);
- TEST_ASSERT(msgs.size() == 3,
- "Consumed unexpected number of messages. "
- "Expected 3, got: %d",
- (int)msgs.size());
- delete_messages(msgs);
- c->close();
- delete c;
-
- c = create_consumer(topic_name, "READ_UNCOMMITTED");
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 6,
- "Consumed unexpected number of messages. "
- "Expected 6, got: %d",
- (int)msgs.size());
- delete_messages(msgs);
- msgs = consume_messages(c, topic_name, 1);
- TEST_ASSERT(msgs.size() == 3,
- "Consumed unexpected number of messages. "
- "Expected 3, got: %d",
- (int)msgs.size());
- delete_messages(msgs);
-
- Test::delete_topic(c, topic_name.c_str());
-
- c->close();
- delete c;
-
-
- Test::Say(_C_BLU "Test 3.1\n" _C_CLR);
-
- topic_name = Test::mk_topic_name("0098-consumer_txn-3.1", 1);
- c = create_consumer(topic_name, "READ_COMMITTED");
- Test::create_topic(c, topic_name.c_str(), 2, 3);
-
- run_producer("producer1, 0, 0x55, 1, BeginCommit, DoFlush",
- "producer1, 0, 0x10, 3, BeginOpen, DoFlush",
- "producer1, 1, 0x20, 3, ContinueOpen, DoFlush",
- "producer1, 0, 0x30, 3, ContinueAbort, DoFlush",
- "producer3, 0, 0x00, 1, None, DoFlush",
- "producer1, 1, 0x44, 1, BeginCommit, DoFlush");
-
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 2,
- "Consumed unexpected number of messages. "
- "Expected 2, got: %d",
- (int)msgs.size());
- TEST_ASSERT(msgs[0]->key_len() >= 1 &&
- 0x55 == (unsigned char)msgs[0]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[1]->key_len() >= 1 &&
- 0x00 == (unsigned char)msgs[1]->key()->c_str()[0],
- "Unexpected key");
- delete_messages(msgs);
- msgs = consume_messages(c, topic_name, 1);
- TEST_ASSERT(msgs.size() == 1,
- "Consumed unexpected number of messages. "
- "Expected 1, got: %d",
- (int)msgs.size());
- TEST_ASSERT(msgs[0]->key_len() >= 1 &&
- 0x44 == (unsigned char)msgs[0]->key()->c_str()[0],
- "Unexpected key");
- delete_messages(msgs);
-
- Test::delete_topic(c, topic_name.c_str());
-
- c->close();
- delete c;
-
-
- Test::Say(_C_BLU "Test 4 - simultaneous transactions (simple).\n" _C_CLR);
-
- topic_name = Test::mk_topic_name("0098-consumer_txn-4", 1);
- c = create_consumer(topic_name, "READ_COMMITTED");
- Test::create_topic(c, topic_name.c_str(), 1, 3);
-
- run_producer("producer3, 0, 0x10, 1, None, DoFlush",
- "producer1, 0, 0x20, 3, BeginOpen, DoFlush",
- "producer2, 0, 0x30, 3, BeginOpen, DoFlush",
- "producer1, 0, 0x40, 3, ContinueCommit, DoFlush",
- "producer2, 0, 0x50, 3, ContinueAbort, DoFlush");
-
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 7,
- "Consumed unexpected number of messages. "
- "Expected 7, got: %d",
- (int)msgs.size());
- delete_messages(msgs);
- c->close();
- delete c;
-
- c = create_consumer(topic_name, "READ_UNCOMMITTED");
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 13,
- "Consumed unexpected number of messages. "
- "Expected 13, got: %d",
- (int)msgs.size());
- delete_messages(msgs);
-
- Test::delete_topic(c, topic_name.c_str());
-
- c->close();
- delete c;
-
-
- Test::Say(_C_BLU "Test 4.1\n" _C_CLR);
-
- topic_name = Test::mk_topic_name("0098-consumer_txn-4.1", 1);
- c = create_consumer(topic_name, "READ_COMMITTED");
- Test::create_topic(c, topic_name.c_str(), 1, 3);
-
- run_producer("producer3, 0, 0x10, 1, None, DoFlush",
- "producer1, 0, 0x20, 3, BeginOpen, DoFlush",
- "producer2, 0, 0x30, 3, BeginOpen, DoFlush",
- "producer1, 0, 0x40, 3, ContinueAbort, DoFlush",
- "producer2, 0, 0x50, 3, ContinueCommit, DoFlush");
-
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 7,
- "Consumed unexpected number of messages. "
- "Expected 7, got: %d",
- (int)msgs.size());
- delete_messages(msgs);
- c->close();
- delete c;
-
- c = create_consumer(topic_name, "READ_UNCOMMITTED");
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 13,
- "Consumed unexpected number of messages. "
- "Expected 13, got: %d",
- (int)msgs.size());
- delete_messages(msgs);
-
- Test::delete_topic(c, topic_name.c_str());
-
- c->close();
- delete c;
-
-
- Test::Say(_C_BLU "Test 4.2\n" _C_CLR);
-
- topic_name = Test::mk_topic_name("0098-consumer_txn-4.2", 1);
- c = create_consumer(topic_name, "READ_COMMITTED");
- Test::create_topic(c, topic_name.c_str(), 1, 3);
-
- run_producer("producer3, 0, 0x10, 1, None, DoFlush",
- "producer1, 0, 0x20, 3, BeginOpen, DoFlush",
- "producer2, 0, 0x30, 3, BeginOpen, DoFlush",
- "producer1, 0, 0x40, 3, ContinueCommit, DoFlush",
- "producer2, 0, 0x50, 3, ContinueCommit, DoFlush");
-
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 13,
- "Consumed unexpected number of messages. "
- "Expected 7, got: %d",
- (int)msgs.size());
- delete_messages(msgs);
- c->close();
- delete c;
-
- c = create_consumer(topic_name, "READ_UNCOMMITTED");
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 13,
- "Consumed unexpected number of messages. "
- "Expected 13, got: %d",
- (int)msgs.size());
- delete_messages(msgs);
-
- Test::delete_topic(c, topic_name.c_str());
-
- c->close();
- delete c;
-
-
- Test::Say(_C_BLU "Test 4.3\n" _C_CLR);
-
- topic_name = Test::mk_topic_name("0098-consumer_txn-4.3", 1);
- c = create_consumer(topic_name, "READ_COMMITTED");
- Test::create_topic(c, topic_name.c_str(), 1, 3);
-
- run_producer("producer3, 0, 0x10, 1, None, DoFlush",
- "producer1, 0, 0x20, 3, BeginOpen, DoFlush",
- "producer2, 0, 0x30, 3, BeginOpen, DoFlush",
- "producer1, 0, 0x40, 3, ContinueAbort, DoFlush",
- "producer2, 0, 0x50, 3, ContinueAbort, DoFlush");
-
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 1,
- "Consumed unexpected number of messages. "
- "Expected 7, got: %d",
- (int)msgs.size());
- delete_messages(msgs);
- c->close();
- delete c;
-
- c = create_consumer(topic_name, "READ_UNCOMMITTED");
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 13,
- "Consumed unexpected number of messages. "
- "Expected 13, got: %d",
- (int)msgs.size());
- delete_messages(msgs);
-
- Test::delete_topic(c, topic_name.c_str());
-
- c->close();
- delete c;
-
-
-
- Test::Say(_C_BLU "Test 5 - split transaction across message sets.\n" _C_CLR);
-
-test5:
- topic_name = Test::mk_topic_name("0098-consumer_txn-5", 1);
- c = create_consumer(topic_name, "READ_COMMITTED");
- Test::create_topic(c, topic_name.c_str(), 1, 3);
-
- run_producer("producer1, 0, 0x10, 2, BeginOpen, DontFlush", "sleep,200",
- "producer1, 0, 0x20, 2, ContinueAbort, DontFlush",
- "producer1, 0, 0x30, 2, BeginOpen, DontFlush", "sleep,200",
- "producer1, 0, 0x40, 2, ContinueCommit, DontFlush",
- "producer1, 0, 0x50, 2, BeginOpen, DontFlush", "sleep,200",
- "producer1, 0, 0x60, 2, ContinueAbort, DontFlush",
- "producer1, 0, 0xa0, 2, BeginOpen, DontFlush", "sleep,200",
- "producer1, 0, 0xb0, 2, ContinueCommit, DontFlush",
- "producer3, 0, 0x70, 1, None, DoFlush");
-
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 9,
- "Consumed unexpected number of messages. "
- "Expected 9, got: %d",
- (int)msgs.size());
- TEST_ASSERT(msgs[0]->key_len() >= 1 &&
- 0x30 == (unsigned char)msgs[0]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[1]->key_len() >= 1 &&
- 0x31 == (unsigned char)msgs[1]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[2]->key_len() >= 1 &&
- 0x40 == (unsigned char)msgs[2]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[3]->key_len() >= 1 &&
- 0x41 == (unsigned char)msgs[3]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[4]->key_len() >= 1 &&
- 0xa0 == (unsigned char)msgs[4]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[5]->key_len() >= 1 &&
- 0xa1 == (unsigned char)msgs[5]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[6]->key_len() >= 1 &&
- 0xb0 == (unsigned char)msgs[6]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[7]->key_len() >= 1 &&
- 0xb1 == (unsigned char)msgs[7]->key()->c_str()[0],
- "Unexpected key");
- TEST_ASSERT(msgs[8]->key_len() >= 1 &&
- 0x70 == (unsigned char)msgs[8]->key()->c_str()[0],
- "Unexpected key");
- delete_messages(msgs);
-
- Test::delete_topic(c, topic_name.c_str());
-
- c->close();
- delete c;
-
-
- Test::Say(_C_BLU "Test 6 - transaction left open\n" _C_CLR);
-
- topic_name = Test::mk_topic_name("0098-consumer_txn-0", 1);
- c = create_consumer(topic_name, "READ_COMMITTED");
- Test::create_topic(c, topic_name.c_str(), 1, 3);
- TestEventCb::topic = topic_name;
-
- run_producer("producer3, 0, 0x10, 1, None, DoFlush",
- "producer1, 0, 0x20, 3, BeginOpen, DoFlush",
- // prevent abort control message from being written.
- "exit,0");
-
- msgs = consume_messages(c, topic_name, 0);
- TEST_ASSERT(msgs.size() == 1,
- "Consumed unexpected number of messages. "
- "Expected 1, got: %d",
- (int)msgs.size());
-
- TEST_ASSERT(TestEventCb::partition_0_ls_offset + 3 ==
- TestEventCb::partition_0_hi_offset,
- "Expected hi_offset to be 3 greater than ls_offset "
- "but got hi_offset: %" PRId64 ", ls_offset: %" PRId64,
- TestEventCb::partition_0_hi_offset,
- TestEventCb::partition_0_ls_offset);
-
- delete_messages(msgs);
-
- Test::delete_topic(c, topic_name.c_str());
-
- c->close();
- delete c;
-}
-#endif
-
-
-extern "C" {
-int main_0098_consumer_txn(int argc, char **argv) {
- if (test_needs_auth()) {
- Test::Skip(
- "Authentication or security configuration "
- "required on client: not supported in "
- "Java transactional producer: skipping tests\n");
- return 0;
- }
-#if WITH_RAPIDJSON
- do_test_consumer_txn_test(true /* with java producer */);
- do_test_consumer_txn_test(false /* with librdkafka producer */);
-#else
- Test::Skip("RapidJSON >=1.1.0 not available\n");
-#endif
- return 0;
-}
-}