Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_mock.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_mock.c | 2585
1 file changed, 2585 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_mock.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_mock.c new file mode 100644 index 00000000..ae794053 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_mock.c @@ -0,0 +1,2585 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2019 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Mocks + * + */ + +#include "rdkafka_int.h" +#include "rdbuf.h" +#include "rdrand.h" +#include "rdkafka_interceptor.h" +#include "rdkafka_mock_int.h" +#include "rdkafka_transport_int.h" + +#include <stdarg.h> + +static void rd_kafka_mock_cluster_destroy0(rd_kafka_mock_cluster_t *mcluster); + + +static rd_kafka_mock_broker_t * +rd_kafka_mock_broker_find(const rd_kafka_mock_cluster_t *mcluster, + int32_t broker_id) { + const rd_kafka_mock_broker_t *mrkb; + + TAILQ_FOREACH(mrkb, &mcluster->brokers, link) + if (mrkb->id == broker_id) + return (rd_kafka_mock_broker_t *)mrkb; + + return NULL; +} + + + +/** + * @brief Unlink and free message set. + */ +static void rd_kafka_mock_msgset_destroy(rd_kafka_mock_partition_t *mpart, + rd_kafka_mock_msgset_t *mset) { + const rd_kafka_mock_msgset_t *next = TAILQ_NEXT(mset, link); + + /* Removing last messageset */ + if (!next) + mpart->start_offset = mpart->end_offset; + else if (mset == TAILQ_FIRST(&mpart->msgsets)) + /* Removing first messageset */ + mpart->start_offset = next->first_offset; + + if (mpart->update_follower_start_offset) + mpart->follower_start_offset = mpart->start_offset; + + rd_assert(mpart->cnt > 0); + mpart->cnt--; + mpart->size -= RD_KAFKAP_BYTES_LEN(&mset->bytes); + TAILQ_REMOVE(&mpart->msgsets, mset, link); + rd_free(mset); +} + + +/** + * @brief Create a new msgset object with a copy of \p bytes + * and appends it to the partition log. 
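+ *
+ * Layout sketch (one allocation): the copied MessageSet bytes trail the
+ * mset header itself, so a single rd_malloc()/rd_free() covers both:
+ *
+ *   mset = rd_malloc(sizeof(*mset) + RD_KAFKAP_BYTES_LEN(bytes));
+ *   mset->bytes.data = (void *)(mset + 1);   // payload follows the struct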
+ */ +static rd_kafka_mock_msgset_t * +rd_kafka_mock_msgset_new(rd_kafka_mock_partition_t *mpart, + const rd_kafkap_bytes_t *bytes, + size_t msgcnt) { + rd_kafka_mock_msgset_t *mset; + size_t totsize = sizeof(*mset) + RD_KAFKAP_BYTES_LEN(bytes); + int64_t BaseOffset; + int32_t PartitionLeaderEpoch; + int64_t orig_start_offset = mpart->start_offset; + + rd_assert(!RD_KAFKAP_BYTES_IS_NULL(bytes)); + + mset = rd_malloc(totsize); + rd_assert(mset != NULL); + + mset->first_offset = mpart->end_offset; + mset->last_offset = mset->first_offset + msgcnt - 1; + mpart->end_offset = mset->last_offset + 1; + if (mpart->update_follower_end_offset) + mpart->follower_end_offset = mpart->end_offset; + mpart->cnt++; + + mset->bytes.len = bytes->len; + mset->leader_epoch = mpart->leader_epoch; + + + mset->bytes.data = (void *)(mset + 1); + memcpy((void *)mset->bytes.data, bytes->data, mset->bytes.len); + mpart->size += mset->bytes.len; + + /* Update the base Offset in the MessageSet with the + * actual absolute log offset. */ + BaseOffset = htobe64(mset->first_offset); + memcpy((void *)mset->bytes.data, &BaseOffset, sizeof(BaseOffset)); + /* Update the base PartitionLeaderEpoch in the MessageSet with the + * actual partition leader epoch. */ + PartitionLeaderEpoch = htobe32(mset->leader_epoch); + memcpy(((char *)mset->bytes.data) + 12, &PartitionLeaderEpoch, + sizeof(PartitionLeaderEpoch)); + + /* Remove old msgsets until within limits */ + while (mpart->cnt > 1 && + (mpart->cnt > mpart->max_cnt || mpart->size > mpart->max_size)) + rd_kafka_mock_msgset_destroy(mpart, + TAILQ_FIRST(&mpart->msgsets)); + + TAILQ_INSERT_TAIL(&mpart->msgsets, mset, link); + + rd_kafka_dbg(mpart->topic->cluster->rk, MOCK, "MOCK", + "Broker %" PRId32 ": Log append %s [%" PRId32 + "] " + "%" PRIusz " messages, %" PRId32 + " bytes at offset %" PRId64 " (log now %" PRId64 + "..%" PRId64 + ", " + "original start %" PRId64 ")", + mpart->leader->id, mpart->topic->name, mpart->id, msgcnt, + RD_KAFKAP_BYTES_LEN(&mset->bytes), mset->first_offset, + mpart->start_offset, mpart->end_offset, orig_start_offset); + + return mset; +} + +/** + * @brief Find message set containing \p offset + */ +const rd_kafka_mock_msgset_t * +rd_kafka_mock_msgset_find(const rd_kafka_mock_partition_t *mpart, + int64_t offset, + rd_bool_t on_follower) { + const rd_kafka_mock_msgset_t *mset; + + if (!on_follower && + (offset < mpart->start_offset || offset > mpart->end_offset)) + return NULL; + + if (on_follower && (offset < mpart->follower_start_offset || + offset > mpart->follower_end_offset)) + return NULL; + + /* FIXME: Maintain an index */ + + TAILQ_FOREACH(mset, &mpart->msgsets, link) { + if (mset->first_offset <= offset && offset <= mset->last_offset) + return mset; + } + + return NULL; +} + + +/** + * @brief Looks up or creates a new pidstate for the given partition and PID. + * + * The pidstate is used to verify per-partition per-producer BaseSequences + * for the idempotent/txn producer. 
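+ *
+ * Window sketch (per partition and PID): seq[] holds the next expected
+ * BaseSequence after each of the last (up to 5) Produce requests, e.g.:
+ *
+ *   seq = { 5, 10, 15, 20, 25 }   // lo..hi
+ *   BaseSequence 25 -> in order, appended to the log
+ *   BaseSequence 10 -> legal duplicate, acked but not written
+ *   BaseSequence  1 -> DUPLICATE_SEQUENCE_NUMBER
+ *   BaseSequence 30 -> OUT_OF_ORDER_SEQUENCE_NUMBER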
+ */ +static rd_kafka_mock_pid_t * +rd_kafka_mock_partition_pidstate_get(rd_kafka_mock_partition_t *mpart, + const rd_kafka_mock_pid_t *mpid) { + rd_kafka_mock_pid_t *pidstate; + size_t tidlen; + + pidstate = rd_list_find(&mpart->pidstates, mpid, rd_kafka_mock_pid_cmp); + if (pidstate) + return pidstate; + + tidlen = strlen(mpid->TransactionalId); + pidstate = rd_malloc(sizeof(*pidstate) + tidlen); + pidstate->pid = mpid->pid; + memcpy(pidstate->TransactionalId, mpid->TransactionalId, tidlen); + pidstate->TransactionalId[tidlen] = '\0'; + + pidstate->lo = pidstate->hi = pidstate->window = 0; + memset(pidstate->seq, 0, sizeof(pidstate->seq)); + + rd_list_add(&mpart->pidstates, pidstate); + + return pidstate; +} + + +/** + * @brief Validate ProduceRequest records in \p rkbuf. + * + * @warning The \p rkbuf must not be read, just peek()ed. + * + * This is a very selective validation, currently only: + * - verify idempotency TransactionalId,PID,Epoch,Seq + */ +static rd_kafka_resp_err_t +rd_kafka_mock_validate_records(rd_kafka_mock_partition_t *mpart, + rd_kafka_buf_t *rkbuf, + size_t RecordCount, + const rd_kafkap_str_t *TransactionalId, + rd_bool_t *is_dupd) { + const int log_decode_errors = LOG_ERR; + rd_kafka_mock_cluster_t *mcluster = mpart->topic->cluster; + rd_kafka_mock_pid_t *mpid; + rd_kafka_mock_pid_t *mpidstate = NULL; + rd_kafka_pid_t pid; + int32_t expected_BaseSequence = -1, BaseSequence = -1; + rd_kafka_resp_err_t err; + + *is_dupd = rd_false; + + if (!TransactionalId || RD_KAFKAP_STR_LEN(TransactionalId) < 1) + return RD_KAFKA_RESP_ERR_NO_ERROR; + + rd_kafka_buf_peek_i64(rkbuf, RD_KAFKAP_MSGSET_V2_OF_ProducerId, + &pid.id); + rd_kafka_buf_peek_i16(rkbuf, RD_KAFKAP_MSGSET_V2_OF_ProducerEpoch, + &pid.epoch); + rd_kafka_buf_peek_i32(rkbuf, RD_KAFKAP_MSGSET_V2_OF_BaseSequence, + &BaseSequence); + + mtx_lock(&mcluster->lock); + err = rd_kafka_mock_pid_find(mcluster, TransactionalId, pid, &mpid); + mtx_unlock(&mcluster->lock); + + if (likely(!err)) { + + if (mpid->pid.epoch != pid.epoch) + err = RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH; + + /* Each partition tracks the 5 last Produce requests per PID.*/ + mpidstate = rd_kafka_mock_partition_pidstate_get(mpart, mpid); + + expected_BaseSequence = mpidstate->seq[mpidstate->hi]; + + /* A BaseSequence within the range of the last 5 requests is + * considered a legal duplicate and will be successfully acked + * but not written to the log. */ + if (BaseSequence < mpidstate->seq[mpidstate->lo]) + err = RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER; + else if (BaseSequence > mpidstate->seq[mpidstate->hi]) + err = RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER; + else if (BaseSequence != expected_BaseSequence) + *is_dupd = rd_true; + } + + if (unlikely(err)) { + rd_kafka_dbg(mcluster->rk, MOCK, "MOCK", + "Broker %" PRId32 ": Log append %s [%" PRId32 + "] failed: PID mismatch: TransactionalId=%.*s " + "expected %s BaseSeq %" PRId32 + ", not %s BaseSeq %" PRId32 ": %s", + mpart->leader->id, mpart->topic->name, mpart->id, + RD_KAFKAP_STR_PR(TransactionalId), + mpid ? 
rd_kafka_pid2str(mpid->pid) : "n/a", + expected_BaseSequence, rd_kafka_pid2str(pid), + BaseSequence, rd_kafka_err2name(err)); + return err; + } + + /* Update BaseSequence window */ + if (unlikely(mpidstate->window < 5)) + mpidstate->window++; + else + mpidstate->lo = (mpidstate->lo + 1) % mpidstate->window; + mpidstate->hi = (mpidstate->hi + 1) % mpidstate->window; + mpidstate->seq[mpidstate->hi] = (int32_t)(BaseSequence + RecordCount); + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + return rkbuf->rkbuf_err; +} + +/** + * @brief Append the MessageSets in \p bytes to the \p mpart partition log. + * + * @param BaseOffset will contain the first assigned offset of the message set. + */ +rd_kafka_resp_err_t +rd_kafka_mock_partition_log_append(rd_kafka_mock_partition_t *mpart, + const rd_kafkap_bytes_t *records, + const rd_kafkap_str_t *TransactionalId, + int64_t *BaseOffset) { + const int log_decode_errors = LOG_ERR; + rd_kafka_buf_t *rkbuf; + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + int8_t MagicByte; + int32_t RecordCount; + int16_t Attributes; + rd_kafka_mock_msgset_t *mset; + rd_bool_t is_dup = rd_false; + + /* Partially parse the MessageSet in \p bytes to get + * the message count. */ + rkbuf = rd_kafka_buf_new_shadow(records->data, + RD_KAFKAP_BYTES_LEN(records), NULL); + + rd_kafka_buf_peek_i8(rkbuf, RD_KAFKAP_MSGSET_V2_OF_MagicByte, + &MagicByte); + if (MagicByte != 2) { + /* We only support MsgVersion 2 for now */ + err = RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION; + goto err; + } + + rd_kafka_buf_peek_i32(rkbuf, RD_KAFKAP_MSGSET_V2_OF_RecordCount, + &RecordCount); + rd_kafka_buf_peek_i16(rkbuf, RD_KAFKAP_MSGSET_V2_OF_Attributes, + &Attributes); + + if (RecordCount < 1 || + (!(Attributes & RD_KAFKA_MSG_ATTR_COMPRESSION_MASK) && + (size_t)RecordCount > RD_KAFKAP_BYTES_LEN(records) / + RD_KAFKAP_MESSAGE_V2_MIN_OVERHEAD)) { + err = RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE; + goto err; + } + + if ((err = rd_kafka_mock_validate_records( + mpart, rkbuf, (size_t)RecordCount, TransactionalId, &is_dup))) + goto err; + + /* If this is a legit duplicate, don't write it to the log. */ + if (is_dup) + goto err; + + rd_kafka_buf_destroy(rkbuf); + + mset = rd_kafka_mock_msgset_new(mpart, records, (size_t)RecordCount); + + *BaseOffset = mset->first_offset; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + err = rkbuf->rkbuf_err; +err: + rd_kafka_buf_destroy(rkbuf); + return err; +} + + +/** + * @brief Set the partition leader, or NULL for leader-less. + */ +static void +rd_kafka_mock_partition_set_leader0(rd_kafka_mock_partition_t *mpart, + rd_kafka_mock_broker_t *mrkb) { + mpart->leader = mrkb; + mpart->leader_epoch++; +} + + +/** + * @brief Verifies that the client-provided leader_epoch matches that of the + * partition, else returns the appropriate error. + */ +rd_kafka_resp_err_t rd_kafka_mock_partition_leader_epoch_check( + const rd_kafka_mock_partition_t *mpart, + int32_t leader_epoch) { + if (likely(leader_epoch == -1 || mpart->leader_epoch == leader_epoch)) + return RD_KAFKA_RESP_ERR_NO_ERROR; + else if (mpart->leader_epoch < leader_epoch) + return RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH; + else if (mpart->leader_epoch > leader_epoch) + return RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH; + + /* NOTREACHED, but avoids warning */ + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +/** + * @brief Returns the end offset (last offset + 1) + * for the passed leader epoch in the mock partition. 
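+ *
+ * Example (sketch): if msgsets with epoch 0 cover offsets 0..9 and a
+ * later msgset with epoch 1 starts at 10, a lookup for epoch 0 returns
+ * 10, i.e. the first offset of the next epoch, as on a real broker.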
+ * + * @param mpart The mock partition + * @param leader_epoch The leader epoch + * + * @return The end offset for the passed \p leader_epoch in \p mpart + */ +int64_t rd_kafka_mock_partition_offset_for_leader_epoch( + const rd_kafka_mock_partition_t *mpart, + int32_t leader_epoch) { + const rd_kafka_mock_msgset_t *mset = NULL; + + if (leader_epoch < 0) + return -1; + + TAILQ_FOREACH_REVERSE(mset, &mpart->msgsets, + rd_kafka_mock_msgset_tailq_s, link) { + if (mset->leader_epoch == leader_epoch) + return mset->last_offset + 1; + } + + return -1; +} + + +/** + * @brief Automatically assign replicas for partition + */ +static void +rd_kafka_mock_partition_assign_replicas(rd_kafka_mock_partition_t *mpart) { + rd_kafka_mock_cluster_t *mcluster = mpart->topic->cluster; + int replica_cnt = + RD_MIN(mcluster->defaults.replication_factor, mcluster->broker_cnt); + rd_kafka_mock_broker_t *mrkb; + int i = 0; + + if (mpart->replicas) + rd_free(mpart->replicas); + + mpart->replicas = rd_calloc(replica_cnt, sizeof(*mpart->replicas)); + mpart->replica_cnt = replica_cnt; + + /* FIXME: randomize this using perhaps reservoir sampling */ + TAILQ_FOREACH(mrkb, &mcluster->brokers, link) { + if (i == mpart->replica_cnt) + break; + mpart->replicas[i++] = mrkb; + } + + /* Select a random leader */ + rd_kafka_mock_partition_set_leader0( + mpart, mpart->replicas[rd_jitter(0, replica_cnt - 1)]); +} + + + +/** + * @brief Unlink and destroy committed offset + */ +static void +rd_kafka_mock_committed_offset_destroy(rd_kafka_mock_partition_t *mpart, + rd_kafka_mock_committed_offset_t *coff) { + rd_kafkap_str_destroy(coff->metadata); + TAILQ_REMOVE(&mpart->committed_offsets, coff, link); + rd_free(coff); +} + + +/** + * @brief Find previously committed offset for group. + */ +rd_kafka_mock_committed_offset_t * +rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart, + const rd_kafkap_str_t *group) { + const rd_kafka_mock_committed_offset_t *coff; + + TAILQ_FOREACH(coff, &mpart->committed_offsets, link) { + if (!rd_kafkap_str_cmp_str(group, coff->group)) + return (rd_kafka_mock_committed_offset_t *)coff; + } + + return NULL; +} + + +/** + * @brief Commit offset for group + */ +rd_kafka_mock_committed_offset_t * +rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart, + const rd_kafkap_str_t *group, + int64_t offset, + const rd_kafkap_str_t *metadata) { + rd_kafka_mock_committed_offset_t *coff; + + if (!(coff = rd_kafka_mock_committed_offset_find(mpart, group))) { + size_t slen = (size_t)RD_KAFKAP_STR_LEN(group); + + coff = rd_malloc(sizeof(*coff) + slen + 1); + + coff->group = (char *)(coff + 1); + memcpy(coff->group, group->str, slen); + coff->group[slen] = '\0'; + + coff->metadata = NULL; + + TAILQ_INSERT_HEAD(&mpart->committed_offsets, coff, link); + } + + if (coff->metadata) + rd_kafkap_str_destroy(coff->metadata); + + coff->metadata = rd_kafkap_str_copy(metadata); + + coff->offset = offset; + + rd_kafka_dbg(mpart->topic->cluster->rk, MOCK, "MOCK", + "Topic %s [%" PRId32 "] committing offset %" PRId64 + " for group %.*s", + mpart->topic->name, mpart->id, offset, + RD_KAFKAP_STR_PR(group)); + + return coff; +} + +/** + * @brief Destroy resources for partition, but the \p mpart itself is not freed. 
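+ *
+ * (\p mpart lives in-line in the topic's partitions array allocated by
+ * rd_kafka_mock_topic_new(), which is freed as a whole by
+ * rd_kafka_mock_topic_destroy().)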
+ */ +static void rd_kafka_mock_partition_destroy(rd_kafka_mock_partition_t *mpart) { + rd_kafka_mock_msgset_t *mset, *tmp; + rd_kafka_mock_committed_offset_t *coff, *tmpcoff; + + TAILQ_FOREACH_SAFE(mset, &mpart->msgsets, link, tmp) + rd_kafka_mock_msgset_destroy(mpart, mset); + + TAILQ_FOREACH_SAFE(coff, &mpart->committed_offsets, link, tmpcoff) + rd_kafka_mock_committed_offset_destroy(mpart, coff); + + rd_list_destroy(&mpart->pidstates); + + rd_free(mpart->replicas); +} + + +static void rd_kafka_mock_partition_init(rd_kafka_mock_topic_t *mtopic, + rd_kafka_mock_partition_t *mpart, + int id, + int replication_factor) { + mpart->topic = mtopic; + mpart->id = id; + + mpart->follower_id = -1; + mpart->leader_epoch = -1; /* Start at -1 since assign_replicas() will + * bump it right away to 0. */ + + TAILQ_INIT(&mpart->msgsets); + + mpart->max_size = 1024 * 1024 * 5; + mpart->max_cnt = 100000; + + mpart->update_follower_start_offset = rd_true; + mpart->update_follower_end_offset = rd_true; + + TAILQ_INIT(&mpart->committed_offsets); + + rd_list_init(&mpart->pidstates, 0, rd_free); + + rd_kafka_mock_partition_assign_replicas(mpart); +} + +rd_kafka_mock_partition_t * +rd_kafka_mock_partition_find(const rd_kafka_mock_topic_t *mtopic, + int32_t partition) { + if (!mtopic || partition < 0 || partition >= mtopic->partition_cnt) + return NULL; + + return (rd_kafka_mock_partition_t *)&mtopic->partitions[partition]; +} + + +static void rd_kafka_mock_topic_destroy(rd_kafka_mock_topic_t *mtopic) { + int i; + + for (i = 0; i < mtopic->partition_cnt; i++) + rd_kafka_mock_partition_destroy(&mtopic->partitions[i]); + + TAILQ_REMOVE(&mtopic->cluster->topics, mtopic, link); + mtopic->cluster->topic_cnt--; + + rd_free(mtopic->partitions); + rd_free(mtopic->name); + rd_free(mtopic); +} + + +static rd_kafka_mock_topic_t * +rd_kafka_mock_topic_new(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + int partition_cnt, + int replication_factor) { + rd_kafka_mock_topic_t *mtopic; + int i; + + mtopic = rd_calloc(1, sizeof(*mtopic)); + mtopic->name = rd_strdup(topic); + mtopic->cluster = mcluster; + + mtopic->partition_cnt = partition_cnt; + mtopic->partitions = + rd_calloc(partition_cnt, sizeof(*mtopic->partitions)); + + for (i = 0; i < partition_cnt; i++) + rd_kafka_mock_partition_init(mtopic, &mtopic->partitions[i], i, + replication_factor); + + TAILQ_INSERT_TAIL(&mcluster->topics, mtopic, link); + mcluster->topic_cnt++; + + rd_kafka_dbg(mcluster->rk, MOCK, "MOCK", + "Created topic \"%s\" with %d partition(s) and " + "replication-factor %d", + mtopic->name, mtopic->partition_cnt, replication_factor); + + return mtopic; +} + + +rd_kafka_mock_topic_t * +rd_kafka_mock_topic_find(const rd_kafka_mock_cluster_t *mcluster, + const char *name) { + const rd_kafka_mock_topic_t *mtopic; + + TAILQ_FOREACH(mtopic, &mcluster->topics, link) { + if (!strcmp(mtopic->name, name)) + return (rd_kafka_mock_topic_t *)mtopic; + } + + return NULL; +} + + +rd_kafka_mock_topic_t * +rd_kafka_mock_topic_find_by_kstr(const rd_kafka_mock_cluster_t *mcluster, + const rd_kafkap_str_t *kname) { + const rd_kafka_mock_topic_t *mtopic; + + TAILQ_FOREACH(mtopic, &mcluster->topics, link) { + if (!strncmp(mtopic->name, kname->str, + RD_KAFKAP_STR_LEN(kname)) && + mtopic->name[RD_KAFKAP_STR_LEN(kname)] == '\0') + return (rd_kafka_mock_topic_t *)mtopic; + } + + return NULL; +} + + +/** + * @brief Create a topic using default settings. + * The topic must not already exist. 
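+ *
+ * E.g. (sketch), passing partition_cnt == -1:
+ *   rd_kafka_mock_topic_auto_create(mcluster, "mytopic", -1, &err);
+ * falls back to mcluster->defaults.partition_cnt partitions and the
+ * default replication factor.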
+ * + * @param errp will be set to an error code that is consistent with + * new topics on real clusters. + */ +rd_kafka_mock_topic_t * +rd_kafka_mock_topic_auto_create(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + int partition_cnt, + rd_kafka_resp_err_t *errp) { + rd_assert(!rd_kafka_mock_topic_find(mcluster, topic)); + *errp = 0; // FIXME? RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE; + return rd_kafka_mock_topic_new(mcluster, topic, + partition_cnt == -1 + ? mcluster->defaults.partition_cnt + : partition_cnt, + mcluster->defaults.replication_factor); +} + + +/** + * @brief Find or create topic. + * + * @param partition_cnt If not -1 and the topic does not exist, the automatic + * topic creation will create this number of topics. + * Otherwise use the default. + */ +rd_kafka_mock_topic_t * +rd_kafka_mock_topic_get(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + int partition_cnt) { + rd_kafka_mock_topic_t *mtopic; + rd_kafka_resp_err_t err; + + if ((mtopic = rd_kafka_mock_topic_find(mcluster, topic))) + return mtopic; + + return rd_kafka_mock_topic_auto_create(mcluster, topic, partition_cnt, + &err); +} + +/** + * @brief Find or create a partition. + * + * @returns NULL if topic already exists and partition is out of range. + */ +static rd_kafka_mock_partition_t * +rd_kafka_mock_partition_get(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + int32_t partition) { + rd_kafka_mock_topic_t *mtopic; + rd_kafka_resp_err_t err; + + if (!(mtopic = rd_kafka_mock_topic_find(mcluster, topic))) + mtopic = rd_kafka_mock_topic_auto_create(mcluster, topic, + partition + 1, &err); + + if (partition >= mtopic->partition_cnt) + return NULL; + + return &mtopic->partitions[partition]; +} + + +/** + * @brief Set IO events for fd + */ +static void +rd_kafka_mock_cluster_io_set_events(rd_kafka_mock_cluster_t *mcluster, + rd_socket_t fd, + int events) { + int i; + + for (i = 0; i < mcluster->fd_cnt; i++) { + if (mcluster->fds[i].fd == fd) { + mcluster->fds[i].events |= events; + return; + } + } + + rd_assert(!*"mock_cluster_io_set_events: fd not found"); +} + +/** + * @brief Set or clear single IO events for fd + */ +static void +rd_kafka_mock_cluster_io_set_event(rd_kafka_mock_cluster_t *mcluster, + rd_socket_t fd, + rd_bool_t set, + int event) { + int i; + + for (i = 0; i < mcluster->fd_cnt; i++) { + if (mcluster->fds[i].fd == fd) { + if (set) + mcluster->fds[i].events |= event; + else + mcluster->fds[i].events &= ~event; + return; + } + } + + rd_assert(!*"mock_cluster_io_set_event: fd not found"); +} + + +/** + * @brief Clear IO events for fd + */ +static void +rd_kafka_mock_cluster_io_clear_events(rd_kafka_mock_cluster_t *mcluster, + rd_socket_t fd, + int events) { + int i; + + for (i = 0; i < mcluster->fd_cnt; i++) { + if (mcluster->fds[i].fd == fd) { + mcluster->fds[i].events &= ~events; + return; + } + } + + rd_assert(!*"mock_cluster_io_set_events: fd not found"); +} + + +static void rd_kafka_mock_cluster_io_del(rd_kafka_mock_cluster_t *mcluster, + rd_socket_t fd) { + int i; + + for (i = 0; i < mcluster->fd_cnt; i++) { + if (mcluster->fds[i].fd == fd) { + if (i + 1 < mcluster->fd_cnt) { + memmove(&mcluster->fds[i], + &mcluster->fds[i + 1], + sizeof(*mcluster->fds) * + (mcluster->fd_cnt - i)); + memmove(&mcluster->handlers[i], + &mcluster->handlers[i + 1], + sizeof(*mcluster->handlers) * + (mcluster->fd_cnt - i)); + } + + mcluster->fd_cnt--; + return; + } + } + + rd_assert(!*"mock_cluster_io_del: fd not found"); +} + + +/** + * @brief Add \p fd to IO poll with initial desired 
events (POLLIN, et.al). + */ +static void rd_kafka_mock_cluster_io_add(rd_kafka_mock_cluster_t *mcluster, + rd_socket_t fd, + int events, + rd_kafka_mock_io_handler_t handler, + void *opaque) { + + if (mcluster->fd_cnt + 1 >= mcluster->fd_size) { + mcluster->fd_size += 8; + + mcluster->fds = rd_realloc( + mcluster->fds, sizeof(*mcluster->fds) * mcluster->fd_size); + mcluster->handlers = + rd_realloc(mcluster->handlers, + sizeof(*mcluster->handlers) * mcluster->fd_size); + } + + memset(&mcluster->fds[mcluster->fd_cnt], 0, + sizeof(mcluster->fds[mcluster->fd_cnt])); + mcluster->fds[mcluster->fd_cnt].fd = fd; + mcluster->fds[mcluster->fd_cnt].events = events; + mcluster->fds[mcluster->fd_cnt].revents = 0; + mcluster->handlers[mcluster->fd_cnt].cb = handler; + mcluster->handlers[mcluster->fd_cnt].opaque = opaque; + mcluster->fd_cnt++; +} + + +static void rd_kafka_mock_connection_close(rd_kafka_mock_connection_t *mconn, + const char *reason) { + rd_kafka_buf_t *rkbuf; + + rd_kafka_dbg(mconn->broker->cluster->rk, MOCK, "MOCK", + "Broker %" PRId32 ": Connection from %s closed: %s", + mconn->broker->id, + rd_sockaddr2str(&mconn->peer, RD_SOCKADDR2STR_F_PORT), + reason); + + rd_kafka_mock_cgrps_connection_closed(mconn->broker->cluster, mconn); + + rd_kafka_timer_stop(&mconn->broker->cluster->timers, &mconn->write_tmr, + rd_true); + + while ((rkbuf = TAILQ_FIRST(&mconn->outbufs.rkbq_bufs))) { + rd_kafka_bufq_deq(&mconn->outbufs, rkbuf); + rd_kafka_buf_destroy(rkbuf); + } + + if (mconn->rxbuf) + rd_kafka_buf_destroy(mconn->rxbuf); + + rd_kafka_mock_cluster_io_del(mconn->broker->cluster, + mconn->transport->rktrans_s); + TAILQ_REMOVE(&mconn->broker->connections, mconn, link); + rd_kafka_transport_close(mconn->transport); + rd_free(mconn); +} + + +void rd_kafka_mock_connection_send_response(rd_kafka_mock_connection_t *mconn, + rd_kafka_buf_t *resp) { + + if (resp->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER) { + /* Empty struct tags */ + rd_kafka_buf_write_i8(resp, 0); + } + + /* rkbuf_ts_sent might be initialized with a RTT delay, else 0. */ + resp->rkbuf_ts_sent += rd_clock(); + + resp->rkbuf_reshdr.Size = + (int32_t)(rd_buf_write_pos(&resp->rkbuf_buf) - 4); + + rd_kafka_buf_update_i32(resp, 0, resp->rkbuf_reshdr.Size); + + rd_kafka_dbg(mconn->broker->cluster->rk, MOCK, "MOCK", + "Broker %" PRId32 ": Sending %sResponseV%hd to %s", + mconn->broker->id, + rd_kafka_ApiKey2str(resp->rkbuf_reqhdr.ApiKey), + resp->rkbuf_reqhdr.ApiVersion, + rd_sockaddr2str(&mconn->peer, RD_SOCKADDR2STR_F_PORT)); + + /* Set up a buffer reader for sending the buffer. */ + rd_slice_init_full(&resp->rkbuf_reader, &resp->rkbuf_buf); + + rd_kafka_bufq_enq(&mconn->outbufs, resp); + + rd_kafka_mock_cluster_io_set_events( + mconn->broker->cluster, mconn->transport->rktrans_s, POLLOUT); +} + + +/** + * @returns 1 if a complete request is available in which case \p slicep + * is set to a new slice containing the data, + * 0 if a complete request is not yet available, + * -1 on error. + */ +static int +rd_kafka_mock_connection_read_request(rd_kafka_mock_connection_t *mconn, + rd_kafka_buf_t **rkbufp) { + rd_kafka_mock_cluster_t *mcluster = mconn->broker->cluster; + rd_kafka_t *rk = mcluster->rk; + const rd_bool_t log_decode_errors = rd_true; + rd_kafka_buf_t *rkbuf; + char errstr[128]; + ssize_t r; + + if (!(rkbuf = mconn->rxbuf)) { + /* Initial read for a protocol request. + * Allocate enough room for the protocol header + * (where the total size is located). 
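+ * Framing sketch: every Kafka request begins with
+ *   Size(int32) ApiKey(int16) ApiVersion(int16) CorrId(int32)
+ * (RD_KAFKAP_REQHDR_SIZE bytes in total), where Size counts all bytes
+ * following the Size field itself, hence totlen = Size + 4 below.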
*/ + rkbuf = mconn->rxbuf = + rd_kafka_buf_new(2, RD_KAFKAP_REQHDR_SIZE); + + /* Protocol parsing code needs the rkb for logging */ + rkbuf->rkbuf_rkb = mconn->broker->cluster->dummy_rkb; + rd_kafka_broker_keep(rkbuf->rkbuf_rkb); + + /* Make room for request header */ + rd_buf_write_ensure(&rkbuf->rkbuf_buf, RD_KAFKAP_REQHDR_SIZE, + RD_KAFKAP_REQHDR_SIZE); + } + + /* Read as much data as possible from the socket into the + * connection receive buffer. */ + r = rd_kafka_transport_recv(mconn->transport, &rkbuf->rkbuf_buf, errstr, + sizeof(errstr)); + if (r == -1) { + rd_kafka_dbg( + rk, MOCK, "MOCK", + "Broker %" PRId32 + ": Connection %s: " + "receive failed: %s", + mconn->broker->id, + rd_sockaddr2str(&mconn->peer, RD_SOCKADDR2STR_F_PORT), + errstr); + return -1; + } else if (r == 0) { + return 0; /* Need more data */ + } + + if (rd_buf_write_pos(&rkbuf->rkbuf_buf) == RD_KAFKAP_REQHDR_SIZE) { + /* Received the full header, now check full request + * size and allocate the buffer accordingly. */ + + /* Initialize reader */ + rd_slice_init(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf, 0, + RD_KAFKAP_REQHDR_SIZE); + + rd_kafka_buf_read_i32(rkbuf, &rkbuf->rkbuf_reqhdr.Size); + rd_kafka_buf_read_i16(rkbuf, &rkbuf->rkbuf_reqhdr.ApiKey); + rd_kafka_buf_read_i16(rkbuf, &rkbuf->rkbuf_reqhdr.ApiVersion); + + if (rkbuf->rkbuf_reqhdr.ApiKey < 0 || + rkbuf->rkbuf_reqhdr.ApiKey >= RD_KAFKAP__NUM) { + rd_kafka_buf_parse_fail( + rkbuf, "Invalid ApiKey %hd from %s", + rkbuf->rkbuf_reqhdr.ApiKey, + rd_sockaddr2str(&mconn->peer, + RD_SOCKADDR2STR_F_PORT)); + RD_NOTREACHED(); + } + + /* Check if request version has flexible fields (KIP-482) */ + if (mcluster->api_handlers[rkbuf->rkbuf_reqhdr.ApiKey] + .FlexVersion != -1 && + rkbuf->rkbuf_reqhdr.ApiVersion >= + mcluster->api_handlers[rkbuf->rkbuf_reqhdr.ApiKey] + .FlexVersion) + rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_FLEXVER; + + + rd_kafka_buf_read_i32(rkbuf, &rkbuf->rkbuf_reqhdr.CorrId); + + rkbuf->rkbuf_totlen = rkbuf->rkbuf_reqhdr.Size + 4; + + if (rkbuf->rkbuf_totlen < RD_KAFKAP_REQHDR_SIZE + 2 || + rkbuf->rkbuf_totlen > + (size_t)rk->rk_conf.recv_max_msg_size) { + rd_kafka_buf_parse_fail( + rkbuf, "Invalid request size %" PRId32 " from %s", + rkbuf->rkbuf_reqhdr.Size, + rd_sockaddr2str(&mconn->peer, + RD_SOCKADDR2STR_F_PORT)); + RD_NOTREACHED(); + } + + /* Now adjust totlen to skip the header */ + rkbuf->rkbuf_totlen -= RD_KAFKAP_REQHDR_SIZE; + + if (!rkbuf->rkbuf_totlen) { + /* Empty request (valid) */ + *rkbufp = rkbuf; + mconn->rxbuf = NULL; + return 1; + } + + /* Allocate space for the request payload */ + rd_buf_write_ensure(&rkbuf->rkbuf_buf, rkbuf->rkbuf_totlen, + rkbuf->rkbuf_totlen); + + } else if (rd_buf_write_pos(&rkbuf->rkbuf_buf) - + RD_KAFKAP_REQHDR_SIZE == + rkbuf->rkbuf_totlen) { + /* The full request is now read into the buffer. 
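+ * The reader slice set up below starts past the request header, so
+ * the per-API handlers only ever see the request body.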
*/ + + /* Set up response reader slice starting past the + * request header */ + rd_slice_init(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf, + RD_KAFKAP_REQHDR_SIZE, + rd_buf_len(&rkbuf->rkbuf_buf) - + RD_KAFKAP_REQHDR_SIZE); + + /* For convenience, shave off the ClientId */ + rd_kafka_buf_skip_str(rkbuf); + + /* And the flexible versions header tags, if any */ + rd_kafka_buf_skip_tags(rkbuf); + + /* Return the buffer to the caller */ + *rkbufp = rkbuf; + mconn->rxbuf = NULL; + return 1; + } + + return 0; + + +err_parse: + return -1; +} + +rd_kafka_buf_t *rd_kafka_mock_buf_new_response(const rd_kafka_buf_t *request) { + rd_kafka_buf_t *rkbuf = rd_kafka_buf_new(1, 100); + + /* Copy request header so the ApiVersion remains known */ + rkbuf->rkbuf_reqhdr = request->rkbuf_reqhdr; + + /* Size, updated later */ + rd_kafka_buf_write_i32(rkbuf, 0); + + /* CorrId */ + rd_kafka_buf_write_i32(rkbuf, request->rkbuf_reqhdr.CorrId); + + if (request->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER) { + rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_FLEXVER; + /* Write empty response header tags, unless this is the + * ApiVersionResponse which needs to be backwards compatible. */ + if (request->rkbuf_reqhdr.ApiKey != RD_KAFKAP_ApiVersion) + rd_kafka_buf_write_i8(rkbuf, 0); + } + + return rkbuf; +} + + + +/** + * @brief Parse protocol request. + * + * @returns 0 on success, -1 on parse error. + */ +static int +rd_kafka_mock_connection_parse_request(rd_kafka_mock_connection_t *mconn, + rd_kafka_buf_t *rkbuf) { + rd_kafka_mock_cluster_t *mcluster = mconn->broker->cluster; + rd_kafka_t *rk = mcluster->rk; + + if (rkbuf->rkbuf_reqhdr.ApiKey < 0 || + rkbuf->rkbuf_reqhdr.ApiKey >= RD_KAFKAP__NUM || + !mcluster->api_handlers[rkbuf->rkbuf_reqhdr.ApiKey].cb) { + rd_kafka_log( + rk, LOG_ERR, "MOCK", + "Broker %" PRId32 + ": unsupported %sRequestV%hd " + "from %s", + mconn->broker->id, + rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey), + rkbuf->rkbuf_reqhdr.ApiVersion, + rd_sockaddr2str(&mconn->peer, RD_SOCKADDR2STR_F_PORT)); + return -1; + } + + /* ApiVersionRequest handles future versions, for everything else + * make sure the ApiVersion is supported. */ + if (rkbuf->rkbuf_reqhdr.ApiKey != RD_KAFKAP_ApiVersion && + !rd_kafka_mock_cluster_ApiVersion_check( + mcluster, rkbuf->rkbuf_reqhdr.ApiKey, + rkbuf->rkbuf_reqhdr.ApiVersion)) { + rd_kafka_log( + rk, LOG_ERR, "MOCK", + "Broker %" PRId32 + ": unsupported %sRequest " + "version %hd from %s", + mconn->broker->id, + rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey), + rkbuf->rkbuf_reqhdr.ApiVersion, + rd_sockaddr2str(&mconn->peer, RD_SOCKADDR2STR_F_PORT)); + return -1; + } + + rd_kafka_dbg(rk, MOCK, "MOCK", + "Broker %" PRId32 ": Received %sRequestV%hd from %s", + mconn->broker->id, + rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey), + rkbuf->rkbuf_reqhdr.ApiVersion, + rd_sockaddr2str(&mconn->peer, RD_SOCKADDR2STR_F_PORT)); + + return mcluster->api_handlers[rkbuf->rkbuf_reqhdr.ApiKey].cb(mconn, + rkbuf); +} + + +/** + * @brief Timer callback to set the POLLOUT flag for a connection after + * the delay has expired. + */ +static void rd_kafka_mock_connection_write_out_tmr_cb(rd_kafka_timers_t *rkts, + void *arg) { + rd_kafka_mock_connection_t *mconn = arg; + + rd_kafka_mock_cluster_io_set_events( + mconn->broker->cluster, mconn->transport->rktrans_s, POLLOUT); +} + + +/** + * @brief Send as many bytes as possible from the output buffer. + * + * @returns 1 if all buffers were sent, 0 if more buffers need to be sent, or + * -1 on error. 
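+ *
+ * Delay sketch: with e.g. rd_kafka_mock_broker_set_rtt(mcluster, 1, 100)
+ * each response carries rkbuf_ts_sent = enqueue time and is held back
+ * until now >= ts_sent + rtt (100ms, stored in microseconds); a one-shot
+ * timer re-arms POLLOUT when the delay expires.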
+ */ +static ssize_t +rd_kafka_mock_connection_write_out(rd_kafka_mock_connection_t *mconn) { + rd_kafka_buf_t *rkbuf; + rd_ts_t now = rd_clock(); + rd_ts_t rtt = mconn->broker->rtt; + + while ((rkbuf = TAILQ_FIRST(&mconn->outbufs.rkbq_bufs))) { + ssize_t r; + char errstr[128]; + rd_ts_t ts_delay = 0; + + /* Connection delay/rtt is set. */ + if (rkbuf->rkbuf_ts_sent + rtt > now) + ts_delay = rkbuf->rkbuf_ts_sent + rtt; + + /* Response is being delayed */ + if (rkbuf->rkbuf_ts_retry && rkbuf->rkbuf_ts_retry > now) + ts_delay = rkbuf->rkbuf_ts_retry + rtt; + + if (ts_delay) { + /* Delay response */ + rd_kafka_timer_start_oneshot( + &mconn->broker->cluster->timers, &mconn->write_tmr, + rd_false, ts_delay - now, + rd_kafka_mock_connection_write_out_tmr_cb, mconn); + break; + } + + if ((r = rd_kafka_transport_send(mconn->transport, + &rkbuf->rkbuf_reader, errstr, + sizeof(errstr))) == -1) + return -1; + + if (rd_slice_remains(&rkbuf->rkbuf_reader) > 0) + return 0; /* Partial send, continue next time */ + + /* Entire buffer sent, unlink and free */ + rd_kafka_bufq_deq(&mconn->outbufs, rkbuf); + + rd_kafka_buf_destroy(rkbuf); + } + + rd_kafka_mock_cluster_io_clear_events( + mconn->broker->cluster, mconn->transport->rktrans_s, POLLOUT); + + return 1; +} + + +/** + * @brief Call connection_write_out() for all the broker's connections. + * + * Use to check if any responses should be sent when RTT has changed. + */ +static void +rd_kafka_mock_broker_connections_write_out(rd_kafka_mock_broker_t *mrkb) { + rd_kafka_mock_connection_t *mconn, *tmp; + + /* Need a safe loop since connections may be removed on send error */ + TAILQ_FOREACH_SAFE(mconn, &mrkb->connections, link, tmp) { + rd_kafka_mock_connection_write_out(mconn); + } +} + + +/** + * @brief Per-Connection IO handler + */ +static void rd_kafka_mock_connection_io(rd_kafka_mock_cluster_t *mcluster, + rd_socket_t fd, + int events, + void *opaque) { + rd_kafka_mock_connection_t *mconn = opaque; + + if (events & POLLIN) { + rd_kafka_buf_t *rkbuf; + int r; + + while (1) { + /* Read full request */ + r = rd_kafka_mock_connection_read_request(mconn, + &rkbuf); + if (r == 0) + break; /* Need more data */ + else if (r == -1) { + rd_kafka_mock_connection_close(mconn, + "Read error"); + return; + } + + /* Parse and handle request */ + r = rd_kafka_mock_connection_parse_request(mconn, + rkbuf); + rd_kafka_buf_destroy(rkbuf); + if (r == -1) { + rd_kafka_mock_connection_close(mconn, + "Parse error"); + return; + } + } + } + + if (events & (POLLERR | POLLHUP)) { + rd_kafka_mock_connection_close(mconn, "Disconnected"); + return; + } + + if (events & POLLOUT) { + if (rd_kafka_mock_connection_write_out(mconn) == -1) { + rd_kafka_mock_connection_close(mconn, "Write error"); + return; + } + } +} + + +/** + * @brief Set connection as blocking, POLLIN will not be served. 
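+ *
+ * (Implemented by toggling POLLIN for the connection's socket, so a
+ * blocked connection simply stops reading further requests until
+ * unblocked; responses already queued are still written out.)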
+ */ +void rd_kafka_mock_connection_set_blocking(rd_kafka_mock_connection_t *mconn, + rd_bool_t blocking) { + rd_kafka_mock_cluster_io_set_event(mconn->broker->cluster, + mconn->transport->rktrans_s, + !blocking, POLLIN); +} + + +static rd_kafka_mock_connection_t * +rd_kafka_mock_connection_new(rd_kafka_mock_broker_t *mrkb, + rd_socket_t fd, + const struct sockaddr_in *peer) { + rd_kafka_mock_connection_t *mconn; + rd_kafka_transport_t *rktrans; + char errstr[128]; + + if (!mrkb->up) { + rd_socket_close(fd); + return NULL; + } + + rktrans = rd_kafka_transport_new(mrkb->cluster->dummy_rkb, fd, errstr, + sizeof(errstr)); + if (!rktrans) { + rd_kafka_log(mrkb->cluster->rk, LOG_ERR, "MOCK", + "Failed to create transport for new " + "mock connection: %s", + errstr); + rd_socket_close(fd); + return NULL; + } + + rd_kafka_transport_post_connect_setup(rktrans); + + mconn = rd_calloc(1, sizeof(*mconn)); + mconn->broker = mrkb; + mconn->transport = rktrans; + mconn->peer = *peer; + rd_kafka_bufq_init(&mconn->outbufs); + + TAILQ_INSERT_TAIL(&mrkb->connections, mconn, link); + + rd_kafka_mock_cluster_io_add(mrkb->cluster, mconn->transport->rktrans_s, + POLLIN, rd_kafka_mock_connection_io, + mconn); + + rd_kafka_dbg(mrkb->cluster->rk, MOCK, "MOCK", + "Broker %" PRId32 ": New connection from %s", mrkb->id, + rd_sockaddr2str(&mconn->peer, RD_SOCKADDR2STR_F_PORT)); + + return mconn; +} + + + +static void rd_kafka_mock_cluster_op_io(rd_kafka_mock_cluster_t *mcluster, + rd_socket_t fd, + int events, + void *opaque) { + /* Read wake-up fd data and throw away, just used for wake-ups*/ + char buf[1024]; + while (rd_socket_read(fd, buf, sizeof(buf)) > 0) + ; /* Read all buffered signalling bytes */ +} + + +static int rd_kafka_mock_cluster_io_poll(rd_kafka_mock_cluster_t *mcluster, + int timeout_ms) { + int r; + int i; + + r = rd_socket_poll(mcluster->fds, mcluster->fd_cnt, timeout_ms); + if (r == RD_SOCKET_ERROR) { + rd_kafka_log(mcluster->rk, LOG_CRIT, "MOCK", + "Mock cluster failed to poll %d fds: %d: %s", + mcluster->fd_cnt, r, + rd_socket_strerror(rd_socket_errno)); + return -1; + } + + /* Serve ops, if any */ + rd_kafka_q_serve(mcluster->ops, RD_POLL_NOWAIT, 0, + RD_KAFKA_Q_CB_CALLBACK, NULL, NULL); + + /* Handle IO events, if any, and if not terminating */ + for (i = 0; mcluster->run && r > 0 && i < mcluster->fd_cnt; i++) { + if (!mcluster->fds[i].revents) + continue; + + /* Call IO handler */ + mcluster->handlers[i].cb(mcluster, mcluster->fds[i].fd, + mcluster->fds[i].revents, + mcluster->handlers[i].opaque); + r--; + } + + return 0; +} + + +static int rd_kafka_mock_cluster_thread_main(void *arg) { + rd_kafka_mock_cluster_t *mcluster = arg; + + rd_kafka_set_thread_name("mock"); + rd_kafka_set_thread_sysname("rdk:mock"); + rd_kafka_interceptors_on_thread_start(mcluster->rk, + RD_KAFKA_THREAD_BACKGROUND); + rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1); + + /* Op wakeup fd */ + rd_kafka_mock_cluster_io_add(mcluster, mcluster->wakeup_fds[0], POLLIN, + rd_kafka_mock_cluster_op_io, NULL); + + mcluster->run = rd_true; + + while (mcluster->run) { + int sleeptime = (int)((rd_kafka_timers_next(&mcluster->timers, + 1000 * 1000 /*1s*/, + 1 /*lock*/) + + 999) / + 1000); + + if (rd_kafka_mock_cluster_io_poll(mcluster, sleeptime) == -1) + break; + + rd_kafka_timers_run(&mcluster->timers, RD_POLL_NOWAIT); + } + + rd_kafka_mock_cluster_io_del(mcluster, mcluster->wakeup_fds[0]); + + + rd_kafka_interceptors_on_thread_exit(mcluster->rk, + RD_KAFKA_THREAD_BACKGROUND); + rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1); + + 
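+        /* The cluster object is destroyed from its own thread once the
+         * loop has exited and the wake-up fd has been removed. */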
rd_kafka_mock_cluster_destroy0(mcluster); + + return 0; +} + + + +static void rd_kafka_mock_broker_listen_io(rd_kafka_mock_cluster_t *mcluster, + rd_socket_t fd, + int events, + void *opaque) { + rd_kafka_mock_broker_t *mrkb = opaque; + + if (events & (POLLERR | POLLHUP)) + rd_assert(!*"Mock broker listen socket error"); + + if (events & POLLIN) { + rd_socket_t new_s; + struct sockaddr_in peer; + socklen_t peer_size = sizeof(peer); + + new_s = accept(mrkb->listen_s, (struct sockaddr *)&peer, + &peer_size); + if (new_s == RD_SOCKET_ERROR) { + rd_kafka_log(mcluster->rk, LOG_ERR, "MOCK", + "Failed to accept mock broker socket: %s", + rd_socket_strerror(rd_socket_errno)); + return; + } + + rd_kafka_mock_connection_new(mrkb, new_s, &peer); + } +} + + +/** + * @brief Close all connections to broker. + */ +static void rd_kafka_mock_broker_close_all(rd_kafka_mock_broker_t *mrkb, + const char *reason) { + rd_kafka_mock_connection_t *mconn; + + while ((mconn = TAILQ_FIRST(&mrkb->connections))) + rd_kafka_mock_connection_close(mconn, reason); +} + +/** + * @brief Destroy error stack, must be unlinked. + */ +static void +rd_kafka_mock_error_stack_destroy(rd_kafka_mock_error_stack_t *errstack) { + if (errstack->errs) + rd_free(errstack->errs); + rd_free(errstack); +} + + +static void rd_kafka_mock_broker_destroy(rd_kafka_mock_broker_t *mrkb) { + rd_kafka_mock_error_stack_t *errstack; + + rd_kafka_mock_broker_close_all(mrkb, "Destroying broker"); + + if (mrkb->listen_s != -1) { + if (mrkb->up) + rd_kafka_mock_cluster_io_del(mrkb->cluster, + mrkb->listen_s); + rd_socket_close(mrkb->listen_s); + } + + while ((errstack = TAILQ_FIRST(&mrkb->errstacks))) { + TAILQ_REMOVE(&mrkb->errstacks, errstack, link); + rd_kafka_mock_error_stack_destroy(errstack); + } + + TAILQ_REMOVE(&mrkb->cluster->brokers, mrkb, link); + mrkb->cluster->broker_cnt--; + + rd_free(mrkb); +} + + +/** + * @brief Starts listening on the mock broker socket. + * + * @returns 0 on success or -1 on error (logged). + */ +static int rd_kafka_mock_broker_start_listener(rd_kafka_mock_broker_t *mrkb) { + rd_assert(mrkb->listen_s != -1); + + if (listen(mrkb->listen_s, 5) == RD_SOCKET_ERROR) { + rd_kafka_log(mrkb->cluster->rk, LOG_CRIT, "MOCK", + "Failed to listen on mock broker socket: %s", + rd_socket_strerror(rd_socket_errno)); + return -1; + } + + rd_kafka_mock_cluster_io_add(mrkb->cluster, mrkb->listen_s, POLLIN, + rd_kafka_mock_broker_listen_io, mrkb); + + return 0; +} + + +/** + * @brief Creates a new listener socket for \p mrkb but does NOT starts + * listening. + * + * @param sin is the address and port to bind. If the port is zero a random + * port will be assigned (by the kernel) and the address and port + * will be returned in this pointer. + * + * @returns listener socket on success or -1 on error (errors are logged). 
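+ *
+ * Port-retention sketch: passing a \p sinp whose sin_port was filled in
+ * by a previous call re-binds the very same loopback port; this is how
+ * a broker taken down keeps its address when brought back up.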
+ */ +static int rd_kafka_mock_broker_new_listener(rd_kafka_mock_cluster_t *mcluster, + struct sockaddr_in *sinp) { + struct sockaddr_in sin = *sinp; + socklen_t sin_len = sizeof(sin); + int listen_s; + int on = 1; + + if (!sin.sin_family) + sin.sin_family = AF_INET; + + /* + * Create and bind socket to any loopback port + */ + listen_s = + rd_kafka_socket_cb_linux(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL); + if (listen_s == RD_SOCKET_ERROR) { + rd_kafka_log(mcluster->rk, LOG_CRIT, "MOCK", + "Unable to create mock broker listen socket: %s", + rd_socket_strerror(rd_socket_errno)); + return -1; + } + + if (setsockopt(listen_s, SOL_SOCKET, SO_REUSEADDR, (void *)&on, + sizeof(on)) == -1) { + rd_kafka_log(mcluster->rk, LOG_CRIT, "MOCK", + "Failed to set SO_REUSEADDR on mock broker " + "listen socket: %s", + rd_socket_strerror(rd_socket_errno)); + rd_socket_close(listen_s); + return -1; + } + + if (bind(listen_s, (struct sockaddr *)&sin, sizeof(sin)) == + RD_SOCKET_ERROR) { + rd_kafka_log(mcluster->rk, LOG_CRIT, "MOCK", + "Failed to bind mock broker socket to %s: %s", + rd_socket_strerror(rd_socket_errno), + rd_sockaddr2str(&sin, RD_SOCKADDR2STR_F_PORT)); + rd_socket_close(listen_s); + return -1; + } + + if (getsockname(listen_s, (struct sockaddr *)&sin, &sin_len) == + RD_SOCKET_ERROR) { + rd_kafka_log(mcluster->rk, LOG_CRIT, "MOCK", + "Failed to get mock broker socket name: %s", + rd_socket_strerror(rd_socket_errno)); + rd_socket_close(listen_s); + return -1; + } + rd_assert(sin.sin_family == AF_INET); + /* If a filled in sinp was passed make sure nothing changed. */ + rd_assert(!sinp->sin_port || !memcmp(sinp, &sin, sizeof(sin))); + + *sinp = sin; + + return listen_s; +} + + +static rd_kafka_mock_broker_t * +rd_kafka_mock_broker_new(rd_kafka_mock_cluster_t *mcluster, int32_t broker_id) { + rd_kafka_mock_broker_t *mrkb; + rd_socket_t listen_s; + struct sockaddr_in sin = { + .sin_family = AF_INET, + .sin_addr = {.s_addr = htonl(INADDR_LOOPBACK)}}; + + listen_s = rd_kafka_mock_broker_new_listener(mcluster, &sin); + if (listen_s == -1) + return NULL; + + /* + * Create mock broker object + */ + mrkb = rd_calloc(1, sizeof(*mrkb)); + + mrkb->id = broker_id; + mrkb->cluster = mcluster; + mrkb->up = rd_true; + mrkb->listen_s = listen_s; + mrkb->sin = sin; + mrkb->port = ntohs(sin.sin_port); + rd_snprintf(mrkb->advertised_listener, + sizeof(mrkb->advertised_listener), "%s", + rd_sockaddr2str(&sin, 0)); + + TAILQ_INIT(&mrkb->connections); + TAILQ_INIT(&mrkb->errstacks); + + TAILQ_INSERT_TAIL(&mcluster->brokers, mrkb, link); + mcluster->broker_cnt++; + + if (rd_kafka_mock_broker_start_listener(mrkb) == -1) { + rd_kafka_mock_broker_destroy(mrkb); + return NULL; + } + + return mrkb; +} + + +/** + * @returns the coordtype_t for a coord type string, or -1 on error. + */ +static rd_kafka_coordtype_t rd_kafka_mock_coord_str2type(const char *str) { + if (!strcmp(str, "transaction")) + return RD_KAFKA_COORD_TXN; + else if (!strcmp(str, "group")) + return RD_KAFKA_COORD_GROUP; + else + return (rd_kafka_coordtype_t)-1; +} + + +/** + * @brief Unlink and destroy coordinator. + */ +static void rd_kafka_mock_coord_destroy(rd_kafka_mock_cluster_t *mcluster, + rd_kafka_mock_coord_t *mcoord) { + TAILQ_REMOVE(&mcluster->coords, mcoord, link); + rd_free(mcoord->key); + rd_free(mcoord); +} + +/** + * @brief Find coordinator by type and key. 
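+ *        E.g. (RD_KAFKA_COORD_GROUP, "mygroup") or
+ *        (RD_KAFKA_COORD_TXN, "my-transactional-id").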
+ */ +static rd_kafka_mock_coord_t * +rd_kafka_mock_coord_find(rd_kafka_mock_cluster_t *mcluster, + rd_kafka_coordtype_t type, + const char *key) { + rd_kafka_mock_coord_t *mcoord; + + TAILQ_FOREACH(mcoord, &mcluster->coords, link) { + if (mcoord->type == type && !strcmp(mcoord->key, key)) + return mcoord; + } + + return NULL; +} + + +/** + * @returns the coordinator for KeyType,Key (e.g., GROUP,mygroup). + */ +rd_kafka_mock_broker_t * +rd_kafka_mock_cluster_get_coord(rd_kafka_mock_cluster_t *mcluster, + rd_kafka_coordtype_t KeyType, + const rd_kafkap_str_t *Key) { + rd_kafka_mock_broker_t *mrkb; + rd_kafka_mock_coord_t *mcoord; + char *key; + rd_crc32_t hash; + int idx; + + /* Try the explicit coord list first */ + RD_KAFKAP_STR_DUPA(&key, Key); + if ((mcoord = rd_kafka_mock_coord_find(mcluster, KeyType, key))) + return rd_kafka_mock_broker_find(mcluster, mcoord->broker_id); + + /* Else hash the key to select an available broker. */ + hash = rd_crc32(Key->str, RD_KAFKAP_STR_LEN(Key)); + idx = (int)(hash % mcluster->broker_cnt); + + /* Use the broker index in the list */ + TAILQ_FOREACH(mrkb, &mcluster->brokers, link) + if (idx-- == 0) + return mrkb; + + RD_NOTREACHED(); + return NULL; +} + + +/** + * @brief Explicitly set coordinator for \p key_type ("transaction", "group") + * and \p key. + */ +static rd_kafka_mock_coord_t * +rd_kafka_mock_coord_set(rd_kafka_mock_cluster_t *mcluster, + const char *key_type, + const char *key, + int32_t broker_id) { + rd_kafka_mock_coord_t *mcoord; + rd_kafka_coordtype_t type; + + if ((int)(type = rd_kafka_mock_coord_str2type(key_type)) == -1) + return NULL; + + if ((mcoord = rd_kafka_mock_coord_find(mcluster, type, key))) + rd_kafka_mock_coord_destroy(mcluster, mcoord); + + mcoord = rd_calloc(1, sizeof(*mcoord)); + mcoord->type = type; + mcoord->key = rd_strdup(key); + mcoord->broker_id = broker_id; + + TAILQ_INSERT_TAIL(&mcluster->coords, mcoord, link); + + return mcoord; +} + + +/** + * @brief Remove and return the next error, or RD_KAFKA_RESP_ERR_NO_ERROR + * if no error. + */ +static rd_kafka_mock_error_rtt_t +rd_kafka_mock_error_stack_next(rd_kafka_mock_error_stack_t *errstack) { + rd_kafka_mock_error_rtt_t err_rtt = {RD_KAFKA_RESP_ERR_NO_ERROR, 0}; + + if (likely(errstack->cnt == 0)) + return err_rtt; + + err_rtt = errstack->errs[0]; + errstack->cnt--; + if (errstack->cnt > 0) + memmove(errstack->errs, &errstack->errs[1], + sizeof(*errstack->errs) * errstack->cnt); + + return err_rtt; +} + + +/** + * @brief Find an error stack based on \p ApiKey + */ +static rd_kafka_mock_error_stack_t * +rd_kafka_mock_error_stack_find(const rd_kafka_mock_error_stack_head_t *shead, + int16_t ApiKey) { + const rd_kafka_mock_error_stack_t *errstack; + + TAILQ_FOREACH(errstack, shead, link) + if (errstack->ApiKey == ApiKey) + return (rd_kafka_mock_error_stack_t *)errstack; + + return NULL; +} + + + +/** + * @brief Find or create an error stack based on \p ApiKey + */ +static rd_kafka_mock_error_stack_t * +rd_kafka_mock_error_stack_get(rd_kafka_mock_error_stack_head_t *shead, + int16_t ApiKey) { + rd_kafka_mock_error_stack_t *errstack; + + if ((errstack = rd_kafka_mock_error_stack_find(shead, ApiKey))) + return errstack; + + errstack = rd_calloc(1, sizeof(*errstack)); + + errstack->ApiKey = ApiKey; + TAILQ_INSERT_TAIL(shead, errstack, link); + + return errstack; +} + + + +/** + * @brief Removes and returns the next request error for response's ApiKey. + * + * If the error stack has a corresponding rtt/delay it is set on the + * provided response \p resp buffer. 
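+ *
+ * Usage sketch, via the public API:
+ *   rd_kafka_mock_push_request_errors(mcluster, RD_KAFKAP_Produce, 2,
+ *       RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
+ *       RD_KAFKA_RESP_ERR_NO_ERROR);
+ * makes the next two Produce requests pop these errors in FIFO order.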
+ */ +rd_kafka_resp_err_t +rd_kafka_mock_next_request_error(rd_kafka_mock_connection_t *mconn, + rd_kafka_buf_t *resp) { + rd_kafka_mock_cluster_t *mcluster = mconn->broker->cluster; + rd_kafka_mock_error_stack_t *errstack; + rd_kafka_mock_error_rtt_t err_rtt; + + mtx_lock(&mcluster->lock); + + errstack = rd_kafka_mock_error_stack_find(&mconn->broker->errstacks, + resp->rkbuf_reqhdr.ApiKey); + if (likely(!errstack)) { + errstack = rd_kafka_mock_error_stack_find( + &mcluster->errstacks, resp->rkbuf_reqhdr.ApiKey); + if (likely(!errstack)) { + mtx_unlock(&mcluster->lock); + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + } + + err_rtt = rd_kafka_mock_error_stack_next(errstack); + resp->rkbuf_ts_sent = err_rtt.rtt; + + mtx_unlock(&mcluster->lock); + + /* If the error is ERR__TRANSPORT (a librdkafka-specific error code + * that will never be returned by a broker), we close the connection. + * This allows closing the connection as soon as a certain + * request is seen. + * The handler code in rdkafka_mock_handlers.c does not need to + * handle this case specifically and will generate a response and + * enqueue it, but the connection will be down by the time it will + * be sent. + * Note: Delayed disconnects (rtt-based) are not supported. */ + if (err_rtt.err == RD_KAFKA_RESP_ERR__TRANSPORT) { + rd_kafka_dbg( + mcluster->rk, MOCK, "MOCK", + "Broker %" PRId32 + ": Forcing close of connection " + "from %s", + mconn->broker->id, + rd_sockaddr2str(&mconn->peer, RD_SOCKADDR2STR_F_PORT)); + rd_kafka_transport_shutdown(mconn->transport); + } + + + return err_rtt.err; +} + + +void rd_kafka_mock_clear_request_errors(rd_kafka_mock_cluster_t *mcluster, + int16_t ApiKey) { + rd_kafka_mock_error_stack_t *errstack; + + mtx_lock(&mcluster->lock); + + errstack = rd_kafka_mock_error_stack_find(&mcluster->errstacks, ApiKey); + if (errstack) + errstack->cnt = 0; + + mtx_unlock(&mcluster->lock); +} + + +void rd_kafka_mock_push_request_errors_array( + rd_kafka_mock_cluster_t *mcluster, + int16_t ApiKey, + size_t cnt, + const rd_kafka_resp_err_t *errors) { + rd_kafka_mock_error_stack_t *errstack; + size_t totcnt; + size_t i; + + mtx_lock(&mcluster->lock); + + errstack = rd_kafka_mock_error_stack_get(&mcluster->errstacks, ApiKey); + + totcnt = errstack->cnt + cnt; + + if (totcnt > errstack->size) { + errstack->size = totcnt + 4; + errstack->errs = rd_realloc( + errstack->errs, errstack->size * sizeof(*errstack->errs)); + } + + for (i = 0; i < cnt; i++) { + errstack->errs[errstack->cnt].err = errors[i]; + errstack->errs[errstack->cnt++].rtt = 0; + } + + mtx_unlock(&mcluster->lock); +} + +void rd_kafka_mock_push_request_errors(rd_kafka_mock_cluster_t *mcluster, + int16_t ApiKey, + size_t cnt, + ...) { + va_list ap; + rd_kafka_resp_err_t *errors = rd_alloca(sizeof(*errors) * cnt); + size_t i; + + va_start(ap, cnt); + for (i = 0; i < cnt; i++) + errors[i] = va_arg(ap, rd_kafka_resp_err_t); + va_end(ap); + + rd_kafka_mock_push_request_errors_array(mcluster, ApiKey, cnt, errors); +} + + +rd_kafka_resp_err_t +rd_kafka_mock_broker_push_request_error_rtts(rd_kafka_mock_cluster_t *mcluster, + int32_t broker_id, + int16_t ApiKey, + size_t cnt, + ...) 
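+/* The variable arguments are (rd_kafka_resp_err_t err, int rtt_ms)
+ * pairs; rtt is stored internally in microseconds, hence the * 1000
+ * when read off the va_list below. */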
{ + rd_kafka_mock_broker_t *mrkb; + va_list ap; + rd_kafka_mock_error_stack_t *errstack; + size_t totcnt; + + mtx_lock(&mcluster->lock); + + if (!(mrkb = rd_kafka_mock_broker_find(mcluster, broker_id))) { + mtx_unlock(&mcluster->lock); + return RD_KAFKA_RESP_ERR__UNKNOWN_BROKER; + } + + errstack = rd_kafka_mock_error_stack_get(&mrkb->errstacks, ApiKey); + + totcnt = errstack->cnt + cnt; + + if (totcnt > errstack->size) { + errstack->size = totcnt + 4; + errstack->errs = rd_realloc( + errstack->errs, errstack->size * sizeof(*errstack->errs)); + } + + va_start(ap, cnt); + while (cnt-- > 0) { + errstack->errs[errstack->cnt].err = + va_arg(ap, rd_kafka_resp_err_t); + errstack->errs[errstack->cnt++].rtt = + ((rd_ts_t)va_arg(ap, int)) * 1000; + } + va_end(ap); + + mtx_unlock(&mcluster->lock); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +rd_kafka_resp_err_t +rd_kafka_mock_broker_error_stack_cnt(rd_kafka_mock_cluster_t *mcluster, + int32_t broker_id, + int16_t ApiKey, + size_t *cntp) { + rd_kafka_mock_broker_t *mrkb; + rd_kafka_mock_error_stack_t *errstack; + + if (!mcluster || !cntp) + return RD_KAFKA_RESP_ERR__INVALID_ARG; + + mtx_lock(&mcluster->lock); + + if (!(mrkb = rd_kafka_mock_broker_find(mcluster, broker_id))) { + mtx_unlock(&mcluster->lock); + return RD_KAFKA_RESP_ERR__UNKNOWN_BROKER; + } + + if ((errstack = + rd_kafka_mock_error_stack_find(&mrkb->errstacks, ApiKey))) + *cntp = errstack->cnt; + + mtx_unlock(&mcluster->lock); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +void rd_kafka_mock_topic_set_error(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + rd_kafka_resp_err_t err) { + rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK); + + rko->rko_u.mock.name = rd_strdup(topic); + rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR; + rko->rko_u.mock.err = err; + + rko = rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE); + if (rko) + rd_kafka_op_destroy(rko); +} + + +rd_kafka_resp_err_t +rd_kafka_mock_topic_create(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + int partition_cnt, + int replication_factor) { + rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK); + + rko->rko_u.mock.name = rd_strdup(topic); + rko->rko_u.mock.lo = partition_cnt; + rko->rko_u.mock.hi = replication_factor; + rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_TOPIC_CREATE; + + return rd_kafka_op_err_destroy( + rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE)); +} + +rd_kafka_resp_err_t +rd_kafka_mock_partition_set_leader(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + int32_t partition, + int32_t broker_id) { + rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK); + + rko->rko_u.mock.name = rd_strdup(topic); + rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_PART_SET_LEADER; + rko->rko_u.mock.partition = partition; + rko->rko_u.mock.broker_id = broker_id; + + return rd_kafka_op_err_destroy( + rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE)); +} + +rd_kafka_resp_err_t +rd_kafka_mock_partition_set_follower(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + int32_t partition, + int32_t broker_id) { + rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK); + + rko->rko_u.mock.name = rd_strdup(topic); + rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER; + rko->rko_u.mock.partition = partition; + rko->rko_u.mock.broker_id = broker_id; + + return rd_kafka_op_err_destroy( + rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE)); +} + +rd_kafka_resp_err_t +rd_kafka_mock_partition_set_follower_wmarks(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + int32_t 
partition, + int64_t lo, + int64_t hi) { + rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK); + + rko->rko_u.mock.name = rd_strdup(topic); + rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS; + rko->rko_u.mock.partition = partition; + rko->rko_u.mock.lo = lo; + rko->rko_u.mock.hi = hi; + + return rd_kafka_op_err_destroy( + rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE)); +} + +rd_kafka_resp_err_t +rd_kafka_mock_broker_set_down(rd_kafka_mock_cluster_t *mcluster, + int32_t broker_id) { + rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK); + + rko->rko_u.mock.broker_id = broker_id; + rko->rko_u.mock.lo = rd_false; + rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN; + + return rd_kafka_op_err_destroy( + rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE)); +} + +rd_kafka_resp_err_t +rd_kafka_mock_broker_set_up(rd_kafka_mock_cluster_t *mcluster, + int32_t broker_id) { + rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK); + + rko->rko_u.mock.broker_id = broker_id; + rko->rko_u.mock.lo = rd_true; + rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN; + + return rd_kafka_op_err_destroy( + rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE)); +} + +rd_kafka_resp_err_t +rd_kafka_mock_broker_set_rtt(rd_kafka_mock_cluster_t *mcluster, + int32_t broker_id, + int rtt_ms) { + rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK); + + rko->rko_u.mock.broker_id = broker_id; + rko->rko_u.mock.lo = rtt_ms; + rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_BROKER_SET_RTT; + + return rd_kafka_op_err_destroy( + rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE)); +} + +rd_kafka_resp_err_t +rd_kafka_mock_broker_set_rack(rd_kafka_mock_cluster_t *mcluster, + int32_t broker_id, + const char *rack) { + rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK); + + rko->rko_u.mock.broker_id = broker_id; + rko->rko_u.mock.name = rd_strdup(rack); + rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_BROKER_SET_RACK; + + return rd_kafka_op_err_destroy( + rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE)); +} + +rd_kafka_resp_err_t +rd_kafka_mock_coordinator_set(rd_kafka_mock_cluster_t *mcluster, + const char *key_type, + const char *key, + int32_t broker_id) { + rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK); + + rko->rko_u.mock.name = rd_strdup(key_type); + rko->rko_u.mock.str = rd_strdup(key); + rko->rko_u.mock.broker_id = broker_id; + rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_COORD_SET; + + return rd_kafka_op_err_destroy( + rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE)); +} + +rd_kafka_resp_err_t +rd_kafka_mock_set_apiversion(rd_kafka_mock_cluster_t *mcluster, + int16_t ApiKey, + int16_t MinVersion, + int16_t MaxVersion) { + rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK); + + rko->rko_u.mock.partition = ApiKey; + rko->rko_u.mock.lo = MinVersion; + rko->rko_u.mock.hi = MaxVersion; + rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_APIVERSION_SET; + + return rd_kafka_op_err_destroy( + rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE)); +} + + +/** + * @brief Apply command to specific broker. 
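+ *
+ * E.g., rd_kafka_mock_broker_set_down() enqueues an RD_KAFKA_OP_MOCK op
+ * with cmd = RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN and lo = rd_false,
+ * which is served here: the listener is closed (and re-created,
+ * unstarted, to retain the port) and all connections are torn down.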
+
+
+/**
+ * @brief Apply command to specific broker.
+ *
+ * @locality mcluster thread
+ */
+static rd_kafka_resp_err_t
+rd_kafka_mock_broker_cmd(rd_kafka_mock_cluster_t *mcluster,
+                         rd_kafka_mock_broker_t *mrkb,
+                         rd_kafka_op_t *rko) {
+        switch (rko->rko_u.mock.cmd) {
+        case RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN:
+                if ((rd_bool_t)rko->rko_u.mock.lo == mrkb->up)
+                        break;
+
+                mrkb->up = (rd_bool_t)rko->rko_u.mock.lo;
+
+                if (!mrkb->up) {
+                        rd_kafka_mock_cluster_io_del(mcluster, mrkb->listen_s);
+                        rd_socket_close(mrkb->listen_s);
+                        /* Re-create the listener right away so we retain the
+                         * same port. The listener is not started until
+                         * the broker is set up (below). */
+                        mrkb->listen_s = rd_kafka_mock_broker_new_listener(
+                            mcluster, &mrkb->sin);
+                        rd_assert(mrkb->listen_s != -1 ||
+                                  !*"Failed to re-create mock broker listener");
+
+                        rd_kafka_mock_broker_close_all(mrkb, "Broker down");
+
+                } else {
+                        int r;
+                        rd_assert(mrkb->listen_s != -1);
+                        r = rd_kafka_mock_broker_start_listener(mrkb);
+                        rd_assert(r == 0 || !*"broker_start_listener() failed");
+                }
+                break;
+
+        case RD_KAFKA_MOCK_CMD_BROKER_SET_RTT:
+                mrkb->rtt = (rd_ts_t)rko->rko_u.mock.lo * 1000;
+
+                /* Check if there is anything to send now that the RTT
+                 * has changed or if a timer is to be started. */
+                rd_kafka_mock_broker_connections_write_out(mrkb);
+                break;
+
+        case RD_KAFKA_MOCK_CMD_BROKER_SET_RACK:
+                if (mrkb->rack)
+                        rd_free(mrkb->rack);
+
+                if (rko->rko_u.mock.name)
+                        mrkb->rack = rd_strdup(rko->rko_u.mock.name);
+                else
+                        mrkb->rack = NULL;
+                break;
+
+        default:
+                RD_BUG("Unhandled mock cmd %d", rko->rko_u.mock.cmd);
+                break;
+        }
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief Apply command to one or all brokers, depending on the value of
+ *        broker_id, where -1 means all, and != -1 means a specific broker.
+ *
+ * @locality mcluster thread
+ */
+static rd_kafka_resp_err_t
+rd_kafka_mock_brokers_cmd(rd_kafka_mock_cluster_t *mcluster,
+                          rd_kafka_op_t *rko) {
+        rd_kafka_mock_broker_t *mrkb;
+
+        if (rko->rko_u.mock.broker_id != -1) {
+                /* Specific broker */
+                mrkb = rd_kafka_mock_broker_find(mcluster,
+                                                 rko->rko_u.mock.broker_id);
+                if (!mrkb)
+                        return RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE;
+
+                return rd_kafka_mock_broker_cmd(mcluster, mrkb, rko);
+        }
+
+        /* All brokers */
+        TAILQ_FOREACH(mrkb, &mcluster->brokers, link) {
+                rd_kafka_resp_err_t err;
+
+                if ((err = rd_kafka_mock_broker_cmd(mcluster, mrkb, rko)))
+                        return err;
+        }
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
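+
+/* Illustrative note (hypothetical helper, not in this file): a setter
+ * that targets every broker at once would rely on the -1 fan-out above
+ * and only needs to enqueue a single op along these lines:
+ *
+ *   rko->rko_u.mock.broker_id = -1;     broker_id -1 = all brokers
+ *   rko->rko_u.mock.lo = 250;           rtt_ms
+ *   rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_BROKER_SET_RTT;
+ */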
+
+
+/**
+ * @brief Handle command op
+ *
+ * @locality mcluster thread
+ */
+static rd_kafka_resp_err_t
+rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster,
+                          rd_kafka_op_t *rko) {
+        rd_kafka_mock_topic_t *mtopic;
+        rd_kafka_mock_partition_t *mpart;
+        rd_kafka_mock_broker_t *mrkb;
+
+        switch (rko->rko_u.mock.cmd) {
+        case RD_KAFKA_MOCK_CMD_TOPIC_CREATE:
+                if (rd_kafka_mock_topic_find(mcluster, rko->rko_u.mock.name))
+                        return RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS;
+
+                if (!rd_kafka_mock_topic_new(mcluster, rko->rko_u.mock.name,
+                                             /* partition_cnt */
+                                             (int)rko->rko_u.mock.lo,
+                                             /* replication_factor */
+                                             (int)rko->rko_u.mock.hi))
+                        return RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION;
+                break;
+
+        case RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR:
+                mtopic =
+                    rd_kafka_mock_topic_get(mcluster, rko->rko_u.mock.name, -1);
+                mtopic->err = rko->rko_u.mock.err;
+                break;
+
+        case RD_KAFKA_MOCK_CMD_PART_SET_LEADER:
+                mpart = rd_kafka_mock_partition_get(
+                    mcluster, rko->rko_u.mock.name, rko->rko_u.mock.partition);
+                if (!mpart)
+                        return RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
+
+                if (rko->rko_u.mock.broker_id != -1) {
+                        mrkb = rd_kafka_mock_broker_find(
+                            mcluster, rko->rko_u.mock.broker_id);
+                        if (!mrkb)
+                                return RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE;
+                } else {
+                        mrkb = NULL;
+                }
+
+                rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
+                             "Set %s [%" PRId32 "] leader to %" PRId32,
+                             rko->rko_u.mock.name, rko->rko_u.mock.partition,
+                             rko->rko_u.mock.broker_id);
+
+                rd_kafka_mock_partition_set_leader0(mpart, mrkb);
+                break;
+
+        case RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER:
+                mpart = rd_kafka_mock_partition_get(
+                    mcluster, rko->rko_u.mock.name, rko->rko_u.mock.partition);
+                if (!mpart)
+                        return RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
+
+                rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
+                             "Set %s [%" PRId32
+                             "] preferred follower "
+                             "to %" PRId32,
+                             rko->rko_u.mock.name, rko->rko_u.mock.partition,
+                             rko->rko_u.mock.broker_id);
+
+                mpart->follower_id = rko->rko_u.mock.broker_id;
+                break;
+
+        case RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS:
+                mpart = rd_kafka_mock_partition_get(
+                    mcluster, rko->rko_u.mock.name, rko->rko_u.mock.partition);
+                if (!mpart)
+                        return RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
+
+                rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
+                             "Set %s [%" PRId32
+                             "] follower "
+                             "watermark offsets to %" PRId64 "..%" PRId64,
+                             rko->rko_u.mock.name, rko->rko_u.mock.partition,
+                             rko->rko_u.mock.lo, rko->rko_u.mock.hi);
+
+                if (rko->rko_u.mock.lo == -1) {
+                        mpart->follower_start_offset = mpart->start_offset;
+                        mpart->update_follower_start_offset = rd_true;
+                } else {
+                        mpart->follower_start_offset = rko->rko_u.mock.lo;
+                        mpart->update_follower_start_offset = rd_false;
+                }
+
+                if (rko->rko_u.mock.hi == -1) {
+                        mpart->follower_end_offset = mpart->end_offset;
+                        mpart->update_follower_end_offset = rd_true;
+                } else {
+                        mpart->follower_end_offset = rko->rko_u.mock.hi;
+                        mpart->update_follower_end_offset = rd_false;
+                }
+                break;
+
+                /* Broker commands */
+        case RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN:
+        case RD_KAFKA_MOCK_CMD_BROKER_SET_RTT:
+        case RD_KAFKA_MOCK_CMD_BROKER_SET_RACK:
+                return rd_kafka_mock_brokers_cmd(mcluster, rko);
+
+        case RD_KAFKA_MOCK_CMD_COORD_SET:
+                if (!rd_kafka_mock_coord_set(mcluster, rko->rko_u.mock.name,
+                                             rko->rko_u.mock.str,
+                                             rko->rko_u.mock.broker_id))
+                        return RD_KAFKA_RESP_ERR__INVALID_ARG;
+                break;
+
+        case RD_KAFKA_MOCK_CMD_APIVERSION_SET:
+                if (rko->rko_u.mock.partition < 0 ||
+                    rko->rko_u.mock.partition >= RD_KAFKAP__NUM)
+                        return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+                mcluster->api_handlers[(int)rko->rko_u.mock.partition]
+                    .MinVersion = (int16_t)rko->rko_u.mock.lo;
+                mcluster->api_handlers[(int)rko->rko_u.mock.partition]
+                    .MaxVersion = (int16_t)rko->rko_u.mock.hi;
+                break;
+
+        default:
+                rd_assert(!*"unknown mock cmd");
+                break;
+        }
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+static rd_kafka_op_res_t
+rd_kafka_mock_cluster_op_serve(rd_kafka_t *rk,
+                               rd_kafka_q_t *rkq,
+                               rd_kafka_op_t *rko,
+                               rd_kafka_q_cb_type_t cb_type,
+                               void *opaque) {
+        rd_kafka_mock_cluster_t *mcluster = opaque;
+        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
+
+        switch ((int)rko->rko_type) {
+        case RD_KAFKA_OP_TERMINATE:
+                mcluster->run = rd_false;
+                break;
+
+        case RD_KAFKA_OP_MOCK:
+                err = rd_kafka_mock_cluster_cmd(mcluster, rko);
+                break;
+
+        default:
+                rd_assert(!*"unhandled op");
+                break;
+        }
+
+        rd_kafka_op_reply(rko, err);
+
+        return RD_KAFKA_OP_RES_HANDLED;
+}
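+
+/* Sketch of the request/reply round-trip served above (this mirrors the
+ * pattern the public setters earlier in this file already use): the
+ * caller enqueues an RD_KAFKA_OP_MOCK op and blocks until the mcluster
+ * thread serves it and replies with the command's result:
+ *
+ *   rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK);
+ *   rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_TOPIC_CREATE;
+ *   ...
+ *   err = rd_kafka_op_err_destroy(
+ *       rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
+ */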
+
+
+/**
+ * @brief Destroy cluster (internal)
+ */
+static void rd_kafka_mock_cluster_destroy0(rd_kafka_mock_cluster_t *mcluster) {
+        rd_kafka_mock_topic_t *mtopic;
+        rd_kafka_mock_broker_t *mrkb;
+        rd_kafka_mock_cgrp_t *mcgrp;
+        rd_kafka_mock_coord_t *mcoord;
+        rd_kafka_mock_error_stack_t *errstack;
+        thrd_t dummy_rkb_thread;
+        int ret;
+
+        while ((mtopic = TAILQ_FIRST(&mcluster->topics)))
+                rd_kafka_mock_topic_destroy(mtopic);
+
+        while ((mrkb = TAILQ_FIRST(&mcluster->brokers)))
+                rd_kafka_mock_broker_destroy(mrkb);
+
+        while ((mcgrp = TAILQ_FIRST(&mcluster->cgrps)))
+                rd_kafka_mock_cgrp_destroy(mcgrp);
+
+        while ((mcoord = TAILQ_FIRST(&mcluster->coords)))
+                rd_kafka_mock_coord_destroy(mcluster, mcoord);
+
+        rd_list_destroy(&mcluster->pids);
+
+        while ((errstack = TAILQ_FIRST(&mcluster->errstacks))) {
+                TAILQ_REMOVE(&mcluster->errstacks, errstack, link);
+                rd_kafka_mock_error_stack_destroy(errstack);
+        }
+
+        /*
+         * Destroy dummy broker
+         */
+        rd_kafka_q_enq(mcluster->dummy_rkb->rkb_ops,
+                       rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
+
+        dummy_rkb_thread = mcluster->dummy_rkb->rkb_thread;
+
+        rd_kafka_broker_destroy(mcluster->dummy_rkb);
+
+        if (thrd_join(dummy_rkb_thread, &ret) != thrd_success)
+                rd_assert(!*"failed to join mock dummy broker thread");
+
+
+        rd_kafka_q_destroy_owner(mcluster->ops);
+
+        rd_kafka_timers_destroy(&mcluster->timers);
+
+        if (mcluster->fd_size > 0) {
+                rd_free(mcluster->fds);
+                rd_free(mcluster->handlers);
+        }
+
+        mtx_destroy(&mcluster->lock);
+
+        rd_free(mcluster->bootstraps);
+
+        rd_socket_close(mcluster->wakeup_fds[0]);
+        rd_socket_close(mcluster->wakeup_fds[1]);
+}
+
+
+
+void rd_kafka_mock_cluster_destroy(rd_kafka_mock_cluster_t *mcluster) {
+        int res;
+        rd_kafka_op_t *rko;
+
+        rd_kafka_dbg(mcluster->rk, MOCK, "MOCK", "Destroying cluster");
+
+        rd_assert(rd_atomic32_get(&mcluster->rk->rk_mock.cluster_cnt) > 0);
+        rd_atomic32_sub(&mcluster->rk->rk_mock.cluster_cnt, 1);
+
+        rko = rd_kafka_op_req2(mcluster->ops, RD_KAFKA_OP_TERMINATE);
+
+        if (rko)
+                rd_kafka_op_destroy(rko);
+
+        if (thrd_join(mcluster->thread, &res) != thrd_success)
+                rd_assert(!*"failed to join mock thread");
+
+        rd_free(mcluster);
+}
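+
+/* Lifecycle sketch (illustrative): a mock cluster is owned by the
+ * rd_kafka_t instance it was created from and should be destroyed
+ * before that handle:
+ *
+ *   rd_kafka_mock_cluster_t *mcluster = rd_kafka_mock_cluster_new(rk, 3);
+ *   ... exercise the client against the cluster ...
+ *   rd_kafka_mock_cluster_destroy(mcluster);
+ *   rd_kafka_destroy(rk);
+ */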
+
+
+
+rd_kafka_mock_cluster_t *rd_kafka_mock_cluster_new(rd_kafka_t *rk,
+                                                   int broker_cnt) {
+        rd_kafka_mock_cluster_t *mcluster;
+        rd_kafka_mock_broker_t *mrkb;
+        int i, r;
+        size_t bootstraps_len = 0;
+        size_t of;
+
+        mcluster = rd_calloc(1, sizeof(*mcluster));
+        mcluster->rk = rk;
+
+        mcluster->dummy_rkb =
+            rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL, RD_KAFKA_PROTO_PLAINTEXT,
+                                "mock", 0, RD_KAFKA_NODEID_UA);
+        rd_snprintf(mcluster->id, sizeof(mcluster->id), "mockCluster%lx",
+                    (intptr_t)mcluster >> 2);
+
+        TAILQ_INIT(&mcluster->brokers);
+
+        for (i = 1; i <= broker_cnt; i++) {
+                if (!(mrkb = rd_kafka_mock_broker_new(mcluster, i))) {
+                        rd_kafka_mock_cluster_destroy(mcluster);
+                        return NULL;
+                }
+
+                /* advertised listener + ":port" + "," */
+                bootstraps_len += strlen(mrkb->advertised_listener) + 6 + 1;
+        }
+
+        mtx_init(&mcluster->lock, mtx_plain);
+
+        TAILQ_INIT(&mcluster->topics);
+        mcluster->defaults.partition_cnt = 4;
+        mcluster->defaults.replication_factor = RD_MIN(3, broker_cnt);
+
+        TAILQ_INIT(&mcluster->cgrps);
+
+        TAILQ_INIT(&mcluster->coords);
+
+        rd_list_init(&mcluster->pids, 16, rd_free);
+
+        TAILQ_INIT(&mcluster->errstacks);
+
+        memcpy(mcluster->api_handlers, rd_kafka_mock_api_handlers,
+               sizeof(mcluster->api_handlers));
+
+        /* Use an op queue for controlling the cluster in
+         * a thread-safe manner without locking. */
+        mcluster->ops = rd_kafka_q_new(rk);
+        mcluster->ops->rkq_serve = rd_kafka_mock_cluster_op_serve;
+        mcluster->ops->rkq_opaque = mcluster;
+
+        rd_kafka_timers_init(&mcluster->timers, rk, mcluster->ops);
+
+        if ((r = rd_pipe_nonblocking(mcluster->wakeup_fds)) == -1) {
+                rd_kafka_log(rk, LOG_ERR, "MOCK",
+                             "Failed to setup mock cluster wake-up fds: %s",
+                             rd_socket_strerror(r));
+        } else {
+                const char onebyte = 1;
+                rd_kafka_q_io_event_enable(mcluster->ops,
+                                           mcluster->wakeup_fds[1], &onebyte,
+                                           sizeof(onebyte));
+        }
+
+
+        if (thrd_create(&mcluster->thread, rd_kafka_mock_cluster_thread_main,
+                        mcluster) != thrd_success) {
+                rd_kafka_log(rk, LOG_CRIT, "MOCK",
+                             "Failed to create mock cluster thread: %s",
+                             rd_strerror(errno));
+                rd_kafka_mock_cluster_destroy(mcluster);
+                return NULL;
+        }
+
+
+        /* Construct bootstrap.servers list */
+        mcluster->bootstraps = rd_malloc(bootstraps_len + 1);
+        of = 0;
+        TAILQ_FOREACH(mrkb, &mcluster->brokers, link) {
+                r = rd_snprintf(&mcluster->bootstraps[of], bootstraps_len - of,
+                                "%s%s:%hu", of > 0 ? "," : "",
+                                mrkb->advertised_listener, mrkb->port);
+                of += r;
+                rd_assert(of < bootstraps_len);
+        }
+        mcluster->bootstraps[of] = '\0';
+
+        rd_kafka_dbg(rk, MOCK, "MOCK", "Mock cluster %s bootstrap.servers=%s",
+                     mcluster->id, mcluster->bootstraps);
+
+        rd_atomic32_add(&rk->rk_mock.cluster_cnt, 1);
+
+        return mcluster;
+}
+
+
+rd_kafka_t *
+rd_kafka_mock_cluster_handle(const rd_kafka_mock_cluster_t *mcluster) {
+        return (rd_kafka_t *)mcluster->rk;
+}
+
+rd_kafka_mock_cluster_t *rd_kafka_handle_mock_cluster(const rd_kafka_t *rk) {
+        return (rd_kafka_mock_cluster_t *)rk->rk_mock.cluster;
+}
+
+
+const char *
+rd_kafka_mock_cluster_bootstraps(const rd_kafka_mock_cluster_t *mcluster) {
+        return mcluster->bootstraps;
+}
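+
+/* End-to-end usage sketch (illustrative; error handling omitted):
+ *
+ *   rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER,
+ *                                 rd_kafka_conf_new(), NULL, 0);
+ *   rd_kafka_mock_cluster_t *mcluster = rd_kafka_mock_cluster_new(rk, 3);
+ *
+ * Any client can then be pointed at the mock cluster by setting its
+ * bootstrap.servers to rd_kafka_mock_cluster_bootstraps(mcluster).
+ */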