/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012,2013 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "rd.h"
#include "rdkafka_int.h"
#include "rdkafka_msg.h"
#include "rdkafka_topic.h"
#include "rdkafka_partition.h"
#include "rdkafka_interceptor.h"
#include "rdkafka_header.h"
#include "rdkafka_idempotence.h"
#include "rdkafka_txnmgr.h"
#include "rdkafka_error.h"
#include "rdcrc32.h"
#include "rdfnv1a.h"
#include "rdmurmur2.h"
#include "rdrand.h"
#include "rdtime.h"
#include "rdsysqueue.h"
#include "rdunittest.h"

#include <stdarg.h> /* va_list et al, used by rd_kafka_producev() below */


const char *rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage) {
        if (!rkmessage->err)
                return NULL;

        if (rkmessage->payload)
                return (const char *)rkmessage->payload;

        return rd_kafka_err2str(rkmessage->err);
}


/**
 * @brief Check if producing is allowed.
 *
 * @param errorp If non-NULL and producing is prohibited a new error_t
 *               object will be allocated and returned in this pointer.
 *
 * @returns an error if not allowed, else 0.
 *
 * @remarks Also sets the corresponding errno.
 */
static RD_INLINE rd_kafka_resp_err_t
rd_kafka_check_produce(rd_kafka_t *rk, rd_kafka_error_t **errorp) {
        rd_kafka_resp_err_t err;

        if (unlikely((err = rd_kafka_fatal_error_code(rk)))) {
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__FATAL, ECANCELED);
                if (errorp) {
                        rd_kafka_rdlock(rk);
                        *errorp = rd_kafka_error_new_fatal(
                            err,
                            "Producing not allowed since a previous fatal "
                            "error was raised: %s",
                            rk->rk_fatal.errstr);
                        rd_kafka_rdunlock(rk);
                }
                return RD_KAFKA_RESP_ERR__FATAL;
        }

        if (likely(rd_kafka_txn_may_enq_msg(rk)))
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        /* Transactional state forbids producing */
        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__STATE, ENOEXEC);

        if (errorp) {
                rd_kafka_rdlock(rk);
                *errorp = rd_kafka_error_new(
                    RD_KAFKA_RESP_ERR__STATE,
                    "Producing not allowed in transactional state %s",
                    rd_kafka_txn_state2str(rk->rk_eos.txn_state));
                rd_kafka_rdunlock(rk);
        }

        return RD_KAFKA_RESP_ERR__STATE;
}


void rd_kafka_msg_destroy(rd_kafka_t *rk, rd_kafka_msg_t *rkm) {

        // FIXME
        if (rkm->rkm_flags & RD_KAFKA_MSG_F_ACCOUNT) {
                rd_dassert(rk || rkm->rkm_rkmessage.rkt);
                rd_kafka_curr_msgs_sub(rk ?
                                           rk : rkm->rkm_rkmessage.rkt->rkt_rk,
                                       1, rkm->rkm_len);
        }

        if (rkm->rkm_headers)
                rd_kafka_headers_destroy(rkm->rkm_headers);

        if (likely(rkm->rkm_rkmessage.rkt != NULL))
                rd_kafka_topic_destroy0(rkm->rkm_rkmessage.rkt);

        if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE && rkm->rkm_payload)
                rd_free(rkm->rkm_payload);

        if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE_RKM)
                rd_free(rkm);
}


/**
 * @brief Create a new Producer message, copying the payload as
 *        indicated by msgflags.
 *
 * @returns the new message
 */
static rd_kafka_msg_t *rd_kafka_msg_new00(rd_kafka_topic_t *rkt,
                                          int32_t partition,
                                          int msgflags,
                                          char *payload,
                                          size_t len,
                                          const void *key,
                                          size_t keylen,
                                          void *msg_opaque) {
        rd_kafka_msg_t *rkm;
        size_t mlen = sizeof(*rkm);
        char *p;

        /* If we are to make a copy of the payload, allocate space for it
         * too */
        if (msgflags & RD_KAFKA_MSG_F_COPY) {
                msgflags &= ~RD_KAFKA_MSG_F_FREE;
                mlen += len;
        }

        mlen += keylen;

        /* Note: using rd_malloc here, not rd_calloc, so make sure all fields
         *       are properly set up. */
        rkm            = rd_malloc(mlen);
        rkm->rkm_err   = 0;
        rkm->rkm_flags = (RD_KAFKA_MSG_F_PRODUCER | RD_KAFKA_MSG_F_FREE_RKM |
                          msgflags);
        rkm->rkm_len    = len;
        rkm->rkm_opaque = msg_opaque;
        rkm->rkm_rkmessage.rkt = rd_kafka_topic_keep(rkt);

        rkm->rkm_broker_id = -1;
        rkm->rkm_partition = partition;
        rkm->rkm_offset    = RD_KAFKA_OFFSET_INVALID;
        rkm->rkm_timestamp = 0;
        rkm->rkm_tstype    = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
        rkm->rkm_status    = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
        rkm->rkm_headers   = NULL;

        p = (char *)(rkm + 1);

        if (payload && msgflags & RD_KAFKA_MSG_F_COPY) {
                /* Copy payload to space following the ..msg_t */
                rkm->rkm_payload = p;
                memcpy(rkm->rkm_payload, payload, len);
                p += len;

        } else {
                /* Just point to the provided payload. */
                rkm->rkm_payload = payload;
        }

        if (key) {
                rkm->rkm_key     = p;
                rkm->rkm_key_len = keylen;
                memcpy(rkm->rkm_key, key, keylen);
        } else {
                rkm->rkm_key     = NULL;
                rkm->rkm_key_len = 0;
        }

        return rkm;
}


/**
 * @brief Create a new Producer message.
 *
 * @remark Must only be used by producer code.
 *
 * @returns the new message on success, or NULL on error, in which case
 *          'errp' and 'errnop' (if non-NULL) are set appropriately.
 */
static rd_kafka_msg_t *rd_kafka_msg_new0(rd_kafka_topic_t *rkt,
                                         int32_t force_partition,
                                         int msgflags,
                                         char *payload,
                                         size_t len,
                                         const void *key,
                                         size_t keylen,
                                         void *msg_opaque,
                                         rd_kafka_resp_err_t *errp,
                                         int *errnop,
                                         rd_kafka_headers_t *hdrs,
                                         int64_t timestamp,
                                         rd_ts_t now) {
        rd_kafka_msg_t *rkm;
        size_t hdrs_size = 0;

        if (unlikely(!payload))
                len = 0;
        if (!key)
                keylen = 0;
        if (hdrs)
                hdrs_size = rd_kafka_headers_serialized_size(hdrs);

        if (unlikely(len > INT32_MAX || keylen > INT32_MAX ||
                     rd_kafka_msg_max_wire_size(keylen, len, hdrs_size) >
                         (size_t)rkt->rkt_rk->rk_conf.max_msg_size)) {
                *errp = RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
                if (errnop)
                        *errnop = EMSGSIZE;
                return NULL;
        }

        if (msgflags & RD_KAFKA_MSG_F_BLOCK)
                *errp = rd_kafka_curr_msgs_add(
                    rkt->rkt_rk, 1, len, 1 /*block*/,
                    (msgflags & RD_KAFKA_MSG_F_RKT_RDLOCKED) ?
                        &rkt->rkt_lock : NULL);
        else
                *errp = rd_kafka_curr_msgs_add(rkt->rkt_rk, 1, len, 0, NULL);

        if (unlikely(*errp)) {
                if (errnop)
                        *errnop = ENOBUFS;
                return NULL;
        }

        rkm = rd_kafka_msg_new00(
            rkt, force_partition,
            msgflags | RD_KAFKA_MSG_F_ACCOUNT /* curr_msgs_add() */, payload,
            len, key, keylen, msg_opaque);

        memset(&rkm->rkm_u.producer, 0, sizeof(rkm->rkm_u.producer));

        if (timestamp)
                rkm->rkm_timestamp = timestamp;
        else
                rkm->rkm_timestamp = rd_uclock() / 1000;
        rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME;

        if (hdrs) {
                rd_dassert(!rkm->rkm_headers);
                rkm->rkm_headers = hdrs;
        }

        rkm->rkm_ts_enq = now;

        if (rkt->rkt_conf.message_timeout_ms == 0) {
                rkm->rkm_ts_timeout = INT64_MAX;
        } else {
                rkm->rkm_ts_timeout =
                    now + (int64_t)rkt->rkt_conf.message_timeout_ms * 1000;
        }

        /* Call interceptor chain for on_send */
        rd_kafka_interceptors_on_send(rkt->rkt_rk, &rkm->rkm_rkmessage);

        return rkm;
}


/**
 * @brief Produce: creates a new message, runs the partitioner and enqueues
 *        it on the selected partition.
 *
 * @returns 0 on success or -1 on error.
 *
 * If the function returns -1 and RD_KAFKA_MSG_F_FREE was specified, then
 * the memory associated with the payload is still the caller's
 * responsibility.
 *
 * @locks none
 */
int rd_kafka_msg_new(rd_kafka_topic_t *rkt,
                     int32_t force_partition,
                     int msgflags,
                     char *payload,
                     size_t len,
                     const void *key,
                     size_t keylen,
                     void *msg_opaque) {
        rd_kafka_msg_t *rkm;
        rd_kafka_resp_err_t err;
        int errnox;

        if (unlikely((err = rd_kafka_check_produce(rkt->rkt_rk, NULL))))
                return -1;

        /* Create message */
        rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags, payload, len,
                                key, keylen, msg_opaque, &err, &errnox, NULL, 0,
                                rd_clock());
        if (unlikely(!rkm)) {
                /* errno is already set by msg_new() */
                rd_kafka_set_last_error(err, errnox);
                return -1;
        }

        /* Partition the message */
        err = rd_kafka_msg_partitioner(rkt, rkm, 1);
        if (likely(!err)) {
                rd_kafka_set_last_error(0, 0);
                return 0;
        }

        /* Interceptor: unroll failing messages by triggering on_ack.. */
        rkm->rkm_err = err;
        rd_kafka_interceptors_on_acknowledgement(rkt->rkt_rk,
                                                 &rkm->rkm_rkmessage);

        /* Handle partitioner failures: it only fails when the application
         * attempts to force a destination partition that does not exist
         * in the cluster.  Note we must clear the RD_KAFKA_MSG_F_FREE
         * flag since our contract says we don't free the payload on
         * failure. */
        rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE;
        rd_kafka_msg_destroy(rkt->rkt_rk, rkm);

        /* Translate error codes to errnos.
*/ if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION) rd_kafka_set_last_error(err, ESRCH); else if (err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) rd_kafka_set_last_error(err, ENOENT); else rd_kafka_set_last_error(err, EINVAL); /* NOTREACHED */ return -1; } /** @remark Keep rd_kafka_produceva() and rd_kafka_producev() in synch */ rd_kafka_error_t * rd_kafka_produceva(rd_kafka_t *rk, const rd_kafka_vu_t *vus, size_t cnt) { rd_kafka_msg_t s_rkm = { /* Message defaults */ .rkm_partition = RD_KAFKA_PARTITION_UA, .rkm_timestamp = 0, /* current time */ }; rd_kafka_msg_t *rkm = &s_rkm; rd_kafka_topic_t *rkt = NULL; rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; rd_kafka_error_t *error = NULL; rd_kafka_headers_t *hdrs = NULL; rd_kafka_headers_t *app_hdrs = NULL; /* App-provided headers list */ size_t i; if (unlikely(rd_kafka_check_produce(rk, &error))) return error; for (i = 0; i < cnt; i++) { const rd_kafka_vu_t *vu = &vus[i]; switch (vu->vtype) { case RD_KAFKA_VTYPE_TOPIC: rkt = rd_kafka_topic_new0(rk, vu->u.cstr, NULL, NULL, 1); break; case RD_KAFKA_VTYPE_RKT: rkt = rd_kafka_topic_proper(vu->u.rkt); rd_kafka_topic_keep(rkt); break; case RD_KAFKA_VTYPE_PARTITION: rkm->rkm_partition = vu->u.i32; break; case RD_KAFKA_VTYPE_VALUE: rkm->rkm_payload = vu->u.mem.ptr; rkm->rkm_len = vu->u.mem.size; break; case RD_KAFKA_VTYPE_KEY: rkm->rkm_key = vu->u.mem.ptr; rkm->rkm_key_len = vu->u.mem.size; break; case RD_KAFKA_VTYPE_OPAQUE: rkm->rkm_opaque = vu->u.ptr; break; case RD_KAFKA_VTYPE_MSGFLAGS: rkm->rkm_flags = vu->u.i; break; case RD_KAFKA_VTYPE_TIMESTAMP: rkm->rkm_timestamp = vu->u.i64; break; case RD_KAFKA_VTYPE_HEADER: if (unlikely(app_hdrs != NULL)) { error = rd_kafka_error_new( RD_KAFKA_RESP_ERR__CONFLICT, "VTYPE_HEADER and VTYPE_HEADERS " "are mutually exclusive"); goto err; } if (unlikely(!hdrs)) hdrs = rd_kafka_headers_new(8); err = rd_kafka_header_add(hdrs, vu->u.header.name, -1, vu->u.header.val, vu->u.header.size); if (unlikely(err)) { error = rd_kafka_error_new( err, "Failed to add header: %s", rd_kafka_err2str(err)); goto err; } break; case RD_KAFKA_VTYPE_HEADERS: if (unlikely(hdrs != NULL)) { error = rd_kafka_error_new( RD_KAFKA_RESP_ERR__CONFLICT, "VTYPE_HEADERS and VTYPE_HEADER " "are mutually exclusive"); goto err; } app_hdrs = vu->u.headers; break; default: error = rd_kafka_error_new( RD_KAFKA_RESP_ERR__INVALID_ARG, "Unsupported VTYPE %d", (int)vu->vtype); goto err; } } rd_assert(!error); if (unlikely(!rkt)) { error = rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG, "Topic name or object required"); goto err; } rkm = rd_kafka_msg_new0( rkt, rkm->rkm_partition, rkm->rkm_flags, rkm->rkm_payload, rkm->rkm_len, rkm->rkm_key, rkm->rkm_key_len, rkm->rkm_opaque, &err, NULL, app_hdrs ? app_hdrs : hdrs, rkm->rkm_timestamp, rd_clock()); if (unlikely(err)) { error = rd_kafka_error_new(err, "Failed to produce message: %s", rd_kafka_err2str(err)); goto err; } /* Partition the message */ err = rd_kafka_msg_partitioner(rkt, rkm, 1); if (unlikely(err)) { /* Handle partitioner failures: it only fails when * the application attempts to force a destination * partition that does not exist in the cluster. */ /* Interceptors: Unroll on_send by on_ack.. */ rkm->rkm_err = err; rd_kafka_interceptors_on_acknowledgement(rk, &rkm->rkm_rkmessage); /* Note we must clear the RD_KAFKA_MSG_F_FREE * flag since our contract says we don't free the payload on * failure. 
*/ rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE; /* Deassociate application owned headers from message * since headers remain in application ownership * when producev() fails */ if (app_hdrs && app_hdrs == rkm->rkm_headers) rkm->rkm_headers = NULL; rd_kafka_msg_destroy(rk, rkm); error = rd_kafka_error_new(err, "Failed to enqueue message: %s", rd_kafka_err2str(err)); goto err; } rd_kafka_topic_destroy0(rkt); return NULL; err: if (rkt) rd_kafka_topic_destroy0(rkt); if (hdrs) rd_kafka_headers_destroy(hdrs); rd_assert(error != NULL); return error; } /** @remark Keep rd_kafka_produceva() and rd_kafka_producev() in synch */ rd_kafka_resp_err_t rd_kafka_producev(rd_kafka_t *rk, ...) { va_list ap; rd_kafka_msg_t s_rkm = { /* Message defaults */ .rkm_partition = RD_KAFKA_PARTITION_UA, .rkm_timestamp = 0, /* current time */ }; rd_kafka_msg_t *rkm = &s_rkm; rd_kafka_vtype_t vtype; rd_kafka_topic_t *rkt = NULL; rd_kafka_resp_err_t err; rd_kafka_headers_t *hdrs = NULL; rd_kafka_headers_t *app_hdrs = NULL; /* App-provided headers list */ if (unlikely((err = rd_kafka_check_produce(rk, NULL)))) return err; va_start(ap, rk); while (!err && (vtype = va_arg(ap, rd_kafka_vtype_t)) != RD_KAFKA_VTYPE_END) { switch (vtype) { case RD_KAFKA_VTYPE_TOPIC: rkt = rd_kafka_topic_new0(rk, va_arg(ap, const char *), NULL, NULL, 1); break; case RD_KAFKA_VTYPE_RKT: rkt = rd_kafka_topic_proper( va_arg(ap, rd_kafka_topic_t *)); rd_kafka_topic_keep(rkt); break; case RD_KAFKA_VTYPE_PARTITION: rkm->rkm_partition = va_arg(ap, int32_t); break; case RD_KAFKA_VTYPE_VALUE: rkm->rkm_payload = va_arg(ap, void *); rkm->rkm_len = va_arg(ap, size_t); break; case RD_KAFKA_VTYPE_KEY: rkm->rkm_key = va_arg(ap, void *); rkm->rkm_key_len = va_arg(ap, size_t); break; case RD_KAFKA_VTYPE_OPAQUE: rkm->rkm_opaque = va_arg(ap, void *); break; case RD_KAFKA_VTYPE_MSGFLAGS: rkm->rkm_flags = va_arg(ap, int); break; case RD_KAFKA_VTYPE_TIMESTAMP: rkm->rkm_timestamp = va_arg(ap, int64_t); break; case RD_KAFKA_VTYPE_HEADER: { const char *name; const void *value; ssize_t size; if (unlikely(app_hdrs != NULL)) { err = RD_KAFKA_RESP_ERR__CONFLICT; break; } if (unlikely(!hdrs)) hdrs = rd_kafka_headers_new(8); name = va_arg(ap, const char *); value = va_arg(ap, const void *); size = va_arg(ap, ssize_t); err = rd_kafka_header_add(hdrs, name, -1, value, size); } break; case RD_KAFKA_VTYPE_HEADERS: if (unlikely(hdrs != NULL)) { err = RD_KAFKA_RESP_ERR__CONFLICT; break; } app_hdrs = va_arg(ap, rd_kafka_headers_t *); break; default: err = RD_KAFKA_RESP_ERR__INVALID_ARG; break; } } va_end(ap); if (unlikely(!rkt)) return RD_KAFKA_RESP_ERR__INVALID_ARG; if (likely(!err)) rkm = rd_kafka_msg_new0( rkt, rkm->rkm_partition, rkm->rkm_flags, rkm->rkm_payload, rkm->rkm_len, rkm->rkm_key, rkm->rkm_key_len, rkm->rkm_opaque, &err, NULL, app_hdrs ? app_hdrs : hdrs, rkm->rkm_timestamp, rd_clock()); if (unlikely(err)) { rd_kafka_topic_destroy0(rkt); if (hdrs) rd_kafka_headers_destroy(hdrs); return err; } /* Partition the message */ err = rd_kafka_msg_partitioner(rkt, rkm, 1); if (unlikely(err)) { /* Handle partitioner failures: it only fails when * the application attempts to force a destination * partition that does not exist in the cluster. */ /* Interceptors: Unroll on_send by on_ack.. */ rkm->rkm_err = err; rd_kafka_interceptors_on_acknowledgement(rk, &rkm->rkm_rkmessage); /* Note we must clear the RD_KAFKA_MSG_F_FREE * flag since our contract says we don't free the payload on * failure. 
*/ rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE; /* Deassociate application owned headers from message * since headers remain in application ownership * when producev() fails */ if (app_hdrs && app_hdrs == rkm->rkm_headers) rkm->rkm_headers = NULL; rd_kafka_msg_destroy(rk, rkm); } rd_kafka_topic_destroy0(rkt); return err; } /** * @brief Produce a single message. * @locality any application thread * @locks none */ int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t keylen, void *msg_opaque) { return rd_kafka_msg_new(rkt, partition, msgflags, payload, len, key, keylen, msg_opaque); } /** * Produce a batch of messages. * Returns the number of messages succesfully queued for producing. * Each message's .err will be set accordingly. */ int rd_kafka_produce_batch(rd_kafka_topic_t *app_rkt, int32_t partition, int msgflags, rd_kafka_message_t *rkmessages, int message_cnt) { rd_kafka_msgq_t tmpq = RD_KAFKA_MSGQ_INITIALIZER(tmpq); int i; int64_t utc_now = rd_uclock() / 1000; rd_ts_t now = rd_clock(); int good = 0; int multiple_partitions = (partition == RD_KAFKA_PARTITION_UA || (msgflags & RD_KAFKA_MSG_F_PARTITION)); rd_kafka_resp_err_t all_err; rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt); rd_kafka_toppar_t *rktp = NULL; /* Propagated per-message below */ all_err = rd_kafka_check_produce(rkt->rkt_rk, NULL); rd_kafka_topic_rdlock(rkt); if (!multiple_partitions) { /* Single partition: look up the rktp once. */ rktp = rd_kafka_toppar_get_avail(rkt, partition, 1 /*ua on miss*/, &all_err); } else { /* Indicate to lower-level msg_new..() that rkt is locked * so that they may unlock it momentarily if blocking. */ msgflags |= RD_KAFKA_MSG_F_RKT_RDLOCKED; } for (i = 0; i < message_cnt; i++) { rd_kafka_msg_t *rkm; /* Propagate error for all messages. */ if (unlikely(all_err)) { rkmessages[i].err = all_err; continue; } /* Create message */ rkm = rd_kafka_msg_new0( rkt, (msgflags & RD_KAFKA_MSG_F_PARTITION) ? rkmessages[i].partition : partition, msgflags, rkmessages[i].payload, rkmessages[i].len, rkmessages[i].key, rkmessages[i].key_len, rkmessages[i]._private, &rkmessages[i].err, NULL, NULL, utc_now, now); if (unlikely(!rkm)) { if (rkmessages[i].err == RD_KAFKA_RESP_ERR__QUEUE_FULL) all_err = rkmessages[i].err; continue; } /* Three cases here: * partition==UA: run the partitioner (slow) * RD_KAFKA_MSG_F_PARTITION: produce message to specified * partition * fixed partition: simply concatenate the queue * to partit */ if (multiple_partitions) { if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) { /* Partition the message */ rkmessages[i].err = rd_kafka_msg_partitioner( rkt, rkm, 0 /*already locked*/); } else { if (rktp == NULL || rkm->rkm_partition != rktp->rktp_partition) { rd_kafka_resp_err_t err; if (rktp != NULL) rd_kafka_toppar_destroy(rktp); rktp = rd_kafka_toppar_get_avail( rkt, rkm->rkm_partition, 1 /*ua on miss*/, &err); if (unlikely(!rktp)) { rkmessages[i].err = err; continue; } } rd_kafka_toppar_enq_msg(rktp, rkm, now); if (rd_kafka_is_transactional(rkt->rkt_rk)) { /* Add partition to transaction */ rd_kafka_txn_add_partition(rktp); } } if (unlikely(rkmessages[i].err)) { /* Interceptors: Unroll on_send by on_ack.. */ rd_kafka_interceptors_on_acknowledgement( rkt->rkt_rk, &rkmessages[i]); rd_kafka_msg_destroy(rkt->rkt_rk, rkm); continue; } } else { /* Single destination partition. 
*/ rd_kafka_toppar_enq_msg(rktp, rkm, now); } rkmessages[i].err = RD_KAFKA_RESP_ERR_NO_ERROR; good++; } rd_kafka_topic_rdunlock(rkt); if (!multiple_partitions && good > 0 && rd_kafka_is_transactional(rkt->rkt_rk) && rktp->rktp_partition != RD_KAFKA_PARTITION_UA) { /* Add single destination partition to transaction */ rd_kafka_txn_add_partition(rktp); } if (rktp != NULL) rd_kafka_toppar_destroy(rktp); return good; } /** * @brief Scan \p rkmq for messages that have timed out and remove them from * \p rkmq and add to \p timedout queue. * * @param abs_next_timeout will be set to the next message timeout, or 0 * if no timeout. Optional, may be NULL. * * @returns the number of messages timed out. * * @locality any * @locks toppar_lock MUST be held */ int rd_kafka_msgq_age_scan(rd_kafka_toppar_t *rktp, rd_kafka_msgq_t *rkmq, rd_kafka_msgq_t *timedout, rd_ts_t now, rd_ts_t *abs_next_timeout) { rd_kafka_msg_t *rkm, *tmp, *first = NULL; int cnt = timedout->rkmq_msg_cnt; if (abs_next_timeout) *abs_next_timeout = 0; /* Assume messages are added in time sequencial order */ TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) { /* NOTE: this is not true for the deprecated (and soon removed) * LIFO queuing strategy. */ if (likely(rkm->rkm_ts_timeout > now)) { if (abs_next_timeout) *abs_next_timeout = rkm->rkm_ts_timeout; break; } if (!first) first = rkm; rd_kafka_msgq_deq(rkmq, rkm, 1); rd_kafka_msgq_enq(timedout, rkm); } return timedout->rkmq_msg_cnt - cnt; } int rd_kafka_msgq_enq_sorted0(rd_kafka_msgq_t *rkmq, rd_kafka_msg_t *rkm, int (*order_cmp)(const void *, const void *)) { TAILQ_INSERT_SORTED(&rkmq->rkmq_msgs, rkm, rd_kafka_msg_t *, rkm_link, order_cmp); rkmq->rkmq_msg_bytes += rkm->rkm_len + rkm->rkm_key_len; return ++rkmq->rkmq_msg_cnt; } int rd_kafka_msgq_enq_sorted(const rd_kafka_topic_t *rkt, rd_kafka_msgq_t *rkmq, rd_kafka_msg_t *rkm) { rd_dassert(rkm->rkm_u.producer.msgid != 0); return rd_kafka_msgq_enq_sorted0(rkmq, rkm, rkt->rkt_conf.msg_order_cmp); } /** * @brief Find the insert before position (i.e., the msg which comes * after \p rkm sequencially) for message \p rkm. * * @param rkmq insert queue. * @param start_pos the element in \p rkmq to start scanning at, or NULL * to start with the first element. * @param rkm message to insert. * @param cmp message comparator. * @param cntp the accumulated number of messages up to, but not including, * the returned insert position. Optional (NULL). * Do not use when start_pos is set. * @param bytesp the accumulated number of bytes up to, but not inclduing, * the returned insert position. Optional (NULL). * Do not use when start_pos is set. * * @remark cntp and bytesp will NOT be accurate when \p start_pos is non-NULL. * * @returns the insert position element, or NULL if \p rkm should be * added at tail of queue. */ rd_kafka_msg_t *rd_kafka_msgq_find_pos(const rd_kafka_msgq_t *rkmq, const rd_kafka_msg_t *start_pos, const rd_kafka_msg_t *rkm, int (*cmp)(const void *, const void *), int *cntp, int64_t *bytesp) { const rd_kafka_msg_t *curr; int cnt = 0; int64_t bytes = 0; for (curr = start_pos ? start_pos : rd_kafka_msgq_first(rkmq); curr; curr = TAILQ_NEXT(curr, rkm_link)) { if (cmp(rkm, curr) < 0) { if (cntp) { *cntp = cnt; *bytesp = bytes; } return (rd_kafka_msg_t *)curr; } if (cntp) { cnt++; bytes += rkm->rkm_len + rkm->rkm_key_len; } } return NULL; } /** * @brief Split the original \p leftq into a left and right part, * with element \p first_right being the first element in the * right part (\p rightq). 
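 *
 *        A typical caller first locates the split point with
 *        rd_kafka_msgq_find_pos() and feeds its cnt/bytes output straight
 *        into this function. Illustrative sketch only (the variable names
 *        below are hypothetical, not part of the library):
 *
 *          int cnt;
 *          int64_t bytes;
 *          rd_kafka_msg_t *pos = rd_kafka_msgq_find_pos(
 *                  &leftq, NULL, new_rkm, rd_kafka_msg_cmp_msgid,
 *                  &cnt, &bytes);
 *          if (pos)
 *                  rd_kafka_msgq_split(&leftq, &rightq, pos, cnt, bytes);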
* * @param cnt is the number of messages up to, but not including \p first_right * in \p leftq, namely the number of messages to remain in * \p leftq after the split. * @param bytes is the bytes counterpart to \p cnt. */ void rd_kafka_msgq_split(rd_kafka_msgq_t *leftq, rd_kafka_msgq_t *rightq, rd_kafka_msg_t *first_right, int cnt, int64_t bytes) { rd_kafka_msg_t *llast; rd_assert(first_right != TAILQ_FIRST(&leftq->rkmq_msgs)); llast = TAILQ_PREV(first_right, rd_kafka_msg_head_s, rkm_link); rd_kafka_msgq_init(rightq); rightq->rkmq_msgs.tqh_first = first_right; rightq->rkmq_msgs.tqh_last = leftq->rkmq_msgs.tqh_last; first_right->rkm_link.tqe_prev = &rightq->rkmq_msgs.tqh_first; leftq->rkmq_msgs.tqh_last = &llast->rkm_link.tqe_next; llast->rkm_link.tqe_next = NULL; rightq->rkmq_msg_cnt = leftq->rkmq_msg_cnt - cnt; rightq->rkmq_msg_bytes = leftq->rkmq_msg_bytes - bytes; leftq->rkmq_msg_cnt = cnt; leftq->rkmq_msg_bytes = bytes; rd_kafka_msgq_verify_order(NULL, leftq, 0, rd_false); rd_kafka_msgq_verify_order(NULL, rightq, 0, rd_false); } /** * @brief Set per-message metadata for all messages in \p rkmq */ void rd_kafka_msgq_set_metadata(rd_kafka_msgq_t *rkmq, int32_t broker_id, int64_t base_offset, int64_t timestamp, rd_kafka_msg_status_t status) { rd_kafka_msg_t *rkm; TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) { rkm->rkm_broker_id = broker_id; rkm->rkm_offset = base_offset++; if (timestamp != -1) { rkm->rkm_timestamp = timestamp; rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME; } /* Don't downgrade a message from any form of PERSISTED * to NOT_PERSISTED, since the original cause of indicating * PERSISTED can't be changed. * E.g., a previous ack or in-flight timeout. */ if (unlikely(status == RD_KAFKA_MSG_STATUS_NOT_PERSISTED && rkm->rkm_status != RD_KAFKA_MSG_STATUS_NOT_PERSISTED)) continue; rkm->rkm_status = status; } } /** * @brief Move all messages in \p src to \p dst whose msgid <= last_msgid. 
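 *
 *        Illustrative example (hypothetical msgids): with \p src containing
 *        msgids [3, 4, 5, 6] and last_msgid = 5, messages 3..5 are moved to
 *        \p dest (preserving order) and assigned \p status, while 6 remains
 *        at the head of \p src.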
* * @remark src must be ordered */ void rd_kafka_msgq_move_acked(rd_kafka_msgq_t *dest, rd_kafka_msgq_t *src, uint64_t last_msgid, rd_kafka_msg_status_t status) { rd_kafka_msg_t *rkm; while ((rkm = rd_kafka_msgq_first(src)) && rkm->rkm_u.producer.msgid <= last_msgid) { rd_kafka_msgq_deq(src, rkm, 1); rd_kafka_msgq_enq(dest, rkm); rkm->rkm_status = status; } rd_kafka_msgq_verify_order(NULL, dest, 0, rd_false); rd_kafka_msgq_verify_order(NULL, src, 0, rd_false); } int32_t rd_kafka_msg_partitioner_random(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque) { int32_t p = rd_jitter(0, partition_cnt - 1); if (unlikely(!rd_kafka_topic_partition_available(rkt, p))) return rd_jitter(0, partition_cnt - 1); else return p; } int32_t rd_kafka_msg_partitioner_consistent(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque) { return rd_crc32(key, keylen) % partition_cnt; } int32_t rd_kafka_msg_partitioner_consistent_random(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque) { if (keylen == 0) return rd_kafka_msg_partitioner_random( rkt, key, keylen, partition_cnt, rkt_opaque, msg_opaque); else return rd_kafka_msg_partitioner_consistent( rkt, key, keylen, partition_cnt, rkt_opaque, msg_opaque); } int32_t rd_kafka_msg_partitioner_murmur2(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque) { return (rd_murmur2(key, keylen) & 0x7fffffff) % partition_cnt; } int32_t rd_kafka_msg_partitioner_murmur2_random(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque) { if (!key) return rd_kafka_msg_partitioner_random( rkt, key, keylen, partition_cnt, rkt_opaque, msg_opaque); else return (rd_murmur2(key, keylen) & 0x7fffffff) % partition_cnt; } int32_t rd_kafka_msg_partitioner_fnv1a(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque) { return rd_fnv1a(key, keylen) % partition_cnt; } int32_t rd_kafka_msg_partitioner_fnv1a_random(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque) { if (!key) return rd_kafka_msg_partitioner_random( rkt, key, keylen, partition_cnt, rkt_opaque, msg_opaque); else return rd_fnv1a(key, keylen) % partition_cnt; } int32_t rd_kafka_msg_sticky_partition(rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque) { if (!rd_kafka_topic_partition_available(rkt, rkt->rkt_sticky_partition)) rd_interval_expedite(&rkt->rkt_sticky_intvl, 0); if (rd_interval(&rkt->rkt_sticky_intvl, rkt->rkt_rk->rk_conf.sticky_partition_linger_ms * 1000, 0) > 0) { rkt->rkt_sticky_partition = rd_kafka_msg_partitioner_random( rkt, key, keylen, partition_cnt, rkt_opaque, msg_opaque); rd_kafka_dbg(rkt->rkt_rk, TOPIC, "PARTITIONER", "%s [%" PRId32 "] is the new sticky partition", rkt->rkt_topic->str, rkt->rkt_sticky_partition); } return rkt->rkt_sticky_partition; } /** * @brief Assigns a message to a topic partition using a partitioner. * * @param do_lock if RD_DO_LOCK then acquire topic lock. * * @returns RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION or .._UNKNOWN_TOPIC if * partitioning failed, or 0 on success. 
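 *
 * @remark The callback invoked here is either one of the built-in
 *         partitioners above (selected with the "partitioner" topic
 *         configuration property) or an application callback registered
 *         through the public API. Illustrative registration sketch
 *         (application side, conf object name is hypothetical):
 *
 *           rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new();
 *           rd_kafka_topic_conf_set_partitioner_cb(
 *                   tconf, rd_kafka_msg_partitioner_murmur2_random);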
* * @locality any * @locks rd_kafka_ */ int rd_kafka_msg_partitioner(rd_kafka_topic_t *rkt, rd_kafka_msg_t *rkm, rd_dolock_t do_lock) { int32_t partition; rd_kafka_toppar_t *rktp_new; rd_kafka_resp_err_t err; if (do_lock) rd_kafka_topic_rdlock(rkt); switch (rkt->rkt_state) { case RD_KAFKA_TOPIC_S_UNKNOWN: /* No metadata received from cluster yet. * Put message in UA partition and re-run partitioner when * cluster comes up. */ partition = RD_KAFKA_PARTITION_UA; break; case RD_KAFKA_TOPIC_S_NOTEXISTS: /* Topic not found in cluster. * Fail message immediately. */ err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC; if (do_lock) rd_kafka_topic_rdunlock(rkt); return err; case RD_KAFKA_TOPIC_S_ERROR: /* Topic has permanent error. * Fail message immediately. */ err = rkt->rkt_err; if (do_lock) rd_kafka_topic_rdunlock(rkt); return err; case RD_KAFKA_TOPIC_S_EXISTS: /* Topic exists in cluster. */ /* Topic exists but has no partitions. * This is usually an transient state following the * auto-creation of a topic. */ if (unlikely(rkt->rkt_partition_cnt == 0)) { partition = RD_KAFKA_PARTITION_UA; break; } /* Partition not assigned, run partitioner. */ if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) { if (!rkt->rkt_conf.random_partitioner && (!rkm->rkm_key || (rkm->rkm_key_len == 0 && rkt->rkt_conf.partitioner == rd_kafka_msg_partitioner_consistent_random))) { partition = rd_kafka_msg_sticky_partition( rkt, rkm->rkm_key, rkm->rkm_key_len, rkt->rkt_partition_cnt, rkt->rkt_conf.opaque, rkm->rkm_opaque); } else { partition = rkt->rkt_conf.partitioner( rkt, rkm->rkm_key, rkm->rkm_key_len, rkt->rkt_partition_cnt, rkt->rkt_conf.opaque, rkm->rkm_opaque); } } else partition = rkm->rkm_partition; /* Check that partition exists. */ if (partition >= rkt->rkt_partition_cnt) { err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; if (do_lock) rd_kafka_topic_rdunlock(rkt); return err; } break; default: rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED"); break; } /* Get new partition */ rktp_new = rd_kafka_toppar_get(rkt, partition, 0); if (unlikely(!rktp_new)) { /* Unknown topic or partition */ if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS) err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC; else err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; if (do_lock) rd_kafka_topic_rdunlock(rkt); return err; } rd_atomic64_add(&rktp_new->rktp_c.producer_enq_msgs, 1); /* Update message partition */ if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) rkm->rkm_partition = partition; /* Partition is available: enqueue msg on partition's queue */ rd_kafka_toppar_enq_msg(rktp_new, rkm, rd_clock()); if (do_lock) rd_kafka_topic_rdunlock(rkt); if (rktp_new->rktp_partition != RD_KAFKA_PARTITION_UA && rd_kafka_is_transactional(rkt->rkt_rk)) { /* Add partition to transaction */ rd_kafka_txn_add_partition(rktp_new); } rd_kafka_toppar_destroy(rktp_new); /* from _get() */ return 0; } /** * @name Public message type (rd_kafka_message_t) */ void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage) { rd_kafka_op_t *rko; if (likely((rko = (rd_kafka_op_t *)rkmessage->_private) != NULL)) rd_kafka_op_destroy(rko); else { rd_kafka_msg_t *rkm = rd_kafka_message2msg(rkmessage); rd_kafka_msg_destroy(NULL, rkm); } } rd_kafka_message_t *rd_kafka_message_new(void) { rd_kafka_msg_t *rkm = rd_calloc(1, sizeof(*rkm)); rkm->rkm_flags = RD_KAFKA_MSG_F_FREE_RKM; rkm->rkm_broker_id = -1; return (rd_kafka_message_t *)rkm; } /** * @brief Set up a rkmessage from an rko for passing to the application. * @remark Will trigger on_consume() interceptors if any. 
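 * @remark For RD_KAFKA_OP_DR ops the rkmessage passed in is the one embedded
 *         in the rkm and the op is not attached to it, while for fetch/error
 *         ops the op is attached through rkmessage->_private so that
 *         rd_kafka_message_destroy() destroys the op.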
*/ static rd_kafka_message_t * rd_kafka_message_setup(rd_kafka_op_t *rko, rd_kafka_message_t *rkmessage) { rd_kafka_topic_t *rkt; rd_kafka_toppar_t *rktp = NULL; if (rko->rko_type == RD_KAFKA_OP_DR) { rkt = rko->rko_u.dr.rkt; } else { if (rko->rko_rktp) { rktp = rko->rko_rktp; rkt = rktp->rktp_rkt; } else rkt = NULL; rkmessage->_private = rko; } if (!rkmessage->rkt && rkt) rkmessage->rkt = rd_kafka_topic_keep(rkt); if (rktp) rkmessage->partition = rktp->rktp_partition; if (!rkmessage->err) rkmessage->err = rko->rko_err; /* Call on_consume interceptors */ switch (rko->rko_type) { case RD_KAFKA_OP_FETCH: if (!rkmessage->err && rkt) rd_kafka_interceptors_on_consume(rkt->rkt_rk, rkmessage); break; default: break; } return rkmessage; } /** * @brief Get rkmessage from rkm (for EVENT_DR) * @remark Must only be called just prior to passing a dr to the application. */ rd_kafka_message_t *rd_kafka_message_get_from_rkm(rd_kafka_op_t *rko, rd_kafka_msg_t *rkm) { return rd_kafka_message_setup(rko, &rkm->rkm_rkmessage); } /** * @brief Convert rko to rkmessage * @remark Must only be called just prior to passing a consumed message * or event to the application. * @remark Will trigger on_consume() interceptors, if any. * @returns a rkmessage (bound to the rko). */ rd_kafka_message_t *rd_kafka_message_get(rd_kafka_op_t *rko) { rd_kafka_message_t *rkmessage; if (!rko) return rd_kafka_message_new(); /* empty */ switch (rko->rko_type) { case RD_KAFKA_OP_FETCH: /* Use embedded rkmessage */ rkmessage = &rko->rko_u.fetch.rkm.rkm_rkmessage; break; case RD_KAFKA_OP_ERR: case RD_KAFKA_OP_CONSUMER_ERR: rkmessage = &rko->rko_u.err.rkm.rkm_rkmessage; rkmessage->payload = rko->rko_u.err.errstr; rkmessage->len = rkmessage->payload ? strlen(rkmessage->payload) : 0; rkmessage->offset = rko->rko_u.err.offset; break; default: rd_kafka_assert(NULL, !*"unhandled optype"); RD_NOTREACHED(); return NULL; } return rd_kafka_message_setup(rko, rkmessage); } int64_t rd_kafka_message_timestamp(const rd_kafka_message_t *rkmessage, rd_kafka_timestamp_type_t *tstype) { rd_kafka_msg_t *rkm; if (rkmessage->err) { if (tstype) *tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE; return -1; } rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage); if (tstype) *tstype = rkm->rkm_tstype; return rkm->rkm_timestamp; } int64_t rd_kafka_message_latency(const rd_kafka_message_t *rkmessage) { rd_kafka_msg_t *rkm; rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage); if (unlikely(!rkm->rkm_ts_enq)) return -1; return rd_clock() - rkm->rkm_ts_enq; } int32_t rd_kafka_message_broker_id(const rd_kafka_message_t *rkmessage) { rd_kafka_msg_t *rkm; rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage); return rkm->rkm_broker_id; } /** * @brief Parse serialized message headers and populate * rkm->rkm_headers (which must be NULL). 
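 *
 * The serialized format read below is the Kafka record (MsgVersion 2)
 * header layout: a varint HeaderCount followed by HeaderCount repetitions
 * of { varint KeyLen, Key bytes, varint ValueLen, Value bytes }, where a
 * ValueLen of -1 denotes a NULL value.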
*/ static rd_kafka_resp_err_t rd_kafka_msg_headers_parse(rd_kafka_msg_t *rkm) { rd_kafka_buf_t *rkbuf; int64_t HeaderCount; const int log_decode_errors = 0; rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__BAD_MSG; int i; rd_kafka_headers_t *hdrs = NULL; rd_dassert(!rkm->rkm_headers); if (RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.consumer.binhdrs) == 0) return RD_KAFKA_RESP_ERR__NOENT; rkbuf = rd_kafka_buf_new_shadow( rkm->rkm_u.consumer.binhdrs.data, RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.consumer.binhdrs), NULL); rd_kafka_buf_read_varint(rkbuf, &HeaderCount); if (HeaderCount <= 0) { rd_kafka_buf_destroy(rkbuf); return RD_KAFKA_RESP_ERR__NOENT; } else if (unlikely(HeaderCount > 100000)) { rd_kafka_buf_destroy(rkbuf); return RD_KAFKA_RESP_ERR__BAD_MSG; } hdrs = rd_kafka_headers_new((size_t)HeaderCount); for (i = 0; (int64_t)i < HeaderCount; i++) { int64_t KeyLen, ValueLen; const char *Key, *Value; rd_kafka_buf_read_varint(rkbuf, &KeyLen); rd_kafka_buf_read_ptr(rkbuf, &Key, (size_t)KeyLen); rd_kafka_buf_read_varint(rkbuf, &ValueLen); if (unlikely(ValueLen == -1)) Value = NULL; else rd_kafka_buf_read_ptr(rkbuf, &Value, (size_t)ValueLen); rd_kafka_header_add(hdrs, Key, (ssize_t)KeyLen, Value, (ssize_t)ValueLen); } rkm->rkm_headers = hdrs; rd_kafka_buf_destroy(rkbuf); return RD_KAFKA_RESP_ERR_NO_ERROR; err_parse: err = rkbuf->rkbuf_err; rd_kafka_buf_destroy(rkbuf); if (hdrs) rd_kafka_headers_destroy(hdrs); return err; } rd_kafka_resp_err_t rd_kafka_message_headers(const rd_kafka_message_t *rkmessage, rd_kafka_headers_t **hdrsp) { rd_kafka_msg_t *rkm; rd_kafka_resp_err_t err; rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage); if (rkm->rkm_headers) { *hdrsp = rkm->rkm_headers; return RD_KAFKA_RESP_ERR_NO_ERROR; } /* Producer (rkm_headers will be set if there were any headers) */ if (rkm->rkm_flags & RD_KAFKA_MSG_F_PRODUCER) return RD_KAFKA_RESP_ERR__NOENT; /* Consumer */ /* No previously parsed headers, check if the underlying * protocol message had headers and if so, parse them. 
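 *
 * (Application-side reference, illustrative only: a consumer normally uses
 *  the public accessors rather than touching rkm_headers directly:
 *
 *    rd_kafka_headers_t *hdrs;
 *    if (!rd_kafka_message_headers(rkmessage, &hdrs)) {
 *            const void *val;
 *            size_t size;
 *            if (!rd_kafka_header_get_last(hdrs, "my-hdr", &val, &size))
 *                    handle_header(val, size); // hypothetical helper
 *    }
 *
 *  The returned headers remain owned by the message unless
 *  rd_kafka_message_detach_headers() is used.)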
*/ if (unlikely(!RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.consumer.binhdrs))) return RD_KAFKA_RESP_ERR__NOENT; err = rd_kafka_msg_headers_parse(rkm); if (unlikely(err)) return err; *hdrsp = rkm->rkm_headers; return RD_KAFKA_RESP_ERR_NO_ERROR; } rd_kafka_resp_err_t rd_kafka_message_detach_headers(rd_kafka_message_t *rkmessage, rd_kafka_headers_t **hdrsp) { rd_kafka_msg_t *rkm; rd_kafka_resp_err_t err; err = rd_kafka_message_headers(rkmessage, hdrsp); if (err) return err; rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage); rkm->rkm_headers = NULL; return RD_KAFKA_RESP_ERR_NO_ERROR; } void rd_kafka_message_set_headers(rd_kafka_message_t *rkmessage, rd_kafka_headers_t *hdrs) { rd_kafka_msg_t *rkm; rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage); if (rkm->rkm_headers) { assert(rkm->rkm_headers != hdrs); rd_kafka_headers_destroy(rkm->rkm_headers); } rkm->rkm_headers = hdrs; } rd_kafka_msg_status_t rd_kafka_message_status(const rd_kafka_message_t *rkmessage) { rd_kafka_msg_t *rkm; rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage); return rkm->rkm_status; } int32_t rd_kafka_message_leader_epoch(const rd_kafka_message_t *rkmessage) { rd_kafka_msg_t *rkm; if (unlikely(!rkmessage->rkt || rkmessage->rkt->rkt_rk->rk_type != RD_KAFKA_CONSUMER)) return -1; rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage); return rkm->rkm_u.consumer.leader_epoch; } void rd_kafka_msgq_dump(FILE *fp, const char *what, rd_kafka_msgq_t *rkmq) { rd_kafka_msg_t *rkm; int cnt = 0; fprintf(fp, "%s msgq_dump (%d messages, %" PRIusz " bytes):\n", what, rd_kafka_msgq_len(rkmq), rd_kafka_msgq_size(rkmq)); TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) { fprintf(fp, " [%" PRId32 "]@%" PRId64 ": rkm msgid %" PRIu64 ": \"%.*s\"\n", rkm->rkm_partition, rkm->rkm_offset, rkm->rkm_u.producer.msgid, (int)rkm->rkm_len, (const char *)rkm->rkm_payload); rd_assert(cnt++ < rkmq->rkmq_msg_cnt); } } /** * @brief Destroy resources associated with msgbatch */ void rd_kafka_msgbatch_destroy(rd_kafka_msgbatch_t *rkmb) { if (rkmb->rktp) { rd_kafka_toppar_destroy(rkmb->rktp); rkmb->rktp = NULL; } rd_assert(RD_KAFKA_MSGQ_EMPTY(&rkmb->msgq)); } /** * @brief Initialize a message batch for the Idempotent Producer. */ void rd_kafka_msgbatch_init(rd_kafka_msgbatch_t *rkmb, rd_kafka_toppar_t *rktp, rd_kafka_pid_t pid, uint64_t epoch_base_msgid) { memset(rkmb, 0, sizeof(*rkmb)); rkmb->rktp = rd_kafka_toppar_keep(rktp); rd_kafka_msgq_init(&rkmb->msgq); rkmb->pid = pid; rkmb->first_seq = -1; rkmb->epoch_base_msgid = epoch_base_msgid; } /** * @brief Set the first message in the batch. which is used to set * the BaseSequence and keep track of batch reconstruction range. * * @param rkm is the first message in the batch. */ void rd_kafka_msgbatch_set_first_msg(rd_kafka_msgbatch_t *rkmb, rd_kafka_msg_t *rkm) { rd_assert(rkmb->first_msgid == 0); if (!rd_kafka_pid_valid(rkmb->pid)) return; rkmb->first_msgid = rkm->rkm_u.producer.msgid; /* Our msgid counter is 64-bits, but the * Kafka protocol's sequence is only 31 (signed), so we'll * need to handle wrapping. */ rkmb->first_seq = rd_kafka_seq_wrap(rkm->rkm_u.producer.msgid - rkmb->epoch_base_msgid); /* Check if there is a stored last message * on the first msg, which means an entire * batch of messages are being retried and * we need to maintain the exact messages * of the original batch. * Simply tracking the last message, on * the first message, is sufficient for now. * Will be 0 if not applicable. 
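 *
 * Illustrative example (hypothetical msgids): if the original batch held
 * msgids 10..14, the retried first message (10) carries last_msgid 14, so
 * exactly messages 10..14 are rebuilt into the retry batch rather than any
 * newer messages enqueued since.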
*/ rkmb->last_msgid = rkm->rkm_u.producer.last_msgid; } /** * @brief Message batch is ready to be transmitted. * * @remark This function assumes the batch will be transmitted and increases * the toppar's in-flight count. */ void rd_kafka_msgbatch_ready_produce(rd_kafka_msgbatch_t *rkmb) { rd_kafka_toppar_t *rktp = rkmb->rktp; rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk; /* Keep track of number of requests in-flight per partition, * and the number of partitions with in-flight requests when * idempotent producer - this is used to drain partitions * before resetting the PID. */ if (rd_atomic32_add(&rktp->rktp_msgs_inflight, rd_kafka_msgq_len(&rkmb->msgq)) == rd_kafka_msgq_len(&rkmb->msgq) && rd_kafka_is_idempotent(rk)) rd_kafka_idemp_inflight_toppar_add(rk, rktp); } /** * @brief Allow queue wakeups after \p abstime, or when the * given \p batch_msg_cnt or \p batch_msg_bytes have been reached. * * @param rkmq Queue to monitor and set wakeup parameters on. * @param dest_rkmq Destination queue used to meter current queue depths * and oldest message. May be the same as \p rkmq but is * typically the rktp_xmit_msgq. * @param next_wakeup If non-NULL: update the caller's next scheduler wakeup * according to the wakeup time calculated by this function. * @param now The current time. * @param linger_us The configured queue linger / batching time. * @param batch_msg_cnt Queue threshold before signalling. * @param batch_msg_bytes Queue threshold before signalling. * * @returns true if the wakeup conditions are already met and messages are ready * to be sent, else false. * * @locks_required rd_kafka_toppar_lock() * * * Producer queue and broker thread wake-up behaviour. * * There are contradicting requirements at play here: * - Latency: queued messages must be batched and sent according to * batch size and linger.ms configuration. * - Wakeups: keep the number of thread wake-ups to a minimum to avoid * high CPU utilization and context switching. * * The message queue (rd_kafka_msgq_t) has functionality for the writer (app) * to wake up the reader (broker thread) when there's a new message added. * This wakeup is done thru a combination of cndvar signalling and IO writes * to make sure a thread wakeup is triggered regardless if the broker thread * is blocking on cnd_timedwait() or on IO poll. * When the broker thread is woken up it will scan all the partitions it is * the leader for to check if there are messages to be sent - all according * to the configured batch size and linger.ms - and then decide its next * wait time depending on the lowest remaining linger.ms setting of any * partition with messages enqueued. * * This wait time must also be set as a threshold on the message queue, telling * the writer (app) that it must not trigger a wakeup until the wait time * has expired, or the batch sizes have been exceeded. * * The message queue wakeup time is per partition, while the broker thread * wakeup time is the lowest of all its partitions' wakeup times. * * The per-partition wakeup constraints are calculated and set by * rd_kafka_msgq_allow_wakeup_at() which is called from the broker thread's * per-partition handler. * This function is called each time there are changes to the broker-local * partition transmit queue (rktp_xmit_msgq), such as: * - messages are moved from the partition queue (rktp_msgq) to rktp_xmit_msgq * - messages are moved to a ProduceRequest * - messages are timed out from the rktp_xmit_msgq * - the flushing state changed (rd_kafka_flush() is called or returned). 
* * If none of these things happen, the broker thread will simply read the * last stored wakeup time for each partition and use that for calculating its * minimum wait time. * * * On the writer side, namely the application calling rd_kafka_produce(), the * followings checks are performed to see if it may trigger a wakeup when * it adds a new message to the partition queue: * - the current time has reached the wakeup time (e.g., remaining linger.ms * has expired), or * - with the new message(s) being added, either the batch.size or * batch.num.messages thresholds have been exceeded, or * - the application is calling rd_kafka_flush(), * - and no wakeup has been signalled yet. This is critical since it may take * some time for the broker thread to do its work we'll want to avoid * flooding it with wakeups. So a wakeup is only sent once per * wakeup period. */ rd_bool_t rd_kafka_msgq_allow_wakeup_at(rd_kafka_msgq_t *rkmq, const rd_kafka_msgq_t *dest_rkmq, rd_ts_t *next_wakeup, rd_ts_t now, rd_ts_t linger_us, int32_t batch_msg_cnt, int64_t batch_msg_bytes) { int32_t msg_cnt = rd_kafka_msgq_len(dest_rkmq); int64_t msg_bytes = rd_kafka_msgq_size(dest_rkmq); if (RD_KAFKA_MSGQ_EMPTY(dest_rkmq)) { rkmq->rkmq_wakeup.on_first = rd_true; rkmq->rkmq_wakeup.abstime = now + linger_us; /* Leave next_wakeup untouched since the queue is empty */ msg_cnt = 0; msg_bytes = 0; } else { const rd_kafka_msg_t *rkm = rd_kafka_msgq_first(dest_rkmq); rkmq->rkmq_wakeup.on_first = rd_false; if (unlikely(rkm->rkm_u.producer.ts_backoff > now)) { /* Honour retry.backoff.ms: * wait for backoff to expire */ rkmq->rkmq_wakeup.abstime = rkm->rkm_u.producer.ts_backoff; } else { /* Use message's produce() time + linger.ms */ rkmq->rkmq_wakeup.abstime = rd_kafka_msg_enq_time(rkm) + linger_us; if (rkmq->rkmq_wakeup.abstime <= now) rkmq->rkmq_wakeup.abstime = now; } /* Update the caller's scheduler wakeup time */ if (next_wakeup && rkmq->rkmq_wakeup.abstime < *next_wakeup) *next_wakeup = rkmq->rkmq_wakeup.abstime; msg_cnt = rd_kafka_msgq_len(dest_rkmq); msg_bytes = rd_kafka_msgq_size(dest_rkmq); } /* * If there are more messages or bytes in queue than the batch limits, * or the linger time has been exceeded, * then there is no need for wakeup since the broker thread will * produce those messages as quickly as it can. */ if (msg_cnt >= batch_msg_cnt || msg_bytes >= batch_msg_bytes || (msg_cnt > 0 && now >= rkmq->rkmq_wakeup.abstime)) { /* Prevent further signalling */ rkmq->rkmq_wakeup.signalled = rd_true; /* Batch is ready */ return rd_true; } /* If the current msg or byte count is less than the batch limit * then set the rkmq count to the remaining count or size to * reach the batch limits. * This is for the case where the producer is waiting for more * messages to accumulate into a batch. The wakeup should only * occur once a threshold is reached or the abstime has expired. */ rkmq->rkmq_wakeup.signalled = rd_false; rkmq->rkmq_wakeup.msg_cnt = batch_msg_cnt - msg_cnt; rkmq->rkmq_wakeup.msg_bytes = batch_msg_bytes - msg_bytes; return rd_false; } /** * @brief Verify order (by msgid) in message queue. * For development use only. */ void rd_kafka_msgq_verify_order0(const char *function, int line, const rd_kafka_toppar_t *rktp, const rd_kafka_msgq_t *rkmq, uint64_t exp_first_msgid, rd_bool_t gapless) { const rd_kafka_msg_t *rkm; uint64_t exp; int errcnt = 0; int cnt = 0; const char *topic = rktp ? rktp->rktp_rkt->rkt_topic->str : "n/a"; int32_t partition = rktp ? 
rktp->rktp_partition : -1; if (rd_kafka_msgq_len(rkmq) == 0) return; if (exp_first_msgid) exp = exp_first_msgid; else { exp = rd_kafka_msgq_first(rkmq)->rkm_u.producer.msgid; if (exp == 0) /* message without msgid (e.g., UA partition) */ return; } TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) { #if 0 printf("%s:%d: %s [%"PRId32"]: rkm #%d (%p) " "msgid %"PRIu64"\n", function, line, topic, partition, cnt, rkm, rkm->rkm_u.producer.msgid); #endif if (gapless && rkm->rkm_u.producer.msgid != exp) { printf("%s:%d: %s [%" PRId32 "]: rkm #%d (%p) " "msgid %" PRIu64 ": " "expected msgid %" PRIu64 "\n", function, line, topic, partition, cnt, rkm, rkm->rkm_u.producer.msgid, exp); errcnt++; } else if (!gapless && rkm->rkm_u.producer.msgid < exp) { printf("%s:%d: %s [%" PRId32 "]: rkm #%d (%p) " "msgid %" PRIu64 ": " "expected increased msgid >= %" PRIu64 "\n", function, line, topic, partition, cnt, rkm, rkm->rkm_u.producer.msgid, exp); errcnt++; } else exp++; if (cnt >= rkmq->rkmq_msg_cnt) { printf("%s:%d: %s [%" PRId32 "]: rkm #%d (%p) " "msgid %" PRIu64 ": loop in queue?\n", function, line, topic, partition, cnt, rkm, rkm->rkm_u.producer.msgid); errcnt++; break; } cnt++; } rd_assert(!errcnt); } /** * @name Unit tests */ /** * @brief Unittest: message allocator */ rd_kafka_msg_t *ut_rd_kafka_msg_new(size_t msgsize) { rd_kafka_msg_t *rkm; rkm = rd_calloc(1, sizeof(*rkm)); rkm->rkm_flags = RD_KAFKA_MSG_F_FREE_RKM; rkm->rkm_offset = RD_KAFKA_OFFSET_INVALID; rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE; if (msgsize) { rd_assert(msgsize <= sizeof(*rkm)); rkm->rkm_payload = rkm; rkm->rkm_len = msgsize; } return rkm; } /** * @brief Unittest: destroy all messages in queue */ void ut_rd_kafka_msgq_purge(rd_kafka_msgq_t *rkmq) { rd_kafka_msg_t *rkm, *tmp; TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) rd_kafka_msg_destroy(NULL, rkm); rd_kafka_msgq_init(rkmq); } static int ut_verify_msgq_order(const char *what, const rd_kafka_msgq_t *rkmq, uint64_t first, uint64_t last, rd_bool_t req_consecutive) { const rd_kafka_msg_t *rkm; uint64_t expected = first; int incr = first < last ? +1 : -1; int fails = 0; int cnt = 0; TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) { if ((req_consecutive && rkm->rkm_u.producer.msgid != expected) || (!req_consecutive && rkm->rkm_u.producer.msgid < expected)) { if (fails++ < 100) RD_UT_SAY("%s: expected msgid %s %" PRIu64 " not %" PRIu64 " at index #%d", what, req_consecutive ? "==" : ">=", expected, rkm->rkm_u.producer.msgid, cnt); } cnt++; expected += incr; if (cnt > rkmq->rkmq_msg_cnt) { RD_UT_SAY("%s: loop in queue?", what); fails++; break; } } RD_UT_ASSERT(!fails, "See %d previous failure(s)", fails); return fails; } /** * @brief Verify ordering comparator for message queues. */ static int unittest_msgq_order(const char *what, int fifo, int (*cmp)(const void *, const void *)) { rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq); rd_kafka_msg_t *rkm; rd_kafka_msgq_t sendq, sendq2; const size_t msgsize = 100; int i; RD_UT_SAY("%s: testing in %s mode", what, fifo ? "FIFO" : "LIFO"); for (i = 1; i <= 6; i++) { rkm = ut_rd_kafka_msg_new(msgsize); rkm->rkm_u.producer.msgid = i; rd_kafka_msgq_enq_sorted0(&rkmq, rkm, cmp); } if (fifo) { if (ut_verify_msgq_order("added", &rkmq, 1, 6, rd_true)) return 1; } else { if (ut_verify_msgq_order("added", &rkmq, 6, 1, rd_true)) return 1; } /* Move 3 messages to "send" queue which we then re-insert * in the original queue (i.e., "retry"). 
*/ rd_kafka_msgq_init(&sendq); while (rd_kafka_msgq_len(&sendq) < 3) rd_kafka_msgq_enq(&sendq, rd_kafka_msgq_pop(&rkmq)); if (fifo) { if (ut_verify_msgq_order("send removed", &rkmq, 4, 6, rd_true)) return 1; if (ut_verify_msgq_order("sendq", &sendq, 1, 3, rd_true)) return 1; } else { if (ut_verify_msgq_order("send removed", &rkmq, 3, 1, rd_true)) return 1; if (ut_verify_msgq_order("sendq", &sendq, 6, 4, rd_true)) return 1; } /* Retry the messages, which moves them back to sendq * maintaining the original order */ rd_kafka_retry_msgq(&rkmq, &sendq, 1, 1, 0, RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp); RD_UT_ASSERT(rd_kafka_msgq_len(&sendq) == 0, "sendq FIFO should be empty, not contain %d messages", rd_kafka_msgq_len(&sendq)); if (fifo) { if (ut_verify_msgq_order("readded", &rkmq, 1, 6, rd_true)) return 1; } else { if (ut_verify_msgq_order("readded", &rkmq, 6, 1, rd_true)) return 1; } /* Move 4 first messages to to "send" queue, then * retry them with max_retries=1 which should now fail for * the 3 first messages that were already retried. */ rd_kafka_msgq_init(&sendq); while (rd_kafka_msgq_len(&sendq) < 4) rd_kafka_msgq_enq(&sendq, rd_kafka_msgq_pop(&rkmq)); if (fifo) { if (ut_verify_msgq_order("send removed #2", &rkmq, 5, 6, rd_true)) return 1; if (ut_verify_msgq_order("sendq #2", &sendq, 1, 4, rd_true)) return 1; } else { if (ut_verify_msgq_order("send removed #2", &rkmq, 2, 1, rd_true)) return 1; if (ut_verify_msgq_order("sendq #2", &sendq, 6, 3, rd_true)) return 1; } /* Retry the messages, which should now keep the 3 first messages * on sendq (no more retries) and just number 4 moved back. */ rd_kafka_retry_msgq(&rkmq, &sendq, 1, 1, 0, RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp); if (fifo) { if (ut_verify_msgq_order("readded #2", &rkmq, 4, 6, rd_true)) return 1; if (ut_verify_msgq_order("no more retries", &sendq, 1, 3, rd_true)) return 1; } else { if (ut_verify_msgq_order("readded #2", &rkmq, 3, 1, rd_true)) return 1; if (ut_verify_msgq_order("no more retries", &sendq, 6, 4, rd_true)) return 1; } /* Move all messages back on rkmq */ rd_kafka_retry_msgq(&rkmq, &sendq, 0, 1000, 0, RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp); /* Move first half of messages to sendq (1,2,3). * Move second half o messages to sendq2 (4,5,6). * Add new message to rkmq (7). * Move first half of messages back on rkmq (1,2,3,7). * Move second half back on the rkmq (1,2,3,4,5,6,7). 
*/ rd_kafka_msgq_init(&sendq); rd_kafka_msgq_init(&sendq2); while (rd_kafka_msgq_len(&sendq) < 3) rd_kafka_msgq_enq(&sendq, rd_kafka_msgq_pop(&rkmq)); while (rd_kafka_msgq_len(&sendq2) < 3) rd_kafka_msgq_enq(&sendq2, rd_kafka_msgq_pop(&rkmq)); rkm = ut_rd_kafka_msg_new(msgsize); rkm->rkm_u.producer.msgid = i; rd_kafka_msgq_enq_sorted0(&rkmq, rkm, cmp); rd_kafka_retry_msgq(&rkmq, &sendq, 0, 1000, 0, RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp); rd_kafka_retry_msgq(&rkmq, &sendq2, 0, 1000, 0, RD_KAFKA_MSG_STATUS_NOT_PERSISTED, cmp); RD_UT_ASSERT(rd_kafka_msgq_len(&sendq) == 0, "sendq FIFO should be empty, not contain %d messages", rd_kafka_msgq_len(&sendq)); RD_UT_ASSERT(rd_kafka_msgq_len(&sendq2) == 0, "sendq2 FIFO should be empty, not contain %d messages", rd_kafka_msgq_len(&sendq2)); if (fifo) { if (ut_verify_msgq_order("inject", &rkmq, 1, 7, rd_true)) return 1; } else { if (ut_verify_msgq_order("readded #2", &rkmq, 7, 1, rd_true)) return 1; } RD_UT_ASSERT(rd_kafka_msgq_size(&rkmq) == rd_kafka_msgq_len(&rkmq) * msgsize, "expected msgq size %" PRIusz ", not %" PRIusz, (size_t)rd_kafka_msgq_len(&rkmq) * msgsize, rd_kafka_msgq_size(&rkmq)); ut_rd_kafka_msgq_purge(&sendq); ut_rd_kafka_msgq_purge(&sendq2); ut_rd_kafka_msgq_purge(&rkmq); return 0; } /** * @brief Verify that rd_kafka_seq_wrap() works. */ static int unittest_msg_seq_wrap(void) { static const struct exp { int64_t in; int32_t out; } exp[] = { {0, 0}, {1, 1}, {(int64_t)INT32_MAX + 2, 1}, {(int64_t)INT32_MAX + 1, 0}, {INT32_MAX, INT32_MAX}, {INT32_MAX - 1, INT32_MAX - 1}, {INT32_MAX - 2, INT32_MAX - 2}, {((int64_t)1 << 33) - 2, INT32_MAX - 1}, {((int64_t)1 << 33) - 1, INT32_MAX}, {((int64_t)1 << 34), 0}, {((int64_t)1 << 35) + 3, 3}, {1710 + 1229, 2939}, {-1, -1}, }; int i; for (i = 0; exp[i].in != -1; i++) { int32_t wseq = rd_kafka_seq_wrap(exp[i].in); RD_UT_ASSERT(wseq == exp[i].out, "Expected seq_wrap(%" PRId64 ") -> %" PRId32 ", not %" PRId32, exp[i].in, exp[i].out, wseq); } RD_UT_PASS(); } /** * @brief Populate message queue with message ids from lo..hi (inclusive) */ static void ut_msgq_populate(rd_kafka_msgq_t *rkmq, uint64_t lo, uint64_t hi, size_t msgsize) { uint64_t i; for (i = lo; i <= hi; i++) { rd_kafka_msg_t *rkm = ut_rd_kafka_msg_new(msgsize); rkm->rkm_u.producer.msgid = i; rd_kafka_msgq_enq(rkmq, rkm); } } struct ut_msg_range { uint64_t lo; uint64_t hi; }; /** * @brief Verify that msgq insert sorts are optimized. Issue #2508. * All source ranges are combined into a single queue before insert. 
*/ static int unittest_msgq_insert_all_sort(const char *what, double max_us_per_msg, double *ret_us_per_msg, const struct ut_msg_range *src_ranges, const struct ut_msg_range *dest_ranges) { rd_kafka_msgq_t destq, srcq; int i; uint64_t lo = UINT64_MAX, hi = 0; uint64_t cnt = 0; const size_t msgsize = 100; size_t totsize = 0; rd_ts_t ts; double us_per_msg; RD_UT_SAY("Testing msgq insert (all) efficiency: %s", what); rd_kafka_msgq_init(&destq); rd_kafka_msgq_init(&srcq); for (i = 0; src_ranges[i].hi > 0; i++) { uint64_t this_cnt; ut_msgq_populate(&srcq, src_ranges[i].lo, src_ranges[i].hi, msgsize); if (src_ranges[i].lo < lo) lo = src_ranges[i].lo; if (src_ranges[i].hi > hi) hi = src_ranges[i].hi; this_cnt = (src_ranges[i].hi - src_ranges[i].lo) + 1; cnt += this_cnt; totsize += msgsize * (size_t)this_cnt; } for (i = 0; dest_ranges[i].hi > 0; i++) { uint64_t this_cnt; ut_msgq_populate(&destq, dest_ranges[i].lo, dest_ranges[i].hi, msgsize); if (dest_ranges[i].lo < lo) lo = dest_ranges[i].lo; if (dest_ranges[i].hi > hi) hi = dest_ranges[i].hi; this_cnt = (dest_ranges[i].hi - dest_ranges[i].lo) + 1; cnt += this_cnt; totsize += msgsize * (size_t)this_cnt; } RD_UT_SAY("Begin insert of %d messages into destq with %d messages", rd_kafka_msgq_len(&srcq), rd_kafka_msgq_len(&destq)); ts = rd_clock(); rd_kafka_msgq_insert_msgq(&destq, &srcq, rd_kafka_msg_cmp_msgid); ts = rd_clock() - ts; us_per_msg = (double)ts / (double)cnt; RD_UT_SAY("Done: took %" PRId64 "us, %.4fus/msg", ts, us_per_msg); RD_UT_ASSERT(rd_kafka_msgq_len(&srcq) == 0, "srcq should be empty, but contains %d messages", rd_kafka_msgq_len(&srcq)); RD_UT_ASSERT(rd_kafka_msgq_len(&destq) == (int)cnt, "destq should contain %d messages, not %d", (int)cnt, rd_kafka_msgq_len(&destq)); if (ut_verify_msgq_order("after", &destq, lo, hi, rd_false)) return 1; RD_UT_ASSERT(rd_kafka_msgq_size(&destq) == totsize, "expected destq size to be %" PRIusz " bytes, not %" PRIusz, totsize, rd_kafka_msgq_size(&destq)); ut_rd_kafka_msgq_purge(&srcq); ut_rd_kafka_msgq_purge(&destq); if (!rd_unittest_slow) RD_UT_ASSERT(!(us_per_msg > max_us_per_msg + 0.0001), "maximum us/msg exceeded: %.4f > %.4f us/msg", us_per_msg, max_us_per_msg); else if (us_per_msg > max_us_per_msg + 0.0001) RD_UT_WARN("maximum us/msg exceeded: %.4f > %.4f us/msg", us_per_msg, max_us_per_msg); if (ret_us_per_msg) *ret_us_per_msg = us_per_msg; RD_UT_PASS(); } /** * @brief Verify that msgq insert sorts are optimized. Issue #2508. * Inserts each source range individually. 
*/ static int unittest_msgq_insert_each_sort(const char *what, double max_us_per_msg, double *ret_us_per_msg, const struct ut_msg_range *src_ranges, const struct ut_msg_range *dest_ranges) { rd_kafka_msgq_t destq; int i; uint64_t lo = UINT64_MAX, hi = 0; uint64_t cnt = 0; uint64_t scnt = 0; const size_t msgsize = 100; size_t totsize = 0; double us_per_msg; rd_ts_t accum_ts = 0; RD_UT_SAY("Testing msgq insert (each) efficiency: %s", what); rd_kafka_msgq_init(&destq); for (i = 0; dest_ranges[i].hi > 0; i++) { uint64_t this_cnt; ut_msgq_populate(&destq, dest_ranges[i].lo, dest_ranges[i].hi, msgsize); if (dest_ranges[i].lo < lo) lo = dest_ranges[i].lo; if (dest_ranges[i].hi > hi) hi = dest_ranges[i].hi; this_cnt = (dest_ranges[i].hi - dest_ranges[i].lo) + 1; cnt += this_cnt; totsize += msgsize * (size_t)this_cnt; } for (i = 0; src_ranges[i].hi > 0; i++) { rd_kafka_msgq_t srcq; uint64_t this_cnt; rd_ts_t ts; rd_kafka_msgq_init(&srcq); ut_msgq_populate(&srcq, src_ranges[i].lo, src_ranges[i].hi, msgsize); if (src_ranges[i].lo < lo) lo = src_ranges[i].lo; if (src_ranges[i].hi > hi) hi = src_ranges[i].hi; this_cnt = (src_ranges[i].hi - src_ranges[i].lo) + 1; cnt += this_cnt; scnt += this_cnt; totsize += msgsize * (size_t)this_cnt; RD_UT_SAY( "Begin insert of %d messages into destq with " "%d messages", rd_kafka_msgq_len(&srcq), rd_kafka_msgq_len(&destq)); ts = rd_clock(); rd_kafka_msgq_insert_msgq(&destq, &srcq, rd_kafka_msg_cmp_msgid); ts = rd_clock() - ts; accum_ts += ts; RD_UT_SAY("Done: took %" PRId64 "us, %.4fus/msg", ts, (double)ts / (double)this_cnt); RD_UT_ASSERT(rd_kafka_msgq_len(&srcq) == 0, "srcq should be empty, but contains %d messages", rd_kafka_msgq_len(&srcq)); RD_UT_ASSERT(rd_kafka_msgq_len(&destq) == (int)cnt, "destq should contain %d messages, not %d", (int)cnt, rd_kafka_msgq_len(&destq)); if (ut_verify_msgq_order("after", &destq, lo, hi, rd_false)) return 1; RD_UT_ASSERT(rd_kafka_msgq_size(&destq) == totsize, "expected destq size to be %" PRIusz " bytes, not %" PRIusz, totsize, rd_kafka_msgq_size(&destq)); ut_rd_kafka_msgq_purge(&srcq); } ut_rd_kafka_msgq_purge(&destq); us_per_msg = (double)accum_ts / (double)scnt; RD_UT_SAY("Total: %.4fus/msg over %" PRId64 " messages in %" PRId64 "us", us_per_msg, scnt, accum_ts); if (!rd_unittest_slow) RD_UT_ASSERT(!(us_per_msg > max_us_per_msg + 0.0001), "maximum us/msg exceeded: %.4f > %.4f us/msg", us_per_msg, max_us_per_msg); else if (us_per_msg > max_us_per_msg + 0.0001) RD_UT_WARN("maximum us/msg exceeded: %.4f > %.4f us/msg", us_per_msg, max_us_per_msg); if (ret_us_per_msg) *ret_us_per_msg = us_per_msg; RD_UT_PASS(); } /** * @brief Calls both insert_all and insert_each */ static int unittest_msgq_insert_sort(const char *what, double max_us_per_msg, double *ret_us_per_msg, const struct ut_msg_range *src_ranges, const struct ut_msg_range *dest_ranges) { double ret_all = 0.0, ret_each = 0.0; int r; r = unittest_msgq_insert_all_sort(what, max_us_per_msg, &ret_all, src_ranges, dest_ranges); if (r) return r; r = unittest_msgq_insert_each_sort(what, max_us_per_msg, &ret_each, src_ranges, dest_ranges); if (r) return r; if (ret_us_per_msg) *ret_us_per_msg = RD_MAX(ret_all, ret_each); return 0; } int unittest_msg(void) { int fails = 0; double insert_baseline = 0.0; fails += unittest_msgq_order("FIFO", 1, rd_kafka_msg_cmp_msgid); fails += unittest_msg_seq_wrap(); fails += unittest_msgq_insert_sort( "get baseline insert time", 100000.0, &insert_baseline, (const struct ut_msg_range[]) {{1, 1}, {3, 3}, {0, 0}}, (const struct ut_msg_range[]) {{2, 2}, 
{4, 4}, {0, 0}}); /* Allow some wiggle room in baseline time. */ if (insert_baseline < 0.1) insert_baseline = 0.2; insert_baseline *= 3; fails += unittest_msgq_insert_sort( "single-message ranges", insert_baseline, NULL, (const struct ut_msg_range[]) { {2, 2}, {4, 4}, {9, 9}, {33692864, 33692864}, {0, 0}}, (const struct ut_msg_range[]) {{1, 1}, {3, 3}, {5, 5}, {10, 10}, {33692865, 33692865}, {0, 0}}); fails += unittest_msgq_insert_sort( "many messages", insert_baseline, NULL, (const struct ut_msg_range[]) {{100000, 200000}, {400000, 450000}, {900000, 920000}, {33692864, 33751992}, {33906868, 33993690}, {40000000, 44000000}, {0, 0}}, (const struct ut_msg_range[]) {{1, 199}, {350000, 360000}, {500000, 500010}, {1000000, 1000200}, {33751993, 33906867}, {50000001, 50000001}, {0, 0}}); fails += unittest_msgq_insert_sort( "issue #2508", insert_baseline, NULL, (const struct ut_msg_range[]) { {33692864, 33751992}, {33906868, 33993690}, {0, 0}}, (const struct ut_msg_range[]) {{33751993, 33906867}, {0, 0}}); /* The standard case where all of the srcq * goes after the destq. * Create a big destq and a number of small srcqs. * Should not result in O(n) scans to find the insert position. */ fails += unittest_msgq_insert_sort( "issue #2450 (v1.2.1 regression)", insert_baseline, NULL, (const struct ut_msg_range[]) {{200000, 200001}, {200002, 200006}, {200009, 200012}, {200015, 200016}, {200020, 200022}, {200030, 200090}, {200091, 200092}, {200093, 200094}, {200095, 200096}, {200097, 200099}, {0, 0}}, (const struct ut_msg_range[]) {{1, 199999}, {0, 0}}); return fails; }
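

/*
 * Application-side usage reference (illustrative sketch only, not part of
 * the library): producing a message through the rd_kafka_vu_t array API
 * implemented above. The producer handle 'rk', topic name, payload and
 * error handling are hypothetical.
 *
 *   rd_kafka_vu_t vus[] = {
 *           {.vtype = RD_KAFKA_VTYPE_TOPIC, .u = {.cstr = "my_topic"}},
 *           {.vtype = RD_KAFKA_VTYPE_VALUE,
 *            .u     = {.mem = {.ptr = "hello", .size = 5}}},
 *           {.vtype = RD_KAFKA_VTYPE_MSGFLAGS,
 *            .u     = {.i = RD_KAFKA_MSG_F_COPY}},
 *   };
 *   rd_kafka_error_t *error =
 *           rd_kafka_produceva(rk, vus, sizeof(vus) / sizeof(vus[0]));
 *   if (error) {
 *           fprintf(stderr, "produceva failed: %s\n",
 *                   rd_kafka_error_string(error));
 *           rd_kafka_error_destroy(error);
 *   }
 */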