Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/examples/delete_records.c')
-rw-r--r--    fluent-bit/lib/librdkafka-2.1.0/examples/delete_records.c    233
1 file changed, 0 insertions, 233 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/examples/delete_records.c b/fluent-bit/lib/librdkafka-2.1.0/examples/delete_records.c
deleted file mode 100644
index 2660996a5..000000000
--- a/fluent-bit/lib/librdkafka-2.1.0/examples/delete_records.c
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2020, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-/**
- * Example utility that shows how to use DeleteRecords (Admin API)
- * to delete all messages/records up to (but not including) a specific offset
- * from one or more topic partitions.
- */
-
-#include <stdio.h>
-#include <signal.h>
-#include <string.h>
-#include <stdlib.h>
-
-
-/* Typical include path would be <librdkafka/rdkafka.h>, but this program
- * is built from within the librdkafka source tree and thus differs. */
-#include "rdkafka.h"
-
-
-static rd_kafka_queue_t *queue; /** Admin result queue.
-                                 *  This is a global so we can
-                                 *  yield in stop() */
-static volatile sig_atomic_t run = 1;
-
-/**
- * @brief Signal termination of program
- */
-static void stop(int sig) {
-        if (!run) {
-                fprintf(stderr, "%% Forced termination\n");
-                exit(2);
-        }
-        run = 0;
-        rd_kafka_queue_yield(queue);
-}
-
-
-/**
- * @brief Parse an integer or fail.
- */
-int64_t parse_int(const char *what, const char *str) {
-        char *end;
-        unsigned long long n = strtoull(str, &end, 0);
-
-        if (end != str + strlen(str)) {
-                fprintf(stderr, "%% Invalid input for %s: %s: not an integer\n",
-                        what, str);
-                exit(1);
-        }
-
-        return (int64_t)n;
-}
-
-
-int main(int argc, char **argv) {
-        rd_kafka_conf_t *conf; /* Temporary configuration object */
-        char errstr[512];      /* librdkafka API error reporting buffer */
-        const char *brokers;   /* Argument: broker list */
-        rd_kafka_t *rk;        /* Admin client instance */
-        rd_kafka_topic_partition_list_t *offsets_before; /* Delete messages up
-                                                          *  to but not
-                                                          *  including these
-                                                          *  offsets */
-        rd_kafka_DeleteRecords_t *del_records; /* Container for offsets_before */
-        rd_kafka_AdminOptions_t *options;      /* (Optional) Options for
-                                                *  DeleteRecords() */
-        rd_kafka_event_t *event; /* DeleteRecords result event */
-        int exitcode = 0;
-        int i;
-
-        /*
-         * Argument validation
-         */
-        if (argc < 5 || (argc - 2) % 3 != 0) {
-                fprintf(stderr,
-                        "%% Usage: %s <broker> "
-                        "<topic> <partition> <offset_before> "
-                        "<topic2> <partition2> <offset_before2> ...\n"
-                        "\n"
-                        "Delete all messages up to but not including the "
-                        "specified offset(s).\n"
-                        "\n",
-                        argv[0]);
-                return 1;
-        }
-
-        brokers = argv[1];
-
-        /* Parse topic partition offset tuples and add to offsets list */
-        offsets_before = rd_kafka_topic_partition_list_new((argc - 2) / 3);
-        for (i = 2; i < argc; i += 3) {
-                const char *topic = argv[i];
-                int partition     = parse_int("partition", argv[i + 1]);
-                int64_t offset    = parse_int("offset_before", argv[i + 2]);
-
-                rd_kafka_topic_partition_list_add(offsets_before, topic,
-                                                  partition)
-                    ->offset = offset;
-        }
-
-        /*
-         * Create Kafka client configuration place-holder
-         */
-        conf = rd_kafka_conf_new();
-
-        /* Set bootstrap broker(s) as a comma-separated list of
-         * host or host:port (default port 9092).
-         * librdkafka will use the bootstrap brokers to acquire the full
-         * set of brokers from the cluster. */
-        if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
-                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
-                fprintf(stderr, "%s\n", errstr);
-                return 1;
-        }
-        rd_kafka_conf_set(conf, "debug", "admin,topic,metadata", NULL, 0);
-
-        /*
-         * Create an admin client. It can be created using any client type,
-         * so we choose the producer since it requires no extra configuration
-         * and is more light-weight than the consumer.
-         *
-         * NOTE: rd_kafka_new() takes ownership of the conf object
-         *       and the application must not reference it again after
-         *       this call.
-         */
-        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
-        if (!rk) {
-                fprintf(stderr, "%% Failed to create new producer: %s\n",
-                        errstr);
-                return 1;
-        }
-
-        /* The Admin API is completely asynchronous; results are emitted
-         * on the result queue that is passed to DeleteRecords() */
-        queue = rd_kafka_queue_new(rk);
-
-        /* Signal handler for clean shutdown */
-        signal(SIGINT, stop);
-
-        /* Set timeout (optional) */
-        options =
-            rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETERECORDS);
-        if (rd_kafka_AdminOptions_set_request_timeout(
-                options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))) {
-                fprintf(stderr, "%% Failed to set timeout: %s\n", errstr);
-                return 1;
-        }
-
-        /* Create argument */
-        del_records = rd_kafka_DeleteRecords_new(offsets_before);
-        /* We're now done with offsets_before */
-        rd_kafka_topic_partition_list_destroy(offsets_before);
-
-        /* Call DeleteRecords */
-        rd_kafka_DeleteRecords(rk, &del_records, 1, options, queue);
-
-        /* Clean up input arguments */
-        rd_kafka_DeleteRecords_destroy(del_records);
-        rd_kafka_AdminOptions_destroy(options);
-
-
-        /* Wait for results */
-        event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/);
-
-        if (!event) {
-                /* User hit Ctrl-C */
-                fprintf(stderr, "%% Cancelled by user\n");
-
-        } else if (rd_kafka_event_error(event)) {
-                /* DeleteRecords request failed */
-                fprintf(stderr, "%% DeleteRecords failed: %s\n",
-                        rd_kafka_event_error_string(event));
-                exitcode = 2;
-
-        } else {
-                /* DeleteRecords request succeeded, but individual
-                 * partitions may have errors. */
-                const rd_kafka_DeleteRecords_result_t *result;
-                const rd_kafka_topic_partition_list_t *offsets;
-                int i;
-
-                result  = rd_kafka_event_DeleteRecords_result(event);
-                offsets = rd_kafka_DeleteRecords_result_offsets(result);
-
-                printf("DeleteRecords results:\n");
-                for (i = 0; i < offsets->cnt; i++)
-                        printf(" %s [%" PRId32 "] offset %" PRId64 ": %s\n",
-                               offsets->elems[i].topic,
-                               offsets->elems[i].partition,
-                               offsets->elems[i].offset,
-                               rd_kafka_err2str(offsets->elems[i].err));
-        }
-
-        /* Destroy event object when we're done with it.
-         * Note: rd_kafka_event_destroy() allows a NULL event. */
-        rd_kafka_event_destroy(event);
-
-        signal(SIGINT, SIG_DFL);
-
-        /* Destroy queue */
-        rd_kafka_queue_destroy(queue);
-
-        /* Destroy the producer instance */
-        rd_kafka_destroy(rk);
-
-        return exitcode;
-}
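
For reference, the deleted example was driven entirely by its command line: a broker
address followed by one or more <topic> <partition> <offset_before> triples, as the
usage string in the code above shows. A hypothetical invocation (broker address, topic
name and offsets are illustrative placeholders, not taken from the file) would be:

    ./delete_records localhost:9092 mytopic 0 100 mytopic 1 250

which asks the cluster to delete all records in partitions 0 and 1 of "mytopic" up to,
but not including, offsets 100 and 250 respectively.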