diff options
Diffstat (limited to 'tests/unit/type/stream.tcl')
-rw-r--r-- | tests/unit/type/stream.tcl | 940 |
1 files changed, 940 insertions, 0 deletions
diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl new file mode 100644 index 0000000..3081c40 --- /dev/null +++ b/tests/unit/type/stream.tcl @@ -0,0 +1,940 @@ +# return value is like strcmp() and similar. +proc streamCompareID {a b} { + if {$a eq $b} {return 0} + lassign [split $a -] a_ms a_seq + lassign [split $b -] b_ms b_seq + if {$a_ms > $b_ms} {return 1} + if {$a_ms < $b_ms} {return -1} + # Same ms case, compare seq. + if {$a_seq > $b_seq} {return 1} + if {$a_seq < $b_seq} {return -1} +} + +# return the ID immediately greater than the specified one. +# Note that this function does not care to handle 'seq' overflow +# since it's a 64 bit value. +proc streamNextID {id} { + lassign [split $id -] ms seq + incr seq + join [list $ms $seq] - +} + +# Generate a random stream entry ID with the ms part between min and max +# and a low sequence number (0 - 999 range), in order to stress test +# XRANGE against a Tcl implementation implementing the same concept +# with Tcl-only code in a linear array. +proc streamRandomID {min_id max_id} { + lassign [split $min_id -] min_ms min_seq + lassign [split $max_id -] max_ms max_seq + set delta [expr {$max_ms-$min_ms+1}] + set ms [expr {$min_ms+[randomInt $delta]}] + set seq [randomInt 1000] + return $ms-$seq +} + +# Tcl-side implementation of XRANGE to perform fuzz testing in the Redis +# XRANGE implementation. +proc streamSimulateXRANGE {items start end} { + set res {} + foreach i $items { + set this_id [lindex $i 0] + if {[streamCompareID $this_id $start] >= 0} { + if {[streamCompareID $this_id $end] <= 0} { + lappend res $i + } + } + } + return $res +} + +set content {} ;# Will be populated with Tcl side copy of the stream content. + +start_server { + tags {"stream"} +} { + test "XADD wrong number of args" { + assert_error {*wrong number of arguments for 'xadd' command} {r XADD mystream} + assert_error {*wrong number of arguments for 'xadd' command} {r XADD mystream *} + assert_error {*wrong number of arguments for 'xadd' command} {r XADD mystream * field} + } + + test {XADD can add entries into a stream that XRANGE can fetch} { + r XADD mystream * item 1 value a + r XADD mystream * item 2 value b + assert_equal 2 [r XLEN mystream] + set items [r XRANGE mystream - +] + assert_equal [lindex $items 0 1] {item 1 value a} + assert_equal [lindex $items 1 1] {item 2 value b} + } + + test {XADD IDs are incremental} { + set id1 [r XADD mystream * item 1 value a] + set id2 [r XADD mystream * item 2 value b] + set id3 [r XADD mystream * item 3 value c] + assert {[streamCompareID $id1 $id2] == -1} + assert {[streamCompareID $id2 $id3] == -1} + } + + test {XADD IDs are incremental when ms is the same as well} { + r multi + r XADD mystream * item 1 value a + r XADD mystream * item 2 value b + r XADD mystream * item 3 value c + lassign [r exec] id1 id2 id3 + assert {[streamCompareID $id1 $id2] == -1} + assert {[streamCompareID $id2 $id3] == -1} + } + + test {XADD IDs correctly report an error when overflowing} { + r DEL mystream + r xadd mystream 18446744073709551615-18446744073709551615 a b + assert_error ERR* {r xadd mystream * c d} + } + + test {XADD auto-generated sequence is incremented for last ID} { + r DEL mystream + set id1 [r XADD mystream 123-456 item 1 value a] + set id2 [r XADD mystream 123-* item 2 value b] + lassign [split $id2 -] _ seq + assert {$seq == 457} + assert {[streamCompareID $id1 $id2] == -1} + } + + test {XADD auto-generated sequence is zero for future timestamp ID} { + r DEL mystream + set id1 [r XADD mystream 123-456 item 1 value a] + set id2 [r XADD mystream 789-* item 2 value b] + lassign [split $id2 -] _ seq + assert {$seq == 0} + assert {[streamCompareID $id1 $id2] == -1} + } + + test {XADD auto-generated sequence can't be smaller than last ID} { + r DEL mystream + r XADD mystream 123-456 item 1 value a + assert_error ERR* {r XADD mystream 42-* item 2 value b} + } + + test {XADD auto-generated sequence can't overflow} { + r DEL mystream + r xadd mystream 1-18446744073709551615 a b + assert_error ERR* {r xadd mystream 1-* c d} + } + + test {XADD 0-* should succeed} { + r DEL mystream + set id [r xadd mystream 0-* a b] + lassign [split $id -] _ seq + assert {$seq == 1} + } + + test {XADD with MAXLEN option} { + r DEL mystream + for {set j 0} {$j < 1000} {incr j} { + if {rand() < 0.9} { + r XADD mystream MAXLEN 5 * xitem $j + } else { + r XADD mystream MAXLEN 5 * yitem $j + } + } + assert {[r xlen mystream] == 5} + set res [r xrange mystream - +] + set expected 995 + foreach r $res { + assert {[lindex $r 1 1] == $expected} + incr expected + } + } + + test {XADD with MAXLEN option and the '=' argument} { + r DEL mystream + for {set j 0} {$j < 1000} {incr j} { + if {rand() < 0.9} { + r XADD mystream MAXLEN = 5 * xitem $j + } else { + r XADD mystream MAXLEN = 5 * yitem $j + } + } + assert {[r XLEN mystream] == 5} + } + + test {XADD with MAXLEN option and the '~' argument} { + r DEL mystream + r config set stream-node-max-entries 100 + for {set j 0} {$j < 1000} {incr j} { + if {rand() < 0.9} { + r XADD mystream MAXLEN ~ 555 * xitem $j + } else { + r XADD mystream MAXLEN ~ 555 * yitem $j + } + } + assert {[r XLEN mystream] == 600} + } + + test {XADD with NOMKSTREAM option} { + r DEL mystream + assert_equal "" [r XADD mystream NOMKSTREAM * item 1 value a] + assert_equal 0 [r EXISTS mystream] + r XADD mystream * item 1 value a + r XADD mystream NOMKSTREAM * item 2 value b + assert_equal 2 [r XLEN mystream] + set items [r XRANGE mystream - +] + assert_equal [lindex $items 0 1] {item 1 value a} + assert_equal [lindex $items 1 1] {item 2 value b} + } + + test {XADD with MINID option} { + r DEL mystream + for {set j 1} {$j < 1001} {incr j} { + set minid 1000 + if {$j >= 5} { + set minid [expr {$j-5}] + } + if {rand() < 0.9} { + r XADD mystream MINID $minid $j xitem $j + } else { + r XADD mystream MINID $minid $j yitem $j + } + } + assert {[r xlen mystream] == 6} + set res [r xrange mystream - +] + set expected 995 + foreach r $res { + assert {[lindex $r 1 1] == $expected} + incr expected + } + } + + test {XTRIM with MINID option} { + r DEL mystream + r XADD mystream 1-0 f v + r XADD mystream 2-0 f v + r XADD mystream 3-0 f v + r XADD mystream 4-0 f v + r XADD mystream 5-0 f v + r XTRIM mystream MINID = 3-0 + assert_equal [r XRANGE mystream - +] {{3-0 {f v}} {4-0 {f v}} {5-0 {f v}}} + } + + test {XTRIM with MINID option, big delta from master record} { + r DEL mystream + r XADD mystream 1-0 f v + r XADD mystream 1641544570597-0 f v + r XADD mystream 1641544570597-1 f v + r XTRIM mystream MINID 1641544570597-0 + assert_equal [r XRANGE mystream - +] {{1641544570597-0 {f v}} {1641544570597-1 {f v}}} + } + + proc insert_into_stream_key {key {count 10000}} { + r multi + for {set j 0} {$j < $count} {incr j} { + # From time to time insert a field with a different set + # of fields in order to stress the stream compression code. + if {rand() < 0.9} { + r XADD $key * item $j + } else { + r XADD $key * item $j otherfield foo + } + } + r exec + } + + test {XADD mass insertion and XLEN} { + r DEL mystream + insert_into_stream_key mystream + + set items [r XRANGE mystream - +] + for {set j 0} {$j < 10000} {incr j} { + assert {[lrange [lindex $items $j 1] 0 1] eq [list item $j]} + } + assert {[r xlen mystream] == $j} + } + + test {XADD with ID 0-0} { + r DEL otherstream + catch {r XADD otherstream 0-0 k v} err + assert {[r EXISTS otherstream] == 0} + } + + test {XADD with LIMIT delete entries no more than limit} { + r del yourstream + for {set j 0} {$j < 3} {incr j} { + r XADD yourstream * xitem v + } + r XADD yourstream MAXLEN ~ 0 limit 1 * xitem v + assert {[r XLEN yourstream] == 4} + } + + test {XRANGE COUNT works as expected} { + assert {[llength [r xrange mystream - + COUNT 10]] == 10} + } + + test {XREVRANGE COUNT works as expected} { + assert {[llength [r xrevrange mystream + - COUNT 10]] == 10} + } + + test {XRANGE can be used to iterate the whole stream} { + set last_id "-" + set j 0 + while 1 { + set elements [r xrange mystream $last_id + COUNT 100] + if {[llength $elements] == 0} break + foreach e $elements { + assert {[lrange [lindex $e 1] 0 1] eq [list item $j]} + incr j; + } + set last_id [streamNextID [lindex $elements end 0]] + } + assert {$j == 10000} + } + + test {XREVRANGE returns the reverse of XRANGE} { + assert {[r xrange mystream - +] == [lreverse [r xrevrange mystream + -]]} + } + + test {XRANGE exclusive ranges} { + set ids {0-1 0-18446744073709551615 1-0 42-0 42-42 + 18446744073709551615-18446744073709551614 + 18446744073709551615-18446744073709551615} + set total [llength $ids] + r multi + r DEL vipstream + foreach id $ids { + r XADD vipstream $id foo bar + } + r exec + assert {[llength [r xrange vipstream - +]] == $total} + assert {[llength [r xrange vipstream ([lindex $ids 0] +]] == $total-1} + assert {[llength [r xrange vipstream - ([lindex $ids $total-1]]] == $total-1} + assert {[llength [r xrange vipstream (0-1 (1-0]] == 1} + assert {[llength [r xrange vipstream (1-0 (42-42]] == 1} + catch {r xrange vipstream (- +} e + assert_match {ERR*} $e + catch {r xrange vipstream - (+} e + assert_match {ERR*} $e + catch {r xrange vipstream (18446744073709551615-18446744073709551615 +} e + assert_match {ERR*} $e + catch {r xrange vipstream - (0-0} e + assert_match {ERR*} $e + } + + test {XREAD with non empty stream} { + set res [r XREAD COUNT 1 STREAMS mystream 0-0] + assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {item 0}} + } + + test {Non blocking XREAD with empty streams} { + set res [r XREAD STREAMS s1{t} s2{t} 0-0 0-0] + assert {$res eq {}} + } + + test {XREAD with non empty second stream} { + insert_into_stream_key mystream{t} + set res [r XREAD COUNT 1 STREAMS nostream{t} mystream{t} 0-0 0-0] + assert {[lindex $res 0 0] eq {mystream{t}}} + assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {item 0}} + } + + test {Blocking XREAD waiting new data} { + r XADD s2{t} * old abcd1234 + set rd [redis_deferring_client] + $rd XREAD BLOCK 20000 STREAMS s1{t} s2{t} s3{t} $ $ $ + wait_for_blocked_client + r XADD s2{t} * new abcd1234 + set res [$rd read] + assert {[lindex $res 0 0] eq {s2{t}}} + assert {[lindex $res 0 1 0 1] eq {new abcd1234}} + $rd close + } + + test {Blocking XREAD waiting old data} { + set rd [redis_deferring_client] + $rd XREAD BLOCK 20000 STREAMS s1{t} s2{t} s3{t} $ 0-0 $ + r XADD s2{t} * foo abcd1234 + set res [$rd read] + assert {[lindex $res 0 0] eq {s2{t}}} + assert {[lindex $res 0 1 0 1] eq {old abcd1234}} + $rd close + } + + test {Blocking XREAD will not reply with an empty array} { + r del s1 + r XADD s1 666 f v + r XADD s1 667 f2 v2 + r XDEL s1 667 + set rd [redis_deferring_client] + $rd XREAD BLOCK 10 STREAMS s1 666 + after 20 + assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {s1 {}} + $rd close + } + + test "Blocking XREAD for stream that ran dry (issue #5299)" { + set rd [redis_deferring_client] + + # Add a entry then delete it, now stream's last_id is 666. + r DEL mystream + r XADD mystream 666 key value + r XDEL mystream 666 + + # Pass a ID smaller than stream's last_id, released on timeout. + $rd XREAD BLOCK 10 STREAMS mystream 665 + assert_equal [$rd read] {} + + # Throw an error if the ID equal or smaller than the last_id. + assert_error ERR*equal*smaller* {r XADD mystream 665 key value} + assert_error ERR*equal*smaller* {r XADD mystream 666 key value} + + # Entered blocking state and then release because of the new entry. + $rd XREAD BLOCK 0 STREAMS mystream 665 + wait_for_blocked_clients_count 1 + r XADD mystream 667 key value + assert_equal [$rd read] {{mystream {{667-0 {key value}}}}} + + $rd close + } + + test "XREAD: XADD + DEL should not awake client" { + set rd [redis_deferring_client] + r del s1 + $rd XREAD BLOCK 20000 STREAMS s1 $ + wait_for_blocked_clients_count 1 + r multi + r XADD s1 * old abcd1234 + r DEL s1 + r exec + r XADD s1 * new abcd1234 + set res [$rd read] + assert {[lindex $res 0 0] eq {s1}} + assert {[lindex $res 0 1 0 1] eq {new abcd1234}} + $rd close + } + + test "XREAD: XADD + DEL + LPUSH should not awake client" { + set rd [redis_deferring_client] + r del s1 + $rd XREAD BLOCK 20000 STREAMS s1 $ + wait_for_blocked_clients_count 1 + r multi + r XADD s1 * old abcd1234 + r DEL s1 + r LPUSH s1 foo bar + r exec + r DEL s1 + r XADD s1 * new abcd1234 + set res [$rd read] + assert {[lindex $res 0 0] eq {s1}} + assert {[lindex $res 0 1 0 1] eq {new abcd1234}} + $rd close + } + + test {XREAD with same stream name multiple times should work} { + r XADD s2 * old abcd1234 + set rd [redis_deferring_client] + $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $ + wait_for_blocked_clients_count 1 + r XADD s2 * new abcd1234 + set res [$rd read] + assert {[lindex $res 0 0] eq {s2}} + assert {[lindex $res 0 1 0 1] eq {new abcd1234}} + $rd close + } + + test {XREAD + multiple XADD inside transaction} { + r XADD s2 * old abcd1234 + set rd [redis_deferring_client] + $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $ + wait_for_blocked_clients_count 1 + r MULTI + r XADD s2 * field one + r XADD s2 * field two + r XADD s2 * field three + r EXEC + set res [$rd read] + assert {[lindex $res 0 0] eq {s2}} + assert {[lindex $res 0 1 0 1] eq {field one}} + assert {[lindex $res 0 1 1 1] eq {field two}} + $rd close + } + + test {XDEL basic test} { + r del somestream + r xadd somestream * foo value0 + set id [r xadd somestream * foo value1] + r xadd somestream * foo value2 + r xdel somestream $id + assert {[r xlen somestream] == 2} + set result [r xrange somestream - +] + assert {[lindex $result 0 1 1] eq {value0}} + assert {[lindex $result 1 1 1] eq {value2}} + } + + test {XDEL multiply id test} { + r del somestream + r xadd somestream 1-1 a 1 + r xadd somestream 1-2 b 2 + r xadd somestream 1-3 c 3 + r xadd somestream 1-4 d 4 + r xadd somestream 1-5 e 5 + assert {[r xlen somestream] == 5} + assert {[r xdel somestream 1-1 1-4 1-5 2-1] == 3} + assert {[r xlen somestream] == 2} + set result [r xrange somestream - +] + assert {[dict get [lindex $result 0 1] b] eq {2}} + assert {[dict get [lindex $result 1 1] c] eq {3}} + } + # Here the idea is to check the consistency of the stream data structure + # as we remove all the elements down to zero elements. + test {XDEL fuzz test} { + r del somestream + set ids {} + set x 0; # Length of the stream + while 1 { + lappend ids [r xadd somestream * item $x] + incr x + # Add enough elements to have a few radix tree nodes inside the stream. + if {[dict get [r xinfo stream somestream] radix-tree-keys] > 20} break + } + + # Now remove all the elements till we reach an empty stream + # and after every deletion, check that the stream is sane enough + # to report the right number of elements with XRANGE: this will also + # force accessing the whole data structure to check sanity. + assert {[r xlen somestream] == $x} + + # We want to remove elements in random order to really test the + # implementation in a better way. + set ids [lshuffle $ids] + foreach id $ids { + assert {[r xdel somestream $id] == 1} + incr x -1 + assert {[r xlen somestream] == $x} + # The test would be too slow calling XRANGE for every iteration. + # Do it every 100 removal. + if {$x % 100 == 0} { + set res [r xrange somestream - +] + assert {[llength $res] == $x} + } + } + } + + test {XRANGE fuzzing} { + set items [r XRANGE mystream{t} - +] + set low_id [lindex $items 0 0] + set high_id [lindex $items end 0] + for {set j 0} {$j < 100} {incr j} { + set start [streamRandomID $low_id $high_id] + set end [streamRandomID $low_id $high_id] + set range [r xrange mystream{t} $start $end] + set tcl_range [streamSimulateXRANGE $items $start $end] + if {$range ne $tcl_range} { + puts "*** WARNING *** - XRANGE fuzzing mismatch: $start - $end" + puts "---" + puts "XRANGE: '$range'" + puts "---" + puts "TCL: '$tcl_range'" + puts "---" + fail "XRANGE fuzzing failed, check logs for details" + } + } + } + + test {XREVRANGE regression test for issue #5006} { + # Add non compressed entries + r xadd teststream 1234567891230 key1 value1 + r xadd teststream 1234567891240 key2 value2 + r xadd teststream 1234567891250 key3 value3 + + # Add SAMEFIELD compressed entries + r xadd teststream2 1234567891230 key1 value1 + r xadd teststream2 1234567891240 key1 value2 + r xadd teststream2 1234567891250 key1 value3 + + assert_equal [r xrevrange teststream 1234567891245 -] {{1234567891240-0 {key2 value2}} {1234567891230-0 {key1 value1}}} + + assert_equal [r xrevrange teststream2 1234567891245 -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}} + } + + test {XREAD streamID edge (no-blocking)} { + r del x + r XADD x 1-1 f v + r XADD x 1-18446744073709551615 f v + r XADD x 2-1 f v + set res [r XREAD BLOCK 0 STREAMS x 1-18446744073709551615] + assert {[lindex $res 0 1 0] == {2-1 {f v}}} + } + + test {XREAD streamID edge (blocking)} { + r del x + set rd [redis_deferring_client] + $rd XREAD BLOCK 0 STREAMS x 1-18446744073709551615 + wait_for_blocked_clients_count 1 + r XADD x 1-1 f v + r XADD x 1-18446744073709551615 f v + r XADD x 2-1 f v + set res [$rd read] + assert {[lindex $res 0 1 0] == {2-1 {f v}}} + $rd close + } + + test {XADD streamID edge} { + r del x + r XADD x 2577343934890-18446744073709551615 f v ;# we need the timestamp to be in the future + r XADD x * f2 v2 + assert_equal [r XRANGE x - +] {{2577343934890-18446744073709551615 {f v}} {2577343934891-0 {f2 v2}}} + } + + test {XTRIM with MAXLEN option basic test} { + r DEL mystream + for {set j 0} {$j < 1000} {incr j} { + if {rand() < 0.9} { + r XADD mystream * xitem $j + } else { + r XADD mystream * yitem $j + } + } + r XTRIM mystream MAXLEN 666 + assert {[r XLEN mystream] == 666} + r XTRIM mystream MAXLEN = 555 + assert {[r XLEN mystream] == 555} + r XTRIM mystream MAXLEN ~ 444 + assert {[r XLEN mystream] == 500} + r XTRIM mystream MAXLEN ~ 400 + assert {[r XLEN mystream] == 400} + } + + test {XADD with LIMIT consecutive calls} { + r del mystream + r config set stream-node-max-entries 10 + for {set j 0} {$j < 100} {incr j} { + r XADD mystream * xitem v + } + r XADD mystream MAXLEN ~ 55 LIMIT 30 * xitem v + assert {[r xlen mystream] == 71} + r XADD mystream MAXLEN ~ 55 LIMIT 30 * xitem v + assert {[r xlen mystream] == 62} + r config set stream-node-max-entries 100 + } + + test {XTRIM with ~ is limited} { + r del mystream + r config set stream-node-max-entries 1 + for {set j 0} {$j < 102} {incr j} { + r XADD mystream * xitem v + } + r XTRIM mystream MAXLEN ~ 1 + assert {[r xlen mystream] == 2} + r config set stream-node-max-entries 100 + } + + test {XTRIM without ~ is not limited} { + r del mystream + r config set stream-node-max-entries 1 + for {set j 0} {$j < 102} {incr j} { + r XADD mystream * xitem v + } + r XTRIM mystream MAXLEN 1 + assert {[r xlen mystream] == 1} + r config set stream-node-max-entries 100 + } + + test {XTRIM without ~ and with LIMIT} { + r del mystream + r config set stream-node-max-entries 1 + for {set j 0} {$j < 102} {incr j} { + r XADD mystream * xitem v + } + assert_error ERR* {r XTRIM mystream MAXLEN 1 LIMIT 30} + } + + test {XTRIM with LIMIT delete entries no more than limit} { + r del mystream + r config set stream-node-max-entries 2 + for {set j 0} {$j < 3} {incr j} { + r XADD mystream * xitem v + } + assert {[r XTRIM mystream MAXLEN ~ 0 LIMIT 1] == 0} + assert {[r XTRIM mystream MAXLEN ~ 0 LIMIT 2] == 2} + } +} + +start_server {tags {"stream needs:debug"} overrides {appendonly yes}} { + test {XADD with MAXLEN > xlen can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + r XADD mystream * xitem v + } + r XADD mystream MAXLEN 200 * xitem v + incr j + assert {[r xlen mystream] == $j} + r debug loadaof + r XADD mystream * xitem v + incr j + assert {[r xlen mystream] == $j} + } +} + +start_server {tags {"stream needs:debug"} overrides {appendonly yes}} { + test {XADD with MINID > lastid can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + set id [expr {$j+1}] + r XADD mystream $id xitem v + } + r XADD mystream MINID 1 * xitem v + incr j + assert {[r xlen mystream] == $j} + r debug loadaof + r XADD mystream * xitem v + incr j + assert {[r xlen mystream] == $j} + } +} + +start_server {tags {"stream needs:debug"} overrides {appendonly yes stream-node-max-entries 100}} { + test {XADD with ~ MAXLEN can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + r XADD mystream * xitem v + } + r XADD mystream MAXLEN ~ $j * xitem v + incr j + assert {[r xlen mystream] == $j} + r config set stream-node-max-entries 1 + r debug loadaof + r XADD mystream * xitem v + incr j + assert {[r xlen mystream] == $j} + } +} + +start_server {tags {"stream needs:debug"} overrides {appendonly yes stream-node-max-entries 10}} { + test {XADD with ~ MAXLEN and LIMIT can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + r XADD mystream * xitem v + } + r XADD mystream MAXLEN ~ 55 LIMIT 30 * xitem v + assert {[r xlen mystream] == 71} + r config set stream-node-max-entries 1 + r debug loadaof + r XADD mystream * xitem v + assert {[r xlen mystream] == 72} + } +} + +start_server {tags {"stream needs:debug"} overrides {appendonly yes stream-node-max-entries 100}} { + test {XADD with ~ MINID can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + set id [expr {$j+1}] + r XADD mystream $id xitem v + } + r XADD mystream MINID ~ $j * xitem v + incr j + assert {[r xlen mystream] == $j} + r config set stream-node-max-entries 1 + r debug loadaof + r XADD mystream * xitem v + incr j + assert {[r xlen mystream] == $j} + } +} + +start_server {tags {"stream needs:debug"} overrides {appendonly yes stream-node-max-entries 10}} { + test {XADD with ~ MINID and LIMIT can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + set id [expr {$j+1}] + r XADD mystream $id xitem v + } + r XADD mystream MINID ~ 55 LIMIT 30 * xitem v + assert {[r xlen mystream] == 71} + r config set stream-node-max-entries 1 + r debug loadaof + r XADD mystream * xitem v + assert {[r xlen mystream] == 72} + } +} + +start_server {tags {"stream needs:debug"} overrides {appendonly yes stream-node-max-entries 10}} { + test {XTRIM with ~ MAXLEN can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + r XADD mystream * xitem v + } + r XTRIM mystream MAXLEN ~ 85 + assert {[r xlen mystream] == 90} + r config set stream-node-max-entries 1 + r debug loadaof + r XADD mystream * xitem v + incr j + assert {[r xlen mystream] == 91} + } +} + +start_server {tags {"stream"}} { + test {XADD can CREATE an empty stream} { + r XADD mystream MAXLEN 0 * a b + assert {[dict get [r xinfo stream mystream] length] == 0} + } + + test {XSETID can set a specific ID} { + r XSETID mystream "200-0" + set reply [r XINFO stream mystream] + assert_equal [dict get $reply last-generated-id] "200-0" + assert_equal [dict get $reply entries-added] 1 + } + + test {XSETID cannot SETID with smaller ID} { + r XADD mystream * a b + catch {r XSETID mystream "1-1"} err + r XADD mystream MAXLEN 0 * a b + set err + } {ERR *smaller*} + + test {XSETID cannot SETID on non-existent key} { + catch {r XSETID stream 1-1} err + set _ $err + } {ERR no such key} + + test {XSETID cannot run with an offset but without a maximal tombstone} { + catch {r XSETID stream 1-1 0} err + set _ $err + } {ERR syntax error} + + test {XSETID cannot run with a maximal tombstone but without an offset} { + catch {r XSETID stream 1-1 0-0} err + set _ $err + } {ERR syntax error} + + test {XSETID errors on negstive offset} { + catch {r XSETID stream 1-1 ENTRIESADDED -1 MAXDELETEDID 0-0} err + set _ $err + } {ERR *must be positive} + + test {XSETID cannot set the maximal tombstone with larger ID} { + r DEL x + r XADD x 1-0 a b + + catch {r XSETID x "1-0" ENTRIESADDED 1 MAXDELETEDID "2-0" } err + r XADD mystream MAXLEN 0 * a b + set err + } {ERR *smaller*} + + test {XSETID cannot set the offset to less than the length} { + r DEL x + r XADD x 1-0 a b + + catch {r XSETID x "1-0" ENTRIESADDED 0 MAXDELETEDID "0-0" } err + r XADD mystream MAXLEN 0 * a b + set err + } {ERR *smaller*} + + test {XSETID cannot set smaller ID than current MAXDELETEDID} { + r DEL x + r XADD x 1-1 a 1 + r XADD x 1-2 b 2 + r XADD x 1-3 c 3 + r XDEL x 1-2 + r XDEL x 1-3 + set reply [r XINFO stream x] + assert_equal [dict get $reply max-deleted-entry-id] "1-3" + catch {r XSETID x "1-2" } err + set err + } {ERR *smaller*} +} + +start_server {tags {"stream"}} { + test {XADD advances the entries-added counter and sets the recorded-first-entry-id} { + r DEL x + r XADD x 1-0 data a + + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply entries-added] 1 + assert_equal [dict get $reply recorded-first-entry-id] "1-0" + + r XADD x 2-0 data a + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply entries-added] 2 + assert_equal [dict get $reply recorded-first-entry-id] "1-0" + } + + test {XDEL/TRIM are reflected by recorded first entry} { + r DEL x + r XADD x 1-0 data a + r XADD x 2-0 data a + r XADD x 3-0 data a + r XADD x 4-0 data a + r XADD x 5-0 data a + + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply entries-added] 5 + assert_equal [dict get $reply recorded-first-entry-id] "1-0" + + r XDEL x 2-0 + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply recorded-first-entry-id] "1-0" + + r XDEL x 1-0 + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply recorded-first-entry-id] "3-0" + + r XTRIM x MAXLEN = 2 + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply recorded-first-entry-id] "4-0" + } + + test {Maximum XDEL ID behaves correctly} { + r DEL x + r XADD x 1-0 data a + r XADD x 2-0 data b + r XADD x 3-0 data c + + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply max-deleted-entry-id] "0-0" + + r XDEL x 2-0 + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply max-deleted-entry-id] "2-0" + + r XDEL x 1-0 + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply max-deleted-entry-id] "2-0" + } + + test {XADD with artial ID with maximal seq} { + r DEL x + r XADD x 1-18446744073709551615 f1 v1 + assert_error {*The ID specified in XADD is equal or smaller*} {r XADD x 1-* f2 v2} + } +} + +start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-preamble no}} { + test {Empty stream can be rewrite into AOF correctly} { + r XADD mystream MAXLEN 0 * a b + assert {[dict get [r xinfo stream mystream] length] == 0} + r bgrewriteaof + waitForBgrewriteaof r + r debug loadaof + assert {[dict get [r xinfo stream mystream] length] == 0} + } + + test {Stream can be rewrite into AOF correctly after XDEL lastid} { + r XSETID mystream 0-0 + r XADD mystream 1-1 a b + r XADD mystream 2-2 a b + assert {[dict get [r xinfo stream mystream] length] == 2} + r XDEL mystream 2-2 + r bgrewriteaof + waitForBgrewriteaof r + r debug loadaof + assert {[dict get [r xinfo stream mystream] length] == 1} + assert_equal [dict get [r xinfo stream mystream] last-generated-id] "2-2" + } +} + +start_server {tags {"stream"}} { + test {XGROUP HELP should not have unexpected options} { + catch {r XGROUP help xxx} e + assert_match "*wrong number of arguments for 'xgroup|help' command" $e + } + + test {XINFO HELP should not have unexpected options} { + catch {r XINFO help xxx} e + assert_match "*wrong number of arguments for 'xinfo|help' command" $e + } +} |