diff options
Diffstat (limited to 'src/t_zset.c')
-rw-r--r-- | src/t_zset.c | 4460 |
1 files changed, 4460 insertions, 0 deletions
diff --git a/src/t_zset.c b/src/t_zset.c new file mode 100644 index 0000000..7717a4a --- /dev/null +++ b/src/t_zset.c @@ -0,0 +1,4460 @@ +/* + * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> + * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/*----------------------------------------------------------------------------- + * Sorted set API + *----------------------------------------------------------------------------*/ + +/* ZSETs are ordered sets using two data structures to hold the same elements + * in order to get O(log(N)) INSERT and REMOVE operations into a sorted + * data structure. + * + * The elements are added to a hash table mapping Redis objects to scores. + * At the same time the elements are added to a skip list mapping scores + * to Redis objects (so objects are sorted by scores in this "view"). + * + * Note that the SDS string representing the element is the same in both + * the hash table and skiplist in order to save memory. What we do in order + * to manage the shared SDS string more easily is to free the SDS string + * only in zslFreeNode(). The dictionary has no value free method set. + * So we should always remove an element from the dictionary, and later from + * the skiplist. + * + * This skiplist implementation is almost a C translation of the original + * algorithm described by William Pugh in "Skip Lists: A Probabilistic + * Alternative to Balanced Trees", modified in three ways: + * a) this implementation allows for repeated scores. + * b) the comparison is not just by key (our 'score') but by satellite data. + * c) there is a back pointer, so it's a doubly linked list with the back + * pointers being only at "level 1". This allows to traverse the list + * from tail to head, useful for ZREVRANGE. */ + +#include "server.h" +#include "intset.h" /* Compact integer set structure */ +#include <math.h> + +/*----------------------------------------------------------------------------- + * Skiplist implementation of the low level API + *----------------------------------------------------------------------------*/ + +int zslLexValueGteMin(sds value, zlexrangespec *spec); +int zslLexValueLteMax(sds value, zlexrangespec *spec); +void zsetConvertAndExpand(robj *zobj, int encoding, unsigned long cap); + +/* Create a skiplist node with the specified number of levels. + * The SDS string 'ele' is referenced by the node after the call. */ +zskiplistNode *zslCreateNode(int level, double score, sds ele) { + zskiplistNode *zn = + zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel)); + zn->score = score; + zn->ele = ele; + return zn; +} + +/* Create a new skiplist. */ +zskiplist *zslCreate(void) { + int j; + zskiplist *zsl; + + zsl = zmalloc(sizeof(*zsl)); + zsl->level = 1; + zsl->length = 0; + zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); + for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { + zsl->header->level[j].forward = NULL; + zsl->header->level[j].span = 0; + } + zsl->header->backward = NULL; + zsl->tail = NULL; + return zsl; +} + +/* Free the specified skiplist node. The referenced SDS string representation + * of the element is freed too, unless node->ele is set to NULL before calling + * this function. */ +void zslFreeNode(zskiplistNode *node) { + sdsfree(node->ele); + zfree(node); +} + +/* Free a whole skiplist. */ +void zslFree(zskiplist *zsl) { + zskiplistNode *node = zsl->header->level[0].forward, *next; + + zfree(zsl->header); + while(node) { + next = node->level[0].forward; + zslFreeNode(node); + node = next; + } + zfree(zsl); +} + +/* Returns a random level for the new skiplist node we are going to create. + * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL + * (both inclusive), with a powerlaw-alike distribution where higher + * levels are less likely to be returned. */ +int zslRandomLevel(void) { + static const int threshold = ZSKIPLIST_P*RAND_MAX; + int level = 1; + while (random() < threshold) + level += 1; + return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL; +} + +/* Insert a new node in the skiplist. Assumes the element does not already + * exist (up to the caller to enforce that). The skiplist takes ownership + * of the passed SDS string 'ele'. */ +zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { + zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; + unsigned long rank[ZSKIPLIST_MAXLEVEL]; + int i, level; + + serverAssert(!isnan(score)); + x = zsl->header; + for (i = zsl->level-1; i >= 0; i--) { + /* store rank that is crossed to reach the insert position */ + rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; + while (x->level[i].forward && + (x->level[i].forward->score < score || + (x->level[i].forward->score == score && + sdscmp(x->level[i].forward->ele,ele) < 0))) + { + rank[i] += x->level[i].span; + x = x->level[i].forward; + } + update[i] = x; + } + /* we assume the element is not already inside, since we allow duplicated + * scores, reinserting the same element should never happen since the + * caller of zslInsert() should test in the hash table if the element is + * already inside or not. */ + level = zslRandomLevel(); + if (level > zsl->level) { + for (i = zsl->level; i < level; i++) { + rank[i] = 0; + update[i] = zsl->header; + update[i]->level[i].span = zsl->length; + } + zsl->level = level; + } + x = zslCreateNode(level,score,ele); + for (i = 0; i < level; i++) { + x->level[i].forward = update[i]->level[i].forward; + update[i]->level[i].forward = x; + + /* update span covered by update[i] as x is inserted here */ + x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); + update[i]->level[i].span = (rank[0] - rank[i]) + 1; + } + + /* increment span for untouched levels */ + for (i = level; i < zsl->level; i++) { + update[i]->level[i].span++; + } + + x->backward = (update[0] == zsl->header) ? NULL : update[0]; + if (x->level[0].forward) + x->level[0].forward->backward = x; + else + zsl->tail = x; + zsl->length++; + return x; +} + +/* Internal function used by zslDelete, zslDeleteRangeByScore and + * zslDeleteRangeByRank. */ +void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) { + int i; + for (i = 0; i < zsl->level; i++) { + if (update[i]->level[i].forward == x) { + update[i]->level[i].span += x->level[i].span - 1; + update[i]->level[i].forward = x->level[i].forward; + } else { + update[i]->level[i].span -= 1; + } + } + if (x->level[0].forward) { + x->level[0].forward->backward = x->backward; + } else { + zsl->tail = x->backward; + } + while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL) + zsl->level--; + zsl->length--; +} + +/* Delete an element with matching score/element from the skiplist. + * The function returns 1 if the node was found and deleted, otherwise + * 0 is returned. + * + * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise + * it is not freed (but just unlinked) and *node is set to the node pointer, + * so that it is possible for the caller to reuse the node (including the + * referenced SDS string at node->ele). */ +int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) { + zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; + int i; + + x = zsl->header; + for (i = zsl->level-1; i >= 0; i--) { + while (x->level[i].forward && + (x->level[i].forward->score < score || + (x->level[i].forward->score == score && + sdscmp(x->level[i].forward->ele,ele) < 0))) + { + x = x->level[i].forward; + } + update[i] = x; + } + /* We may have multiple elements with the same score, what we need + * is to find the element with both the right score and object. */ + x = x->level[0].forward; + if (x && score == x->score && sdscmp(x->ele,ele) == 0) { + zslDeleteNode(zsl, x, update); + if (!node) + zslFreeNode(x); + else + *node = x; + return 1; + } + return 0; /* not found */ +} + +/* Update the score of an element inside the sorted set skiplist. + * Note that the element must exist and must match 'score'. + * This function does not update the score in the hash table side, the + * caller should take care of it. + * + * Note that this function attempts to just update the node, in case after + * the score update, the node would be exactly at the same position. + * Otherwise the skiplist is modified by removing and re-adding a new + * element, which is more costly. + * + * The function returns the updated element skiplist node pointer. */ +zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) { + zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; + int i; + + /* We need to seek to element to update to start: this is useful anyway, + * we'll have to update or remove it. */ + x = zsl->header; + for (i = zsl->level-1; i >= 0; i--) { + while (x->level[i].forward && + (x->level[i].forward->score < curscore || + (x->level[i].forward->score == curscore && + sdscmp(x->level[i].forward->ele,ele) < 0))) + { + x = x->level[i].forward; + } + update[i] = x; + } + + /* Jump to our element: note that this function assumes that the + * element with the matching score exists. */ + x = x->level[0].forward; + serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0); + + /* If the node, after the score update, would be still exactly + * at the same position, we can just update the score without + * actually removing and re-inserting the element in the skiplist. */ + if ((x->backward == NULL || x->backward->score < newscore) && + (x->level[0].forward == NULL || x->level[0].forward->score > newscore)) + { + x->score = newscore; + return x; + } + + /* No way to reuse the old node: we need to remove and insert a new + * one at a different place. */ + zslDeleteNode(zsl, x, update); + zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele); + /* We reused the old node x->ele SDS string, free the node now + * since zslInsert created a new one. */ + x->ele = NULL; + zslFreeNode(x); + return newnode; +} + +int zslValueGteMin(double value, zrangespec *spec) { + return spec->minex ? (value > spec->min) : (value >= spec->min); +} + +int zslValueLteMax(double value, zrangespec *spec) { + return spec->maxex ? (value < spec->max) : (value <= spec->max); +} + +/* Returns if there is a part of the zset is in range. */ +int zslIsInRange(zskiplist *zsl, zrangespec *range) { + zskiplistNode *x; + + /* Test for ranges that will always be empty. */ + if (range->min > range->max || + (range->min == range->max && (range->minex || range->maxex))) + return 0; + x = zsl->tail; + if (x == NULL || !zslValueGteMin(x->score,range)) + return 0; + x = zsl->header->level[0].forward; + if (x == NULL || !zslValueLteMax(x->score,range)) + return 0; + return 1; +} + +/* Find the first node that is contained in the specified range. + * Returns NULL when no element is contained in the range. */ +zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) { + zskiplistNode *x; + int i; + + /* If everything is out of range, return early. */ + if (!zslIsInRange(zsl,range)) return NULL; + + x = zsl->header; + for (i = zsl->level-1; i >= 0; i--) { + /* Go forward while *OUT* of range. */ + while (x->level[i].forward && + !zslValueGteMin(x->level[i].forward->score,range)) + x = x->level[i].forward; + } + + /* This is an inner range, so the next node cannot be NULL. */ + x = x->level[0].forward; + serverAssert(x != NULL); + + /* Check if score <= max. */ + if (!zslValueLteMax(x->score,range)) return NULL; + return x; +} + +/* Find the last node that is contained in the specified range. + * Returns NULL when no element is contained in the range. */ +zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) { + zskiplistNode *x; + int i; + + /* If everything is out of range, return early. */ + if (!zslIsInRange(zsl,range)) return NULL; + + x = zsl->header; + for (i = zsl->level-1; i >= 0; i--) { + /* Go forward while *IN* range. */ + while (x->level[i].forward && + zslValueLteMax(x->level[i].forward->score,range)) + x = x->level[i].forward; + } + + /* This is an inner range, so this node cannot be NULL. */ + serverAssert(x != NULL); + + /* Check if score >= min. */ + if (!zslValueGteMin(x->score,range)) return NULL; + return x; +} + +/* Delete all the elements with score between min and max from the skiplist. + * Both min and max can be inclusive or exclusive (see range->minex and + * range->maxex). When inclusive a score >= min && score <= max is deleted. + * Note that this function takes the reference to the hash table view of the + * sorted set, in order to remove the elements from the hash table too. */ +unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) { + zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; + unsigned long removed = 0; + int i; + + x = zsl->header; + for (i = zsl->level-1; i >= 0; i--) { + while (x->level[i].forward && + !zslValueGteMin(x->level[i].forward->score, range)) + x = x->level[i].forward; + update[i] = x; + } + + /* Current node is the last with score < or <= min. */ + x = x->level[0].forward; + + /* Delete nodes while in range. */ + while (x && zslValueLteMax(x->score, range)) { + zskiplistNode *next = x->level[0].forward; + zslDeleteNode(zsl,x,update); + dictDelete(dict,x->ele); + zslFreeNode(x); /* Here is where x->ele is actually released. */ + removed++; + x = next; + } + return removed; +} + +unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *dict) { + zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; + unsigned long removed = 0; + int i; + + + x = zsl->header; + for (i = zsl->level-1; i >= 0; i--) { + while (x->level[i].forward && + !zslLexValueGteMin(x->level[i].forward->ele,range)) + x = x->level[i].forward; + update[i] = x; + } + + /* Current node is the last with score < or <= min. */ + x = x->level[0].forward; + + /* Delete nodes while in range. */ + while (x && zslLexValueLteMax(x->ele,range)) { + zskiplistNode *next = x->level[0].forward; + zslDeleteNode(zsl,x,update); + dictDelete(dict,x->ele); + zslFreeNode(x); /* Here is where x->ele is actually released. */ + removed++; + x = next; + } + return removed; +} + +/* Delete all the elements with rank between start and end from the skiplist. + * Start and end are inclusive. Note that start and end need to be 1-based */ +unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) { + zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; + unsigned long traversed = 0, removed = 0; + int i; + + x = zsl->header; + for (i = zsl->level-1; i >= 0; i--) { + while (x->level[i].forward && (traversed + x->level[i].span) < start) { + traversed += x->level[i].span; + x = x->level[i].forward; + } + update[i] = x; + } + + traversed++; + x = x->level[0].forward; + while (x && traversed <= end) { + zskiplistNode *next = x->level[0].forward; + zslDeleteNode(zsl,x,update); + dictDelete(dict,x->ele); + zslFreeNode(x); + removed++; + traversed++; + x = next; + } + return removed; +} + +/* Find the rank for an element by both score and key. + * Returns 0 when the element cannot be found, rank otherwise. + * Note that the rank is 1-based due to the span of zsl->header to the + * first element. */ +unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) { + zskiplistNode *x; + unsigned long rank = 0; + int i; + + x = zsl->header; + for (i = zsl->level-1; i >= 0; i--) { + while (x->level[i].forward && + (x->level[i].forward->score < score || + (x->level[i].forward->score == score && + sdscmp(x->level[i].forward->ele,ele) <= 0))) { + rank += x->level[i].span; + x = x->level[i].forward; + } + + /* x might be equal to zsl->header, so test if obj is non-NULL */ + if (x->ele && x->score == score && sdscmp(x->ele,ele) == 0) { + return rank; + } + } + return 0; +} + +/* Finds an element by its rank. The rank argument needs to be 1-based. */ +zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) { + zskiplistNode *x; + unsigned long traversed = 0; + int i; + + x = zsl->header; + for (i = zsl->level-1; i >= 0; i--) { + while (x->level[i].forward && (traversed + x->level[i].span) <= rank) + { + traversed += x->level[i].span; + x = x->level[i].forward; + } + if (traversed == rank) { + return x; + } + } + return NULL; +} + +/* Populate the rangespec according to the objects min and max. */ +static int zslParseRange(robj *min, robj *max, zrangespec *spec) { + char *eptr; + spec->minex = spec->maxex = 0; + + /* Parse the min-max interval. If one of the values is prefixed + * by the "(" character, it's considered "open". For instance + * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max + * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */ + if (min->encoding == OBJ_ENCODING_INT) { + spec->min = (long)min->ptr; + } else { + if (((char*)min->ptr)[0] == '(') { + spec->min = strtod((char*)min->ptr+1,&eptr); + if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR; + spec->minex = 1; + } else { + spec->min = strtod((char*)min->ptr,&eptr); + if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR; + } + } + if (max->encoding == OBJ_ENCODING_INT) { + spec->max = (long)max->ptr; + } else { + if (((char*)max->ptr)[0] == '(') { + spec->max = strtod((char*)max->ptr+1,&eptr); + if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR; + spec->maxex = 1; + } else { + spec->max = strtod((char*)max->ptr,&eptr); + if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR; + } + } + + return C_OK; +} + +/* ------------------------ Lexicographic ranges ---------------------------- */ + +/* Parse max or min argument of ZRANGEBYLEX. + * (foo means foo (open interval) + * [foo means foo (closed interval) + * - means the min string possible + * + means the max string possible + * + * If the string is valid the *dest pointer is set to the redis object + * that will be used for the comparison, and ex will be set to 0 or 1 + * respectively if the item is exclusive or inclusive. C_OK will be + * returned. + * + * If the string is not a valid range C_ERR is returned, and the value + * of *dest and *ex is undefined. */ +int zslParseLexRangeItem(robj *item, sds *dest, int *ex) { + char *c = item->ptr; + + switch(c[0]) { + case '+': + if (c[1] != '\0') return C_ERR; + *ex = 1; + *dest = shared.maxstring; + return C_OK; + case '-': + if (c[1] != '\0') return C_ERR; + *ex = 1; + *dest = shared.minstring; + return C_OK; + case '(': + *ex = 1; + *dest = sdsnewlen(c+1,sdslen(c)-1); + return C_OK; + case '[': + *ex = 0; + *dest = sdsnewlen(c+1,sdslen(c)-1); + return C_OK; + default: + return C_ERR; + } +} + +/* Free a lex range structure, must be called only after zslParseLexRange() + * populated the structure with success (C_OK returned). */ +void zslFreeLexRange(zlexrangespec *spec) { + if (spec->min != shared.minstring && + spec->min != shared.maxstring) sdsfree(spec->min); + if (spec->max != shared.minstring && + spec->max != shared.maxstring) sdsfree(spec->max); +} + +/* Populate the lex rangespec according to the objects min and max. + * + * Return C_OK on success. On error C_ERR is returned. + * When OK is returned the structure must be freed with zslFreeLexRange(), + * otherwise no release is needed. */ +int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec) { + /* The range can't be valid if objects are integer encoded. + * Every item must start with ( or [. */ + if (min->encoding == OBJ_ENCODING_INT || + max->encoding == OBJ_ENCODING_INT) return C_ERR; + + spec->min = spec->max = NULL; + if (zslParseLexRangeItem(min, &spec->min, &spec->minex) == C_ERR || + zslParseLexRangeItem(max, &spec->max, &spec->maxex) == C_ERR) { + zslFreeLexRange(spec); + return C_ERR; + } else { + return C_OK; + } +} + +/* This is just a wrapper to sdscmp() that is able to + * handle shared.minstring and shared.maxstring as the equivalent of + * -inf and +inf for strings */ +int sdscmplex(sds a, sds b) { + if (a == b) return 0; + if (a == shared.minstring || b == shared.maxstring) return -1; + if (a == shared.maxstring || b == shared.minstring) return 1; + return sdscmp(a,b); +} + +int zslLexValueGteMin(sds value, zlexrangespec *spec) { + return spec->minex ? + (sdscmplex(value,spec->min) > 0) : + (sdscmplex(value,spec->min) >= 0); +} + +int zslLexValueLteMax(sds value, zlexrangespec *spec) { + return spec->maxex ? + (sdscmplex(value,spec->max) < 0) : + (sdscmplex(value,spec->max) <= 0); +} + +/* Returns if there is a part of the zset is in the lex range. */ +int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) { + zskiplistNode *x; + + /* Test for ranges that will always be empty. */ + int cmp = sdscmplex(range->min,range->max); + if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex))) + return 0; + x = zsl->tail; + if (x == NULL || !zslLexValueGteMin(x->ele,range)) + return 0; + x = zsl->header->level[0].forward; + if (x == NULL || !zslLexValueLteMax(x->ele,range)) + return 0; + return 1; +} + +/* Find the first node that is contained in the specified lex range. + * Returns NULL when no element is contained in the range. */ +zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) { + zskiplistNode *x; + int i; + + /* If everything is out of range, return early. */ + if (!zslIsInLexRange(zsl,range)) return NULL; + + x = zsl->header; + for (i = zsl->level-1; i >= 0; i--) { + /* Go forward while *OUT* of range. */ + while (x->level[i].forward && + !zslLexValueGteMin(x->level[i].forward->ele,range)) + x = x->level[i].forward; + } + + /* This is an inner range, so the next node cannot be NULL. */ + x = x->level[0].forward; + serverAssert(x != NULL); + + /* Check if score <= max. */ + if (!zslLexValueLteMax(x->ele,range)) return NULL; + return x; +} + +/* Find the last node that is contained in the specified range. + * Returns NULL when no element is contained in the range. */ +zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) { + zskiplistNode *x; + int i; + + /* If everything is out of range, return early. */ + if (!zslIsInLexRange(zsl,range)) return NULL; + + x = zsl->header; + for (i = zsl->level-1; i >= 0; i--) { + /* Go forward while *IN* range. */ + while (x->level[i].forward && + zslLexValueLteMax(x->level[i].forward->ele,range)) + x = x->level[i].forward; + } + + /* This is an inner range, so this node cannot be NULL. */ + serverAssert(x != NULL); + + /* Check if score >= min. */ + if (!zslLexValueGteMin(x->ele,range)) return NULL; + return x; +} + +/*----------------------------------------------------------------------------- + * Listpack-backed sorted set API + *----------------------------------------------------------------------------*/ + +double zzlStrtod(unsigned char *vstr, unsigned int vlen) { + char buf[128]; + if (vlen > sizeof(buf) - 1) + vlen = sizeof(buf) - 1; + memcpy(buf,vstr,vlen); + buf[vlen] = '\0'; + return strtod(buf,NULL); + } + +double zzlGetScore(unsigned char *sptr) { + unsigned char *vstr; + unsigned int vlen; + long long vlong; + double score; + + serverAssert(sptr != NULL); + vstr = lpGetValue(sptr,&vlen,&vlong); + + if (vstr) { + score = zzlStrtod(vstr,vlen); + } else { + score = vlong; + } + + return score; +} + +/* Return a listpack element as an SDS string. */ +sds lpGetObject(unsigned char *sptr) { + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + serverAssert(sptr != NULL); + vstr = lpGetValue(sptr,&vlen,&vlong); + + if (vstr) { + return sdsnewlen((char*)vstr,vlen); + } else { + return sdsfromlonglong(vlong); + } +} + +/* Compare element in sorted set with given element. */ +int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) { + unsigned char *vstr; + unsigned int vlen; + long long vlong; + unsigned char vbuf[32]; + int minlen, cmp; + + vstr = lpGetValue(eptr,&vlen,&vlong); + if (vstr == NULL) { + /* Store string representation of long long in buf. */ + vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong); + vstr = vbuf; + } + + minlen = (vlen < clen) ? vlen : clen; + cmp = memcmp(vstr,cstr,minlen); + if (cmp == 0) return vlen-clen; + return cmp; +} + +unsigned int zzlLength(unsigned char *zl) { + return lpLength(zl)/2; +} + +/* Move to next entry based on the values in eptr and sptr. Both are set to + * NULL when there is no next entry. */ +void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) { + unsigned char *_eptr, *_sptr; + serverAssert(*eptr != NULL && *sptr != NULL); + + _eptr = lpNext(zl,*sptr); + if (_eptr != NULL) { + _sptr = lpNext(zl,_eptr); + serverAssert(_sptr != NULL); + } else { + /* No next entry. */ + _sptr = NULL; + } + + *eptr = _eptr; + *sptr = _sptr; +} + +/* Move to the previous entry based on the values in eptr and sptr. Both are + * set to NULL when there is no prev entry. */ +void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) { + unsigned char *_eptr, *_sptr; + serverAssert(*eptr != NULL && *sptr != NULL); + + _sptr = lpPrev(zl,*eptr); + if (_sptr != NULL) { + _eptr = lpPrev(zl,_sptr); + serverAssert(_eptr != NULL); + } else { + /* No previous entry. */ + _eptr = NULL; + } + + *eptr = _eptr; + *sptr = _sptr; +} + +/* Returns if there is a part of the zset is in range. Should only be used + * internally by zzlFirstInRange and zzlLastInRange. */ +int zzlIsInRange(unsigned char *zl, zrangespec *range) { + unsigned char *p; + double score; + + /* Test for ranges that will always be empty. */ + if (range->min > range->max || + (range->min == range->max && (range->minex || range->maxex))) + return 0; + + p = lpSeek(zl,-1); /* Last score. */ + if (p == NULL) return 0; /* Empty sorted set */ + score = zzlGetScore(p); + if (!zslValueGteMin(score,range)) + return 0; + + p = lpSeek(zl,1); /* First score. */ + serverAssert(p != NULL); + score = zzlGetScore(p); + if (!zslValueLteMax(score,range)) + return 0; + + return 1; +} + +/* Find pointer to the first element contained in the specified range. + * Returns NULL when no element is contained in the range. */ +unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range) { + unsigned char *eptr = lpSeek(zl,0), *sptr; + double score; + + /* If everything is out of range, return early. */ + if (!zzlIsInRange(zl,range)) return NULL; + + while (eptr != NULL) { + sptr = lpNext(zl,eptr); + serverAssert(sptr != NULL); + + score = zzlGetScore(sptr); + if (zslValueGteMin(score,range)) { + /* Check if score <= max. */ + if (zslValueLteMax(score,range)) + return eptr; + return NULL; + } + + /* Move to next element. */ + eptr = lpNext(zl,sptr); + } + + return NULL; +} + +/* Find pointer to the last element contained in the specified range. + * Returns NULL when no element is contained in the range. */ +unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range) { + unsigned char *eptr = lpSeek(zl,-2), *sptr; + double score; + + /* If everything is out of range, return early. */ + if (!zzlIsInRange(zl,range)) return NULL; + + while (eptr != NULL) { + sptr = lpNext(zl,eptr); + serverAssert(sptr != NULL); + + score = zzlGetScore(sptr); + if (zslValueLteMax(score,range)) { + /* Check if score >= min. */ + if (zslValueGteMin(score,range)) + return eptr; + return NULL; + } + + /* Move to previous element by moving to the score of previous element. + * When this returns NULL, we know there also is no element. */ + sptr = lpPrev(zl,eptr); + if (sptr != NULL) + serverAssert((eptr = lpPrev(zl,sptr)) != NULL); + else + eptr = NULL; + } + + return NULL; +} + +int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec) { + sds value = lpGetObject(p); + int res = zslLexValueGteMin(value,spec); + sdsfree(value); + return res; +} + +int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec) { + sds value = lpGetObject(p); + int res = zslLexValueLteMax(value,spec); + sdsfree(value); + return res; +} + +/* Returns if there is a part of the zset is in range. Should only be used + * internally by zzlFirstInLexRange and zzlLastInLexRange. */ +int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) { + unsigned char *p; + + /* Test for ranges that will always be empty. */ + int cmp = sdscmplex(range->min,range->max); + if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex))) + return 0; + + p = lpSeek(zl,-2); /* Last element. */ + if (p == NULL) return 0; + if (!zzlLexValueGteMin(p,range)) + return 0; + + p = lpSeek(zl,0); /* First element. */ + serverAssert(p != NULL); + if (!zzlLexValueLteMax(p,range)) + return 0; + + return 1; +} + +/* Find pointer to the first element contained in the specified lex range. + * Returns NULL when no element is contained in the range. */ +unsigned char *zzlFirstInLexRange(unsigned char *zl, zlexrangespec *range) { + unsigned char *eptr = lpSeek(zl,0), *sptr; + + /* If everything is out of range, return early. */ + if (!zzlIsInLexRange(zl,range)) return NULL; + + while (eptr != NULL) { + if (zzlLexValueGteMin(eptr,range)) { + /* Check if score <= max. */ + if (zzlLexValueLteMax(eptr,range)) + return eptr; + return NULL; + } + + /* Move to next element. */ + sptr = lpNext(zl,eptr); /* This element score. Skip it. */ + serverAssert(sptr != NULL); + eptr = lpNext(zl,sptr); /* Next element. */ + } + + return NULL; +} + +/* Find pointer to the last element contained in the specified lex range. + * Returns NULL when no element is contained in the range. */ +unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range) { + unsigned char *eptr = lpSeek(zl,-2), *sptr; + + /* If everything is out of range, return early. */ + if (!zzlIsInLexRange(zl,range)) return NULL; + + while (eptr != NULL) { + if (zzlLexValueLteMax(eptr,range)) { + /* Check if score >= min. */ + if (zzlLexValueGteMin(eptr,range)) + return eptr; + return NULL; + } + + /* Move to previous element by moving to the score of previous element. + * When this returns NULL, we know there also is no element. */ + sptr = lpPrev(zl,eptr); + if (sptr != NULL) + serverAssert((eptr = lpPrev(zl,sptr)) != NULL); + else + eptr = NULL; + } + + return NULL; +} + +unsigned char *zzlFind(unsigned char *lp, sds ele, double *score) { + unsigned char *eptr, *sptr; + + if ((eptr = lpFirst(lp)) == NULL) return NULL; + eptr = lpFind(lp, eptr, (unsigned char*)ele, sdslen(ele), 1); + if (eptr) { + sptr = lpNext(lp,eptr); + serverAssert(sptr != NULL); + + /* Matching element, pull out score. */ + if (score != NULL) *score = zzlGetScore(sptr); + return eptr; + } + + return NULL; +} + +/* Delete (element,score) pair from listpack. Use local copy of eptr because we + * don't want to modify the one given as argument. */ +unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) { + return lpDeleteRangeWithEntry(zl,&eptr,2); +} + +unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) { + unsigned char *sptr; + char scorebuf[MAX_D2STRING_CHARS]; + int scorelen = 0; + long long lscore; + int score_is_long = double2ll(score, &lscore); + if (!score_is_long) + scorelen = d2string(scorebuf,sizeof(scorebuf),score); + if (eptr == NULL) { + zl = lpAppend(zl,(unsigned char*)ele,sdslen(ele)); + if (score_is_long) + zl = lpAppendInteger(zl,lscore); + else + zl = lpAppend(zl,(unsigned char*)scorebuf,scorelen); + } else { + /* Insert member before the element 'eptr'. */ + zl = lpInsertString(zl,(unsigned char*)ele,sdslen(ele),eptr,LP_BEFORE,&sptr); + + /* Insert score after the member. */ + if (score_is_long) + zl = lpInsertInteger(zl,lscore,sptr,LP_AFTER,NULL); + else + zl = lpInsertString(zl,(unsigned char*)scorebuf,scorelen,sptr,LP_AFTER,NULL); + } + return zl; +} + +/* Insert (element,score) pair in listpack. This function assumes the element is + * not yet present in the list. */ +unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) { + unsigned char *eptr = lpSeek(zl,0), *sptr; + double s; + + while (eptr != NULL) { + sptr = lpNext(zl,eptr); + serverAssert(sptr != NULL); + s = zzlGetScore(sptr); + + if (s > score) { + /* First element with score larger than score for element to be + * inserted. This means we should take its spot in the list to + * maintain ordering. */ + zl = zzlInsertAt(zl,eptr,ele,score); + break; + } else if (s == score) { + /* Ensure lexicographical ordering for elements. */ + if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) { + zl = zzlInsertAt(zl,eptr,ele,score); + break; + } + } + + /* Move to next element. */ + eptr = lpNext(zl,sptr); + } + + /* Push on tail of list when it was not yet inserted. */ + if (eptr == NULL) + zl = zzlInsertAt(zl,NULL,ele,score); + return zl; +} + +unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec *range, unsigned long *deleted) { + unsigned char *eptr, *sptr; + double score; + unsigned long num = 0; + + if (deleted != NULL) *deleted = 0; + + eptr = zzlFirstInRange(zl,range); + if (eptr == NULL) return zl; + + /* When the tail of the listpack is deleted, eptr will be NULL. */ + while (eptr && (sptr = lpNext(zl,eptr)) != NULL) { + score = zzlGetScore(sptr); + if (zslValueLteMax(score,range)) { + /* Delete both the element and the score. */ + zl = lpDeleteRangeWithEntry(zl,&eptr,2); + num++; + } else { + /* No longer in range. */ + break; + } + } + + if (deleted != NULL) *deleted = num; + return zl; +} + +unsigned char *zzlDeleteRangeByLex(unsigned char *zl, zlexrangespec *range, unsigned long *deleted) { + unsigned char *eptr, *sptr; + unsigned long num = 0; + + if (deleted != NULL) *deleted = 0; + + eptr = zzlFirstInLexRange(zl,range); + if (eptr == NULL) return zl; + + /* When the tail of the listpack is deleted, eptr will be NULL. */ + while (eptr && (sptr = lpNext(zl,eptr)) != NULL) { + if (zzlLexValueLteMax(eptr,range)) { + /* Delete both the element and the score. */ + zl = lpDeleteRangeWithEntry(zl,&eptr,2); + num++; + } else { + /* No longer in range. */ + break; + } + } + + if (deleted != NULL) *deleted = num; + return zl; +} + +/* Delete all the elements with rank between start and end from the skiplist. + * Start and end are inclusive. Note that start and end need to be 1-based */ +unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) { + unsigned int num = (end-start)+1; + if (deleted) *deleted = num; + zl = lpDeleteRange(zl,2*(start-1),2*num); + return zl; +} + +/*----------------------------------------------------------------------------- + * Common sorted set API + *----------------------------------------------------------------------------*/ + +unsigned long zsetLength(const robj *zobj) { + unsigned long length = 0; + if (zobj->encoding == OBJ_ENCODING_LISTPACK) { + length = zzlLength(zobj->ptr); + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + length = ((const zset*)zobj->ptr)->zsl->length; + } else { + serverPanic("Unknown sorted set encoding"); + } + return length; +} + +/* Factory method to return a zset. + * + * The size hint indicates approximately how many items will be added, + * and the value len hint indicates the approximate individual size of the added elements, + * they are used to determine the initial representation. + * + * If the hints are not known, and underestimation or 0 is suitable. */ +robj *zsetTypeCreate(size_t size_hint, size_t val_len_hint) { + if (size_hint <= server.zset_max_listpack_entries && + val_len_hint <= server.zset_max_listpack_value) + { + return createZsetListpackObject(); + } + + robj *zobj = createZsetObject(); + zset *zs = zobj->ptr; + dictExpand(zs->dict, size_hint); + return zobj; +} + +/* Check if the existing zset should be converted to another encoding based off the + * the size hint. */ +void zsetTypeMaybeConvert(robj *zobj, size_t size_hint) { + if (zobj->encoding == OBJ_ENCODING_LISTPACK && + size_hint > server.zset_max_listpack_entries) + { + zsetConvertAndExpand(zobj, OBJ_ENCODING_SKIPLIST, size_hint); + } +} + +/* Convert the zset to specified encoding. The zset dict (when converting + * to a skiplist) is presized to hold the number of elements in the original + * zset. */ +void zsetConvert(robj *zobj, int encoding) { + zsetConvertAndExpand(zobj, encoding, zsetLength(zobj)); +} + +/* Converts a zset to the specified encoding, pre-sizing it for 'cap' elements. */ +void zsetConvertAndExpand(robj *zobj, int encoding, unsigned long cap) { + zset *zs; + zskiplistNode *node, *next; + sds ele; + double score; + + if (zobj->encoding == encoding) return; + if (zobj->encoding == OBJ_ENCODING_LISTPACK) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + if (encoding != OBJ_ENCODING_SKIPLIST) + serverPanic("Unknown target encoding"); + + zs = zmalloc(sizeof(*zs)); + zs->dict = dictCreate(&zsetDictType); + zs->zsl = zslCreate(); + + /* Presize the dict to avoid rehashing */ + dictExpand(zs->dict, cap); + + eptr = lpSeek(zl,0); + if (eptr != NULL) { + sptr = lpNext(zl,eptr); + serverAssertWithInfo(NULL,zobj,sptr != NULL); + } + + while (eptr != NULL) { + score = zzlGetScore(sptr); + vstr = lpGetValue(eptr,&vlen,&vlong); + if (vstr == NULL) + ele = sdsfromlonglong(vlong); + else + ele = sdsnewlen((char*)vstr,vlen); + + node = zslInsert(zs->zsl,score,ele); + serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK); + zzlNext(zl,&eptr,&sptr); + } + + zfree(zobj->ptr); + zobj->ptr = zs; + zobj->encoding = OBJ_ENCODING_SKIPLIST; + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + unsigned char *zl = lpNew(0); + + if (encoding != OBJ_ENCODING_LISTPACK) + serverPanic("Unknown target encoding"); + + /* Approach similar to zslFree(), since we want to free the skiplist at + * the same time as creating the listpack. */ + zs = zobj->ptr; + dictRelease(zs->dict); + node = zs->zsl->header->level[0].forward; + zfree(zs->zsl->header); + zfree(zs->zsl); + + while (node) { + zl = zzlInsertAt(zl,NULL,node->ele,node->score); + next = node->level[0].forward; + zslFreeNode(node); + node = next; + } + + zfree(zs); + zobj->ptr = zl; + zobj->encoding = OBJ_ENCODING_LISTPACK; + } else { + serverPanic("Unknown sorted set encoding"); + } +} + +/* Convert the sorted set object into a listpack if it is not already a listpack + * and if the number of elements and the maximum element size and total elements size + * are within the expected ranges. */ +void zsetConvertToListpackIfNeeded(robj *zobj, size_t maxelelen, size_t totelelen) { + if (zobj->encoding == OBJ_ENCODING_LISTPACK) return; + zset *zset = zobj->ptr; + + if (zset->zsl->length <= server.zset_max_listpack_entries && + maxelelen <= server.zset_max_listpack_value && + lpSafeToAdd(NULL, totelelen)) + { + zsetConvert(zobj,OBJ_ENCODING_LISTPACK); + } +} + +/* Return (by reference) the score of the specified member of the sorted set + * storing it into *score. If the element does not exist C_ERR is returned + * otherwise C_OK is returned and *score is correctly populated. + * If 'zobj' or 'member' is NULL, C_ERR is returned. */ +int zsetScore(robj *zobj, sds member, double *score) { + if (!zobj || !member) return C_ERR; + + if (zobj->encoding == OBJ_ENCODING_LISTPACK) { + if (zzlFind(zobj->ptr, member, score) == NULL) return C_ERR; + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + dictEntry *de = dictFind(zs->dict, member); + if (de == NULL) return C_ERR; + *score = *(double*)dictGetVal(de); + } else { + serverPanic("Unknown sorted set encoding"); + } + return C_OK; +} + +/* Add a new element or update the score of an existing element in a sorted + * set, regardless of its encoding. + * + * The set of flags change the command behavior. + * + * The input flags are the following: + * + * ZADD_INCR: Increment the current element score by 'score' instead of updating + * the current element score. If the element does not exist, we + * assume 0 as previous score. + * ZADD_NX: Perform the operation only if the element does not exist. + * ZADD_XX: Perform the operation only if the element already exist. + * ZADD_GT: Perform the operation on existing elements only if the new score is + * greater than the current score. + * ZADD_LT: Perform the operation on existing elements only if the new score is + * less than the current score. + * + * When ZADD_INCR is used, the new score of the element is stored in + * '*newscore' if 'newscore' is not NULL. + * + * The returned flags are the following: + * + * ZADD_NAN: The resulting score is not a number. + * ZADD_ADDED: The element was added (not present before the call). + * ZADD_UPDATED: The element score was updated. + * ZADD_NOP: No operation was performed because of NX or XX. + * + * Return value: + * + * The function returns 1 on success, and sets the appropriate flags + * ADDED or UPDATED to signal what happened during the operation (note that + * none could be set if we re-added an element using the same score it used + * to have, or in the case a zero increment is used). + * + * The function returns 0 on error, currently only when the increment + * produces a NAN condition, or when the 'score' value is NAN since the + * start. + * + * The command as a side effect of adding a new element may convert the sorted + * set internal encoding from listpack to hashtable+skiplist. + * + * Memory management of 'ele': + * + * The function does not take ownership of the 'ele' SDS string, but copies + * it if needed. */ +int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) { + /* Turn options into simple to check vars. */ + int incr = (in_flags & ZADD_IN_INCR) != 0; + int nx = (in_flags & ZADD_IN_NX) != 0; + int xx = (in_flags & ZADD_IN_XX) != 0; + int gt = (in_flags & ZADD_IN_GT) != 0; + int lt = (in_flags & ZADD_IN_LT) != 0; + *out_flags = 0; /* We'll return our response flags. */ + double curscore; + + /* NaN as input is an error regardless of all the other parameters. */ + if (isnan(score)) { + *out_flags = ZADD_OUT_NAN; + return 0; + } + + /* Update the sorted set according to its encoding. */ + if (zobj->encoding == OBJ_ENCODING_LISTPACK) { + unsigned char *eptr; + + if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) { + /* NX? Return, same element already exists. */ + if (nx) { + *out_flags |= ZADD_OUT_NOP; + return 1; + } + + /* Prepare the score for the increment if needed. */ + if (incr) { + score += curscore; + if (isnan(score)) { + *out_flags |= ZADD_OUT_NAN; + return 0; + } + } + + /* GT/LT? Only update if score is greater/less than current. */ + if ((lt && score >= curscore) || (gt && score <= curscore)) { + *out_flags |= ZADD_OUT_NOP; + return 1; + } + + if (newscore) *newscore = score; + + /* Remove and re-insert when score changed. */ + if (score != curscore) { + zobj->ptr = zzlDelete(zobj->ptr,eptr); + zobj->ptr = zzlInsert(zobj->ptr,ele,score); + *out_flags |= ZADD_OUT_UPDATED; + } + return 1; + } else if (!xx) { + /* check if the element is too large or the list + * becomes too long *before* executing zzlInsert. */ + if (zzlLength(zobj->ptr)+1 > server.zset_max_listpack_entries || + sdslen(ele) > server.zset_max_listpack_value || + !lpSafeToAdd(zobj->ptr, sdslen(ele))) + { + zsetConvertAndExpand(zobj, OBJ_ENCODING_SKIPLIST, zsetLength(zobj) + 1); + } else { + zobj->ptr = zzlInsert(zobj->ptr,ele,score); + if (newscore) *newscore = score; + *out_flags |= ZADD_OUT_ADDED; + return 1; + } + } else { + *out_flags |= ZADD_OUT_NOP; + return 1; + } + } + + /* Note that the above block handling listpack would have either returned or + * converted the key to skiplist. */ + if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplistNode *znode; + dictEntry *de; + + de = dictFind(zs->dict,ele); + if (de != NULL) { + /* NX? Return, same element already exists. */ + if (nx) { + *out_flags |= ZADD_OUT_NOP; + return 1; + } + + curscore = *(double*)dictGetVal(de); + + /* Prepare the score for the increment if needed. */ + if (incr) { + score += curscore; + if (isnan(score)) { + *out_flags |= ZADD_OUT_NAN; + return 0; + } + } + + /* GT/LT? Only update if score is greater/less than current. */ + if ((lt && score >= curscore) || (gt && score <= curscore)) { + *out_flags |= ZADD_OUT_NOP; + return 1; + } + + if (newscore) *newscore = score; + + /* Remove and re-insert when score changes. */ + if (score != curscore) { + znode = zslUpdateScore(zs->zsl,curscore,ele,score); + /* Note that we did not removed the original element from + * the hash table representing the sorted set, so we just + * update the score. */ + dictSetVal(zs->dict, de, &znode->score); /* Update score ptr. */ + *out_flags |= ZADD_OUT_UPDATED; + } + return 1; + } else if (!xx) { + ele = sdsdup(ele); + znode = zslInsert(zs->zsl,score,ele); + serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK); + *out_flags |= ZADD_OUT_ADDED; + if (newscore) *newscore = score; + return 1; + } else { + *out_flags |= ZADD_OUT_NOP; + return 1; + } + } else { + serverPanic("Unknown sorted set encoding"); + } + return 0; /* Never reached. */ +} + +/* Deletes the element 'ele' from the sorted set encoded as a skiplist+dict, + * returning 1 if the element existed and was deleted, 0 otherwise (the + * element was not there). It does not resize the dict after deleting the + * element. */ +static int zsetRemoveFromSkiplist(zset *zs, sds ele) { + dictEntry *de; + double score; + + de = dictUnlink(zs->dict,ele); + if (de != NULL) { + /* Get the score in order to delete from the skiplist later. */ + score = *(double*)dictGetVal(de); + + /* Delete from the hash table and later from the skiplist. + * Note that the order is important: deleting from the skiplist + * actually releases the SDS string representing the element, + * which is shared between the skiplist and the hash table, so + * we need to delete from the skiplist as the final step. */ + dictFreeUnlinkedEntry(zs->dict,de); + + /* Delete from skiplist. */ + int retval = zslDelete(zs->zsl,score,ele,NULL); + serverAssert(retval); + + return 1; + } + + return 0; +} + +/* Delete the element 'ele' from the sorted set, returning 1 if the element + * existed and was deleted, 0 otherwise (the element was not there). */ +int zsetDel(robj *zobj, sds ele) { + if (zobj->encoding == OBJ_ENCODING_LISTPACK) { + unsigned char *eptr; + + if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) { + zobj->ptr = zzlDelete(zobj->ptr,eptr); + return 1; + } + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + if (zsetRemoveFromSkiplist(zs, ele)) { + if (htNeedsResize(zs->dict)) dictResize(zs->dict); + return 1; + } + } else { + serverPanic("Unknown sorted set encoding"); + } + return 0; /* No such element found. */ +} + +/* Given a sorted set object returns the 0-based rank of the object or + * -1 if the object does not exist. + * + * For rank we mean the position of the element in the sorted collection + * of elements. So the first element has rank 0, the second rank 1, and so + * forth up to length-1 elements. + * + * If 'reverse' is false, the rank is returned considering as first element + * the one with the lowest score. Otherwise if 'reverse' is non-zero + * the rank is computed considering as element with rank 0 the one with + * the highest score. */ +long zsetRank(robj *zobj, sds ele, int reverse, double *output_score) { + unsigned long llen; + unsigned long rank; + + llen = zsetLength(zobj); + + if (zobj->encoding == OBJ_ENCODING_LISTPACK) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + + eptr = lpSeek(zl,0); + serverAssert(eptr != NULL); + sptr = lpNext(zl,eptr); + serverAssert(sptr != NULL); + + rank = 1; + while(eptr != NULL) { + if (lpCompare(eptr,(unsigned char*)ele,sdslen(ele))) + break; + rank++; + zzlNext(zl,&eptr,&sptr); + } + + if (eptr != NULL) { + if (output_score) + *output_score = zzlGetScore(sptr); + if (reverse) + return llen-rank; + else + return rank-1; + } else { + return -1; + } + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + dictEntry *de; + double score; + + de = dictFind(zs->dict,ele); + if (de != NULL) { + score = *(double*)dictGetVal(de); + rank = zslGetRank(zsl,score,ele); + /* Existing elements always have a rank. */ + serverAssert(rank != 0); + if (output_score) + *output_score = score; + if (reverse) + return llen-rank; + else + return rank-1; + } else { + return -1; + } + } else { + serverPanic("Unknown sorted set encoding"); + } +} + +/* This is a helper function for the COPY command. + * Duplicate a sorted set object, with the guarantee that the returned object + * has the same encoding as the original one. + * + * The resulting object always has refcount set to 1 */ +robj *zsetDup(robj *o) { + robj *zobj; + zset *zs; + zset *new_zs; + + serverAssert(o->type == OBJ_ZSET); + + /* Create a new sorted set object that have the same encoding as the original object's encoding */ + if (o->encoding == OBJ_ENCODING_LISTPACK) { + unsigned char *zl = o->ptr; + size_t sz = lpBytes(zl); + unsigned char *new_zl = zmalloc(sz); + memcpy(new_zl, zl, sz); + zobj = createObject(OBJ_ZSET, new_zl); + zobj->encoding = OBJ_ENCODING_LISTPACK; + } else if (o->encoding == OBJ_ENCODING_SKIPLIST) { + zobj = createZsetObject(); + zs = o->ptr; + new_zs = zobj->ptr; + dictExpand(new_zs->dict,dictSize(zs->dict)); + zskiplist *zsl = zs->zsl; + zskiplistNode *ln; + sds ele; + long llen = zsetLength(o); + + /* We copy the skiplist elements from the greatest to the + * smallest (that's trivial since the elements are already ordered in + * the skiplist): this improves the load process, since the next loaded + * element will always be the smaller, so adding to the skiplist + * will always immediately stop at the head, making the insertion + * O(1) instead of O(log(N)). */ + ln = zsl->tail; + while (llen--) { + ele = ln->ele; + sds new_ele = sdsdup(ele); + zskiplistNode *znode = zslInsert(new_zs->zsl,ln->score,new_ele); + dictAdd(new_zs->dict,new_ele,&znode->score); + ln = ln->backward; + } + } else { + serverPanic("Unknown sorted set encoding"); + } + return zobj; +} + +/* Create a new sds string from the listpack entry. */ +sds zsetSdsFromListpackEntry(listpackEntry *e) { + return e->sval ? sdsnewlen(e->sval, e->slen) : sdsfromlonglong(e->lval); +} + +/* Reply with bulk string from the listpack entry. */ +void zsetReplyFromListpackEntry(client *c, listpackEntry *e) { + if (e->sval) + addReplyBulkCBuffer(c, e->sval, e->slen); + else + addReplyBulkLongLong(c, e->lval); +} + + +/* Return random element from a non empty zset. + * 'key' and 'val' will be set to hold the element. + * The memory in `key` is not to be freed or modified by the caller. + * 'score' can be NULL in which case it's not extracted. */ +void zsetTypeRandomElement(robj *zsetobj, unsigned long zsetsize, listpackEntry *key, double *score) { + if (zsetobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zsetobj->ptr; + dictEntry *de = dictGetFairRandomKey(zs->dict); + sds s = dictGetKey(de); + key->sval = (unsigned char*)s; + key->slen = sdslen(s); + if (score) + *score = *(double*)dictGetVal(de); + } else if (zsetobj->encoding == OBJ_ENCODING_LISTPACK) { + listpackEntry val; + lpRandomPair(zsetobj->ptr, zsetsize, key, &val); + if (score) { + if (val.sval) { + *score = zzlStrtod(val.sval,val.slen); + } else { + *score = (double)val.lval; + } + } + } else { + serverPanic("Unknown zset encoding"); + } +} + +/*----------------------------------------------------------------------------- + * Sorted set commands + *----------------------------------------------------------------------------*/ + +/* This generic command implements both ZADD and ZINCRBY. */ +void zaddGenericCommand(client *c, int flags) { + static char *nanerr = "resulting score is not a number (NaN)"; + robj *key = c->argv[1]; + robj *zobj; + sds ele; + double score = 0, *scores = NULL; + int j, elements, ch = 0; + int scoreidx = 0; + /* The following vars are used in order to track what the command actually + * did during the execution, to reply to the client and to trigger the + * notification of keyspace change. */ + int added = 0; /* Number of new elements added. */ + int updated = 0; /* Number of elements with updated score. */ + int processed = 0; /* Number of elements processed, may remain zero with + options like XX. */ + + /* Parse options. At the end 'scoreidx' is set to the argument position + * of the score of the first score-element pair. */ + scoreidx = 2; + while(scoreidx < c->argc) { + char *opt = c->argv[scoreidx]->ptr; + if (!strcasecmp(opt,"nx")) flags |= ZADD_IN_NX; + else if (!strcasecmp(opt,"xx")) flags |= ZADD_IN_XX; + else if (!strcasecmp(opt,"ch")) ch = 1; /* Return num of elements added or updated. */ + else if (!strcasecmp(opt,"incr")) flags |= ZADD_IN_INCR; + else if (!strcasecmp(opt,"gt")) flags |= ZADD_IN_GT; + else if (!strcasecmp(opt,"lt")) flags |= ZADD_IN_LT; + else break; + scoreidx++; + } + + /* Turn options into simple to check vars. */ + int incr = (flags & ZADD_IN_INCR) != 0; + int nx = (flags & ZADD_IN_NX) != 0; + int xx = (flags & ZADD_IN_XX) != 0; + int gt = (flags & ZADD_IN_GT) != 0; + int lt = (flags & ZADD_IN_LT) != 0; + + /* After the options, we expect to have an even number of args, since + * we expect any number of score-element pairs. */ + elements = c->argc-scoreidx; + if (elements % 2 || !elements) { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + elements /= 2; /* Now this holds the number of score-element pairs. */ + + /* Check for incompatible options. */ + if (nx && xx) { + addReplyError(c, + "XX and NX options at the same time are not compatible"); + return; + } + + if ((gt && nx) || (lt && nx) || (gt && lt)) { + addReplyError(c, + "GT, LT, and/or NX options at the same time are not compatible"); + return; + } + /* Note that XX is compatible with either GT or LT */ + + if (incr && elements > 1) { + addReplyError(c, + "INCR option supports a single increment-element pair"); + return; + } + + /* Start parsing all the scores, we need to emit any syntax error + * before executing additions to the sorted set, as the command should + * either execute fully or nothing at all. */ + scores = zmalloc(sizeof(double)*elements); + for (j = 0; j < elements; j++) { + if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL) + != C_OK) goto cleanup; + } + + /* Lookup the key and create the sorted set if does not exist. */ + zobj = lookupKeyWrite(c->db,key); + if (checkType(c,zobj,OBJ_ZSET)) goto cleanup; + if (zobj == NULL) { + if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */ + zobj = zsetTypeCreate(elements, sdslen(c->argv[scoreidx+1]->ptr)); + dbAdd(c->db,key,zobj); + } else { + zsetTypeMaybeConvert(zobj, elements); + } + + for (j = 0; j < elements; j++) { + double newscore; + score = scores[j]; + int retflags = 0; + + ele = c->argv[scoreidx+1+j*2]->ptr; + int retval = zsetAdd(zobj, score, ele, flags, &retflags, &newscore); + if (retval == 0) { + addReplyError(c,nanerr); + goto cleanup; + } + if (retflags & ZADD_OUT_ADDED) added++; + if (retflags & ZADD_OUT_UPDATED) updated++; + if (!(retflags & ZADD_OUT_NOP)) processed++; + score = newscore; + } + server.dirty += (added+updated); + +reply_to_client: + if (incr) { /* ZINCRBY or INCR option. */ + if (processed) + addReplyDouble(c,score); + else + addReplyNull(c); + } else { /* ZADD. */ + addReplyLongLong(c,ch ? added+updated : added); + } + +cleanup: + zfree(scores); + if (added || updated) { + signalModifiedKey(c,c->db,key); + notifyKeyspaceEvent(NOTIFY_ZSET, + incr ? "zincr" : "zadd", key, c->db->id); + } +} + +void zaddCommand(client *c) { + zaddGenericCommand(c,ZADD_IN_NONE); +} + +void zincrbyCommand(client *c) { + zaddGenericCommand(c,ZADD_IN_INCR); +} + +void zremCommand(client *c) { + robj *key = c->argv[1]; + robj *zobj; + int deleted = 0, keyremoved = 0, j; + + if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL || + checkType(c,zobj,OBJ_ZSET)) return; + + for (j = 2; j < c->argc; j++) { + if (zsetDel(zobj,c->argv[j]->ptr)) deleted++; + if (zsetLength(zobj) == 0) { + dbDelete(c->db,key); + keyremoved = 1; + break; + } + } + + if (deleted) { + notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id); + if (keyremoved) + notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); + signalModifiedKey(c,c->db,key); + server.dirty += deleted; + } + addReplyLongLong(c,deleted); +} + +typedef enum { + ZRANGE_AUTO = 0, + ZRANGE_RANK, + ZRANGE_SCORE, + ZRANGE_LEX, +} zrange_type; + +/* Implements ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREMRANGEBYLEX commands. */ +void zremrangeGenericCommand(client *c, zrange_type rangetype) { + robj *key = c->argv[1]; + robj *zobj; + int keyremoved = 0; + unsigned long deleted = 0; + zrangespec range; + zlexrangespec lexrange; + long start, end, llen; + char *notify_type = NULL; + + /* Step 1: Parse the range. */ + if (rangetype == ZRANGE_RANK) { + notify_type = "zremrangebyrank"; + if ((getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK) || + (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK)) + return; + } else if (rangetype == ZRANGE_SCORE) { + notify_type = "zremrangebyscore"; + if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) { + addReplyError(c,"min or max is not a float"); + return; + } + } else if (rangetype == ZRANGE_LEX) { + notify_type = "zremrangebylex"; + if (zslParseLexRange(c->argv[2],c->argv[3],&lexrange) != C_OK) { + addReplyError(c,"min or max not valid string range item"); + return; + } + } else { + serverPanic("unknown rangetype %d", (int)rangetype); + } + + /* Step 2: Lookup & range sanity checks if needed. */ + if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL || + checkType(c,zobj,OBJ_ZSET)) goto cleanup; + + if (rangetype == ZRANGE_RANK) { + /* Sanitize indexes. */ + llen = zsetLength(zobj); + if (start < 0) start = llen+start; + if (end < 0) end = llen+end; + if (start < 0) start = 0; + + /* Invariant: start >= 0, so this test will be true when end < 0. + * The range is empty when start > end or start >= length. */ + if (start > end || start >= llen) { + addReply(c,shared.czero); + goto cleanup; + } + if (end >= llen) end = llen-1; + } + + /* Step 3: Perform the range deletion operation. */ + if (zobj->encoding == OBJ_ENCODING_LISTPACK) { + switch(rangetype) { + case ZRANGE_AUTO: + case ZRANGE_RANK: + zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted); + break; + case ZRANGE_SCORE: + zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,&range,&deleted); + break; + case ZRANGE_LEX: + zobj->ptr = zzlDeleteRangeByLex(zobj->ptr,&lexrange,&deleted); + break; + } + if (zzlLength(zobj->ptr) == 0) { + dbDelete(c->db,key); + keyremoved = 1; + } + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + switch(rangetype) { + case ZRANGE_AUTO: + case ZRANGE_RANK: + deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict); + break; + case ZRANGE_SCORE: + deleted = zslDeleteRangeByScore(zs->zsl,&range,zs->dict); + break; + case ZRANGE_LEX: + deleted = zslDeleteRangeByLex(zs->zsl,&lexrange,zs->dict); + break; + } + if (htNeedsResize(zs->dict)) dictResize(zs->dict); + if (dictSize(zs->dict) == 0) { + dbDelete(c->db,key); + keyremoved = 1; + } + } else { + serverPanic("Unknown sorted set encoding"); + } + + /* Step 4: Notifications and reply. */ + if (deleted) { + signalModifiedKey(c,c->db,key); + notifyKeyspaceEvent(NOTIFY_ZSET,notify_type,key,c->db->id); + if (keyremoved) + notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); + } + server.dirty += deleted; + addReplyLongLong(c,deleted); + +cleanup: + if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange); +} + +void zremrangebyrankCommand(client *c) { + zremrangeGenericCommand(c,ZRANGE_RANK); +} + +void zremrangebyscoreCommand(client *c) { + zremrangeGenericCommand(c,ZRANGE_SCORE); +} + +void zremrangebylexCommand(client *c) { + zremrangeGenericCommand(c,ZRANGE_LEX); +} + +typedef struct { + robj *subject; + int type; /* Set, sorted set */ + int encoding; + double weight; + + union { + /* Set iterators. */ + union _iterset { + struct { + intset *is; + int ii; + } is; + struct { + dict *dict; + dictIterator *di; + dictEntry *de; + } ht; + struct { + unsigned char *lp; + unsigned char *p; + } lp; + } set; + + /* Sorted set iterators. */ + union _iterzset { + struct { + unsigned char *zl; + unsigned char *eptr, *sptr; + } zl; + struct { + zset *zs; + zskiplistNode *node; + } sl; + } zset; + } iter; +} zsetopsrc; + + +/* Use dirty flags for pointers that need to be cleaned up in the next + * iteration over the zsetopval. The dirty flag for the long long value is + * special, since long long values don't need cleanup. Instead, it means that + * we already checked that "ell" holds a long long, or tried to convert another + * representation into a long long value. When this was successful, + * OPVAL_VALID_LL is set as well. */ +#define OPVAL_DIRTY_SDS 1 +#define OPVAL_DIRTY_LL 2 +#define OPVAL_VALID_LL 4 + +/* Store value retrieved from the iterator. */ +typedef struct { + int flags; + unsigned char _buf[32]; /* Private buffer. */ + sds ele; + unsigned char *estr; + unsigned int elen; + long long ell; + double score; +} zsetopval; + +typedef union _iterset iterset; +typedef union _iterzset iterzset; + +void zuiInitIterator(zsetopsrc *op) { + if (op->subject == NULL) + return; + + if (op->type == OBJ_SET) { + iterset *it = &op->iter.set; + if (op->encoding == OBJ_ENCODING_INTSET) { + it->is.is = op->subject->ptr; + it->is.ii = 0; + } else if (op->encoding == OBJ_ENCODING_HT) { + it->ht.dict = op->subject->ptr; + it->ht.di = dictGetIterator(op->subject->ptr); + it->ht.de = dictNext(it->ht.di); + } else if (op->encoding == OBJ_ENCODING_LISTPACK) { + it->lp.lp = op->subject->ptr; + it->lp.p = lpFirst(it->lp.lp); + } else { + serverPanic("Unknown set encoding"); + } + } else if (op->type == OBJ_ZSET) { + /* Sorted sets are traversed in reverse order to optimize for + * the insertion of the elements in a new list as in + * ZDIFF/ZINTER/ZUNION */ + iterzset *it = &op->iter.zset; + if (op->encoding == OBJ_ENCODING_LISTPACK) { + it->zl.zl = op->subject->ptr; + it->zl.eptr = lpSeek(it->zl.zl,-2); + if (it->zl.eptr != NULL) { + it->zl.sptr = lpNext(it->zl.zl,it->zl.eptr); + serverAssert(it->zl.sptr != NULL); + } + } else if (op->encoding == OBJ_ENCODING_SKIPLIST) { + it->sl.zs = op->subject->ptr; + it->sl.node = it->sl.zs->zsl->tail; + } else { + serverPanic("Unknown sorted set encoding"); + } + } else { + serverPanic("Unsupported type"); + } +} + +void zuiClearIterator(zsetopsrc *op) { + if (op->subject == NULL) + return; + + if (op->type == OBJ_SET) { + iterset *it = &op->iter.set; + if (op->encoding == OBJ_ENCODING_INTSET) { + UNUSED(it); /* skip */ + } else if (op->encoding == OBJ_ENCODING_HT) { + dictReleaseIterator(it->ht.di); + } else if (op->encoding == OBJ_ENCODING_LISTPACK) { + UNUSED(it); + } else { + serverPanic("Unknown set encoding"); + } + } else if (op->type == OBJ_ZSET) { + iterzset *it = &op->iter.zset; + if (op->encoding == OBJ_ENCODING_LISTPACK) { + UNUSED(it); /* skip */ + } else if (op->encoding == OBJ_ENCODING_SKIPLIST) { + UNUSED(it); /* skip */ + } else { + serverPanic("Unknown sorted set encoding"); + } + } else { + serverPanic("Unsupported type"); + } +} + +void zuiDiscardDirtyValue(zsetopval *val) { + if (val->flags & OPVAL_DIRTY_SDS) { + sdsfree(val->ele); + val->ele = NULL; + val->flags &= ~OPVAL_DIRTY_SDS; + } +} + +unsigned long zuiLength(zsetopsrc *op) { + if (op->subject == NULL) + return 0; + + if (op->type == OBJ_SET) { + return setTypeSize(op->subject); + } else if (op->type == OBJ_ZSET) { + if (op->encoding == OBJ_ENCODING_LISTPACK) { + return zzlLength(op->subject->ptr); + } else if (op->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = op->subject->ptr; + return zs->zsl->length; + } else { + serverPanic("Unknown sorted set encoding"); + } + } else { + serverPanic("Unsupported type"); + } +} + +/* Check if the current value is valid. If so, store it in the passed structure + * and move to the next element. If not valid, this means we have reached the + * end of the structure and can abort. */ +int zuiNext(zsetopsrc *op, zsetopval *val) { + if (op->subject == NULL) + return 0; + + zuiDiscardDirtyValue(val); + + memset(val,0,sizeof(zsetopval)); + + if (op->type == OBJ_SET) { + iterset *it = &op->iter.set; + if (op->encoding == OBJ_ENCODING_INTSET) { + int64_t ell; + + if (!intsetGet(it->is.is,it->is.ii,&ell)) + return 0; + val->ell = ell; + val->score = 1.0; + + /* Move to next element. */ + it->is.ii++; + } else if (op->encoding == OBJ_ENCODING_HT) { + if (it->ht.de == NULL) + return 0; + val->ele = dictGetKey(it->ht.de); + val->score = 1.0; + + /* Move to next element. */ + it->ht.de = dictNext(it->ht.di); + } else if (op->encoding == OBJ_ENCODING_LISTPACK) { + if (it->lp.p == NULL) + return 0; + val->estr = lpGetValue(it->lp.p, &val->elen, &val->ell); + val->score = 1.0; + + /* Move to next element. */ + it->lp.p = lpNext(it->lp.lp, it->lp.p); + } else { + serverPanic("Unknown set encoding"); + } + } else if (op->type == OBJ_ZSET) { + iterzset *it = &op->iter.zset; + if (op->encoding == OBJ_ENCODING_LISTPACK) { + /* No need to check both, but better be explicit. */ + if (it->zl.eptr == NULL || it->zl.sptr == NULL) + return 0; + val->estr = lpGetValue(it->zl.eptr,&val->elen,&val->ell); + val->score = zzlGetScore(it->zl.sptr); + + /* Move to next element (going backwards, see zuiInitIterator). */ + zzlPrev(it->zl.zl,&it->zl.eptr,&it->zl.sptr); + } else if (op->encoding == OBJ_ENCODING_SKIPLIST) { + if (it->sl.node == NULL) + return 0; + val->ele = it->sl.node->ele; + val->score = it->sl.node->score; + + /* Move to next element. (going backwards, see zuiInitIterator) */ + it->sl.node = it->sl.node->backward; + } else { + serverPanic("Unknown sorted set encoding"); + } + } else { + serverPanic("Unsupported type"); + } + return 1; +} + +int zuiLongLongFromValue(zsetopval *val) { + if (!(val->flags & OPVAL_DIRTY_LL)) { + val->flags |= OPVAL_DIRTY_LL; + + if (val->ele != NULL) { + if (string2ll(val->ele,sdslen(val->ele),&val->ell)) + val->flags |= OPVAL_VALID_LL; + } else if (val->estr != NULL) { + if (string2ll((char*)val->estr,val->elen,&val->ell)) + val->flags |= OPVAL_VALID_LL; + } else { + /* The long long was already set, flag as valid. */ + val->flags |= OPVAL_VALID_LL; + } + } + return val->flags & OPVAL_VALID_LL; +} + +sds zuiSdsFromValue(zsetopval *val) { + if (val->ele == NULL) { + if (val->estr != NULL) { + val->ele = sdsnewlen((char*)val->estr,val->elen); + } else { + val->ele = sdsfromlonglong(val->ell); + } + val->flags |= OPVAL_DIRTY_SDS; + } + return val->ele; +} + +/* This is different from zuiSdsFromValue since returns a new SDS string + * which is up to the caller to free. */ +sds zuiNewSdsFromValue(zsetopval *val) { + if (val->flags & OPVAL_DIRTY_SDS) { + /* We have already one to return! */ + sds ele = val->ele; + val->flags &= ~OPVAL_DIRTY_SDS; + val->ele = NULL; + return ele; + } else if (val->ele) { + return sdsdup(val->ele); + } else if (val->estr) { + return sdsnewlen((char*)val->estr,val->elen); + } else { + return sdsfromlonglong(val->ell); + } +} + +int zuiBufferFromValue(zsetopval *val) { + if (val->estr == NULL) { + if (val->ele != NULL) { + val->elen = sdslen(val->ele); + val->estr = (unsigned char*)val->ele; + } else { + val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell); + val->estr = val->_buf; + } + } + return 1; +} + +/* Find value pointed to by val in the source pointer to by op. When found, + * return 1 and store its score in target. Return 0 otherwise. */ +int zuiFind(zsetopsrc *op, zsetopval *val, double *score) { + if (op->subject == NULL) + return 0; + + if (op->type == OBJ_SET) { + char *str = val->ele ? val->ele : (char *)val->estr; + size_t len = val->ele ? sdslen(val->ele) : val->elen; + if (setTypeIsMemberAux(op->subject, str, len, val->ell, val->ele != NULL)) { + *score = 1.0; + return 1; + } else { + return 0; + } + } else if (op->type == OBJ_ZSET) { + zuiSdsFromValue(val); + + if (op->encoding == OBJ_ENCODING_LISTPACK) { + if (zzlFind(op->subject->ptr,val->ele,score) != NULL) { + /* Score is already set by zzlFind. */ + return 1; + } else { + return 0; + } + } else if (op->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = op->subject->ptr; + dictEntry *de; + if ((de = dictFind(zs->dict,val->ele)) != NULL) { + *score = *(double*)dictGetVal(de); + return 1; + } else { + return 0; + } + } else { + serverPanic("Unknown sorted set encoding"); + } + } else { + serverPanic("Unsupported type"); + } +} + +int zuiCompareByCardinality(const void *s1, const void *s2) { + unsigned long first = zuiLength((zsetopsrc*)s1); + unsigned long second = zuiLength((zsetopsrc*)s2); + if (first > second) return 1; + if (first < second) return -1; + return 0; +} + +static int zuiCompareByRevCardinality(const void *s1, const void *s2) { + return zuiCompareByCardinality(s1, s2) * -1; +} + +#define REDIS_AGGR_SUM 1 +#define REDIS_AGGR_MIN 2 +#define REDIS_AGGR_MAX 3 +#define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e)) + +inline static void zunionInterAggregate(double *target, double val, int aggregate) { + if (aggregate == REDIS_AGGR_SUM) { + *target = *target + val; + /* The result of adding two doubles is NaN when one variable + * is +inf and the other is -inf. When these numbers are added, + * we maintain the convention of the result being 0.0. */ + if (isnan(*target)) *target = 0.0; + } else if (aggregate == REDIS_AGGR_MIN) { + *target = val < *target ? val : *target; + } else if (aggregate == REDIS_AGGR_MAX) { + *target = val > *target ? val : *target; + } else { + /* safety net */ + serverPanic("Unknown ZUNION/INTER aggregate type"); + } +} + +static size_t zsetDictGetMaxElementLength(dict *d, size_t *totallen) { + dictIterator *di; + dictEntry *de; + size_t maxelelen = 0; + + di = dictGetIterator(d); + + while((de = dictNext(di)) != NULL) { + sds ele = dictGetKey(de); + if (sdslen(ele) > maxelelen) maxelelen = sdslen(ele); + if (totallen) + (*totallen) += sdslen(ele); + } + + dictReleaseIterator(di); + + return maxelelen; +} + +static void zdiffAlgorithm1(zsetopsrc *src, long setnum, zset *dstzset, size_t *maxelelen, size_t *totelelen) { + /* DIFF Algorithm 1: + * + * We perform the diff by iterating all the elements of the first set, + * and only adding it to the target set if the element does not exist + * into all the other sets. + * + * This way we perform at max N*M operations, where N is the size of + * the first set, and M the number of sets. + * + * There is also a O(K*log(K)) cost for adding the resulting elements + * to the target set, where K is the final size of the target set. + * + * The final complexity of this algorithm is O(N*M + K*log(K)). */ + int j; + zsetopval zval; + zskiplistNode *znode; + sds tmp; + + /* With algorithm 1 it is better to order the sets to subtract + * by decreasing size, so that we are more likely to find + * duplicated elements ASAP. */ + qsort(src+1,setnum-1,sizeof(zsetopsrc),zuiCompareByRevCardinality); + + memset(&zval, 0, sizeof(zval)); + zuiInitIterator(&src[0]); + while (zuiNext(&src[0],&zval)) { + double value; + int exists = 0; + + for (j = 1; j < setnum; j++) { + /* It is not safe to access the zset we are + * iterating, so explicitly check for equal object. + * This check isn't really needed anymore since we already + * check for a duplicate set in the zsetChooseDiffAlgorithm + * function, but we're leaving it for future-proofing. */ + if (src[j].subject == src[0].subject || + zuiFind(&src[j],&zval,&value)) { + exists = 1; + break; + } + } + + if (!exists) { + tmp = zuiNewSdsFromValue(&zval); + znode = zslInsert(dstzset->zsl,zval.score,tmp); + dictAdd(dstzset->dict,tmp,&znode->score); + if (sdslen(tmp) > *maxelelen) *maxelelen = sdslen(tmp); + (*totelelen) += sdslen(tmp); + } + } + zuiClearIterator(&src[0]); +} + + +static void zdiffAlgorithm2(zsetopsrc *src, long setnum, zset *dstzset, size_t *maxelelen, size_t *totelelen) { + /* DIFF Algorithm 2: + * + * Add all the elements of the first set to the auxiliary set. + * Then remove all the elements of all the next sets from it. + * + + * This is O(L + (N-K)log(N)) where L is the sum of all the elements in every + * set, N is the size of the first set, and K is the size of the result set. + * + * Note that from the (L-N) dict searches, (N-K) got to the zsetRemoveFromSkiplist + * which costs log(N) + * + * There is also a O(K) cost at the end for finding the largest element + * size, but this doesn't change the algorithm complexity since K < L, and + * O(2L) is the same as O(L). */ + int j; + int cardinality = 0; + zsetopval zval; + zskiplistNode *znode; + sds tmp; + + for (j = 0; j < setnum; j++) { + if (zuiLength(&src[j]) == 0) continue; + + memset(&zval, 0, sizeof(zval)); + zuiInitIterator(&src[j]); + while (zuiNext(&src[j],&zval)) { + if (j == 0) { + tmp = zuiNewSdsFromValue(&zval); + znode = zslInsert(dstzset->zsl,zval.score,tmp); + dictAdd(dstzset->dict,tmp,&znode->score); + cardinality++; + } else { + tmp = zuiSdsFromValue(&zval); + if (zsetRemoveFromSkiplist(dstzset, tmp)) { + cardinality--; + } + } + + /* Exit if result set is empty as any additional removal + * of elements will have no effect. */ + if (cardinality == 0) break; + } + zuiClearIterator(&src[j]); + + if (cardinality == 0) break; + } + + /* Resize dict if needed after removing multiple elements */ + if (htNeedsResize(dstzset->dict)) dictResize(dstzset->dict); + + /* Using this algorithm, we can't calculate the max element as we go, + * we have to iterate through all elements to find the max one after. */ + *maxelelen = zsetDictGetMaxElementLength(dstzset->dict, totelelen); +} + +static int zsetChooseDiffAlgorithm(zsetopsrc *src, long setnum) { + int j; + + /* Select what DIFF algorithm to use. + * + * Algorithm 1 is O(N*M + K*log(K)) where N is the size of the + * first set, M the total number of sets, and K is the size of the + * result set. + * + * Algorithm 2 is O(L + (N-K)log(N)) where L is the total number of elements + * in all the sets, N is the size of the first set, and K is the size of the + * result set. + * + * We compute what is the best bet with the current input here. */ + long long algo_one_work = 0; + long long algo_two_work = 0; + + for (j = 0; j < setnum; j++) { + /* If any other set is equal to the first set, there is nothing to be + * done, since we would remove all elements anyway. */ + if (j > 0 && src[0].subject == src[j].subject) { + return 0; + } + + algo_one_work += zuiLength(&src[0]); + algo_two_work += zuiLength(&src[j]); + } + + /* Algorithm 1 has better constant times and performs less operations + * if there are elements in common. Give it some advantage. */ + algo_one_work /= 2; + return (algo_one_work <= algo_two_work) ? 1 : 2; +} + +static void zdiff(zsetopsrc *src, long setnum, zset *dstzset, size_t *maxelelen, size_t *totelelen) { + /* Skip everything if the smallest input is empty. */ + if (zuiLength(&src[0]) > 0) { + int diff_algo = zsetChooseDiffAlgorithm(src, setnum); + if (diff_algo == 1) { + zdiffAlgorithm1(src, setnum, dstzset, maxelelen, totelelen); + } else if (diff_algo == 2) { + zdiffAlgorithm2(src, setnum, dstzset, maxelelen, totelelen); + } else if (diff_algo != 0) { + serverPanic("Unknown algorithm"); + } + } +} + +dictType setAccumulatorDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + NULL, /* key destructor */ + NULL, /* val destructor */ + NULL /* allow to expand */ +}; + +/* The zunionInterDiffGenericCommand() function is called in order to implement the + * following commands: ZUNION, ZINTER, ZDIFF, ZUNIONSTORE, ZINTERSTORE, ZDIFFSTORE, + * ZINTERCARD. + * + * 'numkeysIndex' parameter position of key number. for ZUNION/ZINTER/ZDIFF command, + * this value is 1, for ZUNIONSTORE/ZINTERSTORE/ZDIFFSTORE command, this value is 2. + * + * 'op' SET_OP_INTER, SET_OP_UNION or SET_OP_DIFF. + * + * 'cardinality_only' is currently only applicable when 'op' is SET_OP_INTER. + * Work for SINTERCARD, only return the cardinality with minimum processing and memory overheads. + */ +void zunionInterDiffGenericCommand(client *c, robj *dstkey, int numkeysIndex, int op, + int cardinality_only) { + int i, j; + long setnum; + int aggregate = REDIS_AGGR_SUM; + zsetopsrc *src; + zsetopval zval; + sds tmp; + size_t maxelelen = 0, totelelen = 0; + robj *dstobj = NULL; + zset *dstzset = NULL; + zskiplistNode *znode; + int withscores = 0; + unsigned long cardinality = 0; + long limit = 0; /* Stop searching after reaching the limit. 0 means unlimited. */ + + /* expect setnum input keys to be given */ + if ((getLongFromObjectOrReply(c, c->argv[numkeysIndex], &setnum, NULL) != C_OK)) + return; + + if (setnum < 1) { + addReplyErrorFormat(c, + "at least 1 input key is needed for '%s' command", c->cmd->fullname); + return; + } + + /* test if the expected number of keys would overflow */ + if (setnum > (c->argc-(numkeysIndex+1))) { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + + /* read keys to be used for input */ + src = zcalloc(sizeof(zsetopsrc) * setnum); + for (i = 0, j = numkeysIndex+1; i < setnum; i++, j++) { + robj *obj = lookupKeyRead(c->db, c->argv[j]); + if (obj != NULL) { + if (obj->type != OBJ_ZSET && obj->type != OBJ_SET) { + zfree(src); + addReplyErrorObject(c,shared.wrongtypeerr); + return; + } + + src[i].subject = obj; + src[i].type = obj->type; + src[i].encoding = obj->encoding; + } else { + src[i].subject = NULL; + } + + /* Default all weights to 1. */ + src[i].weight = 1.0; + } + + /* parse optional extra arguments */ + if (j < c->argc) { + int remaining = c->argc - j; + + while (remaining) { + if (op != SET_OP_DIFF && !cardinality_only && + remaining >= (setnum + 1) && + !strcasecmp(c->argv[j]->ptr,"weights")) + { + j++; remaining--; + for (i = 0; i < setnum; i++, j++, remaining--) { + if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight, + "weight value is not a float") != C_OK) + { + zfree(src); + return; + } + } + } else if (op != SET_OP_DIFF && !cardinality_only && + remaining >= 2 && + !strcasecmp(c->argv[j]->ptr,"aggregate")) + { + j++; remaining--; + if (!strcasecmp(c->argv[j]->ptr,"sum")) { + aggregate = REDIS_AGGR_SUM; + } else if (!strcasecmp(c->argv[j]->ptr,"min")) { + aggregate = REDIS_AGGR_MIN; + } else if (!strcasecmp(c->argv[j]->ptr,"max")) { + aggregate = REDIS_AGGR_MAX; + } else { + zfree(src); + addReplyErrorObject(c,shared.syntaxerr); + return; + } + j++; remaining--; + } else if (remaining >= 1 && + !dstkey && !cardinality_only && + !strcasecmp(c->argv[j]->ptr,"withscores")) + { + j++; remaining--; + withscores = 1; + } else if (cardinality_only && remaining >= 2 && + !strcasecmp(c->argv[j]->ptr, "limit")) + { + j++; remaining--; + if (getPositiveLongFromObjectOrReply(c, c->argv[j], &limit, + "LIMIT can't be negative") != C_OK) + { + zfree(src); + return; + } + j++; remaining--; + } else { + zfree(src); + addReplyErrorObject(c,shared.syntaxerr); + return; + } + } + } + + if (op != SET_OP_DIFF) { + /* sort sets from the smallest to largest, this will improve our + * algorithm's performance */ + qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality); + } + + /* We need a temp zset object to store our union/inter/diff. If the dstkey + * is not NULL (that is, we are inside an ZUNIONSTORE/ZINTERSTORE/ZDIFFSTORE operation) then + * this zset object will be the resulting object to zset into the target key. + * In SINTERCARD case, we don't need the temp obj, so we can avoid creating it. */ + if (!cardinality_only) { + dstobj = createZsetObject(); + dstzset = dstobj->ptr; + } + memset(&zval, 0, sizeof(zval)); + + if (op == SET_OP_INTER) { + /* Skip everything if the smallest input is empty. */ + if (zuiLength(&src[0]) > 0) { + /* Precondition: as src[0] is non-empty and the inputs are ordered + * by size, all src[i > 0] are non-empty too. */ + zuiInitIterator(&src[0]); + while (zuiNext(&src[0],&zval)) { + double score, value; + + score = src[0].weight * zval.score; + if (isnan(score)) score = 0; + + for (j = 1; j < setnum; j++) { + /* It is not safe to access the zset we are + * iterating, so explicitly check for equal object. */ + if (src[j].subject == src[0].subject) { + value = zval.score*src[j].weight; + zunionInterAggregate(&score,value,aggregate); + } else if (zuiFind(&src[j],&zval,&value)) { + value *= src[j].weight; + zunionInterAggregate(&score,value,aggregate); + } else { + break; + } + } + + /* Only continue when present in every input. */ + if (j == setnum && cardinality_only) { + cardinality++; + + /* We stop the searching after reaching the limit. */ + if (limit && cardinality >= (unsigned long)limit) { + /* Cleanup before we break the zuiNext loop. */ + zuiDiscardDirtyValue(&zval); + break; + } + } else if (j == setnum) { + tmp = zuiNewSdsFromValue(&zval); + znode = zslInsert(dstzset->zsl,score,tmp); + dictAdd(dstzset->dict,tmp,&znode->score); + totelelen += sdslen(tmp); + if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp); + } + } + zuiClearIterator(&src[0]); + } + } else if (op == SET_OP_UNION) { + dict *accumulator = dictCreate(&setAccumulatorDictType); + dictIterator *di; + dictEntry *de, *existing; + double score; + + if (setnum) { + /* Our union is at least as large as the largest set. + * Resize the dictionary ASAP to avoid useless rehashing. */ + dictExpand(accumulator,zuiLength(&src[setnum-1])); + } + + /* Step 1: Create a dictionary of elements -> aggregated-scores + * by iterating one sorted set after the other. */ + for (i = 0; i < setnum; i++) { + if (zuiLength(&src[i]) == 0) continue; + + zuiInitIterator(&src[i]); + while (zuiNext(&src[i],&zval)) { + /* Initialize value */ + score = src[i].weight * zval.score; + if (isnan(score)) score = 0; + + /* Search for this element in the accumulating dictionary. */ + de = dictAddRaw(accumulator,zuiSdsFromValue(&zval),&existing); + /* If we don't have it, we need to create a new entry. */ + if (!existing) { + tmp = zuiNewSdsFromValue(&zval); + /* Remember the longest single element encountered, + * to understand if it's possible to convert to listpack + * at the end. */ + totelelen += sdslen(tmp); + if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp); + /* Update the element with its initial score. */ + dictSetKey(accumulator, de, tmp); + dictSetDoubleVal(de,score); + } else { + /* Update the score with the score of the new instance + * of the element found in the current sorted set. + * + * Here we access directly the dictEntry double + * value inside the union as it is a big speedup + * compared to using the getDouble/setDouble API. */ + double *existing_score_ptr = dictGetDoubleValPtr(existing); + zunionInterAggregate(existing_score_ptr, score, aggregate); + } + } + zuiClearIterator(&src[i]); + } + + /* Step 2: convert the dictionary into the final sorted set. */ + di = dictGetIterator(accumulator); + + /* We now are aware of the final size of the resulting sorted set, + * let's resize the dictionary embedded inside the sorted set to the + * right size, in order to save rehashing time. */ + dictExpand(dstzset->dict,dictSize(accumulator)); + + while((de = dictNext(di)) != NULL) { + sds ele = dictGetKey(de); + score = dictGetDoubleVal(de); + znode = zslInsert(dstzset->zsl,score,ele); + dictAdd(dstzset->dict,ele,&znode->score); + } + dictReleaseIterator(di); + dictRelease(accumulator); + } else if (op == SET_OP_DIFF) { + zdiff(src, setnum, dstzset, &maxelelen, &totelelen); + } else { + serverPanic("Unknown operator"); + } + + if (dstkey) { + if (dstzset->zsl->length) { + zsetConvertToListpackIfNeeded(dstobj, maxelelen, totelelen); + setKey(c, c->db, dstkey, dstobj, 0); + addReplyLongLong(c, zsetLength(dstobj)); + notifyKeyspaceEvent(NOTIFY_ZSET, + (op == SET_OP_UNION) ? "zunionstore" : + (op == SET_OP_INTER ? "zinterstore" : "zdiffstore"), + dstkey, c->db->id); + server.dirty++; + } else { + addReply(c, shared.czero); + if (dbDelete(c->db, dstkey)) { + signalModifiedKey(c, c->db, dstkey); + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", dstkey, c->db->id); + server.dirty++; + } + } + decrRefCount(dstobj); + } else if (cardinality_only) { + addReplyLongLong(c, cardinality); + } else { + unsigned long length = dstzset->zsl->length; + zskiplist *zsl = dstzset->zsl; + zskiplistNode *zn = zsl->header->level[0].forward; + /* In case of WITHSCORES, respond with a single array in RESP2, and + * nested arrays in RESP3. We can't use a map response type since the + * client library needs to know to respect the order. */ + if (withscores && c->resp == 2) + addReplyArrayLen(c, length*2); + else + addReplyArrayLen(c, length); + + while (zn != NULL) { + if (withscores && c->resp > 2) addReplyArrayLen(c,2); + addReplyBulkCBuffer(c,zn->ele,sdslen(zn->ele)); + if (withscores) addReplyDouble(c,zn->score); + zn = zn->level[0].forward; + } + server.lazyfree_lazy_server_del ? freeObjAsync(NULL, dstobj, -1) : + decrRefCount(dstobj); + } + zfree(src); +} + +/* ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight] [AGGREGATE SUM|MIN|MAX] */ +void zunionstoreCommand(client *c) { + zunionInterDiffGenericCommand(c, c->argv[1], 2, SET_OP_UNION, 0); +} + +/* ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight] [AGGREGATE SUM|MIN|MAX] */ +void zinterstoreCommand(client *c) { + zunionInterDiffGenericCommand(c, c->argv[1], 2, SET_OP_INTER, 0); +} + +/* ZDIFFSTORE destination numkeys key [key ...] */ +void zdiffstoreCommand(client *c) { + zunionInterDiffGenericCommand(c, c->argv[1], 2, SET_OP_DIFF, 0); +} + +/* ZUNION numkeys key [key ...] [WEIGHTS weight] [AGGREGATE SUM|MIN|MAX] [WITHSCORES] */ +void zunionCommand(client *c) { + zunionInterDiffGenericCommand(c, NULL, 1, SET_OP_UNION, 0); +} + +/* ZINTER numkeys key [key ...] [WEIGHTS weight] [AGGREGATE SUM|MIN|MAX] [WITHSCORES] */ +void zinterCommand(client *c) { + zunionInterDiffGenericCommand(c, NULL, 1, SET_OP_INTER, 0); +} + +/* ZINTERCARD numkeys key [key ...] [LIMIT limit] */ +void zinterCardCommand(client *c) { + zunionInterDiffGenericCommand(c, NULL, 1, SET_OP_INTER, 1); +} + +/* ZDIFF numkeys key [key ...] [WITHSCORES] */ +void zdiffCommand(client *c) { + zunionInterDiffGenericCommand(c, NULL, 1, SET_OP_DIFF, 0); +} + +typedef enum { + ZRANGE_DIRECTION_AUTO = 0, + ZRANGE_DIRECTION_FORWARD, + ZRANGE_DIRECTION_REVERSE +} zrange_direction; + +typedef enum { + ZRANGE_CONSUMER_TYPE_CLIENT = 0, + ZRANGE_CONSUMER_TYPE_INTERNAL +} zrange_consumer_type; + +typedef struct zrange_result_handler zrange_result_handler; + +typedef void (*zrangeResultBeginFunction)(zrange_result_handler *c, long length); +typedef void (*zrangeResultFinalizeFunction)( + zrange_result_handler *c, size_t result_count); +typedef void (*zrangeResultEmitCBufferFunction)( + zrange_result_handler *c, const void *p, size_t len, double score); +typedef void (*zrangeResultEmitLongLongFunction)( + zrange_result_handler *c, long long ll, double score); + +void zrangeGenericCommand (zrange_result_handler *handler, int argc_start, int store, + zrange_type rangetype, zrange_direction direction); + +/* Interface struct for ZRANGE/ZRANGESTORE generic implementation. + * There is one implementation of this interface that sends a RESP reply to clients. + * and one implementation that stores the range result into a zset object. */ +struct zrange_result_handler { + zrange_consumer_type type; + client *client; + robj *dstkey; + robj *dstobj; + void *userdata; + int withscores; + int should_emit_array_length; + zrangeResultBeginFunction beginResultEmission; + zrangeResultFinalizeFunction finalizeResultEmission; + zrangeResultEmitCBufferFunction emitResultFromCBuffer; + zrangeResultEmitLongLongFunction emitResultFromLongLong; +}; + +/* Result handler methods for responding the ZRANGE to clients. + * length can be used to provide the result length in advance (avoids deferred reply overhead). + * length can be set to -1 if the result length is not know in advance. + */ +static void zrangeResultBeginClient(zrange_result_handler *handler, long length) { + if (length > 0) { + /* In case of WITHSCORES, respond with a single array in RESP2, and + * nested arrays in RESP3. We can't use a map response type since the + * client library needs to know to respect the order. */ + if (handler->withscores && (handler->client->resp == 2)) { + length *= 2; + } + addReplyArrayLen(handler->client, length); + handler->userdata = NULL; + return; + } + handler->userdata = addReplyDeferredLen(handler->client); +} + +static void zrangeResultEmitCBufferToClient(zrange_result_handler *handler, + const void *value, size_t value_length_in_bytes, double score) +{ + if (handler->should_emit_array_length) { + addReplyArrayLen(handler->client, 2); + } + + addReplyBulkCBuffer(handler->client, value, value_length_in_bytes); + + if (handler->withscores) { + addReplyDouble(handler->client, score); + } +} + +static void zrangeResultEmitLongLongToClient(zrange_result_handler *handler, + long long value, double score) +{ + if (handler->should_emit_array_length) { + addReplyArrayLen(handler->client, 2); + } + + addReplyBulkLongLong(handler->client, value); + + if (handler->withscores) { + addReplyDouble(handler->client, score); + } +} + +static void zrangeResultFinalizeClient(zrange_result_handler *handler, + size_t result_count) +{ + /* If the reply size was know at start there's nothing left to do */ + if (!handler->userdata) + return; + /* In case of WITHSCORES, respond with a single array in RESP2, and + * nested arrays in RESP3. We can't use a map response type since the + * client library needs to know to respect the order. */ + if (handler->withscores && (handler->client->resp == 2)) { + result_count *= 2; + } + + setDeferredArrayLen(handler->client, handler->userdata, result_count); +} + +/* Result handler methods for storing the ZRANGESTORE to a zset. */ +static void zrangeResultBeginStore(zrange_result_handler *handler, long length) +{ + handler->dstobj = zsetTypeCreate(length, 0); +} + +static void zrangeResultEmitCBufferForStore(zrange_result_handler *handler, + const void *value, size_t value_length_in_bytes, double score) +{ + double newscore; + int retflags = 0; + sds ele = sdsnewlen(value, value_length_in_bytes); + int retval = zsetAdd(handler->dstobj, score, ele, ZADD_IN_NONE, &retflags, &newscore); + sdsfree(ele); + serverAssert(retval); +} + +static void zrangeResultEmitLongLongForStore(zrange_result_handler *handler, + long long value, double score) +{ + double newscore; + int retflags = 0; + sds ele = sdsfromlonglong(value); + int retval = zsetAdd(handler->dstobj, score, ele, ZADD_IN_NONE, &retflags, &newscore); + sdsfree(ele); + serverAssert(retval); +} + +static void zrangeResultFinalizeStore(zrange_result_handler *handler, size_t result_count) +{ + if (result_count) { + setKey(handler->client, handler->client->db, handler->dstkey, handler->dstobj, 0); + addReplyLongLong(handler->client, result_count); + notifyKeyspaceEvent(NOTIFY_ZSET, "zrangestore", handler->dstkey, handler->client->db->id); + server.dirty++; + } else { + addReply(handler->client, shared.czero); + if (dbDelete(handler->client->db, handler->dstkey)) { + signalModifiedKey(handler->client, handler->client->db, handler->dstkey); + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", handler->dstkey, handler->client->db->id); + server.dirty++; + } + } + decrRefCount(handler->dstobj); +} + +/* Initialize the consumer interface type with the requested type. */ +static void zrangeResultHandlerInit(zrange_result_handler *handler, + client *client, zrange_consumer_type type) +{ + memset(handler, 0, sizeof(*handler)); + + handler->client = client; + + switch (type) { + case ZRANGE_CONSUMER_TYPE_CLIENT: + handler->beginResultEmission = zrangeResultBeginClient; + handler->finalizeResultEmission = zrangeResultFinalizeClient; + handler->emitResultFromCBuffer = zrangeResultEmitCBufferToClient; + handler->emitResultFromLongLong = zrangeResultEmitLongLongToClient; + break; + + case ZRANGE_CONSUMER_TYPE_INTERNAL: + handler->beginResultEmission = zrangeResultBeginStore; + handler->finalizeResultEmission = zrangeResultFinalizeStore; + handler->emitResultFromCBuffer = zrangeResultEmitCBufferForStore; + handler->emitResultFromLongLong = zrangeResultEmitLongLongForStore; + break; + } +} + +static void zrangeResultHandlerScoreEmissionEnable(zrange_result_handler *handler) { + handler->withscores = 1; + handler->should_emit_array_length = (handler->client->resp > 2); +} + +static void zrangeResultHandlerDestinationKeySet (zrange_result_handler *handler, + robj *dstkey) +{ + handler->dstkey = dstkey; +} + +/* This command implements ZRANGE, ZREVRANGE. */ +void genericZrangebyrankCommand(zrange_result_handler *handler, + robj *zobj, long start, long end, int withscores, int reverse) { + + client *c = handler->client; + long llen; + long rangelen; + size_t result_cardinality; + + /* Sanitize indexes. */ + llen = zsetLength(zobj); + if (start < 0) start = llen+start; + if (end < 0) end = llen+end; + if (start < 0) start = 0; + + + /* Invariant: start >= 0, so this test will be true when end < 0. + * The range is empty when start > end or start >= length. */ + if (start > end || start >= llen) { + handler->beginResultEmission(handler, 0); + handler->finalizeResultEmission(handler, 0); + return; + } + if (end >= llen) end = llen-1; + rangelen = (end-start)+1; + result_cardinality = rangelen; + + handler->beginResultEmission(handler, rangelen); + if (zobj->encoding == OBJ_ENCODING_LISTPACK) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + double score = 0.0; + + if (reverse) + eptr = lpSeek(zl,-2-(2*start)); + else + eptr = lpSeek(zl,2*start); + + serverAssertWithInfo(c,zobj,eptr != NULL); + sptr = lpNext(zl,eptr); + + while (rangelen--) { + serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL); + vstr = lpGetValue(eptr,&vlen,&vlong); + + if (withscores) /* don't bother to extract the score if it's gonna be ignored. */ + score = zzlGetScore(sptr); + + if (vstr == NULL) { + handler->emitResultFromLongLong(handler, vlong, score); + } else { + handler->emitResultFromCBuffer(handler, vstr, vlen, score); + } + + if (reverse) + zzlPrev(zl,&eptr,&sptr); + else + zzlNext(zl,&eptr,&sptr); + } + + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + zskiplistNode *ln; + + /* Check if starting point is trivial, before doing log(N) lookup. */ + if (reverse) { + ln = zsl->tail; + if (start > 0) + ln = zslGetElementByRank(zsl,llen-start); + } else { + ln = zsl->header->level[0].forward; + if (start > 0) + ln = zslGetElementByRank(zsl,start+1); + } + + while(rangelen--) { + serverAssertWithInfo(c,zobj,ln != NULL); + sds ele = ln->ele; + handler->emitResultFromCBuffer(handler, ele, sdslen(ele), ln->score); + ln = reverse ? ln->backward : ln->level[0].forward; + } + } else { + serverPanic("Unknown sorted set encoding"); + } + + handler->finalizeResultEmission(handler, result_cardinality); +} + +/* ZRANGESTORE <dst> <src> <min> <max> [BYSCORE | BYLEX] [REV] [LIMIT offset count] */ +void zrangestoreCommand (client *c) { + robj *dstkey = c->argv[1]; + zrange_result_handler handler; + zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_INTERNAL); + zrangeResultHandlerDestinationKeySet(&handler, dstkey); + zrangeGenericCommand(&handler, 2, 1, ZRANGE_AUTO, ZRANGE_DIRECTION_AUTO); +} + +/* ZRANGE <key> <min> <max> [BYSCORE | BYLEX] [REV] [WITHSCORES] [LIMIT offset count] */ +void zrangeCommand(client *c) { + zrange_result_handler handler; + zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT); + zrangeGenericCommand(&handler, 1, 0, ZRANGE_AUTO, ZRANGE_DIRECTION_AUTO); +} + +/* ZREVRANGE <key> <start> <stop> [WITHSCORES] */ +void zrevrangeCommand(client *c) { + zrange_result_handler handler; + zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT); + zrangeGenericCommand(&handler, 1, 0, ZRANGE_RANK, ZRANGE_DIRECTION_REVERSE); +} + +/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */ +void genericZrangebyscoreCommand(zrange_result_handler *handler, + zrangespec *range, robj *zobj, long offset, long limit, + int reverse) { + unsigned long rangelen = 0; + + handler->beginResultEmission(handler, -1); + + /* For invalid offset, return directly. */ + if (offset > 0 && offset >= (long)zsetLength(zobj)) { + handler->finalizeResultEmission(handler, 0); + return; + } + + if (zobj->encoding == OBJ_ENCODING_LISTPACK) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + /* If reversed, get the last node in range as starting point. */ + if (reverse) { + eptr = zzlLastInRange(zl,range); + } else { + eptr = zzlFirstInRange(zl,range); + } + + /* Get score pointer for the first element. */ + if (eptr) + sptr = lpNext(zl,eptr); + + /* If there is an offset, just traverse the number of elements without + * checking the score because that is done in the next loop. */ + while (eptr && offset--) { + if (reverse) { + zzlPrev(zl,&eptr,&sptr); + } else { + zzlNext(zl,&eptr,&sptr); + } + } + + while (eptr && limit--) { + double score = zzlGetScore(sptr); + + /* Abort when the node is no longer in range. */ + if (reverse) { + if (!zslValueGteMin(score,range)) break; + } else { + if (!zslValueLteMax(score,range)) break; + } + + vstr = lpGetValue(eptr,&vlen,&vlong); + rangelen++; + if (vstr == NULL) { + handler->emitResultFromLongLong(handler, vlong, score); + } else { + handler->emitResultFromCBuffer(handler, vstr, vlen, score); + } + + /* Move to next node */ + if (reverse) { + zzlPrev(zl,&eptr,&sptr); + } else { + zzlNext(zl,&eptr,&sptr); + } + } + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + zskiplistNode *ln; + + /* If reversed, get the last node in range as starting point. */ + if (reverse) { + ln = zslLastInRange(zsl,range); + } else { + ln = zslFirstInRange(zsl,range); + } + + /* If there is an offset, just traverse the number of elements without + * checking the score because that is done in the next loop. */ + while (ln && offset--) { + if (reverse) { + ln = ln->backward; + } else { + ln = ln->level[0].forward; + } + } + + while (ln && limit--) { + /* Abort when the node is no longer in range. */ + if (reverse) { + if (!zslValueGteMin(ln->score,range)) break; + } else { + if (!zslValueLteMax(ln->score,range)) break; + } + + rangelen++; + handler->emitResultFromCBuffer(handler, ln->ele, sdslen(ln->ele), ln->score); + + /* Move to next node */ + if (reverse) { + ln = ln->backward; + } else { + ln = ln->level[0].forward; + } + } + } else { + serverPanic("Unknown sorted set encoding"); + } + + handler->finalizeResultEmission(handler, rangelen); +} + +/* ZRANGEBYSCORE <key> <min> <max> [WITHSCORES] [LIMIT offset count] */ +void zrangebyscoreCommand(client *c) { + zrange_result_handler handler; + zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT); + zrangeGenericCommand(&handler, 1, 0, ZRANGE_SCORE, ZRANGE_DIRECTION_FORWARD); +} + +/* ZREVRANGEBYSCORE <key> <max> <min> [WITHSCORES] [LIMIT offset count] */ +void zrevrangebyscoreCommand(client *c) { + zrange_result_handler handler; + zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT); + zrangeGenericCommand(&handler, 1, 0, ZRANGE_SCORE, ZRANGE_DIRECTION_REVERSE); +} + +void zcountCommand(client *c) { + robj *key = c->argv[1]; + robj *zobj; + zrangespec range; + unsigned long count = 0; + + /* Parse the range arguments */ + if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) { + addReplyError(c,"min or max is not a float"); + return; + } + + /* Lookup the sorted set */ + if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL || + checkType(c, zobj, OBJ_ZSET)) return; + + if (zobj->encoding == OBJ_ENCODING_LISTPACK) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + double score; + + /* Use the first element in range as the starting point */ + eptr = zzlFirstInRange(zl,&range); + + /* No "first" element */ + if (eptr == NULL) { + addReply(c, shared.czero); + return; + } + + /* First element is in range */ + sptr = lpNext(zl,eptr); + score = zzlGetScore(sptr); + serverAssertWithInfo(c,zobj,zslValueLteMax(score,&range)); + + /* Iterate over elements in range */ + while (eptr) { + score = zzlGetScore(sptr); + + /* Abort when the node is no longer in range. */ + if (!zslValueLteMax(score,&range)) { + break; + } else { + count++; + zzlNext(zl,&eptr,&sptr); + } + } + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + zskiplistNode *zn; + unsigned long rank; + + /* Find first element in range */ + zn = zslFirstInRange(zsl, &range); + + /* Use rank of first element, if any, to determine preliminary count */ + if (zn != NULL) { + rank = zslGetRank(zsl, zn->score, zn->ele); + count = (zsl->length - (rank - 1)); + + /* Find last element in range */ + zn = zslLastInRange(zsl, &range); + + /* Use rank of last element, if any, to determine the actual count */ + if (zn != NULL) { + rank = zslGetRank(zsl, zn->score, zn->ele); + count -= (zsl->length - rank); + } + } + } else { + serverPanic("Unknown sorted set encoding"); + } + + addReplyLongLong(c, count); +} + +void zlexcountCommand(client *c) { + robj *key = c->argv[1]; + robj *zobj; + zlexrangespec range; + unsigned long count = 0; + + /* Parse the range arguments */ + if (zslParseLexRange(c->argv[2],c->argv[3],&range) != C_OK) { + addReplyError(c,"min or max not valid string range item"); + return; + } + + /* Lookup the sorted set */ + if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL || + checkType(c, zobj, OBJ_ZSET)) + { + zslFreeLexRange(&range); + return; + } + + if (zobj->encoding == OBJ_ENCODING_LISTPACK) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + + /* Use the first element in range as the starting point */ + eptr = zzlFirstInLexRange(zl,&range); + + /* No "first" element */ + if (eptr == NULL) { + zslFreeLexRange(&range); + addReply(c, shared.czero); + return; + } + + /* First element is in range */ + sptr = lpNext(zl,eptr); + serverAssertWithInfo(c,zobj,zzlLexValueLteMax(eptr,&range)); + + /* Iterate over elements in range */ + while (eptr) { + /* Abort when the node is no longer in range. */ + if (!zzlLexValueLteMax(eptr,&range)) { + break; + } else { + count++; + zzlNext(zl,&eptr,&sptr); + } + } + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + zskiplistNode *zn; + unsigned long rank; + + /* Find first element in range */ + zn = zslFirstInLexRange(zsl, &range); + + /* Use rank of first element, if any, to determine preliminary count */ + if (zn != NULL) { + rank = zslGetRank(zsl, zn->score, zn->ele); + count = (zsl->length - (rank - 1)); + + /* Find last element in range */ + zn = zslLastInLexRange(zsl, &range); + + /* Use rank of last element, if any, to determine the actual count */ + if (zn != NULL) { + rank = zslGetRank(zsl, zn->score, zn->ele); + count -= (zsl->length - rank); + } + } + } else { + serverPanic("Unknown sorted set encoding"); + } + + zslFreeLexRange(&range); + addReplyLongLong(c, count); +} + +/* This command implements ZRANGEBYLEX, ZREVRANGEBYLEX. */ +void genericZrangebylexCommand(zrange_result_handler *handler, + zlexrangespec *range, robj *zobj, int withscores, long offset, long limit, + int reverse) +{ + unsigned long rangelen = 0; + + handler->beginResultEmission(handler, -1); + + if (zobj->encoding == OBJ_ENCODING_LISTPACK) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + /* If reversed, get the last node in range as starting point. */ + if (reverse) { + eptr = zzlLastInLexRange(zl,range); + } else { + eptr = zzlFirstInLexRange(zl,range); + } + + /* Get score pointer for the first element. */ + if (eptr) + sptr = lpNext(zl,eptr); + + /* If there is an offset, just traverse the number of elements without + * checking the score because that is done in the next loop. */ + while (eptr && offset--) { + if (reverse) { + zzlPrev(zl,&eptr,&sptr); + } else { + zzlNext(zl,&eptr,&sptr); + } + } + + while (eptr && limit--) { + double score = 0; + if (withscores) /* don't bother to extract the score if it's gonna be ignored. */ + score = zzlGetScore(sptr); + + /* Abort when the node is no longer in range. */ + if (reverse) { + if (!zzlLexValueGteMin(eptr,range)) break; + } else { + if (!zzlLexValueLteMax(eptr,range)) break; + } + + vstr = lpGetValue(eptr,&vlen,&vlong); + rangelen++; + if (vstr == NULL) { + handler->emitResultFromLongLong(handler, vlong, score); + } else { + handler->emitResultFromCBuffer(handler, vstr, vlen, score); + } + + /* Move to next node */ + if (reverse) { + zzlPrev(zl,&eptr,&sptr); + } else { + zzlNext(zl,&eptr,&sptr); + } + } + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + zskiplistNode *ln; + + /* If reversed, get the last node in range as starting point. */ + if (reverse) { + ln = zslLastInLexRange(zsl,range); + } else { + ln = zslFirstInLexRange(zsl,range); + } + + /* If there is an offset, just traverse the number of elements without + * checking the score because that is done in the next loop. */ + while (ln && offset--) { + if (reverse) { + ln = ln->backward; + } else { + ln = ln->level[0].forward; + } + } + + while (ln && limit--) { + /* Abort when the node is no longer in range. */ + if (reverse) { + if (!zslLexValueGteMin(ln->ele,range)) break; + } else { + if (!zslLexValueLteMax(ln->ele,range)) break; + } + + rangelen++; + handler->emitResultFromCBuffer(handler, ln->ele, sdslen(ln->ele), ln->score); + + /* Move to next node */ + if (reverse) { + ln = ln->backward; + } else { + ln = ln->level[0].forward; + } + } + } else { + serverPanic("Unknown sorted set encoding"); + } + + handler->finalizeResultEmission(handler, rangelen); +} + +/* ZRANGEBYLEX <key> <min> <max> [LIMIT offset count] */ +void zrangebylexCommand(client *c) { + zrange_result_handler handler; + zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT); + zrangeGenericCommand(&handler, 1, 0, ZRANGE_LEX, ZRANGE_DIRECTION_FORWARD); +} + +/* ZREVRANGEBYLEX <key> <max> <min> [LIMIT offset count] */ +void zrevrangebylexCommand(client *c) { + zrange_result_handler handler; + zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT); + zrangeGenericCommand(&handler, 1, 0, ZRANGE_LEX, ZRANGE_DIRECTION_REVERSE); +} + +/** + * This function handles ZRANGE and ZRANGESTORE, and also the deprecated + * Z[REV]RANGE[BYSCORE|BYLEX] commands. + * + * The simple ZRANGE and ZRANGESTORE can take _AUTO in rangetype and direction, + * other command pass explicit value. + * + * The argc_start points to the src key argument, so following syntax is like: + * <src> <min> <max> [BYSCORE | BYLEX] [REV] [WITHSCORES] [LIMIT offset count] + */ +void zrangeGenericCommand(zrange_result_handler *handler, int argc_start, int store, + zrange_type rangetype, zrange_direction direction) +{ + client *c = handler->client; + robj *key = c->argv[argc_start]; + robj *zobj; + zrangespec range; + zlexrangespec lexrange; + int minidx = argc_start + 1; + int maxidx = argc_start + 2; + + /* Options common to all */ + long opt_start = 0; + long opt_end = 0; + int opt_withscores = 0; + long opt_offset = 0; + long opt_limit = -1; + + /* Step 1: Skip the <src> <min> <max> args and parse remaining optional arguments. */ + for (int j=argc_start + 3; j < c->argc; j++) { + int leftargs = c->argc-j-1; + if (!store && !strcasecmp(c->argv[j]->ptr,"withscores")) { + opt_withscores = 1; + } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) { + if ((getLongFromObjectOrReply(c, c->argv[j+1], &opt_offset, NULL) != C_OK) || + (getLongFromObjectOrReply(c, c->argv[j+2], &opt_limit, NULL) != C_OK)) + { + return; + } + j += 2; + } else if (direction == ZRANGE_DIRECTION_AUTO && + !strcasecmp(c->argv[j]->ptr,"rev")) + { + direction = ZRANGE_DIRECTION_REVERSE; + } else if (rangetype == ZRANGE_AUTO && + !strcasecmp(c->argv[j]->ptr,"bylex")) + { + rangetype = ZRANGE_LEX; + } else if (rangetype == ZRANGE_AUTO && + !strcasecmp(c->argv[j]->ptr,"byscore")) + { + rangetype = ZRANGE_SCORE; + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + } + + /* Use defaults if not overridden by arguments. */ + if (direction == ZRANGE_DIRECTION_AUTO) + direction = ZRANGE_DIRECTION_FORWARD; + if (rangetype == ZRANGE_AUTO) + rangetype = ZRANGE_RANK; + + /* Check for conflicting arguments. */ + if (opt_limit != -1 && rangetype == ZRANGE_RANK) { + addReplyError(c,"syntax error, LIMIT is only supported in combination with either BYSCORE or BYLEX"); + return; + } + if (opt_withscores && rangetype == ZRANGE_LEX) { + addReplyError(c,"syntax error, WITHSCORES not supported in combination with BYLEX"); + return; + } + + if (direction == ZRANGE_DIRECTION_REVERSE && + ((ZRANGE_SCORE == rangetype) || (ZRANGE_LEX == rangetype))) + { + /* Range is given as [max,min] */ + int tmp = maxidx; + maxidx = minidx; + minidx = tmp; + } + + /* Step 2: Parse the range. */ + switch (rangetype) { + case ZRANGE_AUTO: + case ZRANGE_RANK: + /* Z[REV]RANGE, ZRANGESTORE [REV]RANGE */ + if ((getLongFromObjectOrReply(c, c->argv[minidx], &opt_start,NULL) != C_OK) || + (getLongFromObjectOrReply(c, c->argv[maxidx], &opt_end,NULL) != C_OK)) + { + return; + } + break; + + case ZRANGE_SCORE: + /* Z[REV]RANGEBYSCORE, ZRANGESTORE [REV]RANGEBYSCORE */ + if (zslParseRange(c->argv[minidx], c->argv[maxidx], &range) != C_OK) { + addReplyError(c, "min or max is not a float"); + return; + } + break; + + case ZRANGE_LEX: + /* Z[REV]RANGEBYLEX, ZRANGESTORE [REV]RANGEBYLEX */ + if (zslParseLexRange(c->argv[minidx], c->argv[maxidx], &lexrange) != C_OK) { + addReplyError(c, "min or max not valid string range item"); + return; + } + break; + } + + if (opt_withscores || store) { + zrangeResultHandlerScoreEmissionEnable(handler); + } + + /* Step 3: Lookup the key and get the range. */ + zobj = lookupKeyRead(c->db, key); + if (zobj == NULL) { + if (store) { + handler->beginResultEmission(handler, -1); + handler->finalizeResultEmission(handler, 0); + } else { + addReply(c, shared.emptyarray); + } + goto cleanup; + } + + if (checkType(c,zobj,OBJ_ZSET)) goto cleanup; + + /* Step 4: Pass this to the command-specific handler. */ + switch (rangetype) { + case ZRANGE_AUTO: + case ZRANGE_RANK: + genericZrangebyrankCommand(handler, zobj, opt_start, opt_end, + opt_withscores || store, direction == ZRANGE_DIRECTION_REVERSE); + break; + + case ZRANGE_SCORE: + genericZrangebyscoreCommand(handler, &range, zobj, opt_offset, + opt_limit, direction == ZRANGE_DIRECTION_REVERSE); + break; + + case ZRANGE_LEX: + genericZrangebylexCommand(handler, &lexrange, zobj, opt_withscores || store, + opt_offset, opt_limit, direction == ZRANGE_DIRECTION_REVERSE); + break; + } + + /* Instead of returning here, we'll just fall-through the clean-up. */ + +cleanup: + + if (rangetype == ZRANGE_LEX) { + zslFreeLexRange(&lexrange); + } +} + +void zcardCommand(client *c) { + robj *key = c->argv[1]; + robj *zobj; + + if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL || + checkType(c,zobj,OBJ_ZSET)) return; + + addReplyLongLong(c,zsetLength(zobj)); +} + +void zscoreCommand(client *c) { + robj *key = c->argv[1]; + robj *zobj; + double score; + + if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL || + checkType(c,zobj,OBJ_ZSET)) return; + + if (zsetScore(zobj,c->argv[2]->ptr,&score) == C_ERR) { + addReplyNull(c); + } else { + addReplyDouble(c,score); + } +} + +void zmscoreCommand(client *c) { + robj *key = c->argv[1]; + robj *zobj; + double score; + zobj = lookupKeyRead(c->db,key); + if (checkType(c,zobj,OBJ_ZSET)) return; + + addReplyArrayLen(c,c->argc - 2); + for (int j = 2; j < c->argc; j++) { + /* Treat a missing set the same way as an empty set */ + if (zobj == NULL || zsetScore(zobj,c->argv[j]->ptr,&score) == C_ERR) { + addReplyNull(c); + } else { + addReplyDouble(c,score); + } + } +} + +void zrankGenericCommand(client *c, int reverse) { + robj *key = c->argv[1]; + robj *ele = c->argv[2]; + robj *zobj; + robj* reply; + long rank; + int opt_withscore = 0; + double score; + + if (c->argc > 4) { + addReplyErrorArity(c); + return; + } + if (c->argc > 3) { + if (!strcasecmp(c->argv[3]->ptr, "withscore")) { + opt_withscore = 1; + } else { + addReplyErrorObject(c, shared.syntaxerr); + return; + } + } + reply = opt_withscore ? shared.nullarray[c->resp] : shared.null[c->resp]; + if ((zobj = lookupKeyReadOrReply(c, key, reply)) == NULL || checkType(c, zobj, OBJ_ZSET)) { + return; + } + serverAssertWithInfo(c, ele, sdsEncodedObject(ele)); + rank = zsetRank(zobj, ele->ptr, reverse, opt_withscore ? &score : NULL); + if (rank >= 0) { + if (opt_withscore) { + addReplyArrayLen(c, 2); + } + addReplyLongLong(c, rank); + if (opt_withscore) { + addReplyDouble(c, score); + } + } else { + if (opt_withscore) { + addReplyNullArray(c); + } else { + addReplyNull(c); + } + } +} + +void zrankCommand(client *c) { + zrankGenericCommand(c, 0); +} + +void zrevrankCommand(client *c) { + zrankGenericCommand(c, 1); +} + +void zscanCommand(client *c) { + robj *o; + unsigned long cursor; + + if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return; + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL || + checkType(c,o,OBJ_ZSET)) return; + scanGenericCommand(c,o,cursor); +} + +/* This command implements the generic zpop operation, used by: + * ZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX and ZMPOP. This function is also used + * inside blocked.c in the unblocking stage of BZPOPMIN, BZPOPMAX and BZMPOP. + * + * If 'emitkey' is true also the key name is emitted, useful for the blocking + * behavior of BZPOP[MIN|MAX], since we can block into multiple keys. + * Or in ZMPOP/BZMPOP, because we also can take multiple keys. + * + * 'count' is the number of elements requested to pop, or -1 for plain single pop. + * + * 'use_nested_array' when false it generates a flat array (with or without key name). + * When true, it generates a nested 2 level array of field + score pairs, or 3 level when emitkey is set. + * + * 'reply_nil_when_empty' when true we reply a NIL if we are not able to pop up any elements. + * Like in ZMPOP/BZMPOP we reply with a structured nested array containing key name + * and member + score pairs. In these commands, we reply with null when we have no result. + * Otherwise in ZPOPMIN/ZPOPMAX we reply an empty array by default. + * + * 'deleted' is an optional output argument to get an indication + * if the key got deleted by this function. + * */ +void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, + long count, int use_nested_array, int reply_nil_when_empty, int *deleted) { + int idx; + robj *key = NULL; + robj *zobj = NULL; + sds ele; + double score; + + if (deleted) *deleted = 0; + + /* Check type and break on the first error, otherwise identify candidate. */ + idx = 0; + while (idx < keyc) { + key = keyv[idx++]; + zobj = lookupKeyWrite(c->db,key); + if (!zobj) continue; + if (checkType(c,zobj,OBJ_ZSET)) return; + break; + } + + /* No candidate for zpopping, return empty. */ + if (!zobj) { + if (reply_nil_when_empty) { + addReplyNullArray(c); + } else { + addReply(c,shared.emptyarray); + } + return; + } + + if (count == 0) { + /* ZPOPMIN/ZPOPMAX with count 0. */ + addReply(c, shared.emptyarray); + return; + } + + long result_count = 0; + + /* When count is -1, we need to correct it to 1 for plain single pop. */ + if (count == -1) count = 1; + + long llen = zsetLength(zobj); + long rangelen = (count > llen) ? llen : count; + + if (!use_nested_array && !emitkey) { + /* ZPOPMIN/ZPOPMAX with or without COUNT option in RESP2. */ + addReplyArrayLen(c, rangelen * 2); + } else if (use_nested_array && !emitkey) { + /* ZPOPMIN/ZPOPMAX with COUNT option in RESP3. */ + addReplyArrayLen(c, rangelen); + } else if (!use_nested_array && emitkey) { + /* BZPOPMIN/BZPOPMAX in RESP2 and RESP3. */ + addReplyArrayLen(c, rangelen * 2 + 1); + addReplyBulk(c, key); + } else if (use_nested_array && emitkey) { + /* ZMPOP/BZMPOP in RESP2 and RESP3. */ + addReplyArrayLen(c, 2); + addReplyBulk(c, key); + addReplyArrayLen(c, rangelen); + } + + /* Remove the element. */ + do { + if (zobj->encoding == OBJ_ENCODING_LISTPACK) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + /* Get the first or last element in the sorted set. */ + eptr = lpSeek(zl,where == ZSET_MAX ? -2 : 0); + serverAssertWithInfo(c,zobj,eptr != NULL); + vstr = lpGetValue(eptr,&vlen,&vlong); + if (vstr == NULL) + ele = sdsfromlonglong(vlong); + else + ele = sdsnewlen(vstr,vlen); + + /* Get the score. */ + sptr = lpNext(zl,eptr); + serverAssertWithInfo(c,zobj,sptr != NULL); + score = zzlGetScore(sptr); + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + zskiplistNode *zln; + + /* Get the first or last element in the sorted set. */ + zln = (where == ZSET_MAX ? zsl->tail : + zsl->header->level[0].forward); + + /* There must be an element in the sorted set. */ + serverAssertWithInfo(c,zobj,zln != NULL); + ele = sdsdup(zln->ele); + score = zln->score; + } else { + serverPanic("Unknown sorted set encoding"); + } + + serverAssertWithInfo(c,zobj,zsetDel(zobj,ele)); + server.dirty++; + + if (result_count == 0) { /* Do this only for the first iteration. */ + char *events[2] = {"zpopmin","zpopmax"}; + notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id); + signalModifiedKey(c,c->db,key); + } + + if (use_nested_array) { + addReplyArrayLen(c,2); + } + addReplyBulkCBuffer(c,ele,sdslen(ele)); + addReplyDouble(c,score); + sdsfree(ele); + ++result_count; + } while(--rangelen); + + /* Remove the key, if indeed needed. */ + if (zsetLength(zobj) == 0) { + if (deleted) *deleted = 1; + + dbDelete(c->db,key); + notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); + } + + if (c->cmd->proc == zmpopCommand) { + /* Always replicate it as ZPOP[MIN|MAX] with COUNT option instead of ZMPOP. */ + robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count); + rewriteClientCommandVector(c, 3, + (where == ZSET_MAX) ? shared.zpopmax : shared.zpopmin, + key, count_obj); + decrRefCount(count_obj); + } +} + +/* ZPOPMIN/ZPOPMAX key [<count>] */ +void zpopMinMaxCommand(client *c, int where) { + if (c->argc > 3) { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + + long count = -1; /* -1 for plain single pop. */ + if (c->argc == 3 && getPositiveLongFromObjectOrReply(c, c->argv[2], &count, NULL) != C_OK) + return; + + /* Respond with a single (flat) array in RESP2 or if count is -1 + * (returning a single element). In RESP3, when count > 0 use nested array. */ + int use_nested_array = (c->resp > 2 && count != -1); + + genericZpopCommand(c, &c->argv[1], 1, where, 0, count, use_nested_array, 0, NULL); +} + +/* ZPOPMIN key [<count>] */ +void zpopminCommand(client *c) { + zpopMinMaxCommand(c, ZSET_MIN); +} + +/* ZPOPMAX key [<count>] */ +void zpopmaxCommand(client *c) { + zpopMinMaxCommand(c, ZSET_MAX); +} + +/* BZPOPMIN, BZPOPMAX, BZMPOP actual implementation. + * + * 'numkeys' is the number of keys. + * + * 'timeout_idx' parameter position of block timeout. + * + * 'where' ZSET_MIN or ZSET_MAX. + * + * 'count' is the number of elements requested to pop, or -1 for plain single pop. + * + * 'use_nested_array' when false it generates a flat array (with or without key name). + * When true, it generates a nested 3 level array of keyname, field + score pairs. + * */ +void blockingGenericZpopCommand(client *c, robj **keys, int numkeys, int where, + int timeout_idx, long count, int use_nested_array, int reply_nil_when_empty) { + robj *o; + robj *key; + mstime_t timeout; + int j; + + if (getTimeoutFromObjectOrReply(c,c->argv[timeout_idx],&timeout,UNIT_SECONDS) + != C_OK) return; + + for (j = 0; j < numkeys; j++) { + key = keys[j]; + o = lookupKeyWrite(c->db,key); + /* Non-existing key, move to next key. */ + if (o == NULL) continue; + + if (checkType(c,o,OBJ_ZSET)) return; + + long llen = zsetLength(o); + /* Empty zset, move to next key. */ + if (llen == 0) continue; + + /* Non empty zset, this is like a normal ZPOP[MIN|MAX]. */ + genericZpopCommand(c, &key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, NULL); + + if (count == -1) { + /* Replicate it as ZPOP[MIN|MAX] instead of BZPOP[MIN|MAX]. */ + rewriteClientCommandVector(c,2, + (where == ZSET_MAX) ? shared.zpopmax : shared.zpopmin, + key); + } else { + /* Replicate it as ZPOP[MIN|MAX] with COUNT option. */ + robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count); + rewriteClientCommandVector(c, 3, + (where == ZSET_MAX) ? shared.zpopmax : shared.zpopmin, + key, count_obj); + decrRefCount(count_obj); + } + + return; + } + + /* If we are not allowed to block the client and the zset is empty the only thing + * we can do is treating it as a timeout (even with timeout 0). */ + if (c->flags & CLIENT_DENY_BLOCKING) { + addReplyNullArray(c); + return; + } + + /* If the keys do not exist we must block */ + blockForKeys(c,BLOCKED_ZSET,keys,numkeys,timeout,0); +} + +// BZPOPMIN key [key ...] timeout +void bzpopminCommand(client *c) { + blockingGenericZpopCommand(c, c->argv+1, c->argc-2, ZSET_MIN, c->argc-1, -1, 0, 0); +} + +// BZPOPMAX key [key ...] timeout +void bzpopmaxCommand(client *c) { + blockingGenericZpopCommand(c, c->argv+1, c->argc-2, ZSET_MAX, c->argc-1, -1, 0, 0); +} + +static void zrandmemberReplyWithListpack(client *c, unsigned int count, listpackEntry *keys, listpackEntry *vals) { + for (unsigned long i = 0; i < count; i++) { + if (vals && c->resp > 2) + addReplyArrayLen(c,2); + if (keys[i].sval) + addReplyBulkCBuffer(c, keys[i].sval, keys[i].slen); + else + addReplyBulkLongLong(c, keys[i].lval); + if (vals) { + if (vals[i].sval) { + addReplyDouble(c, zzlStrtod(vals[i].sval,vals[i].slen)); + } else + addReplyDouble(c, vals[i].lval); + } + } +} + +/* How many times bigger should be the zset compared to the requested size + * for us to not use the "remove elements" strategy? Read later in the + * implementation for more info. */ +#define ZRANDMEMBER_SUB_STRATEGY_MUL 3 + +/* If client is trying to ask for a very large number of random elements, + * queuing may consume an unlimited amount of memory, so we want to limit + * the number of randoms per time. */ +#define ZRANDMEMBER_RANDOM_SAMPLE_LIMIT 1000 + +void zrandmemberWithCountCommand(client *c, long l, int withscores) { + unsigned long count, size; + int uniq = 1; + robj *zsetobj; + + if ((zsetobj = lookupKeyReadOrReply(c, c->argv[1], shared.emptyarray)) + == NULL || checkType(c, zsetobj, OBJ_ZSET)) return; + size = zsetLength(zsetobj); + + if(l >= 0) { + count = (unsigned long) l; + } else { + count = -l; + uniq = 0; + } + + /* If count is zero, serve it ASAP to avoid special cases later. */ + if (count == 0) { + addReply(c,shared.emptyarray); + return; + } + + /* CASE 1: The count was negative, so the extraction method is just: + * "return N random elements" sampling the whole set every time. + * This case is trivial and can be served without auxiliary data + * structures. This case is the only one that also needs to return the + * elements in random order. */ + if (!uniq || count == 1) { + if (withscores && c->resp == 2) + addReplyArrayLen(c, count*2); + else + addReplyArrayLen(c, count); + if (zsetobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zsetobj->ptr; + while (count--) { + dictEntry *de = dictGetFairRandomKey(zs->dict); + sds key = dictGetKey(de); + if (withscores && c->resp > 2) + addReplyArrayLen(c,2); + addReplyBulkCBuffer(c, key, sdslen(key)); + if (withscores) + addReplyDouble(c, *(double*)dictGetVal(de)); + if (c->flags & CLIENT_CLOSE_ASAP) + break; + } + } else if (zsetobj->encoding == OBJ_ENCODING_LISTPACK) { + listpackEntry *keys, *vals = NULL; + unsigned long limit, sample_count; + limit = count > ZRANDMEMBER_RANDOM_SAMPLE_LIMIT ? ZRANDMEMBER_RANDOM_SAMPLE_LIMIT : count; + keys = zmalloc(sizeof(listpackEntry)*limit); + if (withscores) + vals = zmalloc(sizeof(listpackEntry)*limit); + while (count) { + sample_count = count > limit ? limit : count; + count -= sample_count; + lpRandomPairs(zsetobj->ptr, sample_count, keys, vals); + zrandmemberReplyWithListpack(c, sample_count, keys, vals); + if (c->flags & CLIENT_CLOSE_ASAP) + break; + } + zfree(keys); + zfree(vals); + } + return; + } + + zsetopsrc src; + zsetopval zval; + src.subject = zsetobj; + src.type = zsetobj->type; + src.encoding = zsetobj->encoding; + zuiInitIterator(&src); + memset(&zval, 0, sizeof(zval)); + + /* Initiate reply count, RESP3 responds with nested array, RESP2 with flat one. */ + long reply_size = count < size ? count : size; + if (withscores && c->resp == 2) + addReplyArrayLen(c, reply_size*2); + else + addReplyArrayLen(c, reply_size); + + /* CASE 2: + * The number of requested elements is greater than the number of + * elements inside the zset: simply return the whole zset. */ + if (count >= size) { + while (zuiNext(&src, &zval)) { + if (withscores && c->resp > 2) + addReplyArrayLen(c,2); + addReplyBulkSds(c, zuiNewSdsFromValue(&zval)); + if (withscores) + addReplyDouble(c, zval.score); + } + zuiClearIterator(&src); + return; + } + + /* CASE 2.5 listpack only. Sampling unique elements, in non-random order. + * Listpack encoded zsets are meant to be relatively small, so + * ZRANDMEMBER_SUB_STRATEGY_MUL isn't necessary and we rather not make + * copies of the entries. Instead, we emit them directly to the output + * buffer. + * + * And it is inefficient to repeatedly pick one random element from a + * listpack in CASE 4. So we use this instead. */ + if (zsetobj->encoding == OBJ_ENCODING_LISTPACK) { + listpackEntry *keys, *vals = NULL; + keys = zmalloc(sizeof(listpackEntry)*count); + if (withscores) + vals = zmalloc(sizeof(listpackEntry)*count); + serverAssert(lpRandomPairsUnique(zsetobj->ptr, count, keys, vals) == count); + zrandmemberReplyWithListpack(c, count, keys, vals); + zfree(keys); + zfree(vals); + zuiClearIterator(&src); + return; + } + + /* CASE 3: + * The number of elements inside the zset is not greater than + * ZRANDMEMBER_SUB_STRATEGY_MUL times the number of requested elements. + * In this case we create a dict from scratch with all the elements, and + * subtract random elements to reach the requested number of elements. + * + * This is done because if the number of requested elements is just + * a bit less than the number of elements in the set, the natural approach + * used into CASE 4 is highly inefficient. */ + if (count*ZRANDMEMBER_SUB_STRATEGY_MUL > size) { + /* Hashtable encoding (generic implementation) */ + dict *d = dictCreate(&sdsReplyDictType); + dictExpand(d, size); + /* Add all the elements into the temporary dictionary. */ + while (zuiNext(&src, &zval)) { + sds key = zuiNewSdsFromValue(&zval); + dictEntry *de = dictAddRaw(d, key, NULL); + serverAssert(de); + if (withscores) + dictSetDoubleVal(de, zval.score); + } + serverAssert(dictSize(d) == size); + + /* Remove random elements to reach the right count. */ + while (size > count) { + dictEntry *de; + de = dictGetFairRandomKey(d); + dictUnlink(d,dictGetKey(de)); + sdsfree(dictGetKey(de)); + dictFreeUnlinkedEntry(d,de); + size--; + } + + /* Reply with what's in the dict and release memory */ + dictIterator *di; + dictEntry *de; + di = dictGetIterator(d); + while ((de = dictNext(di)) != NULL) { + if (withscores && c->resp > 2) + addReplyArrayLen(c,2); + addReplyBulkSds(c, dictGetKey(de)); + if (withscores) + addReplyDouble(c, dictGetDoubleVal(de)); + } + + dictReleaseIterator(di); + dictRelease(d); + } + + /* CASE 4: We have a big zset compared to the requested number of elements. + * In this case we can simply get random elements from the zset and add + * to the temporary set, trying to eventually get enough unique elements + * to reach the specified count. */ + else { + /* Hashtable encoding (generic implementation) */ + unsigned long added = 0; + dict *d = dictCreate(&hashDictType); + dictExpand(d, count); + + while (added < count) { + listpackEntry key; + double score; + zsetTypeRandomElement(zsetobj, size, &key, withscores ? &score: NULL); + + /* Try to add the object to the dictionary. If it already exists + * free it, otherwise increment the number of objects we have + * in the result dictionary. */ + sds skey = zsetSdsFromListpackEntry(&key); + if (dictAdd(d,skey,NULL) != DICT_OK) { + sdsfree(skey); + continue; + } + added++; + + if (withscores && c->resp > 2) + addReplyArrayLen(c,2); + zsetReplyFromListpackEntry(c, &key); + if (withscores) + addReplyDouble(c, score); + } + + /* Release memory */ + dictRelease(d); + } + zuiClearIterator(&src); +} + +/* ZRANDMEMBER key [<count> [WITHSCORES]] */ +void zrandmemberCommand(client *c) { + long l; + int withscores = 0; + robj *zset; + listpackEntry ele; + + if (c->argc >= 3) { + if (getRangeLongFromObjectOrReply(c,c->argv[2],-LONG_MAX,LONG_MAX,&l,NULL) != C_OK) return; + if (c->argc > 4 || (c->argc == 4 && strcasecmp(c->argv[3]->ptr,"withscores"))) { + addReplyErrorObject(c,shared.syntaxerr); + return; + } else if (c->argc == 4) { + withscores = 1; + if (l < -LONG_MAX/2 || l > LONG_MAX/2) { + addReplyError(c,"value is out of range"); + return; + } + } + zrandmemberWithCountCommand(c, l, withscores); + return; + } + + /* Handle variant without <count> argument. Reply with simple bulk string */ + if ((zset = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))== NULL || + checkType(c,zset,OBJ_ZSET)) { + return; + } + + zsetTypeRandomElement(zset, zsetLength(zset), &ele,NULL); + zsetReplyFromListpackEntry(c,&ele); +} + +/* ZMPOP/BZMPOP + * + * 'numkeys_idx' parameter position of key number. + * 'is_block' this indicates whether it is a blocking variant. */ +void zmpopGenericCommand(client *c, int numkeys_idx, int is_block) { + long j; + long numkeys = 0; /* Number of keys. */ + int where = 0; /* ZSET_MIN or ZSET_MAX. */ + long count = -1; /* Reply will consist of up to count elements, depending on the zset's length. */ + + /* Parse the numkeys. */ + if (getRangeLongFromObjectOrReply(c, c->argv[numkeys_idx], 1, LONG_MAX, + &numkeys, "numkeys should be greater than 0") != C_OK) + return; + + /* Parse the where. where_idx: the index of where in the c->argv. */ + long where_idx = numkeys_idx + numkeys + 1; + if (where_idx >= c->argc) { + addReplyErrorObject(c, shared.syntaxerr); + return; + } + if (!strcasecmp(c->argv[where_idx]->ptr, "MIN")) { + where = ZSET_MIN; + } else if (!strcasecmp(c->argv[where_idx]->ptr, "MAX")) { + where = ZSET_MAX; + } else { + addReplyErrorObject(c, shared.syntaxerr); + return; + } + + /* Parse the optional arguments. */ + for (j = where_idx + 1; j < c->argc; j++) { + char *opt = c->argv[j]->ptr; + int moreargs = (c->argc - 1) - j; + + if (count == -1 && !strcasecmp(opt, "COUNT") && moreargs) { + j++; + if (getRangeLongFromObjectOrReply(c, c->argv[j], 1, LONG_MAX, + &count,"count should be greater than 0") != C_OK) + return; + } else { + addReplyErrorObject(c, shared.syntaxerr); + return; + } + } + + if (count == -1) count = 1; + + if (is_block) { + /* BLOCK. We will handle CLIENT_DENY_BLOCKING flag in blockingGenericZpopCommand. */ + blockingGenericZpopCommand(c, c->argv+numkeys_idx+1, numkeys, where, 1, count, 1, 1); + } else { + /* NON-BLOCK */ + genericZpopCommand(c, c->argv+numkeys_idx+1, numkeys, where, 1, count, 1, 1, NULL); + } +} + +/* ZMPOP numkeys key [<key> ...] MIN|MAX [COUNT count] */ +void zmpopCommand(client *c) { + zmpopGenericCommand(c, 1, 0); +} + +/* BZMPOP timeout numkeys key [<key> ...] MIN|MAX [COUNT count] */ +void bzmpopCommand(client *c) { + zmpopGenericCommand(c, 2, 1); +} |