summaryrefslogtreecommitdiffstats
path: root/tests/unit/moduleapi/stream.tcl
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unit/moduleapi/stream.tcl')
-rw-r--r--tests/unit/moduleapi/stream.tcl176
1 files changed, 176 insertions, 0 deletions
diff --git a/tests/unit/moduleapi/stream.tcl b/tests/unit/moduleapi/stream.tcl
new file mode 100644
index 0000000..7ad1a30
--- /dev/null
+++ b/tests/unit/moduleapi/stream.tcl
@@ -0,0 +1,176 @@
+set testmodule [file normalize tests/modules/stream.so]
+
+start_server {tags {"modules"}} {
+ r module load $testmodule
+
+ test {Module stream add and delete} {
+ r del mystream
+ # add to empty key
+ set streamid1 [r stream.add mystream item 1 value a]
+ # add to existing stream
+ set streamid2 [r stream.add mystream item 2 value b]
+ # check result
+ assert { [string match "*-*" $streamid1] }
+ set items [r XRANGE mystream - +]
+ assert_equal $items \
+ "{$streamid1 {item 1 value a}} {$streamid2 {item 2 value b}}"
+ # delete one of them and try deleting non-existing ID
+ assert_equal OK [r stream.delete mystream $streamid1]
+ assert_error "ERR StreamDelete*" {r stream.delete mystream 123-456}
+ assert_error "Invalid stream ID*" {r stream.delete mystream foo}
+ assert_equal "{$streamid2 {item 2 value b}}" [r XRANGE mystream - +]
+ # check error condition: wrong type
+ r del mystream
+ r set mystream mystring
+ assert_error "ERR StreamAdd*" {r stream.add mystream item 1 value a}
+ assert_error "ERR StreamDelete*" {r stream.delete mystream 123-456}
+ }
+
+ test {Module stream add unblocks blocking xread} {
+ r del mystream
+
+ # Blocking XREAD on an empty key
+ set rd1 [redis_deferring_client]
+ $rd1 XREAD BLOCK 3000 STREAMS mystream $
+ # wait until client is actually blocked
+ wait_for_condition 50 100 {
+ [s 0 blocked_clients] eq {1}
+ } else {
+ fail "Client is not blocked"
+ }
+ set id [r stream.add mystream field 1 value a]
+ assert_equal "{mystream {{$id {field 1 value a}}}}" [$rd1 read]
+
+ # Blocking XREAD on an existing stream
+ set rd2 [redis_deferring_client]
+ $rd2 XREAD BLOCK 3000 STREAMS mystream $
+ # wait until client is actually blocked
+ wait_for_condition 50 100 {
+ [s 0 blocked_clients] eq {1}
+ } else {
+ fail "Client is not blocked"
+ }
+ set id [r stream.add mystream field 2 value b]
+ assert_equal "{mystream {{$id {field 2 value b}}}}" [$rd2 read]
+ }
+
+ test {Module stream add benchmark (1M stream add)} {
+ set n 1000000
+ r del mystream
+ set result [r stream.addn mystream $n field value]
+ assert_equal $result $n
+ }
+
+ test {Module stream XADD big fields doesn't create empty key} {
+ set original_proto [config_get_set proto-max-bulk-len 2147483647] ;#2gb
+ set original_query [config_get_set client-query-buffer-limit 2147483647] ;#2gb
+
+ r del mystream
+ r write "*4\r\n\$10\r\nstream.add\r\n\$8\r\nmystream\r\n\$5\r\nfield\r\n"
+ catch {
+ write_big_bulk 1073741824 ;#1gb
+ } err
+ assert {$err eq "ERR StreamAdd failed"}
+ assert_equal 0 [r exists mystream]
+
+ # restore defaults
+ r config set proto-max-bulk-len $original_proto
+ r config set client-query-buffer-limit $original_query
+ } {OK} {large-memory}
+
+ test {Module stream iterator} {
+ r del mystream
+ set streamid1 [r xadd mystream * item 1 value a]
+ set streamid2 [r xadd mystream * item 2 value b]
+ # range result
+ set result1 [r stream.range mystream "-" "+"]
+ set expect1 [r xrange mystream "-" "+"]
+ assert_equal $result1 $expect1
+ # reverse range
+ set result_rev [r stream.range mystream "+" "-"]
+ set expect_rev [r xrevrange mystream "+" "-"]
+ assert_equal $result_rev $expect_rev
+
+ # only one item: range with startid = endid
+ set result2 [r stream.range mystream "-" $streamid1]
+ assert_equal $result2 "{$streamid1 {item 1 value a}}"
+ assert_equal $result2 [list [list $streamid1 {item 1 value a}]]
+ # only one item: range with startid = endid
+ set result3 [r stream.range mystream $streamid2 $streamid2]
+ assert_equal $result3 "{$streamid2 {item 2 value b}}"
+ assert_equal $result3 [list [list $streamid2 {item 2 value b}]]
+ }
+
+ test {Module stream iterator delete} {
+ r del mystream
+ set id1 [r xadd mystream * normal item]
+ set id2 [r xadd mystream * selfdestruct yes]
+ set id3 [r xadd mystream * another item]
+ # stream.range deletes the "selfdestruct" item after returning it
+ assert_equal \
+ "{$id1 {normal item}} {$id2 {selfdestruct yes}} {$id3 {another item}}" \
+ [r stream.range mystream - +]
+ # now, the "selfdestruct" item is gone
+ assert_equal \
+ "{$id1 {normal item}} {$id3 {another item}}" \
+ [r stream.range mystream - +]
+ }
+
+ test {Module stream trim by length} {
+ r del mystream
+ # exact maxlen
+ r xadd mystream * item 1 value a
+ r xadd mystream * item 2 value b
+ r xadd mystream * item 3 value c
+ assert_equal 3 [r xlen mystream]
+ assert_equal 0 [r stream.trim mystream maxlen = 5]
+ assert_equal 3 [r xlen mystream]
+ assert_equal 2 [r stream.trim mystream maxlen = 1]
+ assert_equal 1 [r xlen mystream]
+ assert_equal 1 [r stream.trim mystream maxlen = 0]
+ # check that there is no limit for exact maxlen
+ r stream.addn mystream 20000 item x value y
+ assert_equal 20000 [r stream.trim mystream maxlen = 0]
+ # approx maxlen (100 items per node implies default limit 10K items)
+ r stream.addn mystream 20000 item x value y
+ assert_equal 20000 [r xlen mystream]
+ assert_equal 10000 [r stream.trim mystream maxlen ~ 2]
+ assert_equal 9900 [r stream.trim mystream maxlen ~ 2]
+ assert_equal 0 [r stream.trim mystream maxlen ~ 2]
+ assert_equal 100 [r xlen mystream]
+ assert_equal 100 [r stream.trim mystream maxlen ~ 0]
+ assert_equal 0 [r xlen mystream]
+ }
+
+ test {Module stream trim by ID} {
+ r del mystream
+ # exact minid
+ r xadd mystream * item 1 value a
+ r xadd mystream * item 2 value b
+ set minid [r xadd mystream * item 3 value c]
+ assert_equal 3 [r xlen mystream]
+ assert_equal 0 [r stream.trim mystream minid = -]
+ assert_equal 3 [r xlen mystream]
+ assert_equal 2 [r stream.trim mystream minid = $minid]
+ assert_equal 1 [r xlen mystream]
+ assert_equal 1 [r stream.trim mystream minid = +]
+ # check that there is no limit for exact minid
+ r stream.addn mystream 20000 item x value y
+ assert_equal 20000 [r stream.trim mystream minid = +]
+ # approx minid (100 items per node implies default limit 10K items)
+ r stream.addn mystream 19980 item x value y
+ set minid [r xadd mystream * item x value y]
+ r stream.addn mystream 19 item x value y
+ assert_equal 20000 [r xlen mystream]
+ assert_equal 10000 [r stream.trim mystream minid ~ $minid]
+ assert_equal 9900 [r stream.trim mystream minid ~ $minid]
+ assert_equal 0 [r stream.trim mystream minid ~ $minid]
+ assert_equal 100 [r xlen mystream]
+ assert_equal 100 [r stream.trim mystream minid ~ +]
+ assert_equal 0 [r xlen mystream]
+ }
+
+ test "Unload the module - stream" {
+ assert_equal {OK} [r module unload stream]
+ }
+}