summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/examples/rdkafka_example.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/examples/rdkafka_example.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/examples/rdkafka_example.c853
1 files changed, 0 insertions, 853 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/examples/rdkafka_example.c b/fluent-bit/lib/librdkafka-2.1.0/examples/rdkafka_example.c
deleted file mode 100644
index 91415318..00000000
--- a/fluent-bit/lib/librdkafka-2.1.0/examples/rdkafka_example.c
+++ /dev/null
@@ -1,853 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-/**
- * Apache Kafka consumer & producer example programs
- * using the Kafka driver from librdkafka
- * (https://github.com/edenhill/librdkafka)
- */
-
-#include <ctype.h>
-#include <signal.h>
-#include <string.h>
-#include <unistd.h>
-#include <stdlib.h>
-#include <syslog.h>
-#include <time.h>
-#include <sys/time.h>
-#include <getopt.h>
-
-/* Typical include path would be <librdkafka/rdkafka.h>, but this program
- * is builtin from within the librdkafka source tree and thus differs. */
-#include "rdkafka.h" /* for Kafka driver */
-
-
-static volatile sig_atomic_t run = 1;
-static rd_kafka_t *rk;
-static int exit_eof = 0;
-static int quiet = 0;
-static enum {
- OUTPUT_HEXDUMP,
- OUTPUT_RAW,
-} output = OUTPUT_HEXDUMP;
-
-static void stop(int sig) {
- run = 0;
- fclose(stdin); /* abort fgets() */
-}
-
-
-static void hexdump(FILE *fp, const char *name, const void *ptr, size_t len) {
- const char *p = (const char *)ptr;
- size_t of = 0;
-
-
- if (name)
- fprintf(fp, "%s hexdump (%zd bytes):\n", name, len);
-
- for (of = 0; of < len; of += 16) {
- char hexen[16 * 3 + 1];
- char charen[16 + 1];
- int hof = 0;
-
- int cof = 0;
- int i;
-
- for (i = of; i < (int)of + 16 && i < (int)len; i++) {
- hof += sprintf(hexen + hof, "%02x ", p[i] & 0xff);
- cof += sprintf(charen + cof, "%c",
- isprint((int)p[i]) ? p[i] : '.');
- }
- fprintf(fp, "%08zx: %-48s %-16s\n", of, hexen, charen);
- }
-}
-
-/**
- * Kafka logger callback (optional)
- */
-static void
-logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf) {
- struct timeval tv;
- gettimeofday(&tv, NULL);
- fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n", (int)tv.tv_sec,
- (int)(tv.tv_usec / 1000), level, fac,
- rk ? rd_kafka_name(rk) : NULL, buf);
-}
-
-
-/**
- * Message delivery report callback using the richer rd_kafka_message_t object.
- */
-static void msg_delivered(rd_kafka_t *rk,
- const rd_kafka_message_t *rkmessage,
- void *opaque) {
- if (rkmessage->err)
- fprintf(stderr,
- "%% Message delivery failed (broker %" PRId32 "): %s\n",
- rd_kafka_message_broker_id(rkmessage),
- rd_kafka_err2str(rkmessage->err));
- else if (!quiet)
- fprintf(stderr,
- "%% Message delivered (%zd bytes, offset %" PRId64
- ", "
- "partition %" PRId32 ", broker %" PRId32 "): %.*s\n",
- rkmessage->len, rkmessage->offset, rkmessage->partition,
- rd_kafka_message_broker_id(rkmessage),
- (int)rkmessage->len, (const char *)rkmessage->payload);
-}
-
-
-static void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
- if (rkmessage->err) {
- if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
- fprintf(stderr,
- "%% Consumer reached end of %s [%" PRId32
- "] "
- "message queue at offset %" PRId64 "\n",
- rd_kafka_topic_name(rkmessage->rkt),
- rkmessage->partition, rkmessage->offset);
-
- if (exit_eof)
- run = 0;
-
- return;
- }
-
- fprintf(stderr,
- "%% Consume error for topic \"%s\" [%" PRId32
- "] "
- "offset %" PRId64 ": %s\n",
- rd_kafka_topic_name(rkmessage->rkt),
- rkmessage->partition, rkmessage->offset,
- rd_kafka_message_errstr(rkmessage));
-
- if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
- rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
- run = 0;
- return;
- }
-
- if (!quiet) {
- rd_kafka_timestamp_type_t tstype;
- int64_t timestamp;
- rd_kafka_headers_t *hdrs;
-
- fprintf(stdout,
- "%% Message (offset %" PRId64
- ", %zd bytes, "
- "broker %" PRId32 "):\n",
- rkmessage->offset, rkmessage->len,
- rd_kafka_message_broker_id(rkmessage));
-
- timestamp = rd_kafka_message_timestamp(rkmessage, &tstype);
- if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
- const char *tsname = "?";
- if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME)
- tsname = "create time";
- else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
- tsname = "log append time";
-
- fprintf(stdout,
- "%% Message timestamp: %s %" PRId64
- " (%ds ago)\n",
- tsname, timestamp,
- !timestamp ? 0
- : (int)time(NULL) -
- (int)(timestamp / 1000));
- }
-
- if (!rd_kafka_message_headers(rkmessage, &hdrs)) {
- size_t idx = 0;
- const char *name;
- const void *val;
- size_t size;
-
- fprintf(stdout, "%% Headers:");
-
- while (!rd_kafka_header_get_all(hdrs, idx++, &name,
- &val, &size)) {
- fprintf(stdout, "%s%s=", idx == 1 ? " " : ", ",
- name);
- if (val)
- fprintf(stdout, "\"%.*s\"", (int)size,
- (const char *)val);
- else
- fprintf(stdout, "NULL");
- }
- fprintf(stdout, "\n");
- }
- }
-
- if (rkmessage->key_len) {
- if (output == OUTPUT_HEXDUMP)
- hexdump(stdout, "Message Key", rkmessage->key,
- rkmessage->key_len);
- else
- printf("Key: %.*s\n", (int)rkmessage->key_len,
- (char *)rkmessage->key);
- }
-
- if (output == OUTPUT_HEXDUMP)
- hexdump(stdout, "Message Payload", rkmessage->payload,
- rkmessage->len);
- else
- printf("%.*s\n", (int)rkmessage->len,
- (char *)rkmessage->payload);
-}
-
-
-static void metadata_print(const char *topic,
- const struct rd_kafka_metadata *metadata) {
- int i, j, k;
- int32_t controllerid;
-
- printf("Metadata for %s (from broker %" PRId32 ": %s):\n",
- topic ?: "all topics", metadata->orig_broker_id,
- metadata->orig_broker_name);
-
- controllerid = rd_kafka_controllerid(rk, 0);
-
-
- /* Iterate brokers */
- printf(" %i brokers:\n", metadata->broker_cnt);
- for (i = 0; i < metadata->broker_cnt; i++)
- printf(" broker %" PRId32 " at %s:%i%s\n",
- metadata->brokers[i].id, metadata->brokers[i].host,
- metadata->brokers[i].port,
- controllerid == metadata->brokers[i].id ? " (controller)"
- : "");
-
- /* Iterate topics */
- printf(" %i topics:\n", metadata->topic_cnt);
- for (i = 0; i < metadata->topic_cnt; i++) {
- const struct rd_kafka_metadata_topic *t = &metadata->topics[i];
- printf(" topic \"%s\" with %i partitions:", t->topic,
- t->partition_cnt);
- if (t->err) {
- printf(" %s", rd_kafka_err2str(t->err));
- if (t->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
- printf(" (try again)");
- }
- printf("\n");
-
- /* Iterate topic's partitions */
- for (j = 0; j < t->partition_cnt; j++) {
- const struct rd_kafka_metadata_partition *p;
- p = &t->partitions[j];
- printf(" partition %" PRId32
- ", "
- "leader %" PRId32 ", replicas: ",
- p->id, p->leader);
-
- /* Iterate partition's replicas */
- for (k = 0; k < p->replica_cnt; k++)
- printf("%s%" PRId32, k > 0 ? "," : "",
- p->replicas[k]);
-
- /* Iterate partition's ISRs */
- printf(", isrs: ");
- for (k = 0; k < p->isr_cnt; k++)
- printf("%s%" PRId32, k > 0 ? "," : "",
- p->isrs[k]);
- if (p->err)
- printf(", %s\n", rd_kafka_err2str(p->err));
- else
- printf("\n");
- }
- }
-}
-
-
-static void sig_usr1(int sig) {
- rd_kafka_dump(stdout, rk);
-}
-
-int main(int argc, char **argv) {
- rd_kafka_topic_t *rkt;
- char *brokers = "localhost:9092";
- char mode = 'C';
- char *topic = NULL;
- int partition = RD_KAFKA_PARTITION_UA;
- int opt;
- rd_kafka_conf_t *conf;
- rd_kafka_topic_conf_t *topic_conf;
- char errstr[512];
- int64_t start_offset = 0;
- int do_conf_dump = 0;
- char tmp[16];
- int64_t seek_offset = 0;
- int64_t tmp_offset = 0;
- int get_wmarks = 0;
- rd_kafka_headers_t *hdrs = NULL;
- rd_kafka_resp_err_t err;
-
- /* Kafka configuration */
- conf = rd_kafka_conf_new();
-
- /* Set logger */
- rd_kafka_conf_set_log_cb(conf, logger);
-
- /* Quick termination */
- snprintf(tmp, sizeof(tmp), "%i", SIGIO);
- rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
-
- /* Topic configuration */
- topic_conf = rd_kafka_topic_conf_new();
-
- while ((opt = getopt(argc, argv, "PCLt:p:b:z:qd:o:eX:As:H:")) != -1) {
- switch (opt) {
- case 'P':
- case 'C':
- case 'L':
- mode = opt;
- break;
- case 't':
- topic = optarg;
- break;
- case 'p':
- partition = atoi(optarg);
- break;
- case 'b':
- brokers = optarg;
- break;
- case 'z':
- if (rd_kafka_conf_set(conf, "compression.codec", optarg,
- errstr, sizeof(errstr)) !=
- RD_KAFKA_CONF_OK) {
- fprintf(stderr, "%% %s\n", errstr);
- exit(1);
- }
- break;
- case 'o':
- case 's':
- if (!strcmp(optarg, "end"))
- tmp_offset = RD_KAFKA_OFFSET_END;
- else if (!strcmp(optarg, "beginning"))
- tmp_offset = RD_KAFKA_OFFSET_BEGINNING;
- else if (!strcmp(optarg, "stored"))
- tmp_offset = RD_KAFKA_OFFSET_STORED;
- else if (!strcmp(optarg, "wmark"))
- get_wmarks = 1;
- else {
- tmp_offset = strtoll(optarg, NULL, 10);
-
- if (tmp_offset < 0)
- tmp_offset =
- RD_KAFKA_OFFSET_TAIL(-tmp_offset);
- }
-
- if (opt == 'o')
- start_offset = tmp_offset;
- else if (opt == 's')
- seek_offset = tmp_offset;
- break;
- case 'e':
- exit_eof = 1;
- break;
- case 'd':
- if (rd_kafka_conf_set(conf, "debug", optarg, errstr,
- sizeof(errstr)) !=
- RD_KAFKA_CONF_OK) {
- fprintf(stderr,
- "%% Debug configuration failed: "
- "%s: %s\n",
- errstr, optarg);
- exit(1);
- }
- break;
- case 'q':
- quiet = 1;
- break;
- case 'A':
- output = OUTPUT_RAW;
- break;
- case 'H': {
- char *name, *val;
- size_t name_sz = -1;
-
- name = optarg;
- val = strchr(name, '=');
- if (val) {
- name_sz = (size_t)(val - name);
- val++; /* past the '=' */
- }
-
- if (!hdrs)
- hdrs = rd_kafka_headers_new(8);
-
- err = rd_kafka_header_add(hdrs, name, name_sz, val, -1);
- if (err) {
- fprintf(stderr,
- "%% Failed to add header %s: %s\n",
- name, rd_kafka_err2str(err));
- exit(1);
- }
- } break;
-
- case 'X': {
- char *name, *val;
- rd_kafka_conf_res_t res;
-
- if (!strcmp(optarg, "list") ||
- !strcmp(optarg, "help")) {
- rd_kafka_conf_properties_show(stdout);
- exit(0);
- }
-
- if (!strcmp(optarg, "dump")) {
- do_conf_dump = 1;
- continue;
- }
-
- name = optarg;
- if (!(val = strchr(name, '='))) {
- char dest[512];
- size_t dest_size = sizeof(dest);
- /* Return current value for property. */
-
- res = RD_KAFKA_CONF_UNKNOWN;
- if (!strncmp(name, "topic.", strlen("topic.")))
- res = rd_kafka_topic_conf_get(
- topic_conf, name + strlen("topic."),
- dest, &dest_size);
- if (res == RD_KAFKA_CONF_UNKNOWN)
- res = rd_kafka_conf_get(
- conf, name, dest, &dest_size);
-
- if (res == RD_KAFKA_CONF_OK) {
- printf("%s = %s\n", name, dest);
- exit(0);
- } else {
- fprintf(stderr, "%% %s property\n",
- res == RD_KAFKA_CONF_UNKNOWN
- ? "Unknown"
- : "Invalid");
- exit(1);
- }
- }
-
- *val = '\0';
- val++;
-
- res = RD_KAFKA_CONF_UNKNOWN;
- /* Try "topic." prefixed properties on topic
- * conf first, and then fall through to global if
- * it didnt match a topic configuration property. */
- if (!strncmp(name, "topic.", strlen("topic.")))
- res = rd_kafka_topic_conf_set(
- topic_conf, name + strlen("topic."), val,
- errstr, sizeof(errstr));
-
- if (res == RD_KAFKA_CONF_UNKNOWN)
- res = rd_kafka_conf_set(conf, name, val, errstr,
- sizeof(errstr));
-
- if (res != RD_KAFKA_CONF_OK) {
- fprintf(stderr, "%% %s\n", errstr);
- exit(1);
- }
- } break;
-
- default:
- goto usage;
- }
- }
-
-
- if (do_conf_dump) {
- const char **arr;
- size_t cnt;
- int pass;
-
- for (pass = 0; pass < 2; pass++) {
- int i;
-
- if (pass == 0) {
- arr = rd_kafka_conf_dump(conf, &cnt);
- printf("# Global config\n");
- } else {
- printf("# Topic config\n");
- arr =
- rd_kafka_topic_conf_dump(topic_conf, &cnt);
- }
-
- for (i = 0; i < (int)cnt; i += 2)
- printf("%s = %s\n", arr[i], arr[i + 1]);
-
- printf("\n");
-
- rd_kafka_conf_dump_free(arr, cnt);
- }
-
- exit(0);
- }
-
-
- if (optind != argc || (mode != 'L' && !topic)) {
- usage:
- fprintf(stderr,
- "Usage: %s -C|-P|-L -t <topic> "
- "[-p <partition>] [-b <host1:port1,host2:port2,..>]\n"
- "\n"
- "librdkafka version %s (0x%08x)\n"
- "\n"
- " Options:\n"
- " -C | -P Consumer or Producer mode\n"
- " -L Metadata list mode\n"
- " -t <topic> Topic to fetch / produce\n"
- " -p <num> Partition (random partitioner)\n"
- " -b <brokers> Broker address (localhost:9092)\n"
- " -z <codec> Enable compression:\n"
- " none|gzip|snappy|lz4|zstd\n"
- " -o <offset> Start offset (consumer):\n"
- " beginning, end, NNNNN or -NNNNN\n"
- " wmark returns the current hi&lo "
- "watermarks.\n"
- " -e Exit consumer when last message\n"
- " in partition has been received.\n"
- " -d [facs..] Enable debugging contexts:\n"
- " %s\n"
- " -q Be quiet\n"
- " -A Raw payload output (consumer)\n"
- " -H <name[=value]> Add header to message (producer)\n"
- " -X <prop=name> Set arbitrary librdkafka "
- "configuration property\n"
- " Properties prefixed with \"topic.\" "
- "will be set on topic object.\n"
- " -X list Show full list of supported "
- "properties.\n"
- " -X dump Show configuration\n"
- " -X <prop> Get single property value\n"
- "\n"
- " In Consumer mode:\n"
- " writes fetched messages to stdout\n"
- " In Producer mode:\n"
- " reads messages from stdin and sends to broker\n"
- " In List mode:\n"
- " queries broker for metadata information, "
- "topic is optional.\n"
- "\n"
- "\n"
- "\n",
- argv[0], rd_kafka_version_str(), rd_kafka_version(),
- RD_KAFKA_DEBUG_CONTEXTS);
- exit(1);
- }
-
- if ((mode == 'C' && !isatty(STDIN_FILENO)) ||
- (mode != 'C' && !isatty(STDOUT_FILENO)))
- quiet = 1;
-
-
- signal(SIGINT, stop);
- signal(SIGUSR1, sig_usr1);
-
- /* Set bootstrap servers */
- if (brokers &&
- rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
- sizeof(errstr)) != RD_KAFKA_CONF_OK) {
- fprintf(stderr, "%% %s\n", errstr);
- exit(1);
- }
-
- if (mode == 'P') {
- /*
- * Producer
- */
- char buf[2048];
- int sendcnt = 0;
-
- /* Set up a message delivery report callback.
- * It will be called once for each message, either on successful
- * delivery to broker, or upon failure to deliver to broker. */
- rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered);
-
- /* Create Kafka handle */
- if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
- sizeof(errstr)))) {
- fprintf(stderr,
- "%% Failed to create new producer: %s\n",
- errstr);
- exit(1);
- }
-
- /* Create topic */
- rkt = rd_kafka_topic_new(rk, topic, topic_conf);
- topic_conf = NULL; /* Now owned by topic */
-
- if (!quiet)
- fprintf(stderr,
- "%% Type stuff and hit enter to send\n");
-
- while (run && fgets(buf, sizeof(buf), stdin)) {
- size_t len = strlen(buf);
- if (buf[len - 1] == '\n')
- buf[--len] = '\0';
-
- err = RD_KAFKA_RESP_ERR_NO_ERROR;
-
- /* Send/Produce message. */
- if (hdrs) {
- rd_kafka_headers_t *hdrs_copy;
-
- hdrs_copy = rd_kafka_headers_copy(hdrs);
-
- err = rd_kafka_producev(
- rk, RD_KAFKA_V_RKT(rkt),
- RD_KAFKA_V_PARTITION(partition),
- RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
- RD_KAFKA_V_VALUE(buf, len),
- RD_KAFKA_V_HEADERS(hdrs_copy),
- RD_KAFKA_V_END);
-
- if (err)
- rd_kafka_headers_destroy(hdrs_copy);
-
- } else {
- if (rd_kafka_produce(
- rkt, partition, RD_KAFKA_MSG_F_COPY,
- /* Payload and length */
- buf, len,
- /* Optional key and its length */
- NULL, 0,
- /* Message opaque, provided in
- * delivery report callback as
- * msg_opaque. */
- NULL) == -1) {
- err = rd_kafka_last_error();
- }
- }
-
- if (err) {
- fprintf(stderr,
- "%% Failed to produce to topic %s "
- "partition %i: %s\n",
- rd_kafka_topic_name(rkt), partition,
- rd_kafka_err2str(err));
-
- /* Poll to handle delivery reports */
- rd_kafka_poll(rk, 0);
- continue;
- }
-
- if (!quiet)
- fprintf(stderr,
- "%% Sent %zd bytes to topic "
- "%s partition %i\n",
- len, rd_kafka_topic_name(rkt),
- partition);
- sendcnt++;
- /* Poll to handle delivery reports */
- rd_kafka_poll(rk, 0);
- }
-
- /* Poll to handle delivery reports */
- rd_kafka_poll(rk, 0);
-
- /* Wait for messages to be delivered */
- while (run && rd_kafka_outq_len(rk) > 0)
- rd_kafka_poll(rk, 100);
-
- /* Destroy topic */
- rd_kafka_topic_destroy(rkt);
-
- /* Destroy the handle */
- rd_kafka_destroy(rk);
-
- } else if (mode == 'C') {
- /*
- * Consumer
- */
-
- rd_kafka_conf_set(conf, "enable.partition.eof", "true", NULL,
- 0);
-
- /* Create Kafka handle */
- if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,
- sizeof(errstr)))) {
- fprintf(stderr,
- "%% Failed to create new consumer: %s\n",
- errstr);
- exit(1);
- }
-
- if (get_wmarks) {
- int64_t lo, hi;
-
- /* Only query for hi&lo partition watermarks */
-
- if ((err = rd_kafka_query_watermark_offsets(
- rk, topic, partition, &lo, &hi, 5000))) {
- fprintf(stderr,
- "%% query_watermark_offsets() "
- "failed: %s\n",
- rd_kafka_err2str(err));
- exit(1);
- }
-
- printf(
- "%s [%d]: low - high offsets: "
- "%" PRId64 " - %" PRId64 "\n",
- topic, partition, lo, hi);
-
- rd_kafka_destroy(rk);
- exit(0);
- }
-
-
- /* Create topic */
- rkt = rd_kafka_topic_new(rk, topic, topic_conf);
- topic_conf = NULL; /* Now owned by topic */
-
- /* Start consuming */
- if (rd_kafka_consume_start(rkt, partition, start_offset) ==
- -1) {
- err = rd_kafka_last_error();
- fprintf(stderr, "%% Failed to start consuming: %s\n",
- rd_kafka_err2str(err));
- if (err == RD_KAFKA_RESP_ERR__INVALID_ARG)
- fprintf(stderr,
- "%% Broker based offset storage "
- "requires a group.id, "
- "add: -X group.id=yourGroup\n");
- exit(1);
- }
-
- while (run) {
- rd_kafka_message_t *rkmessage;
-
- /* Poll for errors, etc. */
- rd_kafka_poll(rk, 0);
-
- /* Consume single message.
- * See rdkafka_performance.c for high speed
- * consuming of messages. */
- rkmessage = rd_kafka_consume(rkt, partition, 1000);
- if (!rkmessage) /* timeout */
- continue;
-
- msg_consume(rkmessage, NULL);
-
- /* Return message to rdkafka */
- rd_kafka_message_destroy(rkmessage);
-
- if (seek_offset) {
- err = rd_kafka_seek(rkt, partition, seek_offset,
- 2000);
- if (err)
- printf("Seek failed: %s\n",
- rd_kafka_err2str(err));
- else
- printf("Seeked to %" PRId64 "\n",
- seek_offset);
- seek_offset = 0;
- }
- }
-
- /* Stop consuming */
- rd_kafka_consume_stop(rkt, partition);
-
- while (rd_kafka_outq_len(rk) > 0)
- rd_kafka_poll(rk, 10);
-
- /* Destroy topic */
- rd_kafka_topic_destroy(rkt);
-
- /* Destroy handle */
- rd_kafka_destroy(rk);
-
- } else if (mode == 'L') {
- err = RD_KAFKA_RESP_ERR_NO_ERROR;
-
- /* Create Kafka handle */
- if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
- sizeof(errstr)))) {
- fprintf(stderr,
- "%% Failed to create new producer: %s\n",
- errstr);
- exit(1);
- }
-
- /* Create topic */
- if (topic) {
- rkt = rd_kafka_topic_new(rk, topic, topic_conf);
- topic_conf = NULL; /* Now owned by topic */
- } else
- rkt = NULL;
-
- while (run) {
- const struct rd_kafka_metadata *metadata;
-
- /* Fetch metadata */
- err = rd_kafka_metadata(rk, rkt ? 0 : 1, rkt, &metadata,
- 5000);
- if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
- fprintf(stderr,
- "%% Failed to acquire metadata: %s\n",
- rd_kafka_err2str(err));
- run = 0;
- break;
- }
-
- metadata_print(topic, metadata);
-
- rd_kafka_metadata_destroy(metadata);
- run = 0;
- }
-
- /* Destroy topic */
- if (rkt)
- rd_kafka_topic_destroy(rkt);
-
- /* Destroy the handle */
- rd_kafka_destroy(rk);
-
- if (topic_conf)
- rd_kafka_topic_conf_destroy(topic_conf);
-
-
- /* Exit right away, dont wait for background cleanup, we haven't
- * done anything important anyway. */
- exit(err ? 2 : 0);
- }
-
- if (hdrs)
- rd_kafka_headers_destroy(hdrs);
-
- if (topic_conf)
- rd_kafka_topic_conf_destroy(topic_conf);
-
- /* Let background threads clean up and terminate cleanly. */
- run = 5;
- while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1)
- printf("Waiting for librdkafka to decommission\n");
- if (run <= 0)
- rd_kafka_dump(stdout, rk);
-
- return 0;
-}