/* * librdkafka - Apache Kafka C library * * Copyright (c) 2017 Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #ifdef _WIN32 #define RD_UNITTEST_QPC_OVERRIDES 1 #endif #include "rd.h" #include "rdunittest.h" #include "rdvarint.h" #include "rdbuf.h" #include "crc32c.h" #include "rdmurmur2.h" #include "rdfnv1a.h" #if WITH_HDRHISTOGRAM #include "rdhdrhistogram.h" #endif #include "rdkafka_int.h" #include "rdkafka_broker.h" #include "rdkafka_request.h" #include "rdsysqueue.h" #include "rdkafka_sasl_oauthbearer.h" #if WITH_OAUTHBEARER_OIDC #include "rdkafka_sasl_oauthbearer_oidc.h" #endif #include "rdkafka_msgset.h" #include "rdkafka_txnmgr.h" rd_bool_t rd_unittest_assert_on_failure = rd_false; rd_bool_t rd_unittest_on_ci = rd_false; rd_bool_t rd_unittest_slow = rd_false; #if ENABLE_CODECOV /** * @name Code coverage * @{ */ static rd_atomic64_t rd_ut_covnrs[RD_UT_COVNR_MAX + 1]; void rd_ut_coverage(const char *file, const char *func, int line, int covnr) { rd_assert(covnr >= 0 && covnr <= RD_UT_COVNR_MAX); rd_atomic64_add(&rd_ut_covnrs[covnr], 1); } int64_t rd_ut_coverage_check(const char *file, const char *func, int line, int covnr) { int64_t r; rd_assert(covnr >= 0 && covnr <= RD_UT_COVNR_MAX); r = rd_atomic64_get(&rd_ut_covnrs[covnr]); if (!r) { fprintf(stderr, "\033[31m" "RDUT: FAIL: %s:%d: %s: " "Code coverage nr %d: FAIL: " "code path not executed: " "perform `grep -RnF 'COVERAGE(%d)' src/` to find " "source location" "\033[0m\n", file, line, func, covnr, covnr); if (rd_unittest_assert_on_failure) rd_assert(!*"unittest failure"); return 0; } fprintf(stderr, "\033[34mRDUT: CCOV: %s:%d: %s: Code coverage nr %d: " "PASS (%" PRId64 " code path execution(s))\033[0m\n", file, line, func, covnr, r); return r; } /**@}*/ #endif /* ENABLE_CODECOV */ /** * @name Test rdsysqueue.h / queue.h * @{ */ struct ut_tq { TAILQ_ENTRY(ut_tq) link; int v; }; TAILQ_HEAD(ut_tq_head, ut_tq); struct ut_tq_args { const char *name; /**< Descriptive test name */ struct { int base; /**< Base value */ int cnt; /**< Number of elements to add */ int step; /**< Value step */ } q[3]; /**< Queue element definition */ int qcnt; /**< Number of defs in .q */ int exp[16]; /**< Expected value order after join */ }; /** * @brief Find the previous element (insert position) for * value \p val in list \p head or NULL if \p val is less than * the first element in \p head. * @remarks \p head must be ascending sorted. */ static struct ut_tq *ut_tq_find_prev_pos(const struct ut_tq_head *head, int val) { struct ut_tq *e, *prev = NULL; TAILQ_FOREACH(e, head, link) { if (e->v > val) return prev; prev = e; } return prev; } static int ut_tq_test(const struct ut_tq_args *args) { int totcnt = 0; int fails = 0; struct ut_tq_head *tqh[3] = {NULL, NULL, NULL}; struct ut_tq *e, *insert_after; int i, qi; RD_UT_SAY("Testing TAILQ: %s", args->name); /* * Verify TAILQ_INSERT_LIST: * For each insert position test: * - create two lists: tqh 0 and 1 * - add entries to both lists * - insert list 1 into 0 * - verify expected order and correctness */ /* Use heap allocated heads to let valgrind/asan assist * in detecting corruption. */ for (qi = 0; qi < args->qcnt; qi++) { tqh[qi] = rd_calloc(1, sizeof(*tqh[qi])); TAILQ_INIT(tqh[qi]); for (i = 0; i < args->q[qi].cnt; i++) { e = rd_malloc(sizeof(*e)); e->v = args->q[qi].base + (i * args->q[qi].step); TAILQ_INSERT_TAIL(tqh[qi], e, link); } totcnt += args->q[qi].cnt; } for (qi = 1; qi < args->qcnt; qi++) { insert_after = ut_tq_find_prev_pos(tqh[0], args->q[qi].base); if (!insert_after) { /* Insert position is head of list, * do two-step concat+move */ TAILQ_PREPEND(tqh[0], tqh[qi], ut_tq_head, link); } else { TAILQ_INSERT_LIST(tqh[0], insert_after, tqh[qi], ut_tq_head, struct ut_tq *, link); } RD_UT_ASSERT(TAILQ_EMPTY(tqh[qi]), "expected empty tqh[%d]", qi); RD_UT_ASSERT(!TAILQ_EMPTY(tqh[0]), "expected non-empty tqh[0]"); memset(tqh[qi], (int)'A', sizeof(*tqh[qi])); rd_free(tqh[qi]); } RD_UT_ASSERT(TAILQ_LAST(tqh[0], ut_tq_head)->v == args->exp[totcnt - 1], "TAILQ_LAST val %d, expected %d", TAILQ_LAST(tqh[0], ut_tq_head)->v, args->exp[totcnt - 1]); /* Add sentinel value to verify that INSERT_TAIL works * after INSERT_LIST */ e = rd_malloc(sizeof(*e)); e->v = 99; TAILQ_INSERT_TAIL(tqh[0], e, link); totcnt++; i = 0; TAILQ_FOREACH(e, tqh[0], link) { if (i >= totcnt) { RD_UT_WARN( "Too many elements in list tqh[0]: " "idx %d > totcnt %d: element %p (value %d)", i, totcnt, e, e->v); fails++; } else if (e->v != args->exp[i]) { RD_UT_WARN( "Element idx %d/%d in tqh[0] has value %d, " "expected %d", i, totcnt, e->v, args->exp[i]); fails++; } else if (i == totcnt - 1 && e != TAILQ_LAST(tqh[0], ut_tq_head)) { RD_UT_WARN("TAILQ_LAST == %p, expected %p", TAILQ_LAST(tqh[0], ut_tq_head), e); fails++; } i++; } /* Then scan it in reverse */ i = totcnt - 1; TAILQ_FOREACH_REVERSE(e, tqh[0], ut_tq_head, link) { if (i < 0) { RD_UT_WARN( "REVERSE: Too many elements in list tqh[0]: " "idx %d < 0: element %p (value %d)", i, e, e->v); fails++; } else if (e->v != args->exp[i]) { RD_UT_WARN( "REVERSE: Element idx %d/%d in tqh[0] has " "value %d, expected %d", i, totcnt, e->v, args->exp[i]); fails++; } else if (i == totcnt - 1 && e != TAILQ_LAST(tqh[0], ut_tq_head)) { RD_UT_WARN("REVERSE: TAILQ_LAST == %p, expected %p", TAILQ_LAST(tqh[0], ut_tq_head), e); fails++; } i--; } RD_UT_ASSERT(TAILQ_LAST(tqh[0], ut_tq_head)->v == args->exp[totcnt - 1], "TAILQ_LAST val %d, expected %d", TAILQ_LAST(tqh[0], ut_tq_head)->v, args->exp[totcnt - 1]); while ((e = TAILQ_FIRST(tqh[0]))) { TAILQ_REMOVE(tqh[0], e, link); rd_free(e); } rd_free(tqh[0]); return fails; } static int unittest_sysqueue(void) { const struct ut_tq_args args[] = { {"empty tqh[0]", {{0, 0, 0}, {0, 3, 1}}, 2, {0, 1, 2, 99 /*sentinel*/}}, {"prepend 1,0", {{10, 3, 1}, {0, 3, 1}}, 2, {0, 1, 2, 10, 11, 12, 99}}, {"prepend 2,1,0", { {10, 3, 1}, /* 10, 11, 12 */ {5, 3, 1}, /* 5, 6, 7 */ {0, 2, 1} /* 0, 1 */ }, 3, {0, 1, 5, 6, 7, 10, 11, 12, 99}}, {"insert 1", {{0, 3, 2}, {1, 2, 2}}, 2, {0, 1, 3, 2, 4, 99}}, {"insert 1,2", { {0, 3, 3}, /* 0, 3, 6 */ {1, 2, 3}, /* 1, 4 */ {2, 1, 3} /* 2 */ }, 3, {0, 1, 2, 4, 3, 6, 99}}, {"append 1", {{0, 5, 1}, {5, 5, 1}}, 2, {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 99}}, {"append 1,2", { {0, 5, 1}, /* 0, 1, 2, 3, 4 */ {5, 5, 1}, /* 5, 6, 7, 8, 9 */ {11, 2, 1} /* 11, 12 */ }, 3, {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 99}}, { "insert 1,0,2", { {5, 3, 1}, /* 5, 6, 7 */ {0, 1, 1}, /* 0 */ {10, 2, 1} /* 10, 11 */ }, 3, {0, 5, 6, 7, 10, 11, 99}, }, { "insert 2,0,1", { {5, 3, 1}, /* 5, 6, 7 */ {10, 2, 1}, /* 10, 11 */ {0, 1, 1} /* 0 */ }, 3, {0, 5, 6, 7, 10, 11, 99}, }, {NULL}}; int i; int fails = 0; for (i = 0; args[i].name != NULL; i++) fails += ut_tq_test(&args[i]); RD_UT_ASSERT(!fails, "See %d previous failure(s)", fails); RD_UT_PASS(); } /**@}*/ /** * @name rd_clock() unittests * @{ */ #if RD_UNITTEST_QPC_OVERRIDES /** * These values are based off a machine with freq 14318180 * which would cause the original rd_clock() calculation to overflow * after about 8 days. * Details: * https://github.com/confluentinc/confluent-kafka-dotnet/issues/603#issuecomment-417274540 */ static const int64_t rd_ut_qpc_freq = 14318180; static int64_t rd_ut_qpc_now; BOOL rd_ut_QueryPerformanceFrequency(_Out_ LARGE_INTEGER *lpFrequency) { lpFrequency->QuadPart = rd_ut_qpc_freq; return TRUE; } BOOL rd_ut_QueryPerformanceCounter(_Out_ LARGE_INTEGER *lpPerformanceCount) { lpPerformanceCount->QuadPart = rd_ut_qpc_now * rd_ut_qpc_freq; return TRUE; } static int unittest_rdclock(void) { rd_ts_t t1, t2; /* First let "uptime" be fresh boot (0). */ rd_ut_qpc_now = 0; t1 = rd_clock(); rd_ut_qpc_now++; t2 = rd_clock(); RD_UT_ASSERT(t2 == t1 + (1 * 1000000), "Expected t2 %" PRId64 " to be 1s more than t1 %" PRId64, t2, t1); /* Then skip forward to 8 days, which should trigger the * overflow in a faulty implementation. */ rd_ut_qpc_now = 8 * 86400; t2 = rd_clock(); RD_UT_ASSERT(t2 == t1 + (8LL * 86400 * 1000000), "Expected t2 %" PRId64 " to be 8 days larger than t1 %" PRId64, t2, t1); /* And make sure we can run on a system with 38 years of uptime.. */ rd_ut_qpc_now = 38 * 365 * 86400; t2 = rd_clock(); RD_UT_ASSERT(t2 == t1 + (38LL * 365 * 86400 * 1000000), "Expected t2 %" PRId64 " to be 38 years larger than t1 %" PRId64, t2, t1); RD_UT_PASS(); } #endif /**@}*/ extern int unittest_string(void); extern int unittest_cgrp(void); #if WITH_SASL_SCRAM extern int unittest_scram(void); #endif extern int unittest_assignors(void); extern int unittest_map(void); #if WITH_CURL extern int unittest_http(void); #endif #if WITH_OAUTHBEARER_OIDC extern int unittest_sasl_oauthbearer_oidc(void); #endif int rd_unittest(void) { int fails = 0; const struct { const char *name; int (*call)(void); } unittests[] = { {"sysqueue", unittest_sysqueue}, {"string", unittest_string}, {"map", unittest_map}, {"rdbuf", unittest_rdbuf}, {"rdvarint", unittest_rdvarint}, {"crc32c", unittest_rd_crc32c}, {"msg", unittest_msg}, {"murmurhash", unittest_murmur2}, {"fnv1a", unittest_fnv1a}, #if WITH_HDRHISTOGRAM {"rdhdrhistogram", unittest_rdhdrhistogram}, #endif #ifdef _WIN32 {"rdclock", unittest_rdclock}, #endif {"conf", unittest_conf}, {"broker", unittest_broker}, {"request", unittest_request}, #if WITH_SASL_OAUTHBEARER {"sasl_oauthbearer", unittest_sasl_oauthbearer}, #endif {"aborted_txns", unittest_aborted_txns}, {"cgrp", unittest_cgrp}, #if WITH_SASL_SCRAM {"scram", unittest_scram}, #endif {"assignors", unittest_assignors}, #if WITH_CURL {"http", unittest_http}, #endif #if WITH_OAUTHBEARER_OIDC {"sasl_oauthbearer_oidc", unittest_sasl_oauthbearer_oidc}, #endif {NULL} }; int i; const char *match = rd_getenv("RD_UT_TEST", NULL); int cnt = 0; if (rd_getenv("RD_UT_ASSERT", NULL)) rd_unittest_assert_on_failure = rd_true; if (rd_getenv("CI", NULL)) { RD_UT_SAY("Unittests running on CI"); rd_unittest_on_ci = rd_true; } if (rd_unittest_on_ci || (ENABLE_DEVEL + 0)) { RD_UT_SAY("Unittests will not error out on slow CPUs"); rd_unittest_slow = rd_true; } rd_kafka_global_init(); #if ENABLE_CODECOV for (i = 0; i < RD_UT_COVNR_MAX + 1; i++) rd_atomic64_init(&rd_ut_covnrs[i], 0); #endif for (i = 0; unittests[i].name; i++) { int f; if (match && !strstr(unittests[i].name, match)) continue; f = unittests[i].call(); RD_UT_SAY("unittest: %s: %4s\033[0m", unittests[i].name, f ? "\033[31mFAIL" : "\033[32mPASS"); fails += f; cnt++; } #if ENABLE_CODECOV #if FIXME /* This check only works if all tests that use coverage checks \ * are run, which we can't really know, so disable until we \ * know what to do with this. */ if (!match) { /* Verify all code paths were covered */ int cov_fails = 0; for (i = 0; i < RD_UT_COVNR_MAX + 1; i++) { if (!RD_UT_COVERAGE_CHECK(i)) cov_fails++; } if (cov_fails > 0) RD_UT_SAY("%d code coverage failure(s) (ignored)\n", cov_fails); } #endif #endif if (!cnt && match) RD_UT_WARN("No unittests matching \"%s\"", match); return fails; }