diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdunittest.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdunittest.c | 529 |
1 files changed, 529 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdunittest.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdunittest.c new file mode 100644 index 000000000..aa14b6aa8 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdunittest.c @@ -0,0 +1,529 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2017 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifdef _WIN32 +#define RD_UNITTEST_QPC_OVERRIDES 1 +#endif + +#include "rd.h" +#include "rdunittest.h" + +#include "rdvarint.h" +#include "rdbuf.h" +#include "crc32c.h" +#include "rdmurmur2.h" +#include "rdfnv1a.h" +#if WITH_HDRHISTOGRAM +#include "rdhdrhistogram.h" +#endif +#include "rdkafka_int.h" +#include "rdkafka_broker.h" +#include "rdkafka_request.h" + +#include "rdsysqueue.h" +#include "rdkafka_sasl_oauthbearer.h" +#if WITH_OAUTHBEARER_OIDC +#include "rdkafka_sasl_oauthbearer_oidc.h" +#endif +#include "rdkafka_msgset.h" +#include "rdkafka_txnmgr.h" + +rd_bool_t rd_unittest_assert_on_failure = rd_false; +rd_bool_t rd_unittest_on_ci = rd_false; +rd_bool_t rd_unittest_slow = rd_false; + +#if ENABLE_CODECOV +/** + * @name Code coverage + * @{ + */ + +static rd_atomic64_t rd_ut_covnrs[RD_UT_COVNR_MAX + 1]; + +void rd_ut_coverage(const char *file, const char *func, int line, int covnr) { + rd_assert(covnr >= 0 && covnr <= RD_UT_COVNR_MAX); + rd_atomic64_add(&rd_ut_covnrs[covnr], 1); +} + + +int64_t +rd_ut_coverage_check(const char *file, const char *func, int line, int covnr) { + int64_t r; + + rd_assert(covnr >= 0 && covnr <= RD_UT_COVNR_MAX); + + r = rd_atomic64_get(&rd_ut_covnrs[covnr]); + + if (!r) { + fprintf(stderr, + "\033[31m" + "RDUT: FAIL: %s:%d: %s: " + "Code coverage nr %d: FAIL: " + "code path not executed: " + "perform `grep -RnF 'COVERAGE(%d)' src/` to find " + "source location" + "\033[0m\n", + file, line, func, covnr, covnr); + if (rd_unittest_assert_on_failure) + rd_assert(!*"unittest failure"); + return 0; + } + + fprintf(stderr, + "\033[34mRDUT: CCOV: %s:%d: %s: Code coverage nr %d: " + "PASS (%" PRId64 " code path execution(s))\033[0m\n", + file, line, func, covnr, r); + + return r; +} +/**@}*/ + +#endif /* ENABLE_CODECOV */ + + +/** + * @name Test rdsysqueue.h / queue.h + * @{ + */ + +struct ut_tq { + TAILQ_ENTRY(ut_tq) link; + int v; +}; + +TAILQ_HEAD(ut_tq_head, ut_tq); + +struct ut_tq_args { + const char *name; /**< Descriptive test name */ + struct { + int base; /**< Base value */ + int cnt; /**< Number of elements to add */ + int step; /**< Value step */ + } q[3]; /**< Queue element definition */ + int qcnt; /**< Number of defs in .q */ + int exp[16]; /**< Expected value order after join */ +}; + +/** + * @brief Find the previous element (insert position) for + * value \p val in list \p head or NULL if \p val is less than + * the first element in \p head. + * @remarks \p head must be ascending sorted. + */ +static struct ut_tq *ut_tq_find_prev_pos(const struct ut_tq_head *head, + int val) { + struct ut_tq *e, *prev = NULL; + + TAILQ_FOREACH(e, head, link) { + if (e->v > val) + return prev; + prev = e; + } + + return prev; +} + +static int ut_tq_test(const struct ut_tq_args *args) { + int totcnt = 0; + int fails = 0; + struct ut_tq_head *tqh[3] = {NULL, NULL, NULL}; + struct ut_tq *e, *insert_after; + int i, qi; + + RD_UT_SAY("Testing TAILQ: %s", args->name); + + /* + * Verify TAILQ_INSERT_LIST: + * For each insert position test: + * - create two lists: tqh 0 and 1 + * - add entries to both lists + * - insert list 1 into 0 + * - verify expected order and correctness + */ + + /* Use heap allocated heads to let valgrind/asan assist + * in detecting corruption. */ + + for (qi = 0; qi < args->qcnt; qi++) { + tqh[qi] = rd_calloc(1, sizeof(*tqh[qi])); + TAILQ_INIT(tqh[qi]); + + for (i = 0; i < args->q[qi].cnt; i++) { + e = rd_malloc(sizeof(*e)); + e->v = args->q[qi].base + (i * args->q[qi].step); + TAILQ_INSERT_TAIL(tqh[qi], e, link); + } + + totcnt += args->q[qi].cnt; + } + + for (qi = 1; qi < args->qcnt; qi++) { + insert_after = ut_tq_find_prev_pos(tqh[0], args->q[qi].base); + if (!insert_after) { + /* Insert position is head of list, + * do two-step concat+move */ + TAILQ_PREPEND(tqh[0], tqh[qi], ut_tq_head, link); + } else { + TAILQ_INSERT_LIST(tqh[0], insert_after, tqh[qi], + ut_tq_head, struct ut_tq *, link); + } + + RD_UT_ASSERT(TAILQ_EMPTY(tqh[qi]), "expected empty tqh[%d]", + qi); + RD_UT_ASSERT(!TAILQ_EMPTY(tqh[0]), "expected non-empty tqh[0]"); + + memset(tqh[qi], (int)'A', sizeof(*tqh[qi])); + rd_free(tqh[qi]); + } + + RD_UT_ASSERT(TAILQ_LAST(tqh[0], ut_tq_head)->v == args->exp[totcnt - 1], + "TAILQ_LAST val %d, expected %d", + TAILQ_LAST(tqh[0], ut_tq_head)->v, args->exp[totcnt - 1]); + + /* Add sentinel value to verify that INSERT_TAIL works + * after INSERT_LIST */ + e = rd_malloc(sizeof(*e)); + e->v = 99; + TAILQ_INSERT_TAIL(tqh[0], e, link); + totcnt++; + + i = 0; + TAILQ_FOREACH(e, tqh[0], link) { + if (i >= totcnt) { + RD_UT_WARN( + "Too many elements in list tqh[0]: " + "idx %d > totcnt %d: element %p (value %d)", + i, totcnt, e, e->v); + fails++; + } else if (e->v != args->exp[i]) { + RD_UT_WARN( + "Element idx %d/%d in tqh[0] has value %d, " + "expected %d", + i, totcnt, e->v, args->exp[i]); + fails++; + } else if (i == totcnt - 1 && + e != TAILQ_LAST(tqh[0], ut_tq_head)) { + RD_UT_WARN("TAILQ_LAST == %p, expected %p", + TAILQ_LAST(tqh[0], ut_tq_head), e); + fails++; + } + i++; + } + + /* Then scan it in reverse */ + i = totcnt - 1; + TAILQ_FOREACH_REVERSE(e, tqh[0], ut_tq_head, link) { + if (i < 0) { + RD_UT_WARN( + "REVERSE: Too many elements in list tqh[0]: " + "idx %d < 0: element %p (value %d)", + i, e, e->v); + fails++; + } else if (e->v != args->exp[i]) { + RD_UT_WARN( + "REVERSE: Element idx %d/%d in tqh[0] has " + "value %d, expected %d", + i, totcnt, e->v, args->exp[i]); + fails++; + } else if (i == totcnt - 1 && + e != TAILQ_LAST(tqh[0], ut_tq_head)) { + RD_UT_WARN("REVERSE: TAILQ_LAST == %p, expected %p", + TAILQ_LAST(tqh[0], ut_tq_head), e); + fails++; + } + i--; + } + + RD_UT_ASSERT(TAILQ_LAST(tqh[0], ut_tq_head)->v == args->exp[totcnt - 1], + "TAILQ_LAST val %d, expected %d", + TAILQ_LAST(tqh[0], ut_tq_head)->v, args->exp[totcnt - 1]); + + while ((e = TAILQ_FIRST(tqh[0]))) { + TAILQ_REMOVE(tqh[0], e, link); + rd_free(e); + } + + rd_free(tqh[0]); + + return fails; +} + + +static int unittest_sysqueue(void) { + const struct ut_tq_args args[] = { + {"empty tqh[0]", + {{0, 0, 0}, {0, 3, 1}}, + 2, + {0, 1, 2, 99 /*sentinel*/}}, + {"prepend 1,0", + {{10, 3, 1}, {0, 3, 1}}, + 2, + {0, 1, 2, 10, 11, 12, 99}}, + {"prepend 2,1,0", + { + {10, 3, 1}, /* 10, 11, 12 */ + {5, 3, 1}, /* 5, 6, 7 */ + {0, 2, 1} /* 0, 1 */ + }, + 3, + {0, 1, 5, 6, 7, 10, 11, 12, 99}}, + {"insert 1", {{0, 3, 2}, {1, 2, 2}}, 2, {0, 1, 3, 2, 4, 99}}, + {"insert 1,2", + { + {0, 3, 3}, /* 0, 3, 6 */ + {1, 2, 3}, /* 1, 4 */ + {2, 1, 3} /* 2 */ + }, + 3, + {0, 1, 2, 4, 3, 6, 99}}, + {"append 1", + {{0, 5, 1}, {5, 5, 1}}, + 2, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 99}}, + {"append 1,2", + { + {0, 5, 1}, /* 0, 1, 2, 3, 4 */ + {5, 5, 1}, /* 5, 6, 7, 8, 9 */ + {11, 2, 1} /* 11, 12 */ + }, + 3, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 99}}, + { + "insert 1,0,2", + { + {5, 3, 1}, /* 5, 6, 7 */ + {0, 1, 1}, /* 0 */ + {10, 2, 1} /* 10, 11 */ + }, + 3, + {0, 5, 6, 7, 10, 11, 99}, + }, + { + "insert 2,0,1", + { + {5, 3, 1}, /* 5, 6, 7 */ + {10, 2, 1}, /* 10, 11 */ + {0, 1, 1} /* 0 */ + }, + 3, + {0, 5, 6, 7, 10, 11, 99}, + }, + {NULL}}; + int i; + int fails = 0; + + for (i = 0; args[i].name != NULL; i++) + fails += ut_tq_test(&args[i]); + + RD_UT_ASSERT(!fails, "See %d previous failure(s)", fails); + + RD_UT_PASS(); +} + +/**@}*/ + + +/** + * @name rd_clock() unittests + * @{ + */ + +#if RD_UNITTEST_QPC_OVERRIDES + +/** + * These values are based off a machine with freq 14318180 + * which would cause the original rd_clock() calculation to overflow + * after about 8 days. + * Details: + * https://github.com/confluentinc/confluent-kafka-dotnet/issues/603#issuecomment-417274540 + */ + +static const int64_t rd_ut_qpc_freq = 14318180; +static int64_t rd_ut_qpc_now; + +BOOL rd_ut_QueryPerformanceFrequency(_Out_ LARGE_INTEGER *lpFrequency) { + lpFrequency->QuadPart = rd_ut_qpc_freq; + return TRUE; +} + +BOOL rd_ut_QueryPerformanceCounter(_Out_ LARGE_INTEGER *lpPerformanceCount) { + lpPerformanceCount->QuadPart = rd_ut_qpc_now * rd_ut_qpc_freq; + return TRUE; +} + +static int unittest_rdclock(void) { + rd_ts_t t1, t2; + + /* First let "uptime" be fresh boot (0). */ + rd_ut_qpc_now = 0; + t1 = rd_clock(); + rd_ut_qpc_now++; + t2 = rd_clock(); + RD_UT_ASSERT(t2 == t1 + (1 * 1000000), + "Expected t2 %" PRId64 " to be 1s more than t1 %" PRId64, + t2, t1); + + /* Then skip forward to 8 days, which should trigger the + * overflow in a faulty implementation. */ + rd_ut_qpc_now = 8 * 86400; + t2 = rd_clock(); + RD_UT_ASSERT(t2 == t1 + (8LL * 86400 * 1000000), + "Expected t2 %" PRId64 + " to be 8 days larger than t1 %" PRId64, + t2, t1); + + /* And make sure we can run on a system with 38 years of uptime.. */ + rd_ut_qpc_now = 38 * 365 * 86400; + t2 = rd_clock(); + RD_UT_ASSERT(t2 == t1 + (38LL * 365 * 86400 * 1000000), + "Expected t2 %" PRId64 + " to be 38 years larger than t1 %" PRId64, + t2, t1); + + RD_UT_PASS(); +} +#endif + + + +/**@}*/ + +extern int unittest_string(void); +extern int unittest_cgrp(void); +#if WITH_SASL_SCRAM +extern int unittest_scram(void); +#endif +extern int unittest_assignors(void); +extern int unittest_map(void); +#if WITH_CURL +extern int unittest_http(void); +#endif +#if WITH_OAUTHBEARER_OIDC +extern int unittest_sasl_oauthbearer_oidc(void); +#endif + +int rd_unittest(void) { + int fails = 0; + const struct { + const char *name; + int (*call)(void); + } unittests[] = { + {"sysqueue", unittest_sysqueue}, + {"string", unittest_string}, + {"map", unittest_map}, + {"rdbuf", unittest_rdbuf}, + {"rdvarint", unittest_rdvarint}, + {"crc32c", unittest_rd_crc32c}, + {"msg", unittest_msg}, + {"murmurhash", unittest_murmur2}, + {"fnv1a", unittest_fnv1a}, +#if WITH_HDRHISTOGRAM + {"rdhdrhistogram", unittest_rdhdrhistogram}, +#endif +#ifdef _WIN32 + {"rdclock", unittest_rdclock}, +#endif + {"conf", unittest_conf}, + {"broker", unittest_broker}, + {"request", unittest_request}, +#if WITH_SASL_OAUTHBEARER + {"sasl_oauthbearer", unittest_sasl_oauthbearer}, +#endif + {"aborted_txns", unittest_aborted_txns}, + {"cgrp", unittest_cgrp}, +#if WITH_SASL_SCRAM + {"scram", unittest_scram}, +#endif + {"assignors", unittest_assignors}, +#if WITH_CURL + {"http", unittest_http}, +#endif +#if WITH_OAUTHBEARER_OIDC + {"sasl_oauthbearer_oidc", unittest_sasl_oauthbearer_oidc}, +#endif + {NULL} + }; + int i; + const char *match = rd_getenv("RD_UT_TEST", NULL); + int cnt = 0; + + if (rd_getenv("RD_UT_ASSERT", NULL)) + rd_unittest_assert_on_failure = rd_true; + if (rd_getenv("CI", NULL)) { + RD_UT_SAY("Unittests running on CI"); + rd_unittest_on_ci = rd_true; + } + + if (rd_unittest_on_ci || (ENABLE_DEVEL + 0)) { + RD_UT_SAY("Unittests will not error out on slow CPUs"); + rd_unittest_slow = rd_true; + } + + rd_kafka_global_init(); + +#if ENABLE_CODECOV + for (i = 0; i < RD_UT_COVNR_MAX + 1; i++) + rd_atomic64_init(&rd_ut_covnrs[i], 0); +#endif + + for (i = 0; unittests[i].name; i++) { + int f; + + if (match && !strstr(unittests[i].name, match)) + continue; + + f = unittests[i].call(); + RD_UT_SAY("unittest: %s: %4s\033[0m", unittests[i].name, + f ? "\033[31mFAIL" : "\033[32mPASS"); + fails += f; + cnt++; + } + +#if ENABLE_CODECOV +#if FIXME /* This check only works if all tests that use coverage checks \ + * are run, which we can't really know, so disable until we \ + * know what to do with this. */ + if (!match) { + /* Verify all code paths were covered */ + int cov_fails = 0; + for (i = 0; i < RD_UT_COVNR_MAX + 1; i++) { + if (!RD_UT_COVERAGE_CHECK(i)) + cov_fails++; + } + if (cov_fails > 0) + RD_UT_SAY("%d code coverage failure(s) (ignored)\n", + cov_fails); + } +#endif +#endif + + if (!cnt && match) + RD_UT_WARN("No unittests matching \"%s\"", match); + + return fails; +} |