diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/examples/rdkafka_example.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/examples/rdkafka_example.c | 853 |
1 files changed, 853 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/examples/rdkafka_example.c b/fluent-bit/lib/librdkafka-2.1.0/examples/rdkafka_example.c new file mode 100644 index 000000000..91415318a --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/examples/rdkafka_example.c @@ -0,0 +1,853 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Apache Kafka consumer & producer example programs + * using the Kafka driver from librdkafka + * (https://github.com/edenhill/librdkafka) + */ + +#include <ctype.h> +#include <signal.h> +#include <string.h> +#include <unistd.h> +#include <stdlib.h> +#include <syslog.h> +#include <time.h> +#include <sys/time.h> +#include <getopt.h> + +/* Typical include path would be <librdkafka/rdkafka.h>, but this program + * is builtin from within the librdkafka source tree and thus differs. */ +#include "rdkafka.h" /* for Kafka driver */ + + +static volatile sig_atomic_t run = 1; +static rd_kafka_t *rk; +static int exit_eof = 0; +static int quiet = 0; +static enum { + OUTPUT_HEXDUMP, + OUTPUT_RAW, +} output = OUTPUT_HEXDUMP; + +static void stop(int sig) { + run = 0; + fclose(stdin); /* abort fgets() */ +} + + +static void hexdump(FILE *fp, const char *name, const void *ptr, size_t len) { + const char *p = (const char *)ptr; + size_t of = 0; + + + if (name) + fprintf(fp, "%s hexdump (%zd bytes):\n", name, len); + + for (of = 0; of < len; of += 16) { + char hexen[16 * 3 + 1]; + char charen[16 + 1]; + int hof = 0; + + int cof = 0; + int i; + + for (i = of; i < (int)of + 16 && i < (int)len; i++) { + hof += sprintf(hexen + hof, "%02x ", p[i] & 0xff); + cof += sprintf(charen + cof, "%c", + isprint((int)p[i]) ? p[i] : '.'); + } + fprintf(fp, "%08zx: %-48s %-16s\n", of, hexen, charen); + } +} + +/** + * Kafka logger callback (optional) + */ +static void +logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf) { + struct timeval tv; + gettimeofday(&tv, NULL); + fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n", (int)tv.tv_sec, + (int)(tv.tv_usec / 1000), level, fac, + rk ? rd_kafka_name(rk) : NULL, buf); +} + + +/** + * Message delivery report callback using the richer rd_kafka_message_t object. + */ +static void msg_delivered(rd_kafka_t *rk, + const rd_kafka_message_t *rkmessage, + void *opaque) { + if (rkmessage->err) + fprintf(stderr, + "%% Message delivery failed (broker %" PRId32 "): %s\n", + rd_kafka_message_broker_id(rkmessage), + rd_kafka_err2str(rkmessage->err)); + else if (!quiet) + fprintf(stderr, + "%% Message delivered (%zd bytes, offset %" PRId64 + ", " + "partition %" PRId32 ", broker %" PRId32 "): %.*s\n", + rkmessage->len, rkmessage->offset, rkmessage->partition, + rd_kafka_message_broker_id(rkmessage), + (int)rkmessage->len, (const char *)rkmessage->payload); +} + + +static void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) { + if (rkmessage->err) { + if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { + fprintf(stderr, + "%% Consumer reached end of %s [%" PRId32 + "] " + "message queue at offset %" PRId64 "\n", + rd_kafka_topic_name(rkmessage->rkt), + rkmessage->partition, rkmessage->offset); + + if (exit_eof) + run = 0; + + return; + } + + fprintf(stderr, + "%% Consume error for topic \"%s\" [%" PRId32 + "] " + "offset %" PRId64 ": %s\n", + rd_kafka_topic_name(rkmessage->rkt), + rkmessage->partition, rkmessage->offset, + rd_kafka_message_errstr(rkmessage)); + + if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION || + rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) + run = 0; + return; + } + + if (!quiet) { + rd_kafka_timestamp_type_t tstype; + int64_t timestamp; + rd_kafka_headers_t *hdrs; + + fprintf(stdout, + "%% Message (offset %" PRId64 + ", %zd bytes, " + "broker %" PRId32 "):\n", + rkmessage->offset, rkmessage->len, + rd_kafka_message_broker_id(rkmessage)); + + timestamp = rd_kafka_message_timestamp(rkmessage, &tstype); + if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) { + const char *tsname = "?"; + if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME) + tsname = "create time"; + else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME) + tsname = "log append time"; + + fprintf(stdout, + "%% Message timestamp: %s %" PRId64 + " (%ds ago)\n", + tsname, timestamp, + !timestamp ? 0 + : (int)time(NULL) - + (int)(timestamp / 1000)); + } + + if (!rd_kafka_message_headers(rkmessage, &hdrs)) { + size_t idx = 0; + const char *name; + const void *val; + size_t size; + + fprintf(stdout, "%% Headers:"); + + while (!rd_kafka_header_get_all(hdrs, idx++, &name, + &val, &size)) { + fprintf(stdout, "%s%s=", idx == 1 ? " " : ", ", + name); + if (val) + fprintf(stdout, "\"%.*s\"", (int)size, + (const char *)val); + else + fprintf(stdout, "NULL"); + } + fprintf(stdout, "\n"); + } + } + + if (rkmessage->key_len) { + if (output == OUTPUT_HEXDUMP) + hexdump(stdout, "Message Key", rkmessage->key, + rkmessage->key_len); + else + printf("Key: %.*s\n", (int)rkmessage->key_len, + (char *)rkmessage->key); + } + + if (output == OUTPUT_HEXDUMP) + hexdump(stdout, "Message Payload", rkmessage->payload, + rkmessage->len); + else + printf("%.*s\n", (int)rkmessage->len, + (char *)rkmessage->payload); +} + + +static void metadata_print(const char *topic, + const struct rd_kafka_metadata *metadata) { + int i, j, k; + int32_t controllerid; + + printf("Metadata for %s (from broker %" PRId32 ": %s):\n", + topic ?: "all topics", metadata->orig_broker_id, + metadata->orig_broker_name); + + controllerid = rd_kafka_controllerid(rk, 0); + + + /* Iterate brokers */ + printf(" %i brokers:\n", metadata->broker_cnt); + for (i = 0; i < metadata->broker_cnt; i++) + printf(" broker %" PRId32 " at %s:%i%s\n", + metadata->brokers[i].id, metadata->brokers[i].host, + metadata->brokers[i].port, + controllerid == metadata->brokers[i].id ? " (controller)" + : ""); + + /* Iterate topics */ + printf(" %i topics:\n", metadata->topic_cnt); + for (i = 0; i < metadata->topic_cnt; i++) { + const struct rd_kafka_metadata_topic *t = &metadata->topics[i]; + printf(" topic \"%s\" with %i partitions:", t->topic, + t->partition_cnt); + if (t->err) { + printf(" %s", rd_kafka_err2str(t->err)); + if (t->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE) + printf(" (try again)"); + } + printf("\n"); + + /* Iterate topic's partitions */ + for (j = 0; j < t->partition_cnt; j++) { + const struct rd_kafka_metadata_partition *p; + p = &t->partitions[j]; + printf(" partition %" PRId32 + ", " + "leader %" PRId32 ", replicas: ", + p->id, p->leader); + + /* Iterate partition's replicas */ + for (k = 0; k < p->replica_cnt; k++) + printf("%s%" PRId32, k > 0 ? "," : "", + p->replicas[k]); + + /* Iterate partition's ISRs */ + printf(", isrs: "); + for (k = 0; k < p->isr_cnt; k++) + printf("%s%" PRId32, k > 0 ? "," : "", + p->isrs[k]); + if (p->err) + printf(", %s\n", rd_kafka_err2str(p->err)); + else + printf("\n"); + } + } +} + + +static void sig_usr1(int sig) { + rd_kafka_dump(stdout, rk); +} + +int main(int argc, char **argv) { + rd_kafka_topic_t *rkt; + char *brokers = "localhost:9092"; + char mode = 'C'; + char *topic = NULL; + int partition = RD_KAFKA_PARTITION_UA; + int opt; + rd_kafka_conf_t *conf; + rd_kafka_topic_conf_t *topic_conf; + char errstr[512]; + int64_t start_offset = 0; + int do_conf_dump = 0; + char tmp[16]; + int64_t seek_offset = 0; + int64_t tmp_offset = 0; + int get_wmarks = 0; + rd_kafka_headers_t *hdrs = NULL; + rd_kafka_resp_err_t err; + + /* Kafka configuration */ + conf = rd_kafka_conf_new(); + + /* Set logger */ + rd_kafka_conf_set_log_cb(conf, logger); + + /* Quick termination */ + snprintf(tmp, sizeof(tmp), "%i", SIGIO); + rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); + + /* Topic configuration */ + topic_conf = rd_kafka_topic_conf_new(); + + while ((opt = getopt(argc, argv, "PCLt:p:b:z:qd:o:eX:As:H:")) != -1) { + switch (opt) { + case 'P': + case 'C': + case 'L': + mode = opt; + break; + case 't': + topic = optarg; + break; + case 'p': + partition = atoi(optarg); + break; + case 'b': + brokers = optarg; + break; + case 'z': + if (rd_kafka_conf_set(conf, "compression.codec", optarg, + errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) { + fprintf(stderr, "%% %s\n", errstr); + exit(1); + } + break; + case 'o': + case 's': + if (!strcmp(optarg, "end")) + tmp_offset = RD_KAFKA_OFFSET_END; + else if (!strcmp(optarg, "beginning")) + tmp_offset = RD_KAFKA_OFFSET_BEGINNING; + else if (!strcmp(optarg, "stored")) + tmp_offset = RD_KAFKA_OFFSET_STORED; + else if (!strcmp(optarg, "wmark")) + get_wmarks = 1; + else { + tmp_offset = strtoll(optarg, NULL, 10); + + if (tmp_offset < 0) + tmp_offset = + RD_KAFKA_OFFSET_TAIL(-tmp_offset); + } + + if (opt == 'o') + start_offset = tmp_offset; + else if (opt == 's') + seek_offset = tmp_offset; + break; + case 'e': + exit_eof = 1; + break; + case 'd': + if (rd_kafka_conf_set(conf, "debug", optarg, errstr, + sizeof(errstr)) != + RD_KAFKA_CONF_OK) { + fprintf(stderr, + "%% Debug configuration failed: " + "%s: %s\n", + errstr, optarg); + exit(1); + } + break; + case 'q': + quiet = 1; + break; + case 'A': + output = OUTPUT_RAW; + break; + case 'H': { + char *name, *val; + size_t name_sz = -1; + + name = optarg; + val = strchr(name, '='); + if (val) { + name_sz = (size_t)(val - name); + val++; /* past the '=' */ + } + + if (!hdrs) + hdrs = rd_kafka_headers_new(8); + + err = rd_kafka_header_add(hdrs, name, name_sz, val, -1); + if (err) { + fprintf(stderr, + "%% Failed to add header %s: %s\n", + name, rd_kafka_err2str(err)); + exit(1); + } + } break; + + case 'X': { + char *name, *val; + rd_kafka_conf_res_t res; + + if (!strcmp(optarg, "list") || + !strcmp(optarg, "help")) { + rd_kafka_conf_properties_show(stdout); + exit(0); + } + + if (!strcmp(optarg, "dump")) { + do_conf_dump = 1; + continue; + } + + name = optarg; + if (!(val = strchr(name, '='))) { + char dest[512]; + size_t dest_size = sizeof(dest); + /* Return current value for property. */ + + res = RD_KAFKA_CONF_UNKNOWN; + if (!strncmp(name, "topic.", strlen("topic."))) + res = rd_kafka_topic_conf_get( + topic_conf, name + strlen("topic."), + dest, &dest_size); + if (res == RD_KAFKA_CONF_UNKNOWN) + res = rd_kafka_conf_get( + conf, name, dest, &dest_size); + + if (res == RD_KAFKA_CONF_OK) { + printf("%s = %s\n", name, dest); + exit(0); + } else { + fprintf(stderr, "%% %s property\n", + res == RD_KAFKA_CONF_UNKNOWN + ? "Unknown" + : "Invalid"); + exit(1); + } + } + + *val = '\0'; + val++; + + res = RD_KAFKA_CONF_UNKNOWN; + /* Try "topic." prefixed properties on topic + * conf first, and then fall through to global if + * it didnt match a topic configuration property. */ + if (!strncmp(name, "topic.", strlen("topic."))) + res = rd_kafka_topic_conf_set( + topic_conf, name + strlen("topic."), val, + errstr, sizeof(errstr)); + + if (res == RD_KAFKA_CONF_UNKNOWN) + res = rd_kafka_conf_set(conf, name, val, errstr, + sizeof(errstr)); + + if (res != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%% %s\n", errstr); + exit(1); + } + } break; + + default: + goto usage; + } + } + + + if (do_conf_dump) { + const char **arr; + size_t cnt; + int pass; + + for (pass = 0; pass < 2; pass++) { + int i; + + if (pass == 0) { + arr = rd_kafka_conf_dump(conf, &cnt); + printf("# Global config\n"); + } else { + printf("# Topic config\n"); + arr = + rd_kafka_topic_conf_dump(topic_conf, &cnt); + } + + for (i = 0; i < (int)cnt; i += 2) + printf("%s = %s\n", arr[i], arr[i + 1]); + + printf("\n"); + + rd_kafka_conf_dump_free(arr, cnt); + } + + exit(0); + } + + + if (optind != argc || (mode != 'L' && !topic)) { + usage: + fprintf(stderr, + "Usage: %s -C|-P|-L -t <topic> " + "[-p <partition>] [-b <host1:port1,host2:port2,..>]\n" + "\n" + "librdkafka version %s (0x%08x)\n" + "\n" + " Options:\n" + " -C | -P Consumer or Producer mode\n" + " -L Metadata list mode\n" + " -t <topic> Topic to fetch / produce\n" + " -p <num> Partition (random partitioner)\n" + " -b <brokers> Broker address (localhost:9092)\n" + " -z <codec> Enable compression:\n" + " none|gzip|snappy|lz4|zstd\n" + " -o <offset> Start offset (consumer):\n" + " beginning, end, NNNNN or -NNNNN\n" + " wmark returns the current hi&lo " + "watermarks.\n" + " -e Exit consumer when last message\n" + " in partition has been received.\n" + " -d [facs..] Enable debugging contexts:\n" + " %s\n" + " -q Be quiet\n" + " -A Raw payload output (consumer)\n" + " -H <name[=value]> Add header to message (producer)\n" + " -X <prop=name> Set arbitrary librdkafka " + "configuration property\n" + " Properties prefixed with \"topic.\" " + "will be set on topic object.\n" + " -X list Show full list of supported " + "properties.\n" + " -X dump Show configuration\n" + " -X <prop> Get single property value\n" + "\n" + " In Consumer mode:\n" + " writes fetched messages to stdout\n" + " In Producer mode:\n" + " reads messages from stdin and sends to broker\n" + " In List mode:\n" + " queries broker for metadata information, " + "topic is optional.\n" + "\n" + "\n" + "\n", + argv[0], rd_kafka_version_str(), rd_kafka_version(), + RD_KAFKA_DEBUG_CONTEXTS); + exit(1); + } + + if ((mode == 'C' && !isatty(STDIN_FILENO)) || + (mode != 'C' && !isatty(STDOUT_FILENO))) + quiet = 1; + + + signal(SIGINT, stop); + signal(SIGUSR1, sig_usr1); + + /* Set bootstrap servers */ + if (brokers && + rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%% %s\n", errstr); + exit(1); + } + + if (mode == 'P') { + /* + * Producer + */ + char buf[2048]; + int sendcnt = 0; + + /* Set up a message delivery report callback. + * It will be called once for each message, either on successful + * delivery to broker, or upon failure to deliver to broker. */ + rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered); + + /* Create Kafka handle */ + if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, + sizeof(errstr)))) { + fprintf(stderr, + "%% Failed to create new producer: %s\n", + errstr); + exit(1); + } + + /* Create topic */ + rkt = rd_kafka_topic_new(rk, topic, topic_conf); + topic_conf = NULL; /* Now owned by topic */ + + if (!quiet) + fprintf(stderr, + "%% Type stuff and hit enter to send\n"); + + while (run && fgets(buf, sizeof(buf), stdin)) { + size_t len = strlen(buf); + if (buf[len - 1] == '\n') + buf[--len] = '\0'; + + err = RD_KAFKA_RESP_ERR_NO_ERROR; + + /* Send/Produce message. */ + if (hdrs) { + rd_kafka_headers_t *hdrs_copy; + + hdrs_copy = rd_kafka_headers_copy(hdrs); + + err = rd_kafka_producev( + rk, RD_KAFKA_V_RKT(rkt), + RD_KAFKA_V_PARTITION(partition), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_VALUE(buf, len), + RD_KAFKA_V_HEADERS(hdrs_copy), + RD_KAFKA_V_END); + + if (err) + rd_kafka_headers_destroy(hdrs_copy); + + } else { + if (rd_kafka_produce( + rkt, partition, RD_KAFKA_MSG_F_COPY, + /* Payload and length */ + buf, len, + /* Optional key and its length */ + NULL, 0, + /* Message opaque, provided in + * delivery report callback as + * msg_opaque. */ + NULL) == -1) { + err = rd_kafka_last_error(); + } + } + + if (err) { + fprintf(stderr, + "%% Failed to produce to topic %s " + "partition %i: %s\n", + rd_kafka_topic_name(rkt), partition, + rd_kafka_err2str(err)); + + /* Poll to handle delivery reports */ + rd_kafka_poll(rk, 0); + continue; + } + + if (!quiet) + fprintf(stderr, + "%% Sent %zd bytes to topic " + "%s partition %i\n", + len, rd_kafka_topic_name(rkt), + partition); + sendcnt++; + /* Poll to handle delivery reports */ + rd_kafka_poll(rk, 0); + } + + /* Poll to handle delivery reports */ + rd_kafka_poll(rk, 0); + + /* Wait for messages to be delivered */ + while (run && rd_kafka_outq_len(rk) > 0) + rd_kafka_poll(rk, 100); + + /* Destroy topic */ + rd_kafka_topic_destroy(rkt); + + /* Destroy the handle */ + rd_kafka_destroy(rk); + + } else if (mode == 'C') { + /* + * Consumer + */ + + rd_kafka_conf_set(conf, "enable.partition.eof", "true", NULL, + 0); + + /* Create Kafka handle */ + if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, + sizeof(errstr)))) { + fprintf(stderr, + "%% Failed to create new consumer: %s\n", + errstr); + exit(1); + } + + if (get_wmarks) { + int64_t lo, hi; + + /* Only query for hi&lo partition watermarks */ + + if ((err = rd_kafka_query_watermark_offsets( + rk, topic, partition, &lo, &hi, 5000))) { + fprintf(stderr, + "%% query_watermark_offsets() " + "failed: %s\n", + rd_kafka_err2str(err)); + exit(1); + } + + printf( + "%s [%d]: low - high offsets: " + "%" PRId64 " - %" PRId64 "\n", + topic, partition, lo, hi); + + rd_kafka_destroy(rk); + exit(0); + } + + + /* Create topic */ + rkt = rd_kafka_topic_new(rk, topic, topic_conf); + topic_conf = NULL; /* Now owned by topic */ + + /* Start consuming */ + if (rd_kafka_consume_start(rkt, partition, start_offset) == + -1) { + err = rd_kafka_last_error(); + fprintf(stderr, "%% Failed to start consuming: %s\n", + rd_kafka_err2str(err)); + if (err == RD_KAFKA_RESP_ERR__INVALID_ARG) + fprintf(stderr, + "%% Broker based offset storage " + "requires a group.id, " + "add: -X group.id=yourGroup\n"); + exit(1); + } + + while (run) { + rd_kafka_message_t *rkmessage; + + /* Poll for errors, etc. */ + rd_kafka_poll(rk, 0); + + /* Consume single message. + * See rdkafka_performance.c for high speed + * consuming of messages. */ + rkmessage = rd_kafka_consume(rkt, partition, 1000); + if (!rkmessage) /* timeout */ + continue; + + msg_consume(rkmessage, NULL); + + /* Return message to rdkafka */ + rd_kafka_message_destroy(rkmessage); + + if (seek_offset) { + err = rd_kafka_seek(rkt, partition, seek_offset, + 2000); + if (err) + printf("Seek failed: %s\n", + rd_kafka_err2str(err)); + else + printf("Seeked to %" PRId64 "\n", + seek_offset); + seek_offset = 0; + } + } + + /* Stop consuming */ + rd_kafka_consume_stop(rkt, partition); + + while (rd_kafka_outq_len(rk) > 0) + rd_kafka_poll(rk, 10); + + /* Destroy topic */ + rd_kafka_topic_destroy(rkt); + + /* Destroy handle */ + rd_kafka_destroy(rk); + + } else if (mode == 'L') { + err = RD_KAFKA_RESP_ERR_NO_ERROR; + + /* Create Kafka handle */ + if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, + sizeof(errstr)))) { + fprintf(stderr, + "%% Failed to create new producer: %s\n", + errstr); + exit(1); + } + + /* Create topic */ + if (topic) { + rkt = rd_kafka_topic_new(rk, topic, topic_conf); + topic_conf = NULL; /* Now owned by topic */ + } else + rkt = NULL; + + while (run) { + const struct rd_kafka_metadata *metadata; + + /* Fetch metadata */ + err = rd_kafka_metadata(rk, rkt ? 0 : 1, rkt, &metadata, + 5000); + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + fprintf(stderr, + "%% Failed to acquire metadata: %s\n", + rd_kafka_err2str(err)); + run = 0; + break; + } + + metadata_print(topic, metadata); + + rd_kafka_metadata_destroy(metadata); + run = 0; + } + + /* Destroy topic */ + if (rkt) + rd_kafka_topic_destroy(rkt); + + /* Destroy the handle */ + rd_kafka_destroy(rk); + + if (topic_conf) + rd_kafka_topic_conf_destroy(topic_conf); + + + /* Exit right away, dont wait for background cleanup, we haven't + * done anything important anyway. */ + exit(err ? 2 : 0); + } + + if (hdrs) + rd_kafka_headers_destroy(hdrs); + + if (topic_conf) + rd_kafka_topic_conf_destroy(topic_conf); + + /* Let background threads clean up and terminate cleanly. */ + run = 5; + while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1) + printf("Waiting for librdkafka to decommission\n"); + if (run <= 0) + rd_kafka_dump(stdout, rk); + + return 0; +} |