diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/examples/describe_consumer_groups.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/examples/describe_consumer_groups.c | 373 |
1 files changed, 373 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/examples/describe_consumer_groups.c b/fluent-bit/lib/librdkafka-2.1.0/examples/describe_consumer_groups.c new file mode 100644 index 000000000..45b6b8d0b --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/examples/describe_consumer_groups.c @@ -0,0 +1,373 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2022, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * DescribeConsumerGroups usage example. + */ + +#include <stdio.h> +#include <signal.h> +#include <string.h> +#include <stdlib.h> +#include <stdarg.h> + +#ifdef _WIN32 +#include "../win32/wingetopt.h" +#else +#include <getopt.h> +#endif + + +/* Typical include path would be <librdkafka/rdkafka.h>, but this program + * is builtin from within the librdkafka source tree and thus differs. */ +#include "rdkafka.h" + + +const char *argv0; + +static rd_kafka_queue_t *queue; /** Admin result queue. + * This is a global so we can + * yield in stop() */ +static volatile sig_atomic_t run = 1; + +/** + * @brief Signal termination of program + */ +static void stop(int sig) { + if (!run) { + fprintf(stderr, "%% Forced termination\n"); + exit(2); + } + run = 0; + rd_kafka_queue_yield(queue); +} + + +static void usage(const char *reason, ...) { + + fprintf(stderr, + "Describe groups usage examples\n" + "\n" + "Usage: %s <options> <group1> <group2> ...\n" + "\n" + "Options:\n" + " -b <brokers> Bootstrap server list to connect to.\n" + " -X <prop=val> Set librdkafka configuration property.\n" + " See CONFIGURATION.md for full list.\n" + " -d <dbg,..> Enable librdkafka debugging (%s).\n" + "\n", + argv0, rd_kafka_get_debug_contexts()); + + if (reason) { + va_list ap; + char reasonbuf[512]; + + va_start(ap, reason); + vsnprintf(reasonbuf, sizeof(reasonbuf), reason, ap); + va_end(ap); + + fprintf(stderr, "ERROR: %s\n", reasonbuf); + } + + exit(reason ? 1 : 0); +} + + +#define fatal(...) \ + do { \ + fprintf(stderr, "ERROR: "); \ + fprintf(stderr, __VA_ARGS__); \ + fprintf(stderr, "\n"); \ + exit(2); \ + } while (0) + + +/** + * @brief Set config property. Exit on failure. + */ +static void conf_set(rd_kafka_conf_t *conf, const char *name, const char *val) { + char errstr[512]; + + if (rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) + fatal("Failed to set %s=%s: %s", name, val, errstr); +} + + +static void +print_partition_list(FILE *fp, + const rd_kafka_topic_partition_list_t *partitions, + int print_offset, + const char *prefix) { + int i; + + if (partitions->cnt == 0) { + fprintf(fp, "%sNo partition found", prefix); + } + for (i = 0; i < partitions->cnt; i++) { + char offset_string[512] = {}; + *offset_string = '\0'; + if (print_offset) { + snprintf(offset_string, sizeof(offset_string), + " offset %" PRId64, + partitions->elems[i].offset); + } + fprintf(fp, "%s%s %s [%" PRId32 "]%s error %s", + i > 0 ? "\n" : "", prefix, partitions->elems[i].topic, + partitions->elems[i].partition, offset_string, + rd_kafka_err2str(partitions->elems[i].err)); + } + fprintf(fp, "\n"); +} + +/** + * @brief Print group information. + */ +static int +print_groups_info(const rd_kafka_DescribeConsumerGroups_result_t *grpdesc, + int groups_cnt) { + size_t i; + const rd_kafka_ConsumerGroupDescription_t **result_groups; + size_t result_groups_cnt; + result_groups = rd_kafka_DescribeConsumerGroups_result_groups( + grpdesc, &result_groups_cnt); + + if (result_groups_cnt == 0) { + if (groups_cnt > 0) { + fprintf(stderr, "No matching groups found\n"); + return 1; + } else { + fprintf(stderr, "No groups in cluster\n"); + } + } + + for (i = 0; i < result_groups_cnt; i++) { + int j, member_cnt; + const rd_kafka_error_t *error; + const rd_kafka_ConsumerGroupDescription_t *group = + result_groups[i]; + char coordinator_desc[512]; + const rd_kafka_Node_t *coordinator = NULL; + const char *group_id = + rd_kafka_ConsumerGroupDescription_group_id(group); + const char *partition_assignor = + rd_kafka_ConsumerGroupDescription_partition_assignor(group); + rd_kafka_consumer_group_state_t state = + rd_kafka_ConsumerGroupDescription_state(group); + member_cnt = + rd_kafka_ConsumerGroupDescription_member_count(group); + error = rd_kafka_ConsumerGroupDescription_error(group); + coordinator = + rd_kafka_ConsumerGroupDescription_coordinator(group); + *coordinator_desc = '\0'; + + if (coordinator != NULL) { + snprintf(coordinator_desc, sizeof(coordinator_desc), + ", coordinator [id: %" PRId32 + ", host: %s" + ", port: %" PRIu16 "]", + rd_kafka_Node_id(coordinator), + rd_kafka_Node_host(coordinator), + rd_kafka_Node_port(coordinator)); + } + printf( + "Group \"%s\", partition assignor \"%s\", " + "state %s%s, with %" PRId32 " member(s)", + group_id, partition_assignor, + rd_kafka_consumer_group_state_name(state), coordinator_desc, + member_cnt); + if (error) + printf(" error[%" PRId32 "]: %s", + rd_kafka_error_code(error), + rd_kafka_error_string(error)); + printf("\n"); + for (j = 0; j < member_cnt; j++) { + const rd_kafka_MemberDescription_t *member = + rd_kafka_ConsumerGroupDescription_member(group, j); + printf( + " Member \"%s\" with client-id %s," + " group instance id: %s, host %s\n", + rd_kafka_MemberDescription_consumer_id(member), + rd_kafka_MemberDescription_client_id(member), + rd_kafka_MemberDescription_group_instance_id( + member), + rd_kafka_MemberDescription_host(member)); + const rd_kafka_MemberAssignment_t *assignment = + rd_kafka_MemberDescription_assignment(member); + const rd_kafka_topic_partition_list_t + *topic_partitions = + rd_kafka_MemberAssignment_partitions( + assignment); + if (!topic_partitions) { + printf(" No assignment\n"); + } else if (topic_partitions->cnt == 0) { + printf(" Empty assignment\n"); + } else { + printf(" Assignment:\n"); + print_partition_list(stdout, topic_partitions, + 0, " "); + } + } + } + return 0; +} + +/** + * @brief Call rd_kafka_DescribeConsumerGroups() with a list of + * groups. + */ +static void +cmd_describe_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) { + rd_kafka_t *rk; + const char **groups = NULL; + char errstr[512]; + rd_kafka_AdminOptions_t *options; + rd_kafka_event_t *event = NULL; + int retval = 0; + int groups_cnt = 0; + + if (argc >= 1) { + groups = (const char **)&argv[0]; + groups_cnt = argc; + } + + /* + * Create consumer instance + * NOTE: rd_kafka_new() takes ownership of the conf object + * and the application must not reference it again after + * this call. + */ + rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); + if (!rk) + fatal("Failed to create new consumer: %s", errstr); + + /* + * Describe consumer groups + */ + queue = rd_kafka_queue_new(rk); + + /* Signal handler for clean shutdown */ + signal(SIGINT, stop); + + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS); + + if (rd_kafka_AdminOptions_set_request_timeout( + options, 10 * 1000 /* 10s */, errstr, sizeof(errstr))) { + fprintf(stderr, "%% Failed to set timeout: %s\n", errstr); + goto exit; + } + + rd_kafka_DescribeConsumerGroups(rk, groups, groups_cnt, options, queue); + + /* Wait for results */ + event = rd_kafka_queue_poll(queue, -1 /* indefinitely but limited by + * the request timeout set + * above (10s) */); + + if (!event) { + /* User hit Ctrl-C, + * see yield call in stop() signal handler */ + fprintf(stderr, "%% Cancelled by user\n"); + + } else if (rd_kafka_event_error(event)) { + rd_kafka_resp_err_t err = rd_kafka_event_error(event); + /* DescribeConsumerGroups request failed */ + fprintf(stderr, + "%% DescribeConsumerGroups failed[%" PRId32 "]: %s\n", + err, rd_kafka_event_error_string(event)); + goto exit; + + } else { + /* DescribeConsumerGroups request succeeded, but individual + * groups may have errors. */ + const rd_kafka_DescribeConsumerGroups_result_t *result; + + result = rd_kafka_event_DescribeConsumerGroups_result(event); + printf("DescribeConsumerGroups results:\n"); + retval = print_groups_info(result, groups_cnt); + } + + +exit: + if (event) + rd_kafka_event_destroy(event); + rd_kafka_AdminOptions_destroy(options); + rd_kafka_queue_destroy(queue); + /* Destroy the client instance */ + rd_kafka_destroy(rk); + + exit(retval); +} + +int main(int argc, char **argv) { + rd_kafka_conf_t *conf; /**< Client configuration object */ + int opt; + argv0 = argv[0]; + + /* + * Create Kafka client configuration place-holder + */ + conf = rd_kafka_conf_new(); + + + /* + * Parse common options + */ + while ((opt = getopt(argc, argv, "b:X:d:")) != -1) { + switch (opt) { + case 'b': + conf_set(conf, "bootstrap.servers", optarg); + break; + + case 'X': { + char *name = optarg, *val; + + if (!(val = strchr(name, '='))) + fatal("-X expects a name=value argument"); + + *val = '\0'; + val++; + + conf_set(conf, name, val); + break; + } + + case 'd': + conf_set(conf, "debug", optarg); + break; + + default: + usage("Unknown option %c", (char)opt); + } + } + + cmd_describe_consumer_groups(conf, argc - optind, &argv[optind]); + + return 0; +} |