diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0077-compaction.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/0077-compaction.c | 357 |
1 file changed, 357 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0077-compaction.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0077-compaction.c new file mode 100644 index 00000000..01667114 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0077-compaction.c @@ -0,0 +1,357 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" +#include "rdkafka.h" + +/** + * @brief Verify handling of compacted topics. + * + * General idea: + * - create a compacted topic with a low cleanup interval to promote quick + * compaction. + * - produce messages for 3 keys and interleave with unkeyed messages. 
+ * interleave tombstones for k1 and k2, but not k3. + * - consume before compaction - verify all messages in place + * - wait for compaction + * - consume after compaction - verify expected messages. + */ + + + +/** + * @brief Get low watermark in partition, we use this see if compaction + * has kicked in. + */ +static int64_t +get_low_wmark(rd_kafka_t *rk, const char *topic, int32_t partition) { + rd_kafka_resp_err_t err; + int64_t low, high; + + err = rd_kafka_query_watermark_offsets(rk, topic, partition, &low, + &high, tmout_multip(10000)); + + TEST_ASSERT(!err, "query_warmark_offsets(%s, %d) failed: %s", topic, + (int)partition, rd_kafka_err2str(err)); + + return low; +} + + +/** + * @brief Wait for compaction by checking for + * partition low-watermark increasing */ +static void wait_compaction(rd_kafka_t *rk, + const char *topic, + int32_t partition, + int64_t low_offset, + int timeout_ms) { + int64_t low = -1; + int64_t ts_start = test_clock(); + + TEST_SAY( + "Waiting for compaction to kick in and increase the " + "Low watermark offset from %" PRId64 " on %s [%" PRId32 "]\n", + low_offset, topic, partition); + + while (1) { + low = get_low_wmark(rk, topic, partition); + + TEST_SAY("Low watermark offset for %s [%" PRId32 + "] is " + "%" PRId64 " (want > %" PRId64 ")\n", + topic, partition, low, low_offset); + + if (low > low_offset) + break; + + if (ts_start + (timeout_ms * 1000) < test_clock()) + break; + + rd_sleep(5); + } +} + +static void produce_compactable_msgs(const char *topic, + int32_t partition, + uint64_t testid, + int msgcnt, + size_t msgsize) { + rd_kafka_t *rk; + rd_kafka_conf_t *conf; + int i; + char *val; + char key[16]; + rd_kafka_resp_err_t err; + int msgcounter = msgcnt; + + if (!testid) + testid = test_id_generate(); + + test_str_id_generate(key, sizeof(key)); + + val = calloc(1, msgsize); + + TEST_SAY("Producing %d messages (total of %" PRIusz + " bytes) of " + "compactable messages\n", + msgcnt, (size_t)msgcnt * msgsize); + + 
test_conf_init(&conf, NULL, 0); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + /* Make sure batch size does not exceed segment.bytes since that + * will make the ProduceRequest fail. */ + test_conf_set(conf, "batch.num.messages", "1"); + + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + for (i = 0; i < msgcnt - 1; i++) { + err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic), + RD_KAFKA_V_PARTITION(partition), + RD_KAFKA_V_KEY(key, sizeof(key) - 1), + RD_KAFKA_V_VALUE(val, msgsize), + RD_KAFKA_V_OPAQUE(&msgcounter), + RD_KAFKA_V_END); + TEST_ASSERT(!err, "producev(): %s", rd_kafka_err2str(err)); + } + + /* Final message is the tombstone */ + err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic), + RD_KAFKA_V_PARTITION(partition), + RD_KAFKA_V_KEY(key, sizeof(key) - 1), + RD_KAFKA_V_OPAQUE(&msgcounter), RD_KAFKA_V_END); + TEST_ASSERT(!err, "producev(): %s", rd_kafka_err2str(err)); + + test_flush(rk, tmout_multip(10000)); + TEST_ASSERT(msgcounter == 0, "%d messages unaccounted for", msgcounter); + + rd_kafka_destroy(rk); + + free(val); +} + + + +static void do_test_compaction(int msgs_per_key, const char *compression) { + const char *topic = test_mk_topic_name(__FILE__, 1); +#define _KEY_CNT 4 + const char *keys[_KEY_CNT] = {"k1", "k2", "k3", + NULL /*generate unique*/}; + int msgcnt = msgs_per_key * _KEY_CNT; + rd_kafka_conf_t *conf; + rd_kafka_t *rk; + rd_kafka_topic_t *rkt; + uint64_t testid; + int32_t partition = 0; + int cnt = 0; + test_msgver_t mv; + test_msgver_t mv_correct; + int msgcounter = 0; + const int fillcnt = 20; + + testid = test_id_generate(); + + TEST_SAY( + _C_MAG + "Test compaction on topic %s with %s compression (%d messages)\n", + topic, compression ? 
compression : "no", msgcnt); + + test_kafka_topics( + "--create --topic \"%s\" " + "--partitions %d " + "--replication-factor 1 " + "--config cleanup.policy=compact " + "--config segment.ms=10000 " + "--config segment.bytes=10000 " + "--config min.cleanable.dirty.ratio=0.01 " + "--config delete.retention.ms=86400 " + "--config file.delete.delay.ms=10000 " + "--config max.compaction.lag.ms=100", + topic, partition + 1); + + test_conf_init(&conf, NULL, 120); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + if (compression) + test_conf_set(conf, "compression.codec", compression); + /* Limit max batch size below segment.bytes to avoid messages + * to accumulate into a batch that will be rejected by the broker. */ + test_conf_set(conf, "message.max.bytes", "6000"); + test_conf_set(conf, "linger.ms", "10"); + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + rkt = rd_kafka_topic_new(rk, topic, NULL); + + /* The low watermark is not updated on message deletion(compaction) + * but on segment deletion, so fill up the first segment with + * random messages eligible for hasty compaction. */ + produce_compactable_msgs(topic, 0, partition, fillcnt, 1000); + + /* Populate a correct msgver for later comparison after compact. 
*/ + test_msgver_init(&mv_correct, testid); + + TEST_SAY("Producing %d messages for %d keys\n", msgcnt, _KEY_CNT); + for (cnt = 0; cnt < msgcnt;) { + int k; + + for (k = 0; k < _KEY_CNT; k++) { + rd_kafka_resp_err_t err; + int is_last = cnt + _KEY_CNT >= msgcnt; + /* Let keys[0] have some tombstones */ + int is_tombstone = (k == 0 && (is_last || !(cnt % 7))); + char *valp; + size_t valsize; + char rdk_msgid[256]; + char unique_key[16]; + const void *key; + size_t keysize; + int64_t offset = fillcnt + cnt; + + test_msg_fmt(rdk_msgid, sizeof(rdk_msgid), testid, + partition, cnt); + + if (is_tombstone) { + valp = NULL; + valsize = 0; + } else { + valp = rdk_msgid; + valsize = strlen(valp); + } + + if (!(key = keys[k])) { + rd_snprintf(unique_key, sizeof(unique_key), + "%d", cnt); + key = unique_key; + } + keysize = strlen(key); + + /* All unique-key messages should remain intact + * after compaction. */ + if (!keys[k] || is_last) { + TEST_SAYL(4, + "Add to correct msgvec: " + "msgid: %d: %s is_last=%d, " + "is_tomb=%d\n", + cnt, (const char *)key, is_last, + is_tombstone); + test_msgver_add_msg00( + __FUNCTION__, __LINE__, rd_kafka_name(rk), + &mv_correct, testid, topic, partition, + offset, -1, -1, 0, cnt); + } + + + msgcounter++; + err = rd_kafka_producev( + rk, RD_KAFKA_V_TOPIC(topic), + RD_KAFKA_V_PARTITION(0), + RD_KAFKA_V_KEY(key, keysize), + RD_KAFKA_V_VALUE(valp, valsize), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_HEADER("rdk_msgid", rdk_msgid, -1), + /* msgcounter as msg_opaque is used + * by test delivery report callback to + * count number of messages. 
*/ + RD_KAFKA_V_OPAQUE(&msgcounter), RD_KAFKA_V_END); + TEST_ASSERT(!err, "producev(#%d) failed: %s", cnt, + rd_kafka_err2str(err)); + + cnt++; + } + } + + TEST_ASSERT(cnt == msgcnt, "cnt %d != msgcnt %d", cnt, msgcnt); + + msgcounter = cnt; + test_wait_delivery(rk, &msgcounter); + + /* Trigger compaction by filling up the segment with dummy messages, + * do it in chunks to avoid too good compression which then won't + * fill up the segments.. + * We can't reuse the existing producer instance because it + * might be using compression which makes it hard to know how + * much data we need to produce to trigger compaction. */ + produce_compactable_msgs(topic, 0, partition, 20, 1024); + + /* Wait for compaction: + * this doesn't really work because the low watermark offset + * is not updated on compaction if the first segment is not deleted. + * But it serves as a pause to let compaction kick in + * which is triggered by the dummy produce above. */ + wait_compaction(rk, topic, partition, 0, 20 * 1000); + + TEST_SAY(_C_YEL "Verify messages after compaction\n"); + /* After compaction we expect the following messages: + * last message for each of k1, k2, k3, all messages for unkeyed. */ + test_msgver_init(&mv, testid); + mv.msgid_hdr = "rdk_msgid"; + test_consume_msgs_easy_mv(NULL, topic, -1, testid, 1, -1, NULL, &mv); + test_msgver_verify_compare("post-compaction", &mv, &mv_correct, + TEST_MSGVER_BY_MSGID | + TEST_MSGVER_BY_OFFSET); + test_msgver_clear(&mv); + + test_msgver_clear(&mv_correct); + + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(rk); + + TEST_SAY(_C_GRN "Compaction test with %s compression: PASS\n", + compression ? 
compression : "no"); +} + +int main_0077_compaction(int argc, char **argv) { + + if (!test_can_create_topics(1)) + return 0; + + if (test_needs_auth()) { + TEST_SKIP("Test cluster requires authentication/SSL\n"); + return 0; + } + + do_test_compaction(10, NULL); + + if (test_quick) { + TEST_SAY( + "Skipping further compaction tests " + "due to quick mode\n"); + return 0; + } + + do_test_compaction(1000, NULL); +#if WITH_SNAPPY + do_test_compaction(10, "snappy"); +#endif +#if WITH_ZSTD + do_test_compaction(10, "zstd"); +#endif +#if WITH_ZLIB + do_test_compaction(10000, "gzip"); +#endif + + return 0; +} |