From 317c0644ccf108aa23ef3fd8358bd66c2840bfc0 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 14 Apr 2024 15:40:54 +0200 Subject: Adding upstream version 5:7.2.4. Signed-off-by: Daniel Baumann --- tests/unit/type/stream-cgroups.tcl | 1297 ++++++++++++++++++++++++++++++++++++ 1 file changed, 1297 insertions(+) create mode 100644 tests/unit/type/stream-cgroups.tcl (limited to 'tests/unit/type/stream-cgroups.tcl') diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl new file mode 100644 index 0000000..a6cc5da --- /dev/null +++ b/tests/unit/type/stream-cgroups.tcl @@ -0,0 +1,1297 @@ +start_server { + tags {"stream"} +} { + test {XGROUP CREATE: creation and duplicate group name detection} { + r DEL mystream + r XADD mystream * foo bar + r XGROUP CREATE mystream mygroup $ + catch {r XGROUP CREATE mystream mygroup $} err + set err + } {BUSYGROUP*} + + test {XGROUP CREATE: with ENTRIESREAD parameter} { + r DEL mystream + r XADD mystream 1-1 a 1 + r XADD mystream 1-2 b 2 + r XADD mystream 1-3 c 3 + r XADD mystream 1-4 d 4 + assert_error "*value for ENTRIESREAD must be positive or -1*" {r XGROUP CREATE mystream mygroup $ ENTRIESREAD -3} + + r XGROUP CREATE mystream mygroup1 $ ENTRIESREAD 0 + r XGROUP CREATE mystream mygroup2 $ ENTRIESREAD 3 + + set reply [r xinfo groups mystream] + foreach group_info $reply { + set group_name [dict get $group_info name] + set entries_read [dict get $group_info entries-read] + if {$group_name == "mygroup1"} { + assert_equal $entries_read 0 + } else { + assert_equal $entries_read 3 + } + } + } + + test {XGROUP CREATE: automatic stream creation fails without MKSTREAM} { + r DEL mystream + catch {r XGROUP CREATE mystream mygroup $} err + set err + } {ERR*} + + test {XGROUP CREATE: automatic stream creation works with MKSTREAM} { + r DEL mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + } {OK} + + test {XREADGROUP will return only new elements} { + r XADD mystream * a 1 + r XADD mystream * b 2 + # XREADGROUP should return only the new elements "a 1" "b 1" + # and not the element "foo bar" which was pre existing in the + # stream (see previous test) + set reply [ + r XREADGROUP GROUP mygroup consumer-1 STREAMS mystream ">" + ] + assert {[llength [lindex $reply 0 1]] == 2} + lindex $reply 0 1 0 1 + } {a 1} + + test {XREADGROUP can read the history of the elements we own} { + # Add a few more elements + r XADD mystream * c 3 + r XADD mystream * d 4 + # Read a few elements using a different consumer name + set reply [ + r XREADGROUP GROUP mygroup consumer-2 STREAMS mystream ">" + ] + assert {[llength [lindex $reply 0 1]] == 2} + assert {[lindex $reply 0 1 0 1] eq {c 3}} + + set r1 [r XREADGROUP GROUP mygroup consumer-1 COUNT 10 STREAMS mystream 0] + set r2 [r XREADGROUP GROUP mygroup consumer-2 COUNT 10 STREAMS mystream 0] + assert {[lindex $r1 0 1 0 1] eq {a 1}} + assert {[lindex $r2 0 1 0 1] eq {c 3}} + } + + test {XPENDING is able to return pending items} { + set pending [r XPENDING mystream mygroup - + 10] + assert {[llength $pending] == 4} + for {set j 0} {$j < 4} {incr j} { + set item [lindex $pending $j] + if {$j < 2} { + set owner consumer-1 + } else { + set owner consumer-2 + } + assert {[lindex $item 1] eq $owner} + assert {[lindex $item 1] eq $owner} + } + } + + test {XPENDING can return single consumer items} { + set pending [r XPENDING mystream mygroup - + 10 consumer-1] + assert {[llength $pending] == 2} + } + + test {XPENDING only group} { + set pending [r XPENDING mystream mygroup] + assert {[llength $pending] == 4} + } + + test {XPENDING with IDLE} { + after 20 + set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10 consumer-1] + assert {[llength $pending] == 0} + set pending [r XPENDING mystream mygroup IDLE 1 - + 10 consumer-1] + assert {[llength $pending] == 2} + set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10] + assert {[llength $pending] == 0} + set pending [r XPENDING mystream mygroup IDLE 1 - + 10] + assert {[llength $pending] == 4} + } + + test {XPENDING with exclusive range intervals works as expected} { + set pending [r XPENDING mystream mygroup - + 10] + assert {[llength $pending] == 4} + set startid [lindex [lindex $pending 0] 0] + set endid [lindex [lindex $pending 3] 0] + set expending [r XPENDING mystream mygroup ($startid ($endid 10] + assert {[llength $expending] == 2} + for {set j 0} {$j < 2} {incr j} { + set itemid [lindex [lindex $expending $j] 0] + assert {$itemid ne $startid} + assert {$itemid ne $endid} + } + } + + test {XACK is able to remove items from the consumer/group PEL} { + set pending [r XPENDING mystream mygroup - + 10 consumer-1] + set id1 [lindex $pending 0 0] + set id2 [lindex $pending 1 0] + assert {[r XACK mystream mygroup $id1] eq 1} + set pending [r XPENDING mystream mygroup - + 10 consumer-1] + assert {[llength $pending] == 1} + set id [lindex $pending 0 0] + assert {$id eq $id2} + set global_pel [r XPENDING mystream mygroup - + 10] + assert {[llength $global_pel] == 3} + } + + test {XACK can't remove the same item multiple times} { + assert {[r XACK mystream mygroup $id1] eq 0} + } + + test {XACK is able to accept multiple arguments} { + # One of the IDs was already removed, so it should ack + # just ID2. + assert {[r XACK mystream mygroup $id1 $id2] eq 1} + } + + test {XACK should fail if got at least one invalid ID} { + r del mystream + r xgroup create s g $ MKSTREAM + r xadd s * f1 v1 + set c [llength [lindex [r xreadgroup group g c streams s >] 0 1]] + assert {$c == 1} + set pending [r xpending s g - + 10 c] + set id1 [lindex $pending 0 0] + assert_error "*Invalid stream ID specified*" {r xack s g $id1 invalid-id} + assert {[r xack s g $id1] eq 1} + } + + test {PEL NACK reassignment after XGROUP SETID event} { + r del events + r xadd events * f1 v1 + r xadd events * f1 v1 + r xadd events * f1 v1 + r xadd events * f1 v1 + r xgroup create events g1 $ + r xadd events * f1 v1 + set c [llength [lindex [r xreadgroup group g1 c1 streams events >] 0 1]] + assert {$c == 1} + r xgroup setid events g1 - + set c [llength [lindex [r xreadgroup group g1 c2 streams events >] 0 1]] + assert {$c == 5} + } + + test {XREADGROUP will not report data on empty history. Bug #5577} { + r del events + r xadd events * a 1 + r xadd events * b 2 + r xadd events * c 3 + r xgroup create events mygroup 0 + + # Current local PEL should be empty + set res [r xpending events mygroup - + 10] + assert {[llength $res] == 0} + + # So XREADGROUP should read an empty history as well + set res [r xreadgroup group mygroup myconsumer count 3 streams events 0] + assert {[llength [lindex $res 0 1]] == 0} + + # We should fetch all the elements in the stream asking for > + set res [r xreadgroup group mygroup myconsumer count 3 streams events >] + assert {[llength [lindex $res 0 1]] == 3} + + # Now the history is populated with three not acked entries + set res [r xreadgroup group mygroup myconsumer count 3 streams events 0] + assert {[llength [lindex $res 0 1]] == 3} + } + + test {XREADGROUP history reporting of deleted entries. Bug #5570} { + r del mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XADD mystream 1 field1 A + r XREADGROUP GROUP mygroup myconsumer STREAMS mystream > + r XADD mystream MAXLEN 1 2 field1 B + r XREADGROUP GROUP mygroup myconsumer STREAMS mystream > + + # Now we have two pending entries, however one should be deleted + # and one should be ok (we should only see "B") + set res [r XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0-1] + assert {[lindex $res 0 1 0] == {1-0 {}}} + assert {[lindex $res 0 1 1] == {2-0 {field1 B}}} + } + + test {Blocking XREADGROUP will not reply with an empty array} { + r del mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XADD mystream 666 f v + set res [r XREADGROUP GROUP mygroup Alice BLOCK 10 STREAMS mystream ">"] + assert {[lindex $res 0 1 0] == {666-0 {f v}}} + r XADD mystream 667 f2 v2 + r XDEL mystream 667 + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Alice BLOCK 10 STREAMS mystream ">" + wait_for_blocked_clients_count 0 + assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {mystream {}} + $rd close + } + + test {Blocking XREADGROUP: key deleted} { + r DEL mystream + r XADD mystream 666 f v + r XGROUP CREATE mystream mygroup $ + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 + r DEL mystream + assert_error "NOGROUP*" {$rd read} + $rd close + } + + test {Blocking XREADGROUP: key type changed with SET} { + r DEL mystream + r XADD mystream 666 f v + r XGROUP CREATE mystream mygroup $ + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 + r SET mystream val1 + assert_error "*WRONGTYPE*" {$rd read} + $rd close + } + + test {Blocking XREADGROUP: key type changed with transaction} { + r DEL mystream + r XADD mystream 666 f v + r XGROUP CREATE mystream mygroup $ + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 + r MULTI + r DEL mystream + r SADD mystream e1 + r EXEC + assert_error "*WRONGTYPE*" {$rd read} + $rd close + } + + test {Blocking XREADGROUP: flushed DB} { + r DEL mystream + r XADD mystream 666 f v + r XGROUP CREATE mystream mygroup $ + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 + r FLUSHALL + assert_error "*NOGROUP*" {$rd read} + $rd close + } + + test {Blocking XREADGROUP: swapped DB, key doesn't exist} { + r SELECT 4 + r FLUSHDB + r SELECT 9 + r DEL mystream + r XADD mystream 666 f v + r XGROUP CREATE mystream mygroup $ + set rd [redis_deferring_client] + $rd SELECT 9 + $rd read + $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 + r SWAPDB 4 9 + assert_error "*NOGROUP*" {$rd read} + $rd close + } {0} {external:skip} + + test {Blocking XREADGROUP: swapped DB, key is not a stream} { + r SELECT 4 + r FLUSHDB + r LPUSH mystream e1 + r SELECT 9 + r DEL mystream + r XADD mystream 666 f v + r XGROUP CREATE mystream mygroup $ + set rd [redis_deferring_client] + $rd SELECT 9 + $rd read + $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 + r SWAPDB 4 9 + assert_error "*WRONGTYPE*" {$rd read} + $rd close + } {0} {external:skip} + + test {XREAD and XREADGROUP against wrong parameter} { + r DEL mystream + r XADD mystream 666 f v + r XGROUP CREATE mystream mygroup $ + assert_error "ERR Unbalanced 'xreadgroup' list of streams: for each stream key an ID or '>' must be specified." {r XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream } + assert_error "ERR Unbalanced 'xread' list of streams: for each stream key an ID or '$' must be specified." {r XREAD COUNT 1 STREAMS mystream } + } + + test {Blocking XREAD: key deleted} { + r DEL mystream + r XADD mystream 666 f v + set rd [redis_deferring_client] + $rd XREAD BLOCK 0 STREAMS mystream "$" + wait_for_blocked_clients_count 1 + r DEL mystream + + r XADD mystream 667 f v + set res [$rd read] + assert_equal [lindex $res 0 1 0] {667-0 {f v}} + $rd close + } + + test {Blocking XREAD: key type changed with SET} { + r DEL mystream + r XADD mystream 666 f v + set rd [redis_deferring_client] + $rd XREAD BLOCK 0 STREAMS mystream "$" + wait_for_blocked_clients_count 1 + r SET mystream val1 + + r DEL mystream + r XADD mystream 667 f v + set res [$rd read] + assert_equal [lindex $res 0 1 0] {667-0 {f v}} + $rd close + } + + test {Blocking XREADGROUP for stream that ran dry (issue #5299)} { + set rd [redis_deferring_client] + + # Add a entry then delete it, now stream's last_id is 666. + r DEL mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XADD mystream 666 key value + r XDEL mystream 666 + + # Pass a special `>` ID but without new entry, released on timeout. + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 10 STREAMS mystream > + assert_equal [$rd read] {} + + # Throw an error if the ID equal or smaller than the last_id. + assert_error ERR*equal*smaller* {r XADD mystream 665 key value} + assert_error ERR*equal*smaller* {r XADD mystream 666 key value} + + # Entered blocking state and then release because of the new entry. + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream > + wait_for_blocked_clients_count 1 + r XADD mystream 667 key value + assert_equal [$rd read] {{mystream {{667-0 {key value}}}}} + + $rd close + } + + test "Blocking XREADGROUP will ignore BLOCK if ID is not >" { + set rd [redis_deferring_client] + + # Add a entry then delete it, now stream's last_id is 666. + r DEL mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XADD mystream 666 key value + r XDEL mystream 666 + + # Return right away instead of blocking, return the stream with an + # empty list instead of NIL if the ID specified is not the special `>` ID. + foreach id {0 600 666 700} { + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream $id + assert_equal [$rd read] {{mystream {}}} + } + + # After adding a new entry, `XREADGROUP BLOCK` still return the stream + # with an empty list because the pending list is empty. + r XADD mystream 667 key value + foreach id {0 600 666 667 700} { + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream $id + assert_equal [$rd read] {{mystream {}}} + } + + # After we read it once, the pending list is not empty at this time, + # pass any ID smaller than 667 will return one of the pending entry. + set res [r XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream >] + assert_equal $res {{mystream {{667-0 {key value}}}}} + foreach id {0 600 666} { + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream $id + assert_equal [$rd read] {{mystream {{667-0 {key value}}}}} + } + + # Pass ID equal or greater than 667 will return the stream with an empty list. + foreach id {667 700} { + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream $id + assert_equal [$rd read] {{mystream {}}} + } + + # After we ACK the pending entry, return the stream with an empty list. + r XACK mystream mygroup 667 + foreach id {0 600 666 667 700} { + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream $id + assert_equal [$rd read] {{mystream {}}} + } + + $rd close + } + + test {Blocking XREADGROUP for stream key that has clients blocked on list} { + set rd [redis_deferring_client] + set rd2 [redis_deferring_client] + + # First delete the stream + r DEL mystream + + # now place a client blocked on non-existing key as list + $rd2 BLPOP mystream 0 + + # wait until we verify the client is blocked + wait_for_blocked_clients_count 1 + + # verify we only have 1 regular blocking key + assert_equal 1 [getInfoProperty [r info clients] total_blocking_keys] + assert_equal 0 [getInfoProperty [r info clients] total_blocking_keys_on_nokey] + + # now write mystream as stream + r XADD mystream 666 key value + r XGROUP CREATE mystream mygroup $ MKSTREAM + + # block another client on xreadgroup + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream ">" + + # wait until we verify we have 2 blocked clients (one for the list and one for the stream) + wait_for_blocked_clients_count 2 + + # verify we have 1 blocking key which also have clients blocked on nokey condition + assert_equal 1 [getInfoProperty [r info clients] total_blocking_keys] + assert_equal 1 [getInfoProperty [r info clients] total_blocking_keys_on_nokey] + + # now delete the key and verify we have no clients blocked on nokey condition + r DEL mystream + assert_error "NOGROUP*" {$rd read} + assert_equal 1 [getInfoProperty [r info clients] total_blocking_keys] + assert_equal 0 [getInfoProperty [r info clients] total_blocking_keys_on_nokey] + + # close the only left client and make sure we have no more blocking keys + $rd2 close + + # wait until we verify we have no more blocked clients + wait_for_blocked_clients_count 0 + + assert_equal 0 [getInfoProperty [r info clients] total_blocking_keys] + assert_equal 0 [getInfoProperty [r info clients] total_blocking_keys_on_nokey] + + $rd close + } + + test {Blocking XREADGROUP for stream key that has clients blocked on list - avoid endless loop} { + r DEL mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + set rd3 [redis_deferring_client] + + $rd1 xreadgroup GROUP mygroup myuser COUNT 10 BLOCK 10000 STREAMS mystream > + $rd2 xreadgroup GROUP mygroup myuser COUNT 10 BLOCK 10000 STREAMS mystream > + $rd3 xreadgroup GROUP mygroup myuser COUNT 10 BLOCK 10000 STREAMS mystream > + + wait_for_blocked_clients_count 3 + + r xadd mystream MAXLEN 5000 * field1 value1 field2 value2 field3 value3 + + $rd1 close + $rd2 close + $rd3 close + + assert_equal [r ping] {PONG} + } + + test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} { + r config resetstat + r del mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 + r XGROUP DESTROY mystream mygroup + assert_error "NOGROUP*" {$rd read} + $rd close + + # verify command stats, error stats and error counter work on failed blocked command + assert_match {*count=1*} [errorrstat NOGROUP r] + assert_match {*calls=1,*,rejected_calls=0,failed_calls=1} [cmdrstat xreadgroup r] + assert_equal [s total_error_replies] 1 + } + + test {RENAME can unblock XREADGROUP with data} { + r del mystream{t} + r XGROUP CREATE mystream{t} mygroup $ MKSTREAM + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream{t} ">" + wait_for_blocked_clients_count 1 + r XGROUP CREATE mystream2{t} mygroup $ MKSTREAM + r XADD mystream2{t} 100 f1 v1 + r RENAME mystream2{t} mystream{t} + assert_equal "{mystream{t} {{100-0 {f1 v1}}}}" [$rd read] ;# mystream2{t} had mygroup before RENAME + $rd close + } + + test {RENAME can unblock XREADGROUP with -NOGROUP} { + r del mystream{t} + r XGROUP CREATE mystream{t} mygroup $ MKSTREAM + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream{t} ">" + wait_for_blocked_clients_count 1 + r XADD mystream2{t} 100 f1 v1 + r RENAME mystream2{t} mystream{t} + assert_error "*NOGROUP*" {$rd read} ;# mystream2{t} didn't have mygroup before RENAME + $rd close + } + + test {XCLAIM can claim PEL items from another consumer} { + # Add 3 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + r XGROUP CREATE mystream mygroup 0 + + # Consumer 1 reads item 1 from the stream without acknowledgements. + # Consumer 2 then claims pending item 1 from the PEL of consumer 1 + set reply [ + r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream > + ] + assert {[llength [lindex $reply 0 1 0 1]] == 2} + assert {[lindex $reply 0 1 0 1] eq {a 1}} + + # make sure the entry is present in both the group, and the right consumer + assert {[llength [r XPENDING mystream mygroup - + 10]] == 1} + assert {[llength [r XPENDING mystream mygroup - + 10 consumer1]] == 1} + assert {[llength [r XPENDING mystream mygroup - + 10 consumer2]] == 0} + + after 200 + set reply [ + r XCLAIM mystream mygroup consumer2 10 $id1 + ] + assert {[llength [lindex $reply 0 1]] == 2} + assert {[lindex $reply 0 1] eq {a 1}} + + # make sure the entry is present in both the group, and the right consumer + assert {[llength [r XPENDING mystream mygroup - + 10]] == 1} + assert {[llength [r XPENDING mystream mygroup - + 10 consumer1]] == 0} + assert {[llength [r XPENDING mystream mygroup - + 10 consumer2]] == 1} + + # Consumer 1 reads another 2 items from stream + r XREADGROUP GROUP mygroup consumer1 count 2 STREAMS mystream > + after 200 + + # Delete item 2 from the stream. Now consumer 1 has PEL that contains + # only item 3. Try to use consumer 2 to claim the deleted item 2 + # from the PEL of consumer 1, this should be NOP + r XDEL mystream $id2 + set reply [ + r XCLAIM mystream mygroup consumer2 10 $id2 + ] + assert {[llength $reply] == 0} + + # Delete item 3 from the stream. Now consumer 1 has PEL that is empty. + # Try to use consumer 2 to claim the deleted item 3 from the PEL + # of consumer 1, this should be NOP + after 200 + r XDEL mystream $id3 + set reply [ + r XCLAIM mystream mygroup consumer2 10 $id3 + ] + assert {[llength $reply] == 0} + } + + test {XCLAIM without JUSTID increments delivery count} { + # Add 3 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + r XGROUP CREATE mystream mygroup 0 + + # Consumer 1 reads item 1 from the stream without acknowledgements. + # Consumer 2 then claims pending item 1 from the PEL of consumer 1 + set reply [ + r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream > + ] + assert {[llength [lindex $reply 0 1 0 1]] == 2} + assert {[lindex $reply 0 1 0 1] eq {a 1}} + after 200 + set reply [ + r XCLAIM mystream mygroup consumer2 10 $id1 + ] + assert {[llength [lindex $reply 0 1]] == 2} + assert {[lindex $reply 0 1] eq {a 1}} + + set reply [ + r XPENDING mystream mygroup - + 10 + ] + assert {[llength [lindex $reply 0]] == 4} + assert {[lindex $reply 0 3] == 2} + + # Consumer 3 then claims pending item 1 from the PEL of consumer 2 using JUSTID + after 200 + set reply [ + r XCLAIM mystream mygroup consumer3 10 $id1 JUSTID + ] + assert {[llength $reply] == 1} + assert {[lindex $reply 0] eq $id1} + + set reply [ + r XPENDING mystream mygroup - + 10 + ] + assert {[llength [lindex $reply 0]] == 4} + assert {[lindex $reply 0 3] == 2} + } + + test {XCLAIM same consumer} { + # Add 3 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + r XGROUP CREATE mystream mygroup 0 + + set reply [r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >] + assert {[llength [lindex $reply 0 1 0 1]] == 2} + assert {[lindex $reply 0 1 0 1] eq {a 1}} + after 200 + # re-claim with the same consumer that already has it + assert {[llength [r XCLAIM mystream mygroup consumer1 10 $id1]] == 1} + + # make sure the entry is still in the PEL + set reply [r XPENDING mystream mygroup - + 10] + assert {[llength $reply] == 1} + assert {[lindex $reply 0 1] eq {consumer1}} + } + + test {XAUTOCLAIM can claim PEL items from another consumer} { + # Add 3 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + set id4 [r XADD mystream * d 4] + r XGROUP CREATE mystream mygroup 0 + + # Consumer 1 reads item 1 from the stream without acknowledgements. + # Consumer 2 then claims pending item 1 from the PEL of consumer 1 + set reply [r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >] + assert_equal [llength [lindex $reply 0 1 0 1]] 2 + assert_equal [lindex $reply 0 1 0 1] {a 1} + after 200 + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 1] + assert_equal [llength $reply] 3 + assert_equal [lindex $reply 0] "0-0" + assert_equal [llength [lindex $reply 1]] 1 + assert_equal [llength [lindex $reply 1 0]] 2 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {a 1} + + # Consumer 1 reads another 2 items from stream + r XREADGROUP GROUP mygroup consumer1 count 3 STREAMS mystream > + + # For min-idle-time + after 200 + + # Delete item 2 from the stream. Now consumer 1 has PEL that contains + # only item 3. Try to use consumer 2 to claim the deleted item 2 + # from the PEL of consumer 1, this should return nil + r XDEL mystream $id2 + + # id1 and id3 are self-claimed here but not id2 ('count' was set to 3) + # we make sure id2 is indeed skipped (the cursor points to id4) + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 3] + + assert_equal [llength $reply] 3 + assert_equal [lindex $reply 0] $id4 + assert_equal [llength [lindex $reply 1]] 2 + assert_equal [llength [lindex $reply 1 0]] 2 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {a 1} + assert_equal [lindex $reply 1 1 1] {c 3} + assert_equal [llength [lindex $reply 2]] 1 + assert_equal [llength [lindex $reply 2 0]] 1 + + # Delete item 3 from the stream. Now consumer 1 has PEL that is empty. + # Try to use consumer 2 to claim the deleted item 3 from the PEL + # of consumer 1, this should return nil + after 200 + + r XDEL mystream $id4 + + # id1 and id3 are self-claimed here but not id2 and id4 ('count' is default 100) + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - JUSTID] + + # we also test the JUSTID modifier here. note that, when using JUSTID, + # deleted entries are returned in reply (consistent with XCLAIM). + + assert_equal [llength $reply] 3 + assert_equal [lindex $reply 0] {0-0} + assert_equal [llength [lindex $reply 1]] 2 + assert_equal [lindex $reply 1 0] $id1 + assert_equal [lindex $reply 1 1] $id3 + } + + test {XAUTOCLAIM as an iterator} { + # Add 5 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + set id4 [r XADD mystream * d 4] + set id5 [r XADD mystream * e 5] + r XGROUP CREATE mystream mygroup 0 + + # Read 5 messages into consumer1 + r XREADGROUP GROUP mygroup consumer1 count 90 STREAMS mystream > + + # For min-idle-time + after 200 + + # Claim 2 entries + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2] + assert_equal [llength $reply] 3 + set cursor [lindex $reply 0] + assert_equal $cursor $id3 + assert_equal [llength [lindex $reply 1]] 2 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {a 1} + + # Claim 2 more entries + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 $cursor COUNT 2] + assert_equal [llength $reply] 3 + set cursor [lindex $reply 0] + assert_equal $cursor $id5 + assert_equal [llength [lindex $reply 1]] 2 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {c 3} + + # Claim last entry + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 $cursor COUNT 1] + assert_equal [llength $reply] 3 + set cursor [lindex $reply 0] + assert_equal $cursor {0-0} + assert_equal [llength [lindex $reply 1]] 1 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {e 5} + } + + test {XAUTOCLAIM COUNT must be > 0} { + assert_error "ERR COUNT must be > 0" {r XAUTOCLAIM key group consumer 1 1 COUNT 0} + } + + test {XCLAIM with XDEL} { + r DEL x + r XADD x 1-0 f v + r XADD x 2-0 f v + r XADD x 3-0 f v + r XGROUP CREATE x grp 0 + assert_equal [r XREADGROUP GROUP grp Alice STREAMS x >] {{x {{1-0 {f v}} {2-0 {f v}} {3-0 {f v}}}}} + r XDEL x 2-0 + assert_equal [r XCLAIM x grp Bob 0 1-0 2-0 3-0] {{1-0 {f v}} {3-0 {f v}}} + assert_equal [r XPENDING x grp - + 10 Alice] {} + } + + test {XCLAIM with trimming} { + r DEL x + r config set stream-node-max-entries 2 + r XADD x 1-0 f v + r XADD x 2-0 f v + r XADD x 3-0 f v + r XGROUP CREATE x grp 0 + assert_equal [r XREADGROUP GROUP grp Alice STREAMS x >] {{x {{1-0 {f v}} {2-0 {f v}} {3-0 {f v}}}}} + r XTRIM x MAXLEN 1 + assert_equal [r XCLAIM x grp Bob 0 1-0 2-0 3-0] {{3-0 {f v}}} + assert_equal [r XPENDING x grp - + 10 Alice] {} + } + + test {XAUTOCLAIM with XDEL} { + r DEL x + r XADD x 1-0 f v + r XADD x 2-0 f v + r XADD x 3-0 f v + r XGROUP CREATE x grp 0 + assert_equal [r XREADGROUP GROUP grp Alice STREAMS x >] {{x {{1-0 {f v}} {2-0 {f v}} {3-0 {f v}}}}} + r XDEL x 2-0 + assert_equal [r XAUTOCLAIM x grp Bob 0 0-0] {0-0 {{1-0 {f v}} {3-0 {f v}}} 2-0} + assert_equal [r XPENDING x grp - + 10 Alice] {} + } + + test {XAUTOCLAIM with XDEL and count} { + r DEL x + r XADD x 1-0 f v + r XADD x 2-0 f v + r XADD x 3-0 f v + r XGROUP CREATE x grp 0 + assert_equal [r XREADGROUP GROUP grp Alice STREAMS x >] {{x {{1-0 {f v}} {2-0 {f v}} {3-0 {f v}}}}} + r XDEL x 1-0 + r XDEL x 2-0 + assert_equal [r XAUTOCLAIM x grp Bob 0 0-0 COUNT 1] {2-0 {} 1-0} + assert_equal [r XAUTOCLAIM x grp Bob 0 2-0 COUNT 1] {3-0 {} 2-0} + assert_equal [r XAUTOCLAIM x grp Bob 0 3-0 COUNT 1] {0-0 {{3-0 {f v}}} {}} + assert_equal [r XPENDING x grp - + 10 Alice] {} + } + + test {XAUTOCLAIM with out of range count} { + assert_error {ERR COUNT*} {r XAUTOCLAIM x grp Bob 0 3-0 COUNT 8070450532247928833} + } + + test {XCLAIM with trimming} { + r DEL x + r config set stream-node-max-entries 2 + r XADD x 1-0 f v + r XADD x 2-0 f v + r XADD x 3-0 f v + r XGROUP CREATE x grp 0 + assert_equal [r XREADGROUP GROUP grp Alice STREAMS x >] {{x {{1-0 {f v}} {2-0 {f v}} {3-0 {f v}}}}} + r XTRIM x MAXLEN 1 + assert_equal [r XAUTOCLAIM x grp Bob 0 0-0] {0-0 {{3-0 {f v}}} {1-0 2-0}} + assert_equal [r XPENDING x grp - + 10 Alice] {} + } + + test {XINFO FULL output} { + r del x + r XADD x 100 a 1 + r XADD x 101 b 1 + r XADD x 102 c 1 + r XADD x 103 e 1 + r XADD x 104 f 1 + r XGROUP CREATE x g1 0 + r XGROUP CREATE x g2 0 + r XREADGROUP GROUP g1 Alice COUNT 1 STREAMS x > + r XREADGROUP GROUP g1 Bob COUNT 1 STREAMS x > + r XREADGROUP GROUP g1 Bob NOACK COUNT 1 STREAMS x > + r XREADGROUP GROUP g2 Charlie COUNT 4 STREAMS x > + r XDEL x 103 + + set reply [r XINFO STREAM x FULL] + assert_equal [llength $reply] 18 + assert_equal [dict get $reply length] 4 + assert_equal [dict get $reply entries] "{100-0 {a 1}} {101-0 {b 1}} {102-0 {c 1}} {104-0 {f 1}}" + + # First consumer group + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group name] "g1" + assert_equal [lindex [dict get $group pending] 0 0] "100-0" + set consumer [lindex [dict get $group consumers] 0] + assert_equal [dict get $consumer name] "Alice" + assert_equal [lindex [dict get $consumer pending] 0 0] "100-0" ;# first entry in first consumer's PEL + + # Second consumer group + set group [lindex [dict get $reply groups] 1] + assert_equal [dict get $group name] "g2" + set consumer [lindex [dict get $group consumers] 0] + assert_equal [dict get $consumer name] "Charlie" + assert_equal [lindex [dict get $consumer pending] 0 0] "100-0" ;# first entry in first consumer's PEL + assert_equal [lindex [dict get $consumer pending] 1 0] "101-0" ;# second entry in first consumer's PEL + + set reply [r XINFO STREAM x FULL COUNT 1] + assert_equal [llength $reply] 18 + assert_equal [dict get $reply length] 4 + assert_equal [dict get $reply entries] "{100-0 {a 1}}" + } + + test {Consumer seen-time and active-time} { + r DEL mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > + after 100 + set reply [r xinfo consumers mystream mygroup] + set consumer_info [lindex $reply 0] + assert {[dict get $consumer_info idle] >= 100} ;# consumer idle (seen-time) + assert_equal [dict get $consumer_info inactive] "-1" ;# consumer inactive (active-time) + + r XADD mystream * f v + r XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > + set reply [r xinfo consumers mystream mygroup] + set consumer_info [lindex $reply 0] + assert_equal [lindex $consumer_info 1] "Alice" ;# consumer name + assert {[dict get $consumer_info idle] < 80} ;# consumer idle (seen-time) + assert {[dict get $consumer_info inactive] < 80} ;# consumer inactive (active-time) + + after 100 + r XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > + set reply [r xinfo consumers mystream mygroup] + set consumer_info [lindex $reply 0] + assert {[dict get $consumer_info idle] < 80} ;# consumer idle (seen-time) + assert {[dict get $consumer_info inactive] >= 100} ;# consumer inactive (active-time) + + + # Simulate loading from RDB + + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + set consumer [lindex [dict get $group consumers] 0] + set prev_seen [dict get $consumer seen-time] + set prev_active [dict get $consumer active-time] + + set dump [r DUMP mystream] + r DEL mystream + r RESTORE mystream 0 $dump + + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + set consumer [lindex [dict get $group consumers] 0] + assert_equal $prev_seen [dict get $consumer seen-time] + assert_equal $prev_active [dict get $consumer active-time] + } + + test {XGROUP CREATECONSUMER: create consumer if does not exist} { + r del mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XADD mystream * f v + + set reply [r xinfo groups mystream] + set group_info [lindex $reply 0] + set n_consumers [lindex $group_info 3] + assert_equal $n_consumers 0 ;# consumers number in cg + + # create consumer using XREADGROUP + r XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > + + set reply [r xinfo groups mystream] + set group_info [lindex $reply 0] + set n_consumers [lindex $group_info 3] + assert_equal $n_consumers 1 ;# consumers number in cg + + set reply [r xinfo consumers mystream mygroup] + set consumer_info [lindex $reply 0] + assert_equal [lindex $consumer_info 1] "Alice" ;# consumer name + + # create group using XGROUP CREATECONSUMER when Alice already exists + set created [r XGROUP CREATECONSUMER mystream mygroup Alice] + assert_equal $created 0 + + # create group using XGROUP CREATECONSUMER when Bob does not exist + set created [r XGROUP CREATECONSUMER mystream mygroup Bob] + assert_equal $created 1 + + set reply [r xinfo groups mystream] + set group_info [lindex $reply 0] + set n_consumers [lindex $group_info 3] + assert_equal $n_consumers 2 ;# consumers number in cg + + set reply [r xinfo consumers mystream mygroup] + set consumer_info [lindex $reply 0] + assert_equal [lindex $consumer_info 1] "Alice" ;# consumer name + set consumer_info [lindex $reply 1] + assert_equal [lindex $consumer_info 1] "Bob" ;# consumer name + } + + test {XGROUP CREATECONSUMER: group must exist} { + r del mystream + r XADD mystream * f v + assert_error "*NOGROUP*" {r XGROUP CREATECONSUMER mystream mygroup consumer} + } + + start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-preamble no appendfsync always}} { + test {XREADGROUP with NOACK creates consumer} { + r del mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XADD mystream * f1 v1 + r XREADGROUP GROUP mygroup Alice NOACK STREAMS mystream ">" + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Bob BLOCK 0 NOACK STREAMS mystream ">" + wait_for_blocked_clients_count 1 + r XADD mystream * f2 v2 + set grpinfo [r xinfo groups mystream] + + r debug loadaof + assert_equal [r xinfo groups mystream] $grpinfo + set reply [r xinfo consumers mystream mygroup] + set consumer_info [lindex $reply 0] + assert_equal [lindex $consumer_info 1] "Alice" ;# consumer name + set consumer_info [lindex $reply 1] + assert_equal [lindex $consumer_info 1] "Bob" ;# consumer name + $rd close + } + + test {Consumer without PEL is present in AOF after AOFRW} { + r del mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XADD mystream * f v + r XREADGROUP GROUP mygroup Alice NOACK STREAMS mystream ">" + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Bob BLOCK 0 NOACK STREAMS mystream ">" + wait_for_blocked_clients_count 1 + r XGROUP CREATECONSUMER mystream mygroup Charlie + set grpinfo [lindex [r xinfo groups mystream] 0] + + r bgrewriteaof + waitForBgrewriteaof r + r debug loadaof + + set curr_grpinfo [lindex [r xinfo groups mystream] 0] + assert {$curr_grpinfo == $grpinfo} + set n_consumers [lindex $grpinfo 3] + + # All consumers are created via XREADGROUP, regardless of whether they managed + # to read any entries ot not + assert_equal $n_consumers 3 + $rd close + } + } + + test {Consumer group read counter and lag in empty streams} { + r DEL x + r XGROUP CREATE x g1 0 MKSTREAM + + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $reply max-deleted-entry-id] "0-0" + assert_equal [dict get $reply entries-added] 0 + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] 0 + + r XADD x 1-0 data a + r XDEL x 1-0 + + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $reply max-deleted-entry-id] "1-0" + assert_equal [dict get $reply entries-added] 1 + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] 0 + } + + test {Consumer group read counter and lag sanity} { + r DEL x + r XADD x 1-0 data a + r XADD x 2-0 data b + r XADD x 3-0 data c + r XADD x 4-0 data d + r XADD x 5-0 data e + r XGROUP CREATE x g1 0 + + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] 5 + + r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x > + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 1 + assert_equal [dict get $group lag] 4 + + r XREADGROUP GROUP g1 c12 COUNT 10 STREAMS x > + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 5 + assert_equal [dict get $group lag] 0 + + r XADD x 6-0 data f + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 5 + assert_equal [dict get $group lag] 1 + } + + test {Consumer group lag with XDELs} { + r DEL x + r XADD x 1-0 data a + r XADD x 2-0 data b + r XADD x 3-0 data c + r XADD x 4-0 data d + r XADD x 5-0 data e + r XDEL x 3-0 + r XGROUP CREATE x g1 0 + r XGROUP CREATE x g2 0 + + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] {} + + r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x > + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] {} + + r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x > + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] {} + + r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x > + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] {} + + r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x > + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 5 + assert_equal [dict get $group lag] 0 + + r XADD x 6-0 data f + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 5 + assert_equal [dict get $group lag] 1 + + r XTRIM x MINID = 3-0 + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 5 + assert_equal [dict get $group lag] 1 + set group [lindex [dict get $reply groups] 1] + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] 3 + + r XTRIM x MINID = 5-0 + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 5 + assert_equal [dict get $group lag] 1 + set group [lindex [dict get $reply groups] 1] + assert_equal [dict get $group entries-read] {} + assert_equal [dict get $group lag] 2 + } + + test {Loading from legacy (Redis <= v6.2.x, rdb_ver < 10) persistence} { + # The payload was DUMPed from a v5 instance after: + # XADD x 1-0 data a + # XADD x 2-0 data b + # XADD x 3-0 data c + # XADD x 4-0 data d + # XADD x 5-0 data e + # XADD x 6-0 data f + # XDEL x 3-0 + # XGROUP CREATE x g1 0 + # XGROUP CREATE x g2 0 + # XREADGROUP GROUP g1 c11 COUNT 4 STREAMS x > + # XTRIM x MAXLEN = 2 + + r DEL x + r RESTORE x 0 "\x0F\x01\x10\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\xC3\x40\x4A\x40\x57\x16\x57\x00\x00\x00\x23\x00\x02\x01\x04\x01\x01\x01\x84\x64\x61\x74\x61\x05\x00\x01\x03\x01\x00\x20\x01\x03\x81\x61\x02\x04\x20\x0A\x00\x01\x40\x0A\x00\x62\x60\x0A\x00\x02\x40\x0A\x00\x63\x60\x0A\x40\x22\x01\x81\x64\x20\x0A\x40\x39\x20\x0A\x00\x65\x60\x0A\x00\x05\x40\x0A\x00\x66\x20\x0A\x00\xFF\x02\x06\x00\x02\x02\x67\x31\x05\x00\x04\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x3E\xF7\x83\x43\x7A\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x3E\xF7\x83\x43\x7A\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x3E\xF7\x83\x43\x7A\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00\x3E\xF7\x83\x43\x7A\x01\x00\x00\x01\x01\x03\x63\x31\x31\x3E\xF7\x83\x43\x7A\x01\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00\x02\x67\x32\x00\x00\x00\x00\x09\x00\x3D\x52\xEF\x68\x67\x52\x1D\xFA" + + set reply [r XINFO STREAM x FULL] + assert_equal [dict get $reply max-deleted-entry-id] "0-0" + assert_equal [dict get $reply entries-added] 2 + set group [lindex [dict get $reply groups] 0] + assert_equal [dict get $group entries-read] 1 + assert_equal [dict get $group lag] 1 + set group [lindex [dict get $reply groups] 1] + assert_equal [dict get $group entries-read] 0 + assert_equal [dict get $group lag] 2 + } + + test {Loading from legacy (Redis <= v7.0.x, rdb_ver < 11) persistence} { + # The payload was DUMPed from a v7 instance after: + # XGROUP CREATE x g $ MKSTREAM + # XADD x 1-1 f v + # XREADGROUP GROUP g Alice STREAMS x > + + r DEL x + r RESTORE x 0 "\x13\x01\x10\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x1D\x1D\x00\x00\x00\x0A\x00\x01\x01\x00\x01\x01\x01\x81\x66\x02\x00\x01\x02\x01\x00\x01\x00\x01\x81\x76\x02\x04\x01\xFF\x01\x01\x01\x01\x01\x00\x00\x01\x01\x01\x67\x01\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\xF5\x5A\x71\xC7\x84\x01\x00\x00\x01\x01\x05\x41\x6C\x69\x63\x65\xF5\x5A\x71\xC7\x84\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x0B\x00\xA7\xA9\x14\xA5\x27\xFF\x9B\x9B" + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + set consumer [lindex [dict get $group consumers] 0] + assert_equal [dict get $consumer seen-time] [dict get $consumer active-time] + } + + start_server {tags {"external:skip"}} { + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + set slave [srv 0 client] + + foreach noack {0 1} { + test "Consumer group last ID propagation to slave (NOACK=$noack)" { + $slave slaveof $master_host $master_port + wait_for_condition 50 100 { + [s 0 master_link_status] eq {up} + } else { + fail "Replication not started." + } + + $master del stream + $master xadd stream * a 1 + $master xadd stream * a 2 + $master xadd stream * a 3 + $master xgroup create stream mygroup 0 + + # Consume the first two items on the master + for {set j 0} {$j < 2} {incr j} { + if {$noack} { + set item [$master xreadgroup group mygroup \ + myconsumer COUNT 1 NOACK STREAMS stream >] + } else { + set item [$master xreadgroup group mygroup \ + myconsumer COUNT 1 STREAMS stream >] + } + set id [lindex $item 0 1 0 0] + if {$noack == 0} { + assert {[$master xack stream mygroup $id] eq "1"} + } + } + + wait_for_ofs_sync $master $slave + + # Turn slave into master + $slave slaveof no one + + set item [$slave xreadgroup group mygroup myconsumer \ + COUNT 1 STREAMS stream >] + + # The consumed entry should be the third + set myentry [lindex $item 0 1 0 1] + assert {$myentry eq {a 3}} + } + } + } + + start_server {tags {"external:skip"}} { + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + set replica [srv 0 client] + + foreach autoclaim {0 1} { + test "Replication tests of XCLAIM with deleted entries (autclaim=$autoclaim)" { + $replica replicaof $master_host $master_port + wait_for_condition 50 100 { + [s 0 master_link_status] eq {up} + } else { + fail "Replication not started." + } + + $master DEL x + $master XADD x 1-0 f v + $master XADD x 2-0 f v + $master XADD x 3-0 f v + $master XADD x 4-0 f v + $master XADD x 5-0 f v + $master XGROUP CREATE x grp 0 + assert_equal [$master XREADGROUP GROUP grp Alice STREAMS x >] {{x {{1-0 {f v}} {2-0 {f v}} {3-0 {f v}} {4-0 {f v}} {5-0 {f v}}}}} + wait_for_ofs_sync $master $replica + assert_equal [llength [$replica XPENDING x grp - + 10 Alice]] 5 + $master XDEL x 2-0 + $master XDEL x 4-0 + if {$autoclaim} { + assert_equal [$master XAUTOCLAIM x grp Bob 0 0-0] {0-0 {{1-0 {f v}} {3-0 {f v}} {5-0 {f v}}} {2-0 4-0}} + wait_for_ofs_sync $master $replica + assert_equal [llength [$replica XPENDING x grp - + 10 Alice]] 0 + } else { + assert_equal [$master XCLAIM x grp Bob 0 1-0 2-0 3-0 4-0] {{1-0 {f v}} {3-0 {f v}}} + wait_for_ofs_sync $master $replica + assert_equal [llength [$replica XPENDING x grp - + 10 Alice]] 1 + } + } + } + } + + start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-preamble no}} { + test {Empty stream with no lastid can be rewrite into AOF correctly} { + r XGROUP CREATE mystream group-name $ MKSTREAM + assert {[dict get [r xinfo stream mystream] length] == 0} + set grpinfo [r xinfo groups mystream] + r bgrewriteaof + waitForBgrewriteaof r + r debug loadaof + assert {[dict get [r xinfo stream mystream] length] == 0} + assert_equal [r xinfo groups mystream] $grpinfo + } + } +} -- cgit v1.2.3