summaryrefslogtreecommitdiffstats
path: root/tests/omkafkadynakey.sh
blob: a05a5adcbe17305df2355a84f139d43c1d8fea01 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/bin/bash
# added 2018-12-18 by ludobrands
# This file is part of the rsyslog project, released under ASL 2.0
. ${srcdir:=.}/diag.sh init
test_status unreliable 'https://github.com/rsyslog/rsyslog/issues/3197'
check_command_available kafkacat

export KEEP_KAFKA_RUNNING="YES"
export TESTMESSAGES=100000
export TESTMESSAGESFULL=$TESTMESSAGES

export RANDTOPIC=$(tr -dc 'a-zA-Z0-9' < /dev/urandom | fold -w 8 | head -n 1)

# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
export EXTRA_EXITCHECK=dumpkafkalogs
export EXTRA_EXIT=kafka
echo Check and Stop previous instances of kafka/zookeeper 
download_kafka
stop_zookeeper
stop_kafka

echo Create kafka/zookeeper instance and $RANDTOPIC topic
start_zookeeper
start_kafka
create_kafka_topic $RANDTOPIC '.dep_wrk' '22181'

# --- Create/Start omkafka sender config 
export RSYSLOG_DEBUGLOG="log"
generate_conf
add_conf '
# impstats in order to gain insight into error cases
module(load="../plugins/impstats/.libs/impstats"
	log.file="'$RSYSLOG_DYNNAME.pstats'"
	interval="1" log.syslog="off")
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
$imdiagInjectDelayMode full

module(load="../plugins/omkafka/.libs/omkafka")

template(name="outfmt" type="string" string="%msg:F,58:2%\n")

template(name="keyin" type="list"){
  property(name="$.inkey")
}
  
local4.* {
	set $.inkey = substring(field($msg,":",2),7,1);
 	action(	name="kafka-fwd"
	type="omkafka"
	topic="'$RANDTOPIC'"
	key="keyin"
	dynaKey="on"
	broker="localhost:29092"
	template="outfmt"
	confParam=[	"compression.codec=none",
			"socket.timeout.ms=10000",
			"socket.keepalive.enable=true",
			"reconnect.backoff.jitter.ms=1000",
			"queue.buffering.max.messages=10000",
			"enable.auto.commit=true",
			"message.send.max.retries=1"]
	topicConfParam=["message.timeout.ms=10000"]
	partitions.auto="on"
	closeTimeout="60000"
	resubmitOnFailure="on"
	keepFailedMessages="on"
	failedMsgFile="'$RSYSLOG_OUT_LOG'-failed-'$RANDTOPIC'.data"
	action.resumeInterval="1"
	action.resumeRetryCount="2"
	queue.saveonshutdown="on"
	)
	action( type="omfile" file="'$RSYSLOG_OUT_LOG'")
	stop
}

action( type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'")
'

echo Starting sender instance [omkafka]
startup

echo Inject messages into rsyslog sender instance  
injectmsg 1 $TESTMESSAGES

wait_file_lines $RSYSLOG_OUT_LOG $TESTMESSAGESFULL 100

# experimental: wait until kafkacat receives everything

timeoutend=100
timecounter=0

while [ $timecounter -lt $timeoutend ]; do
	(( timecounter++ ))

	kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG
	count=$(wc -l < ${RSYSLOG_OUT_LOG})
	if [ $count -eq $TESTMESSAGESFULL ]; then
		printf '**** wait-kafka-lines success, have %d lines ****\n\n' "$TESTMESSAGESFULL"
	        kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p %k\n' | sort | uniq > "$RSYSLOG_OUT_LOG.extra"
        	count=$(wc -l < "${RSYSLOG_OUT_LOG}.extra")
	        if [ $count -eq 10 ]; then
			printf '**** partition check success, have 10 partition-key combinations ****\n\n' 
			break
	        else
			shutdown_when_empty
			wait_shutdown
			printf '\n\nERROR: partition check failed, expected 10 got %s\n' "$count"
			printf '\ņRAW DATA:\n'
			kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p %k\n'
			printf '\nCHECKED OUTPUT:\n'
			cat "$RSYSLOG_OUT_LOG.extra"
			error_exit 1
	        fi
	else
		if [ "x$timecounter" == "x$timeoutend" ]; then
			echo wait-kafka-lines failed, expected $TESTMESSAGESFULL got $count
			shutdown_when_empty
			wait_shutdown
			error_exit 1
		else
			echo wait-file-lines not yet there, currently $count lines
			printf 'pstats data:\n'
			# we use tail below to guard against overwhelming the
			# logs if things go wild
			tail -n 500 < $RSYSLOG_DYNNAME.pstats
			printf '\n'

			$TESTTOOL_DIR/msleep 1000
		fi
	fi
echo end iteration  $timecounter
done
unset count

#end experimental

echo Stopping sender instance [omkafka]
shutdown_when_empty
wait_shutdown

#kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG
#kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p@%o:%k:%s' > $RSYSLOG_OUT_LOG.extra

# Delete topic to remove old traces before
delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181'

# Dump Kafka log | uncomment if needed
# dump_kafka_serverlog

kafka_check_broken_broker $RSYSLOG_DYNNAME.othermsg
seq_check 1 $TESTMESSAGESFULL -d

exit_test