Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.c')
-rw-r--r--  fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.c  2517
1 file changed, 2517 insertions(+), 0 deletions(-)
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.c
new file mode 100644
index 000000000..17b67999b
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msg.c
@@ -0,0 +1,2517 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012,2013 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rd.h"
+#include "rdkafka_int.h"
+#include "rdkafka_msg.h"
+#include "rdkafka_topic.h"
+#include "rdkafka_partition.h"
+#include "rdkafka_interceptor.h"
+#include "rdkafka_header.h"
+#include "rdkafka_idempotence.h"
+#include "rdkafka_txnmgr.h"
+#include "rdkafka_error.h"
+#include "rdcrc32.h"
+#include "rdfnv1a.h"
+#include "rdmurmur2.h"
+#include "rdrand.h"
+#include "rdtime.h"
+#include "rdsysqueue.h"
+#include "rdunittest.h"
+
+#include <stdarg.h>
+
+
+const char *rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage) {
+ if (!rkmessage->err)
+ return NULL;
+
+ if (rkmessage->payload)
+ return (const char *)rkmessage->payload;
+
+ return rd_kafka_err2str(rkmessage->err);
+}
+
+
+/**
+ * @brief Check if producing is allowed.
+ *
+ * @param errorp If non-NULL and producing is prohibited, a new error_t
+ * object will be allocated and returned in this pointer.
+ *
+ * @returns an error if not allowed, else 0.
+ *
+ * @remarks Also sets the corresponding errno.
+ */
+static RD_INLINE rd_kafka_resp_err_t
+rd_kafka_check_produce(rd_kafka_t *rk, rd_kafka_error_t **errorp) {
+ rd_kafka_resp_err_t err;
+
+ if (unlikely((err = rd_kafka_fatal_error_code(rk)))) {
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__FATAL, ECANCELED);
+ if (errorp) {
+ rd_kafka_rdlock(rk);
+ *errorp = rd_kafka_error_new_fatal(
+ err,
+ "Producing not allowed since a previous fatal "
+ "error was raised: %s",
+ rk->rk_fatal.errstr);
+ rd_kafka_rdunlock(rk);
+ }
+ return RD_KAFKA_RESP_ERR__FATAL;
+ }
+
+ if (likely(rd_kafka_txn_may_enq_msg(rk)))
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+
+ /* Transactional state forbids producing */
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__STATE, ENOEXEC);
+
+ if (errorp) {
+ rd_kafka_rdlock(rk);
+ *errorp = rd_kafka_error_new(
+ RD_KAFKA_RESP_ERR__STATE,
+ "Producing not allowed in transactional state %s",
+ rd_kafka_txn_state2str(rk->rk_eos.txn_state));
+ rd_kafka_rdunlock(rk);
+ }
+
+ return RD_KAFKA_RESP_ERR__STATE;
+}
+
+
+void rd_kafka_msg_destroy(rd_kafka_t *rk, rd_kafka_msg_t *rkm) {
+ // FIXME
+ if (rkm->rkm_flags & RD_KAFKA_MSG_F_ACCOUNT) {
+ rd_dassert(rk || rkm->rkm_rkmessage.rkt);
+ rd_kafka_curr_msgs_sub(rk ? rk : rkm->rkm_rkmessage.rkt->rkt_rk,
+ 1, rkm->rkm_len);
+ }
+
+ if (rkm->rkm_headers)
+ rd_kafka_headers_destroy(rkm->rkm_headers);
+
+ if (likely(rkm->rkm_rkmessage.rkt != NULL))
+ rd_kafka_topic_destroy0(rkm->rkm_rkmessage.rkt);
+
+ if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE && rkm->rkm_payload)
+ rd_free(rkm->rkm_payload);
+
+ if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE_RKM)
+ rd_free(rkm);
+}
+
+
+
+/**
+ * @brief Create a new Producer message, copying the payload as
+ * indicated by msgflags.
+ *
+ * @returns the new message
+ */
+static rd_kafka_msg_t *rd_kafka_msg_new00(rd_kafka_topic_t *rkt,
+ int32_t partition,
+ int msgflags,
+ char *payload,
+ size_t len,
+ const void *key,
+ size_t keylen,
+ void *msg_opaque) {
+ rd_kafka_msg_t *rkm;
+ size_t mlen = sizeof(*rkm);
+ char *p;
+
+ /* If we are to make a copy of the payload, allocate space for it too */
+ if (msgflags & RD_KAFKA_MSG_F_COPY) {
+ msgflags &= ~RD_KAFKA_MSG_F_FREE;
+ mlen += len;
+ }
+
+ mlen += keylen;
+
+ /* Note: using rd_malloc here, not rd_calloc, so make sure all fields
+ * are properly set up. */
+ rkm = rd_malloc(mlen);
+ rkm->rkm_err = 0;
+ rkm->rkm_flags =
+ (RD_KAFKA_MSG_F_PRODUCER | RD_KAFKA_MSG_F_FREE_RKM | msgflags);
+ rkm->rkm_len = len;
+ rkm->rkm_opaque = msg_opaque;
+ rkm->rkm_rkmessage.rkt = rd_kafka_topic_keep(rkt);
+
+ rkm->rkm_broker_id = -1;
+ rkm->rkm_partition = partition;
+ rkm->rkm_offset = RD_KAFKA_OFFSET_INVALID;
+ rkm->rkm_timestamp = 0;
+ rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
+ rkm->rkm_status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
+ rkm->rkm_headers = NULL;
+
+ p = (char *)(rkm + 1);
+
+ if (payload && msgflags & RD_KAFKA_MSG_F_COPY) {
+ /* Copy payload to space following the ..msg_t */
+ rkm->rkm_payload = p;
+ memcpy(rkm->rkm_payload, payload, len);
+ p += len;
+
+ } else {
+ /* Just point to the provided payload. */
+ rkm->rkm_payload = payload;
+ }
+
+ if (key) {
+ rkm->rkm_key = p;
+ rkm->rkm_key_len = keylen;
+ memcpy(rkm->rkm_key, key, keylen);
+ } else {
+ rkm->rkm_key = NULL;
+ rkm->rkm_key_len = 0;
+ }
+
+ return rkm;
+}
+
+
+
+/**
+ * @brief Create a new Producer message.
+ *
+ * @remark Must only be used by producer code.
+ *
+ * @returns the new message on success, or NULL on error, in which case
+ * \p errp (and \p errnop, if non-NULL) are set appropriately.
+ */
+static rd_kafka_msg_t *rd_kafka_msg_new0(rd_kafka_topic_t *rkt,
+ int32_t force_partition,
+ int msgflags,
+ char *payload,
+ size_t len,
+ const void *key,
+ size_t keylen,
+ void *msg_opaque,
+ rd_kafka_resp_err_t *errp,
+ int *errnop,
+ rd_kafka_headers_t *hdrs,
+ int64_t timestamp,
+ rd_ts_t now) {
+ rd_kafka_msg_t *rkm;
+ size_t hdrs_size = 0;
+
+ if (unlikely(!payload))
+ len = 0;
+ if (!key)
+ keylen = 0;
+ if (hdrs)
+ hdrs_size = rd_kafka_headers_serialized_size(hdrs);
+
+ if (unlikely(len > INT32_MAX || keylen > INT32_MAX ||
+ rd_kafka_msg_max_wire_size(keylen, len, hdrs_size) >
+ (size_t)rkt->rkt_rk->rk_conf.max_msg_size)) {
+ *errp = RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
+ if (errnop)
+ *errnop = EMSGSIZE;
+ return NULL;
+ }
+
+ if (msgflags & RD_KAFKA_MSG_F_BLOCK)
+ *errp = rd_kafka_curr_msgs_add(
+ rkt->rkt_rk, 1, len, 1 /*block*/,
+ (msgflags & RD_KAFKA_MSG_F_RKT_RDLOCKED) ? &rkt->rkt_lock
+ : NULL);
+ else
+ *errp = rd_kafka_curr_msgs_add(rkt->rkt_rk, 1, len, 0, NULL);
+
+ if (unlikely(*errp)) {
+ if (errnop)
+ *errnop = ENOBUFS;
+ return NULL;
+ }
+
+
+ rkm = rd_kafka_msg_new00(
+ rkt, force_partition,
+ msgflags | RD_KAFKA_MSG_F_ACCOUNT /* curr_msgs_add() */, payload,
+ len, key, keylen, msg_opaque);
+
+ memset(&rkm->rkm_u.producer, 0, sizeof(rkm->rkm_u.producer));
+
+ if (timestamp)
+ rkm->rkm_timestamp = timestamp;
+ else
+ rkm->rkm_timestamp = rd_uclock() / 1000;
+ rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME;
+
+ if (hdrs) {
+ rd_dassert(!rkm->rkm_headers);
+ rkm->rkm_headers = hdrs;
+ }
+
+ rkm->rkm_ts_enq = now;
+
+ if (rkt->rkt_conf.message_timeout_ms == 0) {
+ rkm->rkm_ts_timeout = INT64_MAX;
+ } else {
+ rkm->rkm_ts_timeout =
+ now + (int64_t)rkt->rkt_conf.message_timeout_ms * 1000;
+ }
+
+ /* Call interceptor chain for on_send */
+ rd_kafka_interceptors_on_send(rkt->rkt_rk, &rkm->rkm_rkmessage);
+
+ return rkm;
+}
+
+
+/**
+ * @brief Produce: creates a new message, runs the partitioner and enqueues
+ *        it on the selected partition.
+ *
+ * @returns 0 on success or -1 on error.
+ *
+ * If the function returns -1 and RD_KAFKA_MSG_F_FREE was specified, then
+ * the memory associated with the payload is still the caller's
+ * responsibility.
+ *
+ * @locks none
+ */
+int rd_kafka_msg_new(rd_kafka_topic_t *rkt,
+ int32_t force_partition,
+ int msgflags,
+ char *payload,
+ size_t len,
+ const void *key,
+ size_t keylen,
+ void *msg_opaque) {
+ rd_kafka_msg_t *rkm;
+ rd_kafka_resp_err_t err;
+ int errnox;
+
+ if (unlikely((err = rd_kafka_check_produce(rkt->rkt_rk, NULL))))
+ return -1;
+
+ /* Create message */
+ rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags, payload, len,
+ key, keylen, msg_opaque, &err, &errnox, NULL, 0,
+ rd_clock());
+ if (unlikely(!rkm)) {
+ /* errno is already set by msg_new() */
+ rd_kafka_set_last_error(err, errnox);
+ return -1;
+ }
+
+
+ /* Partition the message */
+ err = rd_kafka_msg_partitioner(rkt, rkm, 1);
+ if (likely(!err)) {
+ rd_kafka_set_last_error(0, 0);
+ return 0;
+ }
+
+ /* Interceptor: unroll failing messages by triggering on_ack.. */
+ rkm->rkm_err = err;
+ rd_kafka_interceptors_on_acknowledgement(rkt->rkt_rk,
+ &rkm->rkm_rkmessage);
+
+ /* Handle partitioner failures: it only fails when the application
+ * attempts to force a destination partition that does not exist
+ * in the cluster. Note we must clear the RD_KAFKA_MSG_F_FREE
+ * flag since our contract says we don't free the payload on
+ * failure. */
+
+ rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE;
+ rd_kafka_msg_destroy(rkt->rkt_rk, rkm);
+
+ /* Translate error codes to errnos. */
+ if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
+ rd_kafka_set_last_error(err, ESRCH);
+ else if (err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
+ rd_kafka_set_last_error(err, ENOENT);
+ else
+ rd_kafka_set_last_error(err, EINVAL); /* NOTREACHED */
+
+ return -1;
+}
+
+
+/** @remark Keep rd_kafka_produceva() and rd_kafka_producev() in synch */
+rd_kafka_error_t *
+rd_kafka_produceva(rd_kafka_t *rk, const rd_kafka_vu_t *vus, size_t cnt) {
+ rd_kafka_msg_t s_rkm = {
+ /* Message defaults */
+ .rkm_partition = RD_KAFKA_PARTITION_UA,
+ .rkm_timestamp = 0, /* current time */
+ };
+ rd_kafka_msg_t *rkm = &s_rkm;
+ rd_kafka_topic_t *rkt = NULL;
+ rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ rd_kafka_error_t *error = NULL;
+ rd_kafka_headers_t *hdrs = NULL;
+ rd_kafka_headers_t *app_hdrs = NULL; /* App-provided headers list */
+ size_t i;
+
+ if (unlikely(rd_kafka_check_produce(rk, &error)))
+ return error;
+
+ for (i = 0; i < cnt; i++) {
+ const rd_kafka_vu_t *vu = &vus[i];
+ switch (vu->vtype) {
+ case RD_KAFKA_VTYPE_TOPIC:
+ rkt =
+ rd_kafka_topic_new0(rk, vu->u.cstr, NULL, NULL, 1);
+ break;
+
+ case RD_KAFKA_VTYPE_RKT:
+ rkt = rd_kafka_topic_proper(vu->u.rkt);
+ rd_kafka_topic_keep(rkt);
+ break;
+
+ case RD_KAFKA_VTYPE_PARTITION:
+ rkm->rkm_partition = vu->u.i32;
+ break;
+
+ case RD_KAFKA_VTYPE_VALUE:
+ rkm->rkm_payload = vu->u.mem.ptr;
+ rkm->rkm_len = vu->u.mem.size;
+ break;
+
+ case RD_KAFKA_VTYPE_KEY:
+ rkm->rkm_key = vu->u.mem.ptr;
+ rkm->rkm_key_len = vu->u.mem.size;
+ break;
+
+ case RD_KAFKA_VTYPE_OPAQUE:
+ rkm->rkm_opaque = vu->u.ptr;
+ break;
+
+ case RD_KAFKA_VTYPE_MSGFLAGS:
+ rkm->rkm_flags = vu->u.i;
+ break;
+
+ case RD_KAFKA_VTYPE_TIMESTAMP:
+ rkm->rkm_timestamp = vu->u.i64;
+ break;
+
+ case RD_KAFKA_VTYPE_HEADER:
+ if (unlikely(app_hdrs != NULL)) {
+ error = rd_kafka_error_new(
+ RD_KAFKA_RESP_ERR__CONFLICT,
+ "VTYPE_HEADER and VTYPE_HEADERS "
+ "are mutually exclusive");
+ goto err;
+ }
+
+ if (unlikely(!hdrs))
+ hdrs = rd_kafka_headers_new(8);
+
+ err = rd_kafka_header_add(hdrs, vu->u.header.name, -1,
+ vu->u.header.val,
+ vu->u.header.size);
+ if (unlikely(err)) {
+ error = rd_kafka_error_new(
+ err, "Failed to add header: %s",
+ rd_kafka_err2str(err));
+ goto err;
+ }
+ break;
+
+ case RD_KAFKA_VTYPE_HEADERS:
+ if (unlikely(hdrs != NULL)) {
+ error = rd_kafka_error_new(
+ RD_KAFKA_RESP_ERR__CONFLICT,
+ "VTYPE_HEADERS and VTYPE_HEADER "
+ "are mutually exclusive");
+ goto err;
+ }
+ app_hdrs = vu->u.headers;
+ break;
+
+ default:
+ error = rd_kafka_error_new(
+ RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "Unsupported VTYPE %d", (int)vu->vtype);
+ goto err;
+ }
+ }
+
+ rd_assert(!error);
+
+ if (unlikely(!rkt)) {
+ error = rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "Topic name or object required");
+ goto err;
+ }
+
+ rkm = rd_kafka_msg_new0(
+ rkt, rkm->rkm_partition, rkm->rkm_flags, rkm->rkm_payload,
+ rkm->rkm_len, rkm->rkm_key, rkm->rkm_key_len, rkm->rkm_opaque, &err,
+ NULL, app_hdrs ? app_hdrs : hdrs, rkm->rkm_timestamp, rd_clock());
+
+ if (unlikely(err)) {
+ error = rd_kafka_error_new(err, "Failed to produce message: %s",
+ rd_kafka_err2str(err));
+ goto err;
+ }
+
+ /* Partition the message */
+ err = rd_kafka_msg_partitioner(rkt, rkm, 1);
+ if (unlikely(err)) {
+ /* Handle partitioner failures: it only fails when
+ * the application attempts to force a destination
+ * partition that does not exist in the cluster. */
+
+ /* Interceptors: Unroll on_send by on_ack.. */
+ rkm->rkm_err = err;
+ rd_kafka_interceptors_on_acknowledgement(rk,
+ &rkm->rkm_rkmessage);
+
+ /* Note we must clear the RD_KAFKA_MSG_F_FREE
+ * flag since our contract says we don't free the payload on
+ * failure. */
+ rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE;
+
+ /* Deassociate application owned headers from message
+ * since headers remain in application ownership
+ * when producev() fails */
+ if (app_hdrs && app_hdrs == rkm->rkm_headers)
+ rkm->rkm_headers = NULL;
+
+ rd_kafka_msg_destroy(rk, rkm);
+
+ error = rd_kafka_error_new(err, "Failed to enqueue message: %s",
+ rd_kafka_err2str(err));
+ goto err;
+ }
+
+ rd_kafka_topic_destroy0(rkt);
+
+ return NULL;
+
+err:
+ if (rkt)
+ rd_kafka_topic_destroy0(rkt);
+
+ if (hdrs)
+ rd_kafka_headers_destroy(hdrs);
+
+ rd_assert(error != NULL);
+ return error;
+}
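+
+/*
+ * Illustrative usage sketch (not part of this file): producing a single
+ * message through the rd_kafka_vu_t array API above. Assumes an existing
+ * producer handle `rk`; topic name and payload are placeholders and error
+ * handling is minimal.
+ *
+ *   rd_kafka_vu_t vus[] = {
+ *       {.vtype = RD_KAFKA_VTYPE_TOPIC, .u = {.cstr = "mytopic"}},
+ *       {.vtype = RD_KAFKA_VTYPE_VALUE,
+ *        .u     = {.mem = {.ptr = (void *)"hello", .size = 5}}},
+ *       {.vtype = RD_KAFKA_VTYPE_MSGFLAGS, .u = {.i = RD_KAFKA_MSG_F_COPY}},
+ *   };
+ *   rd_kafka_error_t *error =
+ *       rd_kafka_produceva(rk, vus, sizeof(vus) / sizeof(*vus));
+ *   if (error) {
+ *       fprintf(stderr, "produceva failed: %s\n",
+ *               rd_kafka_error_string(error));
+ *       rd_kafka_error_destroy(error);
+ *   }
+ */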
+
+
+
+/** @remark Keep rd_kafka_produceva() and rd_kafka_producev() in synch */
+rd_kafka_resp_err_t rd_kafka_producev(rd_kafka_t *rk, ...) {
+ va_list ap;
+ rd_kafka_msg_t s_rkm = {
+ /* Message defaults */
+ .rkm_partition = RD_KAFKA_PARTITION_UA,
+ .rkm_timestamp = 0, /* current time */
+ };
+ rd_kafka_msg_t *rkm = &s_rkm;
+ rd_kafka_vtype_t vtype;
+ rd_kafka_topic_t *rkt = NULL;
+ rd_kafka_resp_err_t err;
+ rd_kafka_headers_t *hdrs = NULL;
+ rd_kafka_headers_t *app_hdrs = NULL; /* App-provided headers list */
+
+ if (unlikely((err = rd_kafka_check_produce(rk, NULL))))
+ return err;
+
+ va_start(ap, rk);
+ while (!err &&
+ (vtype = va_arg(ap, rd_kafka_vtype_t)) != RD_KAFKA_VTYPE_END) {
+ switch (vtype) {
+ case RD_KAFKA_VTYPE_TOPIC:
+ rkt = rd_kafka_topic_new0(rk, va_arg(ap, const char *),
+ NULL, NULL, 1);
+ break;
+
+ case RD_KAFKA_VTYPE_RKT:
+ rkt = rd_kafka_topic_proper(
+ va_arg(ap, rd_kafka_topic_t *));
+ rd_kafka_topic_keep(rkt);
+ break;
+
+ case RD_KAFKA_VTYPE_PARTITION:
+ rkm->rkm_partition = va_arg(ap, int32_t);
+ break;
+
+ case RD_KAFKA_VTYPE_VALUE:
+ rkm->rkm_payload = va_arg(ap, void *);
+ rkm->rkm_len = va_arg(ap, size_t);
+ break;
+
+ case RD_KAFKA_VTYPE_KEY:
+ rkm->rkm_key = va_arg(ap, void *);
+ rkm->rkm_key_len = va_arg(ap, size_t);
+ break;
+
+ case RD_KAFKA_VTYPE_OPAQUE:
+ rkm->rkm_opaque = va_arg(ap, void *);
+ break;
+
+ case RD_KAFKA_VTYPE_MSGFLAGS:
+ rkm->rkm_flags = va_arg(ap, int);
+ break;
+
+ case RD_KAFKA_VTYPE_TIMESTAMP:
+ rkm->rkm_timestamp = va_arg(ap, int64_t);
+ break;
+
+ case RD_KAFKA_VTYPE_HEADER: {
+ const char *name;
+ const void *value;
+ ssize_t size;
+
+ if (unlikely(app_hdrs != NULL)) {
+ err = RD_KAFKA_RESP_ERR__CONFLICT;
+ break;
+ }
+
+ if (unlikely(!hdrs))
+ hdrs = rd_kafka_headers_new(8);
+
+ name = va_arg(ap, const char *);
+ value = va_arg(ap, const void *);
+ size = va_arg(ap, ssize_t);
+
+ err = rd_kafka_header_add(hdrs, name, -1, value, size);
+ } break;
+
+ case RD_KAFKA_VTYPE_HEADERS:
+ if (unlikely(hdrs != NULL)) {
+ err = RD_KAFKA_RESP_ERR__CONFLICT;
+ break;
+ }
+ app_hdrs = va_arg(ap, rd_kafka_headers_t *);
+ break;
+
+ default:
+ err = RD_KAFKA_RESP_ERR__INVALID_ARG;
+ break;
+ }
+ }
+
+ va_end(ap);
+
+ if (unlikely(!rkt))
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+ if (likely(!err))
+ rkm = rd_kafka_msg_new0(
+ rkt, rkm->rkm_partition, rkm->rkm_flags, rkm->rkm_payload,
+ rkm->rkm_len, rkm->rkm_key, rkm->rkm_key_len,
+ rkm->rkm_opaque, &err, NULL, app_hdrs ? app_hdrs : hdrs,
+ rkm->rkm_timestamp, rd_clock());
+
+ if (unlikely(err)) {
+ rd_kafka_topic_destroy0(rkt);
+ if (hdrs)
+ rd_kafka_headers_destroy(hdrs);
+ return err;
+ }
+
+ /* Partition the message */
+ err = rd_kafka_msg_partitioner(rkt, rkm, 1);
+ if (unlikely(err)) {
+ /* Handle partitioner failures: it only fails when
+ * the application attempts to force a destination
+ * partition that does not exist in the cluster. */
+
+ /* Interceptors: Unroll on_send by on_ack.. */
+ rkm->rkm_err = err;
+ rd_kafka_interceptors_on_acknowledgement(rk,
+ &rkm->rkm_rkmessage);
+
+ /* Note we must clear the RD_KAFKA_MSG_F_FREE
+ * flag since our contract says we don't free the payload on
+ * failure. */
+ rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE;
+
+ /* Deassociate application owned headers from message
+ * since headers remain in application ownership
+ * when producev() fails */
+ if (app_hdrs && app_hdrs == rkm->rkm_headers)
+ rkm->rkm_headers = NULL;
+
+ rd_kafka_msg_destroy(rk, rkm);
+ }
+
+ rd_kafka_topic_destroy0(rkt);
+
+ return err;
+}
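+
+/*
+ * Illustrative usage sketch (not part of this file): the var-arg
+ * equivalent of the above, using the RD_KAFKA_V_.. convenience macros
+ * from rdkafka.h. Assumes an existing producer handle `rk`; topic, key
+ * and value are placeholders.
+ *
+ *   rd_kafka_resp_err_t err = rd_kafka_producev(
+ *       rk,
+ *       RD_KAFKA_V_TOPIC("mytopic"),
+ *       RD_KAFKA_V_KEY("key1", 4),
+ *       RD_KAFKA_V_VALUE("hello", 5),
+ *       RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
+ *       RD_KAFKA_V_END);
+ *   if (err)
+ *       fprintf(stderr, "producev failed: %s\n", rd_kafka_err2str(err));
+ */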
+
+
+
+/**
+ * @brief Produce a single message.
+ * @locality any application thread
+ * @locks none
+ */
+int rd_kafka_produce(rd_kafka_topic_t *rkt,
+ int32_t partition,
+ int msgflags,
+ void *payload,
+ size_t len,
+ const void *key,
+ size_t keylen,
+ void *msg_opaque) {
+ return rd_kafka_msg_new(rkt, partition, msgflags, payload, len, key,
+ keylen, msg_opaque);
+}
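+
+/*
+ * Illustrative usage sketch (not part of this file): the classic produce()
+ * API wrapped above. Assumes an existing topic handle `rkt` and a
+ * malloc()ed `payload` of `len` bytes. With RD_KAFKA_MSG_F_FREE the library
+ * frees the payload once the message has been handled, but on failure (-1)
+ * ownership stays with the caller, as noted for rd_kafka_msg_new() above.
+ *
+ *   if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_FREE,
+ *                        payload, len, NULL, 0, NULL) == -1) {
+ *       fprintf(stderr, "produce failed: %s\n",
+ *               rd_kafka_err2str(rd_kafka_last_error()));
+ *       free(payload);  // still owned by the caller on failure
+ *   }
+ */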
+
+
+
+/**
+ * Produce a batch of messages.
+ * Returns the number of messages successfully queued for producing.
+ * Each message's .err will be set accordingly.
+ */
+int rd_kafka_produce_batch(rd_kafka_topic_t *app_rkt,
+ int32_t partition,
+ int msgflags,
+ rd_kafka_message_t *rkmessages,
+ int message_cnt) {
+ rd_kafka_msgq_t tmpq = RD_KAFKA_MSGQ_INITIALIZER(tmpq);
+ int i;
+ int64_t utc_now = rd_uclock() / 1000;
+ rd_ts_t now = rd_clock();
+ int good = 0;
+ int multiple_partitions = (partition == RD_KAFKA_PARTITION_UA ||
+ (msgflags & RD_KAFKA_MSG_F_PARTITION));
+ rd_kafka_resp_err_t all_err;
+ rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
+ rd_kafka_toppar_t *rktp = NULL;
+
+ /* Propagated per-message below */
+ all_err = rd_kafka_check_produce(rkt->rkt_rk, NULL);
+
+ rd_kafka_topic_rdlock(rkt);
+ if (!multiple_partitions) {
+ /* Single partition: look up the rktp once. */
+ rktp = rd_kafka_toppar_get_avail(rkt, partition,
+ 1 /*ua on miss*/, &all_err);
+
+ } else {
+ /* Indicate to lower-level msg_new..() that rkt is locked
+ * so that they may unlock it momentarily if blocking. */
+ msgflags |= RD_KAFKA_MSG_F_RKT_RDLOCKED;
+ }
+
+ for (i = 0; i < message_cnt; i++) {
+ rd_kafka_msg_t *rkm;
+
+ /* Propagate error for all messages. */
+ if (unlikely(all_err)) {
+ rkmessages[i].err = all_err;
+ continue;
+ }
+
+ /* Create message */
+ rkm = rd_kafka_msg_new0(
+ rkt,
+ (msgflags & RD_KAFKA_MSG_F_PARTITION)
+ ? rkmessages[i].partition
+ : partition,
+ msgflags, rkmessages[i].payload, rkmessages[i].len,
+ rkmessages[i].key, rkmessages[i].key_len,
+ rkmessages[i]._private, &rkmessages[i].err, NULL, NULL,
+ utc_now, now);
+ if (unlikely(!rkm)) {
+ if (rkmessages[i].err == RD_KAFKA_RESP_ERR__QUEUE_FULL)
+ all_err = rkmessages[i].err;
+ continue;
+ }
+
+ /* Three cases here:
+ * partition==UA: run the partitioner (slow)
+ * RD_KAFKA_MSG_F_PARTITION: produce message to specified
+ * partition
+                 *  fixed partition:          enqueue directly on the single
+                 *                            destination partition */
+ if (multiple_partitions) {
+ if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) {
+ /* Partition the message */
+ rkmessages[i].err = rd_kafka_msg_partitioner(
+ rkt, rkm, 0 /*already locked*/);
+ } else {
+ if (rktp == NULL || rkm->rkm_partition !=
+ rktp->rktp_partition) {
+ rd_kafka_resp_err_t err;
+ if (rktp != NULL)
+ rd_kafka_toppar_destroy(rktp);
+ rktp = rd_kafka_toppar_get_avail(
+ rkt, rkm->rkm_partition,
+ 1 /*ua on miss*/, &err);
+
+ if (unlikely(!rktp)) {
+ rkmessages[i].err = err;
+ continue;
+ }
+ }
+ rd_kafka_toppar_enq_msg(rktp, rkm, now);
+
+ if (rd_kafka_is_transactional(rkt->rkt_rk)) {
+ /* Add partition to transaction */
+ rd_kafka_txn_add_partition(rktp);
+ }
+ }
+
+ if (unlikely(rkmessages[i].err)) {
+ /* Interceptors: Unroll on_send by on_ack.. */
+ rd_kafka_interceptors_on_acknowledgement(
+ rkt->rkt_rk, &rkmessages[i]);
+
+ rd_kafka_msg_destroy(rkt->rkt_rk, rkm);
+ continue;
+ }
+
+
+ } else {
+ /* Single destination partition. */
+ rd_kafka_toppar_enq_msg(rktp, rkm, now);
+ }
+
+ rkmessages[i].err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ good++;
+ }
+
+ rd_kafka_topic_rdunlock(rkt);
+
+ if (!multiple_partitions && good > 0 &&
+ rd_kafka_is_transactional(rkt->rkt_rk) &&
+ rktp->rktp_partition != RD_KAFKA_PARTITION_UA) {
+ /* Add single destination partition to transaction */
+ rd_kafka_txn_add_partition(rktp);
+ }
+
+ if (rktp != NULL)
+ rd_kafka_toppar_destroy(rktp);
+
+ return good;
+}
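+
+/*
+ * Illustrative usage sketch (not part of this file): batch-producing two
+ * copied messages and checking the per-message .err fields afterwards.
+ * Assumes an existing topic handle `rkt`.
+ *
+ *   rd_kafka_message_t msgs[2] = {
+ *       {.payload = (void *)"first", .len = 5},
+ *       {.payload = (void *)"second", .len = 6},
+ *   };
+ *   int good = rd_kafka_produce_batch(rkt, RD_KAFKA_PARTITION_UA,
+ *                                     RD_KAFKA_MSG_F_COPY, msgs, 2);
+ *   if (good < 2) {
+ *       int i;
+ *       for (i = 0; i < 2; i++)
+ *           if (msgs[i].err)
+ *               fprintf(stderr, "msg %d failed: %s\n", i,
+ *                       rd_kafka_err2str(msgs[i].err));
+ *   }
+ */
+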
+
+/**
+ * @brief Scan \p rkmq for messages that have timed out and remove them from
+ * \p rkmq and add to \p timedout queue.
+ *
+ * @param abs_next_timeout will be set to the next message timeout, or 0
+ * if no timeout. Optional, may be NULL.
+ *
+ * @returns the number of messages timed out.
+ *
+ * @locality any
+ * @locks toppar_lock MUST be held
+ */
+int rd_kafka_msgq_age_scan(rd_kafka_toppar_t *rktp,
+ rd_kafka_msgq_t *rkmq,
+ rd_kafka_msgq_t *timedout,
+ rd_ts_t now,
+ rd_ts_t *abs_next_timeout) {
+ rd_kafka_msg_t *rkm, *tmp, *first = NULL;
+ int cnt = timedout->rkmq_msg_cnt;
+
+ if (abs_next_timeout)
+ *abs_next_timeout = 0;
+
+        /* Assume messages are added in time sequential order */
+ TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) {
+ /* NOTE: this is not true for the deprecated (and soon removed)
+ * LIFO queuing strategy. */
+ if (likely(rkm->rkm_ts_timeout > now)) {
+ if (abs_next_timeout)
+ *abs_next_timeout = rkm->rkm_ts_timeout;
+ break;
+ }
+
+ if (!first)
+ first = rkm;
+
+ rd_kafka_msgq_deq(rkmq, rkm, 1);
+ rd_kafka_msgq_enq(timedout, rkm);
+ }
+
+ return timedout->rkmq_msg_cnt - cnt;
+}
+
+
+int rd_kafka_msgq_enq_sorted0(rd_kafka_msgq_t *rkmq,
+ rd_kafka_msg_t *rkm,
+ int (*order_cmp)(const void *, const void *)) {
+ TAILQ_INSERT_SORTED(&rkmq->rkmq_msgs, rkm, rd_kafka_msg_t *, rkm_link,
+ order_cmp);
+ rkmq->rkmq_msg_bytes += rkm->rkm_len + rkm->rkm_key_len;
+ return ++rkmq->rkmq_msg_cnt;
+}
+
+int rd_kafka_msgq_enq_sorted(const rd_kafka_topic_t *rkt,
+ rd_kafka_msgq_t *rkmq,
+ rd_kafka_msg_t *rkm) {
+ rd_dassert(rkm->rkm_u.producer.msgid != 0);
+ return rd_kafka_msgq_enq_sorted0(rkmq, rkm,
+ rkt->rkt_conf.msg_order_cmp);
+}
+
+/**
+ * @brief Find the insert-before position (i.e., the msg which comes
+ *        after \p rkm sequentially) for message \p rkm.
+ *
+ * @param rkmq insert queue.
+ * @param start_pos the element in \p rkmq to start scanning at, or NULL
+ * to start with the first element.
+ * @param rkm message to insert.
+ * @param cmp message comparator.
+ * @param cntp the accumulated number of messages up to, but not including,
+ * the returned insert position. Optional (NULL).
+ * Do not use when start_pos is set.
+ * @param bytesp the accumulated number of bytes up to, but not including,
+ * the returned insert position. Optional (NULL).
+ * Do not use when start_pos is set.
+ *
+ * @remark cntp and bytesp will NOT be accurate when \p start_pos is non-NULL.
+ *
+ * @returns the insert position element, or NULL if \p rkm should be
+ * added at tail of queue.
+ */
+rd_kafka_msg_t *rd_kafka_msgq_find_pos(const rd_kafka_msgq_t *rkmq,
+ const rd_kafka_msg_t *start_pos,
+ const rd_kafka_msg_t *rkm,
+ int (*cmp)(const void *, const void *),
+ int *cntp,
+ int64_t *bytesp) {
+ const rd_kafka_msg_t *curr;
+ int cnt = 0;
+ int64_t bytes = 0;
+
+ for (curr = start_pos ? start_pos : rd_kafka_msgq_first(rkmq); curr;
+ curr = TAILQ_NEXT(curr, rkm_link)) {
+ if (cmp(rkm, curr) < 0) {
+ if (cntp) {
+ *cntp = cnt;
+ *bytesp = bytes;
+ }
+ return (rd_kafka_msg_t *)curr;
+ }
+ if (cntp) {
+ cnt++;
+ bytes += rkm->rkm_len + rkm->rkm_key_len;
+ }
+ }
+
+ return NULL;
+}
+
+
+/**
+ * @brief Split the original \p leftq into a left and right part,
+ * with element \p first_right being the first element in the
+ * right part (\p rightq).
+ *
+ * @param cnt is the number of messages up to, but not including \p first_right
+ * in \p leftq, namely the number of messages to remain in
+ * \p leftq after the split.
+ * @param bytes is the bytes counterpart to \p cnt.
+ */
+void rd_kafka_msgq_split(rd_kafka_msgq_t *leftq,
+ rd_kafka_msgq_t *rightq,
+ rd_kafka_msg_t *first_right,
+ int cnt,
+ int64_t bytes) {
+ rd_kafka_msg_t *llast;
+
+ rd_assert(first_right != TAILQ_FIRST(&leftq->rkmq_msgs));
+
+ llast = TAILQ_PREV(first_right, rd_kafka_msg_head_s, rkm_link);
+
+ rd_kafka_msgq_init(rightq);
+
+ rightq->rkmq_msgs.tqh_first = first_right;
+ rightq->rkmq_msgs.tqh_last = leftq->rkmq_msgs.tqh_last;
+
+ first_right->rkm_link.tqe_prev = &rightq->rkmq_msgs.tqh_first;
+
+ leftq->rkmq_msgs.tqh_last = &llast->rkm_link.tqe_next;
+ llast->rkm_link.tqe_next = NULL;
+
+ rightq->rkmq_msg_cnt = leftq->rkmq_msg_cnt - cnt;
+ rightq->rkmq_msg_bytes = leftq->rkmq_msg_bytes - bytes;
+ leftq->rkmq_msg_cnt = cnt;
+ leftq->rkmq_msg_bytes = bytes;
+
+ rd_kafka_msgq_verify_order(NULL, leftq, 0, rd_false);
+ rd_kafka_msgq_verify_order(NULL, rightq, 0, rd_false);
+}
+
+
+/**
+ * @brief Set per-message metadata for all messages in \p rkmq
+ */
+void rd_kafka_msgq_set_metadata(rd_kafka_msgq_t *rkmq,
+ int32_t broker_id,
+ int64_t base_offset,
+ int64_t timestamp,
+ rd_kafka_msg_status_t status) {
+ rd_kafka_msg_t *rkm;
+
+ TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) {
+ rkm->rkm_broker_id = broker_id;
+ rkm->rkm_offset = base_offset++;
+ if (timestamp != -1) {
+ rkm->rkm_timestamp = timestamp;
+ rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME;
+ }
+
+ /* Don't downgrade a message from any form of PERSISTED
+ * to NOT_PERSISTED, since the original cause of indicating
+ * PERSISTED can't be changed.
+ * E.g., a previous ack or in-flight timeout. */
+ if (unlikely(status == RD_KAFKA_MSG_STATUS_NOT_PERSISTED &&
+ rkm->rkm_status !=
+ RD_KAFKA_MSG_STATUS_NOT_PERSISTED))
+ continue;
+
+ rkm->rkm_status = status;
+ }
+}
+
+
+/**
+ * @brief Move all messages in \p src to \p dest whose msgid <= last_msgid.
+ *
+ * @remark src must be ordered
+ */
+void rd_kafka_msgq_move_acked(rd_kafka_msgq_t *dest,
+ rd_kafka_msgq_t *src,
+ uint64_t last_msgid,
+ rd_kafka_msg_status_t status) {
+ rd_kafka_msg_t *rkm;
+
+ while ((rkm = rd_kafka_msgq_first(src)) &&
+ rkm->rkm_u.producer.msgid <= last_msgid) {
+ rd_kafka_msgq_deq(src, rkm, 1);
+ rd_kafka_msgq_enq(dest, rkm);
+
+ rkm->rkm_status = status;
+ }
+
+ rd_kafka_msgq_verify_order(NULL, dest, 0, rd_false);
+ rd_kafka_msgq_verify_order(NULL, src, 0, rd_false);
+}
+
+
+
+int32_t rd_kafka_msg_partitioner_random(const rd_kafka_topic_t *rkt,
+ const void *key,
+ size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque) {
+ int32_t p = rd_jitter(0, partition_cnt - 1);
+ if (unlikely(!rd_kafka_topic_partition_available(rkt, p)))
+ return rd_jitter(0, partition_cnt - 1);
+ else
+ return p;
+}
+
+int32_t rd_kafka_msg_partitioner_consistent(const rd_kafka_topic_t *rkt,
+ const void *key,
+ size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque) {
+ return rd_crc32(key, keylen) % partition_cnt;
+}
+
+int32_t rd_kafka_msg_partitioner_consistent_random(const rd_kafka_topic_t *rkt,
+ const void *key,
+ size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque) {
+ if (keylen == 0)
+ return rd_kafka_msg_partitioner_random(
+ rkt, key, keylen, partition_cnt, rkt_opaque, msg_opaque);
+ else
+ return rd_kafka_msg_partitioner_consistent(
+ rkt, key, keylen, partition_cnt, rkt_opaque, msg_opaque);
+}
+
+int32_t rd_kafka_msg_partitioner_murmur2(const rd_kafka_topic_t *rkt,
+ const void *key,
+ size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque) {
+ return (rd_murmur2(key, keylen) & 0x7fffffff) % partition_cnt;
+}
+
+int32_t rd_kafka_msg_partitioner_murmur2_random(const rd_kafka_topic_t *rkt,
+ const void *key,
+ size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque) {
+ if (!key)
+ return rd_kafka_msg_partitioner_random(
+ rkt, key, keylen, partition_cnt, rkt_opaque, msg_opaque);
+ else
+ return (rd_murmur2(key, keylen) & 0x7fffffff) % partition_cnt;
+}
+
+int32_t rd_kafka_msg_partitioner_fnv1a(const rd_kafka_topic_t *rkt,
+ const void *key,
+ size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque) {
+ return rd_fnv1a(key, keylen) % partition_cnt;
+}
+
+int32_t rd_kafka_msg_partitioner_fnv1a_random(const rd_kafka_topic_t *rkt,
+ const void *key,
+ size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque) {
+ if (!key)
+ return rd_kafka_msg_partitioner_random(
+ rkt, key, keylen, partition_cnt, rkt_opaque, msg_opaque);
+ else
+ return rd_fnv1a(key, keylen) % partition_cnt;
+}
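+
+/*
+ * Illustrative configuration sketch (not part of this file): the built-in
+ * partitioners above are normally selected through topic configuration
+ * rather than called directly, either by property name or by callback.
+ * Assumes an existing rd_kafka_topic_conf_t `tconf`.
+ *
+ *   char errstr[256];
+ *   // By property name:
+ *   rd_kafka_topic_conf_set(tconf, "partitioner", "murmur2_random",
+ *                           errstr, sizeof(errstr));
+ *   // ..or by callback:
+ *   rd_kafka_topic_conf_set_partitioner_cb(
+ *       tconf, rd_kafka_msg_partitioner_murmur2_random);
+ */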
+
+int32_t rd_kafka_msg_sticky_partition(rd_kafka_topic_t *rkt,
+ const void *key,
+ size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque) {
+
+ if (!rd_kafka_topic_partition_available(rkt, rkt->rkt_sticky_partition))
+ rd_interval_expedite(&rkt->rkt_sticky_intvl, 0);
+
+ if (rd_interval(&rkt->rkt_sticky_intvl,
+ rkt->rkt_rk->rk_conf.sticky_partition_linger_ms * 1000,
+ 0) > 0) {
+ rkt->rkt_sticky_partition = rd_kafka_msg_partitioner_random(
+ rkt, key, keylen, partition_cnt, rkt_opaque, msg_opaque);
+ rd_kafka_dbg(rkt->rkt_rk, TOPIC, "PARTITIONER",
+ "%s [%" PRId32 "] is the new sticky partition",
+ rkt->rkt_topic->str, rkt->rkt_sticky_partition);
+ }
+
+ return rkt->rkt_sticky_partition;
+}
+
+/**
+ * @brief Assigns a message to a topic partition using a partitioner.
+ *
+ * @param do_lock if RD_DO_LOCK then acquire topic lock.
+ *
+ * @returns RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION or .._UNKNOWN_TOPIC if
+ * partitioning failed, or 0 on success.
+ *
+ * @locality any
+ * @locks rd_kafka_topic_rdlock(rkt) is acquired and released by this
+ *        function if \p do_lock is set.
+ */
+int rd_kafka_msg_partitioner(rd_kafka_topic_t *rkt,
+ rd_kafka_msg_t *rkm,
+ rd_dolock_t do_lock) {
+ int32_t partition;
+ rd_kafka_toppar_t *rktp_new;
+ rd_kafka_resp_err_t err;
+
+ if (do_lock)
+ rd_kafka_topic_rdlock(rkt);
+
+ switch (rkt->rkt_state) {
+ case RD_KAFKA_TOPIC_S_UNKNOWN:
+ /* No metadata received from cluster yet.
+ * Put message in UA partition and re-run partitioner when
+ * cluster comes up. */
+ partition = RD_KAFKA_PARTITION_UA;
+ break;
+
+ case RD_KAFKA_TOPIC_S_NOTEXISTS:
+ /* Topic not found in cluster.
+ * Fail message immediately. */
+ err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
+ if (do_lock)
+ rd_kafka_topic_rdunlock(rkt);
+ return err;
+
+ case RD_KAFKA_TOPIC_S_ERROR:
+ /* Topic has permanent error.
+ * Fail message immediately. */
+ err = rkt->rkt_err;
+ if (do_lock)
+ rd_kafka_topic_rdunlock(rkt);
+ return err;
+
+ case RD_KAFKA_TOPIC_S_EXISTS:
+ /* Topic exists in cluster. */
+
+ /* Topic exists but has no partitions.
+                 * This is usually a transient state following the
+ * auto-creation of a topic. */
+ if (unlikely(rkt->rkt_partition_cnt == 0)) {
+ partition = RD_KAFKA_PARTITION_UA;
+ break;
+ }
+
+ /* Partition not assigned, run partitioner. */
+ if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) {
+
+ if (!rkt->rkt_conf.random_partitioner &&
+ (!rkm->rkm_key ||
+ (rkm->rkm_key_len == 0 &&
+ rkt->rkt_conf.partitioner ==
+ rd_kafka_msg_partitioner_consistent_random))) {
+ partition = rd_kafka_msg_sticky_partition(
+ rkt, rkm->rkm_key, rkm->rkm_key_len,
+ rkt->rkt_partition_cnt,
+ rkt->rkt_conf.opaque, rkm->rkm_opaque);
+ } else {
+ partition = rkt->rkt_conf.partitioner(
+ rkt, rkm->rkm_key, rkm->rkm_key_len,
+ rkt->rkt_partition_cnt,
+ rkt->rkt_conf.opaque, rkm->rkm_opaque);
+ }
+ } else
+ partition = rkm->rkm_partition;
+
+ /* Check that partition exists. */
+ if (partition >= rkt->rkt_partition_cnt) {
+ err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
+ if (do_lock)
+ rd_kafka_topic_rdunlock(rkt);
+ return err;
+ }
+ break;
+
+ default:
+ rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
+ break;
+ }
+
+ /* Get new partition */
+ rktp_new = rd_kafka_toppar_get(rkt, partition, 0);
+
+ if (unlikely(!rktp_new)) {
+ /* Unknown topic or partition */
+ if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
+ err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
+ else
+ err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
+
+ if (do_lock)
+ rd_kafka_topic_rdunlock(rkt);
+
+ return err;
+ }
+
+ rd_atomic64_add(&rktp_new->rktp_c.producer_enq_msgs, 1);
+
+ /* Update message partition */
+ if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA)
+ rkm->rkm_partition = partition;
+
+ /* Partition is available: enqueue msg on partition's queue */
+ rd_kafka_toppar_enq_msg(rktp_new, rkm, rd_clock());
+ if (do_lock)
+ rd_kafka_topic_rdunlock(rkt);
+
+ if (rktp_new->rktp_partition != RD_KAFKA_PARTITION_UA &&
+ rd_kafka_is_transactional(rkt->rkt_rk)) {
+ /* Add partition to transaction */
+ rd_kafka_txn_add_partition(rktp_new);
+ }
+
+ rd_kafka_toppar_destroy(rktp_new); /* from _get() */
+ return 0;
+}
+
+
+
+/**
+ * @name Public message type (rd_kafka_message_t)
+ */
+void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage) {
+ rd_kafka_op_t *rko;
+
+ if (likely((rko = (rd_kafka_op_t *)rkmessage->_private) != NULL))
+ rd_kafka_op_destroy(rko);
+ else {
+ rd_kafka_msg_t *rkm = rd_kafka_message2msg(rkmessage);
+ rd_kafka_msg_destroy(NULL, rkm);
+ }
+}
+
+
+rd_kafka_message_t *rd_kafka_message_new(void) {
+ rd_kafka_msg_t *rkm = rd_calloc(1, sizeof(*rkm));
+ rkm->rkm_flags = RD_KAFKA_MSG_F_FREE_RKM;
+ rkm->rkm_broker_id = -1;
+ return (rd_kafka_message_t *)rkm;
+}
+
+
+/**
+ * @brief Set up a rkmessage from an rko for passing to the application.
+ * @remark Will trigger on_consume() interceptors if any.
+ */
+static rd_kafka_message_t *
+rd_kafka_message_setup(rd_kafka_op_t *rko, rd_kafka_message_t *rkmessage) {
+ rd_kafka_topic_t *rkt;
+ rd_kafka_toppar_t *rktp = NULL;
+
+ if (rko->rko_type == RD_KAFKA_OP_DR) {
+ rkt = rko->rko_u.dr.rkt;
+ } else {
+ if (rko->rko_rktp) {
+ rktp = rko->rko_rktp;
+ rkt = rktp->rktp_rkt;
+ } else
+ rkt = NULL;
+
+ rkmessage->_private = rko;
+ }
+
+
+ if (!rkmessage->rkt && rkt)
+ rkmessage->rkt = rd_kafka_topic_keep(rkt);
+
+ if (rktp)
+ rkmessage->partition = rktp->rktp_partition;
+
+ if (!rkmessage->err)
+ rkmessage->err = rko->rko_err;
+
+ /* Call on_consume interceptors */
+ switch (rko->rko_type) {
+ case RD_KAFKA_OP_FETCH:
+ if (!rkmessage->err && rkt)
+ rd_kafka_interceptors_on_consume(rkt->rkt_rk,
+ rkmessage);
+ break;
+
+ default:
+ break;
+ }
+
+ return rkmessage;
+}
+
+
+
+/**
+ * @brief Get rkmessage from rkm (for EVENT_DR)
+ * @remark Must only be called just prior to passing a dr to the application.
+ */
+rd_kafka_message_t *rd_kafka_message_get_from_rkm(rd_kafka_op_t *rko,
+ rd_kafka_msg_t *rkm) {
+ return rd_kafka_message_setup(rko, &rkm->rkm_rkmessage);
+}
+
+/**
+ * @brief Convert rko to rkmessage
+ * @remark Must only be called just prior to passing a consumed message
+ * or event to the application.
+ * @remark Will trigger on_consume() interceptors, if any.
+ * @returns a rkmessage (bound to the rko).
+ */
+rd_kafka_message_t *rd_kafka_message_get(rd_kafka_op_t *rko) {
+ rd_kafka_message_t *rkmessage;
+
+ if (!rko)
+ return rd_kafka_message_new(); /* empty */
+
+ switch (rko->rko_type) {
+ case RD_KAFKA_OP_FETCH:
+ /* Use embedded rkmessage */
+ rkmessage = &rko->rko_u.fetch.rkm.rkm_rkmessage;
+ break;
+
+ case RD_KAFKA_OP_ERR:
+ case RD_KAFKA_OP_CONSUMER_ERR:
+ rkmessage = &rko->rko_u.err.rkm.rkm_rkmessage;
+ rkmessage->payload = rko->rko_u.err.errstr;
+ rkmessage->len =
+ rkmessage->payload ? strlen(rkmessage->payload) : 0;
+ rkmessage->offset = rko->rko_u.err.offset;
+ break;
+
+ default:
+ rd_kafka_assert(NULL, !*"unhandled optype");
+ RD_NOTREACHED();
+ return NULL;
+ }
+
+ return rd_kafka_message_setup(rko, rkmessage);
+}
+
+
+int64_t rd_kafka_message_timestamp(const rd_kafka_message_t *rkmessage,
+ rd_kafka_timestamp_type_t *tstype) {
+ rd_kafka_msg_t *rkm;
+
+ if (rkmessage->err) {
+ if (tstype)
+ *tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
+ return -1;
+ }
+
+ rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
+
+ if (tstype)
+ *tstype = rkm->rkm_tstype;
+
+ return rkm->rkm_timestamp;
+}
+
+
+int64_t rd_kafka_message_latency(const rd_kafka_message_t *rkmessage) {
+ rd_kafka_msg_t *rkm;
+
+ rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
+
+ if (unlikely(!rkm->rkm_ts_enq))
+ return -1;
+
+ return rd_clock() - rkm->rkm_ts_enq;
+}
+
+
+int32_t rd_kafka_message_broker_id(const rd_kafka_message_t *rkmessage) {
+ rd_kafka_msg_t *rkm;
+
+ rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
+
+ return rkm->rkm_broker_id;
+}
+
+
+
+/**
+ * @brief Parse serialized message headers and populate
+ * rkm->rkm_headers (which must be NULL).
+ */
+static rd_kafka_resp_err_t rd_kafka_msg_headers_parse(rd_kafka_msg_t *rkm) {
+ rd_kafka_buf_t *rkbuf;
+ int64_t HeaderCount;
+ const int log_decode_errors = 0;
+ rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__BAD_MSG;
+ int i;
+ rd_kafka_headers_t *hdrs = NULL;
+
+ rd_dassert(!rkm->rkm_headers);
+
+ if (RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.consumer.binhdrs) == 0)
+ return RD_KAFKA_RESP_ERR__NOENT;
+
+ rkbuf = rd_kafka_buf_new_shadow(
+ rkm->rkm_u.consumer.binhdrs.data,
+ RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.consumer.binhdrs), NULL);
+
+ rd_kafka_buf_read_varint(rkbuf, &HeaderCount);
+
+ if (HeaderCount <= 0) {
+ rd_kafka_buf_destroy(rkbuf);
+ return RD_KAFKA_RESP_ERR__NOENT;
+ } else if (unlikely(HeaderCount > 100000)) {
+ rd_kafka_buf_destroy(rkbuf);
+ return RD_KAFKA_RESP_ERR__BAD_MSG;
+ }
+
+ hdrs = rd_kafka_headers_new((size_t)HeaderCount);
+
+ for (i = 0; (int64_t)i < HeaderCount; i++) {
+ int64_t KeyLen, ValueLen;
+ const char *Key, *Value;
+
+ rd_kafka_buf_read_varint(rkbuf, &KeyLen);
+ rd_kafka_buf_read_ptr(rkbuf, &Key, (size_t)KeyLen);
+
+ rd_kafka_buf_read_varint(rkbuf, &ValueLen);
+ if (unlikely(ValueLen == -1))
+ Value = NULL;
+ else
+ rd_kafka_buf_read_ptr(rkbuf, &Value, (size_t)ValueLen);
+
+ rd_kafka_header_add(hdrs, Key, (ssize_t)KeyLen, Value,
+ (ssize_t)ValueLen);
+ }
+
+ rkm->rkm_headers = hdrs;
+
+ rd_kafka_buf_destroy(rkbuf);
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+
+err_parse:
+ err = rkbuf->rkbuf_err;
+ rd_kafka_buf_destroy(rkbuf);
+ if (hdrs)
+ rd_kafka_headers_destroy(hdrs);
+ return err;
+}
+
+
+
+rd_kafka_resp_err_t
+rd_kafka_message_headers(const rd_kafka_message_t *rkmessage,
+ rd_kafka_headers_t **hdrsp) {
+ rd_kafka_msg_t *rkm;
+ rd_kafka_resp_err_t err;
+
+ rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
+
+ if (rkm->rkm_headers) {
+ *hdrsp = rkm->rkm_headers;
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+ }
+
+ /* Producer (rkm_headers will be set if there were any headers) */
+ if (rkm->rkm_flags & RD_KAFKA_MSG_F_PRODUCER)
+ return RD_KAFKA_RESP_ERR__NOENT;
+
+ /* Consumer */
+
+ /* No previously parsed headers, check if the underlying
+ * protocol message had headers and if so, parse them. */
+ if (unlikely(!RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.consumer.binhdrs)))
+ return RD_KAFKA_RESP_ERR__NOENT;
+
+ err = rd_kafka_msg_headers_parse(rkm);
+ if (unlikely(err))
+ return err;
+
+ *hdrsp = rkm->rkm_headers;
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
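+
+/*
+ * Illustrative usage sketch (not part of this file): reading a header from
+ * a consumed message. Assumes `rkmessage` was returned by a consumer poll
+ * and "trace-id" is an application-defined header name. The returned
+ * headers remain owned by the message; use rd_kafka_message_detach_headers()
+ * below to take ownership.
+ *
+ *   rd_kafka_headers_t *hdrs;
+ *   if (!rd_kafka_message_headers(rkmessage, &hdrs)) {
+ *       const void *val;
+ *       size_t size;
+ *       if (!rd_kafka_header_get_last(hdrs, "trace-id", &val, &size))
+ *           printf("trace-id: %.*s\n", (int)size, (const char *)val);
+ *   }
+ */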
+
+
+rd_kafka_resp_err_t
+rd_kafka_message_detach_headers(rd_kafka_message_t *rkmessage,
+ rd_kafka_headers_t **hdrsp) {
+ rd_kafka_msg_t *rkm;
+ rd_kafka_resp_err_t err;
+
+ err = rd_kafka_message_headers(rkmessage, hdrsp);
+ if (err)
+ return err;
+
+ rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
+ rkm->rkm_headers = NULL;
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+void rd_kafka_message_set_headers(rd_kafka_message_t *rkmessage,
+ rd_kafka_headers_t *hdrs) {
+ rd_kafka_msg_t *rkm;
+
+ rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
+
+ if (rkm->rkm_headers) {
+ assert(rkm->rkm_headers != hdrs);
+ rd_kafka_headers_destroy(rkm->rkm_headers);
+ }
+
+ rkm->rkm_headers = hdrs;
+}
+
+
+
+rd_kafka_msg_status_t
+rd_kafka_message_status(const rd_kafka_message_t *rkmessage) {
+ rd_kafka_msg_t *rkm;
+
+ rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
+
+ return rkm->rkm_status;
+}
+
+
+int32_t rd_kafka_message_leader_epoch(const rd_kafka_message_t *rkmessage) {
+ rd_kafka_msg_t *rkm;
+
+ if (unlikely(!rkmessage->rkt ||
+ rkmessage->rkt->rkt_rk->rk_type != RD_KAFKA_CONSUMER))
+ return -1;
+
+ rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
+
+ return rkm->rkm_u.consumer.leader_epoch;
+}
+
+
+void rd_kafka_msgq_dump(FILE *fp, const char *what, rd_kafka_msgq_t *rkmq) {
+ rd_kafka_msg_t *rkm;
+ int cnt = 0;
+
+ fprintf(fp, "%s msgq_dump (%d messages, %" PRIusz " bytes):\n", what,
+ rd_kafka_msgq_len(rkmq), rd_kafka_msgq_size(rkmq));
+ TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) {
+ fprintf(fp,
+ " [%" PRId32 "]@%" PRId64 ": rkm msgid %" PRIu64
+ ": \"%.*s\"\n",
+ rkm->rkm_partition, rkm->rkm_offset,
+ rkm->rkm_u.producer.msgid, (int)rkm->rkm_len,
+ (const char *)rkm->rkm_payload);
+ rd_assert(cnt++ < rkmq->rkmq_msg_cnt);
+ }
+}
+
+
+
+/**
+ * @brief Destroy resources associated with msgbatch
+ */
+void rd_kafka_msgbatch_destroy(rd_kafka_msgbatch_t *rkmb) {
+ if (rkmb->rktp) {
+ rd_kafka_toppar_destroy(rkmb->rktp);
+ rkmb->rktp = NULL;
+ }
+
+ rd_assert(RD_KAFKA_MSGQ_EMPTY(&rkmb->msgq));
+}
+
+
+/**
+ * @brief Initialize a message batch for the Idempotent Producer.
+ */
+void rd_kafka_msgbatch_init(rd_kafka_msgbatch_t *rkmb,
+ rd_kafka_toppar_t *rktp,
+ rd_kafka_pid_t pid,
+ uint64_t epoch_base_msgid) {
+ memset(rkmb, 0, sizeof(*rkmb));
+
+ rkmb->rktp = rd_kafka_toppar_keep(rktp);
+
+ rd_kafka_msgq_init(&rkmb->msgq);
+
+ rkmb->pid = pid;
+ rkmb->first_seq = -1;
+ rkmb->epoch_base_msgid = epoch_base_msgid;
+}
+
+
+/**
+ * @brief Set the first message in the batch, which is used to set
+ * the BaseSequence and keep track of batch reconstruction range.
+ *
+ * @param rkm is the first message in the batch.
+ */
+void rd_kafka_msgbatch_set_first_msg(rd_kafka_msgbatch_t *rkmb,
+ rd_kafka_msg_t *rkm) {
+ rd_assert(rkmb->first_msgid == 0);
+
+ if (!rd_kafka_pid_valid(rkmb->pid))
+ return;
+
+ rkmb->first_msgid = rkm->rkm_u.producer.msgid;
+
+ /* Our msgid counter is 64-bits, but the
+ * Kafka protocol's sequence is only 31 (signed), so we'll
+ * need to handle wrapping. */
+ rkmb->first_seq = rd_kafka_seq_wrap(rkm->rkm_u.producer.msgid -
+ rkmb->epoch_base_msgid);
+
+ /* Check if there is a stored last message
+ * on the first msg, which means an entire
+ * batch of messages are being retried and
+ * we need to maintain the exact messages
+ * of the original batch.
+ * Simply tracking the last message, on
+ * the first message, is sufficient for now.
+ * Will be 0 if not applicable. */
+ rkmb->last_msgid = rkm->rkm_u.producer.last_msgid;
+}
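+
+/*
+ * Worked example of the sequence wrapping above, with values matching the
+ * rd_kafka_seq_wrap() unit test below: with epoch_base_msgid = 0, a first
+ * message with msgid (int64_t)INT32_MAX + 2 yields
+ * first_seq = rd_kafka_seq_wrap(INT32_MAX + 2) = 1, i.e. the 64-bit msgid
+ * space is reduced modulo 2^31 to fit the protocol's signed 31-bit
+ * BaseSequence.
+ */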
+
+
+
+/**
+ * @brief Message batch is ready to be transmitted.
+ *
+ * @remark This function assumes the batch will be transmitted and increases
+ * the toppar's in-flight count.
+ */
+void rd_kafka_msgbatch_ready_produce(rd_kafka_msgbatch_t *rkmb) {
+ rd_kafka_toppar_t *rktp = rkmb->rktp;
+ rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
+
+ /* Keep track of number of requests in-flight per partition,
+ * and the number of partitions with in-flight requests when
+ * idempotent producer - this is used to drain partitions
+ * before resetting the PID. */
+ if (rd_atomic32_add(&rktp->rktp_msgs_inflight,
+ rd_kafka_msgq_len(&rkmb->msgq)) ==
+ rd_kafka_msgq_len(&rkmb->msgq) &&
+ rd_kafka_is_idempotent(rk))
+ rd_kafka_idemp_inflight_toppar_add(rk, rktp);
+}
+
+
+
+/**
+ * @brief Allow queue wakeups after \p abstime, or when the
+ * given \p batch_msg_cnt or \p batch_msg_bytes have been reached.
+ *
+ * @param rkmq Queue to monitor and set wakeup parameters on.
+ * @param dest_rkmq Destination queue used to meter current queue depths
+ * and oldest message. May be the same as \p rkmq but is
+ * typically the rktp_xmit_msgq.
+ * @param next_wakeup If non-NULL: update the caller's next scheduler wakeup
+ * according to the wakeup time calculated by this function.
+ * @param now The current time.
+ * @param linger_us The configured queue linger / batching time.
+ * @param batch_msg_cnt Queue threshold before signalling.
+ * @param batch_msg_bytes Queue threshold before signalling.
+ *
+ * @returns true if the wakeup conditions are already met and messages are ready
+ * to be sent, else false.
+ *
+ * @locks_required rd_kafka_toppar_lock()
+ *
+ *
+ * Producer queue and broker thread wake-up behaviour.
+ *
+ * There are contradicting requirements at play here:
+ * - Latency: queued messages must be batched and sent according to
+ * batch size and linger.ms configuration.
+ * - Wakeups: keep the number of thread wake-ups to a minimum to avoid
+ * high CPU utilization and context switching.
+ *
+ * The message queue (rd_kafka_msgq_t) has functionality for the writer (app)
+ * to wake up the reader (broker thread) when there's a new message added.
+ * This wakeup is done through a combination of cndvar signalling and IO writes
+ * to make sure a thread wakeup is triggered regardless of whether the broker
+ * thread is blocking on cnd_timedwait() or on IO poll.
+ * When the broker thread is woken up it will scan all the partitions it is
+ * the leader for to check if there are messages to be sent - all according
+ * to the configured batch size and linger.ms - and then decide its next
+ * wait time depending on the lowest remaining linger.ms setting of any
+ * partition with messages enqueued.
+ *
+ * This wait time must also be set as a threshold on the message queue, telling
+ * the writer (app) that it must not trigger a wakeup until the wait time
+ * has expired, or the batch sizes have been exceeded.
+ *
+ * The message queue wakeup time is per partition, while the broker thread
+ * wakeup time is the lowest of all its partitions' wakeup times.
+ *
+ * The per-partition wakeup constraints are calculated and set by
+ * rd_kafka_msgq_allow_wakeup_at() which is called from the broker thread's
+ * per-partition handler.
+ * This function is called each time there are changes to the broker-local
+ * partition transmit queue (rktp_xmit_msgq), such as:
+ * - messages are moved from the partition queue (rktp_msgq) to rktp_xmit_msgq
+ * - messages are moved to a ProduceRequest
+ * - messages are timed out from the rktp_xmit_msgq
+ * - the flushing state changed (rd_kafka_flush() is called or returned).
+ *
+ * If none of these things happen, the broker thread will simply read the
+ * last stored wakeup time for each partition and use that for calculating its
+ * minimum wait time.
+ *
+ *
+ * On the writer side, namely the application calling rd_kafka_produce(), the
+ * following checks are performed to see if it may trigger a wakeup when
+ * it adds a new message to the partition queue:
+ * - the current time has reached the wakeup time (e.g., remaining linger.ms
+ * has expired), or
+ * - with the new message(s) being added, either the batch.size or
+ * batch.num.messages thresholds have been exceeded, or
+ * - the application is calling rd_kafka_flush(),
+ * - and no wakeup has been signalled yet. This is critical since it may take
+ *   some time for the broker thread to do its work, and we want to avoid
+ *   flooding it with wakeups: a wakeup is only sent once per wakeup period.
+ *
+ * A small worked example of this decision follows the function below.
+ */
+rd_bool_t rd_kafka_msgq_allow_wakeup_at(rd_kafka_msgq_t *rkmq,
+ const rd_kafka_msgq_t *dest_rkmq,
+ rd_ts_t *next_wakeup,
+ rd_ts_t now,
+ rd_ts_t linger_us,
+ int32_t batch_msg_cnt,
+ int64_t batch_msg_bytes) {
+ int32_t msg_cnt = rd_kafka_msgq_len(dest_rkmq);
+ int64_t msg_bytes = rd_kafka_msgq_size(dest_rkmq);
+
+ if (RD_KAFKA_MSGQ_EMPTY(dest_rkmq)) {
+ rkmq->rkmq_wakeup.on_first = rd_true;
+ rkmq->rkmq_wakeup.abstime = now + linger_us;
+ /* Leave next_wakeup untouched since the queue is empty */
+ msg_cnt = 0;
+ msg_bytes = 0;
+ } else {
+ const rd_kafka_msg_t *rkm = rd_kafka_msgq_first(dest_rkmq);
+
+ rkmq->rkmq_wakeup.on_first = rd_false;
+
+ if (unlikely(rkm->rkm_u.producer.ts_backoff > now)) {
+ /* Honour retry.backoff.ms:
+ * wait for backoff to expire */
+ rkmq->rkmq_wakeup.abstime =
+ rkm->rkm_u.producer.ts_backoff;
+ } else {
+ /* Use message's produce() time + linger.ms */
+ rkmq->rkmq_wakeup.abstime =
+ rd_kafka_msg_enq_time(rkm) + linger_us;
+ if (rkmq->rkmq_wakeup.abstime <= now)
+ rkmq->rkmq_wakeup.abstime = now;
+ }
+
+ /* Update the caller's scheduler wakeup time */
+ if (next_wakeup && rkmq->rkmq_wakeup.abstime < *next_wakeup)
+ *next_wakeup = rkmq->rkmq_wakeup.abstime;
+
+ msg_cnt = rd_kafka_msgq_len(dest_rkmq);
+ msg_bytes = rd_kafka_msgq_size(dest_rkmq);
+ }
+
+ /*
+ * If there are more messages or bytes in queue than the batch limits,
+ * or the linger time has been exceeded,
+ * then there is no need for wakeup since the broker thread will
+ * produce those messages as quickly as it can.
+ */
+ if (msg_cnt >= batch_msg_cnt || msg_bytes >= batch_msg_bytes ||
+ (msg_cnt > 0 && now >= rkmq->rkmq_wakeup.abstime)) {
+ /* Prevent further signalling */
+ rkmq->rkmq_wakeup.signalled = rd_true;
+
+ /* Batch is ready */
+ return rd_true;
+ }
+
+ /* If the current msg or byte count is less than the batch limit
+ * then set the rkmq count to the remaining count or size to
+ * reach the batch limits.
+ * This is for the case where the producer is waiting for more
+ * messages to accumulate into a batch. The wakeup should only
+ * occur once a threshold is reached or the abstime has expired.
+ */
+ rkmq->rkmq_wakeup.signalled = rd_false;
+ rkmq->rkmq_wakeup.msg_cnt = batch_msg_cnt - msg_cnt;
+ rkmq->rkmq_wakeup.msg_bytes = batch_msg_bytes - msg_bytes;
+
+ return rd_false;
+}
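+
+/*
+ * Worked example of the wakeup decision above (illustrative numbers, not
+ * configuration defaults): with linger_us = 5000 (linger.ms=5),
+ * batch_msg_cnt = 10000 and batch_msg_bytes = 1000000, a single 100-byte
+ * message in dest_rkmq sets rkmq_wakeup.abstime to the message's enqueue
+ * time + 5000us. Since 1 < 10000, 100 < 1000000 and now < abstime, the
+ * function returns rd_false and records the remaining thresholds
+ * (9999 messages or 999900 bytes), so the producing application will only
+ * signal a wakeup once one of those thresholds is crossed or abstime has
+ * passed.
+ */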
+
+
+
+/**
+ * @brief Verify order (by msgid) in message queue.
+ * For development use only.
+ */
+void rd_kafka_msgq_verify_order0(const char *function,
+ int line,
+ const rd_kafka_toppar_t *rktp,
+ const rd_kafka_msgq_t *rkmq,
+ uint64_t exp_first_msgid,
+ rd_bool_t gapless) {
+ const rd_kafka_msg_t *rkm;
+ uint64_t exp;
+ int errcnt = 0;
+ int cnt = 0;
+ const char *topic = rktp ? rktp->rktp_rkt->rkt_topic->str : "n/a";
+ int32_t partition = rktp ? rktp->rktp_partition : -1;
+
+ if (rd_kafka_msgq_len(rkmq) == 0)
+ return;
+
+ if (exp_first_msgid)
+ exp = exp_first_msgid;
+ else {
+ exp = rd_kafka_msgq_first(rkmq)->rkm_u.producer.msgid;
+ if (exp == 0) /* message without msgid (e.g., UA partition) */
+ return;
+ }
+
+ TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) {
+#if 0
+ printf("%s:%d: %s [%"PRId32"]: rkm #%d (%p) "
+ "msgid %"PRIu64"\n",
+ function, line,
+ topic, partition,
+ cnt, rkm, rkm->rkm_u.producer.msgid);
+#endif
+ if (gapless && rkm->rkm_u.producer.msgid != exp) {
+ printf("%s:%d: %s [%" PRId32
+ "]: rkm #%d (%p) "
+ "msgid %" PRIu64
+ ": "
+ "expected msgid %" PRIu64 "\n",
+ function, line, topic, partition, cnt, rkm,
+ rkm->rkm_u.producer.msgid, exp);
+ errcnt++;
+ } else if (!gapless && rkm->rkm_u.producer.msgid < exp) {
+ printf("%s:%d: %s [%" PRId32
+ "]: rkm #%d (%p) "
+ "msgid %" PRIu64
+ ": "
+ "expected increased msgid >= %" PRIu64 "\n",
+ function, line, topic, partition, cnt, rkm,
+ rkm->rkm_u.producer.msgid, exp);
+ errcnt++;
+ } else
+ exp++;
+
+ if (cnt >= rkmq->rkmq_msg_cnt) {
+ printf("%s:%d: %s [%" PRId32
+ "]: rkm #%d (%p) "
+ "msgid %" PRIu64 ": loop in queue?\n",
+ function, line, topic, partition, cnt, rkm,
+ rkm->rkm_u.producer.msgid);
+ errcnt++;
+ break;
+ }
+
+ cnt++;
+ }
+
+ rd_assert(!errcnt);
+}
+
+
+
+/**
+ * @name Unit tests
+ */
+
+/**
+ * @brief Unittest: message allocator
+ */
+rd_kafka_msg_t *ut_rd_kafka_msg_new(size_t msgsize) {
+ rd_kafka_msg_t *rkm;
+
+ rkm = rd_calloc(1, sizeof(*rkm));
+ rkm->rkm_flags = RD_KAFKA_MSG_F_FREE_RKM;
+ rkm->rkm_offset = RD_KAFKA_OFFSET_INVALID;
+ rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
+
+ if (msgsize) {
+ rd_assert(msgsize <= sizeof(*rkm));
+ rkm->rkm_payload = rkm;
+ rkm->rkm_len = msgsize;
+ }
+
+ return rkm;
+}
+
+
+
+/**
+ * @brief Unittest: destroy all messages in queue
+ */
+void ut_rd_kafka_msgq_purge(rd_kafka_msgq_t *rkmq) {
+ rd_kafka_msg_t *rkm, *tmp;
+
+ TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp)
+ rd_kafka_msg_destroy(NULL, rkm);
+
+
+ rd_kafka_msgq_init(rkmq);
+}
+
+
+
+static int ut_verify_msgq_order(const char *what,
+ const rd_kafka_msgq_t *rkmq,
+ uint64_t first,
+ uint64_t last,
+ rd_bool_t req_consecutive) {
+ const rd_kafka_msg_t *rkm;
+ uint64_t expected = first;
+ int incr = first < last ? +1 : -1;
+ int fails = 0;
+ int cnt = 0;
+
+ TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) {
+ if ((req_consecutive &&
+ rkm->rkm_u.producer.msgid != expected) ||
+ (!req_consecutive &&
+ rkm->rkm_u.producer.msgid < expected)) {
+ if (fails++ < 100)
+ RD_UT_SAY("%s: expected msgid %s %" PRIu64
+ " not %" PRIu64 " at index #%d",
+ what, req_consecutive ? "==" : ">=",
+ expected, rkm->rkm_u.producer.msgid,
+ cnt);
+ }
+
+ cnt++;
+ expected += incr;
+
+ if (cnt > rkmq->rkmq_msg_cnt) {
+ RD_UT_SAY("%s: loop in queue?", what);
+ fails++;
+ break;
+ }
+ }
+
+ RD_UT_ASSERT(!fails, "See %d previous failure(s)", fails);
+ return fails;
+}
+
+/**
+ * @brief Verify ordering comparator for message queues.
+ */
+static int unittest_msgq_order(const char *what,
+ int fifo,
+ int (*cmp)(const void *, const void *)) {
+ rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq);
+ rd_kafka_msg_t *rkm;
+ rd_kafka_msgq_t sendq, sendq2;
+ const size_t msgsize = 100;
+ int i;
+
+ RD_UT_SAY("%s: testing in %s mode", what, fifo ? "FIFO" : "LIFO");
+
+ for (i = 1; i <= 6; i++) {
+ rkm = ut_rd_kafka_msg_new(msgsize);
+ rkm->rkm_u.producer.msgid = i;
+ rd_kafka_msgq_enq_sorted0(&rkmq, rkm, cmp);
+ }
+
+ if (fifo) {
+ if (ut_verify_msgq_order("added", &rkmq, 1, 6, rd_true))
+ return 1;
+ } else {
+ if (ut_verify_msgq_order("added", &rkmq, 6, 1, rd_true))
+ return 1;
+ }
+
+ /* Move 3 messages to "send" queue which we then re-insert
+ * in the original queue (i.e., "retry"). */
+ rd_kafka_msgq_init(&sendq);
+ while (rd_kafka_msgq_len(&sendq) < 3)
+ rd_kafka_msgq_enq(&sendq, rd_kafka_msgq_pop(&rkmq));
+
+ if (fifo) {
+ if (ut_verify_msgq_order("send removed", &rkmq, 4, 6, rd_true))
+ return 1;
+
+ if (ut_verify_msgq_order("sendq", &sendq, 1, 3, rd_true))
+ return 1;
+ } else {
+ if (ut_verify_msgq_order("send removed", &rkmq, 3, 1, rd_true))
+ return 1;
+
+ if (ut_verify_msgq_order("sendq", &sendq, 6, 4, rd_true))
+ return 1;
+ }
+
+ /* Retry the messages, which moves them back to sendq
+ * maintaining the original order */
+ rd_kafka_retry_msgq(&rkmq, &sendq, 1, 1, 0,
+ RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp);
+
+ RD_UT_ASSERT(rd_kafka_msgq_len(&sendq) == 0,
+ "sendq FIFO should be empty, not contain %d messages",
+ rd_kafka_msgq_len(&sendq));
+
+ if (fifo) {
+ if (ut_verify_msgq_order("readded", &rkmq, 1, 6, rd_true))
+ return 1;
+ } else {
+ if (ut_verify_msgq_order("readded", &rkmq, 6, 1, rd_true))
+ return 1;
+ }
+
+        /* Move the first 4 messages to the "send" queue, then
+ * retry them with max_retries=1 which should now fail for
+ * the 3 first messages that were already retried. */
+ rd_kafka_msgq_init(&sendq);
+ while (rd_kafka_msgq_len(&sendq) < 4)
+ rd_kafka_msgq_enq(&sendq, rd_kafka_msgq_pop(&rkmq));
+
+ if (fifo) {
+ if (ut_verify_msgq_order("send removed #2", &rkmq, 5, 6,
+ rd_true))
+ return 1;
+
+ if (ut_verify_msgq_order("sendq #2", &sendq, 1, 4, rd_true))
+ return 1;
+ } else {
+ if (ut_verify_msgq_order("send removed #2", &rkmq, 2, 1,
+ rd_true))
+ return 1;
+
+ if (ut_verify_msgq_order("sendq #2", &sendq, 6, 3, rd_true))
+ return 1;
+ }
+
+ /* Retry the messages, which should now keep the 3 first messages
+ * on sendq (no more retries) and just number 4 moved back. */
+ rd_kafka_retry_msgq(&rkmq, &sendq, 1, 1, 0,
+ RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp);
+
+ if (fifo) {
+ if (ut_verify_msgq_order("readded #2", &rkmq, 4, 6, rd_true))
+ return 1;
+
+ if (ut_verify_msgq_order("no more retries", &sendq, 1, 3,
+ rd_true))
+ return 1;
+
+ } else {
+ if (ut_verify_msgq_order("readded #2", &rkmq, 3, 1, rd_true))
+ return 1;
+
+ if (ut_verify_msgq_order("no more retries", &sendq, 6, 4,
+ rd_true))
+ return 1;
+ }
+
+ /* Move all messages back on rkmq */
+ rd_kafka_retry_msgq(&rkmq, &sendq, 0, 1000, 0,
+ RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp);
+
+
+ /* Move the first half of the messages to sendq (1,2,3).
+ * Move the second half of the messages to sendq2 (4,5,6).
+ * Add a new message to rkmq (7).
+ * Move the first half back onto rkmq (1,2,3,7).
+ * Move the second half back onto rkmq (1,2,3,4,5,6,7). */
+ rd_kafka_msgq_init(&sendq);
+ rd_kafka_msgq_init(&sendq2);
+
+ while (rd_kafka_msgq_len(&sendq) < 3)
+ rd_kafka_msgq_enq(&sendq, rd_kafka_msgq_pop(&rkmq));
+
+ while (rd_kafka_msgq_len(&sendq2) < 3)
+ rd_kafka_msgq_enq(&sendq2, rd_kafka_msgq_pop(&rkmq));
+
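+ /* 'i' is still 7 here, left over from the initial population loop,
+ * so this injects message id 7 as described above. */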
+ rkm = ut_rd_kafka_msg_new(msgsize);
+ rkm->rkm_u.producer.msgid = i;
+ rd_kafka_msgq_enq_sorted0(&rkmq, rkm, cmp);
+
+ rd_kafka_retry_msgq(&rkmq, &sendq, 0, 1000, 0,
+ RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp);
+ rd_kafka_retry_msgq(&rkmq, &sendq2, 0, 1000, 0,
+ RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp);
+
+ RD_UT_ASSERT(rd_kafka_msgq_len(&sendq) == 0,
+ "sendq FIFO should be empty, not contain %d messages",
+ rd_kafka_msgq_len(&sendq));
+ RD_UT_ASSERT(rd_kafka_msgq_len(&sendq2) == 0,
+ "sendq2 FIFO should be empty, not contain %d messages",
+ rd_kafka_msgq_len(&sendq2));
+
+ if (fifo) {
+ if (ut_verify_msgq_order("inject", &rkmq, 1, 7, rd_true))
+ return 1;
+ } else {
+ if (ut_verify_msgq_order("readded #2", &rkmq, 7, 1, rd_true))
+ return 1;
+ }
+
+ RD_UT_ASSERT(rd_kafka_msgq_size(&rkmq) ==
+ rd_kafka_msgq_len(&rkmq) * msgsize,
+ "expected msgq size %" PRIusz ", not %" PRIusz,
+ (size_t)rd_kafka_msgq_len(&rkmq) * msgsize,
+ rd_kafka_msgq_size(&rkmq));
+
+
+ ut_rd_kafka_msgq_purge(&sendq);
+ ut_rd_kafka_msgq_purge(&sendq2);
+ ut_rd_kafka_msgq_purge(&rkmq);
+
+ return 0;
+}
+
+/**
+ * @brief Verify that rd_kafka_seq_wrap() works.
+ */
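+/*
+ * Note: the expectations below are consistent with a plain 31-bit wrap,
+ * i.e. rd_kafka_seq_wrap(seq) behaving like seq % ((int64_t)INT32_MAX + 1):
+ * for example (int64_t)INT32_MAX + 2 wraps to 1, and ((int64_t)1 << 34),
+ * being a multiple of 2^31, wraps to 0.
+ */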
+static int unittest_msg_seq_wrap(void) {
+ static const struct exp {
+ int64_t in;
+ int32_t out;
+ } exp[] = {
+ {0, 0},
+ {1, 1},
+ {(int64_t)INT32_MAX + 2, 1},
+ {(int64_t)INT32_MAX + 1, 0},
+ {INT32_MAX, INT32_MAX},
+ {INT32_MAX - 1, INT32_MAX - 1},
+ {INT32_MAX - 2, INT32_MAX - 2},
+ {((int64_t)1 << 33) - 2, INT32_MAX - 1},
+ {((int64_t)1 << 33) - 1, INT32_MAX},
+ {((int64_t)1 << 34), 0},
+ {((int64_t)1 << 35) + 3, 3},
+ {1710 + 1229, 2939},
+ {-1, -1},
+ };
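+ /* The {-1, -1} entry is a sentinel; the loop below stops at it. */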
+ int i;
+
+ for (i = 0; exp[i].in != -1; i++) {
+ int32_t wseq = rd_kafka_seq_wrap(exp[i].in);
+ RD_UT_ASSERT(wseq == exp[i].out,
+ "Expected seq_wrap(%" PRId64 ") -> %" PRId32
+ ", not %" PRId32,
+ exp[i].in, exp[i].out, wseq);
+ }
+
+ RD_UT_PASS();
+}
+
+
+/**
+ * @brief Populate message queue with message ids from lo..hi (inclusive)
+ */
+static void ut_msgq_populate(rd_kafka_msgq_t *rkmq,
+ uint64_t lo,
+ uint64_t hi,
+ size_t msgsize) {
+ uint64_t i;
+
+ for (i = lo; i <= hi; i++) {
+ rd_kafka_msg_t *rkm = ut_rd_kafka_msg_new(msgsize);
+ rkm->rkm_u.producer.msgid = i;
+ rd_kafka_msgq_enq(rkmq, rkm);
+ }
+}
+
+
+struct ut_msg_range {
+ uint64_t lo;
+ uint64_t hi;
+};
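+
+/* Range arrays passed to the insert tests below are terminated by a
+ * {0, 0} entry (hi == 0). */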
+
+/**
+ * @brief Verify that msgq insert sorts are optimized. Issue #2508.
+ * All source ranges are combined into a single queue before insert.
+ */
+static int
+unittest_msgq_insert_all_sort(const char *what,
+ double max_us_per_msg,
+ double *ret_us_per_msg,
+ const struct ut_msg_range *src_ranges,
+ const struct ut_msg_range *dest_ranges) {
+ rd_kafka_msgq_t destq, srcq;
+ int i;
+ uint64_t lo = UINT64_MAX, hi = 0;
+ uint64_t cnt = 0;
+ const size_t msgsize = 100;
+ size_t totsize = 0;
+ rd_ts_t ts;
+ double us_per_msg;
+
+ RD_UT_SAY("Testing msgq insert (all) efficiency: %s", what);
+
+ rd_kafka_msgq_init(&destq);
+ rd_kafka_msgq_init(&srcq);
+
+ for (i = 0; src_ranges[i].hi > 0; i++) {
+ uint64_t this_cnt;
+
+ ut_msgq_populate(&srcq, src_ranges[i].lo, src_ranges[i].hi,
+ msgsize);
+ if (src_ranges[i].lo < lo)
+ lo = src_ranges[i].lo;
+ if (src_ranges[i].hi > hi)
+ hi = src_ranges[i].hi;
+ this_cnt = (src_ranges[i].hi - src_ranges[i].lo) + 1;
+ cnt += this_cnt;
+ totsize += msgsize * (size_t)this_cnt;
+ }
+
+ for (i = 0; dest_ranges[i].hi > 0; i++) {
+ uint64_t this_cnt;
+
+ ut_msgq_populate(&destq, dest_ranges[i].lo, dest_ranges[i].hi,
+ msgsize);
+ if (dest_ranges[i].lo < lo)
+ lo = dest_ranges[i].lo;
+ if (dest_ranges[i].hi > hi)
+ hi = dest_ranges[i].hi;
+ this_cnt = (dest_ranges[i].hi - dest_ranges[i].lo) + 1;
+ cnt += this_cnt;
+ totsize += msgsize * (size_t)this_cnt;
+ }
+
+ RD_UT_SAY("Begin insert of %d messages into destq with %d messages",
+ rd_kafka_msgq_len(&srcq), rd_kafka_msgq_len(&destq));
+
+ ts = rd_clock();
+ rd_kafka_msgq_insert_msgq(&destq, &srcq, rd_kafka_msg_cmp_msgid);
+ ts = rd_clock() - ts;
+ us_per_msg = (double)ts / (double)cnt;
+
+ RD_UT_SAY("Done: took %" PRId64 "us, %.4fus/msg", ts, us_per_msg);
+
+ RD_UT_ASSERT(rd_kafka_msgq_len(&srcq) == 0,
+ "srcq should be empty, but contains %d messages",
+ rd_kafka_msgq_len(&srcq));
+ RD_UT_ASSERT(rd_kafka_msgq_len(&destq) == (int)cnt,
+ "destq should contain %d messages, not %d", (int)cnt,
+ rd_kafka_msgq_len(&destq));
+
+ if (ut_verify_msgq_order("after", &destq, lo, hi, rd_false))
+ return 1;
+
+ RD_UT_ASSERT(rd_kafka_msgq_size(&destq) == totsize,
+ "expected destq size to be %" PRIusz
+ " bytes, not %" PRIusz,
+ totsize, rd_kafka_msgq_size(&destq));
+
+ ut_rd_kafka_msgq_purge(&srcq);
+ ut_rd_kafka_msgq_purge(&destq);
+
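+ /* In slow test runs (rd_unittest_slow) a timing overrun only produces
+ * a warning instead of failing the test. */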
+ if (!rd_unittest_slow)
+ RD_UT_ASSERT(!(us_per_msg > max_us_per_msg + 0.0001),
+ "maximum us/msg exceeded: %.4f > %.4f us/msg",
+ us_per_msg, max_us_per_msg);
+ else if (us_per_msg > max_us_per_msg + 0.0001)
+ RD_UT_WARN("maximum us/msg exceeded: %.4f > %.4f us/msg",
+ us_per_msg, max_us_per_msg);
+
+ if (ret_us_per_msg)
+ *ret_us_per_msg = us_per_msg;
+
+ RD_UT_PASS();
+}
+
+
+/**
+ * @brief Verify that msgq insert sorts are optimized. Issue #2508.
+ * Inserts each source range individually.
+ */
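+/* Unlike the "all" variant above, this accumulates the time of each
+ * individual insert and averages it over the source messages only. */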
+static int
+unittest_msgq_insert_each_sort(const char *what,
+ double max_us_per_msg,
+ double *ret_us_per_msg,
+ const struct ut_msg_range *src_ranges,
+ const struct ut_msg_range *dest_ranges) {
+ rd_kafka_msgq_t destq;
+ int i;
+ uint64_t lo = UINT64_MAX, hi = 0;
+ uint64_t cnt = 0;
+ uint64_t scnt = 0;
+ const size_t msgsize = 100;
+ size_t totsize = 0;
+ double us_per_msg;
+ rd_ts_t accum_ts = 0;
+
+ RD_UT_SAY("Testing msgq insert (each) efficiency: %s", what);
+
+ rd_kafka_msgq_init(&destq);
+
+ for (i = 0; dest_ranges[i].hi > 0; i++) {
+ uint64_t this_cnt;
+
+ ut_msgq_populate(&destq, dest_ranges[i].lo, dest_ranges[i].hi,
+ msgsize);
+ if (dest_ranges[i].lo < lo)
+ lo = dest_ranges[i].lo;
+ if (dest_ranges[i].hi > hi)
+ hi = dest_ranges[i].hi;
+ this_cnt = (dest_ranges[i].hi - dest_ranges[i].lo) + 1;
+ cnt += this_cnt;
+ totsize += msgsize * (size_t)this_cnt;
+ }
+
+
+ for (i = 0; src_ranges[i].hi > 0; i++) {
+ rd_kafka_msgq_t srcq;
+ uint64_t this_cnt;
+ rd_ts_t ts;
+
+ rd_kafka_msgq_init(&srcq);
+
+ ut_msgq_populate(&srcq, src_ranges[i].lo, src_ranges[i].hi,
+ msgsize);
+ if (src_ranges[i].lo < lo)
+ lo = src_ranges[i].lo;
+ if (src_ranges[i].hi > hi)
+ hi = src_ranges[i].hi;
+ this_cnt = (src_ranges[i].hi - src_ranges[i].lo) + 1;
+ cnt += this_cnt;
+ scnt += this_cnt;
+ totsize += msgsize * (size_t)this_cnt;
+
+ RD_UT_SAY(
+ "Begin insert of %d messages into destq with "
+ "%d messages",
+ rd_kafka_msgq_len(&srcq), rd_kafka_msgq_len(&destq));
+
+ ts = rd_clock();
+ rd_kafka_msgq_insert_msgq(&destq, &srcq,
+ rd_kafka_msg_cmp_msgid);
+ ts = rd_clock() - ts;
+ accum_ts += ts;
+
+ RD_UT_SAY("Done: took %" PRId64 "us, %.4fus/msg", ts,
+ (double)ts / (double)this_cnt);
+
+ RD_UT_ASSERT(rd_kafka_msgq_len(&srcq) == 0,
+ "srcq should be empty, but contains %d messages",
+ rd_kafka_msgq_len(&srcq));
+ RD_UT_ASSERT(rd_kafka_msgq_len(&destq) == (int)cnt,
+ "destq should contain %d messages, not %d",
+ (int)cnt, rd_kafka_msgq_len(&destq));
+
+ if (ut_verify_msgq_order("after", &destq, lo, hi, rd_false))
+ return 1;
+
+ RD_UT_ASSERT(rd_kafka_msgq_size(&destq) == totsize,
+ "expected destq size to be %" PRIusz
+ " bytes, not %" PRIusz,
+ totsize, rd_kafka_msgq_size(&destq));
+
+ ut_rd_kafka_msgq_purge(&srcq);
+ }
+
+ ut_rd_kafka_msgq_purge(&destq);
+
+ us_per_msg = (double)accum_ts / (double)scnt;
+
+ RD_UT_SAY("Total: %.4fus/msg over %" PRId64 " messages in %" PRId64
+ "us",
+ us_per_msg, scnt, accum_ts);
+
+ if (!rd_unittest_slow)
+ RD_UT_ASSERT(!(us_per_msg > max_us_per_msg + 0.0001),
+ "maximum us/msg exceeded: %.4f > %.4f us/msg",
+ us_per_msg, max_us_per_msg);
+ else if (us_per_msg > max_us_per_msg + 0.0001)
+ RD_UT_WARN("maximum us/msg exceeded: %.4f > %.4f us/msg",
+ us_per_msg, max_us_per_msg);
+
+
+ if (ret_us_per_msg)
+ *ret_us_per_msg = us_per_msg;
+
+ RD_UT_PASS();
+}
+
+
+
+/**
+ * @brief Calls both insert_all and insert_each
+ */
+static int unittest_msgq_insert_sort(const char *what,
+ double max_us_per_msg,
+ double *ret_us_per_msg,
+ const struct ut_msg_range *src_ranges,
+ const struct ut_msg_range *dest_ranges) {
+ double ret_all = 0.0, ret_each = 0.0;
+ int r;
+
+ r = unittest_msgq_insert_all_sort(what, max_us_per_msg, &ret_all,
+ src_ranges, dest_ranges);
+ if (r)
+ return r;
+
+ r = unittest_msgq_insert_each_sort(what, max_us_per_msg, &ret_each,
+ src_ranges, dest_ranges);
+ if (r)
+ return r;
+
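+ /* Report the slower (max) of the two measured per-message times. */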
+ if (ret_us_per_msg)
+ *ret_us_per_msg = RD_MAX(ret_all, ret_each);
+
+ return 0;
+}
+
+
+int unittest_msg(void) {
+ int fails = 0;
+ double insert_baseline = 0.0;
+
+ fails += unittest_msgq_order("FIFO", 1, rd_kafka_msg_cmp_msgid);
+ fails += unittest_msg_seq_wrap();
+
+ fails += unittest_msgq_insert_sort(
+ "get baseline insert time", 100000.0, &insert_baseline,
+ (const struct ut_msg_range[]) {{1, 1}, {3, 3}, {0, 0}},
+ (const struct ut_msg_range[]) {{2, 2}, {4, 4}, {0, 0}});
+
+ /* Allow some wiggle room in baseline time. */
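+ /* Very small baselines are raised to 0.2us, and the budget for the
+ * following tests is three times the baseline. */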
+ if (insert_baseline < 0.1)
+ insert_baseline = 0.2;
+ insert_baseline *= 3;
+
+ fails += unittest_msgq_insert_sort(
+ "single-message ranges", insert_baseline, NULL,
+ (const struct ut_msg_range[]) {
+ {2, 2}, {4, 4}, {9, 9}, {33692864, 33692864}, {0, 0}},
+ (const struct ut_msg_range[]) {{1, 1},
+ {3, 3},
+ {5, 5},
+ {10, 10},
+ {33692865, 33692865},
+ {0, 0}});
+ fails += unittest_msgq_insert_sort(
+ "many messages", insert_baseline, NULL,
+ (const struct ut_msg_range[]) {{100000, 200000},
+ {400000, 450000},
+ {900000, 920000},
+ {33692864, 33751992},
+ {33906868, 33993690},
+ {40000000, 44000000},
+ {0, 0}},
+ (const struct ut_msg_range[]) {{1, 199},
+ {350000, 360000},
+ {500000, 500010},
+ {1000000, 1000200},
+ {33751993, 33906867},
+ {50000001, 50000001},
+ {0, 0}});
+ fails += unittest_msgq_insert_sort(
+ "issue #2508", insert_baseline, NULL,
+ (const struct ut_msg_range[]) {
+ {33692864, 33751992}, {33906868, 33993690}, {0, 0}},
+ (const struct ut_msg_range[]) {{33751993, 33906867}, {0, 0}});
+
+ /* The standard case where all of the srcq
+ * goes after the destq.
+ * Create a big destq and a number of small srcqs.
+ * Should not result in O(n) scans to find the insert position. */
+ fails += unittest_msgq_insert_sort(
+ "issue #2450 (v1.2.1 regression)", insert_baseline, NULL,
+ (const struct ut_msg_range[]) {{200000, 200001},
+ {200002, 200006},
+ {200009, 200012},
+ {200015, 200016},
+ {200020, 200022},
+ {200030, 200090},
+ {200091, 200092},
+ {200093, 200094},
+ {200095, 200096},
+ {200097, 200099},
+ {0, 0}},
+ (const struct ut_msg_range[]) {{1, 199999}, {0, 0}});
+
+ return fails;
+}