diff options
Diffstat (limited to 'tests/omkafkadynakey.sh')
-rwxr-xr-x | tests/omkafkadynakey.sh | 154 |
1 files changed, 154 insertions, 0 deletions
diff --git a/tests/omkafkadynakey.sh b/tests/omkafkadynakey.sh new file mode 100755 index 0000000..a05a5ad --- /dev/null +++ b/tests/omkafkadynakey.sh @@ -0,0 +1,154 @@ +#!/bin/bash +# added 2018-12-18 by ludobrands +# This file is part of the rsyslog project, released under ASL 2.0 +. ${srcdir:=.}/diag.sh init +test_status unreliable 'https://github.com/rsyslog/rsyslog/issues/3197' +check_command_available kafkacat + +export KEEP_KAFKA_RUNNING="YES" +export TESTMESSAGES=100000 +export TESTMESSAGESFULL=$TESTMESSAGES + +export RANDTOPIC=$(tr -dc 'a-zA-Z0-9' < /dev/urandom | fold -w 8 | head -n 1) + +# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only. +export EXTRA_EXITCHECK=dumpkafkalogs +export EXTRA_EXIT=kafka +echo Check and Stop previous instances of kafka/zookeeper +download_kafka +stop_zookeeper +stop_kafka + +echo Create kafka/zookeeper instance and $RANDTOPIC topic +start_zookeeper +start_kafka +create_kafka_topic $RANDTOPIC '.dep_wrk' '22181' + +# --- Create/Start omkafka sender config +export RSYSLOG_DEBUGLOG="log" +generate_conf +add_conf ' +# impstats in order to gain insight into error cases +module(load="../plugins/impstats/.libs/impstats" + log.file="'$RSYSLOG_DYNNAME.pstats'" + interval="1" log.syslog="off") +main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000") +$imdiagInjectDelayMode full + +module(load="../plugins/omkafka/.libs/omkafka") + +template(name="outfmt" type="string" string="%msg:F,58:2%\n") + +template(name="keyin" type="list"){ + property(name="$.inkey") +} + +local4.* { + set $.inkey = substring(field($msg,":",2),7,1); + action( name="kafka-fwd" + type="omkafka" + topic="'$RANDTOPIC'" + key="keyin" + dynaKey="on" + broker="localhost:29092" + template="outfmt" + confParam=[ "compression.codec=none", + "socket.timeout.ms=10000", + "socket.keepalive.enable=true", + "reconnect.backoff.jitter.ms=1000", + "queue.buffering.max.messages=10000", + "enable.auto.commit=true", + "message.send.max.retries=1"] + topicConfParam=["message.timeout.ms=10000"] + partitions.auto="on" + closeTimeout="60000" + resubmitOnFailure="on" + keepFailedMessages="on" + failedMsgFile="'$RSYSLOG_OUT_LOG'-failed-'$RANDTOPIC'.data" + action.resumeInterval="1" + action.resumeRetryCount="2" + queue.saveonshutdown="on" + ) + action( type="omfile" file="'$RSYSLOG_OUT_LOG'") + stop +} + +action( type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'") +' + +echo Starting sender instance [omkafka] +startup + +echo Inject messages into rsyslog sender instance +injectmsg 1 $TESTMESSAGES + +wait_file_lines $RSYSLOG_OUT_LOG $TESTMESSAGESFULL 100 + +# experimental: wait until kafkacat receives everything + +timeoutend=100 +timecounter=0 + +while [ $timecounter -lt $timeoutend ]; do + (( timecounter++ )) + + kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG + count=$(wc -l < ${RSYSLOG_OUT_LOG}) + if [ $count -eq $TESTMESSAGESFULL ]; then + printf '**** wait-kafka-lines success, have %d lines ****\n\n' "$TESTMESSAGESFULL" + kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p %k\n' | sort | uniq > "$RSYSLOG_OUT_LOG.extra" + count=$(wc -l < "${RSYSLOG_OUT_LOG}.extra") + if [ $count -eq 10 ]; then + printf '**** partition check success, have 10 partition-key combinations ****\n\n' + break + else + shutdown_when_empty + wait_shutdown + printf '\n\nERROR: partition check failed, expected 10 got %s\n' "$count" + printf '\ņRAW DATA:\n' + kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p %k\n' + printf '\nCHECKED OUTPUT:\n' + cat "$RSYSLOG_OUT_LOG.extra" + error_exit 1 + fi + else + if [ "x$timecounter" == "x$timeoutend" ]; then + echo wait-kafka-lines failed, expected $TESTMESSAGESFULL got $count + shutdown_when_empty + wait_shutdown + error_exit 1 + else + echo wait-file-lines not yet there, currently $count lines + printf 'pstats data:\n' + # we use tail below to guard against overwhelming the + # logs if things go wild + tail -n 500 < $RSYSLOG_DYNNAME.pstats + printf '\n' + + $TESTTOOL_DIR/msleep 1000 + fi + fi +echo end iteration $timecounter +done +unset count + +#end experimental + +echo Stopping sender instance [omkafka] +shutdown_when_empty +wait_shutdown + +#kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG +#kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p@%o:%k:%s' > $RSYSLOG_OUT_LOG.extra + +# Delete topic to remove old traces before +delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181' + +# Dump Kafka log | uncomment if needed +# dump_kafka_serverlog + +kafka_check_broken_broker $RSYSLOG_DYNNAME.othermsg +seq_check 1 $TESTMESSAGESFULL -d + +exit_test + |