diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-14 13:40:54 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-14 13:40:54 +0000 |
commit | 317c0644ccf108aa23ef3fd8358bd66c2840bfc0 (patch) | |
tree | c417b3d25c86b775989cb5ac042f37611b626c8a /tests/unit/moduleapi/stream.tcl | |
parent | Initial commit. (diff) | |
download | redis-317c0644ccf108aa23ef3fd8358bd66c2840bfc0.tar.xz redis-317c0644ccf108aa23ef3fd8358bd66c2840bfc0.zip |
Adding upstream version 5:7.2.4.upstream/5%7.2.4
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/unit/moduleapi/stream.tcl')
-rw-r--r-- | tests/unit/moduleapi/stream.tcl | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/tests/unit/moduleapi/stream.tcl b/tests/unit/moduleapi/stream.tcl new file mode 100644 index 0000000..7ad1a30 --- /dev/null +++ b/tests/unit/moduleapi/stream.tcl @@ -0,0 +1,176 @@ +set testmodule [file normalize tests/modules/stream.so] + +start_server {tags {"modules"}} { + r module load $testmodule + + test {Module stream add and delete} { + r del mystream + # add to empty key + set streamid1 [r stream.add mystream item 1 value a] + # add to existing stream + set streamid2 [r stream.add mystream item 2 value b] + # check result + assert { [string match "*-*" $streamid1] } + set items [r XRANGE mystream - +] + assert_equal $items \ + "{$streamid1 {item 1 value a}} {$streamid2 {item 2 value b}}" + # delete one of them and try deleting non-existing ID + assert_equal OK [r stream.delete mystream $streamid1] + assert_error "ERR StreamDelete*" {r stream.delete mystream 123-456} + assert_error "Invalid stream ID*" {r stream.delete mystream foo} + assert_equal "{$streamid2 {item 2 value b}}" [r XRANGE mystream - +] + # check error condition: wrong type + r del mystream + r set mystream mystring + assert_error "ERR StreamAdd*" {r stream.add mystream item 1 value a} + assert_error "ERR StreamDelete*" {r stream.delete mystream 123-456} + } + + test {Module stream add unblocks blocking xread} { + r del mystream + + # Blocking XREAD on an empty key + set rd1 [redis_deferring_client] + $rd1 XREAD BLOCK 3000 STREAMS mystream $ + # wait until client is actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Client is not blocked" + } + set id [r stream.add mystream field 1 value a] + assert_equal "{mystream {{$id {field 1 value a}}}}" [$rd1 read] + + # Blocking XREAD on an existing stream + set rd2 [redis_deferring_client] + $rd2 XREAD BLOCK 3000 STREAMS mystream $ + # wait until client is actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Client is not blocked" + } + set id [r stream.add mystream field 2 value b] + assert_equal "{mystream {{$id {field 2 value b}}}}" [$rd2 read] + } + + test {Module stream add benchmark (1M stream add)} { + set n 1000000 + r del mystream + set result [r stream.addn mystream $n field value] + assert_equal $result $n + } + + test {Module stream XADD big fields doesn't create empty key} { + set original_proto [config_get_set proto-max-bulk-len 2147483647] ;#2gb + set original_query [config_get_set client-query-buffer-limit 2147483647] ;#2gb + + r del mystream + r write "*4\r\n\$10\r\nstream.add\r\n\$8\r\nmystream\r\n\$5\r\nfield\r\n" + catch { + write_big_bulk 1073741824 ;#1gb + } err + assert {$err eq "ERR StreamAdd failed"} + assert_equal 0 [r exists mystream] + + # restore defaults + r config set proto-max-bulk-len $original_proto + r config set client-query-buffer-limit $original_query + } {OK} {large-memory} + + test {Module stream iterator} { + r del mystream + set streamid1 [r xadd mystream * item 1 value a] + set streamid2 [r xadd mystream * item 2 value b] + # range result + set result1 [r stream.range mystream "-" "+"] + set expect1 [r xrange mystream "-" "+"] + assert_equal $result1 $expect1 + # reverse range + set result_rev [r stream.range mystream "+" "-"] + set expect_rev [r xrevrange mystream "+" "-"] + assert_equal $result_rev $expect_rev + + # only one item: range with startid = endid + set result2 [r stream.range mystream "-" $streamid1] + assert_equal $result2 "{$streamid1 {item 1 value a}}" + assert_equal $result2 [list [list $streamid1 {item 1 value a}]] + # only one item: range with startid = endid + set result3 [r stream.range mystream $streamid2 $streamid2] + assert_equal $result3 "{$streamid2 {item 2 value b}}" + assert_equal $result3 [list [list $streamid2 {item 2 value b}]] + } + + test {Module stream iterator delete} { + r del mystream + set id1 [r xadd mystream * normal item] + set id2 [r xadd mystream * selfdestruct yes] + set id3 [r xadd mystream * another item] + # stream.range deletes the "selfdestruct" item after returning it + assert_equal \ + "{$id1 {normal item}} {$id2 {selfdestruct yes}} {$id3 {another item}}" \ + [r stream.range mystream - +] + # now, the "selfdestruct" item is gone + assert_equal \ + "{$id1 {normal item}} {$id3 {another item}}" \ + [r stream.range mystream - +] + } + + test {Module stream trim by length} { + r del mystream + # exact maxlen + r xadd mystream * item 1 value a + r xadd mystream * item 2 value b + r xadd mystream * item 3 value c + assert_equal 3 [r xlen mystream] + assert_equal 0 [r stream.trim mystream maxlen = 5] + assert_equal 3 [r xlen mystream] + assert_equal 2 [r stream.trim mystream maxlen = 1] + assert_equal 1 [r xlen mystream] + assert_equal 1 [r stream.trim mystream maxlen = 0] + # check that there is no limit for exact maxlen + r stream.addn mystream 20000 item x value y + assert_equal 20000 [r stream.trim mystream maxlen = 0] + # approx maxlen (100 items per node implies default limit 10K items) + r stream.addn mystream 20000 item x value y + assert_equal 20000 [r xlen mystream] + assert_equal 10000 [r stream.trim mystream maxlen ~ 2] + assert_equal 9900 [r stream.trim mystream maxlen ~ 2] + assert_equal 0 [r stream.trim mystream maxlen ~ 2] + assert_equal 100 [r xlen mystream] + assert_equal 100 [r stream.trim mystream maxlen ~ 0] + assert_equal 0 [r xlen mystream] + } + + test {Module stream trim by ID} { + r del mystream + # exact minid + r xadd mystream * item 1 value a + r xadd mystream * item 2 value b + set minid [r xadd mystream * item 3 value c] + assert_equal 3 [r xlen mystream] + assert_equal 0 [r stream.trim mystream minid = -] + assert_equal 3 [r xlen mystream] + assert_equal 2 [r stream.trim mystream minid = $minid] + assert_equal 1 [r xlen mystream] + assert_equal 1 [r stream.trim mystream minid = +] + # check that there is no limit for exact minid + r stream.addn mystream 20000 item x value y + assert_equal 20000 [r stream.trim mystream minid = +] + # approx minid (100 items per node implies default limit 10K items) + r stream.addn mystream 19980 item x value y + set minid [r xadd mystream * item x value y] + r stream.addn mystream 19 item x value y + assert_equal 20000 [r xlen mystream] + assert_equal 10000 [r stream.trim mystream minid ~ $minid] + assert_equal 9900 [r stream.trim mystream minid ~ $minid] + assert_equal 0 [r stream.trim mystream minid ~ $minid] + assert_equal 100 [r xlen mystream] + assert_equal 100 [r stream.trim mystream minid ~ +] + assert_equal 0 [r xlen mystream] + } + + test "Unload the module - stream" { + assert_equal {OK} [r module unload stream] + } +} |