summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/0055-producer_latency.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0055-producer_latency.c')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/0055-producer_latency.c366
1 files changed, 366 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0055-producer_latency.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0055-producer_latency.c
new file mode 100644
index 000000000..e0244cec9
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0055-producer_latency.c
@@ -0,0 +1,366 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+#include "rdkafka.h"
+
+
+#define _MSG_COUNT 10
+struct latconf {
+ const char *name;
+ const char *conf[16];
+ int min; /* Minimum expected latency */
+ int max; /* Maximum expected latency */
+
+ float rtt; /* Network+broker latency */
+
+
+ char linger_ms_conf[32]; /**< Read back to show actual value */
+
+ /* Result vector */
+ rd_bool_t passed;
+ float latency[_MSG_COUNT];
+ float sum;
+ int cnt;
+ int wakeups;
+};
+
+static int tot_wakeups = 0;
+
+static void
+dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
+ struct latconf *latconf = opaque;
+ int64_t *ts_send = (int64_t *)rkmessage->_private;
+ float delivery_time;
+
+ if (rkmessage->err)
+ TEST_FAIL("%s: delivery failed: %s\n", latconf->name,
+ rd_kafka_err2str(rkmessage->err));
+
+ if (!rkmessage->_private)
+ return; /* Priming message, ignore. */
+
+ delivery_time = (float)(test_clock() - *ts_send) / 1000.0f;
+
+ free(ts_send);
+
+ TEST_ASSERT(latconf->cnt < _MSG_COUNT, "");
+
+ TEST_SAY("%s: Message %d delivered in %.3fms\n", latconf->name,
+ latconf->cnt, delivery_time);
+
+ latconf->latency[latconf->cnt++] = delivery_time;
+ latconf->sum += delivery_time;
+}
+
+
+/**
+ * @brief A stats callback to get the per-broker wakeup counts.
+ *
+ * The JSON "parsing" here is crude..
+ */
+static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
+ const char *t = json;
+ int cnt = 0;
+ int total = 0;
+
+ /* Since we're only producing to one partition there will only be
+ * one broker, the leader, who's wakeup counts we're interested in, but
+ * we also want to know that other broker threads aren't spinning
+ * like crazy. So just summarize all the wakeups from all brokers. */
+ while ((t = strstr(t, "\"wakeups\":"))) {
+ int wakeups;
+ const char *next;
+
+ t += strlen("\"wakeups\":");
+ while (isspace((int)*t))
+ t++;
+ wakeups = strtol(t, (char **)&next, 0);
+
+ TEST_ASSERT(t != next, "No wakeup number found at \"%.*s...\"",
+ 16, t);
+
+ total += wakeups;
+ cnt++;
+
+ t = next;
+ }
+
+ TEST_ASSERT(cnt > 0, "No brokers found in stats");
+
+ tot_wakeups = total;
+
+ return 0;
+}
+
+
+static int verify_latency(struct latconf *latconf) {
+ float avg;
+ int fails = 0;
+ double ext_overhead =
+ latconf->rtt + 5.0 /* broker ProduceRequest handling time, maybe */;
+
+ ext_overhead *= test_timeout_multiplier;
+
+ avg = latconf->sum / (float)latconf->cnt;
+
+ TEST_SAY(
+ "%s: average latency %.3fms, allowed range %d..%d +%.0fms, "
+ "%d wakeups\n",
+ latconf->name, avg, latconf->min, latconf->max, ext_overhead,
+ tot_wakeups);
+
+ if (avg < (float)latconf->min ||
+ avg > (float)latconf->max + ext_overhead) {
+ TEST_FAIL_LATER(
+ "%s: average latency %.3fms is "
+ "outside range %d..%d +%.0fms",
+ latconf->name, avg, latconf->min, latconf->max,
+ ext_overhead);
+ fails++;
+ }
+
+ latconf->wakeups = tot_wakeups;
+ if (latconf->wakeups < 10 || latconf->wakeups > 1000) {
+ TEST_FAIL_LATER(
+ "%s: broker wakeups out of range: %d, "
+ "expected 10..1000",
+ latconf->name, latconf->wakeups);
+ fails++;
+ }
+
+
+ return fails;
+}
+
+static void measure_rtt(struct latconf *latconf, rd_kafka_t *rk) {
+ rd_kafka_resp_err_t err;
+ const struct rd_kafka_metadata *md;
+ int64_t ts = test_clock();
+
+ err = rd_kafka_metadata(rk, 0, NULL, &md, tmout_multip(5000));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+ latconf->rtt = (float)(test_clock() - ts) / 1000.0f;
+
+ TEST_SAY("%s: broker base RTT is %.3fms\n", latconf->name,
+ latconf->rtt);
+ rd_kafka_metadata_destroy(md);
+}
+
+
+
+static void test_producer_latency(const char *topic, struct latconf *latconf) {
+ rd_kafka_t *rk;
+ rd_kafka_conf_t *conf;
+ rd_kafka_resp_err_t err;
+ int i;
+ size_t sz;
+ rd_bool_t with_transactions = rd_false;
+
+ SUB_TEST("%s (linger.ms=%d)", latconf->name);
+
+ test_conf_init(&conf, NULL, 60);
+
+ rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
+ rd_kafka_conf_set_opaque(conf, latconf);
+ rd_kafka_conf_set_stats_cb(conf, stats_cb);
+ test_conf_set(conf, "statistics.interval.ms", "100");
+ tot_wakeups = 0;
+
+ for (i = 0; latconf->conf[i]; i += 2) {
+ TEST_SAY("%s: set conf %s = %s\n", latconf->name,
+ latconf->conf[i], latconf->conf[i + 1]);
+ test_conf_set(conf, latconf->conf[i], latconf->conf[i + 1]);
+ if (!strcmp(latconf->conf[i], "transactional.id"))
+ with_transactions = rd_true;
+ }
+
+ sz = sizeof(latconf->linger_ms_conf);
+ rd_kafka_conf_get(conf, "linger.ms", latconf->linger_ms_conf, &sz);
+
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+
+ if (with_transactions) {
+ TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 10 * 1000));
+ TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
+ }
+
+ TEST_SAY("%s: priming producer\n", latconf->name);
+ /* Send a priming message to make sure everything is up
+ * and functional before starting measurements */
+ err = rd_kafka_producev(
+ rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0),
+ RD_KAFKA_V_VALUE("priming", 7),
+ RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_END);
+ if (err)
+ TEST_FAIL("%s: priming producev failed: %s", latconf->name,
+ rd_kafka_err2str(err));
+
+ if (with_transactions) {
+ TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
+ } else {
+ /* Await delivery */
+ rd_kafka_flush(rk, tmout_multip(5000));
+ }
+
+ /* Get a network+broker round-trip-time base time. */
+ measure_rtt(latconf, rk);
+
+ TEST_SAY("%s: producing %d messages\n", latconf->name, _MSG_COUNT);
+ for (i = 0; i < _MSG_COUNT; i++) {
+ int64_t *ts_send;
+ int pre_cnt = latconf->cnt;
+
+ if (with_transactions)
+ TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
+
+ ts_send = malloc(sizeof(*ts_send));
+ *ts_send = test_clock();
+
+ err = rd_kafka_producev(
+ rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0),
+ RD_KAFKA_V_VALUE("hi", 2),
+ RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
+ RD_KAFKA_V_OPAQUE(ts_send), RD_KAFKA_V_END);
+ if (err)
+ TEST_FAIL("%s: producev #%d failed: %s", latconf->name,
+ i, rd_kafka_err2str(err));
+
+ /* Await delivery */
+ while (latconf->cnt == pre_cnt)
+ rd_kafka_poll(rk, 5000);
+
+ if (with_transactions) {
+ test_timing_t timing;
+ TIMING_START(&timing, "commit_transaction");
+ TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
+ TIMING_ASSERT_LATER(&timing, 0,
+ (int)(latconf->rtt + 50.0));
+ }
+ }
+
+ while (tot_wakeups == 0)
+ rd_kafka_poll(rk, 100); /* Get final stats_cb */
+
+ rd_kafka_destroy(rk);
+
+ if (verify_latency(latconf))
+ return; /* verify_latency() has already
+ * called TEST_FAIL_LATER() */
+
+
+ latconf->passed = rd_true;
+
+ SUB_TEST_PASS();
+}
+
+
+static float find_min(const struct latconf *latconf) {
+ int i;
+ float v = 1000000;
+
+ for (i = 0; i < latconf->cnt; i++)
+ if (latconf->latency[i] < v)
+ v = latconf->latency[i];
+
+ return v;
+}
+
+static float find_max(const struct latconf *latconf) {
+ int i;
+ float v = 0;
+
+ for (i = 0; i < latconf->cnt; i++)
+ if (latconf->latency[i] > v)
+ v = latconf->latency[i];
+
+ return v;
+}
+
+int main_0055_producer_latency(int argc, char **argv) {
+ const char *topic = test_mk_topic_name("0055_producer_latency", 1);
+ struct latconf latconfs[] = {
+ {"standard settings", {NULL}, 5, 5}, /* default is now 5ms */
+ {"low linger.ms (0ms)", {"linger.ms", "0", NULL}, 0, 0},
+ {"microsecond linger.ms (0.001ms)",
+ {"linger.ms", "0.001", NULL},
+ 0,
+ 1},
+ {"high linger.ms (3000ms)",
+ {"linger.ms", "3000", NULL},
+ 3000,
+ 3100},
+ {"linger.ms < 1000 (500ms)", /* internal block_max_ms */
+ {"linger.ms", "500", NULL},
+ 500,
+ 600},
+ {"no acks (0ms)",
+ {"linger.ms", "0", "acks", "0", "enable.idempotence", "false",
+ NULL},
+ 0,
+ 0},
+ {"idempotence (10ms)",
+ {"linger.ms", "10", "enable.idempotence", "true", NULL},
+ 10,
+ 10},
+ {"transactions (35ms)",
+ {"linger.ms", "35", "transactional.id", topic, NULL},
+ 35,
+ 50 + 35 /* extra time for AddPartitions..*/},
+ {NULL}};
+ struct latconf *latconf;
+
+ if (test_on_ci) {
+ TEST_SKIP("Latency measurements not reliable on CI\n");
+ return 0;
+ }
+
+ /* Create topic without replicas to keep broker-side latency down */
+ test_create_topic(NULL, topic, 1, 1);
+
+ for (latconf = latconfs; latconf->name; latconf++)
+ test_producer_latency(topic, latconf);
+
+ TEST_SAY(_C_YEL "Latency tests summary:\n" _C_CLR);
+ TEST_SAY("%-40s %9s %6s..%-6s %7s %9s %9s %9s %8s\n", "Name",
+ "linger.ms", "MinExp", "MaxExp", "RTT", "Min", "Average",
+ "Max", "Wakeups");
+
+ for (latconf = latconfs; latconf->name; latconf++)
+ TEST_SAY("%-40s %9s %6d..%-6d %7g %9g %9g %9g %8d%s\n",
+ latconf->name, latconf->linger_ms_conf, latconf->min,
+ latconf->max, latconf->rtt, find_min(latconf),
+ latconf->sum / latconf->cnt, find_max(latconf),
+ latconf->wakeups,
+ latconf->passed ? "" : _C_RED " FAILED");
+
+
+ TEST_LATER_CHECK("");
+
+ return 0;
+}