diff options
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0055-producer_latency.c')
-rw-r--r-- | src/fluent-bit/lib/librdkafka-2.1.0/tests/0055-producer_latency.c | 366 |
1 files changed, 366 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0055-producer_latency.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0055-producer_latency.c new file mode 100644 index 000000000..e0244cec9 --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0055-producer_latency.c @@ -0,0 +1,366 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" +#include "rdkafka.h" + + +#define _MSG_COUNT 10 +struct latconf { + const char *name; + const char *conf[16]; + int min; /* Minimum expected latency */ + int max; /* Maximum expected latency */ + + float rtt; /* Network+broker latency */ + + + char linger_ms_conf[32]; /**< Read back to show actual value */ + + /* Result vector */ + rd_bool_t passed; + float latency[_MSG_COUNT]; + float sum; + int cnt; + int wakeups; +}; + +static int tot_wakeups = 0; + +static void +dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { + struct latconf *latconf = opaque; + int64_t *ts_send = (int64_t *)rkmessage->_private; + float delivery_time; + + if (rkmessage->err) + TEST_FAIL("%s: delivery failed: %s\n", latconf->name, + rd_kafka_err2str(rkmessage->err)); + + if (!rkmessage->_private) + return; /* Priming message, ignore. */ + + delivery_time = (float)(test_clock() - *ts_send) / 1000.0f; + + free(ts_send); + + TEST_ASSERT(latconf->cnt < _MSG_COUNT, ""); + + TEST_SAY("%s: Message %d delivered in %.3fms\n", latconf->name, + latconf->cnt, delivery_time); + + latconf->latency[latconf->cnt++] = delivery_time; + latconf->sum += delivery_time; +} + + +/** + * @brief A stats callback to get the per-broker wakeup counts. + * + * The JSON "parsing" here is crude.. + */ +static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) { + const char *t = json; + int cnt = 0; + int total = 0; + + /* Since we're only producing to one partition there will only be + * one broker, the leader, who's wakeup counts we're interested in, but + * we also want to know that other broker threads aren't spinning + * like crazy. So just summarize all the wakeups from all brokers. */ + while ((t = strstr(t, "\"wakeups\":"))) { + int wakeups; + const char *next; + + t += strlen("\"wakeups\":"); + while (isspace((int)*t)) + t++; + wakeups = strtol(t, (char **)&next, 0); + + TEST_ASSERT(t != next, "No wakeup number found at \"%.*s...\"", + 16, t); + + total += wakeups; + cnt++; + + t = next; + } + + TEST_ASSERT(cnt > 0, "No brokers found in stats"); + + tot_wakeups = total; + + return 0; +} + + +static int verify_latency(struct latconf *latconf) { + float avg; + int fails = 0; + double ext_overhead = + latconf->rtt + 5.0 /* broker ProduceRequest handling time, maybe */; + + ext_overhead *= test_timeout_multiplier; + + avg = latconf->sum / (float)latconf->cnt; + + TEST_SAY( + "%s: average latency %.3fms, allowed range %d..%d +%.0fms, " + "%d wakeups\n", + latconf->name, avg, latconf->min, latconf->max, ext_overhead, + tot_wakeups); + + if (avg < (float)latconf->min || + avg > (float)latconf->max + ext_overhead) { + TEST_FAIL_LATER( + "%s: average latency %.3fms is " + "outside range %d..%d +%.0fms", + latconf->name, avg, latconf->min, latconf->max, + ext_overhead); + fails++; + } + + latconf->wakeups = tot_wakeups; + if (latconf->wakeups < 10 || latconf->wakeups > 1000) { + TEST_FAIL_LATER( + "%s: broker wakeups out of range: %d, " + "expected 10..1000", + latconf->name, latconf->wakeups); + fails++; + } + + + return fails; +} + +static void measure_rtt(struct latconf *latconf, rd_kafka_t *rk) { + rd_kafka_resp_err_t err; + const struct rd_kafka_metadata *md; + int64_t ts = test_clock(); + + err = rd_kafka_metadata(rk, 0, NULL, &md, tmout_multip(5000)); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + latconf->rtt = (float)(test_clock() - ts) / 1000.0f; + + TEST_SAY("%s: broker base RTT is %.3fms\n", latconf->name, + latconf->rtt); + rd_kafka_metadata_destroy(md); +} + + + +static void test_producer_latency(const char *topic, struct latconf *latconf) { + rd_kafka_t *rk; + rd_kafka_conf_t *conf; + rd_kafka_resp_err_t err; + int i; + size_t sz; + rd_bool_t with_transactions = rd_false; + + SUB_TEST("%s (linger.ms=%d)", latconf->name); + + test_conf_init(&conf, NULL, 60); + + rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); + rd_kafka_conf_set_opaque(conf, latconf); + rd_kafka_conf_set_stats_cb(conf, stats_cb); + test_conf_set(conf, "statistics.interval.ms", "100"); + tot_wakeups = 0; + + for (i = 0; latconf->conf[i]; i += 2) { + TEST_SAY("%s: set conf %s = %s\n", latconf->name, + latconf->conf[i], latconf->conf[i + 1]); + test_conf_set(conf, latconf->conf[i], latconf->conf[i + 1]); + if (!strcmp(latconf->conf[i], "transactional.id")) + with_transactions = rd_true; + } + + sz = sizeof(latconf->linger_ms_conf); + rd_kafka_conf_get(conf, "linger.ms", latconf->linger_ms_conf, &sz); + + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + if (with_transactions) { + TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 10 * 1000)); + TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk)); + } + + TEST_SAY("%s: priming producer\n", latconf->name); + /* Send a priming message to make sure everything is up + * and functional before starting measurements */ + err = rd_kafka_producev( + rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0), + RD_KAFKA_V_VALUE("priming", 7), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_END); + if (err) + TEST_FAIL("%s: priming producev failed: %s", latconf->name, + rd_kafka_err2str(err)); + + if (with_transactions) { + TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1)); + } else { + /* Await delivery */ + rd_kafka_flush(rk, tmout_multip(5000)); + } + + /* Get a network+broker round-trip-time base time. */ + measure_rtt(latconf, rk); + + TEST_SAY("%s: producing %d messages\n", latconf->name, _MSG_COUNT); + for (i = 0; i < _MSG_COUNT; i++) { + int64_t *ts_send; + int pre_cnt = latconf->cnt; + + if (with_transactions) + TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk)); + + ts_send = malloc(sizeof(*ts_send)); + *ts_send = test_clock(); + + err = rd_kafka_producev( + rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0), + RD_KAFKA_V_VALUE("hi", 2), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_OPAQUE(ts_send), RD_KAFKA_V_END); + if (err) + TEST_FAIL("%s: producev #%d failed: %s", latconf->name, + i, rd_kafka_err2str(err)); + + /* Await delivery */ + while (latconf->cnt == pre_cnt) + rd_kafka_poll(rk, 5000); + + if (with_transactions) { + test_timing_t timing; + TIMING_START(&timing, "commit_transaction"); + TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1)); + TIMING_ASSERT_LATER(&timing, 0, + (int)(latconf->rtt + 50.0)); + } + } + + while (tot_wakeups == 0) + rd_kafka_poll(rk, 100); /* Get final stats_cb */ + + rd_kafka_destroy(rk); + + if (verify_latency(latconf)) + return; /* verify_latency() has already + * called TEST_FAIL_LATER() */ + + + latconf->passed = rd_true; + + SUB_TEST_PASS(); +} + + +static float find_min(const struct latconf *latconf) { + int i; + float v = 1000000; + + for (i = 0; i < latconf->cnt; i++) + if (latconf->latency[i] < v) + v = latconf->latency[i]; + + return v; +} + +static float find_max(const struct latconf *latconf) { + int i; + float v = 0; + + for (i = 0; i < latconf->cnt; i++) + if (latconf->latency[i] > v) + v = latconf->latency[i]; + + return v; +} + +int main_0055_producer_latency(int argc, char **argv) { + const char *topic = test_mk_topic_name("0055_producer_latency", 1); + struct latconf latconfs[] = { + {"standard settings", {NULL}, 5, 5}, /* default is now 5ms */ + {"low linger.ms (0ms)", {"linger.ms", "0", NULL}, 0, 0}, + {"microsecond linger.ms (0.001ms)", + {"linger.ms", "0.001", NULL}, + 0, + 1}, + {"high linger.ms (3000ms)", + {"linger.ms", "3000", NULL}, + 3000, + 3100}, + {"linger.ms < 1000 (500ms)", /* internal block_max_ms */ + {"linger.ms", "500", NULL}, + 500, + 600}, + {"no acks (0ms)", + {"linger.ms", "0", "acks", "0", "enable.idempotence", "false", + NULL}, + 0, + 0}, + {"idempotence (10ms)", + {"linger.ms", "10", "enable.idempotence", "true", NULL}, + 10, + 10}, + {"transactions (35ms)", + {"linger.ms", "35", "transactional.id", topic, NULL}, + 35, + 50 + 35 /* extra time for AddPartitions..*/}, + {NULL}}; + struct latconf *latconf; + + if (test_on_ci) { + TEST_SKIP("Latency measurements not reliable on CI\n"); + return 0; + } + + /* Create topic without replicas to keep broker-side latency down */ + test_create_topic(NULL, topic, 1, 1); + + for (latconf = latconfs; latconf->name; latconf++) + test_producer_latency(topic, latconf); + + TEST_SAY(_C_YEL "Latency tests summary:\n" _C_CLR); + TEST_SAY("%-40s %9s %6s..%-6s %7s %9s %9s %9s %8s\n", "Name", + "linger.ms", "MinExp", "MaxExp", "RTT", "Min", "Average", + "Max", "Wakeups"); + + for (latconf = latconfs; latconf->name; latconf++) + TEST_SAY("%-40s %9s %6d..%-6d %7g %9g %9g %9g %8d%s\n", + latconf->name, latconf->linger_ms_conf, latconf->min, + latconf->max, latconf->rtt, find_min(latconf), + latconf->sum / latconf->cnt, find_max(latconf), + latconf->wakeups, + latconf->passed ? "" : _C_RED " FAILED"); + + + TEST_LATER_CHECK(""); + + return 0; +} |