summaryrefslogtreecommitdiffstats
path: root/src/t_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/t_stream.c')
-rw-r--r--src/t_stream.c4038
1 files changed, 4038 insertions, 0 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
new file mode 100644
index 0000000..5fcb631
--- /dev/null
+++ b/src/t_stream.c
@@ -0,0 +1,4038 @@
+/*
+ * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "server.h"
+#include "endianconv.h"
+#include "stream.h"
+
+/* Every stream item inside the listpack, has a flags field that is used to
+ * mark the entry as deleted, or having the same field as the "master"
+ * entry at the start of the listpack> */
+#define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */
+#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */
+#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
+
+/* For stream commands that require multiple IDs
+ * when the number of IDs is less than 'STREAMID_STATIC_VECTOR_LEN',
+ * avoid malloc allocation.*/
+#define STREAMID_STATIC_VECTOR_LEN 8
+
+/* Max pre-allocation for listpack. This is done to avoid abuse of a user
+ * setting stream_node_max_bytes to a huge number. */
+#define STREAM_LISTPACK_MAX_PRE_ALLOCATE 4096
+
+/* Don't let listpacks grow too big, even if the user config allows it.
+ * doing so can lead to an overflow (trying to store more than 32bit length
+ * into the listpack header), or actually an assertion since lpInsert
+ * will return NULL. */
+#define STREAM_LISTPACK_MAX_SIZE (1<<30)
+
+void streamFreeCG(streamCG *cg);
+void streamFreeNACK(streamNACK *na);
+size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
+int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given);
+int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
+
+/* -----------------------------------------------------------------------
+ * Low level stream encoding: a radix tree of listpacks.
+ * ----------------------------------------------------------------------- */
+
+/* Create a new stream data structure. */
+stream *streamNew(void) {
+ stream *s = zmalloc(sizeof(*s));
+ s->rax = raxNew();
+ s->length = 0;
+ s->first_id.ms = 0;
+ s->first_id.seq = 0;
+ s->last_id.ms = 0;
+ s->last_id.seq = 0;
+ s->max_deleted_entry_id.seq = 0;
+ s->max_deleted_entry_id.ms = 0;
+ s->entries_added = 0;
+ s->cgroups = NULL; /* Created on demand to save memory when not used. */
+ return s;
+}
+
+/* Free a stream, including the listpacks stored inside the radix tree. */
+void freeStream(stream *s) {
+ raxFreeWithCallback(s->rax,(void(*)(void*))lpFree);
+ if (s->cgroups)
+ raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG);
+ zfree(s);
+}
+
+/* Return the length of a stream. */
+unsigned long streamLength(const robj *subject) {
+ stream *s = subject->ptr;
+ return s->length;
+}
+
+/* Set 'id' to be its successor stream ID.
+ * If 'id' is the maximal possible id, it is wrapped around to 0-0 and a
+ * C_ERR is returned. */
+int streamIncrID(streamID *id) {
+ int ret = C_OK;
+ if (id->seq == UINT64_MAX) {
+ if (id->ms == UINT64_MAX) {
+ /* Special case where 'id' is the last possible streamID... */
+ id->ms = id->seq = 0;
+ ret = C_ERR;
+ } else {
+ id->ms++;
+ id->seq = 0;
+ }
+ } else {
+ id->seq++;
+ }
+ return ret;
+}
+
+/* Set 'id' to be its predecessor stream ID.
+ * If 'id' is the minimal possible id, it remains 0-0 and a C_ERR is
+ * returned. */
+int streamDecrID(streamID *id) {
+ int ret = C_OK;
+ if (id->seq == 0) {
+ if (id->ms == 0) {
+ /* Special case where 'id' is the first possible streamID... */
+ id->ms = id->seq = UINT64_MAX;
+ ret = C_ERR;
+ } else {
+ id->ms--;
+ id->seq = UINT64_MAX;
+ }
+ } else {
+ id->seq--;
+ }
+ return ret;
+}
+
+/* Generate the next stream item ID given the previous one. If the current
+ * milliseconds Unix time is greater than the previous one, just use this
+ * as time part and start with sequence part of zero. Otherwise we use the
+ * previous time (and never go backward) and increment the sequence. */
+void streamNextID(streamID *last_id, streamID *new_id) {
+ uint64_t ms = commandTimeSnapshot();
+ if (ms > last_id->ms) {
+ new_id->ms = ms;
+ new_id->seq = 0;
+ } else {
+ *new_id = *last_id;
+ streamIncrID(new_id);
+ }
+}
+
+/* This is a helper function for the COPY command.
+ * Duplicate a Stream object, with the guarantee that the returned object
+ * has the same encoding as the original one.
+ *
+ * The resulting object always has refcount set to 1 */
+robj *streamDup(robj *o) {
+ robj *sobj;
+
+ serverAssert(o->type == OBJ_STREAM);
+
+ switch (o->encoding) {
+ case OBJ_ENCODING_STREAM:
+ sobj = createStreamObject();
+ break;
+ default:
+ serverPanic("Wrong encoding.");
+ break;
+ }
+
+ stream *s;
+ stream *new_s;
+ s = o->ptr;
+ new_s = sobj->ptr;
+
+ raxIterator ri;
+ uint64_t rax_key[2];
+ raxStart(&ri, s->rax);
+ raxSeek(&ri, "^", NULL, 0);
+ size_t lp_bytes = 0; /* Total bytes in the listpack. */
+ unsigned char *lp = NULL; /* listpack pointer. */
+ /* Get a reference to the listpack node. */
+ while (raxNext(&ri)) {
+ lp = ri.data;
+ lp_bytes = lpBytes(lp);
+ unsigned char *new_lp = zmalloc(lp_bytes);
+ memcpy(new_lp, lp, lp_bytes);
+ memcpy(rax_key, ri.key, sizeof(rax_key));
+ raxInsert(new_s->rax, (unsigned char *)&rax_key, sizeof(rax_key),
+ new_lp, NULL);
+ }
+ new_s->length = s->length;
+ new_s->first_id = s->first_id;
+ new_s->last_id = s->last_id;
+ new_s->max_deleted_entry_id = s->max_deleted_entry_id;
+ new_s->entries_added = s->entries_added;
+ raxStop(&ri);
+
+ if (s->cgroups == NULL) return sobj;
+
+ /* Consumer Groups */
+ raxIterator ri_cgroups;
+ raxStart(&ri_cgroups, s->cgroups);
+ raxSeek(&ri_cgroups, "^", NULL, 0);
+ while (raxNext(&ri_cgroups)) {
+ streamCG *cg = ri_cgroups.data;
+ streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key,
+ ri_cgroups.key_len, &cg->last_id,
+ cg->entries_read);
+
+ serverAssert(new_cg != NULL);
+
+ /* Consumer Group PEL */
+ raxIterator ri_cg_pel;
+ raxStart(&ri_cg_pel,cg->pel);
+ raxSeek(&ri_cg_pel,"^",NULL,0);
+ while(raxNext(&ri_cg_pel)){
+ streamNACK *nack = ri_cg_pel.data;
+ streamNACK *new_nack = streamCreateNACK(NULL);
+ new_nack->delivery_time = nack->delivery_time;
+ new_nack->delivery_count = nack->delivery_count;
+ raxInsert(new_cg->pel, ri_cg_pel.key, sizeof(streamID), new_nack, NULL);
+ }
+ raxStop(&ri_cg_pel);
+
+ /* Consumers */
+ raxIterator ri_consumers;
+ raxStart(&ri_consumers, cg->consumers);
+ raxSeek(&ri_consumers, "^", NULL, 0);
+ while (raxNext(&ri_consumers)) {
+ streamConsumer *consumer = ri_consumers.data;
+ streamConsumer *new_consumer;
+ new_consumer = zmalloc(sizeof(*new_consumer));
+ new_consumer->name = sdsdup(consumer->name);
+ new_consumer->pel = raxNew();
+ raxInsert(new_cg->consumers,(unsigned char *)new_consumer->name,
+ sdslen(new_consumer->name), new_consumer, NULL);
+ new_consumer->seen_time = consumer->seen_time;
+ new_consumer->active_time = consumer->active_time;
+
+ /* Consumer PEL */
+ raxIterator ri_cpel;
+ raxStart(&ri_cpel, consumer->pel);
+ raxSeek(&ri_cpel, "^", NULL, 0);
+ while (raxNext(&ri_cpel)) {
+ streamNACK *new_nack = raxFind(new_cg->pel,ri_cpel.key,sizeof(streamID));
+
+ serverAssert(new_nack != raxNotFound);
+
+ new_nack->consumer = new_consumer;
+ raxInsert(new_consumer->pel,ri_cpel.key,sizeof(streamID),new_nack,NULL);
+ }
+ raxStop(&ri_cpel);
+ }
+ raxStop(&ri_consumers);
+ }
+ raxStop(&ri_cgroups);
+ return sobj;
+}
+
+/* This is a wrapper function for lpGet() to directly get an integer value
+ * from the listpack (that may store numbers as a string), converting
+ * the string if needed.
+ * The 'valid" argument is an optional output parameter to get an indication
+ * if the record was valid, when this parameter is NULL, the function will
+ * fail with an assertion. */
+static inline int64_t lpGetIntegerIfValid(unsigned char *ele, int *valid) {
+ int64_t v;
+ unsigned char *e = lpGet(ele,&v,NULL);
+ if (e == NULL) {
+ if (valid)
+ *valid = 1;
+ return v;
+ }
+ /* The following code path should never be used for how listpacks work:
+ * they should always be able to store an int64_t value in integer
+ * encoded form. However the implementation may change. */
+ long long ll;
+ int ret = string2ll((char*)e,v,&ll);
+ if (valid)
+ *valid = ret;
+ else
+ serverAssert(ret != 0);
+ v = ll;
+ return v;
+}
+
+#define lpGetInteger(ele) lpGetIntegerIfValid(ele, NULL)
+
+/* Get an edge streamID of a given listpack.
+ * 'master_id' is an input param, used to build the 'edge_id' output param */
+int lpGetEdgeStreamID(unsigned char *lp, int first, streamID *master_id, streamID *edge_id)
+{
+ if (lp == NULL)
+ return 0;
+
+ unsigned char *lp_ele;
+
+ /* We need to seek either the first or the last entry depending
+ * on the direction of the iteration. */
+ if (first) {
+ /* Get the master fields count. */
+ lp_ele = lpFirst(lp); /* Seek items count */
+ lp_ele = lpNext(lp, lp_ele); /* Seek deleted count. */
+ lp_ele = lpNext(lp, lp_ele); /* Seek num fields. */
+ int64_t master_fields_count = lpGetInteger(lp_ele);
+ lp_ele = lpNext(lp, lp_ele); /* Seek first field. */
+
+ /* If we are iterating in normal order, skip the master fields
+ * to seek the first actual entry. */
+ for (int64_t i = 0; i < master_fields_count; i++)
+ lp_ele = lpNext(lp, lp_ele);
+
+ /* If we are going forward, skip the previous entry's
+ * lp-count field (or in case of the master entry, the zero
+ * term field) */
+ lp_ele = lpNext(lp, lp_ele);
+ if (lp_ele == NULL)
+ return 0;
+ } else {
+ /* If we are iterating in reverse direction, just seek the
+ * last part of the last entry in the listpack (that is, the
+ * fields count). */
+ lp_ele = lpLast(lp);
+
+ /* If we are going backward, read the number of elements this
+ * entry is composed of, and jump backward N times to seek
+ * its start. */
+ int64_t lp_count = lpGetInteger(lp_ele);
+ if (lp_count == 0) /* We reached the master entry. */
+ return 0;
+
+ while (lp_count--)
+ lp_ele = lpPrev(lp, lp_ele);
+ }
+
+ lp_ele = lpNext(lp, lp_ele); /* Seek ID (lp_ele currently points to 'flags'). */
+
+ /* Get the ID: it is encoded as difference between the master
+ * ID and this entry ID. */
+ streamID id = *master_id;
+ id.ms += lpGetInteger(lp_ele);
+ lp_ele = lpNext(lp, lp_ele);
+ id.seq += lpGetInteger(lp_ele);
+ *edge_id = id;
+ return 1;
+}
+
+/* Debugging function to log the full content of a listpack. Useful
+ * for development and debugging. */
+void streamLogListpackContent(unsigned char *lp) {
+ unsigned char *p = lpFirst(lp);
+ while(p) {
+ unsigned char buf[LP_INTBUF_SIZE];
+ int64_t v;
+ unsigned char *ele = lpGet(p,&v,buf);
+ serverLog(LL_WARNING,"- [%d] '%.*s'", (int)v, (int)v, ele);
+ p = lpNext(lp,p);
+ }
+}
+
+/* Convert the specified stream entry ID as a 128 bit big endian number, so
+ * that the IDs can be sorted lexicographically. */
+void streamEncodeID(void *buf, streamID *id) {
+ uint64_t e[2];
+ e[0] = htonu64(id->ms);
+ e[1] = htonu64(id->seq);
+ memcpy(buf,e,sizeof(e));
+}
+
+/* This is the reverse of streamEncodeID(): the decoded ID will be stored
+ * in the 'id' structure passed by reference. The buffer 'buf' must point
+ * to a 128 bit big-endian encoded ID. */
+void streamDecodeID(void *buf, streamID *id) {
+ uint64_t e[2];
+ memcpy(e,buf,sizeof(e));
+ id->ms = ntohu64(e[0]);
+ id->seq = ntohu64(e[1]);
+}
+
+/* Compare two stream IDs. Return -1 if a < b, 0 if a == b, 1 if a > b. */
+int streamCompareID(streamID *a, streamID *b) {
+ if (a->ms > b->ms) return 1;
+ else if (a->ms < b->ms) return -1;
+ /* The ms part is the same. Check the sequence part. */
+ else if (a->seq > b->seq) return 1;
+ else if (a->seq < b->seq) return -1;
+ /* Everything is the same: IDs are equal. */
+ return 0;
+}
+
+/* Retrieves the ID of the stream edge entry. An edge is either the first or
+ * the last ID in the stream, and may be a tombstone. To filter out tombstones,
+ * set the'skip_tombstones' argument to 1. */
+void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id)
+{
+ streamIterator si;
+ int64_t numfields;
+ streamIteratorStart(&si,s,NULL,NULL,!first);
+ si.skip_tombstones = skip_tombstones;
+ int found = streamIteratorGetID(&si,edge_id,&numfields);
+ if (!found) {
+ streamID min_id = {0, 0}, max_id = {UINT64_MAX, UINT64_MAX};
+ *edge_id = first ? max_id : min_id;
+ }
+ streamIteratorStop(&si);
+}
+
+/* Adds a new item into the stream 's' having the specified number of
+ * field-value pairs as specified in 'numfields' and stored into 'argv'.
+ * Returns the new entry ID populating the 'added_id' structure.
+ *
+ * If 'use_id' is not NULL, the ID is not auto-generated by the function,
+ * but instead the passed ID is used to add the new entry. In this case
+ * adding the entry may fail as specified later in this comment.
+ *
+ * When 'use_id' is used alongside with a zero 'seq-given', the sequence
+ * part of the passed ID is ignored and the function will attempt to use an
+ * auto-generated sequence.
+ *
+ * The function returns C_OK if the item was added, this is always true
+ * if the ID was generated by the function. However the function may return
+ * C_ERR in several cases:
+ * 1. If an ID was given via 'use_id', but adding it failed since the
+ * current top ID is greater or equal. errno will be set to EDOM.
+ * 2. If a size of a single element or the sum of the elements is too big to
+ * be stored into the stream. errno will be set to ERANGE. */
+int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given) {
+
+ /* Generate the new entry ID. */
+ streamID id;
+ if (use_id) {
+ if (seq_given) {
+ id = *use_id;
+ } else {
+ /* The automatically generated sequence can be either zero (new
+ * timestamps) or the incremented sequence of the last ID. In the
+ * latter case, we need to prevent an overflow/advancing forward
+ * in time. */
+ if (s->last_id.ms == use_id->ms) {
+ if (s->last_id.seq == UINT64_MAX) {
+ errno = EDOM;
+ return C_ERR;
+ }
+ id = s->last_id;
+ id.seq++;
+ } else {
+ id = *use_id;
+ }
+ }
+ } else {
+ streamNextID(&s->last_id,&id);
+ }
+
+ /* Check that the new ID is greater than the last entry ID
+ * or return an error. Automatically generated IDs might
+ * overflow (and wrap-around) when incrementing the sequence
+ part. */
+ if (streamCompareID(&id,&s->last_id) <= 0) {
+ errno = EDOM;
+ return C_ERR;
+ }
+
+ /* Avoid overflow when trying to add an element to the stream (listpack
+ * can only host up to 32bit length strings, and also a total listpack size
+ * can't be bigger than 32bit length. */
+ size_t totelelen = 0;
+ for (int64_t i = 0; i < numfields*2; i++) {
+ sds ele = argv[i]->ptr;
+ totelelen += sdslen(ele);
+ }
+ if (totelelen > STREAM_LISTPACK_MAX_SIZE) {
+ errno = ERANGE;
+ return C_ERR;
+ }
+
+ /* Add the new entry. */
+ raxIterator ri;
+ raxStart(&ri,s->rax);
+ raxSeek(&ri,"$",NULL,0);
+
+ size_t lp_bytes = 0; /* Total bytes in the tail listpack. */
+ unsigned char *lp = NULL; /* Tail listpack pointer. */
+
+ if (!raxEOF(&ri)) {
+ /* Get a reference to the tail node listpack. */
+ lp = ri.data;
+ lp_bytes = lpBytes(lp);
+ }
+ raxStop(&ri);
+
+ /* We have to add the key into the radix tree in lexicographic order,
+ * to do so we consider the ID as a single 128 bit number written in
+ * big endian, so that the most significant bytes are the first ones. */
+ uint64_t rax_key[2]; /* Key in the radix tree containing the listpack.*/
+ streamID master_id; /* ID of the master entry in the listpack. */
+
+ /* Create a new listpack and radix tree node if needed. Note that when
+ * a new listpack is created, we populate it with a "master entry". This
+ * is just a set of fields that is taken as references in order to compress
+ * the stream entries that we'll add inside the listpack.
+ *
+ * Note that while we use the first added entry fields to create
+ * the master entry, the first added entry is NOT represented in the master
+ * entry, which is a stand alone object. But of course, the first entry
+ * will compress well because it's used as reference.
+ *
+ * The master entry is composed like in the following example:
+ *
+ * +-------+---------+------------+---------+--/--+---------+---------+-+
+ * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0|
+ * +-------+---------+------------+---------+--/--+---------+---------+-+
+ *
+ * count and deleted just represent respectively the total number of
+ * entries inside the listpack that are valid, and marked as deleted
+ * (deleted flag in the entry flags set). So the total number of items
+ * actually inside the listpack (both deleted and not) is count+deleted.
+ *
+ * The real entries will be encoded with an ID that is just the
+ * millisecond and sequence difference compared to the key stored at
+ * the radix tree node containing the listpack (delta encoding), and
+ * if the fields of the entry are the same as the master entry fields, the
+ * entry flags will specify this fact and the entry fields and number
+ * of fields will be omitted (see later in the code of this function).
+ *
+ * The "0" entry at the end is the same as the 'lp-count' entry in the
+ * regular stream entries (see below), and marks the fact that there are
+ * no more entries, when we scan the stream from right to left. */
+
+ /* First of all, check if we can append to the current macro node or
+ * if we need to switch to the next one. 'lp' will be set to NULL if
+ * the current node is full. */
+ if (lp != NULL) {
+ int new_node = 0;
+ size_t node_max_bytes = server.stream_node_max_bytes;
+ if (node_max_bytes == 0 || node_max_bytes > STREAM_LISTPACK_MAX_SIZE)
+ node_max_bytes = STREAM_LISTPACK_MAX_SIZE;
+ if (lp_bytes + totelelen >= node_max_bytes) {
+ new_node = 1;
+ } else if (server.stream_node_max_entries) {
+ unsigned char *lp_ele = lpFirst(lp);
+ /* Count both live entries and deleted ones. */
+ int64_t count = lpGetInteger(lp_ele) + lpGetInteger(lpNext(lp,lp_ele));
+ if (count >= server.stream_node_max_entries) new_node = 1;
+ }
+
+ if (new_node) {
+ /* Shrink extra pre-allocated memory */
+ lp = lpShrinkToFit(lp);
+ if (ri.data != lp)
+ raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
+ lp = NULL;
+ }
+ }
+
+ int flags = STREAM_ITEM_FLAG_NONE;
+ if (lp == NULL) {
+ master_id = id;
+ streamEncodeID(rax_key,&id);
+ /* Create the listpack having the master entry ID and fields.
+ * Pre-allocate some bytes when creating listpack to avoid realloc on
+ * every XADD. Since listpack.c uses malloc_size, it'll grow in steps,
+ * and won't realloc on every XADD.
+ * When listpack reaches max number of entries, we'll shrink the
+ * allocation to fit the data. */
+ size_t prealloc = STREAM_LISTPACK_MAX_PRE_ALLOCATE;
+ if (server.stream_node_max_bytes > 0 && server.stream_node_max_bytes < prealloc) {
+ prealloc = server.stream_node_max_bytes;
+ }
+ lp = lpNew(prealloc);
+ lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
+ lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
+ lp = lpAppendInteger(lp,numfields);
+ for (int64_t i = 0; i < numfields; i++) {
+ sds field = argv[i*2]->ptr;
+ lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
+ }
+ lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */
+ raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
+ /* The first entry we insert, has obviously the same fields of the
+ * master entry. */
+ flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
+ } else {
+ serverAssert(ri.key_len == sizeof(rax_key));
+ memcpy(rax_key,ri.key,sizeof(rax_key));
+
+ /* Read the master ID from the radix tree key. */
+ streamDecodeID(rax_key,&master_id);
+ unsigned char *lp_ele = lpFirst(lp);
+
+ /* Update count and skip the deleted fields. */
+ int64_t count = lpGetInteger(lp_ele);
+ lp = lpReplaceInteger(lp,&lp_ele,count+1);
+ lp_ele = lpNext(lp,lp_ele); /* seek deleted. */
+ lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */
+
+ /* Check if the entry we are adding, have the same fields
+ * as the master entry. */
+ int64_t master_fields_count = lpGetInteger(lp_ele);
+ lp_ele = lpNext(lp,lp_ele);
+ if (numfields == master_fields_count) {
+ int64_t i;
+ for (i = 0; i < master_fields_count; i++) {
+ sds field = argv[i*2]->ptr;
+ int64_t e_len;
+ unsigned char buf[LP_INTBUF_SIZE];
+ unsigned char *e = lpGet(lp_ele,&e_len,buf);
+ /* Stop if there is a mismatch. */
+ if (sdslen(field) != (size_t)e_len ||
+ memcmp(e,field,e_len) != 0) break;
+ lp_ele = lpNext(lp,lp_ele);
+ }
+ /* All fields are the same! We can compress the field names
+ * setting a single bit in the flags. */
+ if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
+ }
+ }
+
+ /* Populate the listpack with the new entry. We use the following
+ * encoding:
+ *
+ * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
+ * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count|
+ * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
+ *
+ * However if the SAMEFIELD flag is set, we have just to populate
+ * the entry with the values, so it becomes:
+ *
+ * +-----+--------+-------+-/-+-------+--------+
+ * |flags|entry-id|value-1|...|value-N|lp-count|
+ * +-----+--------+-------+-/-+-------+--------+
+ *
+ * The entry-id field is actually two separated fields: the ms
+ * and seq difference compared to the master entry.
+ *
+ * The lp-count field is a number that states the number of listpack pieces
+ * that compose the entry, so that it's possible to travel the entry
+ * in reverse order: we can just start from the end of the listpack, read
+ * the entry, and jump back N times to seek the "flags" field to read
+ * the stream full entry. */
+ lp = lpAppendInteger(lp,flags);
+ lp = lpAppendInteger(lp,id.ms - master_id.ms);
+ lp = lpAppendInteger(lp,id.seq - master_id.seq);
+ if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
+ lp = lpAppendInteger(lp,numfields);
+ for (int64_t i = 0; i < numfields; i++) {
+ sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
+ if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
+ lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
+ lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
+ }
+ /* Compute and store the lp-count field. */
+ int64_t lp_count = numfields;
+ lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */
+ if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) {
+ /* If the item is not compressed, it also has the fields other than
+ * the values, and an additional num-fields field. */
+ lp_count += numfields+1;
+ }
+ lp = lpAppendInteger(lp,lp_count);
+
+ /* Insert back into the tree in order to update the listpack pointer. */
+ if (ri.data != lp)
+ raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
+ s->length++;
+ s->entries_added++;
+ s->last_id = id;
+ if (s->length == 1) s->first_id = id;
+ if (added_id) *added_id = id;
+ return C_OK;
+}
+
+typedef struct {
+ /* XADD options */
+ streamID id; /* User-provided ID, for XADD only. */
+ int id_given; /* Was an ID different than "*" specified? for XADD only. */
+ int seq_given; /* Was an ID different than "ms-*" specified? for XADD only. */
+ int no_mkstream; /* if set to 1 do not create new stream */
+
+ /* XADD + XTRIM common options */
+ int trim_strategy; /* TRIM_STRATEGY_* */
+ int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */
+ int approx_trim; /* If 1 only delete whole radix tree nodes, so
+ * the trim argument is not applied verbatim. */
+ long long limit; /* Maximum amount of entries to trim. If 0, no limitation
+ * on the amount of trimming work is enforced. */
+ /* TRIM_STRATEGY_MAXLEN options */
+ long long maxlen; /* After trimming, leave stream at this length . */
+ /* TRIM_STRATEGY_MINID options */
+ streamID minid; /* Trim by ID (No stream entries with ID < 'minid' will remain) */
+} streamAddTrimArgs;
+
+#define TRIM_STRATEGY_NONE 0
+#define TRIM_STRATEGY_MAXLEN 1
+#define TRIM_STRATEGY_MINID 2
+
+/* Trim the stream 's' according to args->trim_strategy, and return the
+ * number of elements removed from the stream. The 'approx' option, if non-zero,
+ * specifies that the trimming must be performed in a approximated way in
+ * order to maximize performances. This means that the stream may contain
+ * entries with IDs < 'id' in case of MINID (or more elements than 'maxlen'
+ * in case of MAXLEN), and elements are only removed if we can remove
+ * a *whole* node of the radix tree. The elements are removed from the head
+ * of the stream (older elements).
+ *
+ * The function may return zero if:
+ *
+ * 1) The minimal entry ID of the stream is already < 'id' (MINID); or
+ * 2) The stream is already shorter or equal to the specified max length (MAXLEN); or
+ * 3) The 'approx' option is true and the head node did not have enough elements
+ * to be deleted.
+ *
+ * args->limit is the maximum number of entries to delete. The purpose is to
+ * prevent this function from taking to long.
+ * If 'limit' is 0 then we do not limit the number of deleted entries.
+ * Much like the 'approx', if 'limit' is smaller than the number of entries
+ * that should be trimmed, there is a chance we will still have entries with
+ * IDs < 'id' (or number of elements >= maxlen in case of MAXLEN).
+ */
+int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
+ size_t maxlen = args->maxlen;
+ streamID *id = &args->minid;
+ int approx = args->approx_trim;
+ int64_t limit = args->limit;
+ int trim_strategy = args->trim_strategy;
+
+ if (trim_strategy == TRIM_STRATEGY_NONE)
+ return 0;
+
+ raxIterator ri;
+ raxStart(&ri,s->rax);
+ raxSeek(&ri,"^",NULL,0);
+
+ int64_t deleted = 0;
+ while (raxNext(&ri)) {
+ if (trim_strategy == TRIM_STRATEGY_MAXLEN && s->length <= maxlen)
+ break;
+
+ unsigned char *lp = ri.data, *p = lpFirst(lp);
+ int64_t entries = lpGetInteger(p);
+
+ /* Check if we exceeded the amount of work we could do */
+ if (limit && (deleted + entries) > limit)
+ break;
+
+ /* Check if we can remove the whole node. */
+ int remove_node;
+ streamID master_id = {0}; /* For MINID */
+ if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
+ remove_node = s->length - entries >= maxlen;
+ } else {
+ /* Read the master ID from the radix tree key. */
+ streamDecodeID(ri.key, &master_id);
+
+ /* Read last ID. */
+ streamID last_id = {0,0};
+ lpGetEdgeStreamID(lp, 0, &master_id, &last_id);
+
+ /* We can remove the entire node id its last ID < 'id' */
+ remove_node = streamCompareID(&last_id, id) < 0;
+ }
+
+ if (remove_node) {
+ lpFree(lp);
+ raxRemove(s->rax,ri.key,ri.key_len,NULL);
+ raxSeek(&ri,">=",ri.key,ri.key_len);
+ s->length -= entries;
+ deleted += entries;
+ continue;
+ }
+
+ /* If we cannot remove a whole element, and approx is true,
+ * stop here. */
+ if (approx) break;
+
+ /* Now we have to trim entries from within 'lp' */
+ int64_t deleted_from_lp = 0;
+
+ p = lpNext(lp, p); /* Skip deleted field. */
+ p = lpNext(lp, p); /* Skip num-of-fields in the master entry. */
+
+ /* Skip all the master fields. */
+ int64_t master_fields_count = lpGetInteger(p);
+ p = lpNext(lp,p); /* Skip the first field. */
+ for (int64_t j = 0; j < master_fields_count; j++)
+ p = lpNext(lp,p); /* Skip all master fields. */
+ p = lpNext(lp,p); /* Skip the zero master entry terminator. */
+
+ /* 'p' is now pointing to the first entry inside the listpack.
+ * We have to run entry after entry, marking entries as deleted
+ * if they are already not deleted. */
+ while (p) {
+ /* We keep a copy of p (which point to flags part) in order to
+ * update it after (and if) we actually remove the entry */
+ unsigned char *pcopy = p;
+
+ int64_t flags = lpGetInteger(p);
+ p = lpNext(lp, p); /* Skip flags. */
+ int64_t to_skip;
+
+ int64_t ms_delta = lpGetInteger(p);
+ p = lpNext(lp, p); /* Skip ID ms delta */
+ int64_t seq_delta = lpGetInteger(p);
+ p = lpNext(lp, p); /* Skip ID seq delta */
+
+ streamID currid = {0}; /* For MINID */
+ if (trim_strategy == TRIM_STRATEGY_MINID) {
+ currid.ms = master_id.ms + ms_delta;
+ currid.seq = master_id.seq + seq_delta;
+ }
+
+ int stop;
+ if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
+ stop = s->length <= maxlen;
+ } else {
+ /* Following IDs will definitely be greater because the rax
+ * tree is sorted, no point of continuing. */
+ stop = streamCompareID(&currid, id) >= 0;
+ }
+ if (stop)
+ break;
+
+ if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
+ to_skip = master_fields_count;
+ } else {
+ to_skip = lpGetInteger(p); /* Get num-fields. */
+ p = lpNext(lp,p); /* Skip num-fields. */
+ to_skip *= 2; /* Fields and values. */
+ }
+
+ while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */
+ p = lpNext(lp,p); /* Skip the final lp-count field. */
+
+ /* Mark the entry as deleted. */
+ if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
+ intptr_t delta = p - lp;
+ flags |= STREAM_ITEM_FLAG_DELETED;
+ lp = lpReplaceInteger(lp, &pcopy, flags);
+ deleted_from_lp++;
+ s->length--;
+ p = lp + delta;
+ }
+ }
+ deleted += deleted_from_lp;
+
+ /* Now we update the entries/deleted counters. */
+ p = lpFirst(lp);
+ lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp);
+ p = lpNext(lp,p); /* Skip deleted field. */
+ int64_t marked_deleted = lpGetInteger(p);
+ lp = lpReplaceInteger(lp,&p,marked_deleted+deleted_from_lp);
+ p = lpNext(lp,p); /* Skip num-of-fields in the master entry. */
+
+ /* Here we should perform garbage collection in case at this point
+ * there are too many entries deleted inside the listpack. */
+ entries -= deleted_from_lp;
+ marked_deleted += deleted_from_lp;
+ if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
+ /* TODO: perform a garbage collection. */
+ }
+
+ /* Update the listpack with the new pointer. */
+ raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
+
+ break; /* If we are here, there was enough to delete in the current
+ node, so no need to go to the next node. */
+ }
+ raxStop(&ri);
+
+ /* Update the stream's first ID after the trimming. */
+ if (s->length == 0) {
+ s->first_id.ms = 0;
+ s->first_id.seq = 0;
+ } else if (deleted) {
+ streamGetEdgeID(s,1,1,&s->first_id);
+ }
+
+ return deleted;
+}
+
+/* Trims a stream by length. Returns the number of deleted items. */
+int64_t streamTrimByLength(stream *s, long long maxlen, int approx) {
+ streamAddTrimArgs args = {
+ .trim_strategy = TRIM_STRATEGY_MAXLEN,
+ .approx_trim = approx,
+ .limit = approx ? 100 * server.stream_node_max_entries : 0,
+ .maxlen = maxlen
+ };
+ return streamTrim(s, &args);
+}
+
+/* Trims a stream by minimum ID. Returns the number of deleted items. */
+int64_t streamTrimByID(stream *s, streamID minid, int approx) {
+ streamAddTrimArgs args = {
+ .trim_strategy = TRIM_STRATEGY_MINID,
+ .approx_trim = approx,
+ .limit = approx ? 100 * server.stream_node_max_entries : 0,
+ .minid = minid
+ };
+ return streamTrim(s, &args);
+}
+
+/* Parse the arguments of XADD/XTRIM.
+ *
+ * See streamAddTrimArgs for more details about the arguments handled.
+ *
+ * This function returns the position of the ID argument (relevant only to XADD).
+ * On error -1 is returned and a reply is sent. */
+static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, int xadd) {
+ /* Initialize arguments to defaults */
+ memset(args, 0, sizeof(*args));
+
+ /* Parse options. */
+ int i = 2; /* This is the first argument position where we could
+ find an option, or the ID. */
+ int limit_given = 0;
+ for (; i < c->argc; i++) {
+ int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
+ char *opt = c->argv[i]->ptr;
+ if (xadd && opt[0] == '*' && opt[1] == '\0') {
+ /* This is just a fast path for the common case of auto-ID
+ * creation. */
+ break;
+ } else if (!strcasecmp(opt,"maxlen") && moreargs) {
+ if (args->trim_strategy != TRIM_STRATEGY_NONE) {
+ addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible");
+ return -1;
+ }
+ args->approx_trim = 0;
+ char *next = c->argv[i+1]->ptr;
+ /* Check for the form MAXLEN ~ <count>. */
+ if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
+ args->approx_trim = 1;
+ i++;
+ } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
+ i++;
+ }
+ if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->maxlen,NULL)
+ != C_OK) return -1;
+
+ if (args->maxlen < 0) {
+ addReplyError(c,"The MAXLEN argument must be >= 0.");
+ return -1;
+ }
+ i++;
+ args->trim_strategy = TRIM_STRATEGY_MAXLEN;
+ args->trim_strategy_arg_idx = i;
+ } else if (!strcasecmp(opt,"minid") && moreargs) {
+ if (args->trim_strategy != TRIM_STRATEGY_NONE) {
+ addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible");
+ return -1;
+ }
+ args->approx_trim = 0;
+ char *next = c->argv[i+1]->ptr;
+ /* Check for the form MINID ~ <id> */
+ if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
+ args->approx_trim = 1;
+ i++;
+ } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
+ i++;
+ }
+
+ if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0,NULL) != C_OK)
+ return -1;
+
+ i++;
+ args->trim_strategy = TRIM_STRATEGY_MINID;
+ args->trim_strategy_arg_idx = i;
+ } else if (!strcasecmp(opt,"limit") && moreargs) {
+ /* Note about LIMIT: If it was not provided by the caller we set
+ * it to 100*server.stream_node_max_entries, and that's to prevent the
+ * trimming from taking too long, on the expense of not deleting entries
+ * that should be trimmed.
+ * If user wanted exact trimming (i.e. no '~') we never limit the number
+ * of trimmed entries */
+ if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->limit,NULL) != C_OK)
+ return -1;
+
+ if (args->limit < 0) {
+ addReplyError(c,"The LIMIT argument must be >= 0.");
+ return -1;
+ }
+ limit_given = 1;
+ i++;
+ } else if (xadd && !strcasecmp(opt,"nomkstream")) {
+ args->no_mkstream = 1;
+ } else if (xadd) {
+ /* If we are here is a syntax error or a valid ID. */
+ if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0,&args->seq_given) != C_OK)
+ return -1;
+ args->id_given = 1;
+ break;
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return -1;
+ }
+ }
+
+ if (args->limit && args->trim_strategy == TRIM_STRATEGY_NONE) {
+ addReplyError(c,"syntax error, LIMIT cannot be used without specifying a trimming strategy");
+ return -1;
+ }
+
+ if (!xadd && args->trim_strategy == TRIM_STRATEGY_NONE) {
+ addReplyError(c,"syntax error, XTRIM must be called with a trimming strategy");
+ return -1;
+ }
+
+ if (mustObeyClient(c)) {
+ /* If command came from master or from AOF we must not enforce maxnodes
+ * (The maxlen/minid argument was re-written to make sure there's no
+ * inconsistency). */
+ args->limit = 0;
+ } else {
+ /* We need to set the limit (only if we got '~') */
+ if (limit_given) {
+ if (!args->approx_trim) {
+ /* LIMIT was provided without ~ */
+ addReplyError(c,"syntax error, LIMIT cannot be used without the special ~ option");
+ return -1;
+ }
+ } else {
+ /* User didn't provide LIMIT, we must set it. */
+ if (args->approx_trim) {
+ /* In order to prevent from trimming to do too much work and
+ * cause latency spikes we limit the amount of work it can do.
+ * We have to cap args->limit from both sides in case
+ * stream_node_max_entries is 0 or too big (could cause overflow)
+ */
+ args->limit = 100 * server.stream_node_max_entries; /* Maximum 100 rax nodes. */
+ if (args->limit <= 0) args->limit = 10000;
+ if (args->limit > 1000000) args->limit = 1000000;
+ } else {
+ /* No LIMIT for exact trimming */
+ args->limit = 0;
+ }
+ }
+ }
+
+ return i;
+}
+
+/* Initialize the stream iterator, so that we can call iterating functions
+ * to get the next items. This requires a corresponding streamIteratorStop()
+ * at the end. The 'rev' parameter controls the direction. If it's zero the
+ * iteration is from the start to the end element (inclusive), otherwise
+ * if rev is non-zero, the iteration is reversed.
+ *
+ * Once the iterator is initialized, we iterate like this:
+ *
+ * streamIterator myiterator;
+ * streamIteratorStart(&myiterator,...);
+ * int64_t numfields;
+ * while(streamIteratorGetID(&myiterator,&ID,&numfields)) {
+ * while(numfields--) {
+ * unsigned char *key, *value;
+ * size_t key_len, value_len;
+ * streamIteratorGetField(&myiterator,&key,&value,&key_len,&value_len);
+ *
+ * ... do what you want with key and value ...
+ * }
+ * }
+ * streamIteratorStop(&myiterator); */
+void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) {
+ /* Initialize the iterator and translates the iteration start/stop
+ * elements into a 128 big big-endian number. */
+ if (start) {
+ streamEncodeID(si->start_key,start);
+ } else {
+ si->start_key[0] = 0;
+ si->start_key[1] = 0;
+ }
+
+ if (end) {
+ streamEncodeID(si->end_key,end);
+ } else {
+ si->end_key[0] = UINT64_MAX;
+ si->end_key[1] = UINT64_MAX;
+ }
+
+ /* Seek the correct node in the radix tree. */
+ raxStart(&si->ri,s->rax);
+ if (!rev) {
+ if (start && (start->ms || start->seq)) {
+ raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
+ sizeof(si->start_key));
+ if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL,0);
+ } else {
+ raxSeek(&si->ri,"^",NULL,0);
+ }
+ } else {
+ if (end && (end->ms || end->seq)) {
+ raxSeek(&si->ri,"<=",(unsigned char*)si->end_key,
+ sizeof(si->end_key));
+ if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL,0);
+ } else {
+ raxSeek(&si->ri,"$",NULL,0);
+ }
+ }
+ si->stream = s;
+ si->lp = NULL; /* There is no current listpack right now. */
+ si->lp_ele = NULL; /* Current listpack cursor. */
+ si->rev = rev; /* Direction, if non-zero reversed, from end to start. */
+ si->skip_tombstones = 1; /* By default tombstones aren't emitted. */
+}
+
+/* Return 1 and store the current item ID at 'id' if there are still
+ * elements within the iteration range, otherwise return 0 in order to
+ * signal the iteration terminated. */
+int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
+ while(1) { /* Will stop when element > stop_key or end of radix tree. */
+ /* If the current listpack is set to NULL, this is the start of the
+ * iteration or the previous listpack was completely iterated.
+ * Go to the next node. */
+ if (si->lp == NULL || si->lp_ele == NULL) {
+ if (!si->rev && !raxNext(&si->ri)) return 0;
+ else if (si->rev && !raxPrev(&si->ri)) return 0;
+ serverAssert(si->ri.key_len == sizeof(streamID));
+ /* Get the master ID. */
+ streamDecodeID(si->ri.key,&si->master_id);
+ /* Get the master fields count. */
+ si->lp = si->ri.data;
+ si->lp_ele = lpFirst(si->lp); /* Seek items count */
+ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek deleted count. */
+ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek num fields. */
+ si->master_fields_count = lpGetInteger(si->lp_ele);
+ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */
+ si->master_fields_start = si->lp_ele;
+ /* We are now pointing to the first field of the master entry.
+ * We need to seek either the first or the last entry depending
+ * on the direction of the iteration. */
+ if (!si->rev) {
+ /* If we are iterating in normal order, skip the master fields
+ * to seek the first actual entry. */
+ for (uint64_t i = 0; i < si->master_fields_count; i++)
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ } else {
+ /* If we are iterating in reverse direction, just seek the
+ * last part of the last entry in the listpack (that is, the
+ * fields count). */
+ si->lp_ele = lpLast(si->lp);
+ }
+ } else if (si->rev) {
+ /* If we are iterating in the reverse order, and this is not
+ * the first entry emitted for this listpack, then we already
+ * emitted the current entry, and have to go back to the previous
+ * one. */
+ int64_t lp_count = lpGetInteger(si->lp_ele);
+ while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
+ /* Seek lp-count of prev entry. */
+ si->lp_ele = lpPrev(si->lp,si->lp_ele);
+ }
+
+ /* For every radix tree node, iterate the corresponding listpack,
+ * returning elements when they are within range. */
+ while(1) {
+ if (!si->rev) {
+ /* If we are going forward, skip the previous entry
+ * lp-count field (or in case of the master entry, the zero
+ * term field) */
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ if (si->lp_ele == NULL) break;
+ } else {
+ /* If we are going backward, read the number of elements this
+ * entry is composed of, and jump backward N times to seek
+ * its start. */
+ int64_t lp_count = lpGetInteger(si->lp_ele);
+ if (lp_count == 0) { /* We reached the master entry. */
+ si->lp = NULL;
+ si->lp_ele = NULL;
+ break;
+ }
+ while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
+ }
+
+ /* Get the flags entry. */
+ si->lp_flags = si->lp_ele;
+ int64_t flags = lpGetInteger(si->lp_ele);
+ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */
+
+ /* Get the ID: it is encoded as difference between the master
+ * ID and this entry ID. */
+ *id = si->master_id;
+ id->ms += lpGetInteger(si->lp_ele);
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ id->seq += lpGetInteger(si->lp_ele);
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ unsigned char buf[sizeof(streamID)];
+ streamEncodeID(buf,id);
+
+ /* The number of entries is here or not depending on the
+ * flags. */
+ if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
+ *numfields = si->master_fields_count;
+ } else {
+ *numfields = lpGetInteger(si->lp_ele);
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ }
+ serverAssert(*numfields>=0);
+
+ /* If current >= start, and the entry is not marked as
+ * deleted or tombstones are included, emit it. */
+ if (!si->rev) {
+ if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 &&
+ (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED)))
+ {
+ if (memcmp(buf,si->end_key,sizeof(streamID)) > 0)
+ return 0; /* We are already out of range. */
+ si->entry_flags = flags;
+ if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
+ si->master_fields_ptr = si->master_fields_start;
+ return 1; /* Valid item returned. */
+ }
+ } else {
+ if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 &&
+ (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED)))
+ {
+ if (memcmp(buf,si->start_key,sizeof(streamID)) < 0)
+ return 0; /* We are already out of range. */
+ si->entry_flags = flags;
+ if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
+ si->master_fields_ptr = si->master_fields_start;
+ return 1; /* Valid item returned. */
+ }
+ }
+
+ /* If we do not emit, we have to discard if we are going
+ * forward, or seek the previous entry if we are going
+ * backward. */
+ if (!si->rev) {
+ int64_t to_discard = (flags & STREAM_ITEM_FLAG_SAMEFIELDS) ?
+ *numfields : *numfields*2;
+ for (int64_t i = 0; i < to_discard; i++)
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ } else {
+ int64_t prev_times = 4; /* flag + id ms + id seq + one more to
+ go back to the previous entry "count"
+ field. */
+ /* If the entry was not flagged SAMEFIELD we also read the
+ * number of fields, so go back one more. */
+ if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) prev_times++;
+ while(prev_times--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
+ }
+ }
+
+ /* End of listpack reached. Try the next/prev radix tree node. */
+ }
+}
+
+/* Get the field and value of the current item we are iterating. This should
+ * be called immediately after streamIteratorGetID(), and for each field
+ * according to the number of fields returned by streamIteratorGetID().
+ * The function populates the field and value pointers and the corresponding
+ * lengths by reference, that are valid until the next iterator call, assuming
+ * no one touches the stream meanwhile. */
+void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) {
+ if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
+ *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf);
+ si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr);
+ } else {
+ *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf);
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ }
+ *valueptr = lpGet(si->lp_ele,valuelen,si->value_buf);
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+}
+
+/* Remove the current entry from the stream: can be called after the
+ * GetID() API or after any GetField() call, however we need to iterate
+ * a valid entry while calling this function. Moreover the function
+ * requires the entry ID we are currently iterating, that was previously
+ * returned by GetID().
+ *
+ * Note that after calling this function, next calls to GetField() can't
+ * be performed: the entry is now deleted. Instead the iterator will
+ * automatically re-seek to the next entry, so the caller should continue
+ * with GetID(). */
+void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
+ unsigned char *lp = si->lp;
+ int64_t aux;
+
+ /* We do not really delete the entry here. Instead we mark it as
+ * deleted by flagging it, and also incrementing the count of the
+ * deleted entries in the listpack header.
+ *
+ * We start flagging: */
+ int64_t flags = lpGetInteger(si->lp_flags);
+ flags |= STREAM_ITEM_FLAG_DELETED;
+ lp = lpReplaceInteger(lp,&si->lp_flags,flags);
+
+ /* Change the valid/deleted entries count in the master entry. */
+ unsigned char *p = lpFirst(lp);
+ aux = lpGetInteger(p);
+
+ if (aux == 1) {
+ /* If this is the last element in the listpack, we can remove the whole
+ * node. */
+ lpFree(lp);
+ raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL);
+ } else {
+ /* In the base case we alter the counters of valid/deleted entries. */
+ lp = lpReplaceInteger(lp,&p,aux-1);
+ p = lpNext(lp,p); /* Seek deleted field. */
+ aux = lpGetInteger(p);
+ lp = lpReplaceInteger(lp,&p,aux+1);
+
+ /* Update the listpack with the new pointer. */
+ if (si->lp != lp)
+ raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL);
+ }
+
+ /* Update the number of entries counter. */
+ si->stream->length--;
+
+ /* Re-seek the iterator to fix the now messed up state. */
+ streamID start, end;
+ if (si->rev) {
+ streamDecodeID(si->start_key,&start);
+ end = *current;
+ } else {
+ start = *current;
+ streamDecodeID(si->end_key,&end);
+ }
+ streamIteratorStop(si);
+ streamIteratorStart(si,si->stream,&start,&end,si->rev);
+
+ /* TODO: perform a garbage collection here if the ratio between
+ * deleted and valid goes over a certain limit. */
+}
+
+/* Stop the stream iterator. The only cleanup we need is to free the rax
+ * iterator, since the stream iterator itself is supposed to be stack
+ * allocated. */
+void streamIteratorStop(streamIterator *si) {
+ raxStop(&si->ri);
+}
+
+/* Return 1 if `id` exists in `s` (and not marked as deleted) */
+int streamEntryExists(stream *s, streamID *id) {
+ streamIterator si;
+ streamIteratorStart(&si,s,id,id,0);
+ streamID myid;
+ int64_t numfields;
+ int found = streamIteratorGetID(&si,&myid,&numfields);
+ streamIteratorStop(&si);
+ if (!found)
+ return 0;
+ serverAssert(streamCompareID(id,&myid) == 0);
+ return 1;
+}
+
+/* Delete the specified item ID from the stream, returning 1 if the item
+ * was deleted 0 otherwise (if it does not exist). */
+int streamDeleteItem(stream *s, streamID *id) {
+ int deleted = 0;
+ streamIterator si;
+ streamIteratorStart(&si,s,id,id,0);
+ streamID myid;
+ int64_t numfields;
+ if (streamIteratorGetID(&si,&myid,&numfields)) {
+ streamIteratorRemoveEntry(&si,&myid);
+ deleted = 1;
+ }
+ streamIteratorStop(&si);
+ return deleted;
+}
+
+/* Get the last valid (non-tombstone) streamID of 's'. */
+void streamLastValidID(stream *s, streamID *maxid)
+{
+ streamIterator si;
+ streamIteratorStart(&si,s,NULL,NULL,1);
+ int64_t numfields;
+ if (!streamIteratorGetID(&si,maxid,&numfields) && s->length)
+ serverPanic("Corrupt stream, length is %llu, but no max id", (unsigned long long)s->length);
+ streamIteratorStop(&si);
+}
+
+/* Maximum size for a stream ID string. In theory 20*2+1 should be enough,
+ * But to avoid chance for off by one issues and null-term, in case this will
+ * be used as parsing buffer, we use a slightly larger buffer. On the other
+ * hand considering sds header is gonna add 4 bytes, we wanna keep below the
+ * allocator's 48 bytes bin. */
+#define STREAM_ID_STR_LEN 44
+
+sds createStreamIDString(streamID *id) {
+ /* Optimization: pre-allocate a big enough buffer to avoid reallocs. */
+ sds str = sdsnewlen(SDS_NOINIT, STREAM_ID_STR_LEN);
+ sdssetlen(str, 0);
+ return sdscatfmt(str,"%U-%U", id->ms,id->seq);
+}
+
+/* Emit a reply in the client output buffer by formatting a Stream ID
+ * in the standard <ms>-<seq> format, using the simple string protocol
+ * of REPL. */
+void addReplyStreamID(client *c, streamID *id) {
+ addReplyBulkSds(c,createStreamIDString(id));
+}
+
+void setDeferredReplyStreamID(client *c, void *dr, streamID *id) {
+ setDeferredReplyBulkSds(c, dr, createStreamIDString(id));
+}
+
+/* Similar to the above function, but just creates an object, usually useful
+ * for replication purposes to create arguments. */
+robj *createObjectFromStreamID(streamID *id) {
+ return createObject(OBJ_STRING, createStreamIDString(id));
+}
+
+/* Returns non-zero if the ID is 0-0. */
+int streamIDEqZero(streamID *id) {
+ return !(id->ms || id->seq);
+}
+
+/* A helper that returns non-zero if the range from 'start' to `end`
+ * contains a tombstone.
+ *
+ * NOTE: this assumes that the caller had verified that 'start' is less than
+ * 's->last_id'. */
+int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) {
+ streamID start_id, end_id;
+
+ if (!s->length || streamIDEqZero(&s->max_deleted_entry_id)) {
+ /* The stream is empty or has no tombstones. */
+ return 0;
+ }
+
+ if (streamCompareID(&s->first_id,&s->max_deleted_entry_id) > 0) {
+ /* The latest tombstone is before the first entry. */
+ return 0;
+ }
+
+ if (start) {
+ start_id = *start;
+ } else {
+ start_id.ms = 0;
+ start_id.seq = 0;
+ }
+
+ if (end) {
+ end_id = *end;
+ } else {
+ end_id.ms = UINT64_MAX;
+ end_id.seq = UINT64_MAX;
+ }
+
+ if (streamCompareID(&start_id,&s->max_deleted_entry_id) <= 0 &&
+ streamCompareID(&s->max_deleted_entry_id,&end_id) <= 0)
+ {
+ /* start_id <= max_deleted_entry_id <= end_id: The range does include a tombstone. */
+ return 1;
+ }
+
+ /* The range doesn't includes a tombstone. */
+ return 0;
+}
+
+/* Replies with a consumer group's current lag, that is the number of messages
+ * in the stream that are yet to be delivered. In case that the lag isn't
+ * available due to fragmentation, the reply to the client is a null. */
+void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) {
+ int valid = 0;
+ long long lag = 0;
+
+ if (!s->entries_added) {
+ /* The lag of a newly-initialized stream is 0. */
+ lag = 0;
+ valid = 1;
+ } else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&cg->last_id,NULL)) {
+ /* No fragmentation ahead means that the group's logical reads counter
+ * is valid for performing the lag calculation. */
+ lag = (long long)s->entries_added - cg->entries_read;
+ valid = 1;
+ } else {
+ /* Attempt to retrieve the group's last ID logical read counter. */
+ long long entries_read = streamEstimateDistanceFromFirstEverEntry(s,&cg->last_id);
+ if (entries_read != SCG_INVALID_ENTRIES_READ) {
+ /* A valid counter was obtained. */
+ lag = (long long)s->entries_added - entries_read;
+ valid = 1;
+ }
+ }
+
+ if (valid) {
+ addReplyLongLong(c,lag);
+ } else {
+ addReplyNull(c);
+ }
+}
+
+/* This function returns a value that is the ID's logical read counter, or its
+ * distance (the number of entries) from the first entry ever to have been added
+ * to the stream.
+ *
+ * A counter is returned only in one of the following cases:
+ * 1. The ID is the same as the stream's last ID. In this case, the returned
+ * is the same as the stream's entries_added counter.
+ * 2. The ID equals that of the currently first entry in the stream, and the
+ * stream has no tombstones. The returned value, in this case, is the result
+ * of subtracting the stream's length from its added_entries, incremented by
+ * one.
+ * 3. The ID less than the stream's first current entry's ID, and there are no
+ * tombstones. Here the estimated counter is the result of subtracting the
+ * stream's length from its added_entries.
+ * 4. The stream's added_entries is zero, meaning that no entries were ever
+ * added.
+ *
+ * The special return value of ULLONG_MAX signals that the counter's value isn't
+ * obtainable. It is returned in these cases:
+ * 1. The provided ID, if it even exists, is somewhere between the stream's
+ * current first and last entries' IDs, or in the future.
+ * 2. The stream contains one or more tombstones. */
+long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) {
+ /* The counter of any ID in an empty, never-before-used stream is 0. */
+ if (!s->entries_added) {
+ return 0;
+ }
+
+ /* In the empty stream, if the ID is smaller or equal to the last ID,
+ * it can set to the current added_entries value. */
+ if (!s->length && streamCompareID(id,&s->last_id) < 1) {
+ return s->entries_added;
+ }
+
+ int cmp_last = streamCompareID(id,&s->last_id);
+ if (cmp_last == 0) {
+ /* Return the exact counter of the last entry in the stream. */
+ return s->entries_added;
+ } else if (cmp_last > 0) {
+ /* The counter of a future ID is unknown. */
+ return SCG_INVALID_ENTRIES_READ;
+ }
+
+ int cmp_id_first = streamCompareID(id,&s->first_id);
+ int cmp_xdel_first = streamCompareID(&s->max_deleted_entry_id,&s->first_id);
+ if (streamIDEqZero(&s->max_deleted_entry_id) || cmp_xdel_first < 0) {
+ /* There's definitely no fragmentation ahead. */
+ if (cmp_id_first < 0) {
+ /* Return the estimated counter. */
+ return s->entries_added - s->length;
+ } else if (cmp_id_first == 0) {
+ /* Return the exact counter of the first entry in the stream. */
+ return s->entries_added - s->length + 1;
+ }
+ }
+
+ /* The ID is either before an XDEL that fragments the stream or an arbitrary
+ * ID. Either case, so we can't make a prediction. */
+ return SCG_INVALID_ENTRIES_READ;
+}
+
+/* As a result of an explicit XCLAIM or XREADGROUP command, new entries
+ * are created in the pending list of the stream and consumers. We need
+ * to propagate this changes in the form of XCLAIM commands. */
+void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) {
+ /* We need to generate an XCLAIM that will work in a idempotent fashion:
+ *
+ * XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
+ * RETRYCOUNT <count> FORCE JUSTID LASTID <id>.
+ *
+ * Note that JUSTID is useful in order to avoid that XCLAIM will do
+ * useless work in the slave side, trying to fetch the stream item. */
+ robj *argv[14];
+ argv[0] = shared.xclaim;
+ argv[1] = key;
+ argv[2] = groupname;
+ argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name));
+ argv[4] = shared.integers[0];
+ argv[5] = id;
+ argv[6] = shared.time;
+ argv[7] = createStringObjectFromLongLong(nack->delivery_time);
+ argv[8] = shared.retrycount;
+ argv[9] = createStringObjectFromLongLong(nack->delivery_count);
+ argv[10] = shared.force;
+ argv[11] = shared.justid;
+ argv[12] = shared.lastid;
+ argv[13] = createObjectFromStreamID(&group->last_id);
+
+ alsoPropagate(c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
+
+ decrRefCount(argv[3]);
+ decrRefCount(argv[7]);
+ decrRefCount(argv[9]);
+ decrRefCount(argv[13]);
+}
+
+/* We need this when we want to propagate the new last-id of a consumer group
+ * that was consumed by XREADGROUP with the NOACK option: in that case we can't
+ * propagate the last ID just using the XCLAIM LASTID option, so we emit
+ *
+ * XGROUP SETID <key> <groupname> <id> ENTRIESREAD <entries_read>
+ */
+void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) {
+ robj *argv[7];
+ argv[0] = shared.xgroup;
+ argv[1] = shared.setid;
+ argv[2] = key;
+ argv[3] = groupname;
+ argv[4] = createObjectFromStreamID(&group->last_id);
+ argv[5] = shared.entriesread;
+ argv[6] = createStringObjectFromLongLong(group->entries_read);
+
+ alsoPropagate(c->db->id,argv,7,PROPAGATE_AOF|PROPAGATE_REPL);
+
+ decrRefCount(argv[4]);
+ decrRefCount(argv[6]);
+}
+
+/* We need this when we want to propagate creation of consumer that was created
+ * by XREADGROUP with the NOACK option. In that case, the only way to create
+ * the consumer at the replica is by using XGROUP CREATECONSUMER (see issue #7140)
+ *
+ * XGROUP CREATECONSUMER <key> <groupname> <consumername>
+ */
+void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername) {
+ robj *argv[5];
+ argv[0] = shared.xgroup;
+ argv[1] = shared.createconsumer;
+ argv[2] = key;
+ argv[3] = groupname;
+ argv[4] = createObject(OBJ_STRING,sdsdup(consumername));
+
+ alsoPropagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
+
+ decrRefCount(argv[4]);
+}
+
+/* Send the stream items in the specified range to the client 'c'. The range
+ * the client will receive is between start and end inclusive, if 'count' is
+ * non zero, no more than 'count' elements are sent.
+ *
+ * The 'end' pointer can be NULL to mean that we want all the elements from
+ * 'start' till the end of the stream. If 'rev' is non zero, elements are
+ * produced in reversed order from end to start.
+ *
+ * The function returns the number of entries emitted.
+ *
+ * If group and consumer are not NULL, the function performs additional work:
+ * 1. It updates the last delivered ID in the group in case we are
+ * sending IDs greater than the current last ID.
+ * 2. If the requested IDs are already assigned to some other consumer, the
+ * function will not return it to the client.
+ * 3. An entry in the pending list will be created for every entry delivered
+ * for the first time to this consumer.
+ * 4. The group's read counter is incremented if it is already valid and there
+ * are no future tombstones, or is invalidated (set to 0) otherwise. If the
+ * counter is invalid to begin with, we try to obtain it for the last
+ * delivered ID.
+ *
+ * The behavior may be modified passing non-zero flags:
+ *
+ * STREAM_RWR_NOACK: Do not create PEL entries, that is, the point "3" above
+ * is not performed.
+ * STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries,
+ * and return the number of entries emitted as usually.
+ * This is used when the function is just used in order
+ * to emit data and there is some higher level logic.
+ *
+ * The final argument 'spi' (stream propagation info pointer) is a structure
+ * filled with information needed to propagate the command execution to AOF
+ * and slaves, in the case a consumer group was passed: we need to generate
+ * XCLAIM commands to create the pending list into AOF/slaves in that case.
+ *
+ * If 'spi' is set to NULL no propagation will happen even if the group was
+ * given, but currently such a feature is never used by the code base that
+ * will always pass 'spi' and propagate when a group is passed.
+ *
+ * Note that this function is recursive in certain cases. When it's called
+ * with a non NULL group and consumer argument, it may call
+ * streamReplyWithRangeFromConsumerPEL() in order to get entries from the
+ * consumer pending entries list. However such a function will then call
+ * streamReplyWithRange() in order to emit single entries (found in the
+ * PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES
+ * flag.
+ */
+#define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */
+#define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array
+ boundaries, just the entries. */
+#define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */
+size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {
+ void *arraylen_ptr = NULL;
+ size_t arraylen = 0;
+ streamIterator si;
+ int64_t numfields;
+ streamID id;
+ int propagate_last_id = 0;
+ int noack = flags & STREAM_RWR_NOACK;
+
+ /* If the client is asking for some history, we serve it using a
+ * different function, so that we return entries *solely* from its
+ * own PEL. This ensures each consumer will always and only see
+ * the history of messages delivered to it and not yet confirmed
+ * as delivered. */
+ if (group && (flags & STREAM_RWR_HISTORY)) {
+ return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
+ consumer);
+ }
+
+ if (!(flags & STREAM_RWR_RAWENTRIES))
+ arraylen_ptr = addReplyDeferredLen(c);
+ streamIteratorStart(&si,s,start,end,rev);
+ while(streamIteratorGetID(&si,&id,&numfields)) {
+ /* Update the group last_id if needed. */
+ if (group && streamCompareID(&id,&group->last_id) > 0) {
+ if (group->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&id,NULL)) {
+ /* A valid counter and no future tombstones mean we can
+ * increment the read counter to keep tracking the group's
+ * progress. */
+ group->entries_read++;
+ } else if (s->entries_added) {
+ /* The group's counter may be invalid, so we try to obtain it. */
+ group->entries_read = streamEstimateDistanceFromFirstEverEntry(s,&id);
+ }
+ group->last_id = id;
+ /* Group last ID should be propagated only if NOACK was
+ * specified, otherwise the last id will be included
+ * in the propagation of XCLAIM itself. */
+ if (noack) propagate_last_id = 1;
+ }
+
+ /* Emit a two elements array for each item. The first is
+ * the ID, the second is an array of field-value pairs. */
+ addReplyArrayLen(c,2);
+ addReplyStreamID(c,&id);
+
+ addReplyArrayLen(c,numfields*2);
+
+ /* Emit the field-value pairs. */
+ while(numfields--) {
+ unsigned char *key, *value;
+ int64_t key_len, value_len;
+ streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
+ addReplyBulkCBuffer(c,key,key_len);
+ addReplyBulkCBuffer(c,value,value_len);
+ }
+
+ /* If a group is passed, we need to create an entry in the
+ * PEL (pending entries list) of this group *and* this consumer.
+ *
+ * Note that we cannot be sure about the fact the message is not
+ * already owned by another consumer, because the admin is able
+ * to change the consumer group last delivered ID using the
+ * XGROUP SETID command. So if we find that there is already
+ * a NACK for the entry, we need to associate it to the new
+ * consumer. */
+ if (group && !noack) {
+ unsigned char buf[sizeof(streamID)];
+ streamEncodeID(buf,&id);
+
+ /* Try to add a new NACK. Most of the time this will work and
+ * will not require extra lookups. We'll fix the problem later
+ * if we find that there is already a entry for this ID. */
+ streamNACK *nack = streamCreateNACK(consumer);
+ int group_inserted =
+ raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);
+ int consumer_inserted =
+ raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
+
+ /* Now we can check if the entry was already busy, and
+ * in that case reassign the entry to the new consumer,
+ * or update it if the consumer is the same as before. */
+ if (group_inserted == 0) {
+ streamFreeNACK(nack);
+ nack = raxFind(group->pel,buf,sizeof(buf));
+ serverAssert(nack != raxNotFound);
+ raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
+ /* Update the consumer and NACK metadata. */
+ nack->consumer = consumer;
+ nack->delivery_time = commandTimeSnapshot();
+ nack->delivery_count = 1;
+ /* Add the entry in the new consumer local PEL. */
+ raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
+ } else if (group_inserted == 1 && consumer_inserted == 0) {
+ serverPanic("NACK half-created. Should not be possible.");
+ }
+
+ consumer->active_time = commandTimeSnapshot();
+
+ /* Propagate as XCLAIM. */
+ if (spi) {
+ robj *idarg = createObjectFromStreamID(&id);
+ streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
+ decrRefCount(idarg);
+ }
+ }
+
+ arraylen++;
+ if (count && count == arraylen) break;
+ }
+
+ if (spi && propagate_last_id)
+ streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
+
+ streamIteratorStop(&si);
+ if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
+ return arraylen;
+}
+
+/* This is a helper function for streamReplyWithRange() when called with
+ * group and consumer arguments, but with a range that is referring to already
+ * delivered messages. In this case we just emit messages that are already
+ * in the history of the consumer, fetching the IDs from its PEL.
+ *
+ * Note that this function does not have a 'rev' argument because it's not
+ * possible to iterate in reverse using a group. Basically this function
+ * is only called as a result of the XREADGROUP command.
+ *
+ * This function is more expensive because it needs to inspect the PEL and then
+ * seek into the radix tree of the messages in order to emit the full message
+ * to the client. However clients only reach this code path when they are
+ * fetching the history of already retrieved messages, which is rare. */
+size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) {
+ raxIterator ri;
+ unsigned char startkey[sizeof(streamID)];
+ unsigned char endkey[sizeof(streamID)];
+ streamEncodeID(startkey,start);
+ if (end) streamEncodeID(endkey,end);
+
+ size_t arraylen = 0;
+ void *arraylen_ptr = addReplyDeferredLen(c);
+ raxStart(&ri,consumer->pel);
+ raxSeek(&ri,">=",startkey,sizeof(startkey));
+ while(raxNext(&ri) && (!count || arraylen < count)) {
+ if (end && memcmp(ri.key,end,ri.key_len) > 0) break;
+ streamID thisid;
+ streamDecodeID(ri.key,&thisid);
+ if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL,
+ STREAM_RWR_RAWENTRIES,NULL) == 0)
+ {
+ /* Note that we may have a not acknowledged entry in the PEL
+ * about a message that's no longer here because was removed
+ * by the user by other means. In that case we signal it emitting
+ * the ID but then a NULL entry for the fields. */
+ addReplyArrayLen(c,2);
+ addReplyStreamID(c,&thisid);
+ addReplyNullArray(c);
+ } else {
+ streamNACK *nack = ri.data;
+ nack->delivery_time = commandTimeSnapshot();
+ nack->delivery_count++;
+ }
+ arraylen++;
+ }
+ raxStop(&ri);
+ setDeferredArrayLen(c,arraylen_ptr,arraylen);
+ return arraylen;
+}
+
+/* -----------------------------------------------------------------------
+ * Stream commands implementation
+ * ----------------------------------------------------------------------- */
+
+/* Look the stream at 'key' and return the corresponding stream object.
+ * The function creates a key setting it to an empty stream if needed. */
+robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) {
+ robj *o = lookupKeyWrite(c->db,key);
+ if (checkType(c,o,OBJ_STREAM)) return NULL;
+ if (o == NULL) {
+ if (no_create) {
+ addReplyNull(c);
+ return NULL;
+ }
+ o = createStreamObject();
+ dbAdd(c->db,key,o);
+ }
+ return o;
+}
+
+/* Parse a stream ID in the format given by clients to Redis, that is
+ * <ms>-<seq>, and converts it into a streamID structure. If
+ * the specified ID is invalid C_ERR is returned and an error is reported
+ * to the client, otherwise C_OK is returned. The ID may be in incomplete
+ * form, just stating the milliseconds time part of the stream. In such a case
+ * the missing part is set according to the value of 'missing_seq' parameter.
+ *
+ * The IDs "-" and "+" specify respectively the minimum and maximum IDs
+ * that can be represented. If 'strict' is set to 1, "-" and "+" will be
+ * treated as an invalid ID.
+ *
+ * The ID form <ms>-* specifies a millisconds-only ID, leaving the sequence part
+ * to be autogenerated. When a non-NULL 'seq_given' argument is provided, this
+ * form is accepted and the argument is set to 0 unless the sequence part is
+ * specified.
+ *
+ * If 'c' is set to NULL, no reply is sent to the client. */
+int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict, int *seq_given) {
+ char buf[128];
+ if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
+ memcpy(buf,o->ptr,sdslen(o->ptr)+1);
+
+ if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0')
+ goto invalid;
+
+ if (seq_given != NULL) {
+ *seq_given = 1;
+ }
+
+ /* Handle the "-" and "+" special cases. */
+ if (buf[0] == '-' && buf[1] == '\0') {
+ id->ms = 0;
+ id->seq = 0;
+ return C_OK;
+ } else if (buf[0] == '+' && buf[1] == '\0') {
+ id->ms = UINT64_MAX;
+ id->seq = UINT64_MAX;
+ return C_OK;
+ }
+
+ /* Parse <ms>-<seq> form. */
+ unsigned long long ms, seq;
+ char *dot = strchr(buf,'-');
+ if (dot) *dot = '\0';
+ if (string2ull(buf,&ms) == 0) goto invalid;
+ if (dot) {
+ size_t seqlen = strlen(dot+1);
+ if (seq_given != NULL && seqlen == 1 && *(dot + 1) == '*') {
+ /* Handle the <ms>-* form. */
+ seq = 0;
+ *seq_given = 0;
+ } else if (string2ull(dot+1,&seq) == 0) {
+ goto invalid;
+ }
+ } else {
+ seq = missing_seq;
+ }
+ id->ms = ms;
+ id->seq = seq;
+ return C_OK;
+
+invalid:
+ if (c) addReplyError(c,"Invalid stream ID specified as stream "
+ "command argument");
+ return C_ERR;
+}
+
+/* Wrapper for streamGenericParseIDOrReply() used by module API. */
+int streamParseID(const robj *o, streamID *id) {
+ return streamGenericParseIDOrReply(NULL,o,id,0,0,NULL);
+}
+
+/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
+ * 0, to be used when - and + are acceptable IDs. */
+int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
+ return streamGenericParseIDOrReply(c,o,id,missing_seq,0,NULL);
+}
+
+/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
+ * 1, to be used when we want to return an error if the special IDs + or -
+ * are provided. */
+int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given) {
+ return streamGenericParseIDOrReply(c,o,id,missing_seq,1,seq_given);
+}
+
+/* Helper for parsing a stream ID that is a range query interval. When the
+ * exclude argument is NULL, streamParseIDOrReply() is called and the interval
+ * is treated as close (inclusive). Otherwise, the exclude argument is set if
+ * the interval is open (the "(" prefix) and streamParseStrictIDOrReply() is
+ * called in that case.
+ */
+int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude, uint64_t missing_seq) {
+ char *p = o->ptr;
+ size_t len = sdslen(p);
+ int invalid = 0;
+
+ if (exclude != NULL) *exclude = (len > 1 && p[0] == '(');
+ if (exclude != NULL && *exclude) {
+ robj *t = createStringObject(p+1,len-1);
+ invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq,NULL) == C_ERR);
+ decrRefCount(t);
+ } else
+ invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR);
+ if (invalid)
+ return C_ERR;
+ return C_OK;
+}
+
+void streamRewriteApproxSpecifier(client *c, int idx) {
+ rewriteClientCommandArgument(c,idx,shared.special_equals);
+}
+
+/* We propagate MAXLEN/MINID ~ <count> as MAXLEN/MINID = <resulting-len-of-stream>
+ * otherwise trimming is no longer deterministic on replicas / AOF. */
+void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx) {
+ robj *arg;
+ if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
+ arg = createStringObjectFromLongLong(s->length);
+ } else {
+ streamID first_id;
+ streamGetEdgeID(s,1,0,&first_id);
+ arg = createObjectFromStreamID(&first_id);
+ }
+
+ rewriteClientCommandArgument(c,idx,arg);
+ decrRefCount(arg);
+}
+
+/* XADD key [(MAXLEN [~|=] <count> | MINID [~|=] <id>) [LIMIT <entries>]] [NOMKSTREAM] <ID or *> [field value] [field value] ... */
+void xaddCommand(client *c) {
+ /* Parse options. */
+ streamAddTrimArgs parsed_args;
+ int idpos = streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1);
+ if (idpos < 0)
+ return; /* streamParseAddOrTrimArgsOrReply already replied. */
+ int field_pos = idpos+1; /* The ID is always one argument before the first field */
+
+ /* Check arity. */
+ if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {
+ addReplyErrorArity(c);
+ return;
+ }
+
+ /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating
+ * a new stream and have streamAppendItem fail, leaving an empty key in the
+ * database. */
+ if (parsed_args.id_given && parsed_args.seq_given &&
+ parsed_args.id.ms == 0 && parsed_args.id.seq == 0)
+ {
+ addReplyError(c,"The ID specified in XADD must be greater than 0-0");
+ return;
+ }
+
+ /* Lookup the stream at key. */
+ robj *o;
+ stream *s;
+ if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],parsed_args.no_mkstream)) == NULL) return;
+ s = o->ptr;
+
+ /* Return ASAP if the stream has reached the last possible ID */
+ if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {
+ addReplyError(c,"The stream has exhausted the last possible ID, "
+ "unable to add more items");
+ return;
+ }
+
+ /* Append using the low level function and return the ID. */
+ errno = 0;
+ streamID id;
+ if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
+ &id,parsed_args.id_given ? &parsed_args.id : NULL,parsed_args.seq_given) == C_ERR)
+ {
+ serverAssert(errno != 0);
+ if (errno == EDOM)
+ addReplyError(c,"The ID specified in XADD is equal or smaller than "
+ "the target stream top item");
+ else
+ addReplyError(c,"Elements are too large to be stored");
+ return;
+ }
+ sds replyid = createStreamIDString(&id);
+ addReplyBulkCBuffer(c, replyid, sdslen(replyid));
+
+ signalModifiedKey(c,c->db,c->argv[1]);
+ notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
+ server.dirty++;
+
+ /* Trim if needed. */
+ if (parsed_args.trim_strategy != TRIM_STRATEGY_NONE) {
+ if (streamTrim(s, &parsed_args)) {
+ notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
+ }
+ if (parsed_args.approx_trim) {
+ /* In case our trimming was limited (by LIMIT or by ~) we must
+ * re-write the relevant trim argument to make sure there will be
+ * no inconsistencies in AOF loading or in the replica.
+ * It's enough to check only args->approx because there is no
+ * way LIMIT is given without the ~ option. */
+ streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1);
+ streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx);
+ }
+ }
+
+ /* Let's rewrite the ID argument with the one actually generated for
+ * AOF/replication propagation. */
+ if (!parsed_args.id_given || !parsed_args.seq_given) {
+ robj *idarg = createObject(OBJ_STRING, replyid);
+ rewriteClientCommandArgument(c, idpos, idarg);
+ decrRefCount(idarg);
+ } else {
+ sdsfree(replyid);
+ }
+
+ /* We need to signal to blocked clients that there is new data on this
+ * stream. */
+ signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM);
+}
+
+/* XRANGE/XREVRANGE actual implementation.
+ * The 'start' and 'end' IDs are parsed as follows:
+ * Incomplete 'start' has its sequence set to 0, and 'end' to UINT64_MAX.
+ * "-" and "+"" mean the minimal and maximal ID values, respectively.
+ * The "(" prefix means an open (exclusive) range, so XRANGE stream (1-0 (2-0
+ * will match anything from 1-1 and 1-UINT64_MAX.
+ */
+void xrangeGenericCommand(client *c, int rev) {
+ robj *o;
+ stream *s;
+ streamID startid, endid;
+ long long count = -1;
+ robj *startarg = rev ? c->argv[3] : c->argv[2];
+ robj *endarg = rev ? c->argv[2] : c->argv[3];
+ int startex = 0, endex = 0;
+
+ /* Parse start and end IDs. */
+ if (streamParseIntervalIDOrReply(c,startarg,&startid,&startex,0) != C_OK)
+ return;
+ if (startex && streamIncrID(&startid) != C_OK) {
+ addReplyError(c,"invalid start ID for the interval");
+ return;
+ }
+ if (streamParseIntervalIDOrReply(c,endarg,&endid,&endex,UINT64_MAX) != C_OK)
+ return;
+ if (endex && streamDecrID(&endid) != C_OK) {
+ addReplyError(c,"invalid end ID for the interval");
+ return;
+ }
+
+ /* Parse the COUNT option if any. */
+ if (c->argc > 4) {
+ for (int j = 4; j < c->argc; j++) {
+ int additional = c->argc-j-1;
+ if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) {
+ if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL)
+ != C_OK) return;
+ if (count < 0) count = 0;
+ j++; /* Consume additional arg. */
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
+ }
+ }
+
+ /* Return the specified range to the user. */
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL ||
+ checkType(c,o,OBJ_STREAM)) return;
+
+ s = o->ptr;
+
+ if (count == 0) {
+ addReplyNullArray(c);
+ } else {
+ if (count == -1) count = 0;
+ streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL);
+ }
+}
+
+/* XRANGE key start end [COUNT <n>] */
+void xrangeCommand(client *c) {
+ xrangeGenericCommand(c,0);
+}
+
+/* XREVRANGE key end start [COUNT <n>] */
+void xrevrangeCommand(client *c) {
+ xrangeGenericCommand(c,1);
+}
+
+/* XLEN key*/
+void xlenCommand(client *c) {
+ robj *o;
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL
+ || checkType(c,o,OBJ_STREAM)) return;
+ stream *s = o->ptr;
+ addReplyLongLong(c,s->length);
+}
+
+/* XREAD [BLOCK <milliseconds>] [COUNT <count>] STREAMS key_1 key_2 ... key_N
+ * ID_1 ID_2 ... ID_N
+ *
+ * This function also implements the XREADGROUP command, which is like XREAD
+ * but accepting the [GROUP group-name consumer-name] additional option.
+ * This is useful because while XREAD is a read command and can be called
+ * on slaves, XREADGROUP is not. */
+#define XREAD_BLOCKED_DEFAULT_COUNT 1000
+void xreadCommand(client *c) {
+ long long timeout = -1; /* -1 means, no BLOCK argument given. */
+ long long count = 0;
+ int streams_count = 0;
+ int streams_arg = 0;
+ int noack = 0; /* True if NOACK option was specified. */
+ streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
+ streamID *ids = static_ids;
+ streamCG **groups = NULL;
+ int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */
+ robj *groupname = NULL;
+ robj *consumername = NULL;
+
+ /* Parse arguments. */
+ for (int i = 1; i < c->argc; i++) {
+ int moreargs = c->argc-i-1;
+ char *o = c->argv[i]->ptr;
+ if (!strcasecmp(o,"BLOCK") && moreargs) {
+ if (c->flags & CLIENT_SCRIPT) {
+ /*
+ * Although the CLIENT_DENY_BLOCKING flag should protect from blocking the client
+ * on Lua/MULTI/RM_Call we want special treatment for Lua to keep backward compatibility.
+ * There is no sense to use BLOCK option within Lua. */
+ addReplyErrorFormat(c, "%s command is not allowed with BLOCK option from scripts", (char *)c->argv[0]->ptr);
+ return;
+ }
+ i++;
+ if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout,
+ UNIT_MILLISECONDS) != C_OK) return;
+ } else if (!strcasecmp(o,"COUNT") && moreargs) {
+ i++;
+ if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK)
+ return;
+ if (count < 0) count = 0;
+ } else if (!strcasecmp(o,"STREAMS") && moreargs) {
+ streams_arg = i+1;
+ streams_count = (c->argc-streams_arg);
+ if ((streams_count % 2) != 0) {
+ char symbol = xreadgroup ? '>' : '$';
+ addReplyErrorFormat(c,"Unbalanced '%s' list of streams: "
+ "for each stream key an ID or '%c' must be "
+ "specified.", c->cmd->fullname,symbol);
+ return;
+ }
+ streams_count /= 2; /* We have two arguments for each stream. */
+ break;
+ } else if (!strcasecmp(o,"GROUP") && moreargs >= 2) {
+ if (!xreadgroup) {
+ addReplyError(c,"The GROUP option is only supported by "
+ "XREADGROUP. You called XREAD instead.");
+ return;
+ }
+ groupname = c->argv[i+1];
+ consumername = c->argv[i+2];
+ i += 2;
+ } else if (!strcasecmp(o,"NOACK")) {
+ if (!xreadgroup) {
+ addReplyError(c,"The NOACK option is only supported by "
+ "XREADGROUP. You called XREAD instead.");
+ return;
+ }
+ noack = 1;
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
+ }
+
+ /* STREAMS option is mandatory. */
+ if (streams_arg == 0) {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
+
+ /* If the user specified XREADGROUP then it must also
+ * provide the GROUP option. */
+ if (xreadgroup && groupname == NULL) {
+ addReplyError(c,"Missing GROUP option for XREADGROUP");
+ return;
+ }
+
+ /* Parse the IDs and resolve the group name. */
+ if (streams_count > STREAMID_STATIC_VECTOR_LEN)
+ ids = zmalloc(sizeof(streamID)*streams_count);
+ if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count);
+
+ for (int i = streams_arg + streams_count; i < c->argc; i++) {
+ /* Specifying "$" as last-known-id means that the client wants to be
+ * served with just the messages that will arrive into the stream
+ * starting from now. */
+ int id_idx = i - streams_arg - streams_count;
+ robj *key = c->argv[i-streams_count];
+ robj *o = lookupKeyRead(c->db,key);
+ if (checkType(c,o,OBJ_STREAM)) goto cleanup;
+ streamCG *group = NULL;
+
+ /* If a group was specified, than we need to be sure that the
+ * key and group actually exist. */
+ if (groupname) {
+ if (o == NULL ||
+ (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
+ {
+ addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
+ "group '%s' in XREADGROUP with GROUP "
+ "option",
+ (char*)key->ptr,(char*)groupname->ptr);
+ goto cleanup;
+ }
+ groups[id_idx] = group;
+ }
+
+ if (strcmp(c->argv[i]->ptr,"$") == 0) {
+ if (xreadgroup) {
+ addReplyError(c,"The $ ID is meaningless in the context of "
+ "XREADGROUP: you want to read the history of "
+ "this consumer by specifying a proper ID, or "
+ "use the > ID to get new messages. The $ ID would "
+ "just return an empty result set.");
+ goto cleanup;
+ }
+ if (o) {
+ stream *s = o->ptr;
+ ids[id_idx] = s->last_id;
+ } else {
+ ids[id_idx].ms = 0;
+ ids[id_idx].seq = 0;
+ }
+ continue;
+ } else if (strcmp(c->argv[i]->ptr,">") == 0) {
+ if (!xreadgroup) {
+ addReplyError(c,"The > ID can be specified only when calling "
+ "XREADGROUP using the GROUP <group> "
+ "<consumer> option.");
+ goto cleanup;
+ }
+ /* We use just the maximum ID to signal this is a ">" ID, anyway
+ * the code handling the blocking clients will have to update the
+ * ID later in order to match the changing consumer group last ID. */
+ ids[id_idx].ms = UINT64_MAX;
+ ids[id_idx].seq = UINT64_MAX;
+ continue;
+ }
+ if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0,NULL) != C_OK)
+ goto cleanup;
+ }
+
+ /* Try to serve the client synchronously. */
+ size_t arraylen = 0;
+ void *arraylen_ptr = NULL;
+ for (int i = 0; i < streams_count; i++) {
+ robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);
+ if (o == NULL) continue;
+ stream *s = o->ptr;
+ streamID *gt = ids+i; /* ID must be greater than this. */
+ int serve_synchronously = 0;
+ int serve_history = 0; /* True for XREADGROUP with ID != ">". */
+ streamConsumer *consumer = NULL; /* Unused if XREAD */
+ streamPropInfo spi = {c->argv[streams_arg+i],groupname}; /* Unused if XREAD */
+
+ /* Check if there are the conditions to serve the client
+ * synchronously. */
+ if (groups) {
+ /* If the consumer is blocked on a group, we always serve it
+ * synchronously (serving its local history) if the ID specified
+ * was not the special ">" ID. */
+ if (gt->ms != UINT64_MAX ||
+ gt->seq != UINT64_MAX)
+ {
+ serve_synchronously = 1;
+ serve_history = 1;
+ } else if (s->length) {
+ /* We also want to serve a consumer in a consumer group
+ * synchronously in case the group top item delivered is smaller
+ * than what the stream has inside. */
+ streamID maxid, *last = &groups[i]->last_id;
+ streamLastValidID(s, &maxid);
+ if (streamCompareID(&maxid, last) > 0) {
+ serve_synchronously = 1;
+ *gt = *last;
+ }
+ }
+ consumer = streamLookupConsumer(groups[i],consumername->ptr);
+ if (consumer == NULL) {
+ consumer = streamCreateConsumer(groups[i],consumername->ptr,
+ c->argv[streams_arg+i],
+ c->db->id,SCC_DEFAULT);
+ if (noack)
+ streamPropagateConsumerCreation(c,spi.keyname,
+ spi.groupname,
+ consumer->name);
+ }
+ consumer->seen_time = commandTimeSnapshot();
+ } else if (s->length) {
+ /* For consumers without a group, we serve synchronously if we can
+ * actually provide at least one item from the stream. */
+ streamID maxid;
+ streamLastValidID(s, &maxid);
+ if (streamCompareID(&maxid, gt) > 0) {
+ serve_synchronously = 1;
+ }
+ }
+
+ if (serve_synchronously) {
+ arraylen++;
+ if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c);
+ /* streamReplyWithRange() handles the 'start' ID as inclusive,
+ * so start from the next ID, since we want only messages with
+ * IDs greater than start. */
+ streamID start = *gt;
+ streamIncrID(&start);
+
+ /* Emit the two elements sub-array consisting of the name
+ * of the stream and the data we extracted from it. */
+ if (c->resp == 2) addReplyArrayLen(c,2);
+ addReplyBulk(c,c->argv[streams_arg+i]);
+
+ int flags = 0;
+ if (noack) flags |= STREAM_RWR_NOACK;
+ if (serve_history) flags |= STREAM_RWR_HISTORY;
+ streamReplyWithRange(c,s,&start,NULL,count,0,
+ groups ? groups[i] : NULL,
+ consumer, flags, &spi);
+ if (groups) server.dirty++;
+ }
+ }
+
+ /* We replied synchronously! Set the top array len and return to caller. */
+ if (arraylen) {
+ if (c->resp == 2)
+ setDeferredArrayLen(c,arraylen_ptr,arraylen);
+ else
+ setDeferredMapLen(c,arraylen_ptr,arraylen);
+ goto cleanup;
+ }
+
+ /* Block if needed. */
+ if (timeout != -1) {
+ /* If we are not allowed to block the client, the only thing
+ * we can do is treating it as a timeout (even with timeout 0). */
+ if (c->flags & CLIENT_DENY_BLOCKING) {
+ addReplyNullArray(c);
+ goto cleanup;
+ }
+ /* We change the '$' to the current last ID for this stream. this is
+ * Since later on when we unblock on arriving data - we would like to
+ * re-process the command and in case '$' stays we will spin-block forever.
+ */
+ for (int id_idx = 0; id_idx < streams_count; id_idx++) {
+ int arg_idx = id_idx + streams_arg + streams_count;
+ if (strcmp(c->argv[arg_idx]->ptr,"$") == 0) {
+ robj *argv_streamid = createObjectFromStreamID(&ids[id_idx]);
+ rewriteClientCommandArgument(c, arg_idx, argv_streamid);
+ decrRefCount(argv_streamid);
+ }
+ }
+ blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, timeout, xreadgroup);
+ goto cleanup;
+ }
+
+ /* No BLOCK option, nor any stream we can serve. Reply as with a
+ * timeout happened. */
+ addReplyNullArray(c);
+ /* Continue to cleanup... */
+
+cleanup: /* Cleanup. */
+
+ /* The command is propagated (in the READGROUP form) as a side effect
+ * of calling lower level APIs. So stop any implicit propagation. */
+ preventCommandPropagation(c);
+ if (ids != static_ids) zfree(ids);
+ zfree(groups);
+}
+
+/* -----------------------------------------------------------------------
+ * Low level implementation of consumer groups
+ * ----------------------------------------------------------------------- */
+
+/* Create a NACK entry setting the delivery count to 1 and the delivery
+ * time to the current time. The NACK consumer will be set to the one
+ * specified as argument of the function. */
+streamNACK *streamCreateNACK(streamConsumer *consumer) {
+ streamNACK *nack = zmalloc(sizeof(*nack));
+ nack->delivery_time = commandTimeSnapshot();
+ nack->delivery_count = 1;
+ nack->consumer = consumer;
+ return nack;
+}
+
+/* Free a NACK entry. */
+void streamFreeNACK(streamNACK *na) {
+ zfree(na);
+}
+
+/* Free a consumer and associated data structures. Note that this function
+ * will not reassign the pending messages associated with this consumer
+ * nor will delete them from the stream, so when this function is called
+ * to delete a consumer, and not when the whole stream is destroyed, the caller
+ * should do some work before. */
+void streamFreeConsumer(streamConsumer *sc) {
+ raxFree(sc->pel); /* No value free callback: the PEL entries are shared
+ between the consumer and the main stream PEL. */
+ sdsfree(sc->name);
+ zfree(sc);
+}
+
+/* Create a new consumer group in the context of the stream 's', having the
+ * specified name, last server ID and reads counter. If a consumer group with
+ * the same name already exists NULL is returned, otherwise the pointer to the
+ * consumer group is returned. */
+streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read) {
+ if (s->cgroups == NULL) s->cgroups = raxNew();
+ if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)
+ return NULL;
+
+ streamCG *cg = zmalloc(sizeof(*cg));
+ cg->pel = raxNew();
+ cg->consumers = raxNew();
+ cg->last_id = *id;
+ cg->entries_read = entries_read;
+ raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
+ return cg;
+}
+
+/* Free a consumer group and all its associated data. */
+void streamFreeCG(streamCG *cg) {
+ raxFreeWithCallback(cg->pel,(void(*)(void*))streamFreeNACK);
+ raxFreeWithCallback(cg->consumers,(void(*)(void*))streamFreeConsumer);
+ zfree(cg);
+}
+
+/* Lookup the consumer group in the specified stream and returns its
+ * pointer, otherwise if there is no such group, NULL is returned. */
+streamCG *streamLookupCG(stream *s, sds groupname) {
+ if (s->cgroups == NULL) return NULL;
+ streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname,
+ sdslen(groupname));
+ return (cg == raxNotFound) ? NULL : cg;
+}
+
+/* Create a consumer with the specified name in the group 'cg' and return.
+ * If the consumer exists, return NULL. As a side effect, when the consumer
+ * is successfully created, the key space will be notified and dirty++ unless
+ * the SCC_NO_NOTIFY or SCC_NO_DIRTIFY flags is specified. */
+streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags) {
+ if (cg == NULL) return NULL;
+ int notify = !(flags & SCC_NO_NOTIFY);
+ int dirty = !(flags & SCC_NO_DIRTIFY);
+ streamConsumer *consumer = zmalloc(sizeof(*consumer));
+ int success = raxTryInsert(cg->consumers,(unsigned char*)name,
+ sdslen(name),consumer,NULL);
+ if (!success) {
+ zfree(consumer);
+ return NULL;
+ }
+ consumer->name = sdsdup(name);
+ consumer->pel = raxNew();
+ consumer->active_time = -1;
+ consumer->seen_time = commandTimeSnapshot();
+ if (dirty) server.dirty++;
+ if (notify) notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-createconsumer",key,dbid);
+ return consumer;
+}
+
+/* Lookup the consumer with the specified name in the group 'cg'. */
+streamConsumer *streamLookupConsumer(streamCG *cg, sds name) {
+ if (cg == NULL) return NULL;
+ streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
+ sdslen(name));
+ if (consumer == raxNotFound) return NULL;
+ return consumer;
+}
+
+/* Delete the consumer specified in the consumer group 'cg'. */
+void streamDelConsumer(streamCG *cg, streamConsumer *consumer) {
+ /* Iterate all the consumer pending messages, deleting every corresponding
+ * entry from the global entry. */
+ raxIterator ri;
+ raxStart(&ri,consumer->pel);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ streamNACK *nack = ri.data;
+ raxRemove(cg->pel,ri.key,ri.key_len,NULL);
+ streamFreeNACK(nack);
+ }
+ raxStop(&ri);
+
+ /* Deallocate the consumer. */
+ raxRemove(cg->consumers,(unsigned char*)consumer->name,
+ sdslen(consumer->name),NULL);
+ streamFreeConsumer(consumer);
+}
+
+/* -----------------------------------------------------------------------
+ * Consumer groups commands
+ * ----------------------------------------------------------------------- */
+
+/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM] [ENTRIESREAD entries_read]
+ * XGROUP SETID <key> <groupname> <id or $> [ENTRIESREAD entries_read]
+ * XGROUP DESTROY <key> <groupname>
+ * XGROUP CREATECONSUMER <key> <groupname> <consumer>
+ * XGROUP DELCONSUMER <key> <groupname> <consumername> */
+void xgroupCommand(client *c) {
+ stream *s = NULL;
+ sds grpname = NULL;
+ streamCG *cg = NULL;
+ char *opt = c->argv[1]->ptr; /* Subcommand name. */
+ int mkstream = 0;
+ long long entries_read = SCG_INVALID_ENTRIES_READ;
+ robj *o;
+
+ /* Everything but the "HELP" option requires a key and group name. */
+ if (c->argc >= 4) {
+ /* Parse optional arguments for CREATE and SETID */
+ int i = 5;
+ int create_subcmd = !strcasecmp(opt,"CREATE");
+ int setid_subcmd = !strcasecmp(opt,"SETID");
+ while (i < c->argc) {
+ if (create_subcmd && !strcasecmp(c->argv[i]->ptr,"MKSTREAM")) {
+ mkstream = 1;
+ i++;
+ } else if ((create_subcmd || setid_subcmd) && !strcasecmp(c->argv[i]->ptr,"ENTRIESREAD") && i + 1 < c->argc) {
+ if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_read,NULL) != C_OK)
+ return;
+ if (entries_read < 0 && entries_read != SCG_INVALID_ENTRIES_READ) {
+ addReplyError(c,"value for ENTRIESREAD must be positive or -1");
+ return;
+ }
+ i += 2;
+ } else {
+ addReplySubcommandSyntaxError(c);
+ return;
+ }
+ }
+
+ o = lookupKeyWrite(c->db,c->argv[2]);
+ if (o) {
+ if (checkType(c,o,OBJ_STREAM)) return;
+ s = o->ptr;
+ }
+ grpname = c->argv[3]->ptr;
+ }
+
+ /* Check for missing key/group. */
+ if (c->argc >= 4 && !mkstream) {
+ /* At this point key must exist, or there is an error. */
+ if (s == NULL) {
+ addReplyError(c,
+ "The XGROUP subcommand requires the key to exist. "
+ "Note that for CREATE you may want to use the MKSTREAM "
+ "option to create an empty stream automatically.");
+ return;
+ }
+
+ /* Certain subcommands require the group to exist. */
+ if ((cg = streamLookupCG(s,grpname)) == NULL &&
+ (!strcasecmp(opt,"SETID") ||
+ !strcasecmp(opt,"CREATECONSUMER") ||
+ !strcasecmp(opt,"DELCONSUMER")))
+ {
+ addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
+ "for key name '%s'",
+ (char*)grpname, (char*)c->argv[2]->ptr);
+ return;
+ }
+ }
+
+ /* Dispatch the different subcommands. */
+ if (c->argc == 2 && !strcasecmp(opt,"HELP")) {
+ const char *help[] = {
+"CREATE <key> <groupname> <id|$> [option]",
+" Create a new consumer group. Options are:",
+" * MKSTREAM",
+" Create the empty stream if it does not exist.",
+" * ENTRIESREAD entries_read",
+" Set the group's entries_read counter (internal use).",
+"CREATECONSUMER <key> <groupname> <consumer>",
+" Create a new consumer in the specified group.",
+"DELCONSUMER <key> <groupname> <consumer>",
+" Remove the specified consumer.",
+"DESTROY <key> <groupname>",
+" Remove the specified group.",
+"SETID <key> <groupname> <id|$> [ENTRIESREAD entries_read]",
+" Set the current group ID and entries_read counter.",
+NULL
+ };
+ addReplyHelp(c, help);
+ } else if (!strcasecmp(opt,"CREATE") && (c->argc >= 5 && c->argc <= 8)) {
+ streamID id;
+ if (!strcmp(c->argv[4]->ptr,"$")) {
+ if (s) {
+ id = s->last_id;
+ } else {
+ id.ms = 0;
+ id.seq = 0;
+ }
+ } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0,NULL) != C_OK) {
+ return;
+ }
+
+ /* Handle the MKSTREAM option now that the command can no longer fail. */
+ if (s == NULL) {
+ serverAssert(mkstream);
+ o = createStreamObject();
+ dbAdd(c->db,c->argv[2],o);
+ s = o->ptr;
+ signalModifiedKey(c,c->db,c->argv[2]);
+ }
+
+ streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id,entries_read);
+ if (cg) {
+ addReply(c,shared.ok);
+ server.dirty++;
+ notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-create",
+ c->argv[2],c->db->id);
+ } else {
+ addReplyError(c,"-BUSYGROUP Consumer Group name already exists");
+ }
+ } else if (!strcasecmp(opt,"SETID") && (c->argc == 5 || c->argc == 7)) {
+ streamID id;
+ if (!strcmp(c->argv[4]->ptr,"$")) {
+ id = s->last_id;
+ } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) {
+ return;
+ }
+ cg->last_id = id;
+ cg->entries_read = entries_read;
+ addReply(c,shared.ok);
+ server.dirty++;
+ notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id);
+ } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) {
+ if (cg) {
+ raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL);
+ streamFreeCG(cg);
+ addReply(c,shared.cone);
+ server.dirty++;
+ notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy",
+ c->argv[2],c->db->id);
+ /* We want to unblock any XREADGROUP consumers with -NOGROUP. */
+ signalKeyAsReady(c->db,c->argv[2],OBJ_STREAM);
+ } else {
+ addReply(c,shared.czero);
+ }
+ } else if (!strcasecmp(opt,"CREATECONSUMER") && c->argc == 5) {
+ streamConsumer *created = streamCreateConsumer(cg,c->argv[4]->ptr,c->argv[2],
+ c->db->id,SCC_DEFAULT);
+ addReplyLongLong(c,created ? 1 : 0);
+ } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) {
+ long long pending = 0;
+ streamConsumer *consumer = streamLookupConsumer(cg,c->argv[4]->ptr);
+ if (consumer) {
+ /* Delete the consumer and returns the number of pending messages
+ * that were yet associated with such a consumer. */
+ pending = raxSize(consumer->pel);
+ streamDelConsumer(cg,consumer);
+ server.dirty++;
+ notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer",
+ c->argv[2],c->db->id);
+ }
+ addReplyLongLong(c,pending);
+ } else {
+ addReplySubcommandSyntaxError(c);
+ }
+}
+
+/* XSETID <stream> <id> [ENTRIESADDED entries_added] [MAXDELETEDID max_deleted_entry_id]
+ *
+ * Set the internal "last ID", "added entries" and "maximal deleted entry ID"
+ * of a stream. */
+void xsetidCommand(client *c) {
+ streamID id, max_xdel_id = {0, 0};
+ long long entries_added = -1;
+
+ if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK)
+ return;
+
+ int i = 3;
+ while (i < c->argc) {
+ int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
+ char *opt = c->argv[i]->ptr;
+ if (!strcasecmp(opt,"ENTRIESADDED") && moreargs) {
+ if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_added,NULL) != C_OK) {
+ return;
+ } else if (entries_added < 0) {
+ addReplyError(c,"entries_added must be positive");
+ return;
+ }
+ i += 2;
+ } else if (!strcasecmp(opt,"MAXDELETEDID") && moreargs) {
+ if (streamParseStrictIDOrReply(c,c->argv[i+1],&max_xdel_id,0,NULL) != C_OK) {
+ return;
+ } else if (streamCompareID(&id,&max_xdel_id) < 0) {
+ addReplyError(c,"The ID specified in XSETID is smaller than the provided max_deleted_entry_id");
+ return;
+ }
+ i += 2;
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
+ }
+
+ robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
+ if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
+ stream *s = o->ptr;
+
+ if (streamCompareID(&id,&s->max_deleted_entry_id) < 0) {
+ addReplyError(c,"The ID specified in XSETID is smaller than current max_deleted_entry_id");
+ return;
+ }
+
+ /* If the stream has at least one item, we want to check that the user
+ * is setting a last ID that is equal or greater than the current top
+ * item, otherwise the fundamental ID monotonicity assumption is violated. */
+ if (s->length > 0) {
+ streamID maxid;
+ streamLastValidID(s,&maxid);
+
+ if (streamCompareID(&id,&maxid) < 0) {
+ addReplyError(c,"The ID specified in XSETID is smaller than the target stream top item");
+ return;
+ }
+
+ /* If an entries_added was provided, it can't be lower than the length. */
+ if (entries_added != -1 && s->length > (uint64_t)entries_added) {
+ addReplyError(c,"The entries_added specified in XSETID is smaller than the target stream length");
+ return;
+ }
+ }
+
+ s->last_id = id;
+ if (entries_added != -1)
+ s->entries_added = entries_added;
+ if (!streamIDEqZero(&max_xdel_id))
+ s->max_deleted_entry_id = max_xdel_id;
+ addReply(c,shared.ok);
+ server.dirty++;
+ notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id);
+}
+
+/* XACK <key> <group> <id> <id> ... <id>
+ * Acknowledge a message as processed. In practical terms we just check the
+ * pending entries list (PEL) of the group, and delete the PEL entry both from
+ * the group and the consumer (pending messages are referenced in both places).
+ *
+ * Return value of the command is the number of messages successfully
+ * acknowledged, that is, the IDs we were actually able to resolve in the PEL.
+ */
+void xackCommand(client *c) {
+ streamCG *group = NULL;
+ robj *o = lookupKeyRead(c->db,c->argv[1]);
+ if (o) {
+ if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
+ group = streamLookupCG(o->ptr,c->argv[2]->ptr);
+ }
+
+ /* No key or group? Nothing to ack. */
+ if (o == NULL || group == NULL) {
+ addReply(c,shared.czero);
+ return;
+ }
+
+ /* Start parsing the IDs, so that we abort ASAP if there is a syntax
+ * error: the return value of this command cannot be an error in case
+ * the client successfully acknowledged some messages, so it should be
+ * executed in a "all or nothing" fashion. */
+ streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
+ streamID *ids = static_ids;
+ int id_count = c->argc-3;
+ if (id_count > STREAMID_STATIC_VECTOR_LEN)
+ ids = zmalloc(sizeof(streamID)*id_count);
+ for (int j = 3; j < c->argc; j++) {
+ if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0,NULL) != C_OK) goto cleanup;
+ }
+
+ int acknowledged = 0;
+ for (int j = 3; j < c->argc; j++) {
+ unsigned char buf[sizeof(streamID)];
+ streamEncodeID(buf,&ids[j-3]);
+
+ /* Lookup the ID in the group PEL: it will have a reference to the
+ * NACK structure that will have a reference to the consumer, so that
+ * we are able to remove the entry from both PELs. */
+ streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
+ if (nack != raxNotFound) {
+ raxRemove(group->pel,buf,sizeof(buf),NULL);
+ raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
+ streamFreeNACK(nack);
+ acknowledged++;
+ server.dirty++;
+ }
+ }
+ addReplyLongLong(c,acknowledged);
+cleanup:
+ if (ids != static_ids) zfree(ids);
+}
+
+/* XPENDING <key> <group> [[IDLE <idle>] <start> <stop> <count> [<consumer>]]
+ *
+ * If start and stop are omitted, the command just outputs information about
+ * the amount of pending messages for the key/group pair, together with
+ * the minimum and maximum ID of pending messages.
+ *
+ * If start and stop are provided instead, the pending messages are returned
+ * with information about the current owner, number of deliveries and last
+ * delivery time and so forth. */
+void xpendingCommand(client *c) {
+ int justinfo = c->argc == 3; /* Without the range just outputs general
+ information about the PEL. */
+ robj *key = c->argv[1];
+ robj *groupname = c->argv[2];
+ robj *consumername = NULL;
+ streamID startid, endid;
+ long long count = 0;
+ long long minidle = 0;
+ int startex = 0, endex = 0;
+
+ /* Start and stop, and the consumer, can be omitted. Also the IDLE modifier. */
+ if (c->argc != 3 && (c->argc < 6 || c->argc > 9)) {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
+
+ /* Parse start/end/count arguments ASAP if needed, in order to report
+ * syntax errors before any other error. */
+ if (c->argc >= 6) {
+ int startidx = 3; /* Without IDLE */
+
+ if (!strcasecmp(c->argv[3]->ptr, "IDLE")) {
+ if (getLongLongFromObjectOrReply(c, c->argv[4], &minidle, NULL) == C_ERR)
+ return;
+ if (c->argc < 8) {
+ /* If IDLE was provided we must have at least 'start end count' */
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
+ /* Search for rest of arguments after 'IDLE <idle>' */
+ startidx += 2;
+ }
+
+ /* count argument. */
+ if (getLongLongFromObjectOrReply(c,c->argv[startidx+2],&count,NULL) == C_ERR)
+ return;
+ if (count < 0) count = 0;
+
+ /* start and end arguments. */
+ if (streamParseIntervalIDOrReply(c,c->argv[startidx],&startid,&startex,0) != C_OK)
+ return;
+ if (startex && streamIncrID(&startid) != C_OK) {
+ addReplyError(c,"invalid start ID for the interval");
+ return;
+ }
+ if (streamParseIntervalIDOrReply(c,c->argv[startidx+1],&endid,&endex,UINT64_MAX) != C_OK)
+ return;
+ if (endex && streamDecrID(&endid) != C_OK) {
+ addReplyError(c,"invalid end ID for the interval");
+ return;
+ }
+
+ if (startidx+3 < c->argc) {
+ /* 'consumer' was provided */
+ consumername = c->argv[startidx+3];
+ }
+ }
+
+ /* Lookup the key and the group inside the stream. */
+ robj *o = lookupKeyRead(c->db,c->argv[1]);
+ streamCG *group;
+
+ if (checkType(c,o,OBJ_STREAM)) return;
+ if (o == NULL ||
+ (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
+ {
+ addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
+ "group '%s'",
+ (char*)key->ptr,(char*)groupname->ptr);
+ return;
+ }
+
+ /* XPENDING <key> <group> variant. */
+ if (justinfo) {
+ addReplyArrayLen(c,4);
+ /* Total number of messages in the PEL. */
+ addReplyLongLong(c,raxSize(group->pel));
+ /* First and last IDs. */
+ if (raxSize(group->pel) == 0) {
+ addReplyNull(c); /* Start. */
+ addReplyNull(c); /* End. */
+ addReplyNullArray(c); /* Clients. */
+ } else {
+ /* Start. */
+ raxIterator ri;
+ raxStart(&ri,group->pel);
+ raxSeek(&ri,"^",NULL,0);
+ raxNext(&ri);
+ streamDecodeID(ri.key,&startid);
+ addReplyStreamID(c,&startid);
+
+ /* End. */
+ raxSeek(&ri,"$",NULL,0);
+ raxNext(&ri);
+ streamDecodeID(ri.key,&endid);
+ addReplyStreamID(c,&endid);
+ raxStop(&ri);
+
+ /* Consumers with pending messages. */
+ raxStart(&ri,group->consumers);
+ raxSeek(&ri,"^",NULL,0);
+ void *arraylen_ptr = addReplyDeferredLen(c);
+ size_t arraylen = 0;
+ while(raxNext(&ri)) {
+ streamConsumer *consumer = ri.data;
+ if (raxSize(consumer->pel) == 0) continue;
+ addReplyArrayLen(c,2);
+ addReplyBulkCBuffer(c,ri.key,ri.key_len);
+ addReplyBulkLongLong(c,raxSize(consumer->pel));
+ arraylen++;
+ }
+ setDeferredArrayLen(c,arraylen_ptr,arraylen);
+ raxStop(&ri);
+ }
+ } else { /* <start>, <stop> and <count> provided, return actual pending entries (not just info) */
+ streamConsumer *consumer = NULL;
+ if (consumername) {
+ consumer = streamLookupConsumer(group,consumername->ptr);
+
+ /* If a consumer name was mentioned but it does not exist, we can
+ * just return an empty array. */
+ if (consumer == NULL) {
+ addReplyArrayLen(c,0);
+ return;
+ }
+ }
+
+ rax *pel = consumer ? consumer->pel : group->pel;
+ unsigned char startkey[sizeof(streamID)];
+ unsigned char endkey[sizeof(streamID)];
+ raxIterator ri;
+ mstime_t now = commandTimeSnapshot();
+
+ streamEncodeID(startkey,&startid);
+ streamEncodeID(endkey,&endid);
+ raxStart(&ri,pel);
+ raxSeek(&ri,">=",startkey,sizeof(startkey));
+ void *arraylen_ptr = addReplyDeferredLen(c);
+ size_t arraylen = 0;
+
+ while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) {
+ streamNACK *nack = ri.data;
+
+ if (minidle) {
+ mstime_t this_idle = now - nack->delivery_time;
+ if (this_idle < minidle) continue;
+ }
+
+ arraylen++;
+ count--;
+ addReplyArrayLen(c,4);
+
+ /* Entry ID. */
+ streamID id;
+ streamDecodeID(ri.key,&id);
+ addReplyStreamID(c,&id);
+
+ /* Consumer name. */
+ addReplyBulkCBuffer(c,nack->consumer->name,
+ sdslen(nack->consumer->name));
+
+ /* Milliseconds elapsed since last delivery. */
+ mstime_t elapsed = now - nack->delivery_time;
+ if (elapsed < 0) elapsed = 0;
+ addReplyLongLong(c,elapsed);
+
+ /* Number of deliveries. */
+ addReplyLongLong(c,nack->delivery_count);
+ }
+ raxStop(&ri);
+ setDeferredArrayLen(c,arraylen_ptr,arraylen);
+ }
+}
+
+/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2>
+ * [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
+ * [FORCE] [JUSTID]
+ *
+ * Changes ownership of one or multiple messages in the Pending Entries List
+ * of a given stream consumer group.
+ *
+ * If the message ID (among the specified ones) exists, and its idle
+ * time greater or equal to <min-idle-time>, then the message new owner
+ * becomes the specified <consumer>. If the minimum idle time specified
+ * is zero, messages are claimed regardless of their idle time.
+ *
+ * All the messages that cannot be found inside the pending entries list
+ * are ignored, but in case the FORCE option is used. In that case we
+ * create the NACK (representing a not yet acknowledged message) entry in
+ * the consumer group PEL.
+ *
+ * This command creates the consumer as side effect if it does not yet
+ * exists. Moreover the command reset the idle time of the message to 0,
+ * even if by using the IDLE or TIME options, the user can control the
+ * new idle time.
+ *
+ * The options at the end can be used in order to specify more attributes
+ * to set in the representation of the pending message:
+ *
+ * 1. IDLE <ms>:
+ * Set the idle time (last time it was delivered) of the message.
+ * If IDLE is not specified, an IDLE of 0 is assumed, that is,
+ * the time count is reset because the message has now a new
+ * owner trying to process it.
+ *
+ * 2. TIME <ms-unix-time>:
+ * This is the same as IDLE but instead of a relative amount of
+ * milliseconds, it sets the idle time to a specific unix time
+ * (in milliseconds). This is useful in order to rewrite the AOF
+ * file generating XCLAIM commands.
+ *
+ * 3. RETRYCOUNT <count>:
+ * Set the retry counter to the specified value. This counter is
+ * incremented every time a message is delivered again. Normally
+ * XCLAIM does not alter this counter, which is just served to clients
+ * when the XPENDING command is called: this way clients can detect
+ * anomalies, like messages that are never processed for some reason
+ * after a big number of delivery attempts.
+ *
+ * 4. FORCE:
+ * Creates the pending message entry in the PEL even if certain
+ * specified IDs are not already in the PEL assigned to a different
+ * client. However the message must be exist in the stream, otherwise
+ * the IDs of non existing messages are ignored.
+ *
+ * 5. JUSTID:
+ * Return just an array of IDs of messages successfully claimed,
+ * without returning the actual message.
+ *
+ * 6. LASTID <id>:
+ * Update the consumer group last ID with the specified ID if the
+ * current last ID is smaller than the provided one.
+ * This is used for replication / AOF, so that when we read from a
+ * consumer group, the XCLAIM that gets propagated to give ownership
+ * to the consumer, is also used in order to update the group current
+ * ID.
+ *
+ * The command returns an array of messages that the user
+ * successfully claimed, so that the caller is able to understand
+ * what messages it is now in charge of. */
+void xclaimCommand(client *c) {
+ streamCG *group = NULL;
+ robj *o = lookupKeyRead(c->db,c->argv[1]);
+ long long minidle; /* Minimum idle time argument. */
+ long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */
+ mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */
+ int force = 0;
+ int justid = 0;
+
+ if (o) {
+ if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
+ group = streamLookupCG(o->ptr,c->argv[2]->ptr);
+ }
+
+ /* No key or group? Send an error given that the group creation
+ * is mandatory. */
+ if (o == NULL || group == NULL) {
+ addReplyErrorFormat(c,"-NOGROUP No such key '%s' or "
+ "consumer group '%s'", (char*)c->argv[1]->ptr,
+ (char*)c->argv[2]->ptr);
+ return;
+ }
+
+ if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,
+ "Invalid min-idle-time argument for XCLAIM")
+ != C_OK) return;
+ if (minidle < 0) minidle = 0;
+
+ /* Start parsing the IDs, so that we abort ASAP if there is a syntax
+ * error: the return value of this command cannot be an error in case
+ * the client successfully claimed some message, so it should be
+ * executed in a "all or nothing" fashion. */
+ int j;
+ streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
+ streamID *ids = static_ids;
+ int id_count = c->argc-5;
+ if (id_count > STREAMID_STATIC_VECTOR_LEN)
+ ids = zmalloc(sizeof(streamID)*id_count);
+ for (j = 5; j < c->argc; j++) {
+ if (streamParseStrictIDOrReply(NULL,c->argv[j],&ids[j-5],0,NULL) != C_OK) break;
+ }
+ int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */
+
+ /* If we stopped because some IDs cannot be parsed, perhaps they
+ * are trailing options. */
+ mstime_t now = commandTimeSnapshot();
+ streamID last_id = {0,0};
+ int propagate_last_id = 0;
+ for (; j < c->argc; j++) {
+ int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
+ char *opt = c->argv[j]->ptr;
+ if (!strcasecmp(opt,"FORCE")) {
+ force = 1;
+ } else if (!strcasecmp(opt,"JUSTID")) {
+ justid = 1;
+ } else if (!strcasecmp(opt,"IDLE") && moreargs) {
+ j++;
+ if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
+ "Invalid IDLE option argument for XCLAIM")
+ != C_OK) goto cleanup;
+ deliverytime = now - deliverytime;
+ } else if (!strcasecmp(opt,"TIME") && moreargs) {
+ j++;
+ if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
+ "Invalid TIME option argument for XCLAIM")
+ != C_OK) goto cleanup;
+ } else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) {
+ j++;
+ if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount,
+ "Invalid RETRYCOUNT option argument for XCLAIM")
+ != C_OK) goto cleanup;
+ } else if (!strcasecmp(opt,"LASTID") && moreargs) {
+ j++;
+ if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0,NULL) != C_OK) goto cleanup;
+ } else {
+ addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
+ goto cleanup;
+ }
+ }
+
+ if (streamCompareID(&last_id,&group->last_id) > 0) {
+ group->last_id = last_id;
+ propagate_last_id = 1;
+ }
+
+ if (deliverytime != -1) {
+ /* If a delivery time was passed, either with IDLE or TIME, we
+ * do some sanity check on it, and set the deliverytime to now
+ * (which is a sane choice usually) if the value is bogus.
+ * To raise an error here is not wise because clients may compute
+ * the idle time doing some math starting from their local time,
+ * and this is not a good excuse to fail in case, for instance,
+ * the computer time is a bit in the future from our POV. */
+ if (deliverytime < 0 || deliverytime > now) deliverytime = now;
+ } else {
+ /* If no IDLE/TIME option was passed, we want the last delivery
+ * time to be now, so that the idle time of the message will be
+ * zero. */
+ deliverytime = now;
+ }
+
+ /* Do the actual claiming. */
+ streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr);
+ if (consumer == NULL) {
+ consumer = streamCreateConsumer(group,c->argv[3]->ptr,c->argv[1],c->db->id,SCC_DEFAULT);
+ }
+ consumer->seen_time = commandTimeSnapshot();
+
+ void *arraylenptr = addReplyDeferredLen(c);
+ size_t arraylen = 0;
+ for (int j = 5; j <= last_id_arg; j++) {
+ streamID id = ids[j-5];
+ unsigned char buf[sizeof(streamID)];
+ streamEncodeID(buf,&id);
+
+ /* Lookup the ID in the group PEL. */
+ streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
+
+ /* Item must exist for us to transfer it to another consumer. */
+ if (!streamEntryExists(o->ptr,&id)) {
+ /* Clear this entry from the PEL, it no longer exists */
+ if (nack != raxNotFound) {
+ /* Propagate this change (we are going to delete the NACK). */
+ streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
+ propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
+ server.dirty++;
+ /* Release the NACK */
+ raxRemove(group->pel,buf,sizeof(buf),NULL);
+ raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
+ streamFreeNACK(nack);
+ }
+ continue;
+ }
+
+ /* If FORCE is passed, let's check if at least the entry
+ * exists in the Stream. In such case, we'll create a new
+ * entry in the PEL from scratch, so that XCLAIM can also
+ * be used to create entries in the PEL. Useful for AOF
+ * and replication of consumer groups. */
+ if (force && nack == raxNotFound) {
+ /* Create the NACK. */
+ nack = streamCreateNACK(NULL);
+ raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
+ }
+
+ if (nack != raxNotFound) {
+ /* We need to check if the minimum idle time requested
+ * by the caller is satisfied by this entry.
+ *
+ * Note that the nack could be created by FORCE, in this
+ * case there was no pre-existing entry and minidle should
+ * be ignored, but in that case nack->consumer is NULL. */
+ if (nack->consumer && minidle) {
+ mstime_t this_idle = now - nack->delivery_time;
+ if (this_idle < minidle) continue;
+ }
+
+ if (nack->consumer != consumer) {
+ /* Remove the entry from the old consumer.
+ * Note that nack->consumer is NULL if we created the
+ * NACK above because of the FORCE option. */
+ if (nack->consumer)
+ raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
+ }
+ nack->delivery_time = deliverytime;
+ /* Set the delivery attempts counter if given, otherwise
+ * autoincrement unless JUSTID option provided */
+ if (retrycount >= 0) {
+ nack->delivery_count = retrycount;
+ } else if (!justid) {
+ nack->delivery_count++;
+ }
+ if (nack->consumer != consumer) {
+ /* Add the entry in the new consumer local PEL. */
+ raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
+ nack->consumer = consumer;
+ }
+ /* Send the reply for this entry. */
+ if (justid) {
+ addReplyStreamID(c,&id);
+ } else {
+ serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1);
+ }
+ arraylen++;
+
+ consumer->active_time = commandTimeSnapshot();
+
+ /* Propagate this change. */
+ streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
+ propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
+ server.dirty++;
+ }
+ }
+ if (propagate_last_id) {
+ streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
+ server.dirty++;
+ }
+ setDeferredArrayLen(c,arraylenptr,arraylen);
+ preventCommandPropagation(c);
+cleanup:
+ if (ids != static_ids) zfree(ids);
+}
+
+/* XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID]
+ *
+ * Changes ownership of one or multiple messages in the Pending Entries List
+ * of a given stream consumer group.
+ *
+ * For each PEL entry, if its idle time greater or equal to <min-idle-time>,
+ * then the message new owner becomes the specified <consumer>.
+ * If the minimum idle time specified is zero, messages are claimed
+ * regardless of their idle time.
+ *
+ * This command creates the consumer as side effect if it does not yet
+ * exists. Moreover the command reset the idle time of the message to 0.
+ *
+ * The command returns an array of messages that the user
+ * successfully claimed, so that the caller is able to understand
+ * what messages it is now in charge of. */
+void xautoclaimCommand(client *c) {
+ streamCG *group = NULL;
+ robj *o = lookupKeyRead(c->db,c->argv[1]);
+ long long minidle; /* Minimum idle time argument, in milliseconds. */
+ long count = 100; /* Maximum entries to claim. */
+ const unsigned attempts_factor = 10;
+ streamID startid;
+ int startex;
+ int justid = 0;
+
+ /* Parse idle/start/end/count arguments ASAP if needed, in order to report
+ * syntax errors before any other error. */
+ if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,"Invalid min-idle-time argument for XAUTOCLAIM") != C_OK)
+ return;
+ if (minidle < 0) minidle = 0;
+
+ if (streamParseIntervalIDOrReply(c,c->argv[5],&startid,&startex,0) != C_OK)
+ return;
+ if (startex && streamIncrID(&startid) != C_OK) {
+ addReplyError(c,"invalid start ID for the interval");
+ return;
+ }
+
+ int j = 6; /* options start at argv[6] */
+ while(j < c->argc) {
+ int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
+ char *opt = c->argv[j]->ptr;
+ if (!strcasecmp(opt,"COUNT") && moreargs) {
+ long max_count = LONG_MAX / (max(sizeof(streamID), attempts_factor));
+ if (getRangeLongFromObjectOrReply(c,c->argv[j+1],1,max_count,&count,"COUNT must be > 0") != C_OK)
+ return;
+ j++;
+ } else if (!strcasecmp(opt,"JUSTID")) {
+ justid = 1;
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
+ j++;
+ }
+
+ if (o) {
+ if (checkType(c,o,OBJ_STREAM))
+ return; /* Type error. */
+ group = streamLookupCG(o->ptr,c->argv[2]->ptr);
+ }
+
+ /* No key or group? Send an error given that the group creation
+ * is mandatory. */
+ if (o == NULL || group == NULL) {
+ addReplyErrorFormat(c,"-NOGROUP No such key '%s' or consumer group '%s'",
+ (char*)c->argv[1]->ptr,
+ (char*)c->argv[2]->ptr);
+ return;
+ }
+
+ streamID *deleted_ids = ztrymalloc(count * sizeof(streamID));
+ if (!deleted_ids) {
+ addReplyError(c, "Insufficient memory, failed allocating transient memory, COUNT too high.");
+ return;
+ }
+
+ /* Do the actual claiming. */
+ streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr);
+ if (consumer == NULL) {
+ consumer = streamCreateConsumer(group,c->argv[3]->ptr,c->argv[1],c->db->id,SCC_DEFAULT);
+ }
+ consumer->seen_time = commandTimeSnapshot();
+
+ long long attempts = count * attempts_factor;
+
+ addReplyArrayLen(c, 3); /* We add another reply later */
+ void *endidptr = addReplyDeferredLen(c); /* reply[0] */
+ void *arraylenptr = addReplyDeferredLen(c); /* reply[1] */
+
+ unsigned char startkey[sizeof(streamID)];
+ streamEncodeID(startkey,&startid);
+ raxIterator ri;
+ raxStart(&ri,group->pel);
+ raxSeek(&ri,">=",startkey,sizeof(startkey));
+ size_t arraylen = 0;
+ mstime_t now = commandTimeSnapshot();
+ int deleted_id_num = 0;
+ while (attempts-- && count && raxNext(&ri)) {
+ streamNACK *nack = ri.data;
+
+ streamID id;
+ streamDecodeID(ri.key, &id);
+
+ /* Item must exist for us to transfer it to another consumer. */
+ if (!streamEntryExists(o->ptr,&id)) {
+ /* Propagate this change (we are going to delete the NACK). */
+ robj *idstr = createObjectFromStreamID(&id);
+ streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack);
+ decrRefCount(idstr);
+ server.dirty++;
+ /* Clear this entry from the PEL, it no longer exists */
+ raxRemove(group->pel,ri.key,ri.key_len,NULL);
+ raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL);
+ streamFreeNACK(nack);
+ /* Remember the ID for later */
+ deleted_ids[deleted_id_num++] = id;
+ raxSeek(&ri,">=",ri.key,ri.key_len);
+ count--; /* Count is a limit of the command response size. */
+ continue;
+ }
+
+ if (minidle) {
+ mstime_t this_idle = now - nack->delivery_time;
+ if (this_idle < minidle)
+ continue;
+ }
+
+ if (nack->consumer != consumer) {
+ /* Remove the entry from the old consumer.
+ * Note that nack->consumer is NULL if we created the
+ * NACK above because of the FORCE option. */
+ if (nack->consumer)
+ raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL);
+ }
+
+ /* Update the consumer and idle time. */
+ nack->delivery_time = now;
+ /* Increment the delivery attempts counter unless JUSTID option provided */
+ if (!justid)
+ nack->delivery_count++;
+
+ if (nack->consumer != consumer) {
+ /* Add the entry in the new consumer local PEL. */
+ raxInsert(consumer->pel,ri.key,ri.key_len,nack,NULL);
+ nack->consumer = consumer;
+ }
+
+ /* Send the reply for this entry. */
+ if (justid) {
+ addReplyStreamID(c,&id);
+ } else {
+ serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1);
+ }
+ arraylen++;
+ count--;
+
+ consumer->active_time = commandTimeSnapshot();
+
+ /* Propagate this change. */
+ robj *idstr = createObjectFromStreamID(&id);
+ streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack);
+ decrRefCount(idstr);
+ server.dirty++;
+ }
+
+ /* We need to return the next entry as a cursor for the next XAUTOCLAIM call */
+ raxNext(&ri);
+
+ streamID endid;
+ if (raxEOF(&ri)) {
+ endid.ms = endid.seq = 0;
+ } else {
+ streamDecodeID(ri.key, &endid);
+ }
+ raxStop(&ri);
+
+ setDeferredArrayLen(c,arraylenptr,arraylen);
+ setDeferredReplyStreamID(c,endidptr,&endid);
+
+ addReplyArrayLen(c, deleted_id_num); /* reply[2] */
+ for (int i = 0; i < deleted_id_num; i++) {
+ addReplyStreamID(c, &deleted_ids[i]);
+ }
+ zfree(deleted_ids);
+
+ preventCommandPropagation(c);
+}
+
+/* XDEL <key> [<ID1> <ID2> ... <IDN>]
+ *
+ * Removes the specified entries from the stream. Returns the number
+ * of items actually deleted, that may be different from the number
+ * of IDs passed in case certain IDs do not exist. */
+void xdelCommand(client *c) {
+ robj *o;
+
+ if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL
+ || checkType(c,o,OBJ_STREAM)) return;
+ stream *s = o->ptr;
+
+ /* We need to sanity check the IDs passed to start. Even if not
+ * a big issue, it is not great that the command is only partially
+ * executed because at some point an invalid ID is parsed. */
+ streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
+ streamID *ids = static_ids;
+ int id_count = c->argc-2;
+ if (id_count > STREAMID_STATIC_VECTOR_LEN)
+ ids = zmalloc(sizeof(streamID)*id_count);
+ for (int j = 2; j < c->argc; j++) {
+ if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0,NULL) != C_OK) goto cleanup;
+ }
+
+ /* Actually apply the command. */
+ int deleted = 0;
+ int first_entry = 0;
+ for (int j = 2; j < c->argc; j++) {
+ streamID *id = &ids[j-2];
+ if (streamDeleteItem(s,id)) {
+ /* We want to know if the first entry in the stream was deleted
+ * so we can later set the new one. */
+ if (streamCompareID(id,&s->first_id) == 0) {
+ first_entry = 1;
+ }
+ /* Update the stream's maximal tombstone if needed. */
+ if (streamCompareID(id,&s->max_deleted_entry_id) > 0) {
+ s->max_deleted_entry_id = *id;
+ }
+ deleted++;
+ };
+ }
+
+ /* Update the stream's first ID. */
+ if (deleted) {
+ if (s->length == 0) {
+ s->first_id.ms = 0;
+ s->first_id.seq = 0;
+ } else if (first_entry) {
+ streamGetEdgeID(s,1,1,&s->first_id);
+ }
+ }
+
+ /* Propagate the write if needed. */
+ if (deleted) {
+ signalModifiedKey(c,c->db,c->argv[1]);
+ notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id);
+ server.dirty += deleted;
+ }
+ addReplyLongLong(c,deleted);
+cleanup:
+ if (ids != static_ids) zfree(ids);
+}
+
+/* General form: XTRIM <key> [... options ...]
+ *
+ * List of options:
+ *
+ * Trim strategies:
+ *
+ * MAXLEN [~|=] <count> -- Trim so that the stream will be capped at
+ * the specified length. Use ~ before the
+ * count in order to demand approximated trimming
+ * (like XADD MAXLEN option).
+ * MINID [~|=] <id> -- Trim so that the stream will not contain entries
+ * with IDs smaller than 'id'. Use ~ before the
+ * count in order to demand approximated trimming
+ * (like XADD MINID option).
+ *
+ * Other options:
+ *
+ * LIMIT <entries> -- The maximum number of entries to trim.
+ * 0 means unlimited. Unless specified, it is set
+ * to a default of 100*server.stream_node_max_entries,
+ * and that's in order to keep the trimming time sane.
+ * Has meaning only if `~` was provided.
+ */
+void xtrimCommand(client *c) {
+ robj *o;
+
+ /* Argument parsing. */
+ streamAddTrimArgs parsed_args;
+ if (streamParseAddOrTrimArgsOrReply(c, &parsed_args, 0) < 0)
+ return; /* streamParseAddOrTrimArgsOrReply already replied. */
+
+ /* If the key does not exist, we are ok returning zero, that is, the
+ * number of elements removed from the stream. */
+ if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL
+ || checkType(c,o,OBJ_STREAM)) return;
+ stream *s = o->ptr;
+
+ /* Perform the trimming. */
+ int64_t deleted = streamTrim(s, &parsed_args);
+ if (deleted) {
+ notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
+ if (parsed_args.approx_trim) {
+ /* In case our trimming was limited (by LIMIT or by ~) we must
+ * re-write the relevant trim argument to make sure there will be
+ * no inconsistencies in AOF loading or in the replica.
+ * It's enough to check only args->approx because there is no
+ * way LIMIT is given without the ~ option. */
+ streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1);
+ streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx);
+ }
+
+ /* Propagate the write. */
+ signalModifiedKey(c, c->db,c->argv[1]);
+ server.dirty += deleted;
+ }
+ addReplyLongLong(c,deleted);
+}
+
+/* Helper function for xinfoCommand.
+ * Handles the variants of XINFO STREAM */
+void xinfoReplyWithStreamInfo(client *c, stream *s) {
+ int full = 1;
+ long long count = 10; /* Default COUNT is 10 so we don't block the server */
+ robj **optv = c->argv + 3; /* Options start after XINFO STREAM <key> */
+ int optc = c->argc - 3;
+
+ /* Parse options. */
+ if (optc == 0) {
+ full = 0;
+ } else {
+ /* Valid options are [FULL] or [FULL COUNT <count>] */
+ if (optc != 1 && optc != 3) {
+ addReplySubcommandSyntaxError(c);
+ return;
+ }
+
+ /* First option must be "FULL" */
+ if (strcasecmp(optv[0]->ptr,"full")) {
+ addReplySubcommandSyntaxError(c);
+ return;
+ }
+
+ if (optc == 3) {
+ /* First option must be "FULL" */
+ if (strcasecmp(optv[1]->ptr,"count")) {
+ addReplySubcommandSyntaxError(c);
+ return;
+ }
+ if (getLongLongFromObjectOrReply(c,optv[2],&count,NULL) == C_ERR)
+ return;
+ if (count < 0) count = 10;
+ }
+ }
+
+ addReplyMapLen(c,full ? 9 : 10);
+ addReplyBulkCString(c,"length");
+ addReplyLongLong(c,s->length);
+ addReplyBulkCString(c,"radix-tree-keys");
+ addReplyLongLong(c,raxSize(s->rax));
+ addReplyBulkCString(c,"radix-tree-nodes");
+ addReplyLongLong(c,s->rax->numnodes);
+ addReplyBulkCString(c,"last-generated-id");
+ addReplyStreamID(c,&s->last_id);
+ addReplyBulkCString(c,"max-deleted-entry-id");
+ addReplyStreamID(c,&s->max_deleted_entry_id);
+ addReplyBulkCString(c,"entries-added");
+ addReplyLongLong(c,s->entries_added);
+ addReplyBulkCString(c,"recorded-first-entry-id");
+ addReplyStreamID(c,&s->first_id);
+
+ if (!full) {
+ /* XINFO STREAM <key> */
+
+ addReplyBulkCString(c,"groups");
+ addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
+
+ /* To emit the first/last entry we use streamReplyWithRange(). */
+ int emitted;
+ streamID start, end;
+ start.ms = start.seq = 0;
+ end.ms = end.seq = UINT64_MAX;
+ addReplyBulkCString(c,"first-entry");
+ emitted = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
+ STREAM_RWR_RAWENTRIES,NULL);
+ if (!emitted) addReplyNull(c);
+ addReplyBulkCString(c,"last-entry");
+ emitted = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
+ STREAM_RWR_RAWENTRIES,NULL);
+ if (!emitted) addReplyNull(c);
+ } else {
+ /* XINFO STREAM <key> FULL [COUNT <count>] */
+
+ /* Stream entries */
+ addReplyBulkCString(c,"entries");
+ streamReplyWithRange(c,s,NULL,NULL,count,0,NULL,NULL,0,NULL);
+
+ /* Consumer groups */
+ addReplyBulkCString(c,"groups");
+ if (s->cgroups == NULL) {
+ addReplyArrayLen(c,0);
+ } else {
+ addReplyArrayLen(c,raxSize(s->cgroups));
+ raxIterator ri_cgroups;
+ raxStart(&ri_cgroups,s->cgroups);
+ raxSeek(&ri_cgroups,"^",NULL,0);
+ while(raxNext(&ri_cgroups)) {
+ streamCG *cg = ri_cgroups.data;
+ addReplyMapLen(c,7);
+
+ /* Name */
+ addReplyBulkCString(c,"name");
+ addReplyBulkCBuffer(c,ri_cgroups.key,ri_cgroups.key_len);
+
+ /* Last delivered ID */
+ addReplyBulkCString(c,"last-delivered-id");
+ addReplyStreamID(c,&cg->last_id);
+
+ /* Read counter of the last delivered ID */
+ addReplyBulkCString(c,"entries-read");
+ if (cg->entries_read != SCG_INVALID_ENTRIES_READ) {
+ addReplyLongLong(c,cg->entries_read);
+ } else {
+ addReplyNull(c);
+ }
+
+ /* Group lag */
+ addReplyBulkCString(c,"lag");
+ streamReplyWithCGLag(c,s,cg);
+
+ /* Group PEL count */
+ addReplyBulkCString(c,"pel-count");
+ addReplyLongLong(c,raxSize(cg->pel));
+
+ /* Group PEL */
+ addReplyBulkCString(c,"pending");
+ long long arraylen_cg_pel = 0;
+ void *arrayptr_cg_pel = addReplyDeferredLen(c);
+ raxIterator ri_cg_pel;
+ raxStart(&ri_cg_pel,cg->pel);
+ raxSeek(&ri_cg_pel,"^",NULL,0);
+ while(raxNext(&ri_cg_pel) && (!count || arraylen_cg_pel < count)) {
+ streamNACK *nack = ri_cg_pel.data;
+ addReplyArrayLen(c,4);
+
+ /* Entry ID. */
+ streamID id;
+ streamDecodeID(ri_cg_pel.key,&id);
+ addReplyStreamID(c,&id);
+
+ /* Consumer name. */
+ serverAssert(nack->consumer); /* assertion for valgrind (avoid NPD) */
+ addReplyBulkCBuffer(c,nack->consumer->name,
+ sdslen(nack->consumer->name));
+
+ /* Last delivery. */
+ addReplyLongLong(c,nack->delivery_time);
+
+ /* Number of deliveries. */
+ addReplyLongLong(c,nack->delivery_count);
+
+ arraylen_cg_pel++;
+ }
+ setDeferredArrayLen(c,arrayptr_cg_pel,arraylen_cg_pel);
+ raxStop(&ri_cg_pel);
+
+ /* Consumers */
+ addReplyBulkCString(c,"consumers");
+ addReplyArrayLen(c,raxSize(cg->consumers));
+ raxIterator ri_consumers;
+ raxStart(&ri_consumers,cg->consumers);
+ raxSeek(&ri_consumers,"^",NULL,0);
+ while(raxNext(&ri_consumers)) {
+ streamConsumer *consumer = ri_consumers.data;
+ addReplyMapLen(c,5);
+
+ /* Consumer name */
+ addReplyBulkCString(c,"name");
+ addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
+
+ /* Seen-time */
+ addReplyBulkCString(c,"seen-time");
+ addReplyLongLong(c,consumer->seen_time);
+
+ /* Active-time */
+ addReplyBulkCString(c,"active-time");
+ addReplyLongLong(c,consumer->active_time);
+
+ /* Consumer PEL count */
+ addReplyBulkCString(c,"pel-count");
+ addReplyLongLong(c,raxSize(consumer->pel));
+
+ /* Consumer PEL */
+ addReplyBulkCString(c,"pending");
+ long long arraylen_cpel = 0;
+ void *arrayptr_cpel = addReplyDeferredLen(c);
+ raxIterator ri_cpel;
+ raxStart(&ri_cpel,consumer->pel);
+ raxSeek(&ri_cpel,"^",NULL,0);
+ while(raxNext(&ri_cpel) && (!count || arraylen_cpel < count)) {
+ streamNACK *nack = ri_cpel.data;
+ addReplyArrayLen(c,3);
+
+ /* Entry ID. */
+ streamID id;
+ streamDecodeID(ri_cpel.key,&id);
+ addReplyStreamID(c,&id);
+
+ /* Last delivery. */
+ addReplyLongLong(c,nack->delivery_time);
+
+ /* Number of deliveries. */
+ addReplyLongLong(c,nack->delivery_count);
+
+ arraylen_cpel++;
+ }
+ setDeferredArrayLen(c,arrayptr_cpel,arraylen_cpel);
+ raxStop(&ri_cpel);
+ }
+ raxStop(&ri_consumers);
+ }
+ raxStop(&ri_cgroups);
+ }
+ }
+}
+
+/* XINFO CONSUMERS <key> <group>
+ * XINFO GROUPS <key>
+ * XINFO STREAM <key> [FULL [COUNT <count>]]
+ * XINFO HELP. */
+void xinfoCommand(client *c) {
+ stream *s = NULL;
+ char *opt;
+ robj *key;
+
+ /* HELP is special. Handle it ASAP. */
+ if (!strcasecmp(c->argv[1]->ptr,"HELP")) {
+ const char *help[] = {
+"CONSUMERS <key> <groupname>",
+" Show consumers of <groupname>.",
+"GROUPS <key>",
+" Show the stream consumer groups.",
+"STREAM <key> [FULL [COUNT <count>]",
+" Show information about the stream.",
+NULL
+ };
+ addReplyHelp(c, help);
+ return;
+ }
+
+ /* With the exception of HELP handled before any other sub commands, all
+ * the ones are in the form of "<subcommand> <key>". */
+ opt = c->argv[1]->ptr;
+ key = c->argv[2];
+
+ /* Lookup the key now, this is common for all the subcommands but HELP. */
+ robj *o = lookupKeyReadOrReply(c,key,shared.nokeyerr);
+ if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
+ s = o->ptr;
+
+ /* Dispatch the different subcommands. */
+ if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) {
+ /* XINFO CONSUMERS <key> <group>. */
+ streamCG *cg = streamLookupCG(s,c->argv[3]->ptr);
+ if (cg == NULL) {
+ addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
+ "for key name '%s'",
+ (char*)c->argv[3]->ptr, (char*)key->ptr);
+ return;
+ }
+
+ addReplyArrayLen(c,raxSize(cg->consumers));
+ raxIterator ri;
+ raxStart(&ri,cg->consumers);
+ raxSeek(&ri,"^",NULL,0);
+ mstime_t now = commandTimeSnapshot();
+ while(raxNext(&ri)) {
+ streamConsumer *consumer = ri.data;
+ mstime_t inactive = consumer->active_time != -1 ? now - consumer->active_time : consumer->active_time;
+ mstime_t idle = now - consumer->seen_time;
+ if (idle < 0) idle = 0;
+
+ addReplyMapLen(c,4);
+ addReplyBulkCString(c,"name");
+ addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
+ addReplyBulkCString(c,"pending");
+ addReplyLongLong(c,raxSize(consumer->pel));
+ addReplyBulkCString(c,"idle");
+ addReplyLongLong(c,idle);
+ addReplyBulkCString(c,"inactive");
+ addReplyLongLong(c,inactive);
+ }
+ raxStop(&ri);
+ } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) {
+ /* XINFO GROUPS <key>. */
+ if (s->cgroups == NULL) {
+ addReplyArrayLen(c,0);
+ return;
+ }
+
+ addReplyArrayLen(c,raxSize(s->cgroups));
+ raxIterator ri;
+ raxStart(&ri,s->cgroups);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ streamCG *cg = ri.data;
+ addReplyMapLen(c,6);
+ addReplyBulkCString(c,"name");
+ addReplyBulkCBuffer(c,ri.key,ri.key_len);
+ addReplyBulkCString(c,"consumers");
+ addReplyLongLong(c,raxSize(cg->consumers));
+ addReplyBulkCString(c,"pending");
+ addReplyLongLong(c,raxSize(cg->pel));
+ addReplyBulkCString(c,"last-delivered-id");
+ addReplyStreamID(c,&cg->last_id);
+ addReplyBulkCString(c,"entries-read");
+ if (cg->entries_read != SCG_INVALID_ENTRIES_READ) {
+ addReplyLongLong(c,cg->entries_read);
+ } else {
+ addReplyNull(c);
+ }
+ addReplyBulkCString(c,"lag");
+ streamReplyWithCGLag(c,s,cg);
+ }
+ raxStop(&ri);
+ } else if (!strcasecmp(opt,"STREAM")) {
+ /* XINFO STREAM <key> [FULL [COUNT <count>]]. */
+ xinfoReplyWithStreamInfo(c,s);
+ } else {
+ addReplySubcommandSyntaxError(c);
+ }
+}
+
+/* Validate the integrity stream listpack entries structure. Both in term of a
+ * valid listpack, but also that the structure of the entries matches a valid
+ * stream. return 1 if valid 0 if not valid. */
+int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep) {
+ int valid_record;
+ unsigned char *p, *next;
+
+ /* Since we don't want to run validation of all records twice, we'll
+ * run the listpack validation of just the header and do the rest here. */
+ if (!lpValidateIntegrity(lp, size, 0, NULL, NULL))
+ return 0;
+
+ /* In non-deep mode we just validated the listpack header (encoded size) */
+ if (!deep) return 1;
+
+ next = p = lpValidateFirst(lp);
+ if (!lpValidateNext(lp, &next, size)) return 0;
+ if (!p) return 0;
+
+ /* entry count */
+ int64_t entry_count = lpGetIntegerIfValid(p, &valid_record);
+ if (!valid_record) return 0;
+ p = next; if (!lpValidateNext(lp, &next, size)) return 0;
+
+ /* deleted */
+ int64_t deleted_count = lpGetIntegerIfValid(p, &valid_record);
+ if (!valid_record) return 0;
+ p = next; if (!lpValidateNext(lp, &next, size)) return 0;
+
+ /* num-of-fields */
+ int64_t master_fields = lpGetIntegerIfValid(p, &valid_record);
+ if (!valid_record) return 0;
+ p = next; if (!lpValidateNext(lp, &next, size)) return 0;
+
+ /* the field names */
+ for (int64_t j = 0; j < master_fields; j++) {
+ p = next; if (!lpValidateNext(lp, &next, size)) return 0;
+ }
+
+ /* the zero master entry terminator. */
+ int64_t zero = lpGetIntegerIfValid(p, &valid_record);
+ if (!valid_record || zero != 0) return 0;
+ p = next; if (!lpValidateNext(lp, &next, size)) return 0;
+
+ entry_count += deleted_count;
+ while (entry_count--) {
+ if (!p) return 0;
+ int64_t fields = master_fields, extra_fields = 3;
+ int64_t flags = lpGetIntegerIfValid(p, &valid_record);
+ if (!valid_record) return 0;
+ p = next; if (!lpValidateNext(lp, &next, size)) return 0;
+
+ /* entry id */
+ lpGetIntegerIfValid(p, &valid_record);
+ if (!valid_record) return 0;
+ p = next; if (!lpValidateNext(lp, &next, size)) return 0;
+ lpGetIntegerIfValid(p, &valid_record);
+ if (!valid_record) return 0;
+ p = next; if (!lpValidateNext(lp, &next, size)) return 0;
+
+ if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) {
+ /* num-of-fields */
+ fields = lpGetIntegerIfValid(p, &valid_record);
+ if (!valid_record) return 0;
+ p = next; if (!lpValidateNext(lp, &next, size)) return 0;
+
+ /* the field names */
+ for (int64_t j = 0; j < fields; j++) {
+ p = next; if (!lpValidateNext(lp, &next, size)) return 0;
+ }
+
+ extra_fields += fields + 1;
+ }
+
+ /* the values */
+ for (int64_t j = 0; j < fields; j++) {
+ p = next; if (!lpValidateNext(lp, &next, size)) return 0;
+ }
+
+ /* lp-count */
+ int64_t lp_count = lpGetIntegerIfValid(p, &valid_record);
+ if (!valid_record) return 0;
+ if (lp_count != fields + extra_fields) return 0;
+ p = next; if (!lpValidateNext(lp, &next, size)) return 0;
+ }
+
+ if (next)
+ return 0;
+
+ return 1;
+}