diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 16:28:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 16:28:20 +0000 |
commit | dcc721a95bef6f0d8e6d8775b8efe33e5aecd562 (patch) | |
tree | 66a2774cd0ee294d019efd71d2544c70f42b2842 /tests/imkafka_multi_single.sh | |
parent | Initial commit. (diff) | |
download | rsyslog-dcc721a95bef6f0d8e6d8775b8efe33e5aecd562.tar.xz rsyslog-dcc721a95bef6f0d8e6d8775b8efe33e5aecd562.zip |
Adding upstream version 8.2402.0.upstream/8.2402.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/imkafka_multi_single.sh')
-rwxr-xr-x | tests/imkafka_multi_single.sh | 151 |
1 files changed, 151 insertions, 0 deletions
diff --git a/tests/imkafka_multi_single.sh b/tests/imkafka_multi_single.sh new file mode 100755 index 0000000..d0127e0 --- /dev/null +++ b/tests/imkafka_multi_single.sh @@ -0,0 +1,151 @@ +#!/bin/bash +# added 2018-08-29 by alorbach +# This file is part of the rsyslog project, released under ASL 2.0 +. ${srcdir:=.}/diag.sh init +check_command_available kafkacat +export KEEP_KAFKA_RUNNING="YES" + +export TESTMESSAGES=100000 +# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only. +#export EXTRA_EXITCHECK=dumpkafkalogs +#export EXTRA_EXIT=kafka + +download_kafka +stop_zookeeper +stop_kafka +start_zookeeper +start_kafka + +export RANDTOPIC=$(tr -dc 'a-zA-Z0-9' < /dev/urandom | fold -w 8 | head -n 1) + +create_kafka_topic $RANDTOPIC '.dep_wrk' '22181' + +export RSYSLOG_DEBUGLOG="log" +generate_conf +add_conf ' +main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000") + +module(load="../plugins/imkafka/.libs/imkafka") +/* Polls messages from kafka server!*/ +input( type="imkafka" + topic="'$RANDTOPIC'" + broker="localhost:29092" + consumergroup="default1" + confParam=[ "compression.codec=none", + "session.timeout.ms=10000", + "socket.timeout.ms=5000", + "socket.keepalive.enable=true", + "reconnect.backoff.jitter.ms=1000", + "enable.partition.eof=false" ] + ) + +input( type="imkafka" + topic="'$RANDTOPIC'" + broker="localhost:29092" + consumergroup="default2" + confParam=[ "compression.codec=none", + "session.timeout.ms=10000", + "socket.timeout.ms=5000", + "socket.keepalive.enable=true", + "reconnect.backoff.jitter.ms=1000", + "enable.partition.eof=false" ] + ) + +input( type="imkafka" + topic="'$RANDTOPIC'" + broker="localhost:29092" + consumergroup="default3" + confParam=[ "compression.codec=none", + "session.timeout.ms=10000", + "socket.timeout.ms=5000", + "socket.keepalive.enable=true", + "reconnect.backoff.jitter.ms=1000", + "enable.partition.eof=false" ] + ) + +input( type="imkafka" + topic="'$RANDTOPIC'" + broker="localhost:29092" + consumergroup="default4" + confParam=[ "compression.codec=none", + "session.timeout.ms=10000", + "socket.timeout.ms=5000", + "socket.keepalive.enable=true", + "reconnect.backoff.jitter.ms=1000", + "enable.partition.eof=false" ] + ) + +input( type="imkafka" + topic="'$RANDTOPIC'" + broker="localhost:29092" + consumergroup="default5" + confParam=[ "compression.codec=none", + "session.timeout.ms=10000", + "socket.timeout.ms=5000", + "socket.keepalive.enable=true", + "reconnect.backoff.jitter.ms=1000", + "enable.partition.eof=false" ] + ) + +input( type="imkafka" + topic="'$RANDTOPIC'" + broker="localhost:29092" + consumergroup="default6" + confParam=[ "compression.codec=none", + "session.timeout.ms=10000", + "socket.timeout.ms=5000", + "socket.keepalive.enable=true", + "reconnect.backoff.jitter.ms=1000", + "enable.partition.eof=false" ] + ) + +input( type="imkafka" + topic="'$RANDTOPIC'" + broker="localhost:29092" + consumergroup="default7" + confParam=[ "compression.codec=none", + "session.timeout.ms=10000", + "socket.timeout.ms=5000", + "socket.keepalive.enable=true", + "reconnect.backoff.jitter.ms=1000", + "enable.partition.eof=false" ] + ) + +input( type="imkafka" + topic="'$RANDTOPIC'" + broker="localhost:29092" + consumergroup="default8" + confParam=[ "compression.codec=none", + "session.timeout.ms=10000", + "socket.timeout.ms=5000", + "socket.keepalive.enable=true", + "reconnect.backoff.jitter.ms=1000", + "enable.partition.eof=false" ] + ) + +template(name="outfmt" type="string" string="%msg:F,58:2%\n") + +if ($msg contains "msgnum:") then { + action( type="omfile" file=`echo $RSYSLOG_OUT_LOG` template="outfmt" ) +} +' + +startup + +TIMESTART=$(date +%s.%N) + +injectmsg_kafkacat +# special case: number of test messages differs from file output +wait_file_lines $RSYSLOG_OUT_LOG $((TESTMESSAGES * 8)) ${RETRIES:-200} +shutdown_when_empty +wait_shutdown + +TIMEEND=$(date +%s.%N) +TIMEDIFF=$(echo "$TIMEEND - $TIMESTART" | bc) +echo "*** imkafka time to process all data: $TIMEDIFF seconds!" + +delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181' + +seq_check 1 $TESTMESSAGES -d + +exit_test |