summaryrefslogtreecommitdiffstats
path: root/tests/omkafkadynakey.sh
diff options
context:
space:
mode:
Diffstat (limited to 'tests/omkafkadynakey.sh')
-rwxr-xr-xtests/omkafkadynakey.sh154
1 files changed, 154 insertions, 0 deletions
diff --git a/tests/omkafkadynakey.sh b/tests/omkafkadynakey.sh
new file mode 100755
index 0000000..a05a5ad
--- /dev/null
+++ b/tests/omkafkadynakey.sh
@@ -0,0 +1,154 @@
+#!/bin/bash
+# added 2018-12-18 by ludobrands
+# This file is part of the rsyslog project, released under ASL 2.0
+. ${srcdir:=.}/diag.sh init
+test_status unreliable 'https://github.com/rsyslog/rsyslog/issues/3197'
+check_command_available kafkacat
+
+export KEEP_KAFKA_RUNNING="YES"
+export TESTMESSAGES=100000
+export TESTMESSAGESFULL=$TESTMESSAGES
+
+export RANDTOPIC=$(tr -dc 'a-zA-Z0-9' < /dev/urandom | fold -w 8 | head -n 1)
+
+# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
+export EXTRA_EXITCHECK=dumpkafkalogs
+export EXTRA_EXIT=kafka
+echo Check and Stop previous instances of kafka/zookeeper
+download_kafka
+stop_zookeeper
+stop_kafka
+
+echo Create kafka/zookeeper instance and $RANDTOPIC topic
+start_zookeeper
+start_kafka
+create_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
+
+# --- Create/Start omkafka sender config
+export RSYSLOG_DEBUGLOG="log"
+generate_conf
+add_conf '
+# impstats in order to gain insight into error cases
+module(load="../plugins/impstats/.libs/impstats"
+ log.file="'$RSYSLOG_DYNNAME.pstats'"
+ interval="1" log.syslog="off")
+main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
+$imdiagInjectDelayMode full
+
+module(load="../plugins/omkafka/.libs/omkafka")
+
+template(name="outfmt" type="string" string="%msg:F,58:2%\n")
+
+template(name="keyin" type="list"){
+ property(name="$.inkey")
+}
+
+local4.* {
+ set $.inkey = substring(field($msg,":",2),7,1);
+ action( name="kafka-fwd"
+ type="omkafka"
+ topic="'$RANDTOPIC'"
+ key="keyin"
+ dynaKey="on"
+ broker="localhost:29092"
+ template="outfmt"
+ confParam=[ "compression.codec=none",
+ "socket.timeout.ms=10000",
+ "socket.keepalive.enable=true",
+ "reconnect.backoff.jitter.ms=1000",
+ "queue.buffering.max.messages=10000",
+ "enable.auto.commit=true",
+ "message.send.max.retries=1"]
+ topicConfParam=["message.timeout.ms=10000"]
+ partitions.auto="on"
+ closeTimeout="60000"
+ resubmitOnFailure="on"
+ keepFailedMessages="on"
+ failedMsgFile="'$RSYSLOG_OUT_LOG'-failed-'$RANDTOPIC'.data"
+ action.resumeInterval="1"
+ action.resumeRetryCount="2"
+ queue.saveonshutdown="on"
+ )
+ action( type="omfile" file="'$RSYSLOG_OUT_LOG'")
+ stop
+}
+
+action( type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'")
+'
+
+echo Starting sender instance [omkafka]
+startup
+
+echo Inject messages into rsyslog sender instance
+injectmsg 1 $TESTMESSAGES
+
+wait_file_lines $RSYSLOG_OUT_LOG $TESTMESSAGESFULL 100
+
+# experimental: wait until kafkacat receives everything
+
+timeoutend=100
+timecounter=0
+
+while [ $timecounter -lt $timeoutend ]; do
+ (( timecounter++ ))
+
+ kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG
+ count=$(wc -l < ${RSYSLOG_OUT_LOG})
+ if [ $count -eq $TESTMESSAGESFULL ]; then
+ printf '**** wait-kafka-lines success, have %d lines ****\n\n' "$TESTMESSAGESFULL"
+ kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p %k\n' | sort | uniq > "$RSYSLOG_OUT_LOG.extra"
+ count=$(wc -l < "${RSYSLOG_OUT_LOG}.extra")
+ if [ $count -eq 10 ]; then
+ printf '**** partition check success, have 10 partition-key combinations ****\n\n'
+ break
+ else
+ shutdown_when_empty
+ wait_shutdown
+ printf '\n\nERROR: partition check failed, expected 10 got %s\n' "$count"
+ printf '\ņRAW DATA:\n'
+ kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p %k\n'
+ printf '\nCHECKED OUTPUT:\n'
+ cat "$RSYSLOG_OUT_LOG.extra"
+ error_exit 1
+ fi
+ else
+ if [ "x$timecounter" == "x$timeoutend" ]; then
+ echo wait-kafka-lines failed, expected $TESTMESSAGESFULL got $count
+ shutdown_when_empty
+ wait_shutdown
+ error_exit 1
+ else
+ echo wait-file-lines not yet there, currently $count lines
+ printf 'pstats data:\n'
+ # we use tail below to guard against overwhelming the
+ # logs if things go wild
+ tail -n 500 < $RSYSLOG_DYNNAME.pstats
+ printf '\n'
+
+ $TESTTOOL_DIR/msleep 1000
+ fi
+ fi
+echo end iteration $timecounter
+done
+unset count
+
+#end experimental
+
+echo Stopping sender instance [omkafka]
+shutdown_when_empty
+wait_shutdown
+
+#kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG
+#kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p@%o:%k:%s' > $RSYSLOG_OUT_LOG.extra
+
+# Delete topic to remove old traces before
+delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
+
+# Dump Kafka log | uncomment if needed
+# dump_kafka_serverlog
+
+kafka_check_broken_broker $RSYSLOG_DYNNAME.othermsg
+seq_check 1 $TESTMESSAGESFULL -d
+
+exit_test
+