diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 16:28:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 16:28:20 +0000 |
commit | dcc721a95bef6f0d8e6d8775b8efe33e5aecd562 (patch) | |
tree | 66a2774cd0ee294d019efd71d2544c70f42b2842 /tests/sndrcv_kafka_multi_topics.sh | |
parent | Initial commit. (diff) | |
download | rsyslog-dcc721a95bef6f0d8e6d8775b8efe33e5aecd562.tar.xz rsyslog-dcc721a95bef6f0d8e6d8775b8efe33e5aecd562.zip |
Adding upstream version 8.2402.0.upstream/8.2402.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rwxr-xr-x | tests/sndrcv_kafka_multi_topics.sh | 171 |
1 files changed, 171 insertions, 0 deletions
diff --git a/tests/sndrcv_kafka_multi_topics.sh b/tests/sndrcv_kafka_multi_topics.sh new file mode 100755 index 0000000..d0e297f --- /dev/null +++ b/tests/sndrcv_kafka_multi_topics.sh @@ -0,0 +1,171 @@ +#!/bin/bash +# added 2018-08-13 by alorbach +# This file is part of the rsyslog project, released under ASL 2.0 +. ${srcdir:=.}/diag.sh init + +export TESTMESSAGES=50000 +export TESTMESSAGESFULL=100000 + +# Generate random topic name +export RANDTOPIC1=$(tr -dc 'a-zA-Z0-9' < /dev/urandom | fold -w 8 | head -n 1) +export RANDTOPIC2=$(tr -dc 'a-zA-Z0-9' < /dev/urandom | fold -w 8 | head -n 1) + +# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only. +export EXTRA_EXITCHECK=dumpkafkalogs +export EXTRA_EXIT=kafka +echo STEP: Check and Stop previous instances of kafka/zookeeper +download_kafka +stop_zookeeper +stop_kafka + +echo STEP: Create kafka/zookeeper instance and topics +start_zookeeper +start_kafka +create_kafka_topic $RANDTOPIC1 '.dep_wrk' '22181' +create_kafka_topic $RANDTOPIC2 '.dep_wrk' '22181' + +# --- Create omkafka sender config +export RSYSLOG_DEBUGLOG="log" +generate_conf +add_conf ' +main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000") +$imdiagInjectDelayMode full + +module(load="../plugins/omkafka/.libs/omkafka") + +template(name="outfmt" type="string" string="%msg%\n") + +local4.* action( name="kafka-fwd" + type="omkafka" + topic="'$RANDTOPIC1'" + broker="localhost:29092" + template="outfmt" + confParam=[ "compression.codec=none", + "socket.timeout.ms=10000", + "socket.keepalive.enable=true", + "reconnect.backoff.jitter.ms=1000", + "queue.buffering.max.messages=10000", + "enable.auto.commit=true", + "message.send.max.retries=1"] + topicConfParam=["message.timeout.ms=10000"] + partitions.auto="on" + closeTimeout="60000" + resubmitOnFailure="on" + keepFailedMessages="on" + failedMsgFile="'$RSYSLOG_OUT_LOG'-failed-'$RANDTOPIC1'.data" + action.resumeInterval="1" + action.resumeRetryCount="10" + queue.saveonshutdown="on" + ) +local4.* action( name="kafka-fwd" + type="omkafka" + topic="'$RANDTOPIC2'" + broker="localhost:29092" + template="outfmt" + confParam=[ "compression.codec=none", + "socket.timeout.ms=10000", + "socket.keepalive.enable=true", + "reconnect.backoff.jitter.ms=1000", + "queue.buffering.max.messages=10000", + "enable.auto.commit=true", + "message.send.max.retries=1"] + topicConfParam=["message.timeout.ms=10000"] + partitions.auto="on" + closeTimeout="60000" + resubmitOnFailure="on" + keepFailedMessages="on" + failedMsgFile="'$RSYSLOG_OUT_LOG'-failed-'$RANDTOPIC2'.data" + action.resumeInterval="1" + action.resumeRetryCount="10" + queue.saveonshutdown="on" + ) + +syslog.* action(type="omfile" file="'$RSYSLOG_DYNNAME.sender.syslog'") +' + +echo STEP: Starting sender instance [omkafka] +startup +# --- + +# Injection messages now before starting receiver, simply because omkafka will take some time and +# there is no reason to wait for the receiver to startup first. +echo STEP: Inject messages into rsyslog sender instance +injectmsg 1 $TESTMESSAGES + +# --- Create omkafka receiver config +export RSYSLOG_DEBUGLOG="log2" +generate_conf 2 +add_conf ' +main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000") + +module(load="../plugins/imkafka/.libs/imkafka") +/* Polls messages from kafka server!*/ +input( type="imkafka" + topic="'$RANDTOPIC1'" + broker="localhost:29092" + consumergroup="default1" + confParam=[ "compression.codec=none", + "session.timeout.ms=10000", + "socket.timeout.ms=10000", + "enable.partition.eof=false", + "reconnect.backoff.jitter.ms=1000", + "socket.keepalive.enable=true"] + ) +input( type="imkafka" + topic="'$RANDTOPIC2'" + broker="localhost:29092" + consumergroup="default2" + confParam=[ "compression.codec=none", + "session.timeout.ms=10000", + "socket.timeout.ms=10000", + "enable.partition.eof=false", + "reconnect.backoff.jitter.ms=1000", + "socket.keepalive.enable=true"] + ) + +template(name="outfmt" type="string" string="%msg:F,58:2%\n") + +if ($msg contains "msgnum:") then { + action( type="omfile" file=`echo $RSYSLOG_OUT_LOG` template="outfmt" ) +} + +syslog.* action(type="omfile" file="'$RSYSLOG_DYNNAME.receiver.syslog'") +' 2 + +echo STEP: Starting receiver instance [imkafka] +startup 2 +# --- + +echo STEP: Stopping sender instance [omkafka] +shutdown_when_empty +wait_shutdown + +echo STEP: Stopping receiver instance [imkafka] +kafka_wait_group_coordinator +shutdown_when_empty 2 +wait_shutdown 2 + +echo STEP: delete kafka topics +delete_kafka_topic $RANDTOPIC1 '.dep_wrk' '22181' +delete_kafka_topic $RANDTOPIC2 '.dep_wrk' '22181' + +kafka_check_broken_broker "$RSYSLOG_DYNNAME.sender.syslog" +kafka_check_broken_broker "$RSYSLOG_DYNNAME.receiver.syslog" + +# Dump Kafka log | uncomment if needed +# dump_kafka_serverlog + +# Do the final sequence check +seq_check 1 $TESTMESSAGES -d + +linecount=$(wc -l < ${RSYSLOG_OUT_LOG}) +if [ $linecount -ge $TESTMESSAGESFULL ]; then + echo "Info: Count correct: $linecount" +else + echo "Count error detected in $RSYSLOG_OUT_LOG" + echo "number of lines in file: $linecount" + error_exit 1 +fi + +echo success +exit_test |