summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/0004-conf.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0004-conf.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/0004-conf.c865
1 files changed, 865 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0004-conf.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0004-conf.c
new file mode 100644
index 000000000..51401e17d
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0004-conf.c
@@ -0,0 +1,865 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2013, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Tests various config related things
+ */
+
+
+#include "test.h"
+
+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
+ * is built from within the librdkafka source tree and thus differs. */
+#include "rdkafka.h" /* for Kafka driver */
+
+
+
+static void dr_cb(rd_kafka_t *rk,
+ void *payload,
+ size_t len,
+ rd_kafka_resp_err_t err,
+ void *opaque,
+ void *msg_opaque) {
+}
+
+static void
+error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
+}
+
+
+static int32_t partitioner(const rd_kafka_topic_t *rkt,
+ const void *keydata,
+ size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque) {
+ return 0;
+}
+
+
+static void
+conf_verify(int line, const char **arr, size_t cnt, const char **confs) {
+ int i, j;
+
+
+ for (i = 0; confs[i]; i += 2) {
+ for (j = 0; j < (int)cnt; j += 2) {
+ if (!strcmp(confs[i], arr[j])) {
+ if (strcmp(confs[i + 1], arr[j + 1]))
+ TEST_FAIL(
+ "%i: Property %s mismatch: "
+ "expected %s != retrieved %s",
+ line, confs[i], confs[i + 1],
+ arr[j + 1]);
+ }
+ if (j == (int)cnt)
+ TEST_FAIL(
+ "%i: "
+ "Property %s not found in config\n",
+ line, confs[i]);
+ }
+ }
+}
+
+
+static void conf_cmp(const char *desc,
+ const char **a,
+ size_t acnt,
+ const char **b,
+ size_t bcnt) {
+ int i;
+
+ if (acnt != bcnt)
+ TEST_FAIL("%s config compare: count %" PRIusz " != %" PRIusz
+ " mismatch",
+ desc, acnt, bcnt);
+
+ for (i = 0; i < (int)acnt; i += 2) {
+ if (strcmp(a[i], b[i]))
+ TEST_FAIL("%s conf mismatch: %s != %s", desc, a[i],
+ b[i]);
+ else if (strcmp(a[i + 1], b[i + 1])) {
+ /* The default_topic_conf will be auto-created
+ * when global->topic fallthru is used, so its
+ * value will not match here. */
+ if (!strcmp(a[i], "default_topic_conf"))
+ continue;
+ TEST_FAIL("%s conf value mismatch for %s: %s != %s",
+ desc, a[i], a[i + 1], b[i + 1]);
+ }
+ }
+}
+
+
+/**
+ * @brief Not called, just used for config
+ */
+static int on_new_call_cnt;
+static rd_kafka_resp_err_t my_on_new(rd_kafka_t *rk,
+ const rd_kafka_conf_t *conf,
+ void *ic_opaque,
+ char *errstr,
+ size_t errstr_size) {
+ TEST_SAY("%s: on_new() called\n", rd_kafka_name(rk));
+ on_new_call_cnt++;
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+/**
+ * @brief When rd_kafka_new() succeeds it takes ownership of the config object,
+ * but when it fails the config object remains in application custody.
+ * These tests makes sure that's the case (preferably run with valgrind)
+ */
+static void do_test_kafka_new_failures(void) {
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *rk;
+ char errstr[512];
+
+ conf = rd_kafka_conf_new();
+
+ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
+ TEST_ASSERT(rk, "kafka_new() failed: %s", errstr);
+ rd_kafka_destroy(rk);
+
+ /* Set an erroneous configuration value that is not checked
+ * by conf_set() but by rd_kafka_new() */
+ conf = rd_kafka_conf_new();
+ if (rd_kafka_conf_set(conf, "partition.assignment.strategy",
+ "range,thiswillfail", errstr,
+ sizeof(errstr)) != RD_KAFKA_CONF_OK)
+ TEST_FAIL("%s", errstr);
+
+ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
+ TEST_ASSERT(!rk, "kafka_new() should have failed");
+
+ /* config object should still belong to us,
+ * correct the erroneous config and try again. */
+ if (rd_kafka_conf_set(conf, "partition.assignment.strategy", NULL,
+ errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+ TEST_FAIL("%s", errstr);
+
+ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
+ TEST_ASSERT(rk, "kafka_new() failed: %s", errstr);
+ rd_kafka_destroy(rk);
+
+ /* set conflicting properties */
+ conf = rd_kafka_conf_new();
+ test_conf_set(conf, "acks", "1");
+ test_conf_set(conf, "enable.idempotence", "true");
+ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
+ TEST_ASSERT(!rk, "kafka_new() should have failed");
+ rd_kafka_conf_destroy(conf);
+ TEST_SAY(_C_GRN "Ok: %s\n", errstr);
+}
+
+
+/**
+ * @brief Verify that INVALID properties (such as for Java SSL properties)
+ * work, as well as INTERNAL properties.
+ */
+static void do_test_special_invalid_conf(void) {
+ rd_kafka_conf_t *conf;
+ char errstr[512];
+ rd_kafka_conf_res_t res;
+
+ conf = rd_kafka_conf_new();
+
+ res = rd_kafka_conf_set(conf, "ssl.truststore.location", "abc", errstr,
+ sizeof(errstr));
+ /* Existing apps might not print the error string when conf_set
+ * returns UNKNOWN, only on INVALID, so make sure that is
+ * what is being returned. */
+ TEST_ASSERT(res == RD_KAFKA_CONF_INVALID,
+ "expected ssl.truststore.location to fail with INVALID, "
+ "not %d",
+ res);
+ /* Make sure there is a link to documentation */
+ TEST_ASSERT(strstr(errstr, "http"),
+ "expected ssl.truststore.location to provide link to "
+ "documentation, not \"%s\"",
+ errstr);
+ TEST_SAY(_C_GRN "Ok: %s\n" _C_CLR, errstr);
+
+
+ res = rd_kafka_conf_set(conf, "sasl.jaas.config", "abc", errstr,
+ sizeof(errstr));
+ /* Existing apps might not print the error string when conf_set
+ * returns UNKNOWN, only on INVALID, so make sure that is
+ * what is being returned. */
+ TEST_ASSERT(res == RD_KAFKA_CONF_INVALID,
+ "expected sasl.jaas.config to fail with INVALID, "
+ "not %d",
+ res);
+ /* Make sure there is a link to documentation */
+ TEST_ASSERT(strstr(errstr, "http"),
+ "expected sasl.jaas.config to provide link to "
+ "documentation, not \"%s\"",
+ errstr);
+ TEST_SAY(_C_GRN "Ok: %s\n" _C_CLR, errstr);
+
+
+ res = rd_kafka_conf_set(conf, "interceptors", "1", errstr,
+ sizeof(errstr));
+ TEST_ASSERT(res == RD_KAFKA_CONF_INVALID,
+ "expected interceptors to fail with INVALID, "
+ "not %d",
+ res);
+ TEST_SAY(_C_GRN "Ok: %s\n" _C_CLR, errstr);
+
+ rd_kafka_conf_destroy(conf);
+}
+
+
+/**
+ * @brief Verify idempotence configuration constraints
+ */
+static void do_test_idempotence_conf(void) {
+ static const struct {
+ const char *prop;
+ const char *val;
+ rd_bool_t topic_conf;
+ rd_bool_t exp_rk_fail;
+ rd_bool_t exp_rkt_fail;
+ } check[] = {{"acks", "1", rd_true, rd_false, rd_true},
+ {"acks", "all", rd_true, rd_false, rd_false},
+ {"queuing.strategy", "lifo", rd_true, rd_false, rd_true},
+ {NULL}};
+ int i;
+
+ for (i = 0; check[i].prop; i++) {
+ int j;
+
+ for (j = 0; j < 1 + (check[i].topic_conf ? 1 : 0); j++) {
+ /* j = 0: set on global config
+ * j = 1: set on topic config */
+ rd_kafka_conf_t *conf;
+ rd_kafka_topic_conf_t *tconf = NULL;
+ rd_kafka_t *rk;
+ rd_kafka_topic_t *rkt;
+ char errstr[512];
+
+ conf = rd_kafka_conf_new();
+ test_conf_set(conf, "enable.idempotence", "true");
+
+ if (j == 0)
+ test_conf_set(conf, check[i].prop,
+ check[i].val);
+
+
+ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
+ sizeof(errstr));
+
+ if (!rk) {
+ /* default topic config (j=0) will fail. */
+ TEST_ASSERT(check[i].exp_rk_fail ||
+ (j == 0 &&
+ check[i].exp_rkt_fail &&
+ check[i].topic_conf),
+ "Did not expect config #%d.%d "
+ "to fail: %s",
+ i, j, errstr);
+
+ rd_kafka_conf_destroy(conf);
+ continue;
+
+ } else {
+ TEST_ASSERT(!check[i].exp_rk_fail,
+ "Expect config #%d.%d to fail", i,
+ j);
+ }
+
+ if (j == 1) {
+ tconf = rd_kafka_topic_conf_new();
+ test_topic_conf_set(tconf, check[i].prop,
+ check[i].val);
+ }
+
+ rkt = rd_kafka_topic_new(rk, "mytopic", tconf);
+ if (!rkt) {
+ TEST_ASSERT(
+ check[i].exp_rkt_fail,
+ "Did not expect topic config "
+ "#%d.%d to fail: %s",
+ i, j,
+ rd_kafka_err2str(rd_kafka_last_error()));
+
+
+ } else {
+ TEST_ASSERT(!check[i].exp_rkt_fail,
+ "Expect topic config "
+ "#%d.%d to fail",
+ i, j);
+ rd_kafka_topic_destroy(rkt);
+ }
+
+ rd_kafka_destroy(rk);
+ }
+ }
+}
+
+
+/**
+ * @brief Verify that configuration properties can be extract
+ * from the instance config object.
+ */
+static void do_test_instance_conf(void) {
+ rd_kafka_conf_t *conf;
+ const rd_kafka_conf_t *iconf;
+ rd_kafka_t *rk;
+ rd_kafka_conf_res_t res;
+ static const char *props[] = {
+ "linger.ms", "123", "group.id", "test1",
+ "enable.auto.commit", "false", NULL,
+ };
+ const char **p;
+
+ conf = rd_kafka_conf_new();
+
+ for (p = props; *p; p += 2) {
+ res = rd_kafka_conf_set(conf, *p, *(p + 1), NULL, 0);
+ TEST_ASSERT(res == RD_KAFKA_CONF_OK, "failed to set %s", *p);
+ }
+
+ rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);
+ TEST_ASSERT(rk, "failed to create consumer");
+
+ iconf = rd_kafka_conf(rk);
+ TEST_ASSERT(conf, "failed to get instance config");
+
+ for (p = props; *p; p += 2) {
+ char dest[512];
+ size_t destsz = sizeof(dest);
+
+ res = rd_kafka_conf_get(iconf, *p, dest, &destsz);
+ TEST_ASSERT(res == RD_KAFKA_CONF_OK,
+ "failed to get %s: result %d", *p, res);
+
+ TEST_SAY("Instance config %s=%s\n", *p, dest);
+ TEST_ASSERT(!strcmp(*(p + 1), dest), "Expected %s=%s, not %s",
+ *p, *(p + 1), dest);
+ }
+
+ rd_kafka_destroy(rk);
+}
+
+
+/**
+ * @brief Verify that setting and retrieving the default topic config works.
+ */
+static void do_test_default_topic_conf(void) {
+ rd_kafka_conf_t *conf;
+ rd_kafka_topic_conf_t *tconf;
+ const char *val, *exp_val;
+
+ SUB_TEST_QUICK();
+
+ conf = rd_kafka_conf_new();
+
+ /* Set topic-level property, this will create the default topic config*/
+ exp_val = "1234";
+ test_conf_set(conf, "message.timeout.ms", exp_val);
+
+ /* Get the default topic config */
+ tconf = rd_kafka_conf_get_default_topic_conf(conf);
+ TEST_ASSERT(tconf != NULL, "");
+
+ /* Get value from global config by fall-thru */
+ val = test_conf_get(conf, "message.timeout.ms");
+ TEST_ASSERT(val && !strcmp(val, exp_val),
+ "Expected (conf) message.timeout.ms=%s, not %s", exp_val,
+ val ? val : "(NULL)");
+
+ /* Get value from default topic config */
+ val = test_topic_conf_get(tconf, "message.timeout.ms");
+ TEST_ASSERT(val && !strcmp(val, exp_val),
+ "Expected (topic conf) message.timeout.ms=%s, not %s",
+ exp_val, val ? val : "(NULL)");
+
+ /* Now change the value, should be reflected in both. */
+ exp_val = "4444";
+ test_topic_conf_set(tconf, "message.timeout.ms", exp_val);
+
+ /* Get value from global config by fall-thru */
+ val = test_conf_get(conf, "message.timeout.ms");
+ TEST_ASSERT(val && !strcmp(val, exp_val),
+ "Expected (conf) message.timeout.ms=%s, not %s", exp_val,
+ val ? val : "(NULL)");
+
+ /* Get value from default topic config */
+ val = test_topic_conf_get(tconf, "message.timeout.ms");
+ TEST_ASSERT(val && !strcmp(val, exp_val),
+ "Expected (topic conf) message.timeout.ms=%s, not %s",
+ exp_val, val ? val : "(NULL)");
+
+
+ rd_kafka_conf_destroy(conf);
+
+ SUB_TEST_PASS();
+}
+
+
+/**
+ * @brief Verify behaviour of checking that message.timeout.ms fits within
+ * configured linger.ms. By larry-cdn77.
+ */
+static void do_message_timeout_linger_checks(void) {
+ rd_kafka_conf_t *conf;
+ rd_kafka_topic_conf_t *tconf;
+ rd_kafka_t *rk;
+ char errstr[512];
+ int i;
+ const char values[7][3][40] = {
+ {"-", "-", "default and L and M"},
+ {"100", "-", "set L such that L<M"},
+ {"-", "300000", "set M such that L<M"},
+ {"100", "300000", "set L and M such that L<M"},
+ {"500000", "-", "!set L such that L>=M"},
+ {"-", "10", "set M such that L>=M"},
+ {"500000", "10", "!set L and M such that L>=M"}};
+
+ SUB_TEST_QUICK();
+
+ for (i = 0; i < 7; i++) {
+ const char *linger = values[i][0];
+ const char *msgtimeout = values[i][1];
+ const char *desc = values[i][2];
+ rd_bool_t expect_fail = *desc == '!';
+
+ if (expect_fail)
+ desc++; /* Push past the '!' */
+
+ conf = rd_kafka_conf_new();
+ tconf = rd_kafka_topic_conf_new();
+
+ if (*linger != '-')
+ test_conf_set(conf, "linger.ms", linger);
+
+ if (*msgtimeout != '-')
+ test_topic_conf_set(tconf, "message.timeout.ms",
+ msgtimeout);
+
+ rd_kafka_conf_set_default_topic_conf(conf, tconf);
+
+ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
+ sizeof(errstr));
+
+ if (!rk)
+ TEST_SAY("#%d \"%s\": rd_kafka_new() failed: %s\n", i,
+ desc, errstr);
+ else
+ TEST_SAY("#%d \"%s\": rd_kafka_new() succeeded\n", i,
+ desc);
+
+ if (!expect_fail) {
+ TEST_ASSERT(rk != NULL,
+ "Expected success: "
+ "message timeout linger: %s: %s",
+ desc, errstr);
+
+ rd_kafka_destroy(rk);
+
+ } else {
+ TEST_ASSERT(rk == NULL,
+ "Expected failure: "
+ "message timeout linger: %s",
+ desc);
+
+ rd_kafka_conf_destroy(conf);
+ }
+ }
+
+ SUB_TEST_PASS();
+}
+
+
+int main_0004_conf(int argc, char **argv) {
+ rd_kafka_t *rk;
+ rd_kafka_topic_t *rkt;
+ rd_kafka_conf_t *ignore_conf, *conf, *conf2;
+ rd_kafka_topic_conf_t *ignore_topic_conf, *tconf, *tconf2;
+ char errstr[512];
+ rd_kafka_resp_err_t err;
+ const char **arr_orig, **arr_dup;
+ size_t cnt_orig, cnt_dup;
+ int i;
+ const char *topic;
+ static const char *gconfs[] = {
+ "message.max.bytes",
+ "12345", /* int property */
+ "client.id",
+ "my id", /* string property */
+ "debug",
+ "topic,metadata,interceptor", /* S2F property */
+ "topic.blacklist",
+ "__.*", /* #778 */
+ "auto.offset.reset",
+ "earliest", /* Global->Topic fallthru */
+#if WITH_ZLIB
+ "compression.codec",
+ "gzip", /* S2I property */
+#endif
+#if defined(_WIN32)
+ "ssl.ca.certificate.stores",
+ "Intermediate ,, Root ,",
+#endif
+ NULL
+ };
+ static const char *tconfs[] = {"request.required.acks",
+ "-1", /* int */
+ "auto.commit.enable",
+ "false", /* bool */
+ "auto.offset.reset",
+ "error", /* S2I */
+ "offset.store.path",
+ "my/path", /* string */
+ NULL};
+
+ test_conf_init(&ignore_conf, &ignore_topic_conf, 10);
+ rd_kafka_conf_destroy(ignore_conf);
+ rd_kafka_topic_conf_destroy(ignore_topic_conf);
+
+ topic = test_mk_topic_name("0004", 0);
+
+ /* Set up a global config object */
+ conf = rd_kafka_conf_new();
+
+ for (i = 0; gconfs[i]; i += 2) {
+ if (rd_kafka_conf_set(conf, gconfs[i], gconfs[i + 1], errstr,
+ sizeof(errstr)) != RD_KAFKA_CONF_OK)
+ TEST_FAIL("%s\n", errstr);
+ }
+
+ rd_kafka_conf_set_dr_cb(conf, dr_cb);
+ rd_kafka_conf_set_error_cb(conf, error_cb);
+ /* interceptor configs are not exposed as strings or in dumps
+ * so the dump verification step will not cover them, but valgrind
+ * will help track down memory leaks/use-after-free etc. */
+ err = rd_kafka_conf_interceptor_add_on_new(conf, "testic", my_on_new,
+ NULL);
+ TEST_ASSERT(!err, "add_on_new() failed: %s", rd_kafka_err2str(err));
+
+ /* Set up a topic config object */
+ tconf = rd_kafka_topic_conf_new();
+
+ rd_kafka_topic_conf_set_partitioner_cb(tconf, partitioner);
+ rd_kafka_topic_conf_set_opaque(tconf, (void *)0xbeef);
+
+ for (i = 0; tconfs[i]; i += 2) {
+ if (rd_kafka_topic_conf_set(tconf, tconfs[i], tconfs[i + 1],
+ errstr,
+ sizeof(errstr)) != RD_KAFKA_CONF_OK)
+ TEST_FAIL("%s\n", errstr);
+ }
+
+
+ /* Verify global config */
+ arr_orig = rd_kafka_conf_dump(conf, &cnt_orig);
+ conf_verify(__LINE__, arr_orig, cnt_orig, gconfs);
+
+ /* Verify copied global config */
+ conf2 = rd_kafka_conf_dup(conf);
+ arr_dup = rd_kafka_conf_dump(conf2, &cnt_dup);
+ conf_verify(__LINE__, arr_dup, cnt_dup, gconfs);
+ conf_cmp("global", arr_orig, cnt_orig, arr_dup, cnt_dup);
+ rd_kafka_conf_dump_free(arr_orig, cnt_orig);
+ rd_kafka_conf_dump_free(arr_dup, cnt_dup);
+
+ /* Verify topic config */
+ arr_orig = rd_kafka_topic_conf_dump(tconf, &cnt_orig);
+ conf_verify(__LINE__, arr_orig, cnt_orig, tconfs);
+
+ /* Verify copied topic config */
+ tconf2 = rd_kafka_topic_conf_dup(tconf);
+ arr_dup = rd_kafka_topic_conf_dump(tconf2, &cnt_dup);
+ conf_verify(__LINE__, arr_dup, cnt_dup, tconfs);
+ conf_cmp("topic", arr_orig, cnt_orig, arr_dup, cnt_dup);
+ rd_kafka_conf_dump_free(arr_orig, cnt_orig);
+ rd_kafka_conf_dump_free(arr_dup, cnt_dup);
+
+
+ /*
+ * Create kafka instances using original and copied confs
+ */
+
+ /* original */
+ TEST_ASSERT(on_new_call_cnt == 0, "expected 0 on_new call, not %d",
+ on_new_call_cnt);
+ on_new_call_cnt = 0;
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+ TEST_ASSERT(on_new_call_cnt == 1, "expected 1 on_new call, not %d",
+ on_new_call_cnt);
+
+ rkt = rd_kafka_topic_new(rk, topic, tconf);
+ if (!rkt)
+ TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno));
+
+ rd_kafka_topic_destroy(rkt);
+ rd_kafka_destroy(rk);
+
+ /* copied */
+ on_new_call_cnt = 0; /* interceptors are not copied. */
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf2);
+ TEST_ASSERT(on_new_call_cnt == 0, "expected 0 on_new call, not %d",
+ on_new_call_cnt);
+
+ rkt = rd_kafka_topic_new(rk, topic, tconf2);
+ if (!rkt)
+ TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno));
+ rd_kafka_topic_destroy(rkt);
+ rd_kafka_destroy(rk);
+
+
+ /* Incremental S2F property.
+ * NOTE: The order of fields returned in get() is hardcoded here. */
+ {
+ static const char *s2fs[] = {"generic,broker,queue,cgrp",
+ "generic,broker,queue,cgrp",
+
+ "-broker,+queue,topic",
+ "generic,topic,queue,cgrp",
+
+ "-all,security,-fetch,+metadata",
+ "metadata,security",
+
+ NULL};
+
+ TEST_SAY("Incremental S2F tests\n");
+ conf = rd_kafka_conf_new();
+
+ for (i = 0; s2fs[i]; i += 2) {
+ const char *val;
+
+ TEST_SAY(" Set: %s\n", s2fs[i]);
+ test_conf_set(conf, "debug", s2fs[i]);
+ val = test_conf_get(conf, "debug");
+ TEST_SAY(" Now: %s\n", val);
+
+ if (strcmp(val, s2fs[i + 1]))
+ TEST_FAIL_LATER(
+ "\n"
+ "Expected: %s\n"
+ " Got: %s",
+ s2fs[i + 1], val);
+ }
+ rd_kafka_conf_destroy(conf);
+ }
+
+ {
+ rd_kafka_conf_res_t res;
+
+ TEST_SAY("Error reporting for S2F properties\n");
+ conf = rd_kafka_conf_new();
+
+ res =
+ rd_kafka_conf_set(conf, "debug", "cgrp,invalid-value,topic",
+ errstr, sizeof(errstr));
+
+ TEST_ASSERT(
+ res == RD_KAFKA_CONF_INVALID,
+ "expected 'debug=invalid-value' to fail with INVALID, "
+ "not %d",
+ res);
+ TEST_ASSERT(strstr(errstr, "invalid-value"),
+ "expected invalid value to be mentioned in error, "
+ "not \"%s\"",
+ errstr);
+ TEST_ASSERT(!strstr(errstr, "cgrp") && !strstr(errstr, "topic"),
+ "expected only invalid value to be mentioned, "
+ "not \"%s\"",
+ errstr);
+ TEST_SAY(_C_GRN "Ok: %s\n" _C_CLR, errstr);
+
+ rd_kafka_conf_destroy(conf);
+ }
+
+#if WITH_SSL
+ {
+ TEST_SAY(
+ "Verifying that ssl.ca.location is not "
+ "overwritten (#3566)\n");
+
+ conf = rd_kafka_conf_new();
+
+ test_conf_set(conf, "security.protocol", "SSL");
+ test_conf_set(conf, "ssl.ca.location", "/?/does/!/not/exist!");
+
+ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
+ sizeof(errstr));
+ TEST_ASSERT(!rk,
+ "Expected rd_kafka_new() to fail with "
+ "invalid ssl.ca.location");
+ TEST_SAY("rd_kafka_new() failed as expected: %s\n", errstr);
+ rd_kafka_conf_destroy(conf);
+ }
+
+#ifdef _WIN32
+ {
+ FILE *fp;
+ TEST_SAY(
+ "Verifying that OpenSSL_AppLink "
+ "is not needed (#3554)\n");
+
+ /* Create dummy file so the file open works,
+ * but parsing fails. */
+ fp = fopen("_tmp_0004", "w");
+ TEST_ASSERT(fp != NULL, "Failed to create dummy file: %s",
+ rd_strerror(errno));
+ if (fwrite("?", 1, 1, fp) != 1)
+ TEST_FAIL("Failed to write to dummy file _tmp_0004: %s",
+ rd_strerror(errno));
+ fclose(fp);
+
+ conf = rd_kafka_conf_new();
+
+ test_conf_set(conf, "security.protocol", "SSL");
+ test_conf_set(conf, "ssl.keystore.location", "_tmp_0004");
+ test_conf_set(conf, "ssl.keystore.password", "x");
+
+ /* Prior to the fix OpenSSL will assert with a message like
+ * this: "OPENSSL_Uplink(00007FF9C0229D30,08): no
+ * OPENSSL_Applink"
+ * and the program will exit with error code 1. */
+ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
+ sizeof(errstr));
+ _unlink("tmp_0004");
+
+ TEST_ASSERT(!rk,
+ "Expected rd_kafka_new() to fail due to "
+ "dummy ssl.keystore.location");
+ TEST_ASSERT(strstr(errstr, "ssl.keystore.location") != NULL,
+ "Expected rd_kafka_new() to fail with "
+ "dummy ssl.keystore.location, not: %s",
+ errstr);
+
+ TEST_SAY("rd_kafka_new() failed as expected: %s\n", errstr);
+ }
+#endif /* _WIN32 */
+
+#endif /* WITH_SSL */
+
+ /* Canonical int values, aliases, s2i-verified strings, doubles */
+ {
+ static const struct {
+ const char *prop;
+ const char *val;
+ const char *exp;
+ int is_global;
+ } props[] = {
+ {"request.required.acks", "0", "0"},
+ {"request.required.acks", "-1", "-1"},
+ {"request.required.acks", "1", "1"},
+ {"acks", "3", "3"}, /* alias test */
+ {"request.required.acks", "393", "393"},
+ {"request.required.acks", "bad", NULL},
+ {"request.required.acks", "all", "-1"},
+ {"request.required.acks", "all", "-1", 1 /*fallthru*/},
+ {"acks", "0", "0"}, /* alias test */
+#if WITH_SASL
+ {"sasl.mechanisms", "GSSAPI", "GSSAPI", 1},
+ {"sasl.mechanisms", "PLAIN", "PLAIN", 1},
+ {"sasl.mechanisms", "GSSAPI,PLAIN", NULL, 1},
+ {"sasl.mechanisms", "", NULL, 1},
+#endif
+ {"linger.ms", "12555.3", "12555.3", 1},
+ {"linger.ms", "1500.000", "1500", 1},
+ {"linger.ms", "0.0001", "0.0001", 1},
+ {NULL}
+ };
+
+ TEST_SAY("Canonical tests\n");
+ tconf = rd_kafka_topic_conf_new();
+ conf = rd_kafka_conf_new();
+
+ for (i = 0; props[i].prop; i++) {
+ char dest[64];
+ size_t destsz;
+ rd_kafka_conf_res_t res;
+
+ TEST_SAY(" Set: %s=%s expect %s (%s)\n", props[i].prop,
+ props[i].val, props[i].exp,
+ props[i].is_global ? "global" : "topic");
+
+
+ /* Set value */
+ if (props[i].is_global)
+ res = rd_kafka_conf_set(conf, props[i].prop,
+ props[i].val, errstr,
+ sizeof(errstr));
+ else
+ res = rd_kafka_topic_conf_set(
+ tconf, props[i].prop, props[i].val, errstr,
+ sizeof(errstr));
+ if ((res == RD_KAFKA_CONF_OK ? 1 : 0) !=
+ (props[i].exp ? 1 : 0))
+ TEST_FAIL("Expected %s, got %s",
+ props[i].exp ? "success" : "failure",
+ (res == RD_KAFKA_CONF_OK
+ ? "OK"
+ : (res == RD_KAFKA_CONF_INVALID
+ ? "INVALID"
+ : "UNKNOWN")));
+
+ if (!props[i].exp)
+ continue;
+
+ /* Get value and compare to expected result */
+ destsz = sizeof(dest);
+ if (props[i].is_global)
+ res = rd_kafka_conf_get(conf, props[i].prop,
+ dest, &destsz);
+ else
+ res = rd_kafka_topic_conf_get(
+ tconf, props[i].prop, dest, &destsz);
+ TEST_ASSERT(res == RD_KAFKA_CONF_OK,
+ ".._conf_get(%s) returned %d",
+ props[i].prop, res);
+
+ TEST_ASSERT(!strcmp(props[i].exp, dest),
+ "Expected \"%s\", got \"%s\"", props[i].exp,
+ dest);
+ }
+ rd_kafka_topic_conf_destroy(tconf);
+ rd_kafka_conf_destroy(conf);
+ }
+
+ do_test_kafka_new_failures();
+
+ do_test_special_invalid_conf();
+
+ do_test_idempotence_conf();
+
+ do_test_instance_conf();
+
+ do_test_default_topic_conf();
+
+ do_message_timeout_linger_checks();
+
+ return 0;
+}