1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
#include <stdio.h>
#include <string.h>
#include <librdkafka/rdkafka.h>
/**
 * @brief Check whether \p name is one complete entry of the comma-separated
 *        list \p features.
 *
 * A hit only counts when it is delimited on BOTH sides: it must start at the
 * beginning of the list or right after a ',', and end at a ',' or at the end
 * of the string. This prevents a name from matching as a mere substring of a
 * longer entry (e.g. "sasl" inside "sasl_plain").
 *
 * @param features NUL-terminated comma-separated feature list.
 * @param name     NUL-terminated feature name to look for.
 *
 * @returns 1 if \p name is an entry of \p features, else 0.
 */
static int feature_enabled(const char *features, const char *name) {
        size_t nlen = strlen(name);
        const char *hit;

        /* An empty name can never be a feature; also keeps the
         * `hit + 1` advance below within the string. */
        if (nlen == 0)
                return 0;

        for (hit = strstr(features, name); hit; hit = strstr(hit + 1, name)) {
                int at_start = (hit == features || hit[-1] == ',');
                int at_end   = (hit[nlen] == ',' || hit[nlen] == '\0');

                if (at_start && at_end)
                        return 1;
                /* Advance one char (not nlen) so overlapping
                 * candidates are not skipped. */
        }

        return 0;
}

/**
 * @brief Verify the installed librdkafka build.
 *
 * Prints the library version and the "builtin.features" configuration value,
 * checks that every expected feature is listed there, and then creates (and
 * destroys) a producer instance configured for SASL_SSL/PLAIN.
 *
 * @returns 0 if all expected features are present and the client could be
 *          created, else 1.
 */
int main(int argc, char **argv) {
        rd_kafka_conf_t *conf;
        rd_kafka_t *rk;
        char features[256];
        size_t fsize = sizeof(features);
        char errstr[512];
        const char *exp_features[] = {
            "gzip",   "snappy",      "ssl",        "sasl",       "regex",
            "lz4",    "sasl_gssapi", "sasl_plain", "sasl_scram", "plugins",
            "zstd",   "sasl_oauthbearer", NULL,
        };
        const char **exp;
        int missing = 0;

        /* Command line arguments are not used. */
        (void)argc;
        (void)argv;

        printf("librdkafka %s\n", rd_kafka_version_str());

        conf = rd_kafka_conf_new();

        if (rd_kafka_conf_get(conf, "builtin.features", features, &fsize) !=
            RD_KAFKA_CONF_OK) {
                fprintf(stderr, "conf_get failed\n");
                rd_kafka_conf_destroy(conf);
                return 1;
        }

        printf("builtin.features %s\n", features);

        /* Verify that expected features are enabled.
         * All missing features are reported before the final verdict. */
        for (exp = exp_features; *exp; exp++) {
                if (feature_enabled(features, *exp))
                        continue;

                fprintf(stderr, "ERROR: feature %s not found\n", *exp);
                missing++;
        }

        /* rd_kafka_conf_set() returns RD_KAFKA_CONF_OK (0) on success,
         * so the first failing call short-circuits the chain. */
        if (rd_kafka_conf_set(conf, "security.protocol", "SASL_SSL", errstr,
                              sizeof(errstr)) ||
            rd_kafka_conf_set(conf, "sasl.mechanism", "PLAIN", errstr,
                              sizeof(errstr)) ||
            rd_kafka_conf_set(conf, "sasl.username", "username", errstr,
                              sizeof(errstr)) ||
            rd_kafka_conf_set(conf, "sasl.password", "password", errstr,
                              sizeof(errstr)) ||
            rd_kafka_conf_set(conf, "debug", "security", errstr,
                              sizeof(errstr))) {
                fprintf(stderr, "conf_set failed: %s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return 1;
        }

        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "rd_kafka_new failed: %s\n", errstr);
                /* Per the librdkafka API docs the conf object is only
                 * consumed by rd_kafka_new() on success; on failure it is
                 * still ours to free. */
                rd_kafka_conf_destroy(conf);
                return 1;
        }

        printf("client name %s\n", rd_kafka_name(rk));

        rd_kafka_destroy(rk);

        return missing ? 1 : 0;
}
|