summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/examples/transactions.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/examples/transactions.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/examples/transactions.c665
1 files changed, 0 insertions, 665 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/examples/transactions.c b/fluent-bit/lib/librdkafka-2.1.0/examples/transactions.c
deleted file mode 100644
index 0a8b9a8c..00000000
--- a/fluent-bit/lib/librdkafka-2.1.0/examples/transactions.c
+++ /dev/null
@@ -1,665 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2020, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-/**
- * @name Transactions example for Apache Kafka 2.5.0 (KIP-447) and later.
- *
- * This example show-cases a simple transactional consume-process-produce
- * application that reads messages from an input topic, extracts all
- * numbers from the message's value string, adds them up, and sends
- * the sum to the output topic as part of a transaction.
- * The transaction is committed every 5 seconds or 100 messages, whichever
- * comes first. As the transaction is committed a new transaction is started.
- *
- * This example makes use of incremental rebalancing (KIP-429) and the
- * cooperative-sticky partition.assignment.strategy on the consumer, providing
- * hitless rebalances.
- */
-
-#include <stdio.h>
-#include <signal.h>
-#include <unistd.h>
-#include <string.h>
-#include <stdlib.h>
-#include <time.h>
-#include <ctype.h>
-
-
-/* Typical include path would be <librdkafka/rdkafka.h>, but this program
- * is builtin from within the librdkafka source tree and thus differs. */
-#include "rdkafka.h"
-
-
-static volatile sig_atomic_t run = 1;
-
-/**
- * @brief A fatal error has occurred, immediately exit the application.
- */
-#define fatal(...) \
- do { \
- fprintf(stderr, "FATAL ERROR: "); \
- fprintf(stderr, __VA_ARGS__); \
- fprintf(stderr, "\n"); \
- exit(1); \
- } while (0)
-
-/**
- * @brief Same as fatal() but takes an rd_kafka_error_t object, prints its
- * error message, destroys the object and then exits fatally.
- */
-#define fatal_error(what, error) \
- do { \
- fprintf(stderr, "FATAL ERROR: %s: %s: %s\n", what, \
- rd_kafka_error_name(error), \
- rd_kafka_error_string(error)); \
- rd_kafka_error_destroy(error); \
- exit(1); \
- } while (0)
-
-/**
- * @brief Signal termination of program
- */
-static void stop(int sig) {
- run = 0;
-}
-
-
-/**
- * @brief Message delivery report callback.
- *
- * This callback is called exactly once per message, indicating if
- * the message was succesfully delivered
- * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently
- * failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR).
- *
- * The callback is triggered from rd_kafka_poll(), rd_kafka_flush(),
- * rd_kafka_abort_transaction() and rd_kafka_commit_transaction() and
- * executes on the application's thread.
- *
- * The current transactional will enter the abortable state if any
- * message permanently fails delivery and the application must then
- * call rd_kafka_abort_transaction(). But it does not need to be done from
- * here, this state is checked by all the transactional APIs and it is better
- * to perform this error checking when calling
- * rd_kafka_send_offsets_to_transaction() and rd_kafka_commit_transaction().
- * In the case of transactional producing the delivery report callback is
- * mostly useful for logging the produce failures.
- */
-static void
-dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
- if (rkmessage->err)
- fprintf(stderr, "%% Message delivery failed: %s\n",
- rd_kafka_err2str(rkmessage->err));
-
- /* The rkmessage is destroyed automatically by librdkafka */
-}
-
-
-
-/**
- * @brief Create a transactional producer.
- */
-static rd_kafka_t *create_transactional_producer(const char *brokers,
- const char *output_topic) {
- rd_kafka_conf_t *conf = rd_kafka_conf_new();
- rd_kafka_t *rk;
- char errstr[256];
- rd_kafka_error_t *error;
-
- if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
- sizeof(errstr)) != RD_KAFKA_CONF_OK ||
- rd_kafka_conf_set(conf, "transactional.id",
- "librdkafka_transactions_example", errstr,
- sizeof(errstr)) != RD_KAFKA_CONF_OK)
- fatal("Failed to configure producer: %s", errstr);
-
- /* This callback will be called once per message to indicate
- * final delivery status. */
- rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
-
- /* Create producer */
- rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
- if (!rk) {
- rd_kafka_conf_destroy(conf);
- fatal("Failed to create producer: %s", errstr);
- }
-
- /* Initialize transactions, this is only performed once
- * per transactional producer to acquire its producer id, et.al. */
- error = rd_kafka_init_transactions(rk, -1);
- if (error)
- fatal_error("init_transactions()", error);
-
- return rk;
-}
-
-
-/**
- * @brief Rewind consumer's consume position to the last committed offsets
- * for the current assignment.
- */
-static void rewind_consumer(rd_kafka_t *consumer) {
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_resp_err_t err;
- rd_kafka_error_t *error;
- int i;
-
- /* Get committed offsets for the current assignment, if there
- * is a current assignment. */
- err = rd_kafka_assignment(consumer, &offsets);
- if (err) {
- fprintf(stderr, "No current assignment to rewind: %s\n",
- rd_kafka_err2str(err));
- return;
- }
-
- if (offsets->cnt == 0) {
- fprintf(stderr, "No current assignment to rewind\n");
- rd_kafka_topic_partition_list_destroy(offsets);
- return;
- }
-
- /* Note: Timeout must be lower than max.poll.interval.ms */
- err = rd_kafka_committed(consumer, offsets, 10 * 1000);
- if (err)
- fatal("Failed to acquire committed offsets: %s",
- rd_kafka_err2str(err));
-
- /* Seek to committed offset, or start of partition if no
- * committed offset is available. */
- for (i = 0; i < offsets->cnt; i++) {
- /* No committed offset, start from beginning */
- if (offsets->elems[i].offset < 0)
- offsets->elems[i].offset = RD_KAFKA_OFFSET_BEGINNING;
- }
-
- /* Perform seek */
- error = rd_kafka_seek_partitions(consumer, offsets, -1);
- if (error)
- fatal_error("Failed to seek", error);
-
- rd_kafka_topic_partition_list_destroy(offsets);
-}
-
-/**
- * @brief Abort the current transaction and rewind consumer offsets to
- * position where the transaction last started, i.e., the committed
- * consumer offset, then begin a new transaction.
- */
-static void abort_transaction_and_rewind(rd_kafka_t *consumer,
- rd_kafka_t *producer) {
- rd_kafka_error_t *error;
-
- fprintf(stdout, "Aborting transaction and rewinding offsets\n");
-
- /* Abort the current transaction */
- error = rd_kafka_abort_transaction(producer, -1);
- if (error)
- fatal_error("Failed to abort transaction", error);
-
- /* Rewind consumer */
- rewind_consumer(consumer);
-
- /* Begin a new transaction */
- error = rd_kafka_begin_transaction(producer);
- if (error)
- fatal_error("Failed to begin transaction", error);
-}
-
-
-/**
- * @brief Commit the current transaction.
- *
- * @returns 1 if transaction was successfully committed, or 0
- * if the current transaction was aborted.
- */
-static int commit_transaction(rd_kafka_t *consumer, rd_kafka_t *producer) {
- rd_kafka_error_t *error;
- rd_kafka_resp_err_t err;
- rd_kafka_consumer_group_metadata_t *cgmd;
- rd_kafka_topic_partition_list_t *offsets;
-
- fprintf(stdout, "Committing transaction\n");
-
- /* Send the input consumer's offset to transaction
- * to commit those offsets along with the transaction itself,
- * this is what guarantees exactly-once-semantics (EOS), that
- * input (offsets) and output (messages) are committed atomically. */
-
- /* Get the consumer's current group metadata state */
- cgmd = rd_kafka_consumer_group_metadata(consumer);
-
- /* Get consumer's current assignment */
- err = rd_kafka_assignment(consumer, &offsets);
- if (err || offsets->cnt == 0) {
- /* No partition offsets to commit because consumer
- * (most likely) lost the assignment, abort transaction. */
- if (err)
- fprintf(stderr,
- "Failed to get consumer assignment to commit: "
- "%s\n",
- rd_kafka_err2str(err));
- else
- rd_kafka_topic_partition_list_destroy(offsets);
-
- error = rd_kafka_abort_transaction(producer, -1);
- if (error)
- fatal_error("Failed to abort transaction", error);
-
- return 0;
- }
-
- /* Get consumer's current position for this partition */
- err = rd_kafka_position(consumer, offsets);
- if (err)
- fatal("Failed to get consumer position: %s",
- rd_kafka_err2str(err));
-
- /* Send offsets to transaction coordinator */
- error =
- rd_kafka_send_offsets_to_transaction(producer, offsets, cgmd, -1);
- rd_kafka_consumer_group_metadata_destroy(cgmd);
- rd_kafka_topic_partition_list_destroy(offsets);
- if (error) {
- if (rd_kafka_error_txn_requires_abort(error)) {
- fprintf(stderr,
- "WARNING: Failed to send offsets to "
- "transaction: %s: %s: aborting transaction\n",
- rd_kafka_error_name(error),
- rd_kafka_error_string(error));
- rd_kafka_error_destroy(error);
-
- /* Abort transaction */
- error = rd_kafka_abort_transaction(producer, -1);
- if (error)
- fatal_error("Failed to abort transaction",
- error);
- return 0;
- } else {
- fatal_error("Failed to send offsets to transaction",
- error);
- }
- }
-
- /* Commit the transaction */
- error = rd_kafka_commit_transaction(producer, -1);
- if (error) {
- if (rd_kafka_error_txn_requires_abort(error)) {
- fprintf(stderr,
- "WARNING: Failed to commit transaction: "
- "%s: %s: aborting transaction\n",
- rd_kafka_error_name(error),
- rd_kafka_error_string(error));
- rd_kafka_error_destroy(error);
-
- /* Abort transaction */
- error = rd_kafka_abort_transaction(producer, -1);
- if (error)
- fatal_error("Failed to abort transaction",
- error);
- return 0;
- } else {
- fatal_error("Failed to commit transaction", error);
- }
- }
-
- return 1;
-}
-
-/**
- * @brief Commit the current transaction and start a new transaction.
- */
-static void commit_transaction_and_start_new(rd_kafka_t *consumer,
- rd_kafka_t *producer) {
- rd_kafka_error_t *error;
-
- /* Commit transaction.
- * If commit failed the transaction is aborted and we need
- * to rewind the consumer to the last committed offsets. */
- if (!commit_transaction(consumer, producer))
- rewind_consumer(consumer);
-
- /* Begin new transaction */
- error = rd_kafka_begin_transaction(producer);
- if (error)
- fatal_error("Failed to begin new transaction", error);
-}
-
-/**
- * @brief The rebalance will be triggered (from rd_kafka_consumer_poll())
- * when the consumer's partition assignment is assigned or revoked.
- */
-static void
-consumer_group_rebalance_cb(rd_kafka_t *consumer,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *partitions,
- void *opaque) {
- rd_kafka_t *producer = (rd_kafka_t *)opaque;
- rd_kafka_error_t *error;
-
- switch (err) {
- case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
- fprintf(stdout,
- "Consumer group rebalanced: "
- "%d new partition(s) assigned\n",
- partitions->cnt);
-
- /* Start fetching messages for the assigned partitions
- * and add them to the consumer's local assignment. */
- error = rd_kafka_incremental_assign(consumer, partitions);
- if (error)
- fatal_error("Incremental assign failed", error);
- break;
-
- case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
- if (rd_kafka_assignment_lost(consumer)) {
- fprintf(stdout,
- "Consumer group rebalanced: assignment lost: "
- "aborting current transaction\n");
-
- error = rd_kafka_abort_transaction(producer, -1);
- if (error)
- fatal_error("Failed to abort transaction",
- error);
- } else {
- fprintf(stdout,
- "Consumer group rebalanced: %d partition(s) "
- "revoked: committing current transaction\n",
- partitions->cnt);
-
- commit_transaction(consumer, producer);
- }
-
- /* Begin new transaction */
- error = rd_kafka_begin_transaction(producer);
- if (error)
- fatal_error("Failed to begin transaction", error);
-
- /* Stop fetching messages for the revoekd partitions
- * and remove them from the consumer's local assignment. */
- error = rd_kafka_incremental_unassign(consumer, partitions);
- if (error)
- fatal_error("Incremental unassign failed", error);
- break;
-
- default:
- /* NOTREACHED */
- fatal("Unexpected rebalance event: %s", rd_kafka_err2name(err));
- }
-}
-
-
-/**
- * @brief Create the input consumer.
- */
-static rd_kafka_t *create_input_consumer(const char *brokers,
- const char *input_topic,
- rd_kafka_t *producer) {
- rd_kafka_conf_t *conf = rd_kafka_conf_new();
- rd_kafka_t *rk;
- char errstr[256];
- rd_kafka_resp_err_t err;
- rd_kafka_topic_partition_list_t *topics;
-
- if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
- sizeof(errstr)) != RD_KAFKA_CONF_OK ||
- rd_kafka_conf_set(conf, "group.id",
- "librdkafka_transactions_example_group", errstr,
- sizeof(errstr)) != RD_KAFKA_CONF_OK ||
- rd_kafka_conf_set(conf, "partition.assignment.strategy",
- "cooperative-sticky", errstr,
- sizeof(errstr)) != RD_KAFKA_CONF_OK ||
- rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr,
- sizeof(errstr)) != RD_KAFKA_CONF_OK ||
- /* The input consumer's offsets are explicitly committed with the
- * output producer's transaction using
- * rd_kafka_send_offsets_to_transaction(), so auto commits
- * must be disabled. */
- rd_kafka_conf_set(conf, "enable.auto.commit", "false", errstr,
- sizeof(errstr)) != RD_KAFKA_CONF_OK) {
- fatal("Failed to configure consumer: %s", errstr);
- }
-
- /* This callback will be called when the consumer group is rebalanced
- * and the consumer's partition assignment is assigned or revoked. */
- rd_kafka_conf_set_rebalance_cb(conf, consumer_group_rebalance_cb);
-
- /* The producer handle is needed in the consumer's rebalance callback
- * to be able to abort and commit transactions, so we pass the
- * producer as the consumer's opaque. */
- rd_kafka_conf_set_opaque(conf, producer);
-
- /* Create consumer */
- rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
- if (!rk) {
- rd_kafka_conf_destroy(conf);
- fatal("Failed to create consumer: %s", errstr);
- }
-
- /* Forward all partition messages to the main queue and
- * rd_kafka_consumer_poll(). */
- rd_kafka_poll_set_consumer(rk);
-
- /* Subscribe to the input topic */
- topics = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(topics, input_topic,
- /* The partition is ignored in
- * rd_kafka_subscribe() */
- RD_KAFKA_PARTITION_UA);
- err = rd_kafka_subscribe(rk, topics);
- rd_kafka_topic_partition_list_destroy(topics);
- if (err) {
- rd_kafka_destroy(rk);
- fatal("Failed to subscribe to %s: %s\n", input_topic,
- rd_kafka_err2str(err));
- }
-
- return rk;
-}
-
-
-/**
- * @brief Find and parse next integer string in \p start.
- * @returns Pointer after found integer string, or NULL if not found.
- */
-static const void *
-find_next_int(const void *start, const void *end, int *intp) {
- const char *p;
- int collecting = 0;
- int num = 0;
-
- for (p = (const char *)start; p < (const char *)end; p++) {
- if (isdigit((int)(*p))) {
- collecting = 1;
- num = (num * 10) + ((int)*p - ((int)'0'));
- } else if (collecting)
- break;
- }
-
- if (!collecting)
- return NULL; /* No integer string found */
-
- *intp = num;
-
- return p;
-}
-
-
-/**
- * @brief Process a message from the input consumer by parsing all
- * integer strings, adding them, and then producing the sum
- * the output topic using the transactional producer for the given
- * inut partition.
- */
-static void process_message(rd_kafka_t *consumer,
- rd_kafka_t *producer,
- const char *output_topic,
- const rd_kafka_message_t *rkmessage) {
- int num;
- long unsigned sum = 0;
- const void *p, *end;
- rd_kafka_resp_err_t err;
- char value[64];
-
- if (rkmessage->len == 0)
- return; /* Ignore empty messages */
-
- p = rkmessage->payload;
- end = ((const char *)rkmessage->payload) + rkmessage->len;
-
- /* Find and sum all numbers in the message */
- while ((p = find_next_int(p, end, &num)))
- sum += num;
-
- if (sum == 0)
- return; /* No integers in message, ignore it. */
-
- snprintf(value, sizeof(value), "%lu", sum);
-
- /* Emit output message on transactional producer */
- while (1) {
- err = rd_kafka_producev(
- producer, RD_KAFKA_V_TOPIC(output_topic),
- /* Use same key as input message */
- RD_KAFKA_V_KEY(rkmessage->key, rkmessage->key_len),
- /* Value is the current sum of this
- * transaction. */
- RD_KAFKA_V_VALUE(value, strlen(value)),
- /* Copy value since it is allocated on the stack */
- RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_END);
-
- if (!err)
- break;
- else if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
- /* If output queue fills up we need to wait for
- * some delivery reports and then retry. */
- rd_kafka_poll(producer, 100);
- continue;
- } else {
- fprintf(stderr,
- "WARNING: Failed to produce message to %s: "
- "%s: aborting transaction\n",
- output_topic, rd_kafka_err2str(err));
- abort_transaction_and_rewind(consumer, producer);
- return;
- }
- }
-}
-
-
-int main(int argc, char **argv) {
- rd_kafka_t *producer, *consumer;
- int msgcnt = 0;
- time_t last_commit = 0;
- const char *brokers, *input_topic, *output_topic;
- rd_kafka_error_t *error;
-
- /*
- * Argument validation
- */
- if (argc != 4) {
- fprintf(stderr,
- "%% Usage: %s <broker> <input-topic> <output-topic>\n",
- argv[0]);
- return 1;
- }
-
- brokers = argv[1];
- input_topic = argv[2];
- output_topic = argv[3];
-
- /* Signal handler for clean shutdown */
- signal(SIGINT, stop);
-
- producer = create_transactional_producer(brokers, output_topic);
-
- consumer = create_input_consumer(brokers, input_topic, producer);
-
- fprintf(stdout,
- "Expecting integers to sum on input topic %s ...\n"
- "To generate input messages you can use:\n"
- " $ seq 1 100 | examples/producer %s %s\n"
- "Observe summed integers on output topic %s:\n"
- " $ examples/consumer %s just-watching %s\n"
- "\n",
- input_topic, brokers, input_topic, output_topic, brokers,
- output_topic);
-
- /* Begin transaction and start waiting for messages */
- error = rd_kafka_begin_transaction(producer);
- if (error)
- fatal_error("Failed to begin transaction", error);
-
- while (run) {
- rd_kafka_message_t *msg;
-
- /* Commit transaction every 100 messages or 5 seconds */
- if (msgcnt > 0 &&
- (msgcnt > 100 || last_commit + 5 <= time(NULL))) {
- printf("msgcnt %d, elapsed %d\n", msgcnt,
- (int)(time(NULL) - last_commit));
- commit_transaction_and_start_new(consumer, producer);
- msgcnt = 0;
- last_commit = time(NULL);
- }
-
- /* Wait for new mesages or error events */
- msg = rd_kafka_consumer_poll(consumer, 1000 /*1 second*/);
- if (!msg)
- continue; /* Poll timeout */
-
- if (msg->err) {
- /* Client errors are typically just informational
- * since the client will automatically try to recover
- * from all types of errors.
- * It is thus sufficient for the application to log and
- * continue operating when a consumer error is
- * encountered. */
- fprintf(stderr, "WARNING: Consumer error: %s\n",
- rd_kafka_message_errstr(msg));
- rd_kafka_message_destroy(msg);
- continue;
- }
-
- /* Process message */
- process_message(consumer, producer, output_topic, msg);
-
- rd_kafka_message_destroy(msg);
-
- msgcnt++;
- }
-
- fprintf(stdout, "Closing consumer\n");
- rd_kafka_consumer_close(consumer);
- rd_kafka_destroy(consumer);
-
- fprintf(stdout, "Closing producer\n");
- rd_kafka_destroy(producer);
-
- return 0;
-}