summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/0052-msg_timestamps.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 11:19:16 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:07:37 +0000
commitb485aab7e71c1625cfc27e0f92c9509f42378458 (patch)
treeae9abe108601079d1679194de237c9a435ae5b55 /src/fluent-bit/lib/librdkafka-2.1.0/tests/0052-msg_timestamps.c
parentAdding upstream version 1.44.3. (diff)
downloadnetdata-b485aab7e71c1625cfc27e0f92c9509f42378458.tar.xz
netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.zip
Adding upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0052-msg_timestamps.c')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/0052-msg_timestamps.c220
1 files changed, 220 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0052-msg_timestamps.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0052-msg_timestamps.c
new file mode 100644
index 000000000..ef9b89878
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0052-msg_timestamps.c
@@ -0,0 +1,220 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+#include "rdkafka.h"
+
+/**
+ * Verify message timestamp behaviour on supporting brokers (>=0.10.0.0).
+ * Issue #858
+ */
+struct timestamp_range {
+ int64_t min;
+ int64_t max;
+};
+
+static const struct timestamp_range invalid_timestamp = {-1, -1};
+static struct timestamp_range broker_timestamp;
+static struct timestamp_range my_timestamp;
+
+static void prepare_timestamps(void) {
+ struct timeval ts;
+ rd_gettimeofday(&ts, NULL);
+
+ /* broker timestamps expected to be within 600 seconds */
+ broker_timestamp.min = (int64_t)ts.tv_sec * 1000LLU;
+ broker_timestamp.max = broker_timestamp.min + (600 * 1000LLU);
+
+ /* client timestamps: set in the future (24 hours)
+ * to be outside of broker timestamps */
+ my_timestamp.min = my_timestamp.max =
+ (int64_t)ts.tv_sec + (24 * 3600 * 1000LLU);
+}
+
+/**
+ * @brief Produce messages according to compress \p codec
+ */
+static void produce_msgs(const char *topic,
+ int partition,
+ uint64_t testid,
+ int msgcnt,
+ const char *broker_version,
+ const char *codec) {
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *rk;
+ int i;
+ char key[128], buf[100];
+ int msgcounter = msgcnt;
+
+ test_conf_init(&conf, NULL, 0);
+ rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
+ test_conf_set(conf, "compression.codec", codec);
+ test_conf_set(conf, "broker.version.fallback", broker_version);
+ if (!strncmp(broker_version, "0.8", 3) ||
+ !strncmp(broker_version, "0.9", 3)) {
+ test_conf_set(conf, "api.version.request", "false");
+ test_conf_set(conf, "enable.idempotence", "false");
+ }
+
+ /* Make sure to trigger a bunch of MessageSets */
+ test_conf_set(conf, "batch.num.messages", tsprintf("%d", msgcnt / 5));
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+
+ for (i = 0; i < msgcnt; i++) {
+ rd_kafka_resp_err_t err;
+
+ test_prepare_msg(testid, partition, i, buf, sizeof(buf), key,
+ sizeof(key));
+
+ err = rd_kafka_producev(
+ rk, RD_KAFKA_V_TOPIC(topic),
+ RD_KAFKA_V_VALUE(buf, sizeof(buf)),
+ RD_KAFKA_V_KEY(key, sizeof(key)),
+ RD_KAFKA_V_TIMESTAMP(my_timestamp.min),
+ RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
+ RD_KAFKA_V_OPAQUE(&msgcounter), RD_KAFKA_V_END);
+ if (err)
+ TEST_FAIL("producev() failed at msg #%d/%d: %s", i,
+ msgcnt, rd_kafka_err2str(err));
+ }
+
+ TEST_SAY("Waiting for %d messages to be produced\n", msgcounter);
+ while (msgcounter > 0)
+ rd_kafka_poll(rk, 100);
+
+ rd_kafka_destroy(rk);
+}
+
+static void
+consume_msgs_verify_timestamps(const char *topic,
+ int partition,
+ uint64_t testid,
+ int msgcnt,
+ const struct timestamp_range *exp_timestamp) {
+ test_msgver_t mv;
+
+ test_msgver_init(&mv, testid);
+ test_consume_msgs_easy_mv(topic, topic, -1, testid, -1, msgcnt, NULL,
+ &mv);
+
+ test_msgver_verify0(
+ __FUNCTION__, __LINE__, topic, &mv,
+ TEST_MSGVER_RANGE | TEST_MSGVER_BY_MSGID | TEST_MSGVER_BY_TIMESTAMP,
+ (struct test_mv_vs) {.msg_base = 0,
+ .exp_cnt = msgcnt,
+ .timestamp_min = exp_timestamp->min,
+ .timestamp_max = exp_timestamp->max});
+
+ test_msgver_clear(&mv);
+}
+
+
+
+static void test_timestamps(const char *broker_tstype,
+ const char *broker_version,
+ const char *codec,
+ const struct timestamp_range *exp_timestamps) {
+ const char *topic =
+ test_mk_topic_name(tsprintf("0052_msg_timestamps_%s_%s_%s",
+ broker_tstype, broker_version, codec),
+ 1);
+ const int msgcnt = 20;
+ uint64_t testid = test_id_generate();
+
+ if ((!strncmp(broker_version, "0.9", 3) ||
+ !strncmp(broker_version, "0.8", 3)) &&
+ !test_conf_match(NULL, "sasl.mechanisms", "GSSAPI")) {
+ TEST_SAY(_C_YEL
+ "Skipping %s, %s test: "
+ "SaslHandshake not supported by broker v%s" _C_CLR
+ "\n",
+ broker_tstype, codec, broker_version);
+ return;
+ }
+
+ TEST_SAY(_C_MAG "Timestamp test using %s\n", topic);
+ test_timeout_set(30);
+
+ test_kafka_topics(
+ "--create --topic \"%s\" "
+ "--replication-factor 1 --partitions 1 "
+ "--config message.timestamp.type=%s",
+ topic, broker_tstype);
+
+ TEST_SAY(_C_MAG "Producing %d messages to %s\n", msgcnt, topic);
+ produce_msgs(topic, 0, testid, msgcnt, broker_version, codec);
+
+ TEST_SAY(_C_MAG
+ "Consuming and verifying %d messages from %s "
+ "with expected timestamps %" PRId64 "..%" PRId64 "\n",
+ msgcnt, topic, exp_timestamps->min, exp_timestamps->max);
+
+ consume_msgs_verify_timestamps(topic, 0, testid, msgcnt,
+ exp_timestamps);
+}
+
+
+int main_0052_msg_timestamps(int argc, char **argv) {
+
+ if (!test_can_create_topics(1))
+ return 0;
+
+ if (test_needs_auth()) {
+ TEST_SKIP("Test cluster requires authentication/SSL\n");
+ return 0;
+ }
+
+ /* Broker version limits the producer's feature set,
+ * for 0.9.0.0 no timestamp will be transmitted,
+ * but for 0.10.1.0 (or newer, api.version.request will be true)
+ * the producer will set the timestamp.
+ * In all cases we want a reasonable timestamp back.
+ *
+ * Explicit broker LogAppendTime setting will overwrite
+ * any producer-provided offset.
+ *
+ * Using the old non-timestamp-aware protocol without
+ * LogAppendTime will cause unset/invalid timestamps .
+ *
+ * Any other option should honour the producer create timestamps.
+ */
+ prepare_timestamps();
+
+ test_timestamps("CreateTime", "0.10.1.0", "none", &my_timestamp);
+ test_timestamps("LogAppendTime", "0.10.1.0", "none", &broker_timestamp);
+ test_timestamps("CreateTime", "0.9.0.0", "none", &invalid_timestamp);
+ test_timestamps("LogAppendTime", "0.9.0.0", "none", &broker_timestamp);
+#if WITH_ZLIB
+ test_timestamps("CreateTime", "0.10.1.0", "gzip", &my_timestamp);
+ test_timestamps("LogAppendTime", "0.10.1.0", "gzip", &broker_timestamp);
+ test_timestamps("CreateTime", "0.9.0.0", "gzip", &invalid_timestamp);
+ test_timestamps("LogAppendTime", "0.9.0.0", "gzip", &broker_timestamp);
+#endif
+
+ return 0;
+}