diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/examples/list_consumer_group_offsets.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/examples/list_consumer_group_offsets.c | 359 |
1 files changed, 0 insertions, 359 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/examples/list_consumer_group_offsets.c b/fluent-bit/lib/librdkafka-2.1.0/examples/list_consumer_group_offsets.c deleted file mode 100644 index 03e878ee1..000000000 --- a/fluent-bit/lib/librdkafka-2.1.0/examples/list_consumer_group_offsets.c +++ /dev/null @@ -1,359 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2022, Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -/** - * ListConsumerGroupOffsets usage example. - */ - -#include <stdio.h> -#include <signal.h> -#include <string.h> -#include <stdlib.h> -#include <stdarg.h> - -#ifdef _WIN32 -#include "../win32/wingetopt.h" -#else -#include <getopt.h> -#endif - - -/* Typical include path would be <librdkafka/rdkafka.h>, but this program - * is builtin from within the librdkafka source tree and thus differs. */ -#include "rdkafka.h" - - -const char *argv0; - -static rd_kafka_queue_t *queue; /** Admin result queue. - * This is a global so we can - * yield in stop() */ -static volatile sig_atomic_t run = 1; - -/** - * @brief Signal termination of program - */ -static void stop(int sig) { - if (!run) { - fprintf(stderr, "%% Forced termination\n"); - exit(2); - } - run = 0; - rd_kafka_queue_yield(queue); -} - - -static void usage(const char *reason, ...) { - - fprintf(stderr, - "List consumer group offsets usage examples\n" - "\n" - "Usage: %s <options> <group_id> " - "<require_stable_offsets>\n" - " <topic1> <partition1>\n" - " <topic2> <partition2>\n" - " ...\n" - "\n" - "Options:\n" - " -b <brokers> Bootstrap server list to connect to.\n" - " -X <prop=val> Set librdkafka configuration property.\n" - " See CONFIGURATION.md for full list.\n" - " -d <dbg,..> Enable librdkafka debugging (%s).\n" - "\n", - argv0, rd_kafka_get_debug_contexts()); - - if (reason) { - va_list ap; - char reasonbuf[512]; - - va_start(ap, reason); - vsnprintf(reasonbuf, sizeof(reasonbuf), reason, ap); - va_end(ap); - - fprintf(stderr, "ERROR: %s\n", reasonbuf); - } - - exit(reason ? 1 : 0); -} - - -#define fatal(...) \ - do { \ - fprintf(stderr, "ERROR: "); \ - fprintf(stderr, __VA_ARGS__); \ - fprintf(stderr, "\n"); \ - exit(2); \ - } while (0) - - -/** - * @brief Set config property. Exit on failure. - */ -static void conf_set(rd_kafka_conf_t *conf, const char *name, const char *val) { - char errstr[512]; - - if (rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)) != - RD_KAFKA_CONF_OK) - fatal("Failed to set %s=%s: %s", name, val, errstr); -} - - -static void -print_partition_list(FILE *fp, - const rd_kafka_topic_partition_list_t *partitions, - int print_offset, - const char *prefix) { - int i; - - if (partitions->cnt == 0) { - fprintf(fp, "%sNo partition found", prefix); - } - for (i = 0; i < partitions->cnt; i++) { - char offset_string[512] = {}; - *offset_string = '\0'; - if (print_offset) { - snprintf(offset_string, sizeof(offset_string), - " offset %" PRId64, - partitions->elems[i].offset); - } - fprintf(fp, "%s%s %s [%" PRId32 "]%s error %s", - i > 0 ? "\n" : "", prefix, partitions->elems[i].topic, - partitions->elems[i].partition, offset_string, - rd_kafka_err2str(partitions->elems[i].err)); - } - fprintf(fp, "\n"); -} - -/** - * @brief Parse an integer or fail. - */ -int64_t parse_int(const char *what, const char *str) { - char *end; - unsigned long n = strtoull(str, &end, 0); - - if (end != str + strlen(str)) { - fprintf(stderr, "%% Invalid input for %s: %s: not an integer\n", - what, str); - exit(1); - } - - return (int64_t)n; -} - -static void -cmd_list_consumer_group_offsets(rd_kafka_conf_t *conf, int argc, char **argv) { - char errstr[512]; /* librdkafka API error reporting buffer */ - rd_kafka_t *rk; /* Admin client instance */ - rd_kafka_AdminOptions_t *options; /* (Optional) Options for - * ListConsumerGroupOffsets() */ - rd_kafka_event_t *event; /* ListConsumerGroupOffsets result event */ - const int min_argc = 2; - char *topic; - int partition; - int require_stable_offsets = 0, num_partitions = 0; - rd_kafka_ListConsumerGroupOffsets_t *list_cgrp_offsets; - rd_kafka_error_t *error; - const char *group; - - /* - * Argument validation - */ - if (argc < min_argc || (argc - min_argc) % 2 != 0) - usage("Wrong number of arguments"); - else { - require_stable_offsets = - parse_int("require_stable_offsets", argv[1]); - if (require_stable_offsets < 0 || require_stable_offsets > 1) - usage("Require stable not a 0-1 int"); - } - - num_partitions = (argc - min_argc) / 2; - group = argv[0]; - - /* - * Create an admin client, it can be created using any client type, - * so we choose producer since it requires no extra configuration - * and is more light-weight than the consumer. - * - * NOTE: rd_kafka_new() takes ownership of the conf object - * and the application must not reference it again after - * this call. - */ - rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); - if (!rk) { - fprintf(stderr, "%% Failed to create new producer: %s\n", - errstr); - exit(1); - } - - /* The Admin API is completely asynchronous, results are emitted - * on the result queue that is passed to ListConsumerGroupOffsets() */ - queue = rd_kafka_queue_new(rk); - - /* Signal handler for clean shutdown */ - signal(SIGINT, stop); - - /* Set timeout (optional) */ - options = rd_kafka_AdminOptions_new( - rk, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS); - if (rd_kafka_AdminOptions_set_request_timeout( - options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))) { - fprintf(stderr, "%% Failed to set timeout: %s\n", errstr); - exit(1); - } - /* Set requested require stable offsets */ - if ((error = rd_kafka_AdminOptions_set_require_stable_offsets( - options, require_stable_offsets))) { - fprintf(stderr, "%% Failed to set require stable offsets: %s\n", - rd_kafka_error_string(error)); - rd_kafka_error_destroy(error); - exit(1); - } - - /* Read passed partition-offsets */ - rd_kafka_topic_partition_list_t *partitions = NULL; - if (num_partitions > 0) { - int i; - partitions = rd_kafka_topic_partition_list_new(num_partitions); - for (i = 0; i < num_partitions; i++) { - topic = argv[min_argc + i * 2]; - partition = - parse_int("partition", argv[min_argc + i * 2 + 1]); - rd_kafka_topic_partition_list_add(partitions, topic, - partition); - } - } - - /* Create argument */ - list_cgrp_offsets = - rd_kafka_ListConsumerGroupOffsets_new(group, partitions); - /* Call ListConsumerGroupOffsets */ - rd_kafka_ListConsumerGroupOffsets(rk, &list_cgrp_offsets, 1, options, - queue); - - /* Clean up input arguments */ - rd_kafka_ListConsumerGroupOffsets_destroy(list_cgrp_offsets); - rd_kafka_AdminOptions_destroy(options); - - - /* Wait for results */ - event = rd_kafka_queue_poll(queue, -1 /* indefinitely but limited by - * the request timeout set - * above (30s) */); - - if (!event) { - /* User hit Ctrl-C, - * see yield call in stop() signal handler */ - fprintf(stderr, "%% Cancelled by user\n"); - - } else if (rd_kafka_event_error(event)) { - /* ListConsumerGroupOffsets request failed */ - fprintf(stderr, "%% ListConsumerGroupOffsets failed: %s\n", - rd_kafka_event_error_string(event)); - exit(1); - - } else { - /* ListConsumerGroupOffsets request succeeded, but individual - * partitions may have errors. */ - const rd_kafka_ListConsumerGroupOffsets_result_t *result; - const rd_kafka_group_result_t **groups; - size_t n_groups, i; - - result = rd_kafka_event_ListConsumerGroupOffsets_result(event); - groups = rd_kafka_ListConsumerGroupOffsets_result_groups( - result, &n_groups); - - printf("ListConsumerGroupOffsets results:\n"); - for (i = 0; i < n_groups; i++) { - const rd_kafka_group_result_t *group = groups[i]; - const rd_kafka_topic_partition_list_t *partitions = - rd_kafka_group_result_partitions(group); - print_partition_list(stderr, partitions, 1, " "); - } - } - - if (partitions) - rd_kafka_topic_partition_list_destroy(partitions); - - /* Destroy event object when we're done with it. - * Note: rd_kafka_event_destroy() allows a NULL event. */ - rd_kafka_event_destroy(event); - - /* Destroy queue */ - rd_kafka_queue_destroy(queue); - - /* Destroy the producer instance */ - rd_kafka_destroy(rk); -} - -int main(int argc, char **argv) { - rd_kafka_conf_t *conf; /**< Client configuration object */ - int opt; - argv0 = argv[0]; - - /* - * Create Kafka client configuration place-holder - */ - conf = rd_kafka_conf_new(); - - - /* - * Parse common options - */ - while ((opt = getopt(argc, argv, "b:X:d:")) != -1) { - switch (opt) { - case 'b': - conf_set(conf, "bootstrap.servers", optarg); - break; - - case 'X': { - char *name = optarg, *val; - - if (!(val = strchr(name, '='))) - fatal("-X expects a name=value argument"); - - *val = '\0'; - val++; - - conf_set(conf, name, val); - break; - } - - case 'd': - conf_set(conf, "debug", optarg); - break; - - default: - usage("Unknown option %c", (char)opt); - } - } - - cmd_list_consumer_group_offsets(conf, argc - optind, &argv[optind]); - - return 0; -} |