summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/examples/list_consumer_group_offsets.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/examples/list_consumer_group_offsets.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/examples/list_consumer_group_offsets.c359
1 files changed, 0 insertions, 359 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/examples/list_consumer_group_offsets.c b/fluent-bit/lib/librdkafka-2.1.0/examples/list_consumer_group_offsets.c
deleted file mode 100644
index 03e878ee1..000000000
--- a/fluent-bit/lib/librdkafka-2.1.0/examples/list_consumer_group_offsets.c
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2022, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-/**
- * ListConsumerGroupOffsets usage example.
- */
-
-#include <stdio.h>
-#include <signal.h>
-#include <string.h>
-#include <stdlib.h>
-#include <stdarg.h>
-
-#ifdef _WIN32
-#include "../win32/wingetopt.h"
-#else
-#include <getopt.h>
-#endif
-
-
-/* Typical include path would be <librdkafka/rdkafka.h>, but this program
- * is builtin from within the librdkafka source tree and thus differs. */
-#include "rdkafka.h"
-
-
-const char *argv0;
-
-static rd_kafka_queue_t *queue; /** Admin result queue.
- * This is a global so we can
- * yield in stop() */
-static volatile sig_atomic_t run = 1;
-
-/**
- * @brief Signal termination of program
- */
-static void stop(int sig) {
- if (!run) {
- fprintf(stderr, "%% Forced termination\n");
- exit(2);
- }
- run = 0;
- rd_kafka_queue_yield(queue);
-}
-
-
-static void usage(const char *reason, ...) {
-
- fprintf(stderr,
- "List consumer group offsets usage examples\n"
- "\n"
- "Usage: %s <options> <group_id> "
- "<require_stable_offsets>\n"
- " <topic1> <partition1>\n"
- " <topic2> <partition2>\n"
- " ...\n"
- "\n"
- "Options:\n"
- " -b <brokers> Bootstrap server list to connect to.\n"
- " -X <prop=val> Set librdkafka configuration property.\n"
- " See CONFIGURATION.md for full list.\n"
- " -d <dbg,..> Enable librdkafka debugging (%s).\n"
- "\n",
- argv0, rd_kafka_get_debug_contexts());
-
- if (reason) {
- va_list ap;
- char reasonbuf[512];
-
- va_start(ap, reason);
- vsnprintf(reasonbuf, sizeof(reasonbuf), reason, ap);
- va_end(ap);
-
- fprintf(stderr, "ERROR: %s\n", reasonbuf);
- }
-
- exit(reason ? 1 : 0);
-}
-
-
-#define fatal(...) \
- do { \
- fprintf(stderr, "ERROR: "); \
- fprintf(stderr, __VA_ARGS__); \
- fprintf(stderr, "\n"); \
- exit(2); \
- } while (0)
-
-
-/**
- * @brief Set config property. Exit on failure.
- */
-static void conf_set(rd_kafka_conf_t *conf, const char *name, const char *val) {
- char errstr[512];
-
- if (rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)) !=
- RD_KAFKA_CONF_OK)
- fatal("Failed to set %s=%s: %s", name, val, errstr);
-}
-
-
-static void
-print_partition_list(FILE *fp,
- const rd_kafka_topic_partition_list_t *partitions,
- int print_offset,
- const char *prefix) {
- int i;
-
- if (partitions->cnt == 0) {
- fprintf(fp, "%sNo partition found", prefix);
- }
- for (i = 0; i < partitions->cnt; i++) {
- char offset_string[512] = {};
- *offset_string = '\0';
- if (print_offset) {
- snprintf(offset_string, sizeof(offset_string),
- " offset %" PRId64,
- partitions->elems[i].offset);
- }
- fprintf(fp, "%s%s %s [%" PRId32 "]%s error %s",
- i > 0 ? "\n" : "", prefix, partitions->elems[i].topic,
- partitions->elems[i].partition, offset_string,
- rd_kafka_err2str(partitions->elems[i].err));
- }
- fprintf(fp, "\n");
-}
-
-/**
- * @brief Parse an integer or fail.
- */
-int64_t parse_int(const char *what, const char *str) {
- char *end;
- unsigned long n = strtoull(str, &end, 0);
-
- if (end != str + strlen(str)) {
- fprintf(stderr, "%% Invalid input for %s: %s: not an integer\n",
- what, str);
- exit(1);
- }
-
- return (int64_t)n;
-}
-
-static void
-cmd_list_consumer_group_offsets(rd_kafka_conf_t *conf, int argc, char **argv) {
- char errstr[512]; /* librdkafka API error reporting buffer */
- rd_kafka_t *rk; /* Admin client instance */
- rd_kafka_AdminOptions_t *options; /* (Optional) Options for
- * ListConsumerGroupOffsets() */
- rd_kafka_event_t *event; /* ListConsumerGroupOffsets result event */
- const int min_argc = 2;
- char *topic;
- int partition;
- int require_stable_offsets = 0, num_partitions = 0;
- rd_kafka_ListConsumerGroupOffsets_t *list_cgrp_offsets;
- rd_kafka_error_t *error;
- const char *group;
-
- /*
- * Argument validation
- */
- if (argc < min_argc || (argc - min_argc) % 2 != 0)
- usage("Wrong number of arguments");
- else {
- require_stable_offsets =
- parse_int("require_stable_offsets", argv[1]);
- if (require_stable_offsets < 0 || require_stable_offsets > 1)
- usage("Require stable not a 0-1 int");
- }
-
- num_partitions = (argc - min_argc) / 2;
- group = argv[0];
-
- /*
- * Create an admin client, it can be created using any client type,
- * so we choose producer since it requires no extra configuration
- * and is more light-weight than the consumer.
- *
- * NOTE: rd_kafka_new() takes ownership of the conf object
- * and the application must not reference it again after
- * this call.
- */
- rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
- if (!rk) {
- fprintf(stderr, "%% Failed to create new producer: %s\n",
- errstr);
- exit(1);
- }
-
- /* The Admin API is completely asynchronous, results are emitted
- * on the result queue that is passed to ListConsumerGroupOffsets() */
- queue = rd_kafka_queue_new(rk);
-
- /* Signal handler for clean shutdown */
- signal(SIGINT, stop);
-
- /* Set timeout (optional) */
- options = rd_kafka_AdminOptions_new(
- rk, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS);
- if (rd_kafka_AdminOptions_set_request_timeout(
- options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))) {
- fprintf(stderr, "%% Failed to set timeout: %s\n", errstr);
- exit(1);
- }
- /* Set requested require stable offsets */
- if ((error = rd_kafka_AdminOptions_set_require_stable_offsets(
- options, require_stable_offsets))) {
- fprintf(stderr, "%% Failed to set require stable offsets: %s\n",
- rd_kafka_error_string(error));
- rd_kafka_error_destroy(error);
- exit(1);
- }
-
- /* Read passed partition-offsets */
- rd_kafka_topic_partition_list_t *partitions = NULL;
- if (num_partitions > 0) {
- int i;
- partitions = rd_kafka_topic_partition_list_new(num_partitions);
- for (i = 0; i < num_partitions; i++) {
- topic = argv[min_argc + i * 2];
- partition =
- parse_int("partition", argv[min_argc + i * 2 + 1]);
- rd_kafka_topic_partition_list_add(partitions, topic,
- partition);
- }
- }
-
- /* Create argument */
- list_cgrp_offsets =
- rd_kafka_ListConsumerGroupOffsets_new(group, partitions);
- /* Call ListConsumerGroupOffsets */
- rd_kafka_ListConsumerGroupOffsets(rk, &list_cgrp_offsets, 1, options,
- queue);
-
- /* Clean up input arguments */
- rd_kafka_ListConsumerGroupOffsets_destroy(list_cgrp_offsets);
- rd_kafka_AdminOptions_destroy(options);
-
-
- /* Wait for results */
- event = rd_kafka_queue_poll(queue, -1 /* indefinitely but limited by
- * the request timeout set
- * above (30s) */);
-
- if (!event) {
- /* User hit Ctrl-C,
- * see yield call in stop() signal handler */
- fprintf(stderr, "%% Cancelled by user\n");
-
- } else if (rd_kafka_event_error(event)) {
- /* ListConsumerGroupOffsets request failed */
- fprintf(stderr, "%% ListConsumerGroupOffsets failed: %s\n",
- rd_kafka_event_error_string(event));
- exit(1);
-
- } else {
- /* ListConsumerGroupOffsets request succeeded, but individual
- * partitions may have errors. */
- const rd_kafka_ListConsumerGroupOffsets_result_t *result;
- const rd_kafka_group_result_t **groups;
- size_t n_groups, i;
-
- result = rd_kafka_event_ListConsumerGroupOffsets_result(event);
- groups = rd_kafka_ListConsumerGroupOffsets_result_groups(
- result, &n_groups);
-
- printf("ListConsumerGroupOffsets results:\n");
- for (i = 0; i < n_groups; i++) {
- const rd_kafka_group_result_t *group = groups[i];
- const rd_kafka_topic_partition_list_t *partitions =
- rd_kafka_group_result_partitions(group);
- print_partition_list(stderr, partitions, 1, " ");
- }
- }
-
- if (partitions)
- rd_kafka_topic_partition_list_destroy(partitions);
-
- /* Destroy event object when we're done with it.
- * Note: rd_kafka_event_destroy() allows a NULL event. */
- rd_kafka_event_destroy(event);
-
- /* Destroy queue */
- rd_kafka_queue_destroy(queue);
-
- /* Destroy the producer instance */
- rd_kafka_destroy(rk);
-}
-
-int main(int argc, char **argv) {
- rd_kafka_conf_t *conf; /**< Client configuration object */
- int opt;
- argv0 = argv[0];
-
- /*
- * Create Kafka client configuration place-holder
- */
- conf = rd_kafka_conf_new();
-
-
- /*
- * Parse common options
- */
- while ((opt = getopt(argc, argv, "b:X:d:")) != -1) {
- switch (opt) {
- case 'b':
- conf_set(conf, "bootstrap.servers", optarg);
- break;
-
- case 'X': {
- char *name = optarg, *val;
-
- if (!(val = strchr(name, '=')))
- fatal("-X expects a name=value argument");
-
- *val = '\0';
- val++;
-
- conf_set(conf, name, val);
- break;
- }
-
- case 'd':
- conf_set(conf, "debug", optarg);
- break;
-
- default:
- usage("Unknown option %c", (char)opt);
- }
- }
-
- cmd_list_consumer_group_offsets(conf, argc - optind, &argv[optind]);
-
- return 0;
-}