diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0052-msg_timestamps.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/0052-msg_timestamps.c | 220 |
1 files changed, 220 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0052-msg_timestamps.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0052-msg_timestamps.c new file mode 100644 index 00000000..ef9b8987 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0052-msg_timestamps.c @@ -0,0 +1,220 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" +#include "rdkafka.h" + +/** + * Verify message timestamp behaviour on supporting brokers (>=0.10.0.0). + * Issue #858 + */ +struct timestamp_range { + int64_t min; + int64_t max; +}; + +static const struct timestamp_range invalid_timestamp = {-1, -1}; +static struct timestamp_range broker_timestamp; +static struct timestamp_range my_timestamp; + +static void prepare_timestamps(void) { + struct timeval ts; + rd_gettimeofday(&ts, NULL); + + /* broker timestamps expected to be within 600 seconds */ + broker_timestamp.min = (int64_t)ts.tv_sec * 1000LLU; + broker_timestamp.max = broker_timestamp.min + (600 * 1000LLU); + + /* client timestamps: set in the future (24 hours) + * to be outside of broker timestamps */ + my_timestamp.min = my_timestamp.max = + (int64_t)ts.tv_sec + (24 * 3600 * 1000LLU); +} + +/** + * @brief Produce messages according to compress \p codec + */ +static void produce_msgs(const char *topic, + int partition, + uint64_t testid, + int msgcnt, + const char *broker_version, + const char *codec) { + rd_kafka_conf_t *conf; + rd_kafka_t *rk; + int i; + char key[128], buf[100]; + int msgcounter = msgcnt; + + test_conf_init(&conf, NULL, 0); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + test_conf_set(conf, "compression.codec", codec); + test_conf_set(conf, "broker.version.fallback", broker_version); + if (!strncmp(broker_version, "0.8", 3) || + !strncmp(broker_version, "0.9", 3)) { + test_conf_set(conf, "api.version.request", "false"); + test_conf_set(conf, "enable.idempotence", "false"); + } + + /* Make sure to trigger a bunch of MessageSets */ + test_conf_set(conf, "batch.num.messages", tsprintf("%d", msgcnt / 5)); + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + for (i = 0; i < msgcnt; i++) { + rd_kafka_resp_err_t err; + + test_prepare_msg(testid, partition, i, buf, sizeof(buf), key, + sizeof(key)); + + err = rd_kafka_producev( + rk, RD_KAFKA_V_TOPIC(topic), + RD_KAFKA_V_VALUE(buf, sizeof(buf)), + RD_KAFKA_V_KEY(key, sizeof(key)), + RD_KAFKA_V_TIMESTAMP(my_timestamp.min), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_OPAQUE(&msgcounter), RD_KAFKA_V_END); + if (err) + TEST_FAIL("producev() failed at msg #%d/%d: %s", i, + msgcnt, rd_kafka_err2str(err)); + } + + TEST_SAY("Waiting for %d messages to be produced\n", msgcounter); + while (msgcounter > 0) + rd_kafka_poll(rk, 100); + + rd_kafka_destroy(rk); +} + +static void +consume_msgs_verify_timestamps(const char *topic, + int partition, + uint64_t testid, + int msgcnt, + const struct timestamp_range *exp_timestamp) { + test_msgver_t mv; + + test_msgver_init(&mv, testid); + test_consume_msgs_easy_mv(topic, topic, -1, testid, -1, msgcnt, NULL, + &mv); + + test_msgver_verify0( + __FUNCTION__, __LINE__, topic, &mv, + TEST_MSGVER_RANGE | TEST_MSGVER_BY_MSGID | TEST_MSGVER_BY_TIMESTAMP, + (struct test_mv_vs) {.msg_base = 0, + .exp_cnt = msgcnt, + .timestamp_min = exp_timestamp->min, + .timestamp_max = exp_timestamp->max}); + + test_msgver_clear(&mv); +} + + + +static void test_timestamps(const char *broker_tstype, + const char *broker_version, + const char *codec, + const struct timestamp_range *exp_timestamps) { + const char *topic = + test_mk_topic_name(tsprintf("0052_msg_timestamps_%s_%s_%s", + broker_tstype, broker_version, codec), + 1); + const int msgcnt = 20; + uint64_t testid = test_id_generate(); + + if ((!strncmp(broker_version, "0.9", 3) || + !strncmp(broker_version, "0.8", 3)) && + !test_conf_match(NULL, "sasl.mechanisms", "GSSAPI")) { + TEST_SAY(_C_YEL + "Skipping %s, %s test: " + "SaslHandshake not supported by broker v%s" _C_CLR + "\n", + broker_tstype, codec, broker_version); + return; + } + + TEST_SAY(_C_MAG "Timestamp test using %s\n", topic); + test_timeout_set(30); + + test_kafka_topics( + "--create --topic \"%s\" " + "--replication-factor 1 --partitions 1 " + "--config message.timestamp.type=%s", + topic, broker_tstype); + + TEST_SAY(_C_MAG "Producing %d messages to %s\n", msgcnt, topic); + produce_msgs(topic, 0, testid, msgcnt, broker_version, codec); + + TEST_SAY(_C_MAG + "Consuming and verifying %d messages from %s " + "with expected timestamps %" PRId64 "..%" PRId64 "\n", + msgcnt, topic, exp_timestamps->min, exp_timestamps->max); + + consume_msgs_verify_timestamps(topic, 0, testid, msgcnt, + exp_timestamps); +} + + +int main_0052_msg_timestamps(int argc, char **argv) { + + if (!test_can_create_topics(1)) + return 0; + + if (test_needs_auth()) { + TEST_SKIP("Test cluster requires authentication/SSL\n"); + return 0; + } + + /* Broker version limits the producer's feature set, + * for 0.9.0.0 no timestamp will be transmitted, + * but for 0.10.1.0 (or newer, api.version.request will be true) + * the producer will set the timestamp. + * In all cases we want a reasonable timestamp back. + * + * Explicit broker LogAppendTime setting will overwrite + * any producer-provided offset. + * + * Using the old non-timestamp-aware protocol without + * LogAppendTime will cause unset/invalid timestamps . + * + * Any other option should honour the producer create timestamps. + */ + prepare_timestamps(); + + test_timestamps("CreateTime", "0.10.1.0", "none", &my_timestamp); + test_timestamps("LogAppendTime", "0.10.1.0", "none", &broker_timestamp); + test_timestamps("CreateTime", "0.9.0.0", "none", &invalid_timestamp); + test_timestamps("LogAppendTime", "0.9.0.0", "none", &broker_timestamp); +#if WITH_ZLIB + test_timestamps("CreateTime", "0.10.1.0", "gzip", &my_timestamp); + test_timestamps("LogAppendTime", "0.10.1.0", "gzip", &broker_timestamp); + test_timestamps("CreateTime", "0.9.0.0", "gzip", &invalid_timestamp); + test_timestamps("LogAppendTime", "0.9.0.0", "gzip", &broker_timestamp); +#endif + + return 0; +} |