diff options
Diffstat (limited to 'src/t_stream.c')
-rw-r--r-- | src/t_stream.c | 4038 |
1 files changed, 4038 insertions, 0 deletions
diff --git a/src/t_stream.c b/src/t_stream.c new file mode 100644 index 0000000..5fcb631 --- /dev/null +++ b/src/t_stream.c @@ -0,0 +1,4038 @@ +/* + * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "server.h" +#include "endianconv.h" +#include "stream.h" + +/* Every stream item inside the listpack, has a flags field that is used to + * mark the entry as deleted, or having the same field as the "master" + * entry at the start of the listpack> */ +#define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */ +#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */ +#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ + +/* For stream commands that require multiple IDs + * when the number of IDs is less than 'STREAMID_STATIC_VECTOR_LEN', + * avoid malloc allocation.*/ +#define STREAMID_STATIC_VECTOR_LEN 8 + +/* Max pre-allocation for listpack. This is done to avoid abuse of a user + * setting stream_node_max_bytes to a huge number. */ +#define STREAM_LISTPACK_MAX_PRE_ALLOCATE 4096 + +/* Don't let listpacks grow too big, even if the user config allows it. + * doing so can lead to an overflow (trying to store more than 32bit length + * into the listpack header), or actually an assertion since lpInsert + * will return NULL. */ +#define STREAM_LISTPACK_MAX_SIZE (1<<30) + +void streamFreeCG(streamCG *cg); +void streamFreeNACK(streamNACK *na); +size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); +int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given); +int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); + +/* ----------------------------------------------------------------------- + * Low level stream encoding: a radix tree of listpacks. + * ----------------------------------------------------------------------- */ + +/* Create a new stream data structure. */ +stream *streamNew(void) { + stream *s = zmalloc(sizeof(*s)); + s->rax = raxNew(); + s->length = 0; + s->first_id.ms = 0; + s->first_id.seq = 0; + s->last_id.ms = 0; + s->last_id.seq = 0; + s->max_deleted_entry_id.seq = 0; + s->max_deleted_entry_id.ms = 0; + s->entries_added = 0; + s->cgroups = NULL; /* Created on demand to save memory when not used. */ + return s; +} + +/* Free a stream, including the listpacks stored inside the radix tree. */ +void freeStream(stream *s) { + raxFreeWithCallback(s->rax,(void(*)(void*))lpFree); + if (s->cgroups) + raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG); + zfree(s); +} + +/* Return the length of a stream. */ +unsigned long streamLength(const robj *subject) { + stream *s = subject->ptr; + return s->length; +} + +/* Set 'id' to be its successor stream ID. + * If 'id' is the maximal possible id, it is wrapped around to 0-0 and a + * C_ERR is returned. */ +int streamIncrID(streamID *id) { + int ret = C_OK; + if (id->seq == UINT64_MAX) { + if (id->ms == UINT64_MAX) { + /* Special case where 'id' is the last possible streamID... */ + id->ms = id->seq = 0; + ret = C_ERR; + } else { + id->ms++; + id->seq = 0; + } + } else { + id->seq++; + } + return ret; +} + +/* Set 'id' to be its predecessor stream ID. + * If 'id' is the minimal possible id, it remains 0-0 and a C_ERR is + * returned. */ +int streamDecrID(streamID *id) { + int ret = C_OK; + if (id->seq == 0) { + if (id->ms == 0) { + /* Special case where 'id' is the first possible streamID... */ + id->ms = id->seq = UINT64_MAX; + ret = C_ERR; + } else { + id->ms--; + id->seq = UINT64_MAX; + } + } else { + id->seq--; + } + return ret; +} + +/* Generate the next stream item ID given the previous one. If the current + * milliseconds Unix time is greater than the previous one, just use this + * as time part and start with sequence part of zero. Otherwise we use the + * previous time (and never go backward) and increment the sequence. */ +void streamNextID(streamID *last_id, streamID *new_id) { + uint64_t ms = commandTimeSnapshot(); + if (ms > last_id->ms) { + new_id->ms = ms; + new_id->seq = 0; + } else { + *new_id = *last_id; + streamIncrID(new_id); + } +} + +/* This is a helper function for the COPY command. + * Duplicate a Stream object, with the guarantee that the returned object + * has the same encoding as the original one. + * + * The resulting object always has refcount set to 1 */ +robj *streamDup(robj *o) { + robj *sobj; + + serverAssert(o->type == OBJ_STREAM); + + switch (o->encoding) { + case OBJ_ENCODING_STREAM: + sobj = createStreamObject(); + break; + default: + serverPanic("Wrong encoding."); + break; + } + + stream *s; + stream *new_s; + s = o->ptr; + new_s = sobj->ptr; + + raxIterator ri; + uint64_t rax_key[2]; + raxStart(&ri, s->rax); + raxSeek(&ri, "^", NULL, 0); + size_t lp_bytes = 0; /* Total bytes in the listpack. */ + unsigned char *lp = NULL; /* listpack pointer. */ + /* Get a reference to the listpack node. */ + while (raxNext(&ri)) { + lp = ri.data; + lp_bytes = lpBytes(lp); + unsigned char *new_lp = zmalloc(lp_bytes); + memcpy(new_lp, lp, lp_bytes); + memcpy(rax_key, ri.key, sizeof(rax_key)); + raxInsert(new_s->rax, (unsigned char *)&rax_key, sizeof(rax_key), + new_lp, NULL); + } + new_s->length = s->length; + new_s->first_id = s->first_id; + new_s->last_id = s->last_id; + new_s->max_deleted_entry_id = s->max_deleted_entry_id; + new_s->entries_added = s->entries_added; + raxStop(&ri); + + if (s->cgroups == NULL) return sobj; + + /* Consumer Groups */ + raxIterator ri_cgroups; + raxStart(&ri_cgroups, s->cgroups); + raxSeek(&ri_cgroups, "^", NULL, 0); + while (raxNext(&ri_cgroups)) { + streamCG *cg = ri_cgroups.data; + streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key, + ri_cgroups.key_len, &cg->last_id, + cg->entries_read); + + serverAssert(new_cg != NULL); + + /* Consumer Group PEL */ + raxIterator ri_cg_pel; + raxStart(&ri_cg_pel,cg->pel); + raxSeek(&ri_cg_pel,"^",NULL,0); + while(raxNext(&ri_cg_pel)){ + streamNACK *nack = ri_cg_pel.data; + streamNACK *new_nack = streamCreateNACK(NULL); + new_nack->delivery_time = nack->delivery_time; + new_nack->delivery_count = nack->delivery_count; + raxInsert(new_cg->pel, ri_cg_pel.key, sizeof(streamID), new_nack, NULL); + } + raxStop(&ri_cg_pel); + + /* Consumers */ + raxIterator ri_consumers; + raxStart(&ri_consumers, cg->consumers); + raxSeek(&ri_consumers, "^", NULL, 0); + while (raxNext(&ri_consumers)) { + streamConsumer *consumer = ri_consumers.data; + streamConsumer *new_consumer; + new_consumer = zmalloc(sizeof(*new_consumer)); + new_consumer->name = sdsdup(consumer->name); + new_consumer->pel = raxNew(); + raxInsert(new_cg->consumers,(unsigned char *)new_consumer->name, + sdslen(new_consumer->name), new_consumer, NULL); + new_consumer->seen_time = consumer->seen_time; + new_consumer->active_time = consumer->active_time; + + /* Consumer PEL */ + raxIterator ri_cpel; + raxStart(&ri_cpel, consumer->pel); + raxSeek(&ri_cpel, "^", NULL, 0); + while (raxNext(&ri_cpel)) { + streamNACK *new_nack = raxFind(new_cg->pel,ri_cpel.key,sizeof(streamID)); + + serverAssert(new_nack != raxNotFound); + + new_nack->consumer = new_consumer; + raxInsert(new_consumer->pel,ri_cpel.key,sizeof(streamID),new_nack,NULL); + } + raxStop(&ri_cpel); + } + raxStop(&ri_consumers); + } + raxStop(&ri_cgroups); + return sobj; +} + +/* This is a wrapper function for lpGet() to directly get an integer value + * from the listpack (that may store numbers as a string), converting + * the string if needed. + * The 'valid" argument is an optional output parameter to get an indication + * if the record was valid, when this parameter is NULL, the function will + * fail with an assertion. */ +static inline int64_t lpGetIntegerIfValid(unsigned char *ele, int *valid) { + int64_t v; + unsigned char *e = lpGet(ele,&v,NULL); + if (e == NULL) { + if (valid) + *valid = 1; + return v; + } + /* The following code path should never be used for how listpacks work: + * they should always be able to store an int64_t value in integer + * encoded form. However the implementation may change. */ + long long ll; + int ret = string2ll((char*)e,v,&ll); + if (valid) + *valid = ret; + else + serverAssert(ret != 0); + v = ll; + return v; +} + +#define lpGetInteger(ele) lpGetIntegerIfValid(ele, NULL) + +/* Get an edge streamID of a given listpack. + * 'master_id' is an input param, used to build the 'edge_id' output param */ +int lpGetEdgeStreamID(unsigned char *lp, int first, streamID *master_id, streamID *edge_id) +{ + if (lp == NULL) + return 0; + + unsigned char *lp_ele; + + /* We need to seek either the first or the last entry depending + * on the direction of the iteration. */ + if (first) { + /* Get the master fields count. */ + lp_ele = lpFirst(lp); /* Seek items count */ + lp_ele = lpNext(lp, lp_ele); /* Seek deleted count. */ + lp_ele = lpNext(lp, lp_ele); /* Seek num fields. */ + int64_t master_fields_count = lpGetInteger(lp_ele); + lp_ele = lpNext(lp, lp_ele); /* Seek first field. */ + + /* If we are iterating in normal order, skip the master fields + * to seek the first actual entry. */ + for (int64_t i = 0; i < master_fields_count; i++) + lp_ele = lpNext(lp, lp_ele); + + /* If we are going forward, skip the previous entry's + * lp-count field (or in case of the master entry, the zero + * term field) */ + lp_ele = lpNext(lp, lp_ele); + if (lp_ele == NULL) + return 0; + } else { + /* If we are iterating in reverse direction, just seek the + * last part of the last entry in the listpack (that is, the + * fields count). */ + lp_ele = lpLast(lp); + + /* If we are going backward, read the number of elements this + * entry is composed of, and jump backward N times to seek + * its start. */ + int64_t lp_count = lpGetInteger(lp_ele); + if (lp_count == 0) /* We reached the master entry. */ + return 0; + + while (lp_count--) + lp_ele = lpPrev(lp, lp_ele); + } + + lp_ele = lpNext(lp, lp_ele); /* Seek ID (lp_ele currently points to 'flags'). */ + + /* Get the ID: it is encoded as difference between the master + * ID and this entry ID. */ + streamID id = *master_id; + id.ms += lpGetInteger(lp_ele); + lp_ele = lpNext(lp, lp_ele); + id.seq += lpGetInteger(lp_ele); + *edge_id = id; + return 1; +} + +/* Debugging function to log the full content of a listpack. Useful + * for development and debugging. */ +void streamLogListpackContent(unsigned char *lp) { + unsigned char *p = lpFirst(lp); + while(p) { + unsigned char buf[LP_INTBUF_SIZE]; + int64_t v; + unsigned char *ele = lpGet(p,&v,buf); + serverLog(LL_WARNING,"- [%d] '%.*s'", (int)v, (int)v, ele); + p = lpNext(lp,p); + } +} + +/* Convert the specified stream entry ID as a 128 bit big endian number, so + * that the IDs can be sorted lexicographically. */ +void streamEncodeID(void *buf, streamID *id) { + uint64_t e[2]; + e[0] = htonu64(id->ms); + e[1] = htonu64(id->seq); + memcpy(buf,e,sizeof(e)); +} + +/* This is the reverse of streamEncodeID(): the decoded ID will be stored + * in the 'id' structure passed by reference. The buffer 'buf' must point + * to a 128 bit big-endian encoded ID. */ +void streamDecodeID(void *buf, streamID *id) { + uint64_t e[2]; + memcpy(e,buf,sizeof(e)); + id->ms = ntohu64(e[0]); + id->seq = ntohu64(e[1]); +} + +/* Compare two stream IDs. Return -1 if a < b, 0 if a == b, 1 if a > b. */ +int streamCompareID(streamID *a, streamID *b) { + if (a->ms > b->ms) return 1; + else if (a->ms < b->ms) return -1; + /* The ms part is the same. Check the sequence part. */ + else if (a->seq > b->seq) return 1; + else if (a->seq < b->seq) return -1; + /* Everything is the same: IDs are equal. */ + return 0; +} + +/* Retrieves the ID of the stream edge entry. An edge is either the first or + * the last ID in the stream, and may be a tombstone. To filter out tombstones, + * set the'skip_tombstones' argument to 1. */ +void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id) +{ + streamIterator si; + int64_t numfields; + streamIteratorStart(&si,s,NULL,NULL,!first); + si.skip_tombstones = skip_tombstones; + int found = streamIteratorGetID(&si,edge_id,&numfields); + if (!found) { + streamID min_id = {0, 0}, max_id = {UINT64_MAX, UINT64_MAX}; + *edge_id = first ? max_id : min_id; + } + streamIteratorStop(&si); +} + +/* Adds a new item into the stream 's' having the specified number of + * field-value pairs as specified in 'numfields' and stored into 'argv'. + * Returns the new entry ID populating the 'added_id' structure. + * + * If 'use_id' is not NULL, the ID is not auto-generated by the function, + * but instead the passed ID is used to add the new entry. In this case + * adding the entry may fail as specified later in this comment. + * + * When 'use_id' is used alongside with a zero 'seq-given', the sequence + * part of the passed ID is ignored and the function will attempt to use an + * auto-generated sequence. + * + * The function returns C_OK if the item was added, this is always true + * if the ID was generated by the function. However the function may return + * C_ERR in several cases: + * 1. If an ID was given via 'use_id', but adding it failed since the + * current top ID is greater or equal. errno will be set to EDOM. + * 2. If a size of a single element or the sum of the elements is too big to + * be stored into the stream. errno will be set to ERANGE. */ +int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given) { + + /* Generate the new entry ID. */ + streamID id; + if (use_id) { + if (seq_given) { + id = *use_id; + } else { + /* The automatically generated sequence can be either zero (new + * timestamps) or the incremented sequence of the last ID. In the + * latter case, we need to prevent an overflow/advancing forward + * in time. */ + if (s->last_id.ms == use_id->ms) { + if (s->last_id.seq == UINT64_MAX) { + errno = EDOM; + return C_ERR; + } + id = s->last_id; + id.seq++; + } else { + id = *use_id; + } + } + } else { + streamNextID(&s->last_id,&id); + } + + /* Check that the new ID is greater than the last entry ID + * or return an error. Automatically generated IDs might + * overflow (and wrap-around) when incrementing the sequence + part. */ + if (streamCompareID(&id,&s->last_id) <= 0) { + errno = EDOM; + return C_ERR; + } + + /* Avoid overflow when trying to add an element to the stream (listpack + * can only host up to 32bit length strings, and also a total listpack size + * can't be bigger than 32bit length. */ + size_t totelelen = 0; + for (int64_t i = 0; i < numfields*2; i++) { + sds ele = argv[i]->ptr; + totelelen += sdslen(ele); + } + if (totelelen > STREAM_LISTPACK_MAX_SIZE) { + errno = ERANGE; + return C_ERR; + } + + /* Add the new entry. */ + raxIterator ri; + raxStart(&ri,s->rax); + raxSeek(&ri,"$",NULL,0); + + size_t lp_bytes = 0; /* Total bytes in the tail listpack. */ + unsigned char *lp = NULL; /* Tail listpack pointer. */ + + if (!raxEOF(&ri)) { + /* Get a reference to the tail node listpack. */ + lp = ri.data; + lp_bytes = lpBytes(lp); + } + raxStop(&ri); + + /* We have to add the key into the radix tree in lexicographic order, + * to do so we consider the ID as a single 128 bit number written in + * big endian, so that the most significant bytes are the first ones. */ + uint64_t rax_key[2]; /* Key in the radix tree containing the listpack.*/ + streamID master_id; /* ID of the master entry in the listpack. */ + + /* Create a new listpack and radix tree node if needed. Note that when + * a new listpack is created, we populate it with a "master entry". This + * is just a set of fields that is taken as references in order to compress + * the stream entries that we'll add inside the listpack. + * + * Note that while we use the first added entry fields to create + * the master entry, the first added entry is NOT represented in the master + * entry, which is a stand alone object. But of course, the first entry + * will compress well because it's used as reference. + * + * The master entry is composed like in the following example: + * + * +-------+---------+------------+---------+--/--+---------+---------+-+ + * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0| + * +-------+---------+------------+---------+--/--+---------+---------+-+ + * + * count and deleted just represent respectively the total number of + * entries inside the listpack that are valid, and marked as deleted + * (deleted flag in the entry flags set). So the total number of items + * actually inside the listpack (both deleted and not) is count+deleted. + * + * The real entries will be encoded with an ID that is just the + * millisecond and sequence difference compared to the key stored at + * the radix tree node containing the listpack (delta encoding), and + * if the fields of the entry are the same as the master entry fields, the + * entry flags will specify this fact and the entry fields and number + * of fields will be omitted (see later in the code of this function). + * + * The "0" entry at the end is the same as the 'lp-count' entry in the + * regular stream entries (see below), and marks the fact that there are + * no more entries, when we scan the stream from right to left. */ + + /* First of all, check if we can append to the current macro node or + * if we need to switch to the next one. 'lp' will be set to NULL if + * the current node is full. */ + if (lp != NULL) { + int new_node = 0; + size_t node_max_bytes = server.stream_node_max_bytes; + if (node_max_bytes == 0 || node_max_bytes > STREAM_LISTPACK_MAX_SIZE) + node_max_bytes = STREAM_LISTPACK_MAX_SIZE; + if (lp_bytes + totelelen >= node_max_bytes) { + new_node = 1; + } else if (server.stream_node_max_entries) { + unsigned char *lp_ele = lpFirst(lp); + /* Count both live entries and deleted ones. */ + int64_t count = lpGetInteger(lp_ele) + lpGetInteger(lpNext(lp,lp_ele)); + if (count >= server.stream_node_max_entries) new_node = 1; + } + + if (new_node) { + /* Shrink extra pre-allocated memory */ + lp = lpShrinkToFit(lp); + if (ri.data != lp) + raxInsert(s->rax,ri.key,ri.key_len,lp,NULL); + lp = NULL; + } + } + + int flags = STREAM_ITEM_FLAG_NONE; + if (lp == NULL) { + master_id = id; + streamEncodeID(rax_key,&id); + /* Create the listpack having the master entry ID and fields. + * Pre-allocate some bytes when creating listpack to avoid realloc on + * every XADD. Since listpack.c uses malloc_size, it'll grow in steps, + * and won't realloc on every XADD. + * When listpack reaches max number of entries, we'll shrink the + * allocation to fit the data. */ + size_t prealloc = STREAM_LISTPACK_MAX_PRE_ALLOCATE; + if (server.stream_node_max_bytes > 0 && server.stream_node_max_bytes < prealloc) { + prealloc = server.stream_node_max_bytes; + } + lp = lpNew(prealloc); + lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */ + lp = lpAppendInteger(lp,0); /* Zero deleted so far. */ + lp = lpAppendInteger(lp,numfields); + for (int64_t i = 0; i < numfields; i++) { + sds field = argv[i*2]->ptr; + lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); + } + lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */ + raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); + /* The first entry we insert, has obviously the same fields of the + * master entry. */ + flags |= STREAM_ITEM_FLAG_SAMEFIELDS; + } else { + serverAssert(ri.key_len == sizeof(rax_key)); + memcpy(rax_key,ri.key,sizeof(rax_key)); + + /* Read the master ID from the radix tree key. */ + streamDecodeID(rax_key,&master_id); + unsigned char *lp_ele = lpFirst(lp); + + /* Update count and skip the deleted fields. */ + int64_t count = lpGetInteger(lp_ele); + lp = lpReplaceInteger(lp,&lp_ele,count+1); + lp_ele = lpNext(lp,lp_ele); /* seek deleted. */ + lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */ + + /* Check if the entry we are adding, have the same fields + * as the master entry. */ + int64_t master_fields_count = lpGetInteger(lp_ele); + lp_ele = lpNext(lp,lp_ele); + if (numfields == master_fields_count) { + int64_t i; + for (i = 0; i < master_fields_count; i++) { + sds field = argv[i*2]->ptr; + int64_t e_len; + unsigned char buf[LP_INTBUF_SIZE]; + unsigned char *e = lpGet(lp_ele,&e_len,buf); + /* Stop if there is a mismatch. */ + if (sdslen(field) != (size_t)e_len || + memcmp(e,field,e_len) != 0) break; + lp_ele = lpNext(lp,lp_ele); + } + /* All fields are the same! We can compress the field names + * setting a single bit in the flags. */ + if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS; + } + } + + /* Populate the listpack with the new entry. We use the following + * encoding: + * + * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+ + * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count| + * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+ + * + * However if the SAMEFIELD flag is set, we have just to populate + * the entry with the values, so it becomes: + * + * +-----+--------+-------+-/-+-------+--------+ + * |flags|entry-id|value-1|...|value-N|lp-count| + * +-----+--------+-------+-/-+-------+--------+ + * + * The entry-id field is actually two separated fields: the ms + * and seq difference compared to the master entry. + * + * The lp-count field is a number that states the number of listpack pieces + * that compose the entry, so that it's possible to travel the entry + * in reverse order: we can just start from the end of the listpack, read + * the entry, and jump back N times to seek the "flags" field to read + * the stream full entry. */ + lp = lpAppendInteger(lp,flags); + lp = lpAppendInteger(lp,id.ms - master_id.ms); + lp = lpAppendInteger(lp,id.seq - master_id.seq); + if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) + lp = lpAppendInteger(lp,numfields); + for (int64_t i = 0; i < numfields; i++) { + sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr; + if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) + lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); + lp = lpAppend(lp,(unsigned char*)value,sdslen(value)); + } + /* Compute and store the lp-count field. */ + int64_t lp_count = numfields; + lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */ + if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) { + /* If the item is not compressed, it also has the fields other than + * the values, and an additional num-fields field. */ + lp_count += numfields+1; + } + lp = lpAppendInteger(lp,lp_count); + + /* Insert back into the tree in order to update the listpack pointer. */ + if (ri.data != lp) + raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); + s->length++; + s->entries_added++; + s->last_id = id; + if (s->length == 1) s->first_id = id; + if (added_id) *added_id = id; + return C_OK; +} + +typedef struct { + /* XADD options */ + streamID id; /* User-provided ID, for XADD only. */ + int id_given; /* Was an ID different than "*" specified? for XADD only. */ + int seq_given; /* Was an ID different than "ms-*" specified? for XADD only. */ + int no_mkstream; /* if set to 1 do not create new stream */ + + /* XADD + XTRIM common options */ + int trim_strategy; /* TRIM_STRATEGY_* */ + int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */ + int approx_trim; /* If 1 only delete whole radix tree nodes, so + * the trim argument is not applied verbatim. */ + long long limit; /* Maximum amount of entries to trim. If 0, no limitation + * on the amount of trimming work is enforced. */ + /* TRIM_STRATEGY_MAXLEN options */ + long long maxlen; /* After trimming, leave stream at this length . */ + /* TRIM_STRATEGY_MINID options */ + streamID minid; /* Trim by ID (No stream entries with ID < 'minid' will remain) */ +} streamAddTrimArgs; + +#define TRIM_STRATEGY_NONE 0 +#define TRIM_STRATEGY_MAXLEN 1 +#define TRIM_STRATEGY_MINID 2 + +/* Trim the stream 's' according to args->trim_strategy, and return the + * number of elements removed from the stream. The 'approx' option, if non-zero, + * specifies that the trimming must be performed in a approximated way in + * order to maximize performances. This means that the stream may contain + * entries with IDs < 'id' in case of MINID (or more elements than 'maxlen' + * in case of MAXLEN), and elements are only removed if we can remove + * a *whole* node of the radix tree. The elements are removed from the head + * of the stream (older elements). + * + * The function may return zero if: + * + * 1) The minimal entry ID of the stream is already < 'id' (MINID); or + * 2) The stream is already shorter or equal to the specified max length (MAXLEN); or + * 3) The 'approx' option is true and the head node did not have enough elements + * to be deleted. + * + * args->limit is the maximum number of entries to delete. The purpose is to + * prevent this function from taking to long. + * If 'limit' is 0 then we do not limit the number of deleted entries. + * Much like the 'approx', if 'limit' is smaller than the number of entries + * that should be trimmed, there is a chance we will still have entries with + * IDs < 'id' (or number of elements >= maxlen in case of MAXLEN). + */ +int64_t streamTrim(stream *s, streamAddTrimArgs *args) { + size_t maxlen = args->maxlen; + streamID *id = &args->minid; + int approx = args->approx_trim; + int64_t limit = args->limit; + int trim_strategy = args->trim_strategy; + + if (trim_strategy == TRIM_STRATEGY_NONE) + return 0; + + raxIterator ri; + raxStart(&ri,s->rax); + raxSeek(&ri,"^",NULL,0); + + int64_t deleted = 0; + while (raxNext(&ri)) { + if (trim_strategy == TRIM_STRATEGY_MAXLEN && s->length <= maxlen) + break; + + unsigned char *lp = ri.data, *p = lpFirst(lp); + int64_t entries = lpGetInteger(p); + + /* Check if we exceeded the amount of work we could do */ + if (limit && (deleted + entries) > limit) + break; + + /* Check if we can remove the whole node. */ + int remove_node; + streamID master_id = {0}; /* For MINID */ + if (trim_strategy == TRIM_STRATEGY_MAXLEN) { + remove_node = s->length - entries >= maxlen; + } else { + /* Read the master ID from the radix tree key. */ + streamDecodeID(ri.key, &master_id); + + /* Read last ID. */ + streamID last_id = {0,0}; + lpGetEdgeStreamID(lp, 0, &master_id, &last_id); + + /* We can remove the entire node id its last ID < 'id' */ + remove_node = streamCompareID(&last_id, id) < 0; + } + + if (remove_node) { + lpFree(lp); + raxRemove(s->rax,ri.key,ri.key_len,NULL); + raxSeek(&ri,">=",ri.key,ri.key_len); + s->length -= entries; + deleted += entries; + continue; + } + + /* If we cannot remove a whole element, and approx is true, + * stop here. */ + if (approx) break; + + /* Now we have to trim entries from within 'lp' */ + int64_t deleted_from_lp = 0; + + p = lpNext(lp, p); /* Skip deleted field. */ + p = lpNext(lp, p); /* Skip num-of-fields in the master entry. */ + + /* Skip all the master fields. */ + int64_t master_fields_count = lpGetInteger(p); + p = lpNext(lp,p); /* Skip the first field. */ + for (int64_t j = 0; j < master_fields_count; j++) + p = lpNext(lp,p); /* Skip all master fields. */ + p = lpNext(lp,p); /* Skip the zero master entry terminator. */ + + /* 'p' is now pointing to the first entry inside the listpack. + * We have to run entry after entry, marking entries as deleted + * if they are already not deleted. */ + while (p) { + /* We keep a copy of p (which point to flags part) in order to + * update it after (and if) we actually remove the entry */ + unsigned char *pcopy = p; + + int64_t flags = lpGetInteger(p); + p = lpNext(lp, p); /* Skip flags. */ + int64_t to_skip; + + int64_t ms_delta = lpGetInteger(p); + p = lpNext(lp, p); /* Skip ID ms delta */ + int64_t seq_delta = lpGetInteger(p); + p = lpNext(lp, p); /* Skip ID seq delta */ + + streamID currid = {0}; /* For MINID */ + if (trim_strategy == TRIM_STRATEGY_MINID) { + currid.ms = master_id.ms + ms_delta; + currid.seq = master_id.seq + seq_delta; + } + + int stop; + if (trim_strategy == TRIM_STRATEGY_MAXLEN) { + stop = s->length <= maxlen; + } else { + /* Following IDs will definitely be greater because the rax + * tree is sorted, no point of continuing. */ + stop = streamCompareID(&currid, id) >= 0; + } + if (stop) + break; + + if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) { + to_skip = master_fields_count; + } else { + to_skip = lpGetInteger(p); /* Get num-fields. */ + p = lpNext(lp,p); /* Skip num-fields. */ + to_skip *= 2; /* Fields and values. */ + } + + while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */ + p = lpNext(lp,p); /* Skip the final lp-count field. */ + + /* Mark the entry as deleted. */ + if (!(flags & STREAM_ITEM_FLAG_DELETED)) { + intptr_t delta = p - lp; + flags |= STREAM_ITEM_FLAG_DELETED; + lp = lpReplaceInteger(lp, &pcopy, flags); + deleted_from_lp++; + s->length--; + p = lp + delta; + } + } + deleted += deleted_from_lp; + + /* Now we update the entries/deleted counters. */ + p = lpFirst(lp); + lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp); + p = lpNext(lp,p); /* Skip deleted field. */ + int64_t marked_deleted = lpGetInteger(p); + lp = lpReplaceInteger(lp,&p,marked_deleted+deleted_from_lp); + p = lpNext(lp,p); /* Skip num-of-fields in the master entry. */ + + /* Here we should perform garbage collection in case at this point + * there are too many entries deleted inside the listpack. */ + entries -= deleted_from_lp; + marked_deleted += deleted_from_lp; + if (entries + marked_deleted > 10 && marked_deleted > entries/2) { + /* TODO: perform a garbage collection. */ + } + + /* Update the listpack with the new pointer. */ + raxInsert(s->rax,ri.key,ri.key_len,lp,NULL); + + break; /* If we are here, there was enough to delete in the current + node, so no need to go to the next node. */ + } + raxStop(&ri); + + /* Update the stream's first ID after the trimming. */ + if (s->length == 0) { + s->first_id.ms = 0; + s->first_id.seq = 0; + } else if (deleted) { + streamGetEdgeID(s,1,1,&s->first_id); + } + + return deleted; +} + +/* Trims a stream by length. Returns the number of deleted items. */ +int64_t streamTrimByLength(stream *s, long long maxlen, int approx) { + streamAddTrimArgs args = { + .trim_strategy = TRIM_STRATEGY_MAXLEN, + .approx_trim = approx, + .limit = approx ? 100 * server.stream_node_max_entries : 0, + .maxlen = maxlen + }; + return streamTrim(s, &args); +} + +/* Trims a stream by minimum ID. Returns the number of deleted items. */ +int64_t streamTrimByID(stream *s, streamID minid, int approx) { + streamAddTrimArgs args = { + .trim_strategy = TRIM_STRATEGY_MINID, + .approx_trim = approx, + .limit = approx ? 100 * server.stream_node_max_entries : 0, + .minid = minid + }; + return streamTrim(s, &args); +} + +/* Parse the arguments of XADD/XTRIM. + * + * See streamAddTrimArgs for more details about the arguments handled. + * + * This function returns the position of the ID argument (relevant only to XADD). + * On error -1 is returned and a reply is sent. */ +static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, int xadd) { + /* Initialize arguments to defaults */ + memset(args, 0, sizeof(*args)); + + /* Parse options. */ + int i = 2; /* This is the first argument position where we could + find an option, or the ID. */ + int limit_given = 0; + for (; i < c->argc; i++) { + int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ + char *opt = c->argv[i]->ptr; + if (xadd && opt[0] == '*' && opt[1] == '\0') { + /* This is just a fast path for the common case of auto-ID + * creation. */ + break; + } else if (!strcasecmp(opt,"maxlen") && moreargs) { + if (args->trim_strategy != TRIM_STRATEGY_NONE) { + addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible"); + return -1; + } + args->approx_trim = 0; + char *next = c->argv[i+1]->ptr; + /* Check for the form MAXLEN ~ <count>. */ + if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { + args->approx_trim = 1; + i++; + } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { + i++; + } + if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->maxlen,NULL) + != C_OK) return -1; + + if (args->maxlen < 0) { + addReplyError(c,"The MAXLEN argument must be >= 0."); + return -1; + } + i++; + args->trim_strategy = TRIM_STRATEGY_MAXLEN; + args->trim_strategy_arg_idx = i; + } else if (!strcasecmp(opt,"minid") && moreargs) { + if (args->trim_strategy != TRIM_STRATEGY_NONE) { + addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible"); + return -1; + } + args->approx_trim = 0; + char *next = c->argv[i+1]->ptr; + /* Check for the form MINID ~ <id> */ + if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { + args->approx_trim = 1; + i++; + } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { + i++; + } + + if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0,NULL) != C_OK) + return -1; + + i++; + args->trim_strategy = TRIM_STRATEGY_MINID; + args->trim_strategy_arg_idx = i; + } else if (!strcasecmp(opt,"limit") && moreargs) { + /* Note about LIMIT: If it was not provided by the caller we set + * it to 100*server.stream_node_max_entries, and that's to prevent the + * trimming from taking too long, on the expense of not deleting entries + * that should be trimmed. + * If user wanted exact trimming (i.e. no '~') we never limit the number + * of trimmed entries */ + if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->limit,NULL) != C_OK) + return -1; + + if (args->limit < 0) { + addReplyError(c,"The LIMIT argument must be >= 0."); + return -1; + } + limit_given = 1; + i++; + } else if (xadd && !strcasecmp(opt,"nomkstream")) { + args->no_mkstream = 1; + } else if (xadd) { + /* If we are here is a syntax error or a valid ID. */ + if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0,&args->seq_given) != C_OK) + return -1; + args->id_given = 1; + break; + } else { + addReplyErrorObject(c,shared.syntaxerr); + return -1; + } + } + + if (args->limit && args->trim_strategy == TRIM_STRATEGY_NONE) { + addReplyError(c,"syntax error, LIMIT cannot be used without specifying a trimming strategy"); + return -1; + } + + if (!xadd && args->trim_strategy == TRIM_STRATEGY_NONE) { + addReplyError(c,"syntax error, XTRIM must be called with a trimming strategy"); + return -1; + } + + if (mustObeyClient(c)) { + /* If command came from master or from AOF we must not enforce maxnodes + * (The maxlen/minid argument was re-written to make sure there's no + * inconsistency). */ + args->limit = 0; + } else { + /* We need to set the limit (only if we got '~') */ + if (limit_given) { + if (!args->approx_trim) { + /* LIMIT was provided without ~ */ + addReplyError(c,"syntax error, LIMIT cannot be used without the special ~ option"); + return -1; + } + } else { + /* User didn't provide LIMIT, we must set it. */ + if (args->approx_trim) { + /* In order to prevent from trimming to do too much work and + * cause latency spikes we limit the amount of work it can do. + * We have to cap args->limit from both sides in case + * stream_node_max_entries is 0 or too big (could cause overflow) + */ + args->limit = 100 * server.stream_node_max_entries; /* Maximum 100 rax nodes. */ + if (args->limit <= 0) args->limit = 10000; + if (args->limit > 1000000) args->limit = 1000000; + } else { + /* No LIMIT for exact trimming */ + args->limit = 0; + } + } + } + + return i; +} + +/* Initialize the stream iterator, so that we can call iterating functions + * to get the next items. This requires a corresponding streamIteratorStop() + * at the end. The 'rev' parameter controls the direction. If it's zero the + * iteration is from the start to the end element (inclusive), otherwise + * if rev is non-zero, the iteration is reversed. + * + * Once the iterator is initialized, we iterate like this: + * + * streamIterator myiterator; + * streamIteratorStart(&myiterator,...); + * int64_t numfields; + * while(streamIteratorGetID(&myiterator,&ID,&numfields)) { + * while(numfields--) { + * unsigned char *key, *value; + * size_t key_len, value_len; + * streamIteratorGetField(&myiterator,&key,&value,&key_len,&value_len); + * + * ... do what you want with key and value ... + * } + * } + * streamIteratorStop(&myiterator); */ +void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) { + /* Initialize the iterator and translates the iteration start/stop + * elements into a 128 big big-endian number. */ + if (start) { + streamEncodeID(si->start_key,start); + } else { + si->start_key[0] = 0; + si->start_key[1] = 0; + } + + if (end) { + streamEncodeID(si->end_key,end); + } else { + si->end_key[0] = UINT64_MAX; + si->end_key[1] = UINT64_MAX; + } + + /* Seek the correct node in the radix tree. */ + raxStart(&si->ri,s->rax); + if (!rev) { + if (start && (start->ms || start->seq)) { + raxSeek(&si->ri,"<=",(unsigned char*)si->start_key, + sizeof(si->start_key)); + if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL,0); + } else { + raxSeek(&si->ri,"^",NULL,0); + } + } else { + if (end && (end->ms || end->seq)) { + raxSeek(&si->ri,"<=",(unsigned char*)si->end_key, + sizeof(si->end_key)); + if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL,0); + } else { + raxSeek(&si->ri,"$",NULL,0); + } + } + si->stream = s; + si->lp = NULL; /* There is no current listpack right now. */ + si->lp_ele = NULL; /* Current listpack cursor. */ + si->rev = rev; /* Direction, if non-zero reversed, from end to start. */ + si->skip_tombstones = 1; /* By default tombstones aren't emitted. */ +} + +/* Return 1 and store the current item ID at 'id' if there are still + * elements within the iteration range, otherwise return 0 in order to + * signal the iteration terminated. */ +int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { + while(1) { /* Will stop when element > stop_key or end of radix tree. */ + /* If the current listpack is set to NULL, this is the start of the + * iteration or the previous listpack was completely iterated. + * Go to the next node. */ + if (si->lp == NULL || si->lp_ele == NULL) { + if (!si->rev && !raxNext(&si->ri)) return 0; + else if (si->rev && !raxPrev(&si->ri)) return 0; + serverAssert(si->ri.key_len == sizeof(streamID)); + /* Get the master ID. */ + streamDecodeID(si->ri.key,&si->master_id); + /* Get the master fields count. */ + si->lp = si->ri.data; + si->lp_ele = lpFirst(si->lp); /* Seek items count */ + si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek deleted count. */ + si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek num fields. */ + si->master_fields_count = lpGetInteger(si->lp_ele); + si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */ + si->master_fields_start = si->lp_ele; + /* We are now pointing to the first field of the master entry. + * We need to seek either the first or the last entry depending + * on the direction of the iteration. */ + if (!si->rev) { + /* If we are iterating in normal order, skip the master fields + * to seek the first actual entry. */ + for (uint64_t i = 0; i < si->master_fields_count; i++) + si->lp_ele = lpNext(si->lp,si->lp_ele); + } else { + /* If we are iterating in reverse direction, just seek the + * last part of the last entry in the listpack (that is, the + * fields count). */ + si->lp_ele = lpLast(si->lp); + } + } else if (si->rev) { + /* If we are iterating in the reverse order, and this is not + * the first entry emitted for this listpack, then we already + * emitted the current entry, and have to go back to the previous + * one. */ + int64_t lp_count = lpGetInteger(si->lp_ele); + while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele); + /* Seek lp-count of prev entry. */ + si->lp_ele = lpPrev(si->lp,si->lp_ele); + } + + /* For every radix tree node, iterate the corresponding listpack, + * returning elements when they are within range. */ + while(1) { + if (!si->rev) { + /* If we are going forward, skip the previous entry + * lp-count field (or in case of the master entry, the zero + * term field) */ + si->lp_ele = lpNext(si->lp,si->lp_ele); + if (si->lp_ele == NULL) break; + } else { + /* If we are going backward, read the number of elements this + * entry is composed of, and jump backward N times to seek + * its start. */ + int64_t lp_count = lpGetInteger(si->lp_ele); + if (lp_count == 0) { /* We reached the master entry. */ + si->lp = NULL; + si->lp_ele = NULL; + break; + } + while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele); + } + + /* Get the flags entry. */ + si->lp_flags = si->lp_ele; + int64_t flags = lpGetInteger(si->lp_ele); + si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */ + + /* Get the ID: it is encoded as difference between the master + * ID and this entry ID. */ + *id = si->master_id; + id->ms += lpGetInteger(si->lp_ele); + si->lp_ele = lpNext(si->lp,si->lp_ele); + id->seq += lpGetInteger(si->lp_ele); + si->lp_ele = lpNext(si->lp,si->lp_ele); + unsigned char buf[sizeof(streamID)]; + streamEncodeID(buf,id); + + /* The number of entries is here or not depending on the + * flags. */ + if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) { + *numfields = si->master_fields_count; + } else { + *numfields = lpGetInteger(si->lp_ele); + si->lp_ele = lpNext(si->lp,si->lp_ele); + } + serverAssert(*numfields>=0); + + /* If current >= start, and the entry is not marked as + * deleted or tombstones are included, emit it. */ + if (!si->rev) { + if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 && + (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED))) + { + if (memcmp(buf,si->end_key,sizeof(streamID)) > 0) + return 0; /* We are already out of range. */ + si->entry_flags = flags; + if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) + si->master_fields_ptr = si->master_fields_start; + return 1; /* Valid item returned. */ + } + } else { + if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 && + (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED))) + { + if (memcmp(buf,si->start_key,sizeof(streamID)) < 0) + return 0; /* We are already out of range. */ + si->entry_flags = flags; + if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) + si->master_fields_ptr = si->master_fields_start; + return 1; /* Valid item returned. */ + } + } + + /* If we do not emit, we have to discard if we are going + * forward, or seek the previous entry if we are going + * backward. */ + if (!si->rev) { + int64_t to_discard = (flags & STREAM_ITEM_FLAG_SAMEFIELDS) ? + *numfields : *numfields*2; + for (int64_t i = 0; i < to_discard; i++) + si->lp_ele = lpNext(si->lp,si->lp_ele); + } else { + int64_t prev_times = 4; /* flag + id ms + id seq + one more to + go back to the previous entry "count" + field. */ + /* If the entry was not flagged SAMEFIELD we also read the + * number of fields, so go back one more. */ + if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) prev_times++; + while(prev_times--) si->lp_ele = lpPrev(si->lp,si->lp_ele); + } + } + + /* End of listpack reached. Try the next/prev radix tree node. */ + } +} + +/* Get the field and value of the current item we are iterating. This should + * be called immediately after streamIteratorGetID(), and for each field + * according to the number of fields returned by streamIteratorGetID(). + * The function populates the field and value pointers and the corresponding + * lengths by reference, that are valid until the next iterator call, assuming + * no one touches the stream meanwhile. */ +void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) { + if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS) { + *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf); + si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr); + } else { + *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf); + si->lp_ele = lpNext(si->lp,si->lp_ele); + } + *valueptr = lpGet(si->lp_ele,valuelen,si->value_buf); + si->lp_ele = lpNext(si->lp,si->lp_ele); +} + +/* Remove the current entry from the stream: can be called after the + * GetID() API or after any GetField() call, however we need to iterate + * a valid entry while calling this function. Moreover the function + * requires the entry ID we are currently iterating, that was previously + * returned by GetID(). + * + * Note that after calling this function, next calls to GetField() can't + * be performed: the entry is now deleted. Instead the iterator will + * automatically re-seek to the next entry, so the caller should continue + * with GetID(). */ +void streamIteratorRemoveEntry(streamIterator *si, streamID *current) { + unsigned char *lp = si->lp; + int64_t aux; + + /* We do not really delete the entry here. Instead we mark it as + * deleted by flagging it, and also incrementing the count of the + * deleted entries in the listpack header. + * + * We start flagging: */ + int64_t flags = lpGetInteger(si->lp_flags); + flags |= STREAM_ITEM_FLAG_DELETED; + lp = lpReplaceInteger(lp,&si->lp_flags,flags); + + /* Change the valid/deleted entries count in the master entry. */ + unsigned char *p = lpFirst(lp); + aux = lpGetInteger(p); + + if (aux == 1) { + /* If this is the last element in the listpack, we can remove the whole + * node. */ + lpFree(lp); + raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL); + } else { + /* In the base case we alter the counters of valid/deleted entries. */ + lp = lpReplaceInteger(lp,&p,aux-1); + p = lpNext(lp,p); /* Seek deleted field. */ + aux = lpGetInteger(p); + lp = lpReplaceInteger(lp,&p,aux+1); + + /* Update the listpack with the new pointer. */ + if (si->lp != lp) + raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL); + } + + /* Update the number of entries counter. */ + si->stream->length--; + + /* Re-seek the iterator to fix the now messed up state. */ + streamID start, end; + if (si->rev) { + streamDecodeID(si->start_key,&start); + end = *current; + } else { + start = *current; + streamDecodeID(si->end_key,&end); + } + streamIteratorStop(si); + streamIteratorStart(si,si->stream,&start,&end,si->rev); + + /* TODO: perform a garbage collection here if the ratio between + * deleted and valid goes over a certain limit. */ +} + +/* Stop the stream iterator. The only cleanup we need is to free the rax + * iterator, since the stream iterator itself is supposed to be stack + * allocated. */ +void streamIteratorStop(streamIterator *si) { + raxStop(&si->ri); +} + +/* Return 1 if `id` exists in `s` (and not marked as deleted) */ +int streamEntryExists(stream *s, streamID *id) { + streamIterator si; + streamIteratorStart(&si,s,id,id,0); + streamID myid; + int64_t numfields; + int found = streamIteratorGetID(&si,&myid,&numfields); + streamIteratorStop(&si); + if (!found) + return 0; + serverAssert(streamCompareID(id,&myid) == 0); + return 1; +} + +/* Delete the specified item ID from the stream, returning 1 if the item + * was deleted 0 otherwise (if it does not exist). */ +int streamDeleteItem(stream *s, streamID *id) { + int deleted = 0; + streamIterator si; + streamIteratorStart(&si,s,id,id,0); + streamID myid; + int64_t numfields; + if (streamIteratorGetID(&si,&myid,&numfields)) { + streamIteratorRemoveEntry(&si,&myid); + deleted = 1; + } + streamIteratorStop(&si); + return deleted; +} + +/* Get the last valid (non-tombstone) streamID of 's'. */ +void streamLastValidID(stream *s, streamID *maxid) +{ + streamIterator si; + streamIteratorStart(&si,s,NULL,NULL,1); + int64_t numfields; + if (!streamIteratorGetID(&si,maxid,&numfields) && s->length) + serverPanic("Corrupt stream, length is %llu, but no max id", (unsigned long long)s->length); + streamIteratorStop(&si); +} + +/* Maximum size for a stream ID string. In theory 20*2+1 should be enough, + * But to avoid chance for off by one issues and null-term, in case this will + * be used as parsing buffer, we use a slightly larger buffer. On the other + * hand considering sds header is gonna add 4 bytes, we wanna keep below the + * allocator's 48 bytes bin. */ +#define STREAM_ID_STR_LEN 44 + +sds createStreamIDString(streamID *id) { + /* Optimization: pre-allocate a big enough buffer to avoid reallocs. */ + sds str = sdsnewlen(SDS_NOINIT, STREAM_ID_STR_LEN); + sdssetlen(str, 0); + return sdscatfmt(str,"%U-%U", id->ms,id->seq); +} + +/* Emit a reply in the client output buffer by formatting a Stream ID + * in the standard <ms>-<seq> format, using the simple string protocol + * of REPL. */ +void addReplyStreamID(client *c, streamID *id) { + addReplyBulkSds(c,createStreamIDString(id)); +} + +void setDeferredReplyStreamID(client *c, void *dr, streamID *id) { + setDeferredReplyBulkSds(c, dr, createStreamIDString(id)); +} + +/* Similar to the above function, but just creates an object, usually useful + * for replication purposes to create arguments. */ +robj *createObjectFromStreamID(streamID *id) { + return createObject(OBJ_STRING, createStreamIDString(id)); +} + +/* Returns non-zero if the ID is 0-0. */ +int streamIDEqZero(streamID *id) { + return !(id->ms || id->seq); +} + +/* A helper that returns non-zero if the range from 'start' to `end` + * contains a tombstone. + * + * NOTE: this assumes that the caller had verified that 'start' is less than + * 's->last_id'. */ +int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) { + streamID start_id, end_id; + + if (!s->length || streamIDEqZero(&s->max_deleted_entry_id)) { + /* The stream is empty or has no tombstones. */ + return 0; + } + + if (streamCompareID(&s->first_id,&s->max_deleted_entry_id) > 0) { + /* The latest tombstone is before the first entry. */ + return 0; + } + + if (start) { + start_id = *start; + } else { + start_id.ms = 0; + start_id.seq = 0; + } + + if (end) { + end_id = *end; + } else { + end_id.ms = UINT64_MAX; + end_id.seq = UINT64_MAX; + } + + if (streamCompareID(&start_id,&s->max_deleted_entry_id) <= 0 && + streamCompareID(&s->max_deleted_entry_id,&end_id) <= 0) + { + /* start_id <= max_deleted_entry_id <= end_id: The range does include a tombstone. */ + return 1; + } + + /* The range doesn't includes a tombstone. */ + return 0; +} + +/* Replies with a consumer group's current lag, that is the number of messages + * in the stream that are yet to be delivered. In case that the lag isn't + * available due to fragmentation, the reply to the client is a null. */ +void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) { + int valid = 0; + long long lag = 0; + + if (!s->entries_added) { + /* The lag of a newly-initialized stream is 0. */ + lag = 0; + valid = 1; + } else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&cg->last_id,NULL)) { + /* No fragmentation ahead means that the group's logical reads counter + * is valid for performing the lag calculation. */ + lag = (long long)s->entries_added - cg->entries_read; + valid = 1; + } else { + /* Attempt to retrieve the group's last ID logical read counter. */ + long long entries_read = streamEstimateDistanceFromFirstEverEntry(s,&cg->last_id); + if (entries_read != SCG_INVALID_ENTRIES_READ) { + /* A valid counter was obtained. */ + lag = (long long)s->entries_added - entries_read; + valid = 1; + } + } + + if (valid) { + addReplyLongLong(c,lag); + } else { + addReplyNull(c); + } +} + +/* This function returns a value that is the ID's logical read counter, or its + * distance (the number of entries) from the first entry ever to have been added + * to the stream. + * + * A counter is returned only in one of the following cases: + * 1. The ID is the same as the stream's last ID. In this case, the returned + * is the same as the stream's entries_added counter. + * 2. The ID equals that of the currently first entry in the stream, and the + * stream has no tombstones. The returned value, in this case, is the result + * of subtracting the stream's length from its added_entries, incremented by + * one. + * 3. The ID less than the stream's first current entry's ID, and there are no + * tombstones. Here the estimated counter is the result of subtracting the + * stream's length from its added_entries. + * 4. The stream's added_entries is zero, meaning that no entries were ever + * added. + * + * The special return value of ULLONG_MAX signals that the counter's value isn't + * obtainable. It is returned in these cases: + * 1. The provided ID, if it even exists, is somewhere between the stream's + * current first and last entries' IDs, or in the future. + * 2. The stream contains one or more tombstones. */ +long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) { + /* The counter of any ID in an empty, never-before-used stream is 0. */ + if (!s->entries_added) { + return 0; + } + + /* In the empty stream, if the ID is smaller or equal to the last ID, + * it can set to the current added_entries value. */ + if (!s->length && streamCompareID(id,&s->last_id) < 1) { + return s->entries_added; + } + + int cmp_last = streamCompareID(id,&s->last_id); + if (cmp_last == 0) { + /* Return the exact counter of the last entry in the stream. */ + return s->entries_added; + } else if (cmp_last > 0) { + /* The counter of a future ID is unknown. */ + return SCG_INVALID_ENTRIES_READ; + } + + int cmp_id_first = streamCompareID(id,&s->first_id); + int cmp_xdel_first = streamCompareID(&s->max_deleted_entry_id,&s->first_id); + if (streamIDEqZero(&s->max_deleted_entry_id) || cmp_xdel_first < 0) { + /* There's definitely no fragmentation ahead. */ + if (cmp_id_first < 0) { + /* Return the estimated counter. */ + return s->entries_added - s->length; + } else if (cmp_id_first == 0) { + /* Return the exact counter of the first entry in the stream. */ + return s->entries_added - s->length + 1; + } + } + + /* The ID is either before an XDEL that fragments the stream or an arbitrary + * ID. Either case, so we can't make a prediction. */ + return SCG_INVALID_ENTRIES_READ; +} + +/* As a result of an explicit XCLAIM or XREADGROUP command, new entries + * are created in the pending list of the stream and consumers. We need + * to propagate this changes in the form of XCLAIM commands. */ +void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) { + /* We need to generate an XCLAIM that will work in a idempotent fashion: + * + * XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time> + * RETRYCOUNT <count> FORCE JUSTID LASTID <id>. + * + * Note that JUSTID is useful in order to avoid that XCLAIM will do + * useless work in the slave side, trying to fetch the stream item. */ + robj *argv[14]; + argv[0] = shared.xclaim; + argv[1] = key; + argv[2] = groupname; + argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name)); + argv[4] = shared.integers[0]; + argv[5] = id; + argv[6] = shared.time; + argv[7] = createStringObjectFromLongLong(nack->delivery_time); + argv[8] = shared.retrycount; + argv[9] = createStringObjectFromLongLong(nack->delivery_count); + argv[10] = shared.force; + argv[11] = shared.justid; + argv[12] = shared.lastid; + argv[13] = createObjectFromStreamID(&group->last_id); + + alsoPropagate(c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); + + decrRefCount(argv[3]); + decrRefCount(argv[7]); + decrRefCount(argv[9]); + decrRefCount(argv[13]); +} + +/* We need this when we want to propagate the new last-id of a consumer group + * that was consumed by XREADGROUP with the NOACK option: in that case we can't + * propagate the last ID just using the XCLAIM LASTID option, so we emit + * + * XGROUP SETID <key> <groupname> <id> ENTRIESREAD <entries_read> + */ +void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) { + robj *argv[7]; + argv[0] = shared.xgroup; + argv[1] = shared.setid; + argv[2] = key; + argv[3] = groupname; + argv[4] = createObjectFromStreamID(&group->last_id); + argv[5] = shared.entriesread; + argv[6] = createStringObjectFromLongLong(group->entries_read); + + alsoPropagate(c->db->id,argv,7,PROPAGATE_AOF|PROPAGATE_REPL); + + decrRefCount(argv[4]); + decrRefCount(argv[6]); +} + +/* We need this when we want to propagate creation of consumer that was created + * by XREADGROUP with the NOACK option. In that case, the only way to create + * the consumer at the replica is by using XGROUP CREATECONSUMER (see issue #7140) + * + * XGROUP CREATECONSUMER <key> <groupname> <consumername> + */ +void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername) { + robj *argv[5]; + argv[0] = shared.xgroup; + argv[1] = shared.createconsumer; + argv[2] = key; + argv[3] = groupname; + argv[4] = createObject(OBJ_STRING,sdsdup(consumername)); + + alsoPropagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); + + decrRefCount(argv[4]); +} + +/* Send the stream items in the specified range to the client 'c'. The range + * the client will receive is between start and end inclusive, if 'count' is + * non zero, no more than 'count' elements are sent. + * + * The 'end' pointer can be NULL to mean that we want all the elements from + * 'start' till the end of the stream. If 'rev' is non zero, elements are + * produced in reversed order from end to start. + * + * The function returns the number of entries emitted. + * + * If group and consumer are not NULL, the function performs additional work: + * 1. It updates the last delivered ID in the group in case we are + * sending IDs greater than the current last ID. + * 2. If the requested IDs are already assigned to some other consumer, the + * function will not return it to the client. + * 3. An entry in the pending list will be created for every entry delivered + * for the first time to this consumer. + * 4. The group's read counter is incremented if it is already valid and there + * are no future tombstones, or is invalidated (set to 0) otherwise. If the + * counter is invalid to begin with, we try to obtain it for the last + * delivered ID. + * + * The behavior may be modified passing non-zero flags: + * + * STREAM_RWR_NOACK: Do not create PEL entries, that is, the point "3" above + * is not performed. + * STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries, + * and return the number of entries emitted as usually. + * This is used when the function is just used in order + * to emit data and there is some higher level logic. + * + * The final argument 'spi' (stream propagation info pointer) is a structure + * filled with information needed to propagate the command execution to AOF + * and slaves, in the case a consumer group was passed: we need to generate + * XCLAIM commands to create the pending list into AOF/slaves in that case. + * + * If 'spi' is set to NULL no propagation will happen even if the group was + * given, but currently such a feature is never used by the code base that + * will always pass 'spi' and propagate when a group is passed. + * + * Note that this function is recursive in certain cases. When it's called + * with a non NULL group and consumer argument, it may call + * streamReplyWithRangeFromConsumerPEL() in order to get entries from the + * consumer pending entries list. However such a function will then call + * streamReplyWithRange() in order to emit single entries (found in the + * PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES + * flag. + */ +#define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */ +#define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array + boundaries, just the entries. */ +#define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */ +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) { + void *arraylen_ptr = NULL; + size_t arraylen = 0; + streamIterator si; + int64_t numfields; + streamID id; + int propagate_last_id = 0; + int noack = flags & STREAM_RWR_NOACK; + + /* If the client is asking for some history, we serve it using a + * different function, so that we return entries *solely* from its + * own PEL. This ensures each consumer will always and only see + * the history of messages delivered to it and not yet confirmed + * as delivered. */ + if (group && (flags & STREAM_RWR_HISTORY)) { + return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count, + consumer); + } + + if (!(flags & STREAM_RWR_RAWENTRIES)) + arraylen_ptr = addReplyDeferredLen(c); + streamIteratorStart(&si,s,start,end,rev); + while(streamIteratorGetID(&si,&id,&numfields)) { + /* Update the group last_id if needed. */ + if (group && streamCompareID(&id,&group->last_id) > 0) { + if (group->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&id,NULL)) { + /* A valid counter and no future tombstones mean we can + * increment the read counter to keep tracking the group's + * progress. */ + group->entries_read++; + } else if (s->entries_added) { + /* The group's counter may be invalid, so we try to obtain it. */ + group->entries_read = streamEstimateDistanceFromFirstEverEntry(s,&id); + } + group->last_id = id; + /* Group last ID should be propagated only if NOACK was + * specified, otherwise the last id will be included + * in the propagation of XCLAIM itself. */ + if (noack) propagate_last_id = 1; + } + + /* Emit a two elements array for each item. The first is + * the ID, the second is an array of field-value pairs. */ + addReplyArrayLen(c,2); + addReplyStreamID(c,&id); + + addReplyArrayLen(c,numfields*2); + + /* Emit the field-value pairs. */ + while(numfields--) { + unsigned char *key, *value; + int64_t key_len, value_len; + streamIteratorGetField(&si,&key,&value,&key_len,&value_len); + addReplyBulkCBuffer(c,key,key_len); + addReplyBulkCBuffer(c,value,value_len); + } + + /* If a group is passed, we need to create an entry in the + * PEL (pending entries list) of this group *and* this consumer. + * + * Note that we cannot be sure about the fact the message is not + * already owned by another consumer, because the admin is able + * to change the consumer group last delivered ID using the + * XGROUP SETID command. So if we find that there is already + * a NACK for the entry, we need to associate it to the new + * consumer. */ + if (group && !noack) { + unsigned char buf[sizeof(streamID)]; + streamEncodeID(buf,&id); + + /* Try to add a new NACK. Most of the time this will work and + * will not require extra lookups. We'll fix the problem later + * if we find that there is already a entry for this ID. */ + streamNACK *nack = streamCreateNACK(consumer); + int group_inserted = + raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL); + int consumer_inserted = + raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + + /* Now we can check if the entry was already busy, and + * in that case reassign the entry to the new consumer, + * or update it if the consumer is the same as before. */ + if (group_inserted == 0) { + streamFreeNACK(nack); + nack = raxFind(group->pel,buf,sizeof(buf)); + serverAssert(nack != raxNotFound); + raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + /* Update the consumer and NACK metadata. */ + nack->consumer = consumer; + nack->delivery_time = commandTimeSnapshot(); + nack->delivery_count = 1; + /* Add the entry in the new consumer local PEL. */ + raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + } else if (group_inserted == 1 && consumer_inserted == 0) { + serverPanic("NACK half-created. Should not be possible."); + } + + consumer->active_time = commandTimeSnapshot(); + + /* Propagate as XCLAIM. */ + if (spi) { + robj *idarg = createObjectFromStreamID(&id); + streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack); + decrRefCount(idarg); + } + } + + arraylen++; + if (count && count == arraylen) break; + } + + if (spi && propagate_last_id) + streamPropagateGroupID(c,spi->keyname,group,spi->groupname); + + streamIteratorStop(&si); + if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen); + return arraylen; +} + +/* This is a helper function for streamReplyWithRange() when called with + * group and consumer arguments, but with a range that is referring to already + * delivered messages. In this case we just emit messages that are already + * in the history of the consumer, fetching the IDs from its PEL. + * + * Note that this function does not have a 'rev' argument because it's not + * possible to iterate in reverse using a group. Basically this function + * is only called as a result of the XREADGROUP command. + * + * This function is more expensive because it needs to inspect the PEL and then + * seek into the radix tree of the messages in order to emit the full message + * to the client. However clients only reach this code path when they are + * fetching the history of already retrieved messages, which is rare. */ +size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) { + raxIterator ri; + unsigned char startkey[sizeof(streamID)]; + unsigned char endkey[sizeof(streamID)]; + streamEncodeID(startkey,start); + if (end) streamEncodeID(endkey,end); + + size_t arraylen = 0; + void *arraylen_ptr = addReplyDeferredLen(c); + raxStart(&ri,consumer->pel); + raxSeek(&ri,">=",startkey,sizeof(startkey)); + while(raxNext(&ri) && (!count || arraylen < count)) { + if (end && memcmp(ri.key,end,ri.key_len) > 0) break; + streamID thisid; + streamDecodeID(ri.key,&thisid); + if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL, + STREAM_RWR_RAWENTRIES,NULL) == 0) + { + /* Note that we may have a not acknowledged entry in the PEL + * about a message that's no longer here because was removed + * by the user by other means. In that case we signal it emitting + * the ID but then a NULL entry for the fields. */ + addReplyArrayLen(c,2); + addReplyStreamID(c,&thisid); + addReplyNullArray(c); + } else { + streamNACK *nack = ri.data; + nack->delivery_time = commandTimeSnapshot(); + nack->delivery_count++; + } + arraylen++; + } + raxStop(&ri); + setDeferredArrayLen(c,arraylen_ptr,arraylen); + return arraylen; +} + +/* ----------------------------------------------------------------------- + * Stream commands implementation + * ----------------------------------------------------------------------- */ + +/* Look the stream at 'key' and return the corresponding stream object. + * The function creates a key setting it to an empty stream if needed. */ +robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) { + robj *o = lookupKeyWrite(c->db,key); + if (checkType(c,o,OBJ_STREAM)) return NULL; + if (o == NULL) { + if (no_create) { + addReplyNull(c); + return NULL; + } + o = createStreamObject(); + dbAdd(c->db,key,o); + } + return o; +} + +/* Parse a stream ID in the format given by clients to Redis, that is + * <ms>-<seq>, and converts it into a streamID structure. If + * the specified ID is invalid C_ERR is returned and an error is reported + * to the client, otherwise C_OK is returned. The ID may be in incomplete + * form, just stating the milliseconds time part of the stream. In such a case + * the missing part is set according to the value of 'missing_seq' parameter. + * + * The IDs "-" and "+" specify respectively the minimum and maximum IDs + * that can be represented. If 'strict' is set to 1, "-" and "+" will be + * treated as an invalid ID. + * + * The ID form <ms>-* specifies a millisconds-only ID, leaving the sequence part + * to be autogenerated. When a non-NULL 'seq_given' argument is provided, this + * form is accepted and the argument is set to 0 unless the sequence part is + * specified. + * + * If 'c' is set to NULL, no reply is sent to the client. */ +int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict, int *seq_given) { + char buf[128]; + if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid; + memcpy(buf,o->ptr,sdslen(o->ptr)+1); + + if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0') + goto invalid; + + if (seq_given != NULL) { + *seq_given = 1; + } + + /* Handle the "-" and "+" special cases. */ + if (buf[0] == '-' && buf[1] == '\0') { + id->ms = 0; + id->seq = 0; + return C_OK; + } else if (buf[0] == '+' && buf[1] == '\0') { + id->ms = UINT64_MAX; + id->seq = UINT64_MAX; + return C_OK; + } + + /* Parse <ms>-<seq> form. */ + unsigned long long ms, seq; + char *dot = strchr(buf,'-'); + if (dot) *dot = '\0'; + if (string2ull(buf,&ms) == 0) goto invalid; + if (dot) { + size_t seqlen = strlen(dot+1); + if (seq_given != NULL && seqlen == 1 && *(dot + 1) == '*') { + /* Handle the <ms>-* form. */ + seq = 0; + *seq_given = 0; + } else if (string2ull(dot+1,&seq) == 0) { + goto invalid; + } + } else { + seq = missing_seq; + } + id->ms = ms; + id->seq = seq; + return C_OK; + +invalid: + if (c) addReplyError(c,"Invalid stream ID specified as stream " + "command argument"); + return C_ERR; +} + +/* Wrapper for streamGenericParseIDOrReply() used by module API. */ +int streamParseID(const robj *o, streamID *id) { + return streamGenericParseIDOrReply(NULL,o,id,0,0,NULL); +} + +/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to + * 0, to be used when - and + are acceptable IDs. */ +int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { + return streamGenericParseIDOrReply(c,o,id,missing_seq,0,NULL); +} + +/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to + * 1, to be used when we want to return an error if the special IDs + or - + * are provided. */ +int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given) { + return streamGenericParseIDOrReply(c,o,id,missing_seq,1,seq_given); +} + +/* Helper for parsing a stream ID that is a range query interval. When the + * exclude argument is NULL, streamParseIDOrReply() is called and the interval + * is treated as close (inclusive). Otherwise, the exclude argument is set if + * the interval is open (the "(" prefix) and streamParseStrictIDOrReply() is + * called in that case. + */ +int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude, uint64_t missing_seq) { + char *p = o->ptr; + size_t len = sdslen(p); + int invalid = 0; + + if (exclude != NULL) *exclude = (len > 1 && p[0] == '('); + if (exclude != NULL && *exclude) { + robj *t = createStringObject(p+1,len-1); + invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq,NULL) == C_ERR); + decrRefCount(t); + } else + invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR); + if (invalid) + return C_ERR; + return C_OK; +} + +void streamRewriteApproxSpecifier(client *c, int idx) { + rewriteClientCommandArgument(c,idx,shared.special_equals); +} + +/* We propagate MAXLEN/MINID ~ <count> as MAXLEN/MINID = <resulting-len-of-stream> + * otherwise trimming is no longer deterministic on replicas / AOF. */ +void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx) { + robj *arg; + if (trim_strategy == TRIM_STRATEGY_MAXLEN) { + arg = createStringObjectFromLongLong(s->length); + } else { + streamID first_id; + streamGetEdgeID(s,1,0,&first_id); + arg = createObjectFromStreamID(&first_id); + } + + rewriteClientCommandArgument(c,idx,arg); + decrRefCount(arg); +} + +/* XADD key [(MAXLEN [~|=] <count> | MINID [~|=] <id>) [LIMIT <entries>]] [NOMKSTREAM] <ID or *> [field value] [field value] ... */ +void xaddCommand(client *c) { + /* Parse options. */ + streamAddTrimArgs parsed_args; + int idpos = streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1); + if (idpos < 0) + return; /* streamParseAddOrTrimArgsOrReply already replied. */ + int field_pos = idpos+1; /* The ID is always one argument before the first field */ + + /* Check arity. */ + if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) { + addReplyErrorArity(c); + return; + } + + /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating + * a new stream and have streamAppendItem fail, leaving an empty key in the + * database. */ + if (parsed_args.id_given && parsed_args.seq_given && + parsed_args.id.ms == 0 && parsed_args.id.seq == 0) + { + addReplyError(c,"The ID specified in XADD must be greater than 0-0"); + return; + } + + /* Lookup the stream at key. */ + robj *o; + stream *s; + if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],parsed_args.no_mkstream)) == NULL) return; + s = o->ptr; + + /* Return ASAP if the stream has reached the last possible ID */ + if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) { + addReplyError(c,"The stream has exhausted the last possible ID, " + "unable to add more items"); + return; + } + + /* Append using the low level function and return the ID. */ + errno = 0; + streamID id; + if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2, + &id,parsed_args.id_given ? &parsed_args.id : NULL,parsed_args.seq_given) == C_ERR) + { + serverAssert(errno != 0); + if (errno == EDOM) + addReplyError(c,"The ID specified in XADD is equal or smaller than " + "the target stream top item"); + else + addReplyError(c,"Elements are too large to be stored"); + return; + } + sds replyid = createStreamIDString(&id); + addReplyBulkCBuffer(c, replyid, sdslen(replyid)); + + signalModifiedKey(c,c->db,c->argv[1]); + notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id); + server.dirty++; + + /* Trim if needed. */ + if (parsed_args.trim_strategy != TRIM_STRATEGY_NONE) { + if (streamTrim(s, &parsed_args)) { + notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); + } + if (parsed_args.approx_trim) { + /* In case our trimming was limited (by LIMIT or by ~) we must + * re-write the relevant trim argument to make sure there will be + * no inconsistencies in AOF loading or in the replica. + * It's enough to check only args->approx because there is no + * way LIMIT is given without the ~ option. */ + streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1); + streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx); + } + } + + /* Let's rewrite the ID argument with the one actually generated for + * AOF/replication propagation. */ + if (!parsed_args.id_given || !parsed_args.seq_given) { + robj *idarg = createObject(OBJ_STRING, replyid); + rewriteClientCommandArgument(c, idpos, idarg); + decrRefCount(idarg); + } else { + sdsfree(replyid); + } + + /* We need to signal to blocked clients that there is new data on this + * stream. */ + signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM); +} + +/* XRANGE/XREVRANGE actual implementation. + * The 'start' and 'end' IDs are parsed as follows: + * Incomplete 'start' has its sequence set to 0, and 'end' to UINT64_MAX. + * "-" and "+"" mean the minimal and maximal ID values, respectively. + * The "(" prefix means an open (exclusive) range, so XRANGE stream (1-0 (2-0 + * will match anything from 1-1 and 1-UINT64_MAX. + */ +void xrangeGenericCommand(client *c, int rev) { + robj *o; + stream *s; + streamID startid, endid; + long long count = -1; + robj *startarg = rev ? c->argv[3] : c->argv[2]; + robj *endarg = rev ? c->argv[2] : c->argv[3]; + int startex = 0, endex = 0; + + /* Parse start and end IDs. */ + if (streamParseIntervalIDOrReply(c,startarg,&startid,&startex,0) != C_OK) + return; + if (startex && streamIncrID(&startid) != C_OK) { + addReplyError(c,"invalid start ID for the interval"); + return; + } + if (streamParseIntervalIDOrReply(c,endarg,&endid,&endex,UINT64_MAX) != C_OK) + return; + if (endex && streamDecrID(&endid) != C_OK) { + addReplyError(c,"invalid end ID for the interval"); + return; + } + + /* Parse the COUNT option if any. */ + if (c->argc > 4) { + for (int j = 4; j < c->argc; j++) { + int additional = c->argc-j-1; + if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) { + if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL) + != C_OK) return; + if (count < 0) count = 0; + j++; /* Consume additional arg. */ + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + } + } + + /* Return the specified range to the user. */ + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL || + checkType(c,o,OBJ_STREAM)) return; + + s = o->ptr; + + if (count == 0) { + addReplyNullArray(c); + } else { + if (count == -1) count = 0; + streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL); + } +} + +/* XRANGE key start end [COUNT <n>] */ +void xrangeCommand(client *c) { + xrangeGenericCommand(c,0); +} + +/* XREVRANGE key end start [COUNT <n>] */ +void xrevrangeCommand(client *c) { + xrangeGenericCommand(c,1); +} + +/* XLEN key*/ +void xlenCommand(client *c) { + robj *o; + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL + || checkType(c,o,OBJ_STREAM)) return; + stream *s = o->ptr; + addReplyLongLong(c,s->length); +} + +/* XREAD [BLOCK <milliseconds>] [COUNT <count>] STREAMS key_1 key_2 ... key_N + * ID_1 ID_2 ... ID_N + * + * This function also implements the XREADGROUP command, which is like XREAD + * but accepting the [GROUP group-name consumer-name] additional option. + * This is useful because while XREAD is a read command and can be called + * on slaves, XREADGROUP is not. */ +#define XREAD_BLOCKED_DEFAULT_COUNT 1000 +void xreadCommand(client *c) { + long long timeout = -1; /* -1 means, no BLOCK argument given. */ + long long count = 0; + int streams_count = 0; + int streams_arg = 0; + int noack = 0; /* True if NOACK option was specified. */ + streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; + streamID *ids = static_ids; + streamCG **groups = NULL; + int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */ + robj *groupname = NULL; + robj *consumername = NULL; + + /* Parse arguments. */ + for (int i = 1; i < c->argc; i++) { + int moreargs = c->argc-i-1; + char *o = c->argv[i]->ptr; + if (!strcasecmp(o,"BLOCK") && moreargs) { + if (c->flags & CLIENT_SCRIPT) { + /* + * Although the CLIENT_DENY_BLOCKING flag should protect from blocking the client + * on Lua/MULTI/RM_Call we want special treatment for Lua to keep backward compatibility. + * There is no sense to use BLOCK option within Lua. */ + addReplyErrorFormat(c, "%s command is not allowed with BLOCK option from scripts", (char *)c->argv[0]->ptr); + return; + } + i++; + if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout, + UNIT_MILLISECONDS) != C_OK) return; + } else if (!strcasecmp(o,"COUNT") && moreargs) { + i++; + if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK) + return; + if (count < 0) count = 0; + } else if (!strcasecmp(o,"STREAMS") && moreargs) { + streams_arg = i+1; + streams_count = (c->argc-streams_arg); + if ((streams_count % 2) != 0) { + char symbol = xreadgroup ? '>' : '$'; + addReplyErrorFormat(c,"Unbalanced '%s' list of streams: " + "for each stream key an ID or '%c' must be " + "specified.", c->cmd->fullname,symbol); + return; + } + streams_count /= 2; /* We have two arguments for each stream. */ + break; + } else if (!strcasecmp(o,"GROUP") && moreargs >= 2) { + if (!xreadgroup) { + addReplyError(c,"The GROUP option is only supported by " + "XREADGROUP. You called XREAD instead."); + return; + } + groupname = c->argv[i+1]; + consumername = c->argv[i+2]; + i += 2; + } else if (!strcasecmp(o,"NOACK")) { + if (!xreadgroup) { + addReplyError(c,"The NOACK option is only supported by " + "XREADGROUP. You called XREAD instead."); + return; + } + noack = 1; + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + } + + /* STREAMS option is mandatory. */ + if (streams_arg == 0) { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + + /* If the user specified XREADGROUP then it must also + * provide the GROUP option. */ + if (xreadgroup && groupname == NULL) { + addReplyError(c,"Missing GROUP option for XREADGROUP"); + return; + } + + /* Parse the IDs and resolve the group name. */ + if (streams_count > STREAMID_STATIC_VECTOR_LEN) + ids = zmalloc(sizeof(streamID)*streams_count); + if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count); + + for (int i = streams_arg + streams_count; i < c->argc; i++) { + /* Specifying "$" as last-known-id means that the client wants to be + * served with just the messages that will arrive into the stream + * starting from now. */ + int id_idx = i - streams_arg - streams_count; + robj *key = c->argv[i-streams_count]; + robj *o = lookupKeyRead(c->db,key); + if (checkType(c,o,OBJ_STREAM)) goto cleanup; + streamCG *group = NULL; + + /* If a group was specified, than we need to be sure that the + * key and group actually exist. */ + if (groupname) { + if (o == NULL || + (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL) + { + addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer " + "group '%s' in XREADGROUP with GROUP " + "option", + (char*)key->ptr,(char*)groupname->ptr); + goto cleanup; + } + groups[id_idx] = group; + } + + if (strcmp(c->argv[i]->ptr,"$") == 0) { + if (xreadgroup) { + addReplyError(c,"The $ ID is meaningless in the context of " + "XREADGROUP: you want to read the history of " + "this consumer by specifying a proper ID, or " + "use the > ID to get new messages. The $ ID would " + "just return an empty result set."); + goto cleanup; + } + if (o) { + stream *s = o->ptr; + ids[id_idx] = s->last_id; + } else { + ids[id_idx].ms = 0; + ids[id_idx].seq = 0; + } + continue; + } else if (strcmp(c->argv[i]->ptr,">") == 0) { + if (!xreadgroup) { + addReplyError(c,"The > ID can be specified only when calling " + "XREADGROUP using the GROUP <group> " + "<consumer> option."); + goto cleanup; + } + /* We use just the maximum ID to signal this is a ">" ID, anyway + * the code handling the blocking clients will have to update the + * ID later in order to match the changing consumer group last ID. */ + ids[id_idx].ms = UINT64_MAX; + ids[id_idx].seq = UINT64_MAX; + continue; + } + if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0,NULL) != C_OK) + goto cleanup; + } + + /* Try to serve the client synchronously. */ + size_t arraylen = 0; + void *arraylen_ptr = NULL; + for (int i = 0; i < streams_count; i++) { + robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]); + if (o == NULL) continue; + stream *s = o->ptr; + streamID *gt = ids+i; /* ID must be greater than this. */ + int serve_synchronously = 0; + int serve_history = 0; /* True for XREADGROUP with ID != ">". */ + streamConsumer *consumer = NULL; /* Unused if XREAD */ + streamPropInfo spi = {c->argv[streams_arg+i],groupname}; /* Unused if XREAD */ + + /* Check if there are the conditions to serve the client + * synchronously. */ + if (groups) { + /* If the consumer is blocked on a group, we always serve it + * synchronously (serving its local history) if the ID specified + * was not the special ">" ID. */ + if (gt->ms != UINT64_MAX || + gt->seq != UINT64_MAX) + { + serve_synchronously = 1; + serve_history = 1; + } else if (s->length) { + /* We also want to serve a consumer in a consumer group + * synchronously in case the group top item delivered is smaller + * than what the stream has inside. */ + streamID maxid, *last = &groups[i]->last_id; + streamLastValidID(s, &maxid); + if (streamCompareID(&maxid, last) > 0) { + serve_synchronously = 1; + *gt = *last; + } + } + consumer = streamLookupConsumer(groups[i],consumername->ptr); + if (consumer == NULL) { + consumer = streamCreateConsumer(groups[i],consumername->ptr, + c->argv[streams_arg+i], + c->db->id,SCC_DEFAULT); + if (noack) + streamPropagateConsumerCreation(c,spi.keyname, + spi.groupname, + consumer->name); + } + consumer->seen_time = commandTimeSnapshot(); + } else if (s->length) { + /* For consumers without a group, we serve synchronously if we can + * actually provide at least one item from the stream. */ + streamID maxid; + streamLastValidID(s, &maxid); + if (streamCompareID(&maxid, gt) > 0) { + serve_synchronously = 1; + } + } + + if (serve_synchronously) { + arraylen++; + if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c); + /* streamReplyWithRange() handles the 'start' ID as inclusive, + * so start from the next ID, since we want only messages with + * IDs greater than start. */ + streamID start = *gt; + streamIncrID(&start); + + /* Emit the two elements sub-array consisting of the name + * of the stream and the data we extracted from it. */ + if (c->resp == 2) addReplyArrayLen(c,2); + addReplyBulk(c,c->argv[streams_arg+i]); + + int flags = 0; + if (noack) flags |= STREAM_RWR_NOACK; + if (serve_history) flags |= STREAM_RWR_HISTORY; + streamReplyWithRange(c,s,&start,NULL,count,0, + groups ? groups[i] : NULL, + consumer, flags, &spi); + if (groups) server.dirty++; + } + } + + /* We replied synchronously! Set the top array len and return to caller. */ + if (arraylen) { + if (c->resp == 2) + setDeferredArrayLen(c,arraylen_ptr,arraylen); + else + setDeferredMapLen(c,arraylen_ptr,arraylen); + goto cleanup; + } + + /* Block if needed. */ + if (timeout != -1) { + /* If we are not allowed to block the client, the only thing + * we can do is treating it as a timeout (even with timeout 0). */ + if (c->flags & CLIENT_DENY_BLOCKING) { + addReplyNullArray(c); + goto cleanup; + } + /* We change the '$' to the current last ID for this stream. this is + * Since later on when we unblock on arriving data - we would like to + * re-process the command and in case '$' stays we will spin-block forever. + */ + for (int id_idx = 0; id_idx < streams_count; id_idx++) { + int arg_idx = id_idx + streams_arg + streams_count; + if (strcmp(c->argv[arg_idx]->ptr,"$") == 0) { + robj *argv_streamid = createObjectFromStreamID(&ids[id_idx]); + rewriteClientCommandArgument(c, arg_idx, argv_streamid); + decrRefCount(argv_streamid); + } + } + blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, timeout, xreadgroup); + goto cleanup; + } + + /* No BLOCK option, nor any stream we can serve. Reply as with a + * timeout happened. */ + addReplyNullArray(c); + /* Continue to cleanup... */ + +cleanup: /* Cleanup. */ + + /* The command is propagated (in the READGROUP form) as a side effect + * of calling lower level APIs. So stop any implicit propagation. */ + preventCommandPropagation(c); + if (ids != static_ids) zfree(ids); + zfree(groups); +} + +/* ----------------------------------------------------------------------- + * Low level implementation of consumer groups + * ----------------------------------------------------------------------- */ + +/* Create a NACK entry setting the delivery count to 1 and the delivery + * time to the current time. The NACK consumer will be set to the one + * specified as argument of the function. */ +streamNACK *streamCreateNACK(streamConsumer *consumer) { + streamNACK *nack = zmalloc(sizeof(*nack)); + nack->delivery_time = commandTimeSnapshot(); + nack->delivery_count = 1; + nack->consumer = consumer; + return nack; +} + +/* Free a NACK entry. */ +void streamFreeNACK(streamNACK *na) { + zfree(na); +} + +/* Free a consumer and associated data structures. Note that this function + * will not reassign the pending messages associated with this consumer + * nor will delete them from the stream, so when this function is called + * to delete a consumer, and not when the whole stream is destroyed, the caller + * should do some work before. */ +void streamFreeConsumer(streamConsumer *sc) { + raxFree(sc->pel); /* No value free callback: the PEL entries are shared + between the consumer and the main stream PEL. */ + sdsfree(sc->name); + zfree(sc); +} + +/* Create a new consumer group in the context of the stream 's', having the + * specified name, last server ID and reads counter. If a consumer group with + * the same name already exists NULL is returned, otherwise the pointer to the + * consumer group is returned. */ +streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read) { + if (s->cgroups == NULL) s->cgroups = raxNew(); + if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound) + return NULL; + + streamCG *cg = zmalloc(sizeof(*cg)); + cg->pel = raxNew(); + cg->consumers = raxNew(); + cg->last_id = *id; + cg->entries_read = entries_read; + raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL); + return cg; +} + +/* Free a consumer group and all its associated data. */ +void streamFreeCG(streamCG *cg) { + raxFreeWithCallback(cg->pel,(void(*)(void*))streamFreeNACK); + raxFreeWithCallback(cg->consumers,(void(*)(void*))streamFreeConsumer); + zfree(cg); +} + +/* Lookup the consumer group in the specified stream and returns its + * pointer, otherwise if there is no such group, NULL is returned. */ +streamCG *streamLookupCG(stream *s, sds groupname) { + if (s->cgroups == NULL) return NULL; + streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname, + sdslen(groupname)); + return (cg == raxNotFound) ? NULL : cg; +} + +/* Create a consumer with the specified name in the group 'cg' and return. + * If the consumer exists, return NULL. As a side effect, when the consumer + * is successfully created, the key space will be notified and dirty++ unless + * the SCC_NO_NOTIFY or SCC_NO_DIRTIFY flags is specified. */ +streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags) { + if (cg == NULL) return NULL; + int notify = !(flags & SCC_NO_NOTIFY); + int dirty = !(flags & SCC_NO_DIRTIFY); + streamConsumer *consumer = zmalloc(sizeof(*consumer)); + int success = raxTryInsert(cg->consumers,(unsigned char*)name, + sdslen(name),consumer,NULL); + if (!success) { + zfree(consumer); + return NULL; + } + consumer->name = sdsdup(name); + consumer->pel = raxNew(); + consumer->active_time = -1; + consumer->seen_time = commandTimeSnapshot(); + if (dirty) server.dirty++; + if (notify) notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-createconsumer",key,dbid); + return consumer; +} + +/* Lookup the consumer with the specified name in the group 'cg'. */ +streamConsumer *streamLookupConsumer(streamCG *cg, sds name) { + if (cg == NULL) return NULL; + streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, + sdslen(name)); + if (consumer == raxNotFound) return NULL; + return consumer; +} + +/* Delete the consumer specified in the consumer group 'cg'. */ +void streamDelConsumer(streamCG *cg, streamConsumer *consumer) { + /* Iterate all the consumer pending messages, deleting every corresponding + * entry from the global entry. */ + raxIterator ri; + raxStart(&ri,consumer->pel); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + streamNACK *nack = ri.data; + raxRemove(cg->pel,ri.key,ri.key_len,NULL); + streamFreeNACK(nack); + } + raxStop(&ri); + + /* Deallocate the consumer. */ + raxRemove(cg->consumers,(unsigned char*)consumer->name, + sdslen(consumer->name),NULL); + streamFreeConsumer(consumer); +} + +/* ----------------------------------------------------------------------- + * Consumer groups commands + * ----------------------------------------------------------------------- */ + +/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM] [ENTRIESREAD entries_read] + * XGROUP SETID <key> <groupname> <id or $> [ENTRIESREAD entries_read] + * XGROUP DESTROY <key> <groupname> + * XGROUP CREATECONSUMER <key> <groupname> <consumer> + * XGROUP DELCONSUMER <key> <groupname> <consumername> */ +void xgroupCommand(client *c) { + stream *s = NULL; + sds grpname = NULL; + streamCG *cg = NULL; + char *opt = c->argv[1]->ptr; /* Subcommand name. */ + int mkstream = 0; + long long entries_read = SCG_INVALID_ENTRIES_READ; + robj *o; + + /* Everything but the "HELP" option requires a key and group name. */ + if (c->argc >= 4) { + /* Parse optional arguments for CREATE and SETID */ + int i = 5; + int create_subcmd = !strcasecmp(opt,"CREATE"); + int setid_subcmd = !strcasecmp(opt,"SETID"); + while (i < c->argc) { + if (create_subcmd && !strcasecmp(c->argv[i]->ptr,"MKSTREAM")) { + mkstream = 1; + i++; + } else if ((create_subcmd || setid_subcmd) && !strcasecmp(c->argv[i]->ptr,"ENTRIESREAD") && i + 1 < c->argc) { + if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_read,NULL) != C_OK) + return; + if (entries_read < 0 && entries_read != SCG_INVALID_ENTRIES_READ) { + addReplyError(c,"value for ENTRIESREAD must be positive or -1"); + return; + } + i += 2; + } else { + addReplySubcommandSyntaxError(c); + return; + } + } + + o = lookupKeyWrite(c->db,c->argv[2]); + if (o) { + if (checkType(c,o,OBJ_STREAM)) return; + s = o->ptr; + } + grpname = c->argv[3]->ptr; + } + + /* Check for missing key/group. */ + if (c->argc >= 4 && !mkstream) { + /* At this point key must exist, or there is an error. */ + if (s == NULL) { + addReplyError(c, + "The XGROUP subcommand requires the key to exist. " + "Note that for CREATE you may want to use the MKSTREAM " + "option to create an empty stream automatically."); + return; + } + + /* Certain subcommands require the group to exist. */ + if ((cg = streamLookupCG(s,grpname)) == NULL && + (!strcasecmp(opt,"SETID") || + !strcasecmp(opt,"CREATECONSUMER") || + !strcasecmp(opt,"DELCONSUMER"))) + { + addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " + "for key name '%s'", + (char*)grpname, (char*)c->argv[2]->ptr); + return; + } + } + + /* Dispatch the different subcommands. */ + if (c->argc == 2 && !strcasecmp(opt,"HELP")) { + const char *help[] = { +"CREATE <key> <groupname> <id|$> [option]", +" Create a new consumer group. Options are:", +" * MKSTREAM", +" Create the empty stream if it does not exist.", +" * ENTRIESREAD entries_read", +" Set the group's entries_read counter (internal use).", +"CREATECONSUMER <key> <groupname> <consumer>", +" Create a new consumer in the specified group.", +"DELCONSUMER <key> <groupname> <consumer>", +" Remove the specified consumer.", +"DESTROY <key> <groupname>", +" Remove the specified group.", +"SETID <key> <groupname> <id|$> [ENTRIESREAD entries_read]", +" Set the current group ID and entries_read counter.", +NULL + }; + addReplyHelp(c, help); + } else if (!strcasecmp(opt,"CREATE") && (c->argc >= 5 && c->argc <= 8)) { + streamID id; + if (!strcmp(c->argv[4]->ptr,"$")) { + if (s) { + id = s->last_id; + } else { + id.ms = 0; + id.seq = 0; + } + } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0,NULL) != C_OK) { + return; + } + + /* Handle the MKSTREAM option now that the command can no longer fail. */ + if (s == NULL) { + serverAssert(mkstream); + o = createStreamObject(); + dbAdd(c->db,c->argv[2],o); + s = o->ptr; + signalModifiedKey(c,c->db,c->argv[2]); + } + + streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id,entries_read); + if (cg) { + addReply(c,shared.ok); + server.dirty++; + notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-create", + c->argv[2],c->db->id); + } else { + addReplyError(c,"-BUSYGROUP Consumer Group name already exists"); + } + } else if (!strcasecmp(opt,"SETID") && (c->argc == 5 || c->argc == 7)) { + streamID id; + if (!strcmp(c->argv[4]->ptr,"$")) { + id = s->last_id; + } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) { + return; + } + cg->last_id = id; + cg->entries_read = entries_read; + addReply(c,shared.ok); + server.dirty++; + notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id); + } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) { + if (cg) { + raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL); + streamFreeCG(cg); + addReply(c,shared.cone); + server.dirty++; + notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy", + c->argv[2],c->db->id); + /* We want to unblock any XREADGROUP consumers with -NOGROUP. */ + signalKeyAsReady(c->db,c->argv[2],OBJ_STREAM); + } else { + addReply(c,shared.czero); + } + } else if (!strcasecmp(opt,"CREATECONSUMER") && c->argc == 5) { + streamConsumer *created = streamCreateConsumer(cg,c->argv[4]->ptr,c->argv[2], + c->db->id,SCC_DEFAULT); + addReplyLongLong(c,created ? 1 : 0); + } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) { + long long pending = 0; + streamConsumer *consumer = streamLookupConsumer(cg,c->argv[4]->ptr); + if (consumer) { + /* Delete the consumer and returns the number of pending messages + * that were yet associated with such a consumer. */ + pending = raxSize(consumer->pel); + streamDelConsumer(cg,consumer); + server.dirty++; + notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer", + c->argv[2],c->db->id); + } + addReplyLongLong(c,pending); + } else { + addReplySubcommandSyntaxError(c); + } +} + +/* XSETID <stream> <id> [ENTRIESADDED entries_added] [MAXDELETEDID max_deleted_entry_id] + * + * Set the internal "last ID", "added entries" and "maximal deleted entry ID" + * of a stream. */ +void xsetidCommand(client *c) { + streamID id, max_xdel_id = {0, 0}; + long long entries_added = -1; + + if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK) + return; + + int i = 3; + while (i < c->argc) { + int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ + char *opt = c->argv[i]->ptr; + if (!strcasecmp(opt,"ENTRIESADDED") && moreargs) { + if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_added,NULL) != C_OK) { + return; + } else if (entries_added < 0) { + addReplyError(c,"entries_added must be positive"); + return; + } + i += 2; + } else if (!strcasecmp(opt,"MAXDELETEDID") && moreargs) { + if (streamParseStrictIDOrReply(c,c->argv[i+1],&max_xdel_id,0,NULL) != C_OK) { + return; + } else if (streamCompareID(&id,&max_xdel_id) < 0) { + addReplyError(c,"The ID specified in XSETID is smaller than the provided max_deleted_entry_id"); + return; + } + i += 2; + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + } + + robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); + if (o == NULL || checkType(c,o,OBJ_STREAM)) return; + stream *s = o->ptr; + + if (streamCompareID(&id,&s->max_deleted_entry_id) < 0) { + addReplyError(c,"The ID specified in XSETID is smaller than current max_deleted_entry_id"); + return; + } + + /* If the stream has at least one item, we want to check that the user + * is setting a last ID that is equal or greater than the current top + * item, otherwise the fundamental ID monotonicity assumption is violated. */ + if (s->length > 0) { + streamID maxid; + streamLastValidID(s,&maxid); + + if (streamCompareID(&id,&maxid) < 0) { + addReplyError(c,"The ID specified in XSETID is smaller than the target stream top item"); + return; + } + + /* If an entries_added was provided, it can't be lower than the length. */ + if (entries_added != -1 && s->length > (uint64_t)entries_added) { + addReplyError(c,"The entries_added specified in XSETID is smaller than the target stream length"); + return; + } + } + + s->last_id = id; + if (entries_added != -1) + s->entries_added = entries_added; + if (!streamIDEqZero(&max_xdel_id)) + s->max_deleted_entry_id = max_xdel_id; + addReply(c,shared.ok); + server.dirty++; + notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id); +} + +/* XACK <key> <group> <id> <id> ... <id> + * Acknowledge a message as processed. In practical terms we just check the + * pending entries list (PEL) of the group, and delete the PEL entry both from + * the group and the consumer (pending messages are referenced in both places). + * + * Return value of the command is the number of messages successfully + * acknowledged, that is, the IDs we were actually able to resolve in the PEL. + */ +void xackCommand(client *c) { + streamCG *group = NULL; + robj *o = lookupKeyRead(c->db,c->argv[1]); + if (o) { + if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */ + group = streamLookupCG(o->ptr,c->argv[2]->ptr); + } + + /* No key or group? Nothing to ack. */ + if (o == NULL || group == NULL) { + addReply(c,shared.czero); + return; + } + + /* Start parsing the IDs, so that we abort ASAP if there is a syntax + * error: the return value of this command cannot be an error in case + * the client successfully acknowledged some messages, so it should be + * executed in a "all or nothing" fashion. */ + streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; + streamID *ids = static_ids; + int id_count = c->argc-3; + if (id_count > STREAMID_STATIC_VECTOR_LEN) + ids = zmalloc(sizeof(streamID)*id_count); + for (int j = 3; j < c->argc; j++) { + if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0,NULL) != C_OK) goto cleanup; + } + + int acknowledged = 0; + for (int j = 3; j < c->argc; j++) { + unsigned char buf[sizeof(streamID)]; + streamEncodeID(buf,&ids[j-3]); + + /* Lookup the ID in the group PEL: it will have a reference to the + * NACK structure that will have a reference to the consumer, so that + * we are able to remove the entry from both PELs. */ + streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); + if (nack != raxNotFound) { + raxRemove(group->pel,buf,sizeof(buf),NULL); + raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + streamFreeNACK(nack); + acknowledged++; + server.dirty++; + } + } + addReplyLongLong(c,acknowledged); +cleanup: + if (ids != static_ids) zfree(ids); +} + +/* XPENDING <key> <group> [[IDLE <idle>] <start> <stop> <count> [<consumer>]] + * + * If start and stop are omitted, the command just outputs information about + * the amount of pending messages for the key/group pair, together with + * the minimum and maximum ID of pending messages. + * + * If start and stop are provided instead, the pending messages are returned + * with information about the current owner, number of deliveries and last + * delivery time and so forth. */ +void xpendingCommand(client *c) { + int justinfo = c->argc == 3; /* Without the range just outputs general + information about the PEL. */ + robj *key = c->argv[1]; + robj *groupname = c->argv[2]; + robj *consumername = NULL; + streamID startid, endid; + long long count = 0; + long long minidle = 0; + int startex = 0, endex = 0; + + /* Start and stop, and the consumer, can be omitted. Also the IDLE modifier. */ + if (c->argc != 3 && (c->argc < 6 || c->argc > 9)) { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + + /* Parse start/end/count arguments ASAP if needed, in order to report + * syntax errors before any other error. */ + if (c->argc >= 6) { + int startidx = 3; /* Without IDLE */ + + if (!strcasecmp(c->argv[3]->ptr, "IDLE")) { + if (getLongLongFromObjectOrReply(c, c->argv[4], &minidle, NULL) == C_ERR) + return; + if (c->argc < 8) { + /* If IDLE was provided we must have at least 'start end count' */ + addReplyErrorObject(c,shared.syntaxerr); + return; + } + /* Search for rest of arguments after 'IDLE <idle>' */ + startidx += 2; + } + + /* count argument. */ + if (getLongLongFromObjectOrReply(c,c->argv[startidx+2],&count,NULL) == C_ERR) + return; + if (count < 0) count = 0; + + /* start and end arguments. */ + if (streamParseIntervalIDOrReply(c,c->argv[startidx],&startid,&startex,0) != C_OK) + return; + if (startex && streamIncrID(&startid) != C_OK) { + addReplyError(c,"invalid start ID for the interval"); + return; + } + if (streamParseIntervalIDOrReply(c,c->argv[startidx+1],&endid,&endex,UINT64_MAX) != C_OK) + return; + if (endex && streamDecrID(&endid) != C_OK) { + addReplyError(c,"invalid end ID for the interval"); + return; + } + + if (startidx+3 < c->argc) { + /* 'consumer' was provided */ + consumername = c->argv[startidx+3]; + } + } + + /* Lookup the key and the group inside the stream. */ + robj *o = lookupKeyRead(c->db,c->argv[1]); + streamCG *group; + + if (checkType(c,o,OBJ_STREAM)) return; + if (o == NULL || + (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL) + { + addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer " + "group '%s'", + (char*)key->ptr,(char*)groupname->ptr); + return; + } + + /* XPENDING <key> <group> variant. */ + if (justinfo) { + addReplyArrayLen(c,4); + /* Total number of messages in the PEL. */ + addReplyLongLong(c,raxSize(group->pel)); + /* First and last IDs. */ + if (raxSize(group->pel) == 0) { + addReplyNull(c); /* Start. */ + addReplyNull(c); /* End. */ + addReplyNullArray(c); /* Clients. */ + } else { + /* Start. */ + raxIterator ri; + raxStart(&ri,group->pel); + raxSeek(&ri,"^",NULL,0); + raxNext(&ri); + streamDecodeID(ri.key,&startid); + addReplyStreamID(c,&startid); + + /* End. */ + raxSeek(&ri,"$",NULL,0); + raxNext(&ri); + streamDecodeID(ri.key,&endid); + addReplyStreamID(c,&endid); + raxStop(&ri); + + /* Consumers with pending messages. */ + raxStart(&ri,group->consumers); + raxSeek(&ri,"^",NULL,0); + void *arraylen_ptr = addReplyDeferredLen(c); + size_t arraylen = 0; + while(raxNext(&ri)) { + streamConsumer *consumer = ri.data; + if (raxSize(consumer->pel) == 0) continue; + addReplyArrayLen(c,2); + addReplyBulkCBuffer(c,ri.key,ri.key_len); + addReplyBulkLongLong(c,raxSize(consumer->pel)); + arraylen++; + } + setDeferredArrayLen(c,arraylen_ptr,arraylen); + raxStop(&ri); + } + } else { /* <start>, <stop> and <count> provided, return actual pending entries (not just info) */ + streamConsumer *consumer = NULL; + if (consumername) { + consumer = streamLookupConsumer(group,consumername->ptr); + + /* If a consumer name was mentioned but it does not exist, we can + * just return an empty array. */ + if (consumer == NULL) { + addReplyArrayLen(c,0); + return; + } + } + + rax *pel = consumer ? consumer->pel : group->pel; + unsigned char startkey[sizeof(streamID)]; + unsigned char endkey[sizeof(streamID)]; + raxIterator ri; + mstime_t now = commandTimeSnapshot(); + + streamEncodeID(startkey,&startid); + streamEncodeID(endkey,&endid); + raxStart(&ri,pel); + raxSeek(&ri,">=",startkey,sizeof(startkey)); + void *arraylen_ptr = addReplyDeferredLen(c); + size_t arraylen = 0; + + while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) { + streamNACK *nack = ri.data; + + if (minidle) { + mstime_t this_idle = now - nack->delivery_time; + if (this_idle < minidle) continue; + } + + arraylen++; + count--; + addReplyArrayLen(c,4); + + /* Entry ID. */ + streamID id; + streamDecodeID(ri.key,&id); + addReplyStreamID(c,&id); + + /* Consumer name. */ + addReplyBulkCBuffer(c,nack->consumer->name, + sdslen(nack->consumer->name)); + + /* Milliseconds elapsed since last delivery. */ + mstime_t elapsed = now - nack->delivery_time; + if (elapsed < 0) elapsed = 0; + addReplyLongLong(c,elapsed); + + /* Number of deliveries. */ + addReplyLongLong(c,nack->delivery_count); + } + raxStop(&ri); + setDeferredArrayLen(c,arraylen_ptr,arraylen); + } +} + +/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> + * [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>] + * [FORCE] [JUSTID] + * + * Changes ownership of one or multiple messages in the Pending Entries List + * of a given stream consumer group. + * + * If the message ID (among the specified ones) exists, and its idle + * time greater or equal to <min-idle-time>, then the message new owner + * becomes the specified <consumer>. If the minimum idle time specified + * is zero, messages are claimed regardless of their idle time. + * + * All the messages that cannot be found inside the pending entries list + * are ignored, but in case the FORCE option is used. In that case we + * create the NACK (representing a not yet acknowledged message) entry in + * the consumer group PEL. + * + * This command creates the consumer as side effect if it does not yet + * exists. Moreover the command reset the idle time of the message to 0, + * even if by using the IDLE or TIME options, the user can control the + * new idle time. + * + * The options at the end can be used in order to specify more attributes + * to set in the representation of the pending message: + * + * 1. IDLE <ms>: + * Set the idle time (last time it was delivered) of the message. + * If IDLE is not specified, an IDLE of 0 is assumed, that is, + * the time count is reset because the message has now a new + * owner trying to process it. + * + * 2. TIME <ms-unix-time>: + * This is the same as IDLE but instead of a relative amount of + * milliseconds, it sets the idle time to a specific unix time + * (in milliseconds). This is useful in order to rewrite the AOF + * file generating XCLAIM commands. + * + * 3. RETRYCOUNT <count>: + * Set the retry counter to the specified value. This counter is + * incremented every time a message is delivered again. Normally + * XCLAIM does not alter this counter, which is just served to clients + * when the XPENDING command is called: this way clients can detect + * anomalies, like messages that are never processed for some reason + * after a big number of delivery attempts. + * + * 4. FORCE: + * Creates the pending message entry in the PEL even if certain + * specified IDs are not already in the PEL assigned to a different + * client. However the message must be exist in the stream, otherwise + * the IDs of non existing messages are ignored. + * + * 5. JUSTID: + * Return just an array of IDs of messages successfully claimed, + * without returning the actual message. + * + * 6. LASTID <id>: + * Update the consumer group last ID with the specified ID if the + * current last ID is smaller than the provided one. + * This is used for replication / AOF, so that when we read from a + * consumer group, the XCLAIM that gets propagated to give ownership + * to the consumer, is also used in order to update the group current + * ID. + * + * The command returns an array of messages that the user + * successfully claimed, so that the caller is able to understand + * what messages it is now in charge of. */ +void xclaimCommand(client *c) { + streamCG *group = NULL; + robj *o = lookupKeyRead(c->db,c->argv[1]); + long long minidle; /* Minimum idle time argument. */ + long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */ + mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */ + int force = 0; + int justid = 0; + + if (o) { + if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */ + group = streamLookupCG(o->ptr,c->argv[2]->ptr); + } + + /* No key or group? Send an error given that the group creation + * is mandatory. */ + if (o == NULL || group == NULL) { + addReplyErrorFormat(c,"-NOGROUP No such key '%s' or " + "consumer group '%s'", (char*)c->argv[1]->ptr, + (char*)c->argv[2]->ptr); + return; + } + + if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle, + "Invalid min-idle-time argument for XCLAIM") + != C_OK) return; + if (minidle < 0) minidle = 0; + + /* Start parsing the IDs, so that we abort ASAP if there is a syntax + * error: the return value of this command cannot be an error in case + * the client successfully claimed some message, so it should be + * executed in a "all or nothing" fashion. */ + int j; + streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; + streamID *ids = static_ids; + int id_count = c->argc-5; + if (id_count > STREAMID_STATIC_VECTOR_LEN) + ids = zmalloc(sizeof(streamID)*id_count); + for (j = 5; j < c->argc; j++) { + if (streamParseStrictIDOrReply(NULL,c->argv[j],&ids[j-5],0,NULL) != C_OK) break; + } + int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */ + + /* If we stopped because some IDs cannot be parsed, perhaps they + * are trailing options. */ + mstime_t now = commandTimeSnapshot(); + streamID last_id = {0,0}; + int propagate_last_id = 0; + for (; j < c->argc; j++) { + int moreargs = (c->argc-1) - j; /* Number of additional arguments. */ + char *opt = c->argv[j]->ptr; + if (!strcasecmp(opt,"FORCE")) { + force = 1; + } else if (!strcasecmp(opt,"JUSTID")) { + justid = 1; + } else if (!strcasecmp(opt,"IDLE") && moreargs) { + j++; + if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, + "Invalid IDLE option argument for XCLAIM") + != C_OK) goto cleanup; + deliverytime = now - deliverytime; + } else if (!strcasecmp(opt,"TIME") && moreargs) { + j++; + if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, + "Invalid TIME option argument for XCLAIM") + != C_OK) goto cleanup; + } else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) { + j++; + if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount, + "Invalid RETRYCOUNT option argument for XCLAIM") + != C_OK) goto cleanup; + } else if (!strcasecmp(opt,"LASTID") && moreargs) { + j++; + if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0,NULL) != C_OK) goto cleanup; + } else { + addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt); + goto cleanup; + } + } + + if (streamCompareID(&last_id,&group->last_id) > 0) { + group->last_id = last_id; + propagate_last_id = 1; + } + + if (deliverytime != -1) { + /* If a delivery time was passed, either with IDLE or TIME, we + * do some sanity check on it, and set the deliverytime to now + * (which is a sane choice usually) if the value is bogus. + * To raise an error here is not wise because clients may compute + * the idle time doing some math starting from their local time, + * and this is not a good excuse to fail in case, for instance, + * the computer time is a bit in the future from our POV. */ + if (deliverytime < 0 || deliverytime > now) deliverytime = now; + } else { + /* If no IDLE/TIME option was passed, we want the last delivery + * time to be now, so that the idle time of the message will be + * zero. */ + deliverytime = now; + } + + /* Do the actual claiming. */ + streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr); + if (consumer == NULL) { + consumer = streamCreateConsumer(group,c->argv[3]->ptr,c->argv[1],c->db->id,SCC_DEFAULT); + } + consumer->seen_time = commandTimeSnapshot(); + + void *arraylenptr = addReplyDeferredLen(c); + size_t arraylen = 0; + for (int j = 5; j <= last_id_arg; j++) { + streamID id = ids[j-5]; + unsigned char buf[sizeof(streamID)]; + streamEncodeID(buf,&id); + + /* Lookup the ID in the group PEL. */ + streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); + + /* Item must exist for us to transfer it to another consumer. */ + if (!streamEntryExists(o->ptr,&id)) { + /* Clear this entry from the PEL, it no longer exists */ + if (nack != raxNotFound) { + /* Propagate this change (we are going to delete the NACK). */ + streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack); + propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */ + server.dirty++; + /* Release the NACK */ + raxRemove(group->pel,buf,sizeof(buf),NULL); + raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + streamFreeNACK(nack); + } + continue; + } + + /* If FORCE is passed, let's check if at least the entry + * exists in the Stream. In such case, we'll create a new + * entry in the PEL from scratch, so that XCLAIM can also + * be used to create entries in the PEL. Useful for AOF + * and replication of consumer groups. */ + if (force && nack == raxNotFound) { + /* Create the NACK. */ + nack = streamCreateNACK(NULL); + raxInsert(group->pel,buf,sizeof(buf),nack,NULL); + } + + if (nack != raxNotFound) { + /* We need to check if the minimum idle time requested + * by the caller is satisfied by this entry. + * + * Note that the nack could be created by FORCE, in this + * case there was no pre-existing entry and minidle should + * be ignored, but in that case nack->consumer is NULL. */ + if (nack->consumer && minidle) { + mstime_t this_idle = now - nack->delivery_time; + if (this_idle < minidle) continue; + } + + if (nack->consumer != consumer) { + /* Remove the entry from the old consumer. + * Note that nack->consumer is NULL if we created the + * NACK above because of the FORCE option. */ + if (nack->consumer) + raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + } + nack->delivery_time = deliverytime; + /* Set the delivery attempts counter if given, otherwise + * autoincrement unless JUSTID option provided */ + if (retrycount >= 0) { + nack->delivery_count = retrycount; + } else if (!justid) { + nack->delivery_count++; + } + if (nack->consumer != consumer) { + /* Add the entry in the new consumer local PEL. */ + raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + nack->consumer = consumer; + } + /* Send the reply for this entry. */ + if (justid) { + addReplyStreamID(c,&id); + } else { + serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1); + } + arraylen++; + + consumer->active_time = commandTimeSnapshot(); + + /* Propagate this change. */ + streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack); + propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */ + server.dirty++; + } + } + if (propagate_last_id) { + streamPropagateGroupID(c,c->argv[1],group,c->argv[2]); + server.dirty++; + } + setDeferredArrayLen(c,arraylenptr,arraylen); + preventCommandPropagation(c); +cleanup: + if (ids != static_ids) zfree(ids); +} + +/* XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID] + * + * Changes ownership of one or multiple messages in the Pending Entries List + * of a given stream consumer group. + * + * For each PEL entry, if its idle time greater or equal to <min-idle-time>, + * then the message new owner becomes the specified <consumer>. + * If the minimum idle time specified is zero, messages are claimed + * regardless of their idle time. + * + * This command creates the consumer as side effect if it does not yet + * exists. Moreover the command reset the idle time of the message to 0. + * + * The command returns an array of messages that the user + * successfully claimed, so that the caller is able to understand + * what messages it is now in charge of. */ +void xautoclaimCommand(client *c) { + streamCG *group = NULL; + robj *o = lookupKeyRead(c->db,c->argv[1]); + long long minidle; /* Minimum idle time argument, in milliseconds. */ + long count = 100; /* Maximum entries to claim. */ + const unsigned attempts_factor = 10; + streamID startid; + int startex; + int justid = 0; + + /* Parse idle/start/end/count arguments ASAP if needed, in order to report + * syntax errors before any other error. */ + if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,"Invalid min-idle-time argument for XAUTOCLAIM") != C_OK) + return; + if (minidle < 0) minidle = 0; + + if (streamParseIntervalIDOrReply(c,c->argv[5],&startid,&startex,0) != C_OK) + return; + if (startex && streamIncrID(&startid) != C_OK) { + addReplyError(c,"invalid start ID for the interval"); + return; + } + + int j = 6; /* options start at argv[6] */ + while(j < c->argc) { + int moreargs = (c->argc-1) - j; /* Number of additional arguments. */ + char *opt = c->argv[j]->ptr; + if (!strcasecmp(opt,"COUNT") && moreargs) { + long max_count = LONG_MAX / (max(sizeof(streamID), attempts_factor)); + if (getRangeLongFromObjectOrReply(c,c->argv[j+1],1,max_count,&count,"COUNT must be > 0") != C_OK) + return; + j++; + } else if (!strcasecmp(opt,"JUSTID")) { + justid = 1; + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + j++; + } + + if (o) { + if (checkType(c,o,OBJ_STREAM)) + return; /* Type error. */ + group = streamLookupCG(o->ptr,c->argv[2]->ptr); + } + + /* No key or group? Send an error given that the group creation + * is mandatory. */ + if (o == NULL || group == NULL) { + addReplyErrorFormat(c,"-NOGROUP No such key '%s' or consumer group '%s'", + (char*)c->argv[1]->ptr, + (char*)c->argv[2]->ptr); + return; + } + + streamID *deleted_ids = ztrymalloc(count * sizeof(streamID)); + if (!deleted_ids) { + addReplyError(c, "Insufficient memory, failed allocating transient memory, COUNT too high."); + return; + } + + /* Do the actual claiming. */ + streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr); + if (consumer == NULL) { + consumer = streamCreateConsumer(group,c->argv[3]->ptr,c->argv[1],c->db->id,SCC_DEFAULT); + } + consumer->seen_time = commandTimeSnapshot(); + + long long attempts = count * attempts_factor; + + addReplyArrayLen(c, 3); /* We add another reply later */ + void *endidptr = addReplyDeferredLen(c); /* reply[0] */ + void *arraylenptr = addReplyDeferredLen(c); /* reply[1] */ + + unsigned char startkey[sizeof(streamID)]; + streamEncodeID(startkey,&startid); + raxIterator ri; + raxStart(&ri,group->pel); + raxSeek(&ri,">=",startkey,sizeof(startkey)); + size_t arraylen = 0; + mstime_t now = commandTimeSnapshot(); + int deleted_id_num = 0; + while (attempts-- && count && raxNext(&ri)) { + streamNACK *nack = ri.data; + + streamID id; + streamDecodeID(ri.key, &id); + + /* Item must exist for us to transfer it to another consumer. */ + if (!streamEntryExists(o->ptr,&id)) { + /* Propagate this change (we are going to delete the NACK). */ + robj *idstr = createObjectFromStreamID(&id); + streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack); + decrRefCount(idstr); + server.dirty++; + /* Clear this entry from the PEL, it no longer exists */ + raxRemove(group->pel,ri.key,ri.key_len,NULL); + raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL); + streamFreeNACK(nack); + /* Remember the ID for later */ + deleted_ids[deleted_id_num++] = id; + raxSeek(&ri,">=",ri.key,ri.key_len); + count--; /* Count is a limit of the command response size. */ + continue; + } + + if (minidle) { + mstime_t this_idle = now - nack->delivery_time; + if (this_idle < minidle) + continue; + } + + if (nack->consumer != consumer) { + /* Remove the entry from the old consumer. + * Note that nack->consumer is NULL if we created the + * NACK above because of the FORCE option. */ + if (nack->consumer) + raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL); + } + + /* Update the consumer and idle time. */ + nack->delivery_time = now; + /* Increment the delivery attempts counter unless JUSTID option provided */ + if (!justid) + nack->delivery_count++; + + if (nack->consumer != consumer) { + /* Add the entry in the new consumer local PEL. */ + raxInsert(consumer->pel,ri.key,ri.key_len,nack,NULL); + nack->consumer = consumer; + } + + /* Send the reply for this entry. */ + if (justid) { + addReplyStreamID(c,&id); + } else { + serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1); + } + arraylen++; + count--; + + consumer->active_time = commandTimeSnapshot(); + + /* Propagate this change. */ + robj *idstr = createObjectFromStreamID(&id); + streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack); + decrRefCount(idstr); + server.dirty++; + } + + /* We need to return the next entry as a cursor for the next XAUTOCLAIM call */ + raxNext(&ri); + + streamID endid; + if (raxEOF(&ri)) { + endid.ms = endid.seq = 0; + } else { + streamDecodeID(ri.key, &endid); + } + raxStop(&ri); + + setDeferredArrayLen(c,arraylenptr,arraylen); + setDeferredReplyStreamID(c,endidptr,&endid); + + addReplyArrayLen(c, deleted_id_num); /* reply[2] */ + for (int i = 0; i < deleted_id_num; i++) { + addReplyStreamID(c, &deleted_ids[i]); + } + zfree(deleted_ids); + + preventCommandPropagation(c); +} + +/* XDEL <key> [<ID1> <ID2> ... <IDN>] + * + * Removes the specified entries from the stream. Returns the number + * of items actually deleted, that may be different from the number + * of IDs passed in case certain IDs do not exist. */ +void xdelCommand(client *c) { + robj *o; + + if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL + || checkType(c,o,OBJ_STREAM)) return; + stream *s = o->ptr; + + /* We need to sanity check the IDs passed to start. Even if not + * a big issue, it is not great that the command is only partially + * executed because at some point an invalid ID is parsed. */ + streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; + streamID *ids = static_ids; + int id_count = c->argc-2; + if (id_count > STREAMID_STATIC_VECTOR_LEN) + ids = zmalloc(sizeof(streamID)*id_count); + for (int j = 2; j < c->argc; j++) { + if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0,NULL) != C_OK) goto cleanup; + } + + /* Actually apply the command. */ + int deleted = 0; + int first_entry = 0; + for (int j = 2; j < c->argc; j++) { + streamID *id = &ids[j-2]; + if (streamDeleteItem(s,id)) { + /* We want to know if the first entry in the stream was deleted + * so we can later set the new one. */ + if (streamCompareID(id,&s->first_id) == 0) { + first_entry = 1; + } + /* Update the stream's maximal tombstone if needed. */ + if (streamCompareID(id,&s->max_deleted_entry_id) > 0) { + s->max_deleted_entry_id = *id; + } + deleted++; + }; + } + + /* Update the stream's first ID. */ + if (deleted) { + if (s->length == 0) { + s->first_id.ms = 0; + s->first_id.seq = 0; + } else if (first_entry) { + streamGetEdgeID(s,1,1,&s->first_id); + } + } + + /* Propagate the write if needed. */ + if (deleted) { + signalModifiedKey(c,c->db,c->argv[1]); + notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id); + server.dirty += deleted; + } + addReplyLongLong(c,deleted); +cleanup: + if (ids != static_ids) zfree(ids); +} + +/* General form: XTRIM <key> [... options ...] + * + * List of options: + * + * Trim strategies: + * + * MAXLEN [~|=] <count> -- Trim so that the stream will be capped at + * the specified length. Use ~ before the + * count in order to demand approximated trimming + * (like XADD MAXLEN option). + * MINID [~|=] <id> -- Trim so that the stream will not contain entries + * with IDs smaller than 'id'. Use ~ before the + * count in order to demand approximated trimming + * (like XADD MINID option). + * + * Other options: + * + * LIMIT <entries> -- The maximum number of entries to trim. + * 0 means unlimited. Unless specified, it is set + * to a default of 100*server.stream_node_max_entries, + * and that's in order to keep the trimming time sane. + * Has meaning only if `~` was provided. + */ +void xtrimCommand(client *c) { + robj *o; + + /* Argument parsing. */ + streamAddTrimArgs parsed_args; + if (streamParseAddOrTrimArgsOrReply(c, &parsed_args, 0) < 0) + return; /* streamParseAddOrTrimArgsOrReply already replied. */ + + /* If the key does not exist, we are ok returning zero, that is, the + * number of elements removed from the stream. */ + if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL + || checkType(c,o,OBJ_STREAM)) return; + stream *s = o->ptr; + + /* Perform the trimming. */ + int64_t deleted = streamTrim(s, &parsed_args); + if (deleted) { + notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); + if (parsed_args.approx_trim) { + /* In case our trimming was limited (by LIMIT or by ~) we must + * re-write the relevant trim argument to make sure there will be + * no inconsistencies in AOF loading or in the replica. + * It's enough to check only args->approx because there is no + * way LIMIT is given without the ~ option. */ + streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1); + streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx); + } + + /* Propagate the write. */ + signalModifiedKey(c, c->db,c->argv[1]); + server.dirty += deleted; + } + addReplyLongLong(c,deleted); +} + +/* Helper function for xinfoCommand. + * Handles the variants of XINFO STREAM */ +void xinfoReplyWithStreamInfo(client *c, stream *s) { + int full = 1; + long long count = 10; /* Default COUNT is 10 so we don't block the server */ + robj **optv = c->argv + 3; /* Options start after XINFO STREAM <key> */ + int optc = c->argc - 3; + + /* Parse options. */ + if (optc == 0) { + full = 0; + } else { + /* Valid options are [FULL] or [FULL COUNT <count>] */ + if (optc != 1 && optc != 3) { + addReplySubcommandSyntaxError(c); + return; + } + + /* First option must be "FULL" */ + if (strcasecmp(optv[0]->ptr,"full")) { + addReplySubcommandSyntaxError(c); + return; + } + + if (optc == 3) { + /* First option must be "FULL" */ + if (strcasecmp(optv[1]->ptr,"count")) { + addReplySubcommandSyntaxError(c); + return; + } + if (getLongLongFromObjectOrReply(c,optv[2],&count,NULL) == C_ERR) + return; + if (count < 0) count = 10; + } + } + + addReplyMapLen(c,full ? 9 : 10); + addReplyBulkCString(c,"length"); + addReplyLongLong(c,s->length); + addReplyBulkCString(c,"radix-tree-keys"); + addReplyLongLong(c,raxSize(s->rax)); + addReplyBulkCString(c,"radix-tree-nodes"); + addReplyLongLong(c,s->rax->numnodes); + addReplyBulkCString(c,"last-generated-id"); + addReplyStreamID(c,&s->last_id); + addReplyBulkCString(c,"max-deleted-entry-id"); + addReplyStreamID(c,&s->max_deleted_entry_id); + addReplyBulkCString(c,"entries-added"); + addReplyLongLong(c,s->entries_added); + addReplyBulkCString(c,"recorded-first-entry-id"); + addReplyStreamID(c,&s->first_id); + + if (!full) { + /* XINFO STREAM <key> */ + + addReplyBulkCString(c,"groups"); + addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0); + + /* To emit the first/last entry we use streamReplyWithRange(). */ + int emitted; + streamID start, end; + start.ms = start.seq = 0; + end.ms = end.seq = UINT64_MAX; + addReplyBulkCString(c,"first-entry"); + emitted = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL, + STREAM_RWR_RAWENTRIES,NULL); + if (!emitted) addReplyNull(c); + addReplyBulkCString(c,"last-entry"); + emitted = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL, + STREAM_RWR_RAWENTRIES,NULL); + if (!emitted) addReplyNull(c); + } else { + /* XINFO STREAM <key> FULL [COUNT <count>] */ + + /* Stream entries */ + addReplyBulkCString(c,"entries"); + streamReplyWithRange(c,s,NULL,NULL,count,0,NULL,NULL,0,NULL); + + /* Consumer groups */ + addReplyBulkCString(c,"groups"); + if (s->cgroups == NULL) { + addReplyArrayLen(c,0); + } else { + addReplyArrayLen(c,raxSize(s->cgroups)); + raxIterator ri_cgroups; + raxStart(&ri_cgroups,s->cgroups); + raxSeek(&ri_cgroups,"^",NULL,0); + while(raxNext(&ri_cgroups)) { + streamCG *cg = ri_cgroups.data; + addReplyMapLen(c,7); + + /* Name */ + addReplyBulkCString(c,"name"); + addReplyBulkCBuffer(c,ri_cgroups.key,ri_cgroups.key_len); + + /* Last delivered ID */ + addReplyBulkCString(c,"last-delivered-id"); + addReplyStreamID(c,&cg->last_id); + + /* Read counter of the last delivered ID */ + addReplyBulkCString(c,"entries-read"); + if (cg->entries_read != SCG_INVALID_ENTRIES_READ) { + addReplyLongLong(c,cg->entries_read); + } else { + addReplyNull(c); + } + + /* Group lag */ + addReplyBulkCString(c,"lag"); + streamReplyWithCGLag(c,s,cg); + + /* Group PEL count */ + addReplyBulkCString(c,"pel-count"); + addReplyLongLong(c,raxSize(cg->pel)); + + /* Group PEL */ + addReplyBulkCString(c,"pending"); + long long arraylen_cg_pel = 0; + void *arrayptr_cg_pel = addReplyDeferredLen(c); + raxIterator ri_cg_pel; + raxStart(&ri_cg_pel,cg->pel); + raxSeek(&ri_cg_pel,"^",NULL,0); + while(raxNext(&ri_cg_pel) && (!count || arraylen_cg_pel < count)) { + streamNACK *nack = ri_cg_pel.data; + addReplyArrayLen(c,4); + + /* Entry ID. */ + streamID id; + streamDecodeID(ri_cg_pel.key,&id); + addReplyStreamID(c,&id); + + /* Consumer name. */ + serverAssert(nack->consumer); /* assertion for valgrind (avoid NPD) */ + addReplyBulkCBuffer(c,nack->consumer->name, + sdslen(nack->consumer->name)); + + /* Last delivery. */ + addReplyLongLong(c,nack->delivery_time); + + /* Number of deliveries. */ + addReplyLongLong(c,nack->delivery_count); + + arraylen_cg_pel++; + } + setDeferredArrayLen(c,arrayptr_cg_pel,arraylen_cg_pel); + raxStop(&ri_cg_pel); + + /* Consumers */ + addReplyBulkCString(c,"consumers"); + addReplyArrayLen(c,raxSize(cg->consumers)); + raxIterator ri_consumers; + raxStart(&ri_consumers,cg->consumers); + raxSeek(&ri_consumers,"^",NULL,0); + while(raxNext(&ri_consumers)) { + streamConsumer *consumer = ri_consumers.data; + addReplyMapLen(c,5); + + /* Consumer name */ + addReplyBulkCString(c,"name"); + addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); + + /* Seen-time */ + addReplyBulkCString(c,"seen-time"); + addReplyLongLong(c,consumer->seen_time); + + /* Active-time */ + addReplyBulkCString(c,"active-time"); + addReplyLongLong(c,consumer->active_time); + + /* Consumer PEL count */ + addReplyBulkCString(c,"pel-count"); + addReplyLongLong(c,raxSize(consumer->pel)); + + /* Consumer PEL */ + addReplyBulkCString(c,"pending"); + long long arraylen_cpel = 0; + void *arrayptr_cpel = addReplyDeferredLen(c); + raxIterator ri_cpel; + raxStart(&ri_cpel,consumer->pel); + raxSeek(&ri_cpel,"^",NULL,0); + while(raxNext(&ri_cpel) && (!count || arraylen_cpel < count)) { + streamNACK *nack = ri_cpel.data; + addReplyArrayLen(c,3); + + /* Entry ID. */ + streamID id; + streamDecodeID(ri_cpel.key,&id); + addReplyStreamID(c,&id); + + /* Last delivery. */ + addReplyLongLong(c,nack->delivery_time); + + /* Number of deliveries. */ + addReplyLongLong(c,nack->delivery_count); + + arraylen_cpel++; + } + setDeferredArrayLen(c,arrayptr_cpel,arraylen_cpel); + raxStop(&ri_cpel); + } + raxStop(&ri_consumers); + } + raxStop(&ri_cgroups); + } + } +} + +/* XINFO CONSUMERS <key> <group> + * XINFO GROUPS <key> + * XINFO STREAM <key> [FULL [COUNT <count>]] + * XINFO HELP. */ +void xinfoCommand(client *c) { + stream *s = NULL; + char *opt; + robj *key; + + /* HELP is special. Handle it ASAP. */ + if (!strcasecmp(c->argv[1]->ptr,"HELP")) { + const char *help[] = { +"CONSUMERS <key> <groupname>", +" Show consumers of <groupname>.", +"GROUPS <key>", +" Show the stream consumer groups.", +"STREAM <key> [FULL [COUNT <count>]", +" Show information about the stream.", +NULL + }; + addReplyHelp(c, help); + return; + } + + /* With the exception of HELP handled before any other sub commands, all + * the ones are in the form of "<subcommand> <key>". */ + opt = c->argv[1]->ptr; + key = c->argv[2]; + + /* Lookup the key now, this is common for all the subcommands but HELP. */ + robj *o = lookupKeyReadOrReply(c,key,shared.nokeyerr); + if (o == NULL || checkType(c,o,OBJ_STREAM)) return; + s = o->ptr; + + /* Dispatch the different subcommands. */ + if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) { + /* XINFO CONSUMERS <key> <group>. */ + streamCG *cg = streamLookupCG(s,c->argv[3]->ptr); + if (cg == NULL) { + addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " + "for key name '%s'", + (char*)c->argv[3]->ptr, (char*)key->ptr); + return; + } + + addReplyArrayLen(c,raxSize(cg->consumers)); + raxIterator ri; + raxStart(&ri,cg->consumers); + raxSeek(&ri,"^",NULL,0); + mstime_t now = commandTimeSnapshot(); + while(raxNext(&ri)) { + streamConsumer *consumer = ri.data; + mstime_t inactive = consumer->active_time != -1 ? now - consumer->active_time : consumer->active_time; + mstime_t idle = now - consumer->seen_time; + if (idle < 0) idle = 0; + + addReplyMapLen(c,4); + addReplyBulkCString(c,"name"); + addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); + addReplyBulkCString(c,"pending"); + addReplyLongLong(c,raxSize(consumer->pel)); + addReplyBulkCString(c,"idle"); + addReplyLongLong(c,idle); + addReplyBulkCString(c,"inactive"); + addReplyLongLong(c,inactive); + } + raxStop(&ri); + } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) { + /* XINFO GROUPS <key>. */ + if (s->cgroups == NULL) { + addReplyArrayLen(c,0); + return; + } + + addReplyArrayLen(c,raxSize(s->cgroups)); + raxIterator ri; + raxStart(&ri,s->cgroups); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + streamCG *cg = ri.data; + addReplyMapLen(c,6); + addReplyBulkCString(c,"name"); + addReplyBulkCBuffer(c,ri.key,ri.key_len); + addReplyBulkCString(c,"consumers"); + addReplyLongLong(c,raxSize(cg->consumers)); + addReplyBulkCString(c,"pending"); + addReplyLongLong(c,raxSize(cg->pel)); + addReplyBulkCString(c,"last-delivered-id"); + addReplyStreamID(c,&cg->last_id); + addReplyBulkCString(c,"entries-read"); + if (cg->entries_read != SCG_INVALID_ENTRIES_READ) { + addReplyLongLong(c,cg->entries_read); + } else { + addReplyNull(c); + } + addReplyBulkCString(c,"lag"); + streamReplyWithCGLag(c,s,cg); + } + raxStop(&ri); + } else if (!strcasecmp(opt,"STREAM")) { + /* XINFO STREAM <key> [FULL [COUNT <count>]]. */ + xinfoReplyWithStreamInfo(c,s); + } else { + addReplySubcommandSyntaxError(c); + } +} + +/* Validate the integrity stream listpack entries structure. Both in term of a + * valid listpack, but also that the structure of the entries matches a valid + * stream. return 1 if valid 0 if not valid. */ +int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep) { + int valid_record; + unsigned char *p, *next; + + /* Since we don't want to run validation of all records twice, we'll + * run the listpack validation of just the header and do the rest here. */ + if (!lpValidateIntegrity(lp, size, 0, NULL, NULL)) + return 0; + + /* In non-deep mode we just validated the listpack header (encoded size) */ + if (!deep) return 1; + + next = p = lpValidateFirst(lp); + if (!lpValidateNext(lp, &next, size)) return 0; + if (!p) return 0; + + /* entry count */ + int64_t entry_count = lpGetIntegerIfValid(p, &valid_record); + if (!valid_record) return 0; + p = next; if (!lpValidateNext(lp, &next, size)) return 0; + + /* deleted */ + int64_t deleted_count = lpGetIntegerIfValid(p, &valid_record); + if (!valid_record) return 0; + p = next; if (!lpValidateNext(lp, &next, size)) return 0; + + /* num-of-fields */ + int64_t master_fields = lpGetIntegerIfValid(p, &valid_record); + if (!valid_record) return 0; + p = next; if (!lpValidateNext(lp, &next, size)) return 0; + + /* the field names */ + for (int64_t j = 0; j < master_fields; j++) { + p = next; if (!lpValidateNext(lp, &next, size)) return 0; + } + + /* the zero master entry terminator. */ + int64_t zero = lpGetIntegerIfValid(p, &valid_record); + if (!valid_record || zero != 0) return 0; + p = next; if (!lpValidateNext(lp, &next, size)) return 0; + + entry_count += deleted_count; + while (entry_count--) { + if (!p) return 0; + int64_t fields = master_fields, extra_fields = 3; + int64_t flags = lpGetIntegerIfValid(p, &valid_record); + if (!valid_record) return 0; + p = next; if (!lpValidateNext(lp, &next, size)) return 0; + + /* entry id */ + lpGetIntegerIfValid(p, &valid_record); + if (!valid_record) return 0; + p = next; if (!lpValidateNext(lp, &next, size)) return 0; + lpGetIntegerIfValid(p, &valid_record); + if (!valid_record) return 0; + p = next; if (!lpValidateNext(lp, &next, size)) return 0; + + if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) { + /* num-of-fields */ + fields = lpGetIntegerIfValid(p, &valid_record); + if (!valid_record) return 0; + p = next; if (!lpValidateNext(lp, &next, size)) return 0; + + /* the field names */ + for (int64_t j = 0; j < fields; j++) { + p = next; if (!lpValidateNext(lp, &next, size)) return 0; + } + + extra_fields += fields + 1; + } + + /* the values */ + for (int64_t j = 0; j < fields; j++) { + p = next; if (!lpValidateNext(lp, &next, size)) return 0; + } + + /* lp-count */ + int64_t lp_count = lpGetIntegerIfValid(p, &valid_record); + if (!valid_record) return 0; + if (lp_count != fields + extra_fields) return 0; + p = next; if (!lpValidateNext(lp, &next, size)) return 0; + } + + if (next) + return 0; + + return 1; +} |