1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
#!/bin/bash
# added 2018-12-18 by ludobrands
# This file is part of the rsyslog project, released under ASL 2.0
. ${srcdir:=.}/diag.sh init
test_status unreliable 'https://github.com/rsyslog/rsyslog/issues/3197'
check_command_available kafkacat
export KEEP_KAFKA_RUNNING="YES"
export TESTMESSAGES=100000
export TESTMESSAGESFULL=$TESTMESSAGES
export RANDTOPIC=$(tr -dc 'a-zA-Z0-9' < /dev/urandom | fold -w 8 | head -n 1)
# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
export EXTRA_EXITCHECK=dumpkafkalogs
export EXTRA_EXIT=kafka
echo Check and Stop previous instances of kafka/zookeeper
download_kafka
stop_zookeeper
stop_kafka
echo Create kafka/zookeeper instance and $RANDTOPIC topic
start_zookeeper
start_kafka
create_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
# --- Create/Start omkafka sender config
export RSYSLOG_DEBUGLOG="log"
generate_conf
add_conf '
# impstats in order to gain insight into error cases
module(load="../plugins/impstats/.libs/impstats"
log.file="'$RSYSLOG_DYNNAME.pstats'"
interval="1" log.syslog="off")
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
$imdiagInjectDelayMode full
module(load="../plugins/omkafka/.libs/omkafka")
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
template(name="keyin" type="list"){
property(name="$.inkey")
}
local4.* {
set $.inkey = substring(field($msg,":",2),7,1);
action( name="kafka-fwd"
type="omkafka"
topic="'$RANDTOPIC'"
key="keyin"
dynaKey="on"
broker="localhost:29092"
template="outfmt"
confParam=[ "compression.codec=none",
"socket.timeout.ms=10000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"queue.buffering.max.messages=10000",
"enable.auto.commit=true",
"message.send.max.retries=1"]
topicConfParam=["message.timeout.ms=10000"]
partitions.auto="on"
closeTimeout="60000"
resubmitOnFailure="on"
keepFailedMessages="on"
failedMsgFile="'$RSYSLOG_OUT_LOG'-failed-'$RANDTOPIC'.data"
action.resumeInterval="1"
action.resumeRetryCount="2"
queue.saveonshutdown="on"
)
action( type="omfile" file="'$RSYSLOG_OUT_LOG'")
stop
}
action( type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'")
'
echo Starting sender instance [omkafka]
startup
echo Inject messages into rsyslog sender instance
injectmsg 1 $TESTMESSAGES
wait_file_lines $RSYSLOG_OUT_LOG $TESTMESSAGESFULL 100
# experimental: wait until kafkacat receives everything
timeoutend=100
timecounter=0
while [ $timecounter -lt $timeoutend ]; do
(( timecounter++ ))
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG
count=$(wc -l < ${RSYSLOG_OUT_LOG})
if [ $count -eq $TESTMESSAGESFULL ]; then
printf '**** wait-kafka-lines success, have %d lines ****\n\n' "$TESTMESSAGESFULL"
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p %k\n' | sort | uniq > "$RSYSLOG_OUT_LOG.extra"
count=$(wc -l < "${RSYSLOG_OUT_LOG}.extra")
if [ $count -eq 10 ]; then
printf '**** partition check success, have 10 partition-key combinations ****\n\n'
break
else
shutdown_when_empty
wait_shutdown
printf '\n\nERROR: partition check failed, expected 10 got %s\n' "$count"
printf '\ņRAW DATA:\n'
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p %k\n'
printf '\nCHECKED OUTPUT:\n'
cat "$RSYSLOG_OUT_LOG.extra"
error_exit 1
fi
else
if [ "x$timecounter" == "x$timeoutend" ]; then
echo wait-kafka-lines failed, expected $TESTMESSAGESFULL got $count
shutdown_when_empty
wait_shutdown
error_exit 1
else
echo wait-file-lines not yet there, currently $count lines
printf 'pstats data:\n'
# we use tail below to guard against overwhelming the
# logs if things go wild
tail -n 500 < $RSYSLOG_DYNNAME.pstats
printf '\n'
$TESTTOOL_DIR/msleep 1000
fi
fi
echo end iteration $timecounter
done
unset count
#end experimental
echo Stopping sender instance [omkafka]
shutdown_when_empty
wait_shutdown
#kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG
#kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p@%o:%k:%s' > $RSYSLOG_OUT_LOG.extra
# Delete topic to remove old traces before
delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
# Dump Kafka log | uncomment if needed
# dump_kafka_serverlog
kafka_check_broken_broker $RSYSLOG_DYNNAME.othermsg
seq_check 1 $TESTMESSAGESFULL -d
exit_test
|