diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/examples/rdkafka_performance.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/examples/rdkafka_performance.c | 1780 |
1 files changed, 1780 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/examples/rdkafka_performance.c b/fluent-bit/lib/librdkafka-2.1.0/examples/rdkafka_performance.c new file mode 100644 index 00000000..a12bb747 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/examples/rdkafka_performance.c @@ -0,0 +1,1780 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Apache Kafka consumer & producer performance tester + * using the Kafka driver from librdkafka + * (https://github.com/edenhill/librdkafka) + */ + +#ifdef _MSC_VER +#define _CRT_SECURE_NO_WARNINGS /* Silence nonsense on MSVC */ +#endif + +#include "../src/rd.h" + +#define _GNU_SOURCE /* for strndup() */ +#include <ctype.h> +#include <signal.h> +#include <string.h> +#include <errno.h> + +/* Typical include path would be <librdkafka/rdkafka.h>, but this program + * is built from within the librdkafka source tree and thus differs. */ +#include "rdkafka.h" /* for Kafka driver */ +/* Do not include these defines from your program, they will not be + * provided by librdkafka. */ +#include "rd.h" +#include "rdtime.h" + +#ifdef _WIN32 +#include "../win32/wingetopt.h" +#include "../win32/wintime.h" +#endif + + +static volatile sig_atomic_t run = 1; +static int forever = 1; +static rd_ts_t dispintvl = 1000; +static int do_seq = 0; +static int exit_after = 0; +static int exit_eof = 0; +static FILE *stats_fp; +static int dr_disp_div; +static int verbosity = 1; +static int latency_mode = 0; +static FILE *latency_fp = NULL; +static int msgcnt = -1; +static int incremental_mode = 0; +static int partition_cnt = 0; +static int eof_cnt = 0; +static int with_dr = 1; +static int read_hdrs = 0; + + +static void stop(int sig) { + if (!run) + exit(0); + run = 0; +} + +static long int msgs_wait_cnt = 0; +static long int msgs_wait_produce_cnt = 0; +static rd_ts_t t_end; +static rd_kafka_t *global_rk; + +struct avg { + int64_t val; + int cnt; + uint64_t ts_start; +}; + +static struct { + rd_ts_t t_start; + rd_ts_t t_end; + rd_ts_t t_end_send; + uint64_t msgs; + uint64_t msgs_last; + uint64_t msgs_dr_ok; + uint64_t msgs_dr_err; + uint64_t bytes_dr_ok; + uint64_t bytes; + uint64_t bytes_last; + uint64_t tx; + uint64_t tx_err; + uint64_t avg_rtt; + uint64_t offset; + rd_ts_t t_fetch_latency; + rd_ts_t t_last; + rd_ts_t t_enobufs_last; + rd_ts_t t_total; + rd_ts_t latency_last; + rd_ts_t latency_lo; + rd_ts_t latency_hi; + rd_ts_t latency_sum; + int latency_cnt; + int64_t last_offset; +} cnt; + + +uint64_t wall_clock(void) { + struct timeval tv; + gettimeofday(&tv, NULL); + return ((uint64_t)tv.tv_sec * 1000000LLU) + ((uint64_t)tv.tv_usec); +} + +static void err_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) { + if (err == RD_KAFKA_RESP_ERR__FATAL) { + char errstr[512]; + err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr)); + printf("%% FATAL ERROR CALLBACK: %s: %s: %s\n", + rd_kafka_name(rk), rd_kafka_err2str(err), errstr); + } else { + printf("%% ERROR CALLBACK: %s: %s: %s\n", rd_kafka_name(rk), + rd_kafka_err2str(err), reason); + } +} + +static void throttle_cb(rd_kafka_t *rk, + const char *broker_name, + int32_t broker_id, + int throttle_time_ms, + void *opaque) { + printf("%% THROTTLED %dms by %s (%" PRId32 ")\n", throttle_time_ms, + broker_name, broker_id); +} + +static void offset_commit_cb(rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *offsets, + void *opaque) { + int i; + + if (err || verbosity >= 2) + printf("%% Offset commit of %d partition(s): %s\n", + offsets->cnt, rd_kafka_err2str(err)); + + for (i = 0; i < offsets->cnt; i++) { + rd_kafka_topic_partition_t *rktpar = &offsets->elems[i]; + if (rktpar->err || verbosity >= 2) + printf("%% %s [%" PRId32 "] @ %" PRId64 ": %s\n", + rktpar->topic, rktpar->partition, rktpar->offset, + rd_kafka_err2str(err)); + } +} + +/** + * @brief Add latency measurement + */ +static void latency_add(int64_t ts, const char *who) { + if (ts > cnt.latency_hi) + cnt.latency_hi = ts; + if (!cnt.latency_lo || ts < cnt.latency_lo) + cnt.latency_lo = ts; + cnt.latency_last = ts; + cnt.latency_cnt++; + cnt.latency_sum += ts; + if (latency_fp) + fprintf(latency_fp, "%" PRIu64 "\n", ts); +} + + +static void msg_delivered(rd_kafka_t *rk, + const rd_kafka_message_t *rkmessage, + void *opaque) { + static rd_ts_t last; + rd_ts_t now = rd_clock(); + static int msgs; + + msgs++; + + msgs_wait_cnt--; + + if (rkmessage->err) + cnt.msgs_dr_err++; + else { + cnt.msgs_dr_ok++; + cnt.bytes_dr_ok += rkmessage->len; + } + + if (latency_mode) { + /* Extract latency */ + int64_t source_ts; + if (sscanf(rkmessage->payload, "LATENCY:%" SCNd64, + &source_ts) == 1) + latency_add(wall_clock() - source_ts, "producer"); + } + + + if ((rkmessage->err && (cnt.msgs_dr_err < 50 || + !(cnt.msgs_dr_err % (dispintvl / 1000)))) || + !last || msgs_wait_cnt < 5 || !(msgs_wait_cnt % dr_disp_div) || + (now - last) >= dispintvl * 1000 || verbosity >= 3) { + if (rkmessage->err && verbosity >= 2) + printf("%% Message delivery failed (broker %" PRId32 + "): " + "%s [%" PRId32 + "]: " + "%s (%li remain)\n", + rd_kafka_message_broker_id(rkmessage), + rd_kafka_topic_name(rkmessage->rkt), + rkmessage->partition, + rd_kafka_err2str(rkmessage->err), msgs_wait_cnt); + else if (verbosity > 2) + printf("%% Message delivered (offset %" PRId64 + ", broker %" PRId32 + "): " + "%li remain\n", + rkmessage->offset, + rd_kafka_message_broker_id(rkmessage), + msgs_wait_cnt); + if (verbosity >= 3 && do_seq) + printf(" --> \"%.*s\"\n", (int)rkmessage->len, + (const char *)rkmessage->payload); + last = now; + } + + cnt.last_offset = rkmessage->offset; + + if (msgs_wait_produce_cnt == 0 && msgs_wait_cnt == 0 && !forever) { + if (verbosity >= 2 && cnt.msgs > 0) { + double error_percent = + (double)(cnt.msgs - cnt.msgs_dr_ok) / cnt.msgs * + 100; + printf( + "%% Messages delivered with failure " + "percentage of %.5f%%\n", + error_percent); + } + t_end = rd_clock(); + run = 0; + } + + if (exit_after && exit_after <= msgs) { + printf("%% Hard exit after %i messages, as requested\n", + exit_after); + exit(0); + } +} + + +static void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) { + + if (rkmessage->err) { + if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { + cnt.offset = rkmessage->offset; + + if (verbosity >= 1) + printf( + "%% Consumer reached end of " + "%s [%" PRId32 + "] " + "message queue at offset %" PRId64 "\n", + rd_kafka_topic_name(rkmessage->rkt), + rkmessage->partition, rkmessage->offset); + + if (exit_eof && ++eof_cnt == partition_cnt) + run = 0; + + return; + } + + printf("%% Consume error for topic \"%s\" [%" PRId32 + "] " + "offset %" PRId64 ": %s\n", + rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt) + : "", + rkmessage->partition, rkmessage->offset, + rd_kafka_message_errstr(rkmessage)); + + if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION || + rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) + run = 0; + + cnt.msgs_dr_err++; + return; + } + + /* Start measuring from first message received */ + if (!cnt.t_start) + cnt.t_start = cnt.t_last = rd_clock(); + + cnt.offset = rkmessage->offset; + cnt.msgs++; + cnt.bytes += rkmessage->len; + + if (verbosity >= 3 || (verbosity >= 2 && !(cnt.msgs % 1000000))) + printf("@%" PRId64 ": %.*s: %.*s\n", rkmessage->offset, + (int)rkmessage->key_len, (char *)rkmessage->key, + (int)rkmessage->len, (char *)rkmessage->payload); + + + if (latency_mode) { + int64_t remote_ts, ts; + + if (rkmessage->len > 8 && + !memcmp(rkmessage->payload, "LATENCY:", 8) && + sscanf(rkmessage->payload, "LATENCY:%" SCNd64, + &remote_ts) == 1) { + ts = wall_clock() - remote_ts; + if (ts > 0 && ts < (1000000 * 60 * 5)) { + latency_add(ts, "consumer"); + } else { + if (verbosity >= 1) + printf( + "Received latency timestamp is too " + "far off: %" PRId64 + "us (message offset %" PRId64 + "): ignored\n", + ts, rkmessage->offset); + } + } else if (verbosity > 1) + printf("not a LATENCY payload: %.*s\n", + (int)rkmessage->len, (char *)rkmessage->payload); + } + + if (read_hdrs) { + rd_kafka_headers_t *hdrs; + /* Force parsing of headers but don't do anything with them. */ + rd_kafka_message_headers(rkmessage, &hdrs); + } + + if (msgcnt != -1 && (int)cnt.msgs >= msgcnt) + run = 0; +} + + +static void rebalance_cb(rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *partitions, + void *opaque) { + rd_kafka_error_t *error = NULL; + rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR; + + if (exit_eof && !strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) + fprintf(stderr, + "%% This example has not been modified to " + "support -e (exit on EOF) when " + "partition.assignment.strategy " + "is set to an incremental/cooperative strategy: " + "-e will not behave as expected\n"); + + switch (err) { + case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: + fprintf(stderr, + "%% Group rebalanced (%s): " + "%d new partition(s) assigned\n", + rd_kafka_rebalance_protocol(rk), partitions->cnt); + + if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) { + error = rd_kafka_incremental_assign(rk, partitions); + } else { + ret_err = rd_kafka_assign(rk, partitions); + eof_cnt = 0; + } + + partition_cnt += partitions->cnt; + break; + + case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: + fprintf(stderr, + "%% Group rebalanced (%s): %d partition(s) revoked\n", + rd_kafka_rebalance_protocol(rk), partitions->cnt); + + if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) { + error = rd_kafka_incremental_unassign(rk, partitions); + partition_cnt -= partitions->cnt; + } else { + ret_err = rd_kafka_assign(rk, NULL); + partition_cnt = 0; + } + + eof_cnt = 0; /* FIXME: Not correct for incremental case */ + break; + + default: + break; + } + + if (error) { + fprintf(stderr, "%% incremental assign failure: %s\n", + rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); + } else if (ret_err) { + fprintf(stderr, "%% assign failure: %s\n", + rd_kafka_err2str(ret_err)); + } +} + + +/** + * Find and extract single value from a two-level search. + * First find 'field1', then find 'field2' and extract its value. + * Returns 0 on miss else the value. + */ +static uint64_t json_parse_fields(const char *json, + const char **end, + const char *field1, + const char *field2) { + const char *t = json; + const char *t2; + int len1 = (int)strlen(field1); + int len2 = (int)strlen(field2); + + while ((t2 = strstr(t, field1))) { + uint64_t v; + + t = t2; + t += len1; + + /* Find field */ + if (!(t2 = strstr(t, field2))) + continue; + t2 += len2; + + while (isspace((int)*t2)) + t2++; + + v = strtoull(t2, (char **)&t, 10); + if (t2 == t) + continue; + + *end = t; + return v; + } + + *end = t + strlen(t); + return 0; +} + +/** + * Parse various values from rdkafka stats + */ +static void json_parse_stats(const char *json) { + const char *t; +#define MAX_AVGS 100 /* max number of brokers to scan for rtt */ + uint64_t avg_rtt[MAX_AVGS + 1]; + int avg_rtt_i = 0; + + /* Store totals at end of array */ + avg_rtt[MAX_AVGS] = 0; + + /* Extract all broker RTTs */ + t = json; + while (avg_rtt_i < MAX_AVGS && *t) { + avg_rtt[avg_rtt_i] = + json_parse_fields(t, &t, "\"rtt\":", "\"avg\":"); + + /* Skip low RTT values, means no messages are passing */ + if (avg_rtt[avg_rtt_i] < 100 /*0.1ms*/) + continue; + + + avg_rtt[MAX_AVGS] += avg_rtt[avg_rtt_i]; + avg_rtt_i++; + } + + if (avg_rtt_i > 0) + avg_rtt[MAX_AVGS] /= avg_rtt_i; + + cnt.avg_rtt = avg_rtt[MAX_AVGS]; +} + + +static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) { + + /* Extract values for our own stats */ + json_parse_stats(json); + + if (stats_fp) + fprintf(stats_fp, "%s\n", json); + return 0; +} + +#define _OTYPE_TAB 0x1 /* tabular format */ +#define _OTYPE_SUMMARY 0x2 /* summary format */ +#define _OTYPE_FORCE 0x4 /* force output regardless of interval timing */ +static void +print_stats(rd_kafka_t *rk, int mode, int otype, const char *compression) { + rd_ts_t now = rd_clock(); + rd_ts_t t_total; + static int rows_written = 0; + int print_header; + double latency_avg = 0.0f; + char extra[512]; + int extra_of = 0; + *extra = '\0'; + + if (!(otype & _OTYPE_FORCE) && + (((otype & _OTYPE_SUMMARY) && verbosity == 0) || + cnt.t_last + dispintvl > now)) + return; + + print_header = !rows_written || (verbosity > 0 && !(rows_written % 20)); + + if (cnt.t_end_send) + t_total = cnt.t_end_send - cnt.t_start; + else if (cnt.t_end) + t_total = cnt.t_end - cnt.t_start; + else if (cnt.t_start) + t_total = now - cnt.t_start; + else + t_total = 1; + + if (latency_mode && cnt.latency_cnt) + latency_avg = (double)cnt.latency_sum / (double)cnt.latency_cnt; + + if (mode == 'P') { + + if (otype & _OTYPE_TAB) { +#define ROW_START() \ + do { \ + } while (0) +#define COL_HDR(NAME) printf("| %10.10s ", (NAME)) +#define COL_PR64(NAME, VAL) printf("| %10" PRIu64 " ", (VAL)) +#define COL_PRF(NAME, VAL) printf("| %10.2f ", (VAL)) +#define ROW_END() \ + do { \ + printf("\n"); \ + rows_written++; \ + } while (0) + + if (print_header) { + /* First time, print header */ + ROW_START(); + COL_HDR("elapsed"); + COL_HDR("msgs"); + COL_HDR("bytes"); + COL_HDR("rtt"); + COL_HDR("dr"); + COL_HDR("dr_m/s"); + COL_HDR("dr_MB/s"); + COL_HDR("dr_err"); + COL_HDR("tx_err"); + COL_HDR("outq"); + COL_HDR("offset"); + if (latency_mode) { + COL_HDR("lat_curr"); + COL_HDR("lat_avg"); + COL_HDR("lat_lo"); + COL_HDR("lat_hi"); + } + + ROW_END(); + } + + ROW_START(); + COL_PR64("elapsed", t_total / 1000); + COL_PR64("msgs", cnt.msgs); + COL_PR64("bytes", cnt.bytes); + COL_PR64("rtt", cnt.avg_rtt / 1000); + COL_PR64("dr", cnt.msgs_dr_ok); + COL_PR64("dr_m/s", + ((cnt.msgs_dr_ok * 1000000) / t_total)); + COL_PRF("dr_MB/s", + (float)((cnt.bytes_dr_ok) / (float)t_total)); + COL_PR64("dr_err", cnt.msgs_dr_err); + COL_PR64("tx_err", cnt.tx_err); + COL_PR64("outq", + rk ? (uint64_t)rd_kafka_outq_len(rk) : 0); + COL_PR64("offset", (uint64_t)cnt.last_offset); + if (latency_mode) { + COL_PRF("lat_curr", cnt.latency_last / 1000.0f); + COL_PRF("lat_avg", latency_avg / 1000.0f); + COL_PRF("lat_lo", cnt.latency_lo / 1000.0f); + COL_PRF("lat_hi", cnt.latency_hi / 1000.0f); + } + ROW_END(); + } + + if (otype & _OTYPE_SUMMARY) { + printf("%% %" PRIu64 + " messages produced " + "(%" PRIu64 + " bytes), " + "%" PRIu64 + " delivered " + "(offset %" PRId64 ", %" PRIu64 + " failed) " + "in %" PRIu64 "ms: %" PRIu64 + " msgs/s and " + "%.02f MB/s, " + "%" PRIu64 + " produce failures, %i in queue, " + "%s compression\n", + cnt.msgs, cnt.bytes, cnt.msgs_dr_ok, + cnt.last_offset, cnt.msgs_dr_err, t_total / 1000, + ((cnt.msgs_dr_ok * 1000000) / t_total), + (float)((cnt.bytes_dr_ok) / (float)t_total), + cnt.tx_err, rk ? rd_kafka_outq_len(rk) : 0, + compression); + } + + } else { + + if (otype & _OTYPE_TAB) { + if (print_header) { + /* First time, print header */ + ROW_START(); + COL_HDR("elapsed"); + COL_HDR("msgs"); + COL_HDR("bytes"); + COL_HDR("rtt"); + COL_HDR("m/s"); + COL_HDR("MB/s"); + COL_HDR("rx_err"); + COL_HDR("offset"); + if (latency_mode) { + COL_HDR("lat_curr"); + COL_HDR("lat_avg"); + COL_HDR("lat_lo"); + COL_HDR("lat_hi"); + } + ROW_END(); + } + + ROW_START(); + COL_PR64("elapsed", t_total / 1000); + COL_PR64("msgs", cnt.msgs); + COL_PR64("bytes", cnt.bytes); + COL_PR64("rtt", cnt.avg_rtt / 1000); + COL_PR64("m/s", ((cnt.msgs * 1000000) / t_total)); + COL_PRF("MB/s", (float)((cnt.bytes) / (float)t_total)); + COL_PR64("rx_err", cnt.msgs_dr_err); + COL_PR64("offset", cnt.offset); + if (latency_mode) { + COL_PRF("lat_curr", cnt.latency_last / 1000.0f); + COL_PRF("lat_avg", latency_avg / 1000.0f); + COL_PRF("lat_lo", cnt.latency_lo / 1000.0f); + COL_PRF("lat_hi", cnt.latency_hi / 1000.0f); + } + ROW_END(); + } + + if (otype & _OTYPE_SUMMARY) { + if (latency_avg >= 1.0f) + extra_of += rd_snprintf( + extra + extra_of, sizeof(extra) - extra_of, + ", latency " + "curr/avg/lo/hi " + "%.2f/%.2f/%.2f/%.2fms", + cnt.latency_last / 1000.0f, + latency_avg / 1000.0f, + cnt.latency_lo / 1000.0f, + cnt.latency_hi / 1000.0f); + printf("%% %" PRIu64 " messages (%" PRIu64 + " bytes) " + "consumed in %" PRIu64 "ms: %" PRIu64 + " msgs/s " + "(%.02f MB/s)" + "%s\n", + cnt.msgs, cnt.bytes, t_total / 1000, + ((cnt.msgs * 1000000) / t_total), + (float)((cnt.bytes) / (float)t_total), extra); + } + + if (incremental_mode && now > cnt.t_last) { + uint64_t i_msgs = cnt.msgs - cnt.msgs_last; + uint64_t i_bytes = cnt.bytes - cnt.bytes_last; + uint64_t i_time = cnt.t_last ? now - cnt.t_last : 0; + + printf("%% INTERVAL: %" PRIu64 + " messages " + "(%" PRIu64 + " bytes) " + "consumed in %" PRIu64 "ms: %" PRIu64 + " msgs/s " + "(%.02f MB/s)" + "%s\n", + i_msgs, i_bytes, i_time / 1000, + ((i_msgs * 1000000) / i_time), + (float)((i_bytes) / (float)i_time), extra); + } + } + + cnt.t_last = now; + cnt.msgs_last = cnt.msgs; + cnt.bytes_last = cnt.bytes; +} + + +static void sig_usr1(int sig) { + rd_kafka_dump(stdout, global_rk); +} + + +/** + * @brief Read config from file + * @returns -1 on error, else 0. + */ +static int read_conf_file(rd_kafka_conf_t *conf, const char *path) { + FILE *fp; + char buf[512]; + int line = 0; + char errstr[512]; + + if (!(fp = fopen(path, "r"))) { + fprintf(stderr, "%% Failed to open %s: %s\n", path, + strerror(errno)); + return -1; + } + + while (fgets(buf, sizeof(buf), fp)) { + char *s = buf; + char *t; + rd_kafka_conf_res_t r = RD_KAFKA_CONF_UNKNOWN; + + line++; + + while (isspace((int)*s)) + s++; + + if (!*s || *s == '#') + continue; + + if ((t = strchr(buf, '\n'))) + *t = '\0'; + + t = strchr(buf, '='); + if (!t || t == s || !*(t + 1)) { + fprintf(stderr, "%% %s:%d: expected key=value\n", path, + line); + fclose(fp); + return -1; + } + + *(t++) = '\0'; + + /* Try global config */ + r = rd_kafka_conf_set(conf, s, t, errstr, sizeof(errstr)); + + if (r == RD_KAFKA_CONF_OK) + continue; + + fprintf(stderr, "%% %s:%d: %s=%s: %s\n", path, line, s, t, + errstr); + fclose(fp); + return -1; + } + + fclose(fp); + + return 0; +} + + +static rd_kafka_resp_err_t do_produce(rd_kafka_t *rk, + rd_kafka_topic_t *rkt, + int32_t partition, + int msgflags, + void *payload, + size_t size, + const void *key, + size_t key_size, + const rd_kafka_headers_t *hdrs) { + + /* Send/Produce message. */ + if (hdrs) { + rd_kafka_headers_t *hdrs_copy; + rd_kafka_resp_err_t err; + + hdrs_copy = rd_kafka_headers_copy(hdrs); + + err = rd_kafka_producev( + rk, RD_KAFKA_V_RKT(rkt), RD_KAFKA_V_PARTITION(partition), + RD_KAFKA_V_MSGFLAGS(msgflags), + RD_KAFKA_V_VALUE(payload, size), + RD_KAFKA_V_KEY(key, key_size), + RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_END); + + if (err) + rd_kafka_headers_destroy(hdrs_copy); + + return err; + + } else { + if (rd_kafka_produce(rkt, partition, msgflags, payload, size, + key, key_size, NULL) == -1) + return rd_kafka_last_error(); + } + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +/** + * @brief Sleep for \p sleep_us microseconds. + */ +static void do_sleep(int sleep_us) { + if (sleep_us > 100) { +#ifdef _WIN32 + Sleep(sleep_us / 1000); +#else + usleep(sleep_us); +#endif + } else { + rd_ts_t next = rd_clock() + (rd_ts_t)sleep_us; + while (next > rd_clock()) + ; + } +} + + +int main(int argc, char **argv) { + char *brokers = NULL; + char mode = 'C'; + char *topic = NULL; + const char *key = NULL; + int *partitions = NULL; + int opt; + int sendflags = 0; + char *msgpattern = "librdkafka_performance testing!"; + int msgsize = -1; + const char *debug = NULL; + int do_conf_dump = 0; + rd_ts_t now; + char errstr[512]; + uint64_t seq = 0; + int seed = (int)time(NULL); + rd_kafka_t *rk; + rd_kafka_topic_t *rkt; + rd_kafka_conf_t *conf; + rd_kafka_queue_t *rkqu = NULL; + const char *compression = "no"; + int64_t start_offset = 0; + int batch_size = 0; + int idle = 0; + const char *stats_cmd = NULL; + char *stats_intvlstr = NULL; + char tmp[128]; + char *tmp2; + int otype = _OTYPE_SUMMARY; + double dtmp; + int rate_sleep = 0; + rd_kafka_topic_partition_list_t *topics; + int exitcode = 0; + rd_kafka_headers_t *hdrs = NULL; + rd_kafka_resp_err_t err; + + /* Kafka configuration */ + conf = rd_kafka_conf_new(); + rd_kafka_conf_set_error_cb(conf, err_cb); + rd_kafka_conf_set_throttle_cb(conf, throttle_cb); + rd_kafka_conf_set_offset_commit_cb(conf, offset_commit_cb); + +#ifdef SIGIO + /* Quick termination */ + rd_snprintf(tmp, sizeof(tmp), "%i", SIGIO); + rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); +#endif + + /* Producer config */ + rd_kafka_conf_set(conf, "linger.ms", "1000", NULL, 0); + rd_kafka_conf_set(conf, "message.send.max.retries", "3", NULL, 0); + rd_kafka_conf_set(conf, "retry.backoff.ms", "500", NULL, 0); + + /* Consumer config */ + /* Tell rdkafka to (try to) maintain 1M messages + * in its internal receive buffers. This is to avoid + * application -> rdkafka -> broker per-message ping-pong + * latency. + * The larger the local queue, the higher the performance. + * Try other values with: ... -X queued.min.messages=1000 + */ + rd_kafka_conf_set(conf, "queued.min.messages", "1000000", NULL, 0); + rd_kafka_conf_set(conf, "session.timeout.ms", "6000", NULL, 0); + rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0); + + topics = rd_kafka_topic_partition_list_new(1); + + while ((opt = getopt(argc, argv, + "PCG:t:p:b:s:k:c:fi:MDd:m:S:x:" + "R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:")) != -1) { + switch (opt) { + case 'G': + if (rd_kafka_conf_set(conf, "group.id", optarg, errstr, + sizeof(errstr)) != + RD_KAFKA_CONF_OK) { + fprintf(stderr, "%% %s\n", errstr); + exit(1); + } + /* FALLTHRU */ + case 'P': + case 'C': + mode = opt; + break; + case 't': + rd_kafka_topic_partition_list_add( + topics, optarg, RD_KAFKA_PARTITION_UA); + break; + case 'p': + partition_cnt++; + partitions = realloc(partitions, sizeof(*partitions) * + partition_cnt); + partitions[partition_cnt - 1] = atoi(optarg); + break; + + case 'b': + brokers = optarg; + break; + case 's': + msgsize = atoi(optarg); + break; + case 'k': + key = optarg; + break; + case 'c': + msgcnt = atoi(optarg); + break; + case 'D': + sendflags |= RD_KAFKA_MSG_F_FREE; + break; + case 'i': + dispintvl = atoi(optarg); + break; + case 'm': + msgpattern = optarg; + break; + case 'S': + seq = strtoull(optarg, NULL, 10); + do_seq = 1; + break; + case 'x': + exit_after = atoi(optarg); + break; + case 'R': + seed = atoi(optarg); + break; + case 'a': + if (rd_kafka_conf_set(conf, "acks", optarg, errstr, + sizeof(errstr)) != + RD_KAFKA_CONF_OK) { + fprintf(stderr, "%% %s\n", errstr); + exit(1); + } + break; + case 'B': + batch_size = atoi(optarg); + break; + case 'z': + if (rd_kafka_conf_set(conf, "compression.codec", optarg, + errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) { + fprintf(stderr, "%% %s\n", errstr); + exit(1); + } + compression = optarg; + break; + case 'o': + if (!strcmp(optarg, "end")) + start_offset = RD_KAFKA_OFFSET_END; + else if (!strcmp(optarg, "beginning")) + start_offset = RD_KAFKA_OFFSET_BEGINNING; + else if (!strcmp(optarg, "stored")) + start_offset = RD_KAFKA_OFFSET_STORED; + else { + start_offset = strtoll(optarg, NULL, 10); + + if (start_offset < 0) + start_offset = + RD_KAFKA_OFFSET_TAIL(-start_offset); + } + + break; + case 'e': + exit_eof = 1; + break; + case 'd': + debug = optarg; + break; + case 'H': + if (!strcmp(optarg, "parse")) + read_hdrs = 1; + else { + char *name, *val; + size_t name_sz = -1; + + name = optarg; + val = strchr(name, '='); + if (val) { + name_sz = (size_t)(val - name); + val++; /* past the '=' */ + } + + if (!hdrs) + hdrs = rd_kafka_headers_new(8); + + err = rd_kafka_header_add(hdrs, name, name_sz, + val, -1); + if (err) { + fprintf( + stderr, + "%% Failed to add header %s: %s\n", + name, rd_kafka_err2str(err)); + exit(1); + } + } + break; + case 'X': { + char *name, *val; + rd_kafka_conf_res_t res; + + if (!strcmp(optarg, "list") || + !strcmp(optarg, "help")) { + rd_kafka_conf_properties_show(stdout); + exit(0); + } + + if (!strcmp(optarg, "dump")) { + do_conf_dump = 1; + continue; + } + + name = optarg; + if (!(val = strchr(name, '='))) { + fprintf(stderr, + "%% Expected " + "-X property=value, not %s\n", + name); + exit(1); + } + + *val = '\0'; + val++; + + if (!strcmp(name, "file")) { + if (read_conf_file(conf, val) == -1) + exit(1); + break; + } + + res = rd_kafka_conf_set(conf, name, val, errstr, + sizeof(errstr)); + + if (res != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%% %s\n", errstr); + exit(1); + } + } break; + + case 'T': + stats_intvlstr = optarg; + break; + case 'Y': + stats_cmd = optarg; + break; + + case 'q': + verbosity--; + break; + + case 'v': + verbosity++; + break; + + case 'I': + idle = 1; + break; + + case 'u': + otype = _OTYPE_TAB; + verbosity--; /* remove some fluff */ + break; + + case 'r': + dtmp = strtod(optarg, &tmp2); + if (tmp2 == optarg || + (dtmp >= -0.001 && dtmp <= 0.001)) { + fprintf(stderr, "%% Invalid rate: %s\n", + optarg); + exit(1); + } + + rate_sleep = (int)(1000000.0 / dtmp); + break; + + case 'l': + latency_mode = 1; + break; + + case 'A': + if (!(latency_fp = fopen(optarg, "w"))) { + fprintf(stderr, "%% Cant open %s: %s\n", optarg, + strerror(errno)); + exit(1); + } + break; + + case 'M': + incremental_mode = 1; + break; + + case 'N': + with_dr = 0; + break; + + default: + fprintf(stderr, "Unknown option: %c\n", opt); + goto usage; + } + } + + if (topics->cnt == 0 || optind != argc) { + if (optind < argc) + fprintf(stderr, "Unknown argument: %s\n", argv[optind]); + usage: + fprintf( + stderr, + "Usage: %s [-C|-P] -t <topic> " + "[-p <partition>] [-b <broker,broker..>] [options..]\n" + "\n" + "librdkafka version %s (0x%08x)\n" + "\n" + " Options:\n" + " -C | -P | Consumer or Producer mode\n" + " -G <groupid> High-level Kafka Consumer mode\n" + " -t <topic> Topic to consume / produce\n" + " -p <num> Partition (defaults to random). " + "Multiple partitions are allowed in -C consumer mode.\n" + " -M Print consumer interval stats\n" + " -b <brokers> Broker address list (host[:port],..)\n" + " -s <size> Message size (producer)\n" + " -k <key> Message key (producer)\n" + " -H <name[=value]> Add header to message (producer)\n" + " -H parse Read message headers (consumer)\n" + " -c <cnt> Messages to transmit/receive\n" + " -x <cnt> Hard exit after transmitting <cnt> " + "messages (producer)\n" + " -D Copy/Duplicate data buffer (producer)\n" + " -i <ms> Display interval\n" + " -m <msg> Message payload pattern\n" + " -S <start> Send a sequence number starting at " + "<start> as payload\n" + " -R <seed> Random seed value (defaults to time)\n" + " -a <acks> Required acks (producer): " + "-1, 0, 1, >1\n" + " -B <size> Consume batch size (# of msgs)\n" + " -z <codec> Enable compression:\n" + " none|gzip|snappy\n" + " -o <offset> Start offset (consumer)\n" + " beginning, end, NNNNN or -NNNNN\n" + " -d [facs..] Enable debugging contexts:\n" + " %s\n" + " -X <prop=name> Set arbitrary librdkafka " + "configuration property\n" + " -X file=<path> Read config from file.\n" + " -X list Show full list of supported properties.\n" + " -X dump Show configuration\n" + " -T <intvl> Enable statistics from librdkafka at " + "specified interval (ms)\n" + " -Y <command> Pipe statistics to <command>\n" + " -I Idle: dont produce any messages\n" + " -q Decrease verbosity\n" + " -v Increase verbosity (default 1)\n" + " -u Output stats in table format\n" + " -r <rate> Producer msg/s limit\n" + " -l Latency measurement.\n" + " Needs two matching instances, one\n" + " consumer and one producer, both\n" + " running with the -l switch.\n" + " -l Producer: per-message latency stats\n" + " -A <file> Write per-message latency stats to " + "<file>. Requires -l\n" + " -O Report produced offset (producer)\n" + " -N No delivery reports (producer)\n" + "\n" + " In Consumer mode:\n" + " consumes messages and prints thruput\n" + " If -B <..> is supplied the batch consumer\n" + " mode is used, else the callback mode is used.\n" + "\n" + " In Producer mode:\n" + " writes messages of size -s <..> and prints thruput\n" + "\n", + argv[0], rd_kafka_version_str(), rd_kafka_version(), + RD_KAFKA_DEBUG_CONTEXTS); + exit(1); + } + + + dispintvl *= 1000; /* us */ + + if (verbosity > 1) + printf("%% Using random seed %i, verbosity level %i\n", seed, + verbosity); + srand(seed); + signal(SIGINT, stop); +#ifdef SIGUSR1 + signal(SIGUSR1, sig_usr1); +#endif + + + if (debug && rd_kafka_conf_set(conf, "debug", debug, errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + printf("%% Debug configuration failed: %s: %s\n", errstr, + debug); + exit(1); + } + + /* Always enable stats (for RTT extraction), and if user supplied + * the -T <intvl> option we let her take part of the stats aswell. */ + rd_kafka_conf_set_stats_cb(conf, stats_cb); + + if (!stats_intvlstr) { + /* if no user-desired stats, adjust stats interval + * to the display interval. */ + rd_snprintf(tmp, sizeof(tmp), "%" PRId64, dispintvl / 1000); + } + + if (rd_kafka_conf_set(conf, "statistics.interval.ms", + stats_intvlstr ? stats_intvlstr : tmp, errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%% %s\n", errstr); + exit(1); + } + + if (do_conf_dump) { + const char **arr; + size_t cnt; + int pass; + + for (pass = 0; pass < 2; pass++) { + int i; + + if (pass == 0) { + arr = rd_kafka_conf_dump(conf, &cnt); + printf("# Global config\n"); + } else { + rd_kafka_topic_conf_t *topic_conf = + rd_kafka_conf_get_default_topic_conf(conf); + + if (topic_conf) { + printf("# Topic config\n"); + arr = rd_kafka_topic_conf_dump( + topic_conf, &cnt); + } else { + arr = NULL; + } + } + + if (!arr) + continue; + + for (i = 0; i < (int)cnt; i += 2) + printf("%s = %s\n", arr[i], arr[i + 1]); + + printf("\n"); + + rd_kafka_conf_dump_free(arr, cnt); + } + + exit(0); + } + + if (latency_mode) + do_seq = 0; + + if (stats_intvlstr) { + /* User enabled stats (-T) */ + +#ifndef _WIN32 + if (stats_cmd) { + if (!(stats_fp = popen(stats_cmd, +#ifdef __linux__ + "we" +#else + "w" +#endif + ))) { + fprintf(stderr, + "%% Failed to start stats command: " + "%s: %s", + stats_cmd, strerror(errno)); + exit(1); + } + } else +#endif + stats_fp = stdout; + } + + if (msgcnt != -1) + forever = 0; + + if (msgsize == -1) + msgsize = (int)strlen(msgpattern); + + topic = topics->elems[0].topic; + + if (mode == 'C' || mode == 'G') + rd_kafka_conf_set(conf, "enable.partition.eof", "true", NULL, + 0); + + if (read_hdrs && mode == 'P') { + fprintf(stderr, "%% producer can not read headers\n"); + exit(1); + } + + if (hdrs && mode != 'P') { + fprintf(stderr, "%% consumer can not add headers\n"); + exit(1); + } + + /* Set bootstrap servers */ + if (brokers && + rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%% %s\n", errstr); + exit(1); + } + + if (mode == 'P') { + /* + * Producer + */ + char *sbuf; + char *pbuf; + int outq; + int keylen = key ? (int)strlen(key) : 0; + off_t rof = 0; + size_t plen = strlen(msgpattern); + int partition = + partitions ? partitions[0] : RD_KAFKA_PARTITION_UA; + + if (latency_mode) { + int minlen = (int)(strlen("LATENCY:") + + strlen("18446744073709551615 ") + 1); + msgsize = RD_MAX(minlen, msgsize); + sendflags |= RD_KAFKA_MSG_F_COPY; + } else if (do_seq) { + int minlen = (int)strlen("18446744073709551615 ") + 1; + if (msgsize < minlen) + msgsize = minlen; + + /* Force duplication of payload */ + sendflags |= RD_KAFKA_MSG_F_FREE; + } + + sbuf = malloc(msgsize); + + /* Copy payload content to new buffer */ + while (rof < msgsize) { + size_t xlen = RD_MIN((size_t)msgsize - rof, plen); + memcpy(sbuf + rof, msgpattern, xlen); + rof += (off_t)xlen; + } + + if (msgcnt == -1) + printf("%% Sending messages of size %i bytes\n", + msgsize); + else + printf("%% Sending %i messages of size %i bytes\n", + msgcnt, msgsize); + + if (with_dr) + rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered); + + /* Create Kafka handle */ + if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, + sizeof(errstr)))) { + fprintf(stderr, + "%% Failed to create Kafka producer: %s\n", + errstr); + exit(1); + } + + global_rk = rk; + + /* Explicitly create topic to avoid per-msg lookups. */ + rkt = rd_kafka_topic_new(rk, topic, NULL); + + + if (rate_sleep && verbosity >= 2) + fprintf(stderr, + "%% Inter message rate limiter sleep %ius\n", + rate_sleep); + + dr_disp_div = msgcnt / 50; + if (dr_disp_div == 0) + dr_disp_div = 10; + + cnt.t_start = cnt.t_last = rd_clock(); + + msgs_wait_produce_cnt = msgcnt; + + while (run && (msgcnt == -1 || (int)cnt.msgs < msgcnt)) { + /* Send/Produce message. */ + + if (idle) { + rd_kafka_poll(rk, 1000); + continue; + } + + if (latency_mode) { + rd_snprintf(sbuf, msgsize - 1, + "LATENCY:%" PRIu64, wall_clock()); + } else if (do_seq) { + rd_snprintf(sbuf, msgsize - 1, "%" PRIu64 ": ", + seq); + seq++; + } + + if (sendflags & RD_KAFKA_MSG_F_FREE) { + /* Duplicate memory */ + pbuf = malloc(msgsize); + memcpy(pbuf, sbuf, msgsize); + } else + pbuf = sbuf; + + if (msgsize == 0) + pbuf = NULL; + + cnt.tx++; + while (run && (err = do_produce( + rk, rkt, partition, sendflags, pbuf, + msgsize, key, keylen, hdrs))) { + if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION) + printf( + "%% No such partition: " + "%" PRId32 "\n", + partition); + else if (verbosity >= 3 || + (err != + RD_KAFKA_RESP_ERR__QUEUE_FULL && + verbosity >= 1)) + printf( + "%% produce error: %s%s\n", + rd_kafka_err2str(err), + err == RD_KAFKA_RESP_ERR__QUEUE_FULL + ? " (backpressure)" + : ""); + + cnt.tx_err++; + if (err != RD_KAFKA_RESP_ERR__QUEUE_FULL) { + run = 0; + break; + } + now = rd_clock(); + if (verbosity >= 2 && + cnt.t_enobufs_last + dispintvl <= now) { + printf( + "%% Backpressure %i " + "(tx %" PRIu64 + ", " + "txerr %" PRIu64 ")\n", + rd_kafka_outq_len(rk), cnt.tx, + cnt.tx_err); + cnt.t_enobufs_last = now; + } + + /* Poll to handle delivery reports */ + rd_kafka_poll(rk, 10); + + print_stats(rk, mode, otype, compression); + } + + msgs_wait_cnt++; + if (msgs_wait_produce_cnt != -1) + msgs_wait_produce_cnt--; + cnt.msgs++; + cnt.bytes += msgsize; + + /* Must poll to handle delivery reports */ + if (rate_sleep) { + rd_ts_t next = rd_clock() + (rd_ts_t)rate_sleep; + do { + rd_kafka_poll( + rk, + (int)RD_MAX(0, (next - rd_clock()) / + 1000)); + } while (next > rd_clock()); + } else if (cnt.msgs % 1000 == 0) { + rd_kafka_poll(rk, 0); + } + + print_stats(rk, mode, otype, compression); + } + + forever = 0; + if (verbosity >= 2) + printf( + "%% All messages produced, " + "now waiting for %li deliveries\n", + msgs_wait_cnt); + + /* Wait for messages to be delivered */ + while (run && rd_kafka_poll(rk, 1000) != -1) + print_stats(rk, mode, otype, compression); + + + outq = rd_kafka_outq_len(rk); + if (verbosity >= 2) + printf("%% %i messages in outq\n", outq); + cnt.msgs -= outq; + cnt.t_end = t_end; + + if (cnt.tx_err > 0) + printf("%% %" PRIu64 " backpressures for %" PRIu64 + " produce calls: %.3f%% backpressure rate\n", + cnt.tx_err, cnt.tx, + ((double)cnt.tx_err / (double)cnt.tx) * 100.0); + + /* Destroy topic */ + rd_kafka_topic_destroy(rkt); + + /* Destroy the handle */ + rd_kafka_destroy(rk); + global_rk = rk = NULL; + + free(sbuf); + + exitcode = cnt.msgs == cnt.msgs_dr_ok ? 0 : 1; + + } else if (mode == 'C') { + /* + * Consumer + */ + + rd_kafka_message_t **rkmessages = NULL; + size_t i = 0; + + /* Create Kafka handle */ + if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, + sizeof(errstr)))) { + fprintf(stderr, + "%% Failed to create Kafka consumer: %s\n", + errstr); + exit(1); + } + + global_rk = rk; + + /* Create topic to consume from */ + rkt = rd_kafka_topic_new(rk, topic, NULL); + + /* Batch consumer */ + if (batch_size) + rkmessages = malloc(sizeof(*rkmessages) * batch_size); + + /* Start consuming */ + rkqu = rd_kafka_queue_new(rk); + for (i = 0; i < (size_t)partition_cnt; ++i) { + const int r = rd_kafka_consume_start_queue( + rkt, partitions[i], start_offset, rkqu); + + if (r == -1) { + fprintf( + stderr, "%% Error creating queue: %s\n", + rd_kafka_err2str(rd_kafka_last_error())); + exit(1); + } + } + + while (run && (msgcnt == -1 || msgcnt > (int)cnt.msgs)) { + /* Consume messages. + * A message may either be a real message, or + * an error signaling (if rkmessage->err is set). + */ + uint64_t fetch_latency; + ssize_t r; + + fetch_latency = rd_clock(); + + if (batch_size) { + int partition = partitions + ? partitions[0] + : RD_KAFKA_PARTITION_UA; + + /* Batch fetch mode */ + r = rd_kafka_consume_batch(rkt, partition, 1000, + rkmessages, + batch_size); + if (r != -1) { + for (i = 0; (ssize_t)i < r; i++) { + msg_consume(rkmessages[i], + NULL); + rd_kafka_message_destroy( + rkmessages[i]); + } + } + } else { + /* Queue mode */ + r = rd_kafka_consume_callback_queue( + rkqu, 1000, msg_consume, NULL); + } + + cnt.t_fetch_latency += rd_clock() - fetch_latency; + if (r == -1) + fprintf( + stderr, "%% Error: %s\n", + rd_kafka_err2str(rd_kafka_last_error())); + else if (r > 0 && rate_sleep) { + /* Simulate processing time + * if `-r <rate>` was set. */ + do_sleep(rate_sleep); + } + + + print_stats(rk, mode, otype, compression); + + /* Poll to handle stats callbacks */ + rd_kafka_poll(rk, 0); + } + cnt.t_end = rd_clock(); + + /* Stop consuming */ + for (i = 0; i < (size_t)partition_cnt; ++i) { + int r = rd_kafka_consume_stop(rkt, (int32_t)i); + if (r == -1) { + fprintf( + stderr, "%% Error in consume_stop: %s\n", + rd_kafka_err2str(rd_kafka_last_error())); + } + } + rd_kafka_queue_destroy(rkqu); + + /* Destroy topic */ + rd_kafka_topic_destroy(rkt); + + if (batch_size) + free(rkmessages); + + /* Destroy the handle */ + rd_kafka_destroy(rk); + + global_rk = rk = NULL; + + } else if (mode == 'G') { + /* + * High-level balanced Consumer + */ + rd_kafka_message_t **rkmessages = NULL; + + rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb); + + /* Create Kafka handle */ + if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, + sizeof(errstr)))) { + fprintf(stderr, + "%% Failed to create Kafka consumer: %s\n", + errstr); + exit(1); + } + + /* Forward all events to consumer queue */ + rd_kafka_poll_set_consumer(rk); + + global_rk = rk; + + err = rd_kafka_subscribe(rk, topics); + if (err) { + fprintf(stderr, "%% Subscribe failed: %s\n", + rd_kafka_err2str(err)); + exit(1); + } + fprintf(stderr, "%% Waiting for group rebalance..\n"); + + if (batch_size) { + rkmessages = malloc(sizeof(*rkmessages) * batch_size); + } else { + rkmessages = malloc(sizeof(*rkmessages)); + } + + rkqu = rd_kafka_queue_get_consumer(rk); + + while (run && (msgcnt == -1 || msgcnt > (int)cnt.msgs)) { + /* Consume messages. + * A message may either be a real message, or + * an event (if rkmessage->err is set). + */ + uint64_t fetch_latency; + ssize_t r; + + fetch_latency = rd_clock(); + + if (batch_size) { + /* Batch fetch mode */ + ssize_t i = 0; + r = rd_kafka_consume_batch_queue( + rkqu, 1000, rkmessages, batch_size); + if (r != -1) { + for (i = 0; i < r; i++) { + msg_consume(rkmessages[i], + NULL); + rd_kafka_message_destroy( + rkmessages[i]); + } + } + + if (r == -1) + fprintf(stderr, "%% Error: %s\n", + rd_kafka_err2str( + rd_kafka_last_error())); + else if (r > 0 && rate_sleep) { + /* Simulate processing time + * if `-r <rate>` was set. */ + do_sleep(rate_sleep); + } + + } else { + rkmessages[0] = + rd_kafka_consumer_poll(rk, 1000); + if (rkmessages[0]) { + msg_consume(rkmessages[0], NULL); + rd_kafka_message_destroy(rkmessages[0]); + + /* Simulate processing time + * if `-r <rate>` was set. */ + if (rate_sleep) + do_sleep(rate_sleep); + } + } + + cnt.t_fetch_latency += rd_clock() - fetch_latency; + + print_stats(rk, mode, otype, compression); + } + cnt.t_end = rd_clock(); + + err = rd_kafka_consumer_close(rk); + if (err) + fprintf(stderr, "%% Failed to close consumer: %s\n", + rd_kafka_err2str(err)); + + free(rkmessages); + rd_kafka_queue_destroy(rkqu); + rd_kafka_destroy(rk); + } + + if (hdrs) + rd_kafka_headers_destroy(hdrs); + + print_stats(NULL, mode, otype | _OTYPE_FORCE, compression); + + if (cnt.t_fetch_latency && cnt.msgs) + printf("%% Average application fetch latency: %" PRIu64 "us\n", + cnt.t_fetch_latency / cnt.msgs); + + if (latency_fp) + fclose(latency_fp); + + if (stats_fp) { +#ifndef _WIN32 + pclose(stats_fp); +#endif + stats_fp = NULL; + } + + if (partitions) + free(partitions); + + rd_kafka_topic_partition_list_destroy(topics); + + /* Let background threads clean up and terminate cleanly. */ + rd_kafka_wait_destroyed(2000); + + return exitcode; +} |