Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0077-compaction.c')
-rw-r--r--  src/fluent-bit/lib/librdkafka-2.1.0/tests/0077-compaction.c  357
1 file changed, 357 insertions(+), 0 deletions(-)
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0077-compaction.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0077-compaction.c
new file mode 100644
index 000000000..01667114c
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0077-compaction.c
@@ -0,0 +1,357 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+#include "rdkafka.h"
+
+/**
+ * @brief Verify handling of compacted topics.
+ *
+ * General idea:
+ * - create a compacted topic with a low cleanup interval to promote quick
+ * compaction.
+ * - produce messages for 3 keys, interleaved with unkeyed messages.
+ * Interleave tombstones for k1, but not for k2 or k3.
+ * - consume before compaction - verify all messages in place
+ * - wait for compaction
+ * - consume after compaction - verify expected messages.
+ */
+
+
+
+/**
+ * @brief Get the low watermark for a partition; we use this to see if
+ * compaction has kicked in.
+ */
+static int64_t
+get_low_wmark(rd_kafka_t *rk, const char *topic, int32_t partition) {
+ rd_kafka_resp_err_t err;
+ int64_t low, high;
+
+ err = rd_kafka_query_watermark_offsets(rk, topic, partition, &low,
+ &high, tmout_multip(10000));
+
+ TEST_ASSERT(!err, "query_warmark_offsets(%s, %d) failed: %s", topic,
+ (int)partition, rd_kafka_err2str(err));
+
+ return low;
+}
+
+
+/**
+ * @brief Wait for compaction by polling until the partition
+ * low watermark increases. */
+static void wait_compaction(rd_kafka_t *rk,
+ const char *topic,
+ int32_t partition,
+ int64_t low_offset,
+ int timeout_ms) {
+ int64_t low = -1;
+ int64_t ts_start = test_clock();
+
+ TEST_SAY(
+ "Waiting for compaction to kick in and increase the "
+ "Low watermark offset from %" PRId64 " on %s [%" PRId32 "]\n",
+ low_offset, topic, partition);
+
+ while (1) {
+ low = get_low_wmark(rk, topic, partition);
+
+ TEST_SAY("Low watermark offset for %s [%" PRId32
+ "] is "
+ "%" PRId64 " (want > %" PRId64 ")\n",
+ topic, partition, low, low_offset);
+
+ if (low > low_offset)
+ break;
+
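+ /* test_clock() returns microseconds; scale the millisecond
+ * timeout to match. */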
+ if (ts_start + (timeout_ms * 1000) < test_clock())
+ break;
+
+ rd_sleep(5);
+ }
+}
+
+static void produce_compactable_msgs(const char *topic,
+ int32_t partition,
+ uint64_t testid,
+ int msgcnt,
+ size_t msgsize) {
+ rd_kafka_t *rk;
+ rd_kafka_conf_t *conf;
+ int i;
+ char *val;
+ char key[16];
+ rd_kafka_resp_err_t err;
+ int msgcounter = msgcnt;
+
+ if (!testid)
+ testid = test_id_generate();
+
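+ /* All fill messages share a single random key, and the final message
+ * is a tombstone, so the whole batch can be compacted away. */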
+ test_str_id_generate(key, sizeof(key));
+
+ val = calloc(1, msgsize);
+
+ TEST_SAY("Producing %d messages (total of %" PRIusz
+ " bytes) of "
+ "compactable messages\n",
+ msgcnt, (size_t)msgcnt * msgsize);
+
+ test_conf_init(&conf, NULL, 0);
+ rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
+ /* Make sure batch size does not exceed segment.bytes since that
+ * will make the ProduceRequest fail. */
+ test_conf_set(conf, "batch.num.messages", "1");
+
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+
+ for (i = 0; i < msgcnt - 1; i++) {
+ err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic),
+ RD_KAFKA_V_PARTITION(partition),
+ RD_KAFKA_V_KEY(key, sizeof(key) - 1),
+ RD_KAFKA_V_VALUE(val, msgsize),
+ RD_KAFKA_V_OPAQUE(&msgcounter),
+ RD_KAFKA_V_END);
+ TEST_ASSERT(!err, "producev(): %s", rd_kafka_err2str(err));
+ }
+
+ /* Final message is the tombstone */
+ err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic),
+ RD_KAFKA_V_PARTITION(partition),
+ RD_KAFKA_V_KEY(key, sizeof(key) - 1),
+ RD_KAFKA_V_OPAQUE(&msgcounter), RD_KAFKA_V_END);
+ TEST_ASSERT(!err, "producev(): %s", rd_kafka_err2str(err));
+
+ test_flush(rk, tmout_multip(10000));
+ TEST_ASSERT(msgcounter == 0, "%d messages unaccounted for", msgcounter);
+
+ rd_kafka_destroy(rk);
+
+ free(val);
+}
+
+
+
+static void do_test_compaction(int msgs_per_key, const char *compression) {
+ const char *topic = test_mk_topic_name(__FILE__, 1);
+#define _KEY_CNT 4
+ const char *keys[_KEY_CNT] = {"k1", "k2", "k3",
+ NULL /*generate unique*/};
+ int msgcnt = msgs_per_key * _KEY_CNT;
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *rk;
+ rd_kafka_topic_t *rkt;
+ uint64_t testid;
+ int32_t partition = 0;
+ int cnt = 0;
+ test_msgver_t mv;
+ test_msgver_t mv_correct;
+ int msgcounter = 0;
+ const int fillcnt = 20;
+
+ testid = test_id_generate();
+
+ TEST_SAY(
+ _C_MAG
+ "Test compaction on topic %s with %s compression (%d messages)\n",
+ topic, compression ? compression : "no", msgcnt);
+
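+ /* Create a compacted topic with small segments and an aggressive
+ * min.cleanable.dirty.ratio so the broker's log cleaner runs quickly. */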
+ test_kafka_topics(
+ "--create --topic \"%s\" "
+ "--partitions %d "
+ "--replication-factor 1 "
+ "--config cleanup.policy=compact "
+ "--config segment.ms=10000 "
+ "--config segment.bytes=10000 "
+ "--config min.cleanable.dirty.ratio=0.01 "
+ "--config delete.retention.ms=86400 "
+ "--config file.delete.delay.ms=10000 "
+ "--config max.compaction.lag.ms=100",
+ topic, partition + 1);
+
+ test_conf_init(&conf, NULL, 120);
+ rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
+ if (compression)
+ test_conf_set(conf, "compression.codec", compression);
+ /* Keep the maximum batch size below segment.bytes to prevent
+ * messages from accumulating into a batch that would be rejected
+ * by the broker. */
+ test_conf_set(conf, "message.max.bytes", "6000");
+ test_conf_set(conf, "linger.ms", "10");
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+ rkt = rd_kafka_topic_new(rk, topic, NULL);
+
+ /* The low watermark is not updated on message deletion (compaction)
+ * but on segment deletion, so fill up the first segment with
+ * random messages eligible for quick compaction. */
+ produce_compactable_msgs(topic, partition, 0 /*generate testid*/, fillcnt, 1000);
+
+ /* Populate a correct msgver for later comparison after compact. */
+ test_msgver_init(&mv_correct, testid);
+
+ TEST_SAY("Producing %d messages for %d keys\n", msgcnt, _KEY_CNT);
+ for (cnt = 0; cnt < msgcnt;) {
+ int k;
+
+ for (k = 0; k < _KEY_CNT; k++) {
+ rd_kafka_resp_err_t err;
+ int is_last = cnt + _KEY_CNT >= msgcnt;
+ /* Let keys[0] have some tombstones */
+ int is_tombstone = (k == 0 && (is_last || !(cnt % 7)));
+ char *valp;
+ size_t valsize;
+ char rdk_msgid[256];
+ char unique_key[16];
+ const void *key;
+ size_t keysize;
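+ /* Expected pre-compaction offset: the fillcnt fill messages
+ * occupy the first offsets of the partition. */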
+ int64_t offset = fillcnt + cnt;
+
+ test_msg_fmt(rdk_msgid, sizeof(rdk_msgid), testid,
+ partition, cnt);
+
+ if (is_tombstone) {
+ valp = NULL;
+ valsize = 0;
+ } else {
+ valp = rdk_msgid;
+ valsize = strlen(valp);
+ }
+
+ if (!(key = keys[k])) {
+ rd_snprintf(unique_key, sizeof(unique_key),
+ "%d", cnt);
+ key = unique_key;
+ }
+ keysize = strlen(key);
+
+ /* All unique-key messages, plus the last message
+ * produced for each fixed key, should remain
+ * intact after compaction. */
+ if (!keys[k] || is_last) {
+ TEST_SAYL(4,
+ "Add to correct msgvec: "
+ "msgid: %d: %s is_last=%d, "
+ "is_tomb=%d\n",
+ cnt, (const char *)key, is_last,
+ is_tombstone);
+ test_msgver_add_msg00(
+ __FUNCTION__, __LINE__, rd_kafka_name(rk),
+ &mv_correct, testid, topic, partition,
+ offset, -1, -1, 0, cnt);
+ }
+
+
+ msgcounter++;
+ err = rd_kafka_producev(
+ rk, RD_KAFKA_V_TOPIC(topic),
+ RD_KAFKA_V_PARTITION(0),
+ RD_KAFKA_V_KEY(key, keysize),
+ RD_KAFKA_V_VALUE(valp, valsize),
+ RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
+ RD_KAFKA_V_HEADER("rdk_msgid", rdk_msgid, -1),
+ /* msgcounter as msg_opaque is used
+ * by test delivery report callback to
+ * count number of messages. */
+ RD_KAFKA_V_OPAQUE(&msgcounter), RD_KAFKA_V_END);
+ TEST_ASSERT(!err, "producev(#%d) failed: %s", cnt,
+ rd_kafka_err2str(err));
+
+ cnt++;
+ }
+ }
+
+ TEST_ASSERT(cnt == msgcnt, "cnt %d != msgcnt %d", cnt, msgcnt);
+
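+ /* Wait for delivery reports for all messages produced above. */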
+ msgcounter = cnt;
+ test_wait_delivery(rk, &msgcounter);
+
+ /* Trigger compaction by filling up the segment with dummy messages.
+ * Produce them in small batches so that compression does not work
+ * too well and leave the segments under-filled.
+ * We can't reuse the existing producer instance because it might be
+ * using compression, which makes it hard to know how much data we
+ * need to produce to trigger compaction. */
+ produce_compactable_msgs(topic, partition, 0 /*generate testid*/, 20, 1024);
+
+ /* Wait for compaction:
+ * this doesn't really work because the low watermark offset
+ * is not updated on compaction unless the first segment is deleted.
+ * But it serves as a pause to let the compaction triggered by the
+ * dummy produce above kick in. */
+ wait_compaction(rk, topic, partition, 0, 20 * 1000);
+
+ TEST_SAY(_C_YEL "Verify messages after compaction\n");
+ /* After compaction we expect the following messages:
+ * last message for each of k1, k2, k3, all messages for unkeyed. */
+ test_msgver_init(&mv, testid);
+ mv.msgid_hdr = "rdk_msgid";
+ test_consume_msgs_easy_mv(NULL, topic, -1, testid, 1, -1, NULL, &mv);
+ test_msgver_verify_compare("post-compaction", &mv, &mv_correct,
+ TEST_MSGVER_BY_MSGID |
+ TEST_MSGVER_BY_OFFSET);
+ test_msgver_clear(&mv);
+
+ test_msgver_clear(&mv_correct);
+
+ rd_kafka_topic_destroy(rkt);
+ rd_kafka_destroy(rk);
+
+ TEST_SAY(_C_GRN "Compaction test with %s compression: PASS\n",
+ compression ? compression : "no");
+}
+
+int main_0077_compaction(int argc, char **argv) {
+
+ if (!test_can_create_topics(1))
+ return 0;
+
+ if (test_needs_auth()) {
+ TEST_SKIP("Test cluster requires authentication/SSL\n");
+ return 0;
+ }
+
+ do_test_compaction(10, NULL);
+
+ if (test_quick) {
+ TEST_SAY(
+ "Skipping further compaction tests "
+ "due to quick mode\n");
+ return 0;
+ }
+
+ do_test_compaction(1000, NULL);
+#if WITH_SNAPPY
+ do_test_compaction(10, "snappy");
+#endif
+#if WITH_ZSTD
+ do_test_compaction(10, "zstd");
+#endif
+#if WITH_ZLIB
+ do_test_compaction(10000, "gzip");
+#endif
+
+ return 0;
+}