Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdunittest.c')
-rw-r--r--  fluent-bit/lib/librdkafka-2.1.0/src/rdunittest.c | 529
1 file changed, 529 insertions(+), 0 deletions(-)
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdunittest.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdunittest.c
new file mode 100644
index 000000000..aa14b6aa8
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdunittest.c
@@ -0,0 +1,529 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2017 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
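+/* On Windows, enable the QueryPerformanceCounter()/Frequency() overrides
+ * (the mock implementations are defined in the rd_clock() unittest section
+ * below); presumably this must be set before the rd.h include so the
+ * overrides take effect. */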
+#ifdef _WIN32
+#define RD_UNITTEST_QPC_OVERRIDES 1
+#endif
+
+#include "rd.h"
+#include "rdunittest.h"
+
+#include "rdvarint.h"
+#include "rdbuf.h"
+#include "crc32c.h"
+#include "rdmurmur2.h"
+#include "rdfnv1a.h"
+#if WITH_HDRHISTOGRAM
+#include "rdhdrhistogram.h"
+#endif
+#include "rdkafka_int.h"
+#include "rdkafka_broker.h"
+#include "rdkafka_request.h"
+
+#include "rdsysqueue.h"
+#include "rdkafka_sasl_oauthbearer.h"
+#if WITH_OAUTHBEARER_OIDC
+#include "rdkafka_sasl_oauthbearer_oidc.h"
+#endif
+#include "rdkafka_msgset.h"
+#include "rdkafka_txnmgr.h"
+
+rd_bool_t rd_unittest_assert_on_failure = rd_false;
+rd_bool_t rd_unittest_on_ci = rd_false;
+rd_bool_t rd_unittest_slow = rd_false;
+
+#if ENABLE_CODECOV
+/**
+ * @name Code coverage
+ * @{
+ */
+
+static rd_atomic64_t rd_ut_covnrs[RD_UT_COVNR_MAX + 1];
+
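+/**
+ * @brief Register one execution of code coverage point \p covnr
+ *        (typically reached through the RD_UT_COVERAGE() helper).
+ */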
+void rd_ut_coverage(const char *file, const char *func, int line, int covnr) {
+ rd_assert(covnr >= 0 && covnr <= RD_UT_COVNR_MAX);
+ rd_atomic64_add(&rd_ut_covnrs[covnr], 1);
+}
+
+
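+/**
+ * @brief Check that code coverage point \p covnr was executed at least once.
+ *
+ * @returns the number of recorded executions, or 0 (after printing a FAIL
+ *          message) if the code path was never hit.
+ */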
+int64_t
+rd_ut_coverage_check(const char *file, const char *func, int line, int covnr) {
+ int64_t r;
+
+ rd_assert(covnr >= 0 && covnr <= RD_UT_COVNR_MAX);
+
+ r = rd_atomic64_get(&rd_ut_covnrs[covnr]);
+
+ if (!r) {
+ fprintf(stderr,
+ "\033[31m"
+ "RDUT: FAIL: %s:%d: %s: "
+ "Code coverage nr %d: FAIL: "
+ "code path not executed: "
+ "perform `grep -RnF 'COVERAGE(%d)' src/` to find "
+ "source location"
+ "\033[0m\n",
+ file, line, func, covnr, covnr);
+ if (rd_unittest_assert_on_failure)
+ rd_assert(!*"unittest failure");
+ return 0;
+ }
+
+ fprintf(stderr,
+ "\033[34mRDUT: CCOV: %s:%d: %s: Code coverage nr %d: "
+ "PASS (%" PRId64 " code path execution(s))\033[0m\n",
+ file, line, func, covnr, r);
+
+ return r;
+}
+/**@}*/
+
+#endif /* ENABLE_CODECOV */
+
+
+/**
+ * @name Test rdsysqueue.h / queue.h
+ * @{
+ */
+
+struct ut_tq {
+ TAILQ_ENTRY(ut_tq) link;
+ int v;
+};
+
+TAILQ_HEAD(ut_tq_head, ut_tq);
+
+struct ut_tq_args {
+ const char *name; /**< Descriptive test name */
+ struct {
+ int base; /**< Base value */
+ int cnt; /**< Number of elements to add */
+ int step; /**< Value step */
+ } q[3]; /**< Queue element definition */
+ int qcnt; /**< Number of defs in .q */
+ int exp[16]; /**< Expected value order after join */
+};
+
+/**
+ * @brief Find the previous element (insert position) for
+ * value \p val in list \p head or NULL if \p val is less than
+ * the first element in \p head.
+ * @remarks \p head must be sorted in ascending order.
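+ *
+ * Example (illustrative): with \p head holding 1, 3, 5, a lookup for
+ * val 4 returns the element holding 3, while a lookup for val 0 returns
+ * NULL (i.e. the insert position is the head of the list).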
+ */
+static struct ut_tq *ut_tq_find_prev_pos(const struct ut_tq_head *head,
+ int val) {
+ struct ut_tq *e, *prev = NULL;
+
+ TAILQ_FOREACH(e, head, link) {
+ if (e->v > val)
+ return prev;
+ prev = e;
+ }
+
+ return prev;
+}
+
+static int ut_tq_test(const struct ut_tq_args *args) {
+ int totcnt = 0;
+ int fails = 0;
+ struct ut_tq_head *tqh[3] = {NULL, NULL, NULL};
+ struct ut_tq *e, *insert_after;
+ int i, qi;
+
+ RD_UT_SAY("Testing TAILQ: %s", args->name);
+
+ /*
+ * Verify TAILQ_INSERT_LIST:
+ * For each insert position test:
+ * - create two lists: tqh 0 and 1
+ * - add entries to both lists
+ * - insert list 1 into 0
+ * - verify expected order and correctness
+ */
+
+ /* Use heap allocated heads to let valgrind/asan assist
+ * in detecting corruption. */
+
+ for (qi = 0; qi < args->qcnt; qi++) {
+ tqh[qi] = rd_calloc(1, sizeof(*tqh[qi]));
+ TAILQ_INIT(tqh[qi]);
+
+ for (i = 0; i < args->q[qi].cnt; i++) {
+ e = rd_malloc(sizeof(*e));
+ e->v = args->q[qi].base + (i * args->q[qi].step);
+ TAILQ_INSERT_TAIL(tqh[qi], e, link);
+ }
+
+ totcnt += args->q[qi].cnt;
+ }
+
+ for (qi = 1; qi < args->qcnt; qi++) {
+ insert_after = ut_tq_find_prev_pos(tqh[0], args->q[qi].base);
+ if (!insert_after) {
+ /* Insert position is head of list,
+ * do two-step concat+move */
+ TAILQ_PREPEND(tqh[0], tqh[qi], ut_tq_head, link);
+ } else {
+ TAILQ_INSERT_LIST(tqh[0], insert_after, tqh[qi],
+ ut_tq_head, struct ut_tq *, link);
+ }
+
+ RD_UT_ASSERT(TAILQ_EMPTY(tqh[qi]), "expected empty tqh[%d]",
+ qi);
+ RD_UT_ASSERT(!TAILQ_EMPTY(tqh[0]), "expected non-empty tqh[0]");
+
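+        /* Poison and free the now-empty head so that valgrind/asan can
+         * catch any lingering references to it. */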
+ memset(tqh[qi], (int)'A', sizeof(*tqh[qi]));
+ rd_free(tqh[qi]);
+ }
+
+ RD_UT_ASSERT(TAILQ_LAST(tqh[0], ut_tq_head)->v == args->exp[totcnt - 1],
+ "TAILQ_LAST val %d, expected %d",
+ TAILQ_LAST(tqh[0], ut_tq_head)->v, args->exp[totcnt - 1]);
+
+ /* Add sentinel value to verify that INSERT_TAIL works
+ * after INSERT_LIST */
+ e = rd_malloc(sizeof(*e));
+ e->v = 99;
+ TAILQ_INSERT_TAIL(tqh[0], e, link);
+ totcnt++;
+
+ i = 0;
+ TAILQ_FOREACH(e, tqh[0], link) {
+ if (i >= totcnt) {
+ RD_UT_WARN(
+ "Too many elements in list tqh[0]: "
+ "idx %d > totcnt %d: element %p (value %d)",
+ i, totcnt, e, e->v);
+ fails++;
+ } else if (e->v != args->exp[i]) {
+ RD_UT_WARN(
+ "Element idx %d/%d in tqh[0] has value %d, "
+ "expected %d",
+ i, totcnt, e->v, args->exp[i]);
+ fails++;
+ } else if (i == totcnt - 1 &&
+ e != TAILQ_LAST(tqh[0], ut_tq_head)) {
+ RD_UT_WARN("TAILQ_LAST == %p, expected %p",
+ TAILQ_LAST(tqh[0], ut_tq_head), e);
+ fails++;
+ }
+ i++;
+ }
+
+ /* Then scan it in reverse */
+ i = totcnt - 1;
+ TAILQ_FOREACH_REVERSE(e, tqh[0], ut_tq_head, link) {
+ if (i < 0) {
+ RD_UT_WARN(
+ "REVERSE: Too many elements in list tqh[0]: "
+ "idx %d < 0: element %p (value %d)",
+ i, e, e->v);
+ fails++;
+ } else if (e->v != args->exp[i]) {
+ RD_UT_WARN(
+ "REVERSE: Element idx %d/%d in tqh[0] has "
+ "value %d, expected %d",
+ i, totcnt, e->v, args->exp[i]);
+ fails++;
+ } else if (i == totcnt - 1 &&
+ e != TAILQ_LAST(tqh[0], ut_tq_head)) {
+ RD_UT_WARN("REVERSE: TAILQ_LAST == %p, expected %p",
+ TAILQ_LAST(tqh[0], ut_tq_head), e);
+ fails++;
+ }
+ i--;
+ }
+
+ RD_UT_ASSERT(TAILQ_LAST(tqh[0], ut_tq_head)->v == args->exp[totcnt - 1],
+ "TAILQ_LAST val %d, expected %d",
+ TAILQ_LAST(tqh[0], ut_tq_head)->v, args->exp[totcnt - 1]);
+
+ while ((e = TAILQ_FIRST(tqh[0]))) {
+ TAILQ_REMOVE(tqh[0], e, link);
+ rd_free(e);
+ }
+
+ rd_free(tqh[0]);
+
+ return fails;
+}
+
+
+static int unittest_sysqueue(void) {
+ const struct ut_tq_args args[] = {
+ {"empty tqh[0]",
+ {{0, 0, 0}, {0, 3, 1}},
+ 2,
+ {0, 1, 2, 99 /*sentinel*/}},
+ {"prepend 1,0",
+ {{10, 3, 1}, {0, 3, 1}},
+ 2,
+ {0, 1, 2, 10, 11, 12, 99}},
+ {"prepend 2,1,0",
+ {
+ {10, 3, 1}, /* 10, 11, 12 */
+ {5, 3, 1}, /* 5, 6, 7 */
+ {0, 2, 1} /* 0, 1 */
+ },
+ 3,
+ {0, 1, 5, 6, 7, 10, 11, 12, 99}},
+ {"insert 1", {{0, 3, 2}, {1, 2, 2}}, 2, {0, 1, 3, 2, 4, 99}},
+ {"insert 1,2",
+ {
+ {0, 3, 3}, /* 0, 3, 6 */
+ {1, 2, 3}, /* 1, 4 */
+ {2, 1, 3} /* 2 */
+ },
+ 3,
+ {0, 1, 2, 4, 3, 6, 99}},
+ {"append 1",
+ {{0, 5, 1}, {5, 5, 1}},
+ 2,
+ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 99}},
+ {"append 1,2",
+ {
+ {0, 5, 1}, /* 0, 1, 2, 3, 4 */
+ {5, 5, 1}, /* 5, 6, 7, 8, 9 */
+ {11, 2, 1} /* 11, 12 */
+ },
+ 3,
+ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 99}},
+ {
+ "insert 1,0,2",
+ {
+ {5, 3, 1}, /* 5, 6, 7 */
+ {0, 1, 1}, /* 0 */
+ {10, 2, 1} /* 10, 11 */
+ },
+ 3,
+ {0, 5, 6, 7, 10, 11, 99},
+ },
+ {
+ "insert 2,0,1",
+ {
+ {5, 3, 1}, /* 5, 6, 7 */
+ {10, 2, 1}, /* 10, 11 */
+ {0, 1, 1} /* 0 */
+ },
+ 3,
+ {0, 5, 6, 7, 10, 11, 99},
+ },
+ {NULL}};
+ int i;
+ int fails = 0;
+
+ for (i = 0; args[i].name != NULL; i++)
+ fails += ut_tq_test(&args[i]);
+
+ RD_UT_ASSERT(!fails, "See %d previous failure(s)", fails);
+
+ RD_UT_PASS();
+}
+
+/**@}*/
+
+
+/**
+ * @name rd_clock() unittests
+ * @{
+ */
+
+#if RD_UNITTEST_QPC_OVERRIDES
+
+/**
+ * These values are based on a machine with a QPC frequency of 14318180,
+ * which would cause the original rd_clock() calculation to overflow
+ * after about 8 days.
+ * Details:
+ * https://github.com/confluentinc/confluent-kafka-dotnet/issues/603#issuecomment-417274540
+ */
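+/*
+ * Rough arithmetic (illustrative), assuming the original calculation
+ * scaled the raw counter first, i.e. (counter * 1000000) / freq:
+ * the multiplication exceeds INT64_MAX once the counter passes ~9.2e12
+ * ticks, which at 14318180 ticks/s is roughly 644,000 seconds, or about
+ * 7.5 days of uptime.
+ */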
+
+static const int64_t rd_ut_qpc_freq = 14318180;
+static int64_t rd_ut_qpc_now;
+
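+/* Mock QueryPerformanceFrequency()/QueryPerformanceCounter() replacements:
+ * with RD_UNITTEST_QPC_OVERRIDES defined (see top of this file) rd_clock()
+ * is expected to use these instead of the real Win32 calls, so the test can
+ * steer the simulated uptime through rd_ut_qpc_now (in seconds). */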
+BOOL rd_ut_QueryPerformanceFrequency(_Out_ LARGE_INTEGER *lpFrequency) {
+ lpFrequency->QuadPart = rd_ut_qpc_freq;
+ return TRUE;
+}
+
+BOOL rd_ut_QueryPerformanceCounter(_Out_ LARGE_INTEGER *lpPerformanceCount) {
+ lpPerformanceCount->QuadPart = rd_ut_qpc_now * rd_ut_qpc_freq;
+ return TRUE;
+}
+
+static int unittest_rdclock(void) {
+ rd_ts_t t1, t2;
+
+ /* First let "uptime" be fresh boot (0). */
+ rd_ut_qpc_now = 0;
+ t1 = rd_clock();
+ rd_ut_qpc_now++;
+ t2 = rd_clock();
+ RD_UT_ASSERT(t2 == t1 + (1 * 1000000),
+ "Expected t2 %" PRId64 " to be 1s more than t1 %" PRId64,
+ t2, t1);
+
+ /* Then skip forward to 8 days, which should trigger the
+ * overflow in a faulty implementation. */
+ rd_ut_qpc_now = 8 * 86400;
+ t2 = rd_clock();
+ RD_UT_ASSERT(t2 == t1 + (8LL * 86400 * 1000000),
+ "Expected t2 %" PRId64
+ " to be 8 days larger than t1 %" PRId64,
+ t2, t1);
+
+        /* And make sure we can run on a system with 38 years of uptime. */
+ rd_ut_qpc_now = 38 * 365 * 86400;
+ t2 = rd_clock();
+ RD_UT_ASSERT(t2 == t1 + (38LL * 365 * 86400 * 1000000),
+ "Expected t2 %" PRId64
+ " to be 38 years larger than t1 %" PRId64,
+ t2, t1);
+
+ RD_UT_PASS();
+}
+#endif
+
+
+
+/**@}*/
+
+extern int unittest_string(void);
+extern int unittest_cgrp(void);
+#if WITH_SASL_SCRAM
+extern int unittest_scram(void);
+#endif
+extern int unittest_assignors(void);
+extern int unittest_map(void);
+#if WITH_CURL
+extern int unittest_http(void);
+#endif
+#if WITH_OAUTHBEARER_OIDC
+extern int unittest_sasl_oauthbearer_oidc(void);
+#endif
+
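+/**
+ * @brief Run the built-in unit tests.
+ *
+ * Environment variables honoured below:
+ *  - RD_UT_TEST: only run tests whose name contains this substring.
+ *  - RD_UT_ASSERT: assert (abort) on unittest failure.
+ *  - CI: mark the run as a CI run and don't error out on slow CPUs.
+ *
+ * @returns the number of failures.
+ */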
+int rd_unittest(void) {
+ int fails = 0;
+ const struct {
+ const char *name;
+ int (*call)(void);
+ } unittests[] = {
+ {"sysqueue", unittest_sysqueue},
+ {"string", unittest_string},
+ {"map", unittest_map},
+ {"rdbuf", unittest_rdbuf},
+ {"rdvarint", unittest_rdvarint},
+ {"crc32c", unittest_rd_crc32c},
+ {"msg", unittest_msg},
+ {"murmurhash", unittest_murmur2},
+ {"fnv1a", unittest_fnv1a},
+#if WITH_HDRHISTOGRAM
+ {"rdhdrhistogram", unittest_rdhdrhistogram},
+#endif
+#ifdef _WIN32
+ {"rdclock", unittest_rdclock},
+#endif
+ {"conf", unittest_conf},
+ {"broker", unittest_broker},
+ {"request", unittest_request},
+#if WITH_SASL_OAUTHBEARER
+ {"sasl_oauthbearer", unittest_sasl_oauthbearer},
+#endif
+ {"aborted_txns", unittest_aborted_txns},
+ {"cgrp", unittest_cgrp},
+#if WITH_SASL_SCRAM
+ {"scram", unittest_scram},
+#endif
+ {"assignors", unittest_assignors},
+#if WITH_CURL
+ {"http", unittest_http},
+#endif
+#if WITH_OAUTHBEARER_OIDC
+ {"sasl_oauthbearer_oidc", unittest_sasl_oauthbearer_oidc},
+#endif
+ {NULL}
+ };
+ int i;
+ const char *match = rd_getenv("RD_UT_TEST", NULL);
+ int cnt = 0;
+
+ if (rd_getenv("RD_UT_ASSERT", NULL))
+ rd_unittest_assert_on_failure = rd_true;
+ if (rd_getenv("CI", NULL)) {
+ RD_UT_SAY("Unittests running on CI");
+ rd_unittest_on_ci = rd_true;
+ }
+
+ if (rd_unittest_on_ci || (ENABLE_DEVEL + 0)) {
+ RD_UT_SAY("Unittests will not error out on slow CPUs");
+ rd_unittest_slow = rd_true;
+ }
+
+ rd_kafka_global_init();
+
+#if ENABLE_CODECOV
+ for (i = 0; i < RD_UT_COVNR_MAX + 1; i++)
+ rd_atomic64_init(&rd_ut_covnrs[i], 0);
+#endif
+
+ for (i = 0; unittests[i].name; i++) {
+ int f;
+
+ if (match && !strstr(unittests[i].name, match))
+ continue;
+
+ f = unittests[i].call();
+ RD_UT_SAY("unittest: %s: %4s\033[0m", unittests[i].name,
+ f ? "\033[31mFAIL" : "\033[32mPASS");
+ fails += f;
+ cnt++;
+ }
+
+#if ENABLE_CODECOV
+#if FIXME /* This check only works if all tests that use coverage checks \
+ * are run, which we can't really know, so disable until we \
+ * know what to do with this. */
+ if (!match) {
+ /* Verify all code paths were covered */
+ int cov_fails = 0;
+ for (i = 0; i < RD_UT_COVNR_MAX + 1; i++) {
+ if (!RD_UT_COVERAGE_CHECK(i))
+ cov_fails++;
+ }
+ if (cov_fails > 0)
+ RD_UT_SAY("%d code coverage failure(s) (ignored)\n",
+ cov_fails);
+ }
+#endif
+#endif
+
+ if (!cnt && match)
+ RD_UT_WARN("No unittests matching \"%s\"", match);
+
+ return fails;
+}