# Introduction to librdkafka - the Apache Kafka C/C++ client library


librdkafka is a high performance C implementation of the Apache
Kafka client, providing a reliable and performant client for production use.
librdkafka also provides a native C++ interface.

<!-- markdown-toc start - Don't edit this section. Run M-x markdown-toc-refresh-toc -->
**Table of Contents**

- [Introduction to librdkafka - the Apache Kafka C/C++ client library](#introduction-to-librdkafka---the-apache-kafka-cc-client-library)
    - [Performance](#performance)
        - [High throughput](#high-throughput)
        - [Low latency](#low-latency)
            - [Latency measurement](#latency-measurement)
        - [Compression](#compression)
    - [Message reliability](#message-reliability)
        - [Producer message delivery success](#producer-message-delivery-success)
        - [Producer message delivery failure](#producer-message-delivery-failure)
            - [Error: Timed out in transmission queue](#error-timed-out-in-transmission-queue)
            - [Error: Timed out in flight to/from broker](#error-timed-out-in-flight-tofrom-broker)
            - [Error: Temporary broker-side error](#error-temporary-broker-side-error)
            - [Error: Temporary errors due to stale metadata](#error-temporary-errors-due-to-stale-metadata)
            - [Error: Local time out](#error-local-time-out)
            - [Error: Permanent errors](#error-permanent-errors)
        - [Producer retries](#producer-retries)
        - [Reordering](#reordering)
        - [Idempotent Producer](#idempotent-producer)
            - [Guarantees](#guarantees)
            - [Ordering and message sequence numbers](#ordering-and-message-sequence-numbers)
            - [Partitioner considerations](#partitioner-considerations)
            - [Message timeout considerations](#message-timeout-considerations)
            - [Leader change](#leader-change)
            - [Error handling](#error-handling)
                - [RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER](#rdkafkaresperroutofordersequencenumber)
                - [RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER](#rdkafkaresperrduplicatesequencenumber)
                - [RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID](#rdkafkaresperrunknownproducerid)
                - [Standard errors](#standard-errors)
                - [Message persistence status](#message-persistence-status)
        - [Transactional Producer](#transactional-producer)
            - [Error handling](#error-handling-1)
            - [Old producer fencing](#old-producer-fencing)
            - [Configuration considerations](#configuration-considerations)
        - [Exactly Once Semantics (EOS) and transactions](#exactly-once-semantics-eos-and-transactions)
    - [Usage](#usage)
        - [Documentation](#documentation)
        - [Initialization](#initialization)
        - [Configuration](#configuration)
            - [Example](#example)
        - [Termination](#termination)
            - [High-level KafkaConsumer](#high-level-kafkaconsumer)
            - [Producer](#producer)
            - [Admin API client](#admin-api-client)
            - [Speeding up termination](#speeding-up-termination)
        - [Threads and callbacks](#threads-and-callbacks)
        - [Brokers](#brokers)
            - [SSL](#ssl)
            - [OAUTHBEARER with support for OIDC](#oauthbearer-with-support-for-oidc)
            - [Sparse connections](#sparse-connections)
                - [Random broker selection](#random-broker-selection)
                - [Persistent broker connections](#persistent-broker-connections)
            - [Connection close](#connection-close)
            - [Fetch From Follower](#fetch-from-follower)
        - [Logging](#logging)
            - [Debug contexts](#debug-contexts)
        - [Feature discovery](#feature-discovery)
        - [Producer API](#producer-api)
        - [Simple Consumer API (legacy)](#simple-consumer-api-legacy)
            - [Offset management](#offset-management)
                - [Auto offset commit](#auto-offset-commit)
                - [At-least-once processing](#at-least-once-processing)
                - [Auto offset reset](#auto-offset-reset)
        - [Consumer groups](#consumer-groups)
            - [Static consumer groups](#static-consumer-groups)
        - [Topics](#topics)
            - [Unknown or unauthorized topics](#unknown-or-unauthorized-topics)
            - [Topic metadata propagation for newly created topics](#topic-metadata-propagation-for-newly-created-topics)
            - [Topic auto creation](#topic-auto-creation)
        - [Metadata](#metadata)
            - [< 0.9.3](#-093)
            - [> 0.9.3](#-093)
            - [Query reasons](#query-reasons)
            - [Caching strategy](#caching-strategy)
        - [Fatal errors](#fatal-errors)
            - [Fatal producer errors](#fatal-producer-errors)
            - [Fatal consumer errors](#fatal-consumer-errors)
    - [Compatibility](#compatibility)
        - [Broker version compatibility](#broker-version-compatibility)
            - [Broker version >= 0.10.0.0 (or trunk)](#broker-version--01000-or-trunk)
            - [Broker versions 0.9.0.x](#broker-versions-090x)
            - [Broker versions 0.8.x.y](#broker-versions-08xy)
            - [Detailed description](#detailed-description)
        - [Supported KIPs](#supported-kips)
        - [Supported protocol versions](#supported-protocol-versions)
- [Recommendations for language binding developers](#recommendations-for-language-binding-developers)
    - [Expose the configuration interface pass-thru](#expose-the-configuration-interface-pass-thru)
    - [Error constants](#error-constants)
    - [Reporting client software name and version to broker](#reporting-client-software-name-and-version-to-broker)
    - [Documentation reuse](#documentation-reuse)
    - [Community support](#community-support)

<!-- markdown-toc end -->
## Performance

librdkafka is a multi-threaded library designed for use on modern hardware and
it attempts to keep memory copying to a minimum. The payload of produced or
consumed messages may pass through without any copying
(if so desired by the application) putting no limit on message sizes.

librdkafka allows you to decide if high throughput is the name of the game,
or if a low latency service is required, or a balance between the two, all
through the configuration property interface.

The single most important configuration property for performance tuning is
`linger.ms` - how long to wait for `batch.num.messages` or `batch.size` to
fill up in the local per-partition queue before sending the batch of messages
to the broker.

In low throughput scenarios, a lower value improves latency.
As throughput increases, the cost of each broker request becomes significant,
impacting both maximum throughput and latency. For higher throughput
applications, latency will typically be lower with a higher `linger.ms` due
to larger batches resulting in a smaller number of requests, yielding decreased
per-message load on the broker. A good general purpose setting is 5ms.
For applications seeking maximum throughput, the recommended value is >= 50ms.


### High throughput

The key to high throughput is message batching - waiting for a certain amount
of messages to accumulate in the local queue before sending them off in
one large message set or batch to the peer. This amortizes the messaging
overhead and eliminates the adverse effect of the round trip time (rtt).

`linger.ms` (also called `queue.buffering.max.ms`) allows librdkafka to
wait up to the specified amount of time to accumulate up to
`batch.num.messages` or `batch.size` in a single batch (MessageSet) before
sending to the broker. The larger the batch the higher the throughput.
Enabling `msg` debugging (set the `debug` property to `msg`) will emit log
messages for the accumulation process which lets you see what batch sizes
are being produced.

Example using `linger.ms=1`:

```
... test [0]: MessageSet with 1514 message(s) delivered
... test [3]: MessageSet with 1690 message(s) delivered
... test [0]: MessageSet with 1720 message(s) delivered
... test [3]: MessageSet with 2 message(s) delivered
... test [3]: MessageSet with 4 message(s) delivered
... test [0]: MessageSet with 4 message(s) delivered
... test [3]: MessageSet with 11 message(s) delivered
```

Example using `linger.ms=1000`:
```
... test [0]: MessageSet with 10000 message(s) delivered
... test [0]: MessageSet with 10000 message(s) delivered
... test [0]: MessageSet with 4667 message(s) delivered
... test [3]: MessageSet with 10000 message(s) delivered
... test [3]: MessageSet with 10000 message(s) delivered
... test [3]: MessageSet with 4476 message(s) delivered
```


The default setting of `linger.ms=5` is not suitable for
high throughput; it is recommended to set this value to >50ms, with
throughput leveling out somewhere around 100-1000ms depending on
message produce pattern and sizes.

These settings are set globally (`rd_kafka_conf_t`) but apply on a
per topic+partition basis.
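For illustration, here is a minimal sketch of a throughput-oriented producer
configuration following the guidance above. The property values are
illustrative starting points to be tuned per workload, not recommendations
from this document:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: configure a producer for batching/throughput.
 * The property values below are assumptions to tune, not defaults. */
static rd_kafka_conf_t *make_throughput_conf(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* Wait up to 100 ms for batches to fill up. */
        if (rd_kafka_conf_set(conf, "linger.ms", "100",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            /* Allow larger accumulation per batch. */
            rd_kafka_conf_set(conf, "batch.num.messages", "10000",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            /* Log batching decisions while tuning. */
            rd_kafka_conf_set(conf, "debug", "msg",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return NULL;
        }
        return conf;
}
```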
### Low latency

When low latency messaging is required the `linger.ms` should be
tuned to the maximum permitted producer-side latency.
Setting `linger.ms` to 0 or 0.1 will make sure messages are sent as
soon as possible.
Lower buffering time leads to smaller batches and larger per-message overheads,
increasing network, memory and CPU usage for producers, brokers and consumers.

See [How to decrease message latency](https://github.com/edenhill/librdkafka/wiki/How-to-decrease-message-latency) for more info.


#### Latency measurement

End-to-end latency is preferably measured by synchronizing clocks on producers
and consumers and using the message timestamp on the consumer to calculate
the full latency. Make sure the topic's `log.message.timestamp.type` is set to
the default `CreateTime` (Kafka topic configuration, not librdkafka topic).

Latencies are typically incurred by the producer, network and broker; the
consumer's effect on end-to-end latency is minimal.
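As a sketch of this approach (assuming synchronized clocks and a topic using
the default `CreateTime` timestamps), a consumer can derive per-message
end-to-end latency from the broker-stored timestamp:

```c
#include <stdio.h>
#include <inttypes.h>
#include <sys/time.h>
#include <librdkafka/rdkafka.h>

/* Sketch: compute end-to-end latency from the message's CreateTime
 * timestamp. Assumes producer and consumer clocks are synchronized. */
static void print_e2e_latency(const rd_kafka_message_t *rkmessage) {
        rd_kafka_timestamp_type_t tstype;
        int64_t ts = rd_kafka_message_timestamp(rkmessage, &tstype);

        if (ts != -1 && tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME) {
                struct timeval tv;
                gettimeofday(&tv, NULL);
                int64_t now_ms = (int64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
                printf("end-to-end latency: %" PRId64 " ms\n", now_ms - ts);
        }
}
```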
To break down the end-to-end latencies and find where latencies are adding up
there are a number of metrics available through librdkafka statistics
on the producer:

 * `brokers[].int_latency` is the time, per message, between produce()
   and the message being written to a MessageSet and ProduceRequest.
   High `int_latency` indicates CPU core contention: check CPU load and
   involuntary context switches (`/proc/<..>/status`).
   Consider using a machine/instance with more CPU cores.
   This metric is only relevant on the producer.

 * `brokers[].outbuf_latency` is the time, per protocol request
   (such as ProduceRequest), between the request being enqueued (which happens
   right after int_latency) and the time the request is written to the
   TCP socket connected to the broker.
   High `outbuf_latency` indicates CPU core contention or network congestion:
   check CPU load and socket SendQ (`netstat -anp | grep :9092`).

 * `brokers[].rtt` is the time, per protocol request, between the request being
   written to the TCP socket and the time the response is received from
   the broker.
   High `rtt` indicates broker load or network congestion:
   check broker metrics, local socket SendQ, network performance, etc.

 * `brokers[].throttle` is the time, per throttled protocol request, the
   broker throttled/delayed handling of a request due to usage quotas.
   The throttle time will also be reflected in `rtt`.

 * `topics[].batchsize` is the size of individual Producer MessageSet batches.
   See below.

 * `topics[].batchcnt` is the number of messages in individual Producer
   MessageSet batches. Due to Kafka protocol overhead a batch with few messages
   will have a higher relative processing and size overhead than a batch
   with many messages.
   Use the `linger.ms` client configuration property to set the maximum
   amount of time allowed for accumulating a single batch; the larger the
   value the larger the batches will grow, thus increasing efficiency.
   When producing messages at a high rate it is recommended to increase
   `linger.ms`, which will improve throughput and in some cases also latency.


See [STATISTICS.md](STATISTICS.md) for the full definition of metrics.
A JSON schema for the statistics is available in
[statistics-schema.json](src/statistics-schema.json).
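To collect these metrics an application registers a statistics callback and
enables `statistics.interval.ms`; a minimal sketch (JSON parsing left out):

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: receive the statistics JSON emitted every statistics.interval.ms.
 * A real application would parse `json` and extract e.g. brokers[].rtt. */
static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
        fprintf(stderr, "stats (%zu bytes): %.*s...\n", json_len, 64, json);
        return 0; /* returning 0 lets librdkafka free the json buffer */
}

static void enable_stats(rd_kafka_conf_t *conf) {
        char errstr[512];
        rd_kafka_conf_set_stats_cb(conf, stats_cb);
        rd_kafka_conf_set(conf, "statistics.interval.ms", "60000",
                          errstr, sizeof(errstr));
}
```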
### Compression

Producer message compression is enabled through the `compression.codec`
configuration property.

Compression is performed on the batch of messages in the local queue; the
larger the batch the higher the likelihood of a higher compression ratio.
The local batch queue size is controlled through the `batch.num.messages`,
`batch.size`, and `linger.ms` configuration properties as described in the
**High throughput** chapter above.



## Message reliability

Message reliability is an important factor of librdkafka - an application
can rely fully on librdkafka to deliver a message according to the specified
configuration (`request.required.acks` and `message.send.max.retries`, etc).

If the topic configuration property `request.required.acks` is set to wait
for message commit acknowledgements from brokers (any value but 0, see
[`CONFIGURATION.md`](CONFIGURATION.md)
for specifics) then librdkafka will hold on to the message until
all expected acks have been received, gracefully handling the following events:

 * Broker connection failure
 * Topic leader change
 * Produce errors signaled by the broker
 * Network problems

We recommend `request.required.acks` to be set to `all` to make sure
produced messages are acknowledged by all in-sync replica brokers.

This is handled automatically by librdkafka and the application does not need
to take any action at any of the above events.
The message will be resent up to `message.send.max.retries` times before
reporting a failure back to the application.

The delivery report callback is used by librdkafka to signal the status of
a message back to the application; it will be called once for each message
to report the status of message delivery:

 * If `error_code` is non-zero the message delivery failed and the error_code
   indicates the nature of the failure (`rd_kafka_resp_err_t` enum).
 * If `error_code` is zero the message has been successfully delivered.

See the Producer API chapter for more details on delivery report callback usage.

The delivery report callback is optional but highly recommended.
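For reference, a minimal sketch of registering a delivery report callback
(the handling shown is illustrative):

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: called once per message from rd_kafka_poll() to report
 * delivery success or failure. */
static void dr_msg_cb(rd_kafka_t *rk,
                      const rd_kafka_message_t *rkmessage, void *opaque) {
        if (rkmessage->err)
                fprintf(stderr, "delivery failed: %s\n",
                        rd_kafka_err2str(rkmessage->err));
        /* else: delivered; rkmessage->offset holds the assigned offset */
}

/* Register on the conf object before rd_kafka_new():
 *   rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
 */
```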
### Producer message delivery success

When a ProduceRequest is successfully handled by the broker and a
ProduceResponse is received (also called the ack) without an error code
the messages from the ProduceRequest are enqueued on the delivery report
queue (if a delivery report callback has been set) and will be passed to
the application on the next invocation of rd_kafka_poll().


### Producer message delivery failure

The following sub-chapters explain how different produce errors
are handled.

If the error is retryable and there are remaining retry attempts for
the given message(s), an automatic retry will be scheduled by librdkafka;
these retries are not visible to the application.

Only permanent errors and temporary errors that have reached their maximum
retry count will generate a delivery report event to the application with an
error code set.

The application should typically not attempt to retry producing the message
on failure, but instead configure librdkafka to perform these retries
using the `retries` and `retry.backoff.ms` configuration properties.


#### Error: Timed out in transmission queue

Internal error ERR__TIMED_OUT_QUEUE.

The connectivity to the broker may be stalled due to networking contention,
local or remote system issues, etc, and the request has not yet been sent.

The producer can be certain that the message has not been sent to the broker.

This is a retryable error, but is not counted as a retry attempt
since the message was never actually transmitted.

A retry by librdkafka at this point will not cause duplicate messages.


#### Error: Timed out in flight to/from broker

Internal errors ERR__TIMED_OUT, ERR__TRANSPORT.

Same reasons as for `Timed out in transmission queue` above, with the
difference that the message may have been sent to the broker and might
be stalling waiting for broker replicas to ack the message, or the response
could be stalled due to networking issues.
At this point the producer can't know if the message reached the broker,
nor if the broker wrote the message to disk and replicas.

This is a retryable error.

A retry by librdkafka at this point may cause duplicate messages.


#### Error: Temporary broker-side error

Broker errors ERR_REQUEST_TIMED_OUT, ERR_NOT_ENOUGH_REPLICAS,
ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND.

These errors are considered temporary and librdkafka will retry them
if permitted by configuration.


#### Error: Temporary errors due to stale metadata

Broker errors ERR_LEADER_NOT_AVAILABLE, ERR_NOT_LEADER_FOR_PARTITION.

These errors are considered temporary and a retry is warranted; a metadata
request is automatically sent to find a new leader for the partition.

A retry by librdkafka at this point will not cause duplicate messages.


#### Error: Local time out

Internal error ERR__MSG_TIMED_OUT.

The message could not be successfully transmitted before `message.timeout.ms`
expired, typically due to no leader being available or no broker connection.
The message may have been retried due to other errors but
those error messages are abstracted by the ERR__MSG_TIMED_OUT error code.

Since `message.timeout.ms` has passed there will be no more retries
by librdkafka.


#### Error: Permanent errors

Any other error is considered a permanent error and the message
will fail immediately, generating a delivery report event with the
distinctive error code.

The full list of permanent errors depends on the broker version and
will likely grow in the future.

Typical permanent broker errors are:

 * ERR_CORRUPT_MESSAGE
 * ERR_MSG_SIZE_TOO_LARGE - adjust client's or broker's `message.max.bytes`.
 * ERR_UNKNOWN_TOPIC_OR_PART - topic or partition does not exist,
   automatic topic creation is disabled on the broker, or the application is
   specifying a partition that does not exist.
 * ERR_RECORD_LIST_TOO_LARGE
 * ERR_INVALID_REQUIRED_ACKS
 * ERR_TOPIC_AUTHORIZATION_FAILED
 * ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT
 * ERR_CLUSTER_AUTHORIZATION_FAILED
### Producer retries

The ProduceRequest itself is not retried; instead the messages
are put back on the internal partition queue by an insert sort
that maintains their original position (the message order is defined
at the time a message is initially appended to a partition queue, i.e., after
partitioning).
A backoff time (`retry.backoff.ms`) is set on the retried messages which
effectively blocks retry attempts until the backoff time has expired.


### Reordering

As for all retries, if `max.in.flight` > 1 and `retries` > 0, retried messages
may be produced out of order, since a sub-sequent message in a sub-sequent
ProduceRequest may already be in-flight (and accepted by the broker)
by the time the retry for the failing message is sent.

Using the Idempotent Producer prevents reordering even with `max.in.flight` > 1,
see [Idempotent Producer](#idempotent-producer) below for more information.


### Idempotent Producer

librdkafka supports the idempotent producer which provides strict ordering and
exactly-once producer guarantees.
The idempotent producer is enabled by setting the `enable.idempotence`
configuration property to `true`; this will automatically adjust a number of
other configuration properties to adhere to the idempotency requirements,
see the documentation of `enable.idempotence` in [CONFIGURATION.md](CONFIGURATION.md) for
more information.
Producer instantiation will fail if the user supplied an incompatible value
for any of the automatically adjusted properties, e.g., it is an error to
explicitly set `acks=1` when `enable.idempotence=true` is set.


#### Guarantees

There are three types of guarantees that the idempotent producer can satisfy:

 * Exactly-once - a message is only written to the log once.
   Does NOT cover the exactly-once consumer case.
 * Ordering - a series of messages are written to the log in the
   order they were produced.
 * Gap-less - **EXPERIMENTAL** a series of messages are written once and
   in order without risk of skipping messages. The sequence
   of messages may be cut short and fail before all
   messages are written, but may not fail individual
   messages in the series.
   This guarantee is disabled by default, but may be enabled
   by setting `enable.gapless.guarantee` if individual message
   failure is a concern.
   Messages that fail due to exceeded timeout (`message.timeout.ms`)
   are permitted by the gap-less guarantee and may cause
   gaps in the message series without raising a fatal error.
   See **Message timeout considerations** below for more info.
   **WARNING**: This is an experimental property subject to
   change or removal.

All three guarantees are in effect when idempotence is enabled, only
gap-less may be disabled individually.
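Enabling the idempotent producer is a one-property change; a minimal sketch:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: enable idempotence; librdkafka adjusts related properties
 * (acks, max.in.flight, retries) to satisfy the idempotency requirements. */
static int enable_idempotence(rd_kafka_conf_t *conf) {
        char errstr[512];
        if (rd_kafka_conf_set(conf, "enable.idempotence", "true",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                return -1;
        }
        return 0;
}
```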
#### Ordering and message sequence numbers

librdkafka maintains the original produce() ordering per-partition for all
messages produced, using an internal per-partition 64-bit counter
called the msgid which starts at 1. This msgid allows messages to be
re-inserted in the partition message queue in the original order in the
case of retries.

The Idempotent Producer functionality in the Kafka protocol also has
a per-message sequence number, which is a signed 32-bit wrapping counter that is
reset each time the Producer's ID (PID) or Epoch changes.

The librdkafka msgid is used, along with a base msgid value stored
at the time the PID/Epoch was bumped, to calculate the Kafka protocol's
message sequence number.

With Idempotent Producer enabled there is no risk of reordering despite
`max.in.flight` > 1 (capped at 5).

**Note**: "MsgId" in log messages refers to the librdkafka msgid, while "seq"
          refers to the protocol message sequence, and "baseseq" is the seq of
          the first message in a batch.
          MsgId starts at 1, while message seqs start at 0.


The producer statistics also maintain two metrics for tracking the next
expected response sequence:

 * `next_ack_seq` - the next sequence to expect an acknowledgement for, which
   is the last successfully produced MessageSet's last sequence + 1.
 * `next_err_seq` - the next sequence to expect an error for, which is typically
   the same as `next_ack_seq` until an error occurs, in which
   case the `next_ack_seq` can't be incremented (since no
   messages were acked on error). `next_err_seq` is used to
   properly handle sub-sequent errors due to a failing
   first request.

**Note**: Both are exposed in partition statistics.



#### Partitioner considerations

Strict ordering is guaranteed on a **per partition** basis.

An application utilizing the idempotent producer should not mix
producing to explicit partitions with partitioner-based partitions
since messages produced for the latter are queued separately until
a topic's partition count is known, which would insert these messages
after the partition-explicit messages regardless of produce order.


#### Message timeout considerations

If messages time out (due to `message.timeout.ms`) while in the producer queue
there will be gaps in the series of produced messages.

E.g., messages 1,2,3,4,5 are produced by the application.
While messages 2,3,4 are transmitted to the broker the connection to
the broker goes down.
While the broker is down the message timeout expires for messages 2 and 3.
As the connection comes back up messages 4 and 5 are transmitted to the
broker, resulting in a final written message sequence of 1, 4, 5.

The producer gracefully handles this case by draining the in-flight requests
for a given partition when one or more of its queued (not transmitted)
messages are timed out. When all requests are drained the Epoch is bumped and
the base sequence number is reset to the first message in the queue, effectively
skipping the timed out messages as if they had never existed from the
broker's point of view.
The message status for timed out queued messages will be
`RD_KAFKA_MSG_STATUS_NOT_PERSISTED`.

If messages time out while in-flight to the broker (also due to
`message.timeout.ms`), the protocol request will fail, the broker
connection will be closed by the client, and the timed out messages will be
removed from the producer queue. In this case the in-flight messages may be
written to the topic log by the broker, even though
a delivery report with error `ERR__MSG_TIMED_OUT` will be raised, since
the producer timed out the request before getting an acknowledgement back
from the broker.
The message status for timed out in-flight messages will be
`RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED`, indicating that the producer
does not know if the messages were written and acked by the broker,
or dropped in-flight.

An application may inspect the message status by calling
`rd_kafka_message_status()` on the message in the delivery report callback,
to see if the message was (possibly) persisted (written to the topic log) by
the broker or not.

Despite the graceful handling of timeouts, we recommend using a
large `message.timeout.ms` to minimize the risk of timeouts.

**Warning**: `enable.gapless.guarantee` does not apply to timed-out messages.

**Note**: `delivery.timeout.ms` is an alias for `message.timeout.ms`.


#### Leader change

There are corner cases where an Idempotent Producer has outstanding
ProduceRequests in-flight to the previous leader while a new leader is elected.

A leader change is typically triggered by the original leader
failing or terminating, which has the risk of also failing (some of) the
in-flight ProduceRequests to that broker. To recover the producer to a
consistent state it will not send any ProduceRequests for these partitions to
the new leader broker until all responses for any outstanding ProduceRequests
to the previous partition leader have been received, or these requests have
timed out.
This drain may take up to `min(socket.timeout.ms, message.timeout.ms)`.
If the connection to the previous broker goes down the outstanding requests
are failed immediately.
#### Error handling

Background:
The error handling for the Idempotent Producer, as initially proposed
in the [EOS design document](https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8),
missed some corner cases which are now being addressed in [KIP-360](https://cwiki.apache.org/confluence/display/KAFKA/KIP-360%3A+Improve+handling+of+unknown+producer).
There were some intermediate fixes and workarounds prior to KIP-360 that proved
to be incomplete and made the error handling in the client overly complex.
With the benefit of hindsight the librdkafka implementation will attempt
to provide correctness from the lessons learned in the Java client and
provide stricter and less complex error handling.

The following sections describe librdkafka's handling of the
Idempotent Producer specific errors that may be returned by the broker.


##### RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER

This error is returned by the broker when the sequence number in the
ProduceRequest is larger than the expected next sequence
for the given PID+Epoch+Partition (last BaseSeq + msgcount + 1).
Note: sequence 0 is always accepted.

If the failed request is the head-of-line (next expected sequence to be acked)
it indicates desynchronization between the client and broker:
the client thinks the sequence number is correct but the broker disagrees.
There is no way for the client to recover from this scenario without
risking message loss or duplication, and it is not safe for the
application to manually retry messages.
A fatal error (`RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER`) is raised.

When the request is not head-of-line the previous request failed
(for any reason), which means the messages in the current request
can be retried after waiting for all outstanding requests for this
partition to drain, then resetting the Producer ID and starting over.


**Java Producer behaviour**:
Fails the batch, resets the PID, and then continues producing
(and retrying sub-sequent) messages. This will lead to gaps
in the message series.



##### RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER

Returned by the broker when the request's base sequence number is
less than the expected sequence number (which is the last written
sequence + msgcount).
Note: sequence 0 is always accepted.

This error is typically benign and occurs upon retrying a previously successful
send that was not acknowledged.

The messages will be considered successfully produced but will have neither
timestamp nor offset set.


**Java Producer behaviour:**
Treats the message as successfully delivered.


##### RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID

Returned by the broker when the PID+Epoch is unknown, which may occur when
the PID's state has expired (due to topic retention, DeleteRecords,
or compaction).

The Java producer added quite a bit of error handling for this case,
extending the ProduceRequest protocol to return the logStartOffset
to give the producer a chance to differentiate between an actual
UNKNOWN_PRODUCER_ID or topic retention having deleted the last
message for this producer (effectively voiding the Producer ID cache).
This workaround proved to be error prone (see explanation in KIP-360)
when the partition leader changed.

KIP-360 suggests removing this error checking in favour of failing fast;
librdkafka follows suit.

If the response is for the first ProduceRequest in-flight
and there are no messages waiting to be retried nor any ProduceRequests
unaccounted for, then the error is ignored and the epoch is incremented;
this is likely to happen for an idle producer whose last written
message has been deleted from the log, and thus its PID state.
Otherwise the producer raises a fatal error
(RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID) since the delivery guarantees can't
be satisfied.


**Java Producer behaviour:**
Retries the send in some cases (but KIP-360 will change this).
Not a fatal error in any case.


##### Standard errors

All the standard Produce errors are handled in the usual way:
permanent errors will fail the messages in the batch, while
temporary errors will be retried (if the retry count permits).

If a permanent error is returned for a batch in a series of in-flight batches,
the sub-sequent batches will fail with
RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER since the sequence number of the
failed batch was never written to the topic log and the next expected sequence
was thus not incremented on the broker.

A fatal error (RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE) is raised to satisfy
the gap-less guarantee (if `enable.gapless.guarantee` is set) by failing all
queued messages.


##### Message persistence status

To help the application decide what to do in these error cases, a new
per-message API is introduced, `rd_kafka_message_status()`,
which returns one of the following values:

 * `RD_KAFKA_MSG_STATUS_NOT_PERSISTED` - the message has never
   been transmitted to the broker, or failed with an error indicating
   it was not written to the log.
   Application retry will risk ordering, but not duplication.
 * `RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED` - the message was transmitted
   to the broker, but no acknowledgement was received.
   Application retry will risk ordering and duplication.
 * `RD_KAFKA_MSG_STATUS_PERSISTED` - the message was written to the log by
   the broker and fully acknowledged.
   No reason for application to retry.

This method should be called by the application on delivery report error.
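A sketch of inspecting the persistence status in a delivery report callback
(the retry policy in the comments is illustrative, not prescriptive):

```c
#include <librdkafka/rdkafka.h>

/* Sketch: decide whether an application-level retry is safe,
 * based on the message persistence status. */
static void dr_msg_cb(rd_kafka_t *rk,
                      const rd_kafka_message_t *rkmessage, void *opaque) {
        if (!rkmessage->err)
                return; /* persisted and acknowledged */

        switch (rd_kafka_message_status(rkmessage)) {
        case RD_KAFKA_MSG_STATUS_NOT_PERSISTED:
                /* Safe to retry without risk of duplication. */
                break;
        case RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED:
                /* Retrying may produce a duplicate. */
                break;
        case RD_KAFKA_MSG_STATUS_PERSISTED:
                /* Written to the log; no retry needed. */
                break;
        }
}
```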
### Transactional Producer


#### Error handling

Using the transactional producer simplifies error handling compared to the
standard or idempotent producer; a transactional application will only need
to care about these different types of errors:

 * Retriable errors - the operation failed due to temporary problems,
   such as network timeouts; the operation may be safely retried.
   Use `rd_kafka_error_is_retriable()` to distinguish this case.
 * Abortable errors - if any of the transactional APIs return a non-fatal
   error code the current transaction has failed and the application
   must call `rd_kafka_abort_transaction()`, rewind its input to the
   point before the current transaction started, and attempt a new transaction
   by calling `rd_kafka_begin_transaction()`, etc.
   Use `rd_kafka_error_txn_requires_abort()` to distinguish this case.
 * Fatal errors - the application must cease operations and destroy the
   producer instance.
   Use `rd_kafka_error_is_fatal()` to distinguish this case.
 * For all other errors returned from the transactional API: the current
   recommendation is to treat any error that has neither retriable, abortable,
   nor fatal set as a fatal error.

While the application should log the actual fatal or abortable errors, there
is no need for the application to handle the underlying errors specifically.



#### Old producer fencing

If a new transactional producer instance is started with the same
`transactional.id`, any previous still running producer
instance will be fenced off at the next produce, commit or abort attempt, by
raising a fatal error with the error code set to
`RD_KAFKA_RESP_ERR__FENCED`.


#### Configuration considerations

To make sure messages time out (in case of connectivity problems, etc) within
the transaction, the `message.timeout.ms` configuration property must be
set lower than `transaction.timeout.ms`; this is enforced when
creating the producer instance.
If `message.timeout.ms` is not explicitly configured it will be adjusted
automatically.




### Exactly Once Semantics (EOS) and transactions

librdkafka supports Exactly Once Semantics (EOS) as defined in [KIP-98](https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging).
For more on the use of transactions, see [Transactions in Apache Kafka](https://www.confluent.io/blog/transactions-apache-kafka/).

See [examples/transactions.c](examples/transactions.c) for an example
transactional EOS application.

**Warning**
If the broker version is older than Apache Kafka 2.5.0 then one transactional
producer instance per consumed input partition is required.
For 2.5.0 and later a single producer instance may be used regardless of
the number of input partitions.
See KIP-447 for more information.
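As a sketch of the error classification described above, here is how a commit
attempt might be handled. It assumes the handle was created with
`transactional.id` set and that `rd_kafka_init_transactions()` and
`rd_kafka_begin_transaction()` already succeeded; see
[examples/transactions.c](examples/transactions.c) for a complete application:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: commit a transaction and classify the error as described above. */
static void commit_or_recover(rd_kafka_t *rk) {
        rd_kafka_error_t *error = rd_kafka_commit_transaction(rk, 60 * 1000);

        if (!error)
                return; /* transaction committed */

        if (rd_kafka_error_txn_requires_abort(error)) {
                /* Abort, rewind input, and start a new transaction. */
                rd_kafka_error_destroy(error);
                error = rd_kafka_abort_transaction(rk, 60 * 1000);
                /* ... rewind consumer input here ... */
        } else if (rd_kafka_error_is_retriable(error)) {
                /* Temporary problem: the commit may be retried. */
        } else {
                /* Fatal (or unclassified, treated as fatal): terminate. */
                fprintf(stderr, "fatal: %s\n", rd_kafka_error_string(error));
        }
        if (error)
                rd_kafka_error_destroy(error);
}
```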
## Usage

### Documentation

The librdkafka API is documented in the [`rdkafka.h`](src/rdkafka.h)
header file, and the configuration properties are documented in
[`CONFIGURATION.md`](CONFIGURATION.md).

### Initialization

The application needs to instantiate a top-level object `rd_kafka_t` which is
the base container, providing global configuration and shared state.
It is created by calling `rd_kafka_new()`.

It also needs to instantiate one or more topics (`rd_kafka_topic_t`) to be used
for producing to or consuming from. The topic object holds topic-specific
configuration and will be internally populated with a mapping of all available
partitions and their leader brokers.
It is created by calling `rd_kafka_topic_new()`.

Both `rd_kafka_t` and `rd_kafka_topic_t` come with a configuration API which
is optional.
Not using the API will cause librdkafka to use its default values which are
documented in [`CONFIGURATION.md`](CONFIGURATION.md).

**Note**: An application may create multiple `rd_kafka_t` objects and
          they share no state.

**Note**: An `rd_kafka_topic_t` object may only be used with the `rd_kafka_t`
          object it was created from.



### Configuration

To ease integration with the official Apache Kafka software and lower
the learning curve, librdkafka implements identical configuration
properties as found in the official clients of Apache Kafka.

Configuration is applied prior to object creation using the
`rd_kafka_conf_set()` and `rd_kafka_topic_conf_set()` APIs.

**Note**: The `rd_kafka.._conf_t` objects are not reusable after they have been
          passed to `rd_kafka.._new()`.
          The application does not need to free any config resources after a
          `rd_kafka.._new()` call.

#### Example

```c
    rd_kafka_conf_t *conf;
    rd_kafka_conf_res_t res;
    rd_kafka_t *rk;
    char errstr[512];

    conf = rd_kafka_conf_new();

    res = rd_kafka_conf_set(conf, "compression.codec", "snappy",
                            errstr, sizeof(errstr));
    if (res != RD_KAFKA_CONF_OK)
        fail("%s\n", errstr);

    res = rd_kafka_conf_set(conf, "batch.num.messages", "100",
                            errstr, sizeof(errstr));
    if (res != RD_KAFKA_CONF_OK)
        fail("%s\n", errstr);

    rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        /* rd_kafka_new() failed: the conf object is still owned by the
         * application and must be destroyed here. */
        rd_kafka_conf_destroy(conf);
        fail("Failed to create producer: %s\n", errstr);
    }

    /* Note: librdkafka takes ownership of the conf object on success */
```

Configuration properties may be set in any order (except for interceptors) and
may be overwritten before being passed to `rd_kafka_new()`.
`rd_kafka_new()` will verify that the passed configuration is consistent
and will fail and return an error if incompatible configuration properties
are detected. It will also emit log warnings for deprecated and problematic
configuration properties.


### Termination

librdkafka is asynchronous in its nature and performs most operations in its
background threads.

Calling the librdkafka handle destructor tells the librdkafka background
threads to finalize their work, close network connections, clean up, etc, and
may thus take some time. The destructor (`rd_kafka_destroy()`) will block
until all background threads have terminated.

If the destructor blocks indefinitely it typically means there is an outstanding
object reference, such as a message or topic object, that was not destroyed
prior to destroying the client handle.
All objects except for the handle (C: `rd_kafka_t`,
C++: `Consumer,KafkaConsumer,Producer`), such as topic objects, messages,
`topic_partition_t`, `TopicPartition`, events, etc, **MUST** be
destroyed/deleted prior to destroying or closing the handle.

For C, make sure the following objects are destroyed prior to calling
`rd_kafka_consumer_close()` and `rd_kafka_destroy()`:
 * `rd_kafka_message_t`
 * `rd_kafka_topic_t`
 * `rd_kafka_topic_partition_t`
 * `rd_kafka_topic_partition_list_t`
 * `rd_kafka_event_t`
 * `rd_kafka_queue_t`

For C++ make sure the following objects are deleted prior to
calling `KafkaConsumer::close()` and deleting the Consumer, KafkaConsumer or
Producer handle:
 * `Message`
 * `Topic`
 * `TopicPartition`
 * `Event`
 * `Queue`


#### High-level KafkaConsumer

The proper termination sequence for the high-level KafkaConsumer is:
```c
     /* 1) Leave the consumer group, commit final offsets, etc. */
     rd_kafka_consumer_close(rk);

     /* 2) Destroy handle object */
     rd_kafka_destroy(rk);
```

**NOTE**: There is no need to unsubscribe prior to calling `rd_kafka_consumer_close()`.

**NOTE**: Any topic objects created must be destroyed prior to rd_kafka_destroy()

Effects of not doing the above, for:
 1. Final offsets are not committed and the consumer will not actively leave
    the group; it will be kicked out of the group after `session.timeout.ms`
    expires. It is okay to omit the `rd_kafka_consumer_close()` call in case
    the application does not want to wait for the blocking close call.
 2. librdkafka will continue to operate on the handle. Actual memory leaks.


#### Producer

The proper termination sequence for Producers is:

```c
     /* 1) Make sure all outstanding requests are transmitted and handled. */
     rd_kafka_flush(rk, 60*1000); /* One minute timeout */

     /* 2) Destroy the topic and handle objects */
     rd_kafka_topic_destroy(rkt);  /* Repeat for all topic objects held */
     rd_kafka_destroy(rk);
```

Effects of not doing the above, for:
 1. Messages in-queue or in-flight will be dropped.
 2. librdkafka will continue to operate on the handle. Actual memory leaks.


#### Admin API client

Unlike the Java Admin client, the Admin APIs in librdkafka are available
on any type of client instance and can be used in combination with the
client type's main functionality, e.g., it is perfectly fine to call
`CreateTopics()` in your running producer, or `DeleteRecords()` in your
consumer.

If you need a client instance to only perform Admin API operations the
recommendation is to create a producer instance since it requires less
configuration (no `group.id`) than the consumer and is generally more cost
efficient.
We do recommend that you set `allow.auto.create.topics=false` to prevent
topic metadata lookups from unexpectedly having the broker create topics.



#### Speeding up termination

To speed up the termination of librdkafka an application can set a
termination signal that will be used internally by librdkafka to quickly
cancel any outstanding I/O waits.
Make sure you block this signal in your application.

```c
    char tmp[16];
    snprintf(tmp, sizeof(tmp), "%i", SIGIO);  /* Or whatever signal you decide */
    rd_kafka_conf_set(rk_conf, "internal.termination.signal", tmp, errstr, sizeof(errstr));
```
### Threads and callbacks

librdkafka uses multiple threads internally to fully utilize modern hardware.
The API is completely thread-safe and the calling application may call any
of the API functions from any of its own threads at any time.

A poll-based API is used to provide signaling back to the application;
the application should call rd_kafka_poll() at regular intervals.
The poll API will call the following configured callbacks (optional):

 * `dr_msg_cb` - Message delivery report callback - signals that a message has
   been delivered or failed delivery, allowing the application to take action
   and to release any application resources used in the message.
 * `error_cb` - Error callback - signals an error. These errors are usually of
   an informational nature, i.e., failure to connect to a broker, and the
   application usually does not need to take any action.
   The type of error is passed as a rd_kafka_resp_err_t enum value,
   including both remote broker errors as well as local failures.
   An application typically does not have to perform any action when
   an error is raised through the error callback; the client will
   automatically try to recover from all errors, given that the
   client and cluster are correctly configured.
   In some specific cases a fatal error may occur which will render
   the client more or less inoperable for further use:
   if the error code in the error callback is set to
   `RD_KAFKA_RESP_ERR__FATAL` the application should retrieve the
   underlying fatal error and reason using the `rd_kafka_fatal_error()` call,
   and then begin terminating the instance (a sketch follows after
   this section).
   The Event API's EVENT_ERROR has a `rd_kafka_event_error_is_fatal()`
   function, and the C++ EventCb has a `fatal()` method, to help the
   application determine if an error is fatal or not.
 * `stats_cb` - Statistics callback - triggered if `statistics.interval.ms`
   is configured to a non-zero value, emitting metrics and internal state
   in JSON format, see [STATISTICS.md](STATISTICS.md).
 * `throttle_cb` - Throttle callback - triggered whenever a broker has
   throttled (delayed) a request.

These callbacks will also be triggered by `rd_kafka_flush()`,
`rd_kafka_consumer_poll()`, and any other functions that serve queues.


Optional callbacks not triggered by poll; these may be called spontaneously
from any thread at any time:

 * `log_cb` - Logging callback - allows the application to output log messages
   generated by librdkafka.
 * `partitioner_cb` - Partitioner callback - application provided message partitioner.
   The partitioner may be called in any thread at any time, and it may be
   called multiple times for the same key.
   Partitioner function constraints:
   - MUST NOT call any rd_kafka_*() functions
   - MUST NOT block or execute for prolonged periods of time.
   - MUST return a value between 0 and partition_cnt-1, or the
     special RD_KAFKA_PARTITION_UA value if partitioning
     could not be performed.
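A minimal sketch of an `error_cb` that distinguishes fatal errors, as
described in the list above:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: log informational errors, but detect fatal errors and
 * initiate termination. Registered with rd_kafka_conf_set_error_cb(). */
static void error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
        if (err == RD_KAFKA_RESP_ERR__FATAL) {
                char errstr[512];
                rd_kafka_resp_err_t fatal =
                        rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
                fprintf(stderr, "FATAL %s: %s - terminating\n",
                        rd_kafka_err2name(fatal), errstr);
                /* ... flag the application to stop and destroy rk ... */
        } else {
                fprintf(stderr, "error: %s: %s\n",
                        rd_kafka_err2str(err), reason);
        }
}
```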
### Brokers

On initialization, librdkafka only needs a partial list of
brokers (at least one), called the bootstrap brokers.
The client will connect to the bootstrap brokers specified by the
`bootstrap.servers` configuration property and query cluster Metadata
information which contains the full list of brokers, topics, partitions and
their leaders in the Kafka cluster.

Broker names are specified as `host[:port]` where the port is optional
(default 9092) and the host is either a resolvable hostname or an IPv4 or IPv6
address.
If the host resolves to multiple addresses librdkafka will round-robin the
addresses for each connection attempt.
A DNS record containing all broker addresses can thus be used to provide a
reliable bootstrap broker.


#### SSL

If the client is to connect to a broker's SSL endpoints/listeners the client
needs to be configured with `security.protocol=SSL` for just SSL transport or
`security.protocol=SASL_SSL` for SASL authentication and SSL transport.
The client will try to verify the broker's certificate by checking the
CA root certificates; if the broker's certificate can't be verified
the connection is closed (and retried). This is to protect the client
from connecting to rogue brokers.

The CA root certificate defaults are system specific:
 * On Linux, Mac OSX, and other Unix-like systems the OpenSSL default
   CA path will be used, also called the OPENSSLDIR, which is typically
   `/etc/ssl/certs` (on Linux, typically in the `ca-certificates` package) and
   `/usr/local/etc/openssl` on Mac OSX (Homebrew).
 * On Windows the Root certificate store is used, unless
   `ssl.ca.certificate.stores` is configured in which case certificates are
   read from the specified stores.
 * If OpenSSL is linked statically, librdkafka will set the default CA
   location to the first of a series of probed paths (see below).

If the system-provided default CA root certificates are not sufficient to
verify the broker's certificate, such as when a self-signed certificate
or a local CA authority is used, the CA certificate must be specified
explicitly so that the client can find it.
This can be done either by providing a PEM file (e.g., `cacert.pem`)
as the `ssl.ca.location` configuration property, or by passing an in-memory
PEM, X.509/DER or PKCS#12 certificate to `rd_kafka_conf_set_ssl_cert()`.

It is also possible to disable broker certificate verification completely
by setting `enable.ssl.certificate.verification=false`, but this is not
recommended since it allows for rogue brokers and man-in-the-middle attacks,
and should only be used for testing and troubleshooting purposes.

CA location probe paths (see [rdkafka_ssl.c](src/rdkafka_ssl.c) for full list)
used when OpenSSL is statically linked:

    "/etc/pki/tls/certs/ca-bundle.crt",
    "/etc/ssl/certs/ca-bundle.crt",
    "/etc/pki/tls/certs/ca-bundle.trust.crt",
    "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",
    "/etc/ssl/ca-bundle.pem",
    "/etc/pki/tls/cacert.pem",
    "/etc/ssl/cert.pem",
    "/etc/ssl/cacert.pem",
    "/etc/certs/ca-certificates.crt",
    "/etc/ssl/certs/ca-certificates.crt",
    "/etc/ssl/certs",
    "/usr/local/etc/ssl/cert.pem",
    "/usr/local/etc/ssl/cacert.pem",
    "/usr/local/etc/ssl/certs/cert.pem",
    "/usr/local/etc/ssl/certs/cacert.pem",
    etc..


On **Windows** the Root certificate store is read by default, but any number
of certificate stores can be read by setting the `ssl.ca.certificate.stores`
configuration property to a comma-separated list of certificate store names.
The predefined system store names are:

 * `MY` - User certificates
 * `Root` - System CA certificates (default)
 * `CA` - Intermediate CA certificates
 * `Trust` - Trusted publishers

For example, to read both intermediate and root CAs, set
`ssl.ca.certificate.stores=CA,Root`.
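A sketch of an SSL client configuration using an explicitly provided CA
bundle (the file path is an illustrative assumption):

```c
#include <librdkafka/rdkafka.h>

/* Sketch: SSL transport with an explicit CA bundle, for brokers whose
 * certificates are signed by a local or self-signed CA. */
static void configure_ssl(rd_kafka_conf_t *conf) {
        char errstr[512];
        rd_kafka_conf_set(conf, "security.protocol", "SSL",
                          errstr, sizeof(errstr));
        /* Path to the local CA bundle (illustrative). */
        rd_kafka_conf_set(conf, "ssl.ca.location", "/etc/myapp/cacert.pem",
                          errstr, sizeof(errstr));
}
```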
#### OAUTHBEARER with support for OIDC

OAUTHBEARER with OIDC provides a method for the client to authenticate to the
Kafka cluster by requesting an authentication token from an issuing server
and passing the retrieved token to brokers during connection setup.

To use this authentication method the client needs to be configured as
follows (a configuration sketch follows after this list):

 * `security.protocol` - set to `SASL_SSL` or `SASL_PLAINTEXT`.
 * `sasl.mechanism` - set to `OAUTHBEARER`.
 * `sasl.oauthbearer.method` - set to `OIDC`.
 * `sasl.oauthbearer.token.endpoint.url` - OAUTH issuer token
   endpoint HTTP(S) URI used to retrieve the token.
 * `sasl.oauthbearer.client.id` - public identifier for the application.
   It must be unique across all clients that the authorization server handles.
 * `sasl.oauthbearer.client.secret` - secret known only to the
   application and the authorization server.
   This should be a sufficiently random string that is not guessable.
 * `sasl.oauthbearer.scope` - clients use this to specify the scope of the
   access request to the broker.
 * `sasl.oauthbearer.extensions` - (optional) additional information to be
   provided to the broker. A comma-separated list of key=value pairs.
   For example:
   `supportFeatureX=true,organizationId=sales-emea`
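A sketch of the OAUTHBEARER/OIDC configuration; the endpoint URL, client id,
secret and scope values are illustrative placeholders:

```c
#include <librdkafka/rdkafka.h>

/* Sketch: OAUTHBEARER/OIDC client configuration.
 * All values below are placeholders for the real issuer/credentials. */
static void configure_oidc(rd_kafka_conf_t *conf) {
        char errstr[512];
        rd_kafka_conf_set(conf, "security.protocol", "SASL_SSL",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "sasl.mechanism", "OAUTHBEARER",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "sasl.oauthbearer.method", "OIDC",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "sasl.oauthbearer.token.endpoint.url",
                          "https://myidp.example.com/oauth2/token",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "sasl.oauthbearer.client.id", "myclient",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "sasl.oauthbearer.client.secret", "mysecret",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "sasl.oauthbearer.scope", "kafka",
                          errstr, sizeof(errstr));
}
```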
#### Sparse connections

The client will only connect to brokers it needs to communicate with, and
only when necessary.

Examples of needed broker connections are:

 * leaders for partitions being consumed from
 * leaders for partitions being produced to
 * consumer group coordinator broker
 * cluster controller for Admin API operations


##### Random broker selection

When there is no broker connection and a connection to any broker
is needed, such as on startup to retrieve metadata, the client randomly selects
a broker from its list of brokers, which includes both the configured bootstrap
brokers (including brokers manually added with `rd_kafka_brokers_add()`), as
well as the brokers discovered from cluster metadata.
Brokers with no prior connection attempt are tried first.

If there is already an available broker connection to any broker it is used,
rather than connecting to a new one.

The random broker selection and connection scheduling is triggered when:
 * bootstrap servers are configured (`rd_kafka_new()`)
 * brokers are manually added (`rd_kafka_brokers_add()`).
 * a consumer group coordinator needs to be found.
 * acquiring a ProducerID for the Idempotent Producer.
 * cluster or topic metadata is being refreshed.

A single connection attempt will be performed, and the broker will
return to an idle INIT state on failure to connect.

The random broker selection is rate-limited to:
10 < `reconnect.backoff.ms`/2 < 1000 milliseconds.

**Note**: The broker connection will be maintained until it is closed
          by the broker (idle connection reaper).

##### Persistent broker connections

While the random broker selection is useful for one-off queries, there
is a need for the client to maintain persistent connections to certain brokers:
 * Consumer: the group coordinator.
 * Consumer: the partition leader for topics being fetched from.
 * Producer: the partition leader for topics being produced to.

These dependencies are discovered and maintained automatically, marking
matching brokers as persistent, which will make the client maintain connections
to these brokers at all times, reconnecting as necessary.


#### Connection close

A broker connection may be closed by the broker, by intermediary network gear,
or due to network errors, timeouts, etc.
When a broker connection is closed, librdkafka will back off the next reconnect
attempt (to the given broker) for `reconnect.backoff.ms` -25% to +50% jitter;
this value is increased exponentially for each connect attempt until
`reconnect.backoff.max.ms` is reached, at which time the value is reset
to `reconnect.backoff.ms`.

The broker will disconnect clients that have not sent any protocol requests
within `connections.max.idle.ms` (a broker configuration property, defaults
to 10 minutes), but there is no foolproof way for the client to know that it
was a deliberate close by the broker and not an error. To avoid logging these
deliberate idle disconnects as errors the client employs some logic to try to
classify a disconnect as an idle disconnect if no requests have been sent in
the last `socket.timeout.ms`, or if there are no outstanding or queued
requests waiting to be sent. In this case the standard "Disconnect"
error log is silenced (will only be seen with debug enabled).

Otherwise, if a connection is closed while there are requests in-flight
the logging level will be LOG_WARNING (4), else LOG_INFO (6).

`log.connection.close=false` may be used to silence all disconnect logs,
but it is recommended to instead rely on the above heuristics.


#### Fetch From Follower

librdkafka supports consuming messages from follower replicas
([KIP-392](https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica)).
This is enabled by setting the `client.rack` configuration property which
corresponds to `broker.rack` on the broker. The actual assignment of
consumers to replicas is determined by the configured `replica.selector.class`
on the broker.


### Logging

#### Debug contexts

Extensive debugging of librdkafka can be enabled by setting the
`debug` configuration property to a CSV string of debug contexts:

Debug context | Type     | Description
--------------|----------|----------------------
generic       | *        | General client instance level debugging. Includes initialization and termination debugging.
broker        | *        | Broker and connection state debugging.
topic         | *        | Topic and partition state debugging. Includes leader changes.
metadata      | *        | Cluster and topic metadata retrieval debugging.
feature       | *        | Kafka protocol feature support as negotiated with the broker.
queue         | producer | Message queue debugging.
msg           | *        | Message debugging. Includes information about batching, compression, sizes, etc.
protocol      | *        | Kafka protocol request/response debugging. Includes latency (rtt) printouts.
cgrp          | consumer | Low-level consumer group state debugging.
security      | *        | Security and authentication debugging.
fetch         | consumer | Consumer message fetch debugging. Includes decision when and why messages are fetched.
interceptor   | *        | Interceptor interface debugging.
plugin        | *        | Plugin loading debugging.
consumer      | consumer | High-level consumer debugging.
admin         | admin    | Admin API debugging.
eos           | producer | Idempotent Producer debugging.
mock          | *        | Mock cluster functionality debugging.
assignor      | consumer | Detailed consumer group partition assignor debugging.
conf          | *        | Display set configuration properties on startup.
all           | *        | All of the above.
Suggested debugging settings for troubleshooting:

Problem space          | Type     | Debug setting
-----------------------|----------|-------------------
Producer not delivering messages to broker | producer | `broker,topic,msg`
Consumer not fetching messages | consumer | Start with `consumer`, or use `cgrp,fetch` for detailed information.
Consumer starts reading at unexpected offset | consumer | `consumer` or `cgrp,fetch`
Authentication or connectivity issues | * | `broker,security`
Protocol handling or latency | * | `broker,protocol`
Topic leader and state | * | `topic,metadata`




### Feature discovery

Apache Kafka broker version 0.10.0 added support for the ApiVersionRequest API
which allows a client to query a broker for its range of supported API versions.

librdkafka supports this functionality and will query each broker on connect
for this information (if `api.version.request=true`) and use it to enable or disable
various protocol features, such as MessageVersion 1 (timestamps), KafkaConsumer, etc.

If the broker fails to respond to the ApiVersionRequest librdkafka will
assume the broker is too old to support the API and fall back to an older
broker version's API. These fallback versions are hardcoded in librdkafka
and are controlled by the `broker.version.fallback` configuration property.



### Producer API

After setting up the `rd_kafka_t` object with type `RD_KAFKA_PRODUCER` and one
or more `rd_kafka_topic_t` objects librdkafka is ready for accepting messages
to be produced and sent to brokers.

The `rd_kafka_produce()` function takes the following arguments:

 * `rkt` - the topic to produce to, previously created with
   `rd_kafka_topic_new()`
 * `partition` - partition to produce to. If this is set to
   `RD_KAFKA_PARTITION_UA` (UnAssigned) then the configured partitioner
   function will be used to select a target partition.
 * `msgflags` - 0, or one of:
     * `RD_KAFKA_MSG_F_COPY` - librdkafka will immediately make a copy of
       the payload. Use this when the payload is in non-persistent
       memory, such as the stack.
     * `RD_KAFKA_MSG_F_FREE` - let librdkafka free the payload using
       `free(3)` when it is done with it.

   These two flags are mutually exclusive and neither needs to be set, in
   which case the payload is neither copied nor freed by librdkafka.

   If the `RD_KAFKA_MSG_F_COPY` flag is not set no data copying will be
   performed and librdkafka will hold on to the payload pointer until
   the message has been delivered or fails.
   The delivery report callback will be called when librdkafka is done
   with the message to let the application regain ownership of the
   payload memory.
   The application must not free the payload in the delivery report
   callback if `RD_KAFKA_MSG_F_FREE` is set.
 * `payload`,`len` - the message payload
 * `key`,`keylen` - an optional message key which can be used for partitioning.
   It will be passed to the topic partitioner callback, if any, and
   will be attached to the message when sending to the broker.
 * `msg_opaque` - an optional application-provided per-message opaque pointer
   that will be provided in the message delivery callback to let
   the application reference a specific message.


`rd_kafka_produce()` is a non-blocking API; it will enqueue the message
on an internal queue and return immediately.
If the new message would cause the internal queue to exceed the
`queue.buffering.max.messages` or `queue.buffering.max.kbytes`
configuration properties, `rd_kafka_produce()` returns -1 and sets errno
to `ENOBUFS` and last_error to `RD_KAFKA_RESP_ERR__QUEUE_FULL`, thus
providing a backpressure mechanism.


`rd_kafka_producev()` provides an alternative produce API that does not
require a topic `rkt` object and also provides support for extended
message fields, such as timestamp and headers.


**Note**: See `examples/rdkafka_performance.c` for a producer implementation.
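A sketch of producing with `rd_kafka_producev()` and handling queue-full
backpressure; the topic name and payload are illustrative:

```c
#include <stdio.h>
#include <string.h>
#include <librdkafka/rdkafka.h>

/* Sketch: non-blocking produce with simple backpressure handling. */
static void produce_one(rd_kafka_t *rk, const char *payload) {
        rd_kafka_resp_err_t err;

retry:
        err = rd_kafka_producev(
                rk,
                RD_KAFKA_V_TOPIC("mytopic"), /* illustrative topic */
                RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                RD_KAFKA_V_VALUE((void *)payload, strlen(payload)),
                RD_KAFKA_V_END);

        if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                /* Queue full: serve delivery reports to free up space,
                 * then retry. */
                rd_kafka_poll(rk, 100);
                goto retry;
        } else if (err) {
                fprintf(stderr, "produce failed: %s\n", rd_kafka_err2str(err));
        }
}
```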
+If the new message would cause the internal queue to exceed the
+`queue.buffering.max.messages` or `queue.buffering.max.kbytes`
+configuration properties, `rd_kafka_produce()` returns -1 and sets errno
+to `ENOBUFS` and last_error to `RD_KAFKA_RESP_ERR__QUEUE_FULL`, thus
+providing a backpressure mechanism.
+
+
+`rd_kafka_producev()` provides an alternative produce API that does not
+require a topic `rkt` object and also provides support for extended
+message fields, such as timestamp and headers.
+
+
+**Note**: See `examples/rdkafka_performance.c` for a producer implementation.
+
+
+### Simple Consumer API (legacy)
+
+NOTE: For the high-level KafkaConsumer interface see `rd_kafka_subscribe()`
+(rdkafka.h) or KafkaConsumer (rdkafkacpp.h).
+
+The consumer API is a bit more stateful than the producer API.
+After creating `rd_kafka_t` with type `RD_KAFKA_CONSUMER` and
+`rd_kafka_topic_t` instances the application must also start the consumer
+for a given partition by calling `rd_kafka_consume_start()`.
+
+`rd_kafka_consume_start()` arguments:
+
+ * `rkt` - the topic to start consuming from, previously created with
+   `rd_kafka_topic_new()`.
+ * `partition` - partition to consume from.
+ * `offset` - message offset to start consuming from. This may either be an
+   absolute message offset or one of the three special offsets:
+   `RD_KAFKA_OFFSET_BEGINNING` to start consuming from the beginning
+   of the partition's queue (oldest message), or
+   `RD_KAFKA_OFFSET_END` to start consuming at the next message to be
+   produced to the partition, or
+   `RD_KAFKA_OFFSET_STORED` to use the offset store.
+
+After a topic+partition consumer has been started librdkafka will attempt
+to keep `queued.min.messages` messages in the local queue by repeatedly
+fetching batches of messages from the broker. librdkafka will fetch all
+consumed partitions for which that broker is a leader, through a single
+request.
+
+This local message queue is then served to the application through three
+different consume APIs:
+
+ * `rd_kafka_consume()` - consumes a single message
+ * `rd_kafka_consume_batch()` - consumes one or more messages
+ * `rd_kafka_consume_callback()` - consumes all messages in the local
+   queue and calls a callback function for each one.
+
+These three APIs are listed above in ascending order of performance,
+`rd_kafka_consume()` being the slowest and `rd_kafka_consume_callback()` being
+the fastest. The different consume variants are provided to cater for different
+application needs.
+
+A consumed message, as provided or returned by each of the consume functions,
+is represented by the `rd_kafka_message_t` type.
+
+`rd_kafka_message_t` members:
+
+ * `err` - Error signaling back to the application. If this field is non-zero
+   the `payload` field should be considered an error message and
+   `err` is an error code (`rd_kafka_resp_err_t`).
+   If `err` is zero then the message is a proper fetched message
+   and `payload` et.al contains message payload data.
+ * `rkt`,`partition` - Topic and partition for this message or error.
+ * `payload`,`len` - Message payload data or error message (err!=0).
+ * `key`,`key_len` - Optional message key as specified by the producer
+ * `offset` - Message offset
+
+Both the `payload` and `key` memory, as well as the message as a whole, are
+owned by librdkafka and must not be used after an `rd_kafka_message_destroy()`
+call. librdkafka will share the same messageset receive buffer memory for all
+message payloads of that messageset to avoid excessive copying, which means
+that if the application decides to hang on to a single `rd_kafka_message_t`
+it will prevent the backing memory from being released for all other messages
+from the same messageset.
+
+When the application is done consuming messages from a topic+partition it
+should call `rd_kafka_consume_stop()` to stop the consumer. This will also
+purge any messages currently in the local queue.
+
+
+**Note**: See `examples/rdkafka_performance.c` for a consumer implementation.
+
+
+#### Offset management
+
+Broker based offset management is available for broker version >= 0.9.0
+in conjunction with the high-level KafkaConsumer interface (see
+rdkafka.h or rdkafkacpp.h).
+
+Offset management is also available through a deprecated local offset file,
+where the offset is periodically written to a local file for each
+topic+partition according to the following topic configuration properties:
+
+ * `enable.auto.commit`
+ * `auto.commit.interval.ms`
+ * `offset.store.path`
+ * `offset.store.sync.interval.ms`
+
+The legacy `auto.commit.enable` topic configuration property is only to be used
+with the legacy low-level consumer.
+Use `enable.auto.commit` with the modern KafkaConsumer.
+
+
+##### Auto offset commit
+
+The consumer will automatically commit offsets every `auto.commit.interval.ms`
+when `enable.auto.commit` is enabled (default).
+
+Offsets to be committed are kept in a local in-memory offset store;
+this offset store is updated by `consumer_poll()` (et.al) to
+store the offset of the last message passed to the application
+(per topic+partition).
+
+##### At-least-once processing
+Since auto commits are performed in a background thread this may result in
+the offset for the latest message being committed before the application has
+finished processing the message. If the application were to crash or exit
+prior to finishing processing, and the offset had been auto committed,
+the next incarnation of the consumer application would start at the next
+message, effectively missing the message that was being processed when the
+application crashed.
+To avoid this scenario the application can disable the automatic
+offset **store** by setting `enable.auto.offset.store` to false
+and manually **storing** offsets after processing by calling
+`rd_kafka_offsets_store()`.
+This gives an application fine-grained control over when a message
+is eligible for committing without having to perform the commit itself.
+`enable.auto.commit` should be set to true when using manual offset storing.
+The latest stored offset will be automatically committed every
+`auto.commit.interval.ms`.
+
+**Note**: Only greater offsets are committed, e.g., if the latest committed
+          offset was 10 and the application performs an offsets_store()
+          with offset 9, that offset will not be committed.
+
+
+##### Auto offset reset
+
+The consumer will by default try to acquire the last committed offsets for
+each topic+partition it is assigned, using its configured `group.id`.
+If there is no committed offset available, or the consumer is unable to
+fetch the committed offsets, the policy of `auto.offset.reset` will kick in.
+This configuration property may be set to one of the following values:
+
+ * `earliest` - start consuming the earliest message of the partition.
+ * `latest` - start consuming the next message to be produced to the partition.
+ * `error` - don't start consuming but instead raise a consumer error
+   with error code `RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET` for
+   the topic+partition. This allows the application to decide what
+   to do in case there is no committed start offset.
+
+
+### Consumer groups
+
+Broker based consumer groups (requires Apache Kafka broker >=0.9) are supported,
+see KafkaConsumer in rdkafka.h or rdkafkacpp.h.
+
+The following diagram visualizes the high-level balanced consumer group state
+flow and synchronization between the application, librdkafka consumer,
+group coordinator, and partition leader(s).
+
+![Consumer group state diagram](src/librdkafka_cgrp_synch.png)
+
+
+#### Static consumer groups
+
+By default Kafka consumers are rebalanced each time a new consumer joins
+the group or an existing member leaves. This is what is known as dynamic
+membership. Apache Kafka >= 2.3.0 introduces static membership.
+Unlike dynamic membership, static members can leave and rejoin a group
+within the `session.timeout.ms` without triggering a rebalance, retaining
+their existing partition assignment.
+
+To enable static group membership configure each consumer instance
+in the group with a unique `group.instance.id`.
+
+Consumers with `group.instance.id` set will not send a leave group request on
+close - session timeout, change of subscription, or a new group member joining
+the group, are the only mechanisms that will trigger a group rebalance for
+static consumer groups.
+
+If a new consumer joins the group with the same `group.instance.id` as an
+existing consumer, the existing consumer will be fenced and raise a fatal error.
+The fatal error is propagated as a consumer error with error code
+`RD_KAFKA_RESP_ERR__FATAL`; use `rd_kafka_fatal_error()` to retrieve
+the original fatal error code and reason.
+
+To read more about static group membership, see [KIP-345](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances).
+
+
+### Note on Batch consume APIs
+
+Using multiple instances of the `rd_kafka_consume_batch()` and/or `rd_kafka_consume_batch_queue()`
+APIs concurrently is not thread safe and will result in undefined behaviour. We strongly recommend
+that only a single instance of these APIs be used at a given time. This use case is not supported
+and will not be supported in the future. There are different ways to achieve a similar result:
+
+* Create multiple consumers reading from different partitions. In this way, different partitions
+  are read by different consumers and each consumer can run its own batch call.
+* Create multiple consumers in the same consumer group. In this way, partitions are assigned to
+  different consumers and each consumer can run its own batch call.
+* Create a single consumer, read data from a single batch call, and process this data in parallel.
+
+If, even after this, you feel the need to use multiple instances of these APIs for the same consumer
+concurrently, then don't use any of the **seek**, **pause**, **resume** or **rebalancing** operations
+in conjunction with these API calls. For the **rebalancing** operation to work in a sequential manner,
+please set the `rebalance_cb` configuration property (refer to
+[examples/rdkafka_complex_consumer_example.c](examples/rdkafka_complex_consumer_example.c)
+for help with its usage) for the consumer.
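+
+
+To make the preceding consumer sections concrete, the following is a
+minimal sketch of a high-level consumer poll loop implementing the
+at-least-once pattern described above. It assumes the consumer `rk` was
+configured with `enable.auto.offset.store=false` and
+`enable.auto.commit=true` and has already subscribed to its topics;
+`process_message()` is a hypothetical application function:
+
+```c
+#include <stdio.h>
+#include <librdkafka/rdkafka.h>
+
+void process_message(const rd_kafka_message_t *rkm); /* app-defined */
+
+static void consume_loop(rd_kafka_t *rk, volatile int *run) {
+        while (*run) {
+                rd_kafka_message_t *rkm = rd_kafka_consumer_poll(rk, 1000);
+                if (!rkm)
+                        continue; /* poll timeout */
+
+                if (rkm->err) {
+                        /* Consumer errors are generally informational. */
+                        fprintf(stderr, "%% Consumer error: %s\n",
+                                rd_kafka_message_errstr(rkm));
+                        rd_kafka_message_destroy(rkm);
+                        continue;
+                }
+
+                process_message(rkm);
+
+                /* Only after processing: store offset+1 (the next message
+                 * to fetch) to make it eligible for the next auto commit. */
+                rd_kafka_topic_partition_list_t *offsets =
+                        rd_kafka_topic_partition_list_new(1);
+                rd_kafka_topic_partition_list_add(
+                        offsets, rd_kafka_topic_name(rkm->rkt),
+                        rkm->partition)->offset = rkm->offset + 1;
+                rd_kafka_offsets_store(rk, offsets);
+                rd_kafka_topic_partition_list_destroy(offsets);
+
+                rd_kafka_message_destroy(rkm);
+        }
+}
+```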
+
+
+### Topics
+
+#### Unknown or unauthorized topics
+
+If a consumer application subscribes to non-existent or unauthorized topics
+a consumer error will be propagated for each unavailable topic with the
+error code set to either `RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART` or a
+broker-specific error code, such as
+`RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED`.
+
+As the topic metadata is refreshed every `topic.metadata.refresh.interval.ms`
+the unavailable topics are re-checked for availability, but the same error
+will not be raised again for the same topic.
+
+If a consumer has Describe (ACL) permissions for a topic but not Read it will
+be able to join a consumer group and start consuming the topic, but the Fetch
+requests to retrieve messages from the broker will fail with
+`RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED`.
+This error will be raised to the application once per partition and
+assign()/seek(), and the fetcher will back off the next fetch to 10 times the
+`fetch.error.backoff.ms` (but at least 1 second).
+It is recommended that the application takes appropriate action when this
+occurs, for instance adjusting its subscription or assignment to exclude the
+unauthorized topic.
+
+
+#### Topic metadata propagation for newly created topics
+
+Due to the asynchronous nature of topic creation in Apache Kafka it may
+take some time for a newly created topic to be known by all brokers in the
+cluster.
+If a client tries to use a topic after topic creation but before the topic
+has been fully propagated in the cluster it will seem as if the topic does not
+exist, which would raise `RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC` (et.al)
+errors to the application.
+To avoid these temporary errors being raised, the client will not flag
+a topic as non-existent until a propagation time has elapsed; this propagation
+time defaults to 30 seconds and can be configured with
+`topic.metadata.propagation.max.ms`.
+The per-topic max propagation time starts ticking as soon as the topic is
+referenced (e.g., by produce()).
+
+If messages are produced to unknown topics during the propagation time, the
+messages will be queued for later delivery to the broker when the topic
+metadata has propagated.
+Should the topic propagation time expire without the topic being seen the
+produced messages will fail with `RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC`.
+
+**Note**: The propagation time will not take effect if a topic is known to
+          the client and then deleted; in this case the topic will immediately
+          be marked as non-existent and remain non-existent until a topic
+          metadata refresh sees the topic again (after the topic has been
+          re-created).
+
+
+#### Topic auto creation
+
+Topic auto creation is supported by librdkafka: if a non-existent topic is
+referenced by the client (by producing to, or consuming from, the topic, etc.)
+the broker will automatically create the topic (with default partition counts
+and replication factor) if the broker configuration property
+`auto.create.topics.enable=true` is set.
+
+*Note*: A topic that is undergoing automatic creation may be reported as
+unavailable, with e.g., `RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART`, during the
+time the topic is being created and partition leaders are elected.
+
+While topic auto creation may be useful for producer applications, it is not
+particularly valuable for consumer applications since even if the topic
+to consume is auto created there is nothing writing messages to the topic.
+To avoid consumers automatically creating topics the
+`allow.auto.create.topics` consumer configuration property is set to
+`false` by default, preventing the consumer from triggering automatic topic
+creation on the broker. This requires broker version v0.11.0.0 or later.
+The `allow.auto.create.topics` property may be set to `true` to allow
+auto topic creation, which also requires `auto.create.topics.enable=true` to
+be configured on the broker.
+
+
+
+### Metadata
+
+#### < 0.9.3
+Prior to the 0.9.3 release librdkafka's metadata handling
+was chatty and excessive, which usually isn't a problem in small
+to medium-sized clusters, but in large clusters with a large number
+of librdkafka clients the metadata requests could hog broker CPU and bandwidth.
+
+#### > 0.9.3
+
+The remaining Metadata sections describe the current behaviour.
+
+**Note:** "Known topics" in the following section means topics for
+          locally created `rd_kafka_topic_t` objects.
+
+
+#### Query reasons
+
+There are four reasons to query metadata:
+
+ * brokers - update/populate cluster broker list, so the client can
+   find and connect to any new brokers added.
+
+ * specific topic - find leader or partition count for specific topic
+
+ * known topics - same, but for all locally known topics.
+
+ * all topics - get topic names for consumer group wildcard subscription
+   matching
+
+The above list is sorted so that each subsequent entry contains the
+information of the entries above it, e.g., 'known topics' contains enough
+information to also satisfy 'specific topic' and 'brokers'.
+
+
+#### Caching strategy
+
+The prevalent cache timeout is `metadata.max.age.ms`; any cached entry
+will remain authoritative for this long or until a relevant broker error
+is returned.
+
+
+ * brokers - eternally cached, the broker list is additive.
+
+ * topics - cached for `metadata.max.age.ms`
+
+
+
+### Fatal errors
+
+If an unrecoverable error occurs, a fatal error is triggered in one
+or more of the following ways depending on what APIs the application is utilizing:
+
+ * C: the `error_cb` is triggered with error code `RD_KAFKA_RESP_ERR__FATAL`;
+   the application should call `rd_kafka_fatal_error()` to retrieve the
+   underlying fatal error code and error string.
+ * C: an `RD_KAFKA_EVENT_ERROR` event is triggered and
+   `rd_kafka_event_error_is_fatal()` returns true: the fatal error code
+   and string are available through `rd_kafka_event_error()`, and `.._string()`.
+ * C and C++: any API call may return `RD_KAFKA_RESP_ERR__FATAL`, use
+   `rd_kafka_fatal_error()` to retrieve the underlying fatal error code
+   and error string.
+ * C++: an `EVENT_ERROR` event is triggered and `event.fatal()` returns true:
+   the fatal error code and string are available through `event.err()` and
+   `event.str()`.
+
+
+An application may call `rd_kafka_fatal_error()` at any time to check if
+a fatal error has been raised.
+
+
+#### Fatal producer errors
+
+The idempotent producer guarantees of ordering and no duplicates also
+require a way for the client to fail gracefully when these guarantees
+can't be satisfied.
+
+If a fatal error has been raised, subsequent use of the following API calls
+will fail:
+
+ * `rd_kafka_produce()`
+ * `rd_kafka_producev()`
+ * `rd_kafka_produce_batch()`
+
+The underlying fatal error code will be returned, depending on the error
+reporting scheme for each of those APIs.
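+
+
+As an illustrative sketch (assuming `rk` is an existing client
+instance), an application might check for and retrieve a fatal error
+like this, e.g., after its `error_cb` has seen `RD_KAFKA_RESP_ERR__FATAL`:
+
+```c
+#include <stdio.h>
+#include <librdkafka/rdkafka.h>
+
+/* Returns 1 if a fatal error has been raised on `rk`, else 0. */
+static int check_fatal_error(rd_kafka_t *rk) {
+        char errstr[512];
+        rd_kafka_resp_err_t err =
+                rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
+
+        if (!err)
+                return 0; /* no fatal error raised */
+
+        fprintf(stderr, "%% Fatal error: %s: %s\n",
+                rd_kafka_err2name(err), errstr);
+        return 1;
+}
+```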
+
+
+When a fatal error has occurred the application should call `rd_kafka_flush()`
+to wait for all outstanding and queued messages to drain before terminating
+the application.
+`rd_kafka_purge(RD_KAFKA_PURGE_F_QUEUE)` is automatically called by the client
+when a producer fatal error has occurred; messages in-flight are not purged
+automatically, to allow waiting for the proper acknowledgement from the broker.
+The purged messages in queue will fail with error code set to
+`RD_KAFKA_RESP_ERR__PURGE_QUEUE`.
+
+
+#### Fatal consumer errors
+
+A consumer configured for static group membership (`group.instance.id`) may
+raise a fatal error if a new consumer instance is started with the same
+instance id, causing the existing consumer to be fenced by the new consumer.
+
+This fatal error is propagated on the fenced existing consumer in multiple ways:
+ * `error_cb` (if configured) is triggered.
+ * `rd_kafka_consumer_poll()` (et.al) will return a message object
+   with the `err` field set to `RD_KAFKA_RESP_ERR__FATAL`.
+ * any subsequent state-changing consumer calls will
+   return `RD_KAFKA_RESP_ERR__FATAL`.
+   This includes `rd_kafka_subscribe()`, `rd_kafka_assign()`,
+   `rd_kafka_consumer_close()`, `rd_kafka_commit*()`, etc.
+
+The consumer will automatically stop consuming when a fatal error has occurred
+and no further subscription, assignment, consumption or offset committing
+will be possible. At this point the application should simply destroy the
+consumer instance and terminate, since it has been replaced
+by a newer instance.
+
+
+## Compatibility
+
+### Broker version compatibility
+
+librdkafka supports all released Apache Kafka broker versions since 0.8.0,
+but not all features may be available on all broker versions since some
+features rely on newer broker functionality.
+
+**Current defaults:**
+ * `api.version.request=true`
+ * `broker.version.fallback=0.10.0`
+ * `api.version.fallback.ms=0` (never revert to `broker.version.fallback`)
+
+Depending on what broker version you are using, please configure your
+librdkafka based client as follows:
+
+#### Broker version >= 0.10.0.0 (or trunk)
+
+For librdkafka >= v1.0.0 there is no need to set any api.version-related
+configuration parameters; the defaults are tailored for broker version 0.10.0.0
+or later.
+
+For librdkafka < v1.0.0, please specify:
+```
+api.version.request=true
+api.version.fallback.ms=0
+```
+
+
+#### Broker versions 0.9.0.x
+
+```
+api.version.request=false
+broker.version.fallback=0.9.0.x (the exact 0.9.0.. version you are using)
+```
+
+#### Broker versions 0.8.x.y
+
+```
+api.version.request=false
+broker.version.fallback=0.8.x.y (your exact 0.8... broker version)
+```
+
+#### Detailed description
+
+Apache Kafka version 0.10.0.0 added support for
+[KIP-35](https://cwiki.apache.org/confluence/display/KAFKA/KIP-35+-+Retrieving+protocol+version) -
+querying the broker for supported API request types and versions -
+allowing the client to figure out what features it can use.
+But for older broker versions there is no way for the client to reliably know
+what protocol features the broker supports.
+
+To alleviate this situation librdkafka has three configuration properties:
+ * `api.version.request=true|false` - enables the API version request,
+   this requires a >= 0.10.0.0 broker and will cause a disconnect on
+   brokers 0.8.x - this disconnect is recognized by librdkafka and on the next
+   connection attempt (which is immediate) it will disable the API version
+   request and use `broker.version.fallback` as the basis of available features.
+   **NOTE**: Due to a bug in broker versions 0.9.0.0 & 0.9.0.1 the broker will
+   not close the connection when receiving the API version request, instead
+   the request will time out in librdkafka after 10 seconds and it will fall
+   back to `broker.version.fallback` on the next immediate connection attempt.
+ * `broker.version.fallback=X.Y.Z.N` - if the API version request fails
+   (if `api.version.request=true`) or API version requests are disabled
+   (`api.version.request=false`) then this tells librdkafka what version the
+   broker is running and adapts its feature set accordingly.
+ * `api.version.fallback.ms=MS` - In the case where `api.version.request=true`
+   and the API version request fails, this property dictates for how long
+   librdkafka will use `broker.version.fallback` instead of
+   `api.version.request=true`. After `MS` has passed the API version request
+   will be sent on any new connections made for the broker in question.
+   This allows upgrading the Kafka broker to a new version with an extended
+   feature set without needing to restart or reconfigure the client
+   (given that `api.version.request=true`).
+
+*Note: These properties apply per broker.*
+
+The API version query was disabled by default (`api.version.request=false`) in
+librdkafka up to and including v0.9.5 due to the aforementioned bug in
+broker versions 0.9.0.0 & 0.9.0.1, but was changed to `true` in
+librdkafka v0.11.0.
+
+
+### Supported KIPs
+
+The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals) supported by librdkafka:
+
+
+| KIP | Kafka release | Status |
+|--------------------------------------------------------------------------|-----------------------------|-----------------------------------------------------------------------------------------------|
+| KIP-1 - Stop accepting request.required.acks > 1 | 0.9.0.0 | Not enforced on client (due to backwards compat with brokers <0.8.3) |
+| KIP-4 - Metadata protocol changes | 0.9.0.0, 0.10.0.0, 0.10.1.0 | Supported |
+| KIP-8 - Producer flush() | 0.9.0.0 | Supported |
+| KIP-12 - SASL Kerberos | 0.9.0.0 | Supported (uses SSPI/logged-on-user on Windows, full KRB5 keytabs on Unix) |
+| KIP-13 - Protocol request throttling (enforced on broker) | 0.9.0.0 | Supported |
+| KIP-15 - Producer close with timeout | 0.9.0.0 | Supported (through flush() + destroy()) |
+| KIP-19 - Request timeouts | 0.9.0.0 | Supported |
+| KIP-22 - Producer pluggable partitioner | 0.9.0.0 | Supported (not supported by Go, .NET and Python) |
+| KIP-31 - Relative offsets in messagesets | 0.10.0.0 | Supported |
+| KIP-35 - ApiVersionRequest | 0.10.0.0 | Supported |
+| KIP-40 - ListGroups and DescribeGroups | 0.9.0.0 | Supported |
+| KIP-41 - max.poll.records | 0.10.0.0 | Supported through batch consumption interface (not supported by .NET and Go) |
+| KIP-42 - Producer and Consumer interceptors | 0.10.0.0 | Supported (not supported by Go, .NET and Python) |
+| KIP-43 - SASL PLAIN and handshake | 0.10.0.0 | Supported |
+| KIP-48 - Delegation tokens | 1.1.0 | Not supported |
+| KIP-54 - Sticky partition assignment strategy | 0.11.0.0 | Supported but not available, use KIP-429 instead. |
+| KIP-57 - Interoperable LZ4 framing | 0.10.0.0 | Supported |
+| KIP-62 - max.poll.interval and background heartbeats | 0.10.1.0 | Supported |
+| KIP-70 - Proper client rebalance event on unsubscribe/subscribe | 0.10.1.0 | Supported |
+| KIP-74 - max.partition.fetch.bytes | 0.10.1.0 | Supported |
+| KIP-78 - Retrieve Cluster Id | 0.10.1.0 | Supported (not supported by .NET) |
+| KIP-79 - OffsetsForTimes | 0.10.1.0 | Supported |
+| KIP-81 - Consumer pre-fetch buffer size | 2.4.0 (WIP) | Supported |
+| KIP-82 - Record Headers | 0.11.0.0 | Supported |
+| KIP-84 - SASL SCRAM | 0.10.2.0 | Supported |
+| KIP-85 - SASL config properties | 0.10.2.0 | Supported |
+| KIP-86 - Configurable SASL callbacks | 2.0.0 | Not supported |
+| KIP-88 - AdminAPI: ListGroupOffsets | 0.10.2.0 | Supported |
+| KIP-91 - Intuitive timeouts in Producer | 2.1.0 | Supported |
+| KIP-92 - Per-partition lag metrics in Consumer | 0.10.2.0 | Supported |
+| KIP-97 - Backwards compatibility with older brokers | 0.10.2.0 | Supported |
+| KIP-98 - EOS | 0.11.0.0 | Supported |
+| KIP-102 - Close with timeout in consumer | 0.10.2.0 | Not supported |
+| KIP-107 - AdminAPI: DeleteRecordsBefore | 0.11.0.0 | Supported |
+| KIP-110 - ZStd compression | 2.1.0 | Supported |
+| KIP-117 - AdminClient | 0.11.0.0 | Supported |
+| KIP-124 - Request rate quotas | 0.11.0.0 | Partially supported (depending on protocol request) |
+| KIP-126 - Producer ensure proper batch size after compression | 0.11.0.0 | Supported |
+| KIP-133 - AdminAPI: DescribeConfigs and AlterConfigs | 0.11.0.0 | Supported |
+| KIP-140 - AdminAPI: ACLs | 0.11.0.0 | Supported |
+| KIP-144 - Broker reconnect backoff | 0.11.0.0 | Supported |
+| KIP-152 - Improved SASL auth error messages | 1.0.0 | Supported |
+| KIP-192 - Cleaner idempotence semantics | 1.0.0 | Not supported (superseded by KIP-360) |
+| KIP-195 - AdminAPI: CreatePartitions | 1.0.0 | Supported |
+| KIP-204 - AdminAPI: DeleteRecords | 1.1.0 | Supported |
+| KIP-219 - Client-side throttling | 2.0.0 | Not supported |
+| KIP-222 - AdminAPI: Consumer group operations | 2.0.0 | Supported |
+| KIP-223 - Consumer partition lead metric | 2.0.0 | Not supported |
+| KIP-226 - AdminAPI: Dynamic broker config | 1.1.0 | Supported |
+| KIP-227 - Consumer Incremental Fetch | 1.1.0 | Not supported |
+| KIP-229 - AdminAPI: DeleteGroups | 1.1.0 | Supported |
+| KIP-235 - DNS alias for secure connections | 2.1.0 | Not supported |
+| KIP-249 - AdminAPI: Delegation Tokens | 2.0.0 | Not supported |
+| KIP-255 - SASL OAUTHBEARER | 2.0.0 | Supported |
+| KIP-266 - Fix indefinite consumer timeouts | 2.0.0 | Supported (bound by session.timeout.ms and max.poll.interval.ms) |
+| KIP-289 - Consumer group.id default to NULL | 2.2.0 | Supported |
+| KIP-294 - SSL endpoint verification | 2.0.0 | Supported |
+| KIP-302 - Use all addresses for resolved broker hostname | 2.1.0 | Supported |
+| KIP-320 - Consumer: handle log truncation | 2.1.0, 2.2.0 | Supported |
+| KIP-322 - DeleteTopics disabled error code | 2.1.0 | Supported |
+| KIP-339 - AdminAPI: incrementalAlterConfigs | 2.3.0 | Not supported |
+| KIP-341 - Update Sticky partition assignment data | 2.3.0 | Not supported (superseded by KIP-429) |
+| KIP-342 - Custom SASL OAUTHBEARER extensions | 2.1.0 | Supported |
+| KIP-345 - Consumer: Static membership | 2.4.0 | Supported |
+| KIP-357 - AdminAPI: list ACLs per principal | 2.1.0 | Not supported |
+| KIP-359 - Producer: use EpochLeaderId | 2.4.0 | Not supported |
+| KIP-360 - Improve handling of unknown Idempotent Producer | 2.5.0 | Supported |
+| KIP-361 - Consumer: add config to disable auto topic creation | 2.3.0 | Supported |
+| KIP-368 - SASL periodic reauth | 2.2.0 | Not supported |
+| KIP-369 - Always roundRobin partitioner | 2.4.0 | Not supported |
+| KIP-389 - Consumer group max size | 2.2.0 | Supported (error is propagated to application, but the consumer does not raise a fatal error) |
+| KIP-392 - Allow consumers to fetch from closest replica | 2.4.0 | Supported |
+| KIP-394 - Consumer: require member.id in JoinGroupRequest | 2.2.0 | Supported |
+| KIP-396 - AdminAPI: commit/list offsets | 2.4.0 | Partially supported (remaining APIs available outside Admin client) |
+| KIP-412 - AdminAPI: adjust log levels | 2.4.0 | Not supported |
+| KIP-421 - Variables in client config files | 2.3.0 | Not applicable (librdkafka, et.al, does not provide a config file interface, and shouldn't) |
+| KIP-429 - Consumer: incremental rebalance protocol | 2.4.0 | Supported |
+| KIP-430 - AdminAPI: return authorized operations in Describe.. responses | 2.3.0 | Not supported |
+| KIP-436 - Start time in stats | 2.3.0 | Supported |
+| KIP-447 - Producer scalability for EOS | 2.5.0 | Supported |
+| KIP-455 - AdminAPI: Replica assignment | 2.4.0 (WIP) | Not supported |
+| KIP-460 - AdminAPI: electPreferredLeader | 2.4.0 | Not supported |
+| KIP-464 - AdminAPI: defaults for createTopics | 2.4.0 | Supported |
+| KIP-467 - Per-message (sort of) error codes in ProduceResponse | 2.4.0 (WIP) | Not supported |
+| KIP-480 - Sticky partitioner | 2.4.0 | Supported |
+| KIP-482 - Optional fields in Kafka protocol | 2.4.0 | Partially supported (ApiVersionRequest) |
+| KIP-496 - AdminAPI: delete offsets | 2.4.0 | Supported |
+| KIP-511 - Collect Client's Name and Version | 2.4.0 | Supported |
+| KIP-514 - Bounded flush() | 2.4.0 | Supported |
+| KIP-517 - Consumer poll() metrics | 2.4.0 | Not supported |
+| KIP-518 - Allow listing consumer groups per state | 2.6.0 | Supported |
+| KIP-519 - Make SSL engine configurable | 2.6.0 | Supported |
+| KIP-525 - Return topic metadata and configs in CreateTopics response | 2.4.0 | Not supported |
+| KIP-526 - Reduce Producer Metadata Lookups for Large Number of Topics | 2.5.0 | Not supported |
+| KIP-533 - Add default API timeout to AdminClient | 2.5.0 | Not supported |
+| KIP-546 - Add Client Quota APIs to AdminClient | 2.6.0 | Not supported |
+| KIP-559 - Make the Kafka Protocol Friendlier with L7 Proxies | 2.5.0 | Not supported |
+| KIP-568 - Explicit rebalance triggering on the Consumer | 2.6.0 | Not supported |
+| KIP-659 - Add metadata to DescribeConfigsResponse | 2.6.0 | Not supported |
+| KIP-580 - Exponential backoff for Kafka clients | WIP | Partially supported |
+| KIP-584 - Versioning scheme for features | WIP | Not supported |
+| KIP-588 - Allow producers to recover gracefully from txn timeouts | 2.8.0 (WIP) | Not supported |
+| KIP-601 - Configurable socket connection timeout | 2.7.0 | Supported |
+| KIP-602 - Use all resolved addresses by default | 2.6.0 | Supported |
+| KIP-651 - Support PEM format for SSL certs and keys | 2.7.0 | Supported |
+| KIP-654 - Aborted txns with non-flushed msgs should not be fatal | 2.7.0 | Supported |
+| KIP-735 - Increase default consumer session timeout | 3.0.0 | Supported |
+| KIP-768 - SASL/OAUTHBEARER OIDC support | 3.0 | Supported |
+
+
+
+
+### Supported protocol versions
+
+"Kafka max" is the maximum ApiVersion supported in Apache Kafka 3.3.1, while
+"librdkafka max" is the maximum ApiVersion supported in the latest
+release of librdkafka.
+
+
+| ApiKey | Request name | Kafka max | librdkafka max |
+| ------- | ------------------- | ----------- | ----------------------- |
+| 0 | Produce | 9 | 7 |
+| 1 | Fetch | 13 | 11 |
+| 2 | ListOffsets | 7 | 2 |
+| 3 | Metadata | 12 | 9 |
+| 8 | OffsetCommit | 8 | 7 |
+| 9 | OffsetFetch | 8 | 7 |
+| 10 | FindCoordinator | 4 | 2 |
+| 11 | JoinGroup | 9 | 5 |
+| 12 | Heartbeat | 4 | 3 |
+| 13 | LeaveGroup | 5 | 1 |
+| 14 | SyncGroup | 5 | 3 |
+| 15 | DescribeGroups | 5 | 4 |
+| 16 | ListGroups | 4 | 4 |
+| 17 | SaslHandshake | 1 | 1 |
+| 18 | ApiVersions | 3 | 3 |
+| 19 | CreateTopics | 7 | 4 |
+| 20 | DeleteTopics | 6 | 1 |
+| 21 | DeleteRecords | 2 | 1 |
+| 22 | InitProducerId | 4 | 4 |
+| 24 | AddPartitionsToTxn | 3 | 0 |
+| 25 | AddOffsetsToTxn | 3 | 0 |
+| 26 | EndTxn | 3 | 1 |
+| 28 | TxnOffsetCommit | 3 | 3 |
+| 32 | DescribeConfigs | 4 | 1 |
+| 33 | AlterConfigs | 2 | 1 |
+| 36 | SaslAuthenticate | 2 | 0 |
+| 37 | CreatePartitions | 3 | 0 |
+| 42 | DeleteGroups | 2 | 1 |
+| 47 | OffsetDelete | 0 | 0 |
+
+
+
+# Recommendations for language binding developers
+
+These recommendations are targeted at developers who wrap librdkafka
+with their high-level languages, such as confluent-kafka-go or node-rdkafka.
+
+## Expose the configuration interface pass-thru
+
+librdkafka's string-based key=value configuration property interface controls
+most runtime behaviour and evolves over time.
+Most features are also only configuration-based, meaning they do not require a
+new API (SSL and SASL are two good examples which are purely enabled through
+configuration properties) and thus no changes are needed to the binding/application
+code.
+
+If your language binding/application allows configuration properties to be set
+in a pass-through fashion without any pre-checking done by your binding code, it
+means that a simple upgrade of the underlying librdkafka library (but not your
+bindings) will provide new features to the user.
+
+## Error constants
+
+The error constants, both the official (value >= 0) errors as well as the
+internal (value < 0) errors, evolve constantly.
+To avoid hard-coding them to expose to your users, librdkafka provides an API
+to extract the full list programmatically during runtime or for
+code generation, see `rd_kafka_get_err_descs()`.
+
+## Reporting client software name and version to broker
+
+[KIP-511](https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers) introduces a means for a
+Kafka client to report its implementation name and version to the broker; the
+broker then exposes this as metrics (e.g., through JMX) to help Kafka operators
+troubleshoot problematic clients, understand the impact of broker and client
+upgrades, etc.
+This requires broker version 2.4.0 or later (metrics added in 2.5.0).
+
+librdkafka will send its name (`librdkafka`) and version (e.g., `v1.3.0`)
+upon connect to a supporting broker.
+To help distinguish high-level client bindings on top of librdkafka, a client
+binding should configure the following two properties:
+ * `client.software.name` - set to the binding name, e.g.,
+   `confluent-kafka-go` or `node-rdkafka`.
+ * `client.software.version` - the version of the binding and the version
+   of librdkafka, e.g., `v1.3.0-librdkafka-v1.3.0` or
+   `1.2.0-librdkafka-v1.3.0`.
+   It is **highly recommended** to include the librdkafka version in this
+   version string.
+
+These configuration properties are hidden (from CONFIGURATION.md et.al.) as
+they should typically not be modified by the user.
+
+## Documentation reuse
+
+You are free to reuse the librdkafka API and CONFIGURATION documentation in
+your project, but please do return any documentation improvements back to
+librdkafka (file a GitHub pull request).
+
+## Community support
+
+You are welcome to direct your users to
+[librdkafka's Gitter chat room](http://gitter.im/edenhill/librdkafka) as long as
+you monitor the conversations in there to pick up questions specific to your
+bindings.
+But for the most part user questions are usually generic enough to apply to all
+librdkafka bindings.