summaryrefslogtreecommitdiffstats
path: root/src/db.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/db.c')
-rw-r--r--src/db.c2457
1 files changed, 2457 insertions, 0 deletions
diff --git a/src/db.c b/src/db.c
new file mode 100644
index 0000000..588ae34
--- /dev/null
+++ b/src/db.c
@@ -0,0 +1,2457 @@
+/*
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "server.h"
+#include "cluster.h"
+#include "atomicvar.h"
+#include "latency.h"
+#include "script.h"
+#include "functions.h"
+
+#include <signal.h>
+#include <ctype.h>
+
+/*-----------------------------------------------------------------------------
+ * C-level DB API
+ *----------------------------------------------------------------------------*/
+
+/* Flags for expireIfNeeded */
+#define EXPIRE_FORCE_DELETE_EXPIRED 1
+#define EXPIRE_AVOID_DELETE_EXPIRED 2
+
+int expireIfNeeded(redisDb *db, robj *key, int flags);
+int keyIsExpired(redisDb *db, robj *key);
+
+/* Update LFU when an object is accessed.
+ * Firstly, decrement the counter if the decrement time is reached.
+ * Then logarithmically increment the counter, and update the access time. */
+void updateLFU(robj *val) {
+ unsigned long counter = LFUDecrAndReturn(val);
+ counter = LFULogIncr(counter);
+ val->lru = (LFUGetTimeInMinutes()<<8) | counter;
+}
+
+/* Lookup a key for read or write operations, or return NULL if the key is not
+ * found in the specified DB. This function implements the functionality of
+ * lookupKeyRead(), lookupKeyWrite() and their ...WithFlags() variants.
+ *
+ * Side-effects of calling this function:
+ *
+ * 1. A key gets expired if it reached it's TTL.
+ * 2. The key's last access time is updated.
+ * 3. The global keys hits/misses stats are updated (reported in INFO).
+ * 4. If keyspace notifications are enabled, a "keymiss" notification is fired.
+ *
+ * Flags change the behavior of this command:
+ *
+ * LOOKUP_NONE (or zero): No special flags are passed.
+ * LOOKUP_NOTOUCH: Don't alter the last access time of the key.
+ * LOOKUP_NONOTIFY: Don't trigger keyspace event on key miss.
+ * LOOKUP_NOSTATS: Don't increment key hits/misses counters.
+ * LOOKUP_WRITE: Prepare the key for writing (delete expired keys even on
+ * replicas, use separate keyspace stats and events (TODO)).
+ * LOOKUP_NOEXPIRE: Perform expiration check, but avoid deleting the key,
+ * so that we don't have to propagate the deletion.
+ *
+ * Note: this function also returns NULL if the key is logically expired but
+ * still existing, in case this is a replica and the LOOKUP_WRITE is not set.
+ * Even if the key expiry is master-driven, we can correctly report a key is
+ * expired on replicas even if the master is lagging expiring our key via DELs
+ * in the replication link. */
+robj *lookupKey(redisDb *db, robj *key, int flags) {
+ dictEntry *de = dictFind(db->dict,key->ptr);
+ robj *val = NULL;
+ if (de) {
+ val = dictGetVal(de);
+ /* Forcing deletion of expired keys on a replica makes the replica
+ * inconsistent with the master. We forbid it on readonly replicas, but
+ * we have to allow it on writable replicas to make write commands
+ * behave consistently.
+ *
+ * It's possible that the WRITE flag is set even during a readonly
+ * command, since the command may trigger events that cause modules to
+ * perform additional writes. */
+ int is_ro_replica = server.masterhost && server.repl_slave_ro;
+ int expire_flags = 0;
+ if (flags & LOOKUP_WRITE && !is_ro_replica)
+ expire_flags |= EXPIRE_FORCE_DELETE_EXPIRED;
+ if (flags & LOOKUP_NOEXPIRE)
+ expire_flags |= EXPIRE_AVOID_DELETE_EXPIRED;
+ if (expireIfNeeded(db, key, expire_flags)) {
+ /* The key is no longer valid. */
+ val = NULL;
+ }
+ }
+
+ if (val) {
+ /* Update the access time for the ageing algorithm.
+ * Don't do it if we have a saving child, as this will trigger
+ * a copy on write madness. */
+ if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
+ if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
+ updateLFU(val);
+ } else {
+ val->lru = LRU_CLOCK();
+ }
+ }
+
+ if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
+ server.stat_keyspace_hits++;
+ /* TODO: Use separate hits stats for WRITE */
+ } else {
+ if (!(flags & (LOOKUP_NONOTIFY | LOOKUP_WRITE)))
+ notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
+ if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
+ server.stat_keyspace_misses++;
+ /* TODO: Use separate misses stats and notify event for WRITE */
+ }
+
+ return val;
+}
+
+/* Lookup a key for read operations, or return NULL if the key is not found
+ * in the specified DB.
+ *
+ * This API should not be used when we write to the key after obtaining
+ * the object linked to the key, but only for read only operations.
+ *
+ * This function is equivalent to lookupKey(). The point of using this function
+ * rather than lookupKey() directly is to indicate that the purpose is to read
+ * the key. */
+robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
+ serverAssert(!(flags & LOOKUP_WRITE));
+ return lookupKey(db, key, flags);
+}
+
+/* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
+ * common case. */
+robj *lookupKeyRead(redisDb *db, robj *key) {
+ return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
+}
+
+/* Lookup a key for write operations, and as a side effect, if needed, expires
+ * the key if its TTL is reached. It's equivalent to lookupKey() with the
+ * LOOKUP_WRITE flag added.
+ *
+ * Returns the linked value object if the key exists or NULL if the key
+ * does not exist in the specified DB. */
+robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
+ return lookupKey(db, key, flags | LOOKUP_WRITE);
+}
+
+robj *lookupKeyWrite(redisDb *db, robj *key) {
+ return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE);
+}
+
+robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
+ robj *o = lookupKeyRead(c->db, key);
+ if (!o) addReplyOrErrorObject(c, reply);
+ return o;
+}
+
+robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
+ robj *o = lookupKeyWrite(c->db, key);
+ if (!o) addReplyOrErrorObject(c, reply);
+ return o;
+}
+
+/* Add the key to the DB. It's up to the caller to increment the reference
+ * counter of the value if needed.
+ *
+ * The program is aborted if the key already exists. */
+void dbAdd(redisDb *db, robj *key, robj *val) {
+ sds copy = sdsdup(key->ptr);
+ dictEntry *de = dictAddRaw(db->dict, copy, NULL);
+ serverAssertWithInfo(NULL, key, de != NULL);
+ dictSetVal(db->dict, de, val);
+ signalKeyAsReady(db, key, val->type);
+ if (server.cluster_enabled) slotToKeyAddEntry(de, db);
+ notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id);
+}
+
+/* This is a special version of dbAdd() that is used only when loading
+ * keys from the RDB file: the key is passed as an SDS string that is
+ * retained by the function (and not freed by the caller).
+ *
+ * Moreover this function will not abort if the key is already busy, to
+ * give more control to the caller, nor will signal the key as ready
+ * since it is not useful in this context.
+ *
+ * The function returns 1 if the key was added to the database, taking
+ * ownership of the SDS string, otherwise 0 is returned, and is up to the
+ * caller to free the SDS string. */
+int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
+ dictEntry *de = dictAddRaw(db->dict, key, NULL);
+ if (de == NULL) return 0;
+ dictSetVal(db->dict, de, val);
+ if (server.cluster_enabled) slotToKeyAddEntry(de, db);
+ return 1;
+}
+
+/* Overwrite an existing key with a new value. Incrementing the reference
+ * count of the new value is up to the caller.
+ * This function does not modify the expire time of the existing key.
+ *
+ * The program is aborted if the key was not already present. */
+void dbOverwrite(redisDb *db, robj *key, robj *val) {
+ dictEntry *de = dictFind(db->dict,key->ptr);
+
+ serverAssertWithInfo(NULL,key,de != NULL);
+ dictEntry auxentry = *de;
+ robj *old = dictGetVal(de);
+ if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
+ val->lru = old->lru;
+ }
+ /* Although the key is not really deleted from the database, we regard
+ * overwrite as two steps of unlink+add, so we still need to call the unlink
+ * callback of the module. */
+ moduleNotifyKeyUnlink(key,old,db->id);
+ /* We want to try to unblock any client using a blocking XREADGROUP */
+ if (old->type == OBJ_STREAM)
+ signalKeyAsReady(db,key,old->type);
+ dictSetVal(db->dict, de, val);
+
+ if (server.lazyfree_lazy_server_del) {
+ freeObjAsync(key,old,db->id);
+ dictSetVal(db->dict, &auxentry, NULL);
+ }
+
+ dictFreeVal(db->dict, &auxentry);
+}
+
+/* High level Set operation. This function can be used in order to set
+ * a key, whatever it was existing or not, to a new object.
+ *
+ * 1) The ref count of the value object is incremented.
+ * 2) clients WATCHing for the destination key notified.
+ * 3) The expire time of the key is reset (the key is made persistent),
+ * unless 'SETKEY_KEEPTTL' is enabled in flags.
+ * 4) The key lookup can take place outside this interface outcome will be
+ * delivered with 'SETKEY_ALREADY_EXIST' or 'SETKEY_DOESNT_EXIST'
+ *
+ * All the new keys in the database should be created via this interface.
+ * The client 'c' argument may be set to NULL if the operation is performed
+ * in a context where there is no clear client performing the operation. */
+void setKey(client *c, redisDb *db, robj *key, robj *val, int flags) {
+ int keyfound = 0;
+
+ if (flags & SETKEY_ALREADY_EXIST)
+ keyfound = 1;
+ else if (!(flags & SETKEY_DOESNT_EXIST))
+ keyfound = (lookupKeyWrite(db,key) != NULL);
+
+ if (!keyfound) {
+ dbAdd(db,key,val);
+ } else {
+ dbOverwrite(db,key,val);
+ }
+ incrRefCount(val);
+ if (!(flags & SETKEY_KEEPTTL)) removeExpire(db,key);
+ if (!(flags & SETKEY_NO_SIGNAL)) signalModifiedKey(c,db,key);
+}
+
+/* Return a random key, in form of a Redis object.
+ * If there are no keys, NULL is returned.
+ *
+ * The function makes sure to return keys not already expired. */
+robj *dbRandomKey(redisDb *db) {
+ dictEntry *de;
+ int maxtries = 100;
+ int allvolatile = dictSize(db->dict) == dictSize(db->expires);
+
+ while(1) {
+ sds key;
+ robj *keyobj;
+
+ de = dictGetFairRandomKey(db->dict);
+ if (de == NULL) return NULL;
+
+ key = dictGetKey(de);
+ keyobj = createStringObject(key,sdslen(key));
+ if (dictFind(db->expires,key)) {
+ if (allvolatile && server.masterhost && --maxtries == 0) {
+ /* If the DB is composed only of keys with an expire set,
+ * it could happen that all the keys are already logically
+ * expired in the slave, so the function cannot stop because
+ * expireIfNeeded() is false, nor it can stop because
+ * dictGetFairRandomKey() returns NULL (there are keys to return).
+ * To prevent the infinite loop we do some tries, but if there
+ * are the conditions for an infinite loop, eventually we
+ * return a key name that may be already expired. */
+ return keyobj;
+ }
+ if (expireIfNeeded(db,keyobj,0)) {
+ decrRefCount(keyobj);
+ continue; /* search for another key. This expired. */
+ }
+ }
+ return keyobj;
+ }
+}
+
+/* Helper for sync and async delete. */
+static int dbGenericDelete(redisDb *db, robj *key, int async) {
+ /* Deleting an entry from the expires dict will not free the sds of
+ * the key, because it is shared with the main dictionary. */
+ if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
+ dictEntry *de = dictUnlink(db->dict,key->ptr);
+ if (de) {
+ robj *val = dictGetVal(de);
+ /* Tells the module that the key has been unlinked from the database. */
+ moduleNotifyKeyUnlink(key,val,db->id);
+ /* We want to try to unblock any client using a blocking XREADGROUP */
+ if (val->type == OBJ_STREAM)
+ signalKeyAsReady(db,key,val->type);
+ if (async) {
+ freeObjAsync(key, val, db->id);
+ dictSetVal(db->dict, de, NULL);
+ }
+ if (server.cluster_enabled) slotToKeyDelEntry(de, db);
+ dictFreeUnlinkedEntry(db->dict,de);
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+/* Delete a key, value, and associated expiration entry if any, from the DB */
+int dbSyncDelete(redisDb *db, robj *key) {
+ return dbGenericDelete(db, key, 0);
+}
+
+/* Delete a key, value, and associated expiration entry if any, from the DB. If
+ * the value consists of many allocations, it may be freed asynchronously. */
+int dbAsyncDelete(redisDb *db, robj *key) {
+ return dbGenericDelete(db, key, 1);
+}
+
+/* This is a wrapper whose behavior depends on the Redis lazy free
+ * configuration. Deletes the key synchronously or asynchronously. */
+int dbDelete(redisDb *db, robj *key) {
+ return dbGenericDelete(db, key, server.lazyfree_lazy_server_del);
+}
+
+/* Prepare the string object stored at 'key' to be modified destructively
+ * to implement commands like SETBIT or APPEND.
+ *
+ * An object is usually ready to be modified unless one of the two conditions
+ * are true:
+ *
+ * 1) The object 'o' is shared (refcount > 1), we don't want to affect
+ * other users.
+ * 2) The object encoding is not "RAW".
+ *
+ * If the object is found in one of the above conditions (or both) by the
+ * function, an unshared / not-encoded copy of the string object is stored
+ * at 'key' in the specified 'db'. Otherwise the object 'o' itself is
+ * returned.
+ *
+ * USAGE:
+ *
+ * The object 'o' is what the caller already obtained by looking up 'key'
+ * in 'db', the usage pattern looks like this:
+ *
+ * o = lookupKeyWrite(db,key);
+ * if (checkType(c,o,OBJ_STRING)) return;
+ * o = dbUnshareStringValue(db,key,o);
+ *
+ * At this point the caller is ready to modify the object, for example
+ * using an sdscat() call to append some data, or anything else.
+ */
+robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
+ serverAssert(o->type == OBJ_STRING);
+ if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
+ robj *decoded = getDecodedObject(o);
+ o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
+ decrRefCount(decoded);
+ dbOverwrite(db,key,o);
+ }
+ return o;
+}
+
+/* Remove all keys from the database(s) structure. The dbarray argument
+ * may not be the server main DBs (could be a temporary DB).
+ *
+ * The dbnum can be -1 if all the DBs should be emptied, or the specified
+ * DB index if we want to empty only a single database.
+ * The function returns the number of keys removed from the database(s). */
+long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
+ void(callback)(dict*))
+{
+ long long removed = 0;
+ int startdb, enddb;
+
+ if (dbnum == -1) {
+ startdb = 0;
+ enddb = server.dbnum-1;
+ } else {
+ startdb = enddb = dbnum;
+ }
+
+ for (int j = startdb; j <= enddb; j++) {
+ removed += dictSize(dbarray[j].dict);
+ if (async) {
+ emptyDbAsync(&dbarray[j]);
+ } else {
+ dictEmpty(dbarray[j].dict,callback);
+ dictEmpty(dbarray[j].expires,callback);
+ }
+ /* Because all keys of database are removed, reset average ttl. */
+ dbarray[j].avg_ttl = 0;
+ dbarray[j].expires_cursor = 0;
+ }
+
+ return removed;
+}
+
+/* Remove all data (keys and functions) from all the databases in a
+ * Redis server. If callback is given the function is called from
+ * time to time to signal that work is in progress.
+ *
+ * The dbnum can be -1 if all the DBs should be flushed, or the specified
+ * DB number if we want to flush only a single Redis database number.
+ *
+ * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
+ * EMPTYDB_ASYNC if we want the memory to be freed in a different thread
+ * and the function to return ASAP. EMPTYDB_NOFUNCTIONS can also be set
+ * to specify that we do not want to delete the functions.
+ *
+ * On success the function returns the number of keys removed from the
+ * database(s). Otherwise -1 is returned in the specific case the
+ * DB number is out of range, and errno is set to EINVAL. */
+long long emptyData(int dbnum, int flags, void(callback)(dict*)) {
+ int async = (flags & EMPTYDB_ASYNC);
+ int with_functions = !(flags & EMPTYDB_NOFUNCTIONS);
+ RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum};
+ long long removed = 0;
+
+ if (dbnum < -1 || dbnum >= server.dbnum) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ /* Fire the flushdb modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
+ REDISMODULE_SUBEVENT_FLUSHDB_START,
+ &fi);
+
+ /* Make sure the WATCHed keys are affected by the FLUSH* commands.
+ * Note that we need to call the function while the keys are still
+ * there. */
+ signalFlushedDb(dbnum, async);
+
+ /* Empty redis database structure. */
+ removed = emptyDbStructure(server.db, dbnum, async, callback);
+
+ /* Flush slots to keys map if enable cluster, we can flush entire
+ * slots to keys map whatever dbnum because only support one DB
+ * in cluster mode. */
+ if (server.cluster_enabled) slotToKeyFlush(server.db);
+
+ if (dbnum == -1) flushSlaveKeysWithExpireList();
+
+ if (with_functions) {
+ serverAssert(dbnum == -1);
+ functionsLibCtxClearCurrent(async);
+ }
+
+ /* Also fire the end event. Note that this event will fire almost
+ * immediately after the start event if the flush is asynchronous. */
+ moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
+ REDISMODULE_SUBEVENT_FLUSHDB_END,
+ &fi);
+
+ return removed;
+}
+
+/* Initialize temporary db on replica for use during diskless replication. */
+redisDb *initTempDb(void) {
+ redisDb *tempDb = zcalloc(sizeof(redisDb)*server.dbnum);
+ for (int i=0; i<server.dbnum; i++) {
+ tempDb[i].dict = dictCreate(&dbDictType);
+ tempDb[i].expires = dictCreate(&dbExpiresDictType);
+ tempDb[i].slots_to_keys = NULL;
+ }
+
+ if (server.cluster_enabled) {
+ /* Prepare temp slot to key map to be written during async diskless replication. */
+ slotToKeyInit(tempDb);
+ }
+
+ return tempDb;
+}
+
+/* Discard tempDb, this can be slow (similar to FLUSHALL), but it's always async. */
+void discardTempDb(redisDb *tempDb, void(callback)(dict*)) {
+ int async = 1;
+
+ /* Release temp DBs. */
+ emptyDbStructure(tempDb, -1, async, callback);
+ for (int i=0; i<server.dbnum; i++) {
+ dictRelease(tempDb[i].dict);
+ dictRelease(tempDb[i].expires);
+ }
+
+ if (server.cluster_enabled) {
+ /* Release temp slot to key map. */
+ slotToKeyDestroy(tempDb);
+ }
+
+ zfree(tempDb);
+}
+
+int selectDb(client *c, int id) {
+ if (id < 0 || id >= server.dbnum)
+ return C_ERR;
+ c->db = &server.db[id];
+ return C_OK;
+}
+
+long long dbTotalServerKeyCount() {
+ long long total = 0;
+ int j;
+ for (j = 0; j < server.dbnum; j++) {
+ total += dictSize(server.db[j].dict);
+ }
+ return total;
+}
+
+/*-----------------------------------------------------------------------------
+ * Hooks for key space changes.
+ *
+ * Every time a key in the database is modified the function
+ * signalModifiedKey() is called.
+ *
+ * Every time a DB is flushed the function signalFlushDb() is called.
+ *----------------------------------------------------------------------------*/
+
+/* Note that the 'c' argument may be NULL if the key was modified out of
+ * a context of a client. */
+void signalModifiedKey(client *c, redisDb *db, robj *key) {
+ touchWatchedKey(db,key);
+ trackingInvalidateKey(c,key,1);
+}
+
+void signalFlushedDb(int dbid, int async) {
+ int startdb, enddb;
+ if (dbid == -1) {
+ startdb = 0;
+ enddb = server.dbnum-1;
+ } else {
+ startdb = enddb = dbid;
+ }
+
+ for (int j = startdb; j <= enddb; j++) {
+ scanDatabaseForDeletedStreams(&server.db[j], NULL);
+ touchAllWatchedKeysInDb(&server.db[j], NULL);
+ }
+
+ trackingInvalidateKeysOnFlush(async);
+
+ /* Changes in this method may take place in swapMainDbWithTempDb as well,
+ * where we execute similar calls, but with subtle differences as it's
+ * not simply flushing db. */
+}
+
+/*-----------------------------------------------------------------------------
+ * Type agnostic commands operating on the key space
+ *----------------------------------------------------------------------------*/
+
+/* Return the set of flags to use for the emptyDb() call for FLUSHALL
+ * and FLUSHDB commands.
+ *
+ * sync: flushes the database in an sync manner.
+ * async: flushes the database in an async manner.
+ * no option: determine sync or async according to the value of lazyfree-lazy-user-flush.
+ *
+ * On success C_OK is returned and the flags are stored in *flags, otherwise
+ * C_ERR is returned and the function sends an error to the client. */
+int getFlushCommandFlags(client *c, int *flags) {
+ /* Parse the optional ASYNC option. */
+ if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"sync")) {
+ *flags = EMPTYDB_NO_FLAGS;
+ } else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"async")) {
+ *flags = EMPTYDB_ASYNC;
+ } else if (c->argc == 1) {
+ *flags = server.lazyfree_lazy_user_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return C_ERR;
+ }
+ return C_OK;
+}
+
+/* Flushes the whole server data set. */
+void flushAllDataAndResetRDB(int flags) {
+ server.dirty += emptyData(-1,flags,NULL);
+ if (server.child_type == CHILD_TYPE_RDB) killRDBChild();
+ if (server.saveparamslen > 0) {
+ rdbSaveInfo rsi, *rsiptr;
+ rsiptr = rdbPopulateSaveInfo(&rsi);
+ rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr);
+ }
+
+#if defined(USE_JEMALLOC)
+ /* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
+ * for large databases, flushdb blocks for long anyway, so a bit more won't
+ * harm and this way the flush and purge will be synchronous. */
+ if (!(flags & EMPTYDB_ASYNC))
+ jemalloc_purge();
+#endif
+}
+
+/* FLUSHDB [ASYNC]
+ *
+ * Flushes the currently SELECTed Redis DB. */
+void flushdbCommand(client *c) {
+ int flags;
+
+ if (getFlushCommandFlags(c,&flags) == C_ERR) return;
+ /* flushdb should not flush the functions */
+ server.dirty += emptyData(c->db->id,flags | EMPTYDB_NOFUNCTIONS,NULL);
+
+ /* Without the forceCommandPropagation, when DB was already empty,
+ * FLUSHDB will not be replicated nor put into the AOF. */
+ forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF);
+
+ addReply(c,shared.ok);
+
+#if defined(USE_JEMALLOC)
+ /* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
+ * for large databases, flushdb blocks for long anyway, so a bit more won't
+ * harm and this way the flush and purge will be synchronous. */
+ if (!(flags & EMPTYDB_ASYNC))
+ jemalloc_purge();
+#endif
+}
+
+/* FLUSHALL [ASYNC]
+ *
+ * Flushes the whole server data set. */
+void flushallCommand(client *c) {
+ int flags;
+ if (getFlushCommandFlags(c,&flags) == C_ERR) return;
+ /* flushall should not flush the functions */
+ flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS);
+
+ /* Without the forceCommandPropagation, when DBs were already empty,
+ * FLUSHALL will not be replicated nor put into the AOF. */
+ forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF);
+
+ addReply(c,shared.ok);
+}
+
+/* This command implements DEL and LAZYDEL. */
+void delGenericCommand(client *c, int lazy) {
+ int numdel = 0, j;
+
+ for (j = 1; j < c->argc; j++) {
+ expireIfNeeded(c->db,c->argv[j],0);
+ int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
+ dbSyncDelete(c->db,c->argv[j]);
+ if (deleted) {
+ signalModifiedKey(c,c->db,c->argv[j]);
+ notifyKeyspaceEvent(NOTIFY_GENERIC,
+ "del",c->argv[j],c->db->id);
+ server.dirty++;
+ numdel++;
+ }
+ }
+ addReplyLongLong(c,numdel);
+}
+
+void delCommand(client *c) {
+ delGenericCommand(c,server.lazyfree_lazy_user_del);
+}
+
+void unlinkCommand(client *c) {
+ delGenericCommand(c,1);
+}
+
+/* EXISTS key1 key2 ... key_N.
+ * Return value is the number of keys existing. */
+void existsCommand(client *c) {
+ long long count = 0;
+ int j;
+
+ for (j = 1; j < c->argc; j++) {
+ if (lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH)) count++;
+ }
+ addReplyLongLong(c,count);
+}
+
+void selectCommand(client *c) {
+ int id;
+
+ if (getIntFromObjectOrReply(c, c->argv[1], &id, NULL) != C_OK)
+ return;
+
+ if (server.cluster_enabled && id != 0) {
+ addReplyError(c,"SELECT is not allowed in cluster mode");
+ return;
+ }
+ if (selectDb(c,id) == C_ERR) {
+ addReplyError(c,"DB index is out of range");
+ } else {
+ addReply(c,shared.ok);
+ }
+}
+
+void randomkeyCommand(client *c) {
+ robj *key;
+
+ if ((key = dbRandomKey(c->db)) == NULL) {
+ addReplyNull(c);
+ return;
+ }
+
+ addReplyBulk(c,key);
+ decrRefCount(key);
+}
+
+void keysCommand(client *c) {
+ dictIterator *di;
+ dictEntry *de;
+ sds pattern = c->argv[1]->ptr;
+ int plen = sdslen(pattern), allkeys;
+ unsigned long numkeys = 0;
+ void *replylen = addReplyDeferredLen(c);
+
+ di = dictGetSafeIterator(c->db->dict);
+ allkeys = (pattern[0] == '*' && plen == 1);
+ while((de = dictNext(di)) != NULL) {
+ sds key = dictGetKey(de);
+ robj *keyobj;
+
+ if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
+ keyobj = createStringObject(key,sdslen(key));
+ if (!keyIsExpired(c->db,keyobj)) {
+ addReplyBulk(c,keyobj);
+ numkeys++;
+ }
+ decrRefCount(keyobj);
+ }
+ if (c->flags & CLIENT_CLOSE_ASAP)
+ break;
+ }
+ dictReleaseIterator(di);
+ setDeferredArrayLen(c,replylen,numkeys);
+}
+
+/* This callback is used by scanGenericCommand in order to collect elements
+ * returned by the dictionary iterator into a list. */
+void scanCallback(void *privdata, const dictEntry *de) {
+ void **pd = (void**) privdata;
+ list *keys = pd[0];
+ robj *o = pd[1];
+ robj *key, *val = NULL;
+
+ if (o == NULL) {
+ sds sdskey = dictGetKey(de);
+ key = createStringObject(sdskey, sdslen(sdskey));
+ } else if (o->type == OBJ_SET) {
+ sds keysds = dictGetKey(de);
+ key = createStringObject(keysds,sdslen(keysds));
+ } else if (o->type == OBJ_HASH) {
+ sds sdskey = dictGetKey(de);
+ sds sdsval = dictGetVal(de);
+ key = createStringObject(sdskey,sdslen(sdskey));
+ val = createStringObject(sdsval,sdslen(sdsval));
+ } else if (o->type == OBJ_ZSET) {
+ sds sdskey = dictGetKey(de);
+ key = createStringObject(sdskey,sdslen(sdskey));
+ val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0);
+ } else {
+ serverPanic("Type not handled in SCAN callback.");
+ }
+
+ listAddNodeTail(keys, key);
+ if (val) listAddNodeTail(keys, val);
+}
+
+/* Try to parse a SCAN cursor stored at object 'o':
+ * if the cursor is valid, store it as unsigned integer into *cursor and
+ * returns C_OK. Otherwise return C_ERR and send an error to the
+ * client. */
+int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) {
+ char *eptr;
+
+ /* Use strtoul() because we need an *unsigned* long, so
+ * getLongLongFromObject() does not cover the whole cursor space. */
+ errno = 0;
+ *cursor = strtoul(o->ptr, &eptr, 10);
+ if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' || errno == ERANGE)
+ {
+ addReplyError(c, "invalid cursor");
+ return C_ERR;
+ }
+ return C_OK;
+}
+
+/* This command implements SCAN, HSCAN and SSCAN commands.
+ * If object 'o' is passed, then it must be a Hash, Set or Zset object, otherwise
+ * if 'o' is NULL the command will operate on the dictionary associated with
+ * the current database.
+ *
+ * When 'o' is not NULL the function assumes that the first argument in
+ * the client arguments vector is a key so it skips it before iterating
+ * in order to parse options.
+ *
+ * In the case of a Hash object the function returns both the field and value
+ * of every element on the Hash. */
+void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
+ int i, j;
+ list *keys = listCreate();
+ listNode *node, *nextnode;
+ long count = 10;
+ sds pat = NULL;
+ sds typename = NULL;
+ int patlen = 0, use_pattern = 0;
+ dict *ht;
+
+ /* Object must be NULL (to iterate keys names), or the type of the object
+ * must be Set, Sorted Set, or Hash. */
+ serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
+ o->type == OBJ_ZSET);
+
+ /* Set i to the first option argument. The previous one is the cursor. */
+ i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */
+
+ /* Step 1: Parse options. */
+ while (i < c->argc) {
+ j = c->argc - i;
+ if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
+ if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
+ != C_OK)
+ {
+ goto cleanup;
+ }
+
+ if (count < 1) {
+ addReplyErrorObject(c,shared.syntaxerr);
+ goto cleanup;
+ }
+
+ i += 2;
+ } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
+ pat = c->argv[i+1]->ptr;
+ patlen = sdslen(pat);
+
+ /* The pattern always matches if it is exactly "*", so it is
+ * equivalent to disabling it. */
+ use_pattern = !(patlen == 1 && pat[0] == '*');
+
+ i += 2;
+ } else if (!strcasecmp(c->argv[i]->ptr, "type") && o == NULL && j >= 2) {
+ /* SCAN for a particular type only applies to the db dict */
+ typename = c->argv[i+1]->ptr;
+ i+= 2;
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ goto cleanup;
+ }
+ }
+
+ /* Step 2: Iterate the collection.
+ *
+ * Note that if the object is encoded with a listpack, intset, or any other
+ * representation that is not a hash table, we are sure that it is also
+ * composed of a small number of elements. So to avoid taking state we
+ * just return everything inside the object in a single call, setting the
+ * cursor to zero to signal the end of the iteration. */
+
+ /* Handle the case of a hash table. */
+ ht = NULL;
+ if (o == NULL) {
+ ht = c->db->dict;
+ } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
+ ht = o->ptr;
+ } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
+ ht = o->ptr;
+ count *= 2; /* We return key / value for this type. */
+ } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
+ zset *zs = o->ptr;
+ ht = zs->dict;
+ count *= 2; /* We return key / value for this type. */
+ }
+
+ if (ht) {
+ void *privdata[2];
+ /* We set the max number of iterations to ten times the specified
+ * COUNT, so if the hash table is in a pathological state (very
+ * sparsely populated) we avoid to block too much time at the cost
+ * of returning no or very few elements. */
+ long maxiterations = count*10;
+
+ /* We pass two pointers to the callback: the list to which it will
+ * add new elements, and the object containing the dictionary so that
+ * it is possible to fetch more data in a type-dependent way. */
+ privdata[0] = keys;
+ privdata[1] = o;
+ do {
+ cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);
+ } while (cursor &&
+ maxiterations-- &&
+ listLength(keys) < (unsigned long)count);
+ } else if (o->type == OBJ_SET) {
+ int pos = 0;
+ int64_t ll;
+
+ while(intsetGet(o->ptr,pos++,&ll))
+ listAddNodeTail(keys,createStringObjectFromLongLong(ll));
+ cursor = 0;
+ } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
+ unsigned char *p = lpFirst(o->ptr);
+ unsigned char *vstr;
+ int64_t vlen;
+ unsigned char intbuf[LP_INTBUF_SIZE];
+
+ while(p) {
+ vstr = lpGet(p,&vlen,intbuf);
+ listAddNodeTail(keys, createStringObject((char*)vstr,vlen));
+ p = lpNext(o->ptr,p);
+ }
+ cursor = 0;
+ } else {
+ serverPanic("Not handled encoding in SCAN.");
+ }
+
+ /* Step 3: Filter elements. */
+ node = listFirst(keys);
+ while (node) {
+ robj *kobj = listNodeValue(node);
+ nextnode = listNextNode(node);
+ int filter = 0;
+
+ /* Filter element if it does not match the pattern. */
+ if (use_pattern) {
+ if (sdsEncodedObject(kobj)) {
+ if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))
+ filter = 1;
+ } else {
+ char buf[LONG_STR_SIZE];
+ int len;
+
+ serverAssert(kobj->encoding == OBJ_ENCODING_INT);
+ len = ll2string(buf,sizeof(buf),(long)kobj->ptr);
+ if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
+ }
+ }
+
+ /* Filter an element if it isn't the type we want. */
+ if (!filter && o == NULL && typename){
+ robj* typecheck = lookupKeyReadWithFlags(c->db, kobj, LOOKUP_NOTOUCH);
+ char* type = getObjectTypeName(typecheck);
+ if (strcasecmp((char*) typename, type)) filter = 1;
+ }
+
+ /* Filter element if it is an expired key. */
+ if (!filter && o == NULL && expireIfNeeded(c->db, kobj, 0)) filter = 1;
+
+ /* Remove the element and its associated value if needed. */
+ if (filter) {
+ decrRefCount(kobj);
+ listDelNode(keys, node);
+ }
+
+ /* If this is a hash or a sorted set, we have a flat list of
+ * key-value elements, so if this element was filtered, remove the
+ * value, or skip it if it was not filtered: we only match keys. */
+ if (o && (o->type == OBJ_ZSET || o->type == OBJ_HASH)) {
+ node = nextnode;
+ serverAssert(node); /* assertion for valgrind (avoid NPD) */
+ nextnode = listNextNode(node);
+ if (filter) {
+ kobj = listNodeValue(node);
+ decrRefCount(kobj);
+ listDelNode(keys, node);
+ }
+ }
+ node = nextnode;
+ }
+
+ /* Step 4: Reply to the client. */
+ addReplyArrayLen(c, 2);
+ addReplyBulkLongLong(c,cursor);
+
+ addReplyArrayLen(c, listLength(keys));
+ while ((node = listFirst(keys)) != NULL) {
+ robj *kobj = listNodeValue(node);
+ addReplyBulk(c, kobj);
+ decrRefCount(kobj);
+ listDelNode(keys, node);
+ }
+
+cleanup:
+ listSetFreeMethod(keys,decrRefCountVoid);
+ listRelease(keys);
+}
+
+/* The SCAN command completely relies on scanGenericCommand. */
+void scanCommand(client *c) {
+ unsigned long cursor;
+ if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
+ scanGenericCommand(c,NULL,cursor);
+}
+
+void dbsizeCommand(client *c) {
+ addReplyLongLong(c,dictSize(c->db->dict));
+}
+
+void lastsaveCommand(client *c) {
+ addReplyLongLong(c,server.lastsave);
+}
+
+char* getObjectTypeName(robj *o) {
+ char* type;
+ if (o == NULL) {
+ type = "none";
+ } else {
+ switch(o->type) {
+ case OBJ_STRING: type = "string"; break;
+ case OBJ_LIST: type = "list"; break;
+ case OBJ_SET: type = "set"; break;
+ case OBJ_ZSET: type = "zset"; break;
+ case OBJ_HASH: type = "hash"; break;
+ case OBJ_STREAM: type = "stream"; break;
+ case OBJ_MODULE: {
+ moduleValue *mv = o->ptr;
+ type = mv->type->name;
+ }; break;
+ default: type = "unknown"; break;
+ }
+ }
+ return type;
+}
+
+void typeCommand(client *c) {
+ robj *o;
+ o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH);
+ addReplyStatus(c, getObjectTypeName(o));
+}
+
+void shutdownCommand(client *c) {
+ int flags = SHUTDOWN_NOFLAGS;
+ int abort = 0;
+ for (int i = 1; i < c->argc; i++) {
+ if (!strcasecmp(c->argv[i]->ptr,"nosave")) {
+ flags |= SHUTDOWN_NOSAVE;
+ } else if (!strcasecmp(c->argv[i]->ptr,"save")) {
+ flags |= SHUTDOWN_SAVE;
+ } else if (!strcasecmp(c->argv[i]->ptr, "now")) {
+ flags |= SHUTDOWN_NOW;
+ } else if (!strcasecmp(c->argv[i]->ptr, "force")) {
+ flags |= SHUTDOWN_FORCE;
+ } else if (!strcasecmp(c->argv[i]->ptr, "abort")) {
+ abort = 1;
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
+ }
+ if ((abort && flags != SHUTDOWN_NOFLAGS) ||
+ (flags & SHUTDOWN_NOSAVE && flags & SHUTDOWN_SAVE))
+ {
+ /* Illegal combo. */
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
+
+ if (abort) {
+ if (abortShutdown() == C_OK)
+ addReply(c, shared.ok);
+ else
+ addReplyError(c, "No shutdown in progress.");
+ return;
+ }
+
+ if (!(flags & SHUTDOWN_NOW) && c->flags & CLIENT_DENY_BLOCKING) {
+ addReplyError(c, "SHUTDOWN without NOW or ABORT isn't allowed for DENY BLOCKING client");
+ return;
+ }
+
+ if (!(flags & SHUTDOWN_NOSAVE) && isInsideYieldingLongCommand()) {
+ /* Script timed out. Shutdown allowed only with the NOSAVE flag. See
+ * also processCommand where these errors are returned. */
+ if (server.busy_module_yield_flags && server.busy_module_yield_reply) {
+ addReplyErrorFormat(c, "-BUSY %s", server.busy_module_yield_reply);
+ } else if (server.busy_module_yield_flags) {
+ addReplyErrorObject(c, shared.slowmoduleerr);
+ } else if (scriptIsEval()) {
+ addReplyErrorObject(c, shared.slowevalerr);
+ } else {
+ addReplyErrorObject(c, shared.slowscripterr);
+ }
+ return;
+ }
+
+ blockClient(c, BLOCKED_SHUTDOWN);
+ if (prepareForShutdown(flags) == C_OK) exit(0);
+ /* If we're here, then shutdown is ongoing (the client is still blocked) or
+ * failed (the client has received an error). */
+}
+
+void renameGenericCommand(client *c, int nx) {
+ robj *o;
+ long long expire;
+ int samekey = 0;
+
+ /* When source and dest key is the same, no operation is performed,
+ * if the key exists, however we still return an error on unexisting key. */
+ if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) samekey = 1;
+
+ if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
+ return;
+
+ if (samekey) {
+ addReply(c,nx ? shared.czero : shared.ok);
+ return;
+ }
+
+ incrRefCount(o);
+ expire = getExpire(c->db,c->argv[1]);
+ if (lookupKeyWrite(c->db,c->argv[2]) != NULL) {
+ if (nx) {
+ decrRefCount(o);
+ addReply(c,shared.czero);
+ return;
+ }
+ /* Overwrite: delete the old key before creating the new one
+ * with the same name. */
+ dbDelete(c->db,c->argv[2]);
+ }
+ dbAdd(c->db,c->argv[2],o);
+ if (expire != -1) setExpire(c,c->db,c->argv[2],expire);
+ dbDelete(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[2]);
+ notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
+ c->argv[1],c->db->id);
+ notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",
+ c->argv[2],c->db->id);
+ server.dirty++;
+ addReply(c,nx ? shared.cone : shared.ok);
+}
+
+void renameCommand(client *c) {
+ renameGenericCommand(c,0);
+}
+
+void renamenxCommand(client *c) {
+ renameGenericCommand(c,1);
+}
+
+void moveCommand(client *c) {
+ robj *o;
+ redisDb *src, *dst;
+ int srcid, dbid;
+ long long expire;
+
+ if (server.cluster_enabled) {
+ addReplyError(c,"MOVE is not allowed in cluster mode");
+ return;
+ }
+
+ /* Obtain source and target DB pointers */
+ src = c->db;
+ srcid = c->db->id;
+
+ if (getIntFromObjectOrReply(c, c->argv[2], &dbid, NULL) != C_OK)
+ return;
+
+ if (selectDb(c,dbid) == C_ERR) {
+ addReplyError(c,"DB index is out of range");
+ return;
+ }
+ dst = c->db;
+ selectDb(c,srcid); /* Back to the source DB */
+
+ /* If the user is moving using as target the same
+ * DB as the source DB it is probably an error. */
+ if (src == dst) {
+ addReplyErrorObject(c,shared.sameobjecterr);
+ return;
+ }
+
+ /* Check if the element exists and get a reference */
+ o = lookupKeyWrite(c->db,c->argv[1]);
+ if (!o) {
+ addReply(c,shared.czero);
+ return;
+ }
+ expire = getExpire(c->db,c->argv[1]);
+
+ /* Return zero if the key already exists in the target DB */
+ if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
+ addReply(c,shared.czero);
+ return;
+ }
+ dbAdd(dst,c->argv[1],o);
+ if (expire != -1) setExpire(c,dst,c->argv[1],expire);
+ incrRefCount(o);
+
+ /* OK! key moved, free the entry in the source DB */
+ dbDelete(src,c->argv[1]);
+ signalModifiedKey(c,src,c->argv[1]);
+ signalModifiedKey(c,dst,c->argv[1]);
+ notifyKeyspaceEvent(NOTIFY_GENERIC,
+ "move_from",c->argv[1],src->id);
+ notifyKeyspaceEvent(NOTIFY_GENERIC,
+ "move_to",c->argv[1],dst->id);
+
+ server.dirty++;
+ addReply(c,shared.cone);
+}
+
+void copyCommand(client *c) {
+ robj *o;
+ redisDb *src, *dst;
+ int srcid, dbid;
+ long long expire;
+ int j, replace = 0, delete = 0;
+
+ /* Obtain source and target DB pointers
+ * Default target DB is the same as the source DB
+ * Parse the REPLACE option and targetDB option. */
+ src = c->db;
+ dst = c->db;
+ srcid = c->db->id;
+ dbid = c->db->id;
+ for (j = 3; j < c->argc; j++) {
+ int additional = c->argc - j - 1;
+ if (!strcasecmp(c->argv[j]->ptr,"replace")) {
+ replace = 1;
+ } else if (!strcasecmp(c->argv[j]->ptr, "db") && additional >= 1) {
+ if (getIntFromObjectOrReply(c, c->argv[j+1], &dbid, NULL) != C_OK)
+ return;
+
+ if (selectDb(c, dbid) == C_ERR) {
+ addReplyError(c,"DB index is out of range");
+ return;
+ }
+ dst = c->db;
+ selectDb(c,srcid); /* Back to the source DB */
+ j++; /* Consume additional arg. */
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
+ }
+
+ if ((server.cluster_enabled == 1) && (srcid != 0 || dbid != 0)) {
+ addReplyError(c,"Copying to another database is not allowed in cluster mode");
+ return;
+ }
+
+ /* If the user select the same DB as
+ * the source DB and using newkey as the same key
+ * it is probably an error. */
+ robj *key = c->argv[1];
+ robj *newkey = c->argv[2];
+ if (src == dst && (sdscmp(key->ptr, newkey->ptr) == 0)) {
+ addReplyErrorObject(c,shared.sameobjecterr);
+ return;
+ }
+
+ /* Check if the element exists and get a reference */
+ o = lookupKeyRead(c->db, key);
+ if (!o) {
+ addReply(c,shared.czero);
+ return;
+ }
+ expire = getExpire(c->db,key);
+
+ /* Return zero if the key already exists in the target DB.
+ * If REPLACE option is selected, delete newkey from targetDB. */
+ if (lookupKeyWrite(dst,newkey) != NULL) {
+ if (replace) {
+ delete = 1;
+ } else {
+ addReply(c,shared.czero);
+ return;
+ }
+ }
+
+ /* Duplicate object according to object's type. */
+ robj *newobj;
+ switch(o->type) {
+ case OBJ_STRING: newobj = dupStringObject(o); break;
+ case OBJ_LIST: newobj = listTypeDup(o); break;
+ case OBJ_SET: newobj = setTypeDup(o); break;
+ case OBJ_ZSET: newobj = zsetDup(o); break;
+ case OBJ_HASH: newobj = hashTypeDup(o); break;
+ case OBJ_STREAM: newobj = streamDup(o); break;
+ case OBJ_MODULE:
+ newobj = moduleTypeDupOrReply(c, key, newkey, dst->id, o);
+ if (!newobj) return;
+ break;
+ default:
+ addReplyError(c, "unknown type object");
+ return;
+ }
+
+ if (delete) {
+ dbDelete(dst,newkey);
+ }
+
+ dbAdd(dst,newkey,newobj);
+ if (expire != -1) setExpire(c, dst, newkey, expire);
+
+ /* OK! key copied */
+ signalModifiedKey(c,dst,c->argv[2]);
+ notifyKeyspaceEvent(NOTIFY_GENERIC,"copy_to",c->argv[2],dst->id);
+
+ server.dirty++;
+ addReply(c,shared.cone);
+}
+
+/* Helper function for dbSwapDatabases(): scans the list of keys that have
+ * one or more blocked clients for B[LR]POP or other blocking commands
+ * and signal the keys as ready if they are of the right type. See the comment
+ * where the function is used for more info. */
+void scanDatabaseForReadyKeys(redisDb *db) {
+ dictEntry *de;
+ dictIterator *di = dictGetSafeIterator(db->blocking_keys);
+ while((de = dictNext(di)) != NULL) {
+ robj *key = dictGetKey(de);
+ dictEntry *kde = dictFind(db->dict,key->ptr);
+ if (kde) {
+ robj *value = dictGetVal(kde);
+ signalKeyAsReady(db, key, value->type);
+ }
+ }
+ dictReleaseIterator(di);
+}
+
+/* Since we are unblocking XREADGROUP clients in the event the
+ * key was deleted/overwritten we must do the same in case the
+ * database was flushed/swapped. */
+void scanDatabaseForDeletedStreams(redisDb *emptied, redisDb *replaced_with) {
+ /* Optimization: If no clients are in type BLOCKED_STREAM,
+ * we can skip this loop. */
+ if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
+
+ dictEntry *de;
+ dictIterator *di = dictGetSafeIterator(emptied->blocking_keys);
+ while((de = dictNext(di)) != NULL) {
+ robj *key = dictGetKey(de);
+ int was_stream = 0, is_stream = 0;
+
+ dictEntry *kde = dictFind(emptied->dict, key->ptr);
+ if (kde) {
+ robj *value = dictGetVal(kde);
+ was_stream = value->type == OBJ_STREAM;
+ }
+ if (replaced_with) {
+ dictEntry *kde = dictFind(replaced_with->dict, key->ptr);
+ if (kde) {
+ robj *value = dictGetVal(kde);
+ is_stream = value->type == OBJ_STREAM;
+ }
+ }
+ /* We want to try to unblock any client using a blocking XREADGROUP */
+ if (was_stream && !is_stream)
+ signalKeyAsReady(emptied, key, OBJ_STREAM);
+ }
+ dictReleaseIterator(di);
+}
+
+/* Swap two databases at runtime so that all clients will magically see
+ * the new database even if already connected. Note that the client
+ * structure c->db points to a given DB, so we need to be smarter and
+ * swap the underlying referenced structures, otherwise we would need
+ * to fix all the references to the Redis DB structure.
+ *
+ * Returns C_ERR if at least one of the DB ids are out of range, otherwise
+ * C_OK is returned. */
+int dbSwapDatabases(int id1, int id2) {
+ if (id1 < 0 || id1 >= server.dbnum ||
+ id2 < 0 || id2 >= server.dbnum) return C_ERR;
+ if (id1 == id2) return C_OK;
+ redisDb aux = server.db[id1];
+ redisDb *db1 = &server.db[id1], *db2 = &server.db[id2];
+
+ /* Swapdb should make transaction fail if there is any
+ * client watching keys */
+ touchAllWatchedKeysInDb(db1, db2);
+ touchAllWatchedKeysInDb(db2, db1);
+
+ /* Try to unblock any XREADGROUP clients if the key no longer exists. */
+ scanDatabaseForDeletedStreams(db1, db2);
+ scanDatabaseForDeletedStreams(db2, db1);
+
+ /* Swap hash tables. Note that we don't swap blocking_keys,
+ * ready_keys and watched_keys, since we want clients to
+ * remain in the same DB they were. */
+ db1->dict = db2->dict;
+ db1->expires = db2->expires;
+ db1->avg_ttl = db2->avg_ttl;
+ db1->expires_cursor = db2->expires_cursor;
+
+ db2->dict = aux.dict;
+ db2->expires = aux.expires;
+ db2->avg_ttl = aux.avg_ttl;
+ db2->expires_cursor = aux.expires_cursor;
+
+ /* Now we need to handle clients blocked on lists: as an effect
+ * of swapping the two DBs, a client that was waiting for list
+ * X in a given DB, may now actually be unblocked if X happens
+ * to exist in the new version of the DB, after the swap.
+ *
+ * However normally we only do this check for efficiency reasons
+ * in dbAdd() when a list is created. So here we need to rescan
+ * the list of clients blocked on lists and signal lists as ready
+ * if needed. */
+ scanDatabaseForReadyKeys(db1);
+ scanDatabaseForReadyKeys(db2);
+ return C_OK;
+}
+
+/* Logically, this discards (flushes) the old main database, and apply the newly loaded
+ * database (temp) as the main (active) database, the actual freeing of old database
+ * (which will now be placed in the temp one) is done later. */
+void swapMainDbWithTempDb(redisDb *tempDb) {
+ if (server.cluster_enabled) {
+ /* Swap slots_to_keys from tempdb just loaded with main db slots_to_keys. */
+ clusterSlotToKeyMapping *aux = server.db->slots_to_keys;
+ server.db->slots_to_keys = tempDb->slots_to_keys;
+ tempDb->slots_to_keys = aux;
+ }
+
+ for (int i=0; i<server.dbnum; i++) {
+ redisDb aux = server.db[i];
+ redisDb *activedb = &server.db[i], *newdb = &tempDb[i];
+
+ /* Swapping databases should make transaction fail if there is any
+ * client watching keys. */
+ touchAllWatchedKeysInDb(activedb, newdb);
+
+ /* Try to unblock any XREADGROUP clients if the key no longer exists. */
+ scanDatabaseForDeletedStreams(activedb, newdb);
+
+ /* Swap hash tables. Note that we don't swap blocking_keys,
+ * ready_keys and watched_keys, since clients
+ * remain in the same DB they were. */
+ activedb->dict = newdb->dict;
+ activedb->expires = newdb->expires;
+ activedb->avg_ttl = newdb->avg_ttl;
+ activedb->expires_cursor = newdb->expires_cursor;
+
+ newdb->dict = aux.dict;
+ newdb->expires = aux.expires;
+ newdb->avg_ttl = aux.avg_ttl;
+ newdb->expires_cursor = aux.expires_cursor;
+
+ /* Now we need to handle clients blocked on lists: as an effect
+ * of swapping the two DBs, a client that was waiting for list
+ * X in a given DB, may now actually be unblocked if X happens
+ * to exist in the new version of the DB, after the swap.
+ *
+ * However normally we only do this check for efficiency reasons
+ * in dbAdd() when a list is created. So here we need to rescan
+ * the list of clients blocked on lists and signal lists as ready
+ * if needed. */
+ scanDatabaseForReadyKeys(activedb);
+ }
+
+ trackingInvalidateKeysOnFlush(1);
+ flushSlaveKeysWithExpireList();
+}
+
+/* SWAPDB db1 db2 */
+void swapdbCommand(client *c) {
+ int id1, id2;
+
+ /* Not allowed in cluster mode: we have just DB 0 there. */
+ if (server.cluster_enabled) {
+ addReplyError(c,"SWAPDB is not allowed in cluster mode");
+ return;
+ }
+
+ /* Get the two DBs indexes. */
+ if (getIntFromObjectOrReply(c, c->argv[1], &id1,
+ "invalid first DB index") != C_OK)
+ return;
+
+ if (getIntFromObjectOrReply(c, c->argv[2], &id2,
+ "invalid second DB index") != C_OK)
+ return;
+
+ /* Swap... */
+ if (dbSwapDatabases(id1,id2) == C_ERR) {
+ addReplyError(c,"DB index is out of range");
+ return;
+ } else {
+ RedisModuleSwapDbInfo si = {REDISMODULE_SWAPDBINFO_VERSION,id1,id2};
+ moduleFireServerEvent(REDISMODULE_EVENT_SWAPDB,0,&si);
+ server.dirty++;
+ addReply(c,shared.ok);
+ }
+}
+
+/*-----------------------------------------------------------------------------
+ * Expires API
+ *----------------------------------------------------------------------------*/
+
+int removeExpire(redisDb *db, robj *key) {
+ /* An expire may only be removed if there is a corresponding entry in the
+ * main dict. Otherwise, the key will never be freed. */
+ serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
+ return dictDelete(db->expires,key->ptr) == DICT_OK;
+}
+
+/* Set an expire to the specified key. If the expire is set in the context
+ * of an user calling a command 'c' is the client, otherwise 'c' is set
+ * to NULL. The 'when' parameter is the absolute unix time in milliseconds
+ * after which the key will no longer be considered valid. */
+void setExpire(client *c, redisDb *db, robj *key, long long when) {
+ dictEntry *kde, *de;
+
+ /* Reuse the sds from the main dict in the expire dict */
+ kde = dictFind(db->dict,key->ptr);
+ serverAssertWithInfo(NULL,key,kde != NULL);
+ de = dictAddOrFind(db->expires,dictGetKey(kde));
+ dictSetSignedIntegerVal(de,when);
+
+ int writable_slave = server.masterhost && server.repl_slave_ro == 0;
+ if (c && writable_slave && !(c->flags & CLIENT_MASTER))
+ rememberSlaveKeyWithExpire(db,key);
+}
+
+/* Return the expire time of the specified key, or -1 if no expire
+ * is associated with this key (i.e. the key is non volatile) */
+long long getExpire(redisDb *db, robj *key) {
+ dictEntry *de;
+
+ /* No expire? return ASAP */
+ if (dictSize(db->expires) == 0 ||
+ (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
+
+ /* The entry was found in the expire dict, this means it should also
+ * be present in the main dict (safety check). */
+ serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
+ return dictGetSignedIntegerVal(de);
+}
+
+/* Delete the specified expired key and propagate expire. */
+void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
+ mstime_t expire_latency;
+ latencyStartMonitor(expire_latency);
+ if (server.lazyfree_lazy_expire)
+ dbAsyncDelete(db,keyobj);
+ else
+ dbSyncDelete(db,keyobj);
+ latencyEndMonitor(expire_latency);
+ latencyAddSampleIfNeeded("expire-del",expire_latency);
+ notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
+ signalModifiedKey(NULL, db, keyobj);
+ propagateDeletion(db,keyobj,server.lazyfree_lazy_expire);
+ server.stat_expiredkeys++;
+}
+
+/* Propagate expires into slaves and the AOF file.
+ * When a key expires in the master, a DEL operation for this key is sent
+ * to all the slaves and the AOF file if enabled.
+ *
+ * This way the key expiry is centralized in one place, and since both
+ * AOF and the master->slave link guarantee operation ordering, everything
+ * will be consistent even if we allow write operations against expiring
+ * keys.
+ *
+ * This function may be called from:
+ * 1. Within call(): Example: Lazy-expire on key access.
+ * In this case the caller doesn't have to do anything
+ * because call() handles server.also_propagate(); or
+ * 2. Outside of call(): Example: Active-expire, eviction.
+ * In this the caller must remember to call
+ * postExecutionUnitOperations, preferably just after a
+ * single deletion batch, so that DELs will NOT be wrapped
+ * in MULTI/EXEC */
+void propagateDeletion(redisDb *db, robj *key, int lazy) {
+ robj *argv[2];
+
+ argv[0] = lazy ? shared.unlink : shared.del;
+ argv[1] = key;
+ incrRefCount(argv[0]);
+ incrRefCount(argv[1]);
+
+ /* If the master decided to expire a key we must propagate it to replicas no matter what..
+ * Even if module executed a command without asking for propagation. */
+ int prev_replication_allowed = server.replication_allowed;
+ server.replication_allowed = 1;
+ alsoPropagate(db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
+ server.replication_allowed = prev_replication_allowed;
+
+ decrRefCount(argv[0]);
+ decrRefCount(argv[1]);
+}
+
+/* Check if the key is expired. */
+int keyIsExpired(redisDb *db, robj *key) {
+ mstime_t when = getExpire(db,key);
+ mstime_t now;
+
+ if (when < 0) return 0; /* No expire for this key */
+
+ /* Don't expire anything while loading. It will be done later. */
+ if (server.loading) return 0;
+
+ /* If we are in the context of a Lua script, we pretend that time is
+ * blocked to when the Lua script started. This way a key can expire
+ * only the first time it is accessed and not in the middle of the
+ * script execution, making propagation to slaves / AOF consistent.
+ * See issue #1525 on Github for more information. */
+ if (server.script_caller) {
+ now = scriptTimeSnapshot();
+ }
+ /* If we are in the middle of a command execution, we still want to use
+ * a reference time that does not change: in that case we just use the
+ * cached time, that we update before each call in the call() function.
+ * This way we avoid that commands such as RPOPLPUSH or similar, that
+ * may re-open the same key multiple times, can invalidate an already
+ * open object in a next call, if the next call will see the key expired,
+ * while the first did not. */
+ else if (server.fixed_time_expire > 0) {
+ now = server.mstime;
+ }
+ /* For the other cases, we want to use the most fresh time we have. */
+ else {
+ now = mstime();
+ }
+
+ /* The key expired if the current (virtual or real) time is greater
+ * than the expire time of the key. */
+ return now > when;
+}
+
+/* This function is called when we are going to perform some operation
+ * in a given key, but such key may be already logically expired even if
+ * it still exists in the database. The main way this function is called
+ * is via lookupKey*() family of functions.
+ *
+ * The behavior of the function depends on the replication role of the
+ * instance, because by default replicas do not delete expired keys. They
+ * wait for DELs from the master for consistency matters. However even
+ * replicas will try to have a coherent return value for the function,
+ * so that read commands executed in the replica side will be able to
+ * behave like if the key is expired even if still present (because the
+ * master has yet to propagate the DEL).
+ *
+ * In masters as a side effect of finding a key which is expired, such
+ * key will be evicted from the database. Also this may trigger the
+ * propagation of a DEL/UNLINK command in AOF / replication stream.
+ *
+ * On replicas, this function does not delete expired keys by default, but
+ * it still returns 1 if the key is logically expired. To force deletion
+ * of logically expired keys even on replicas, use the EXPIRE_FORCE_DELETE_EXPIRED
+ * flag. Note though that if the current client is executing
+ * replicated commands from the master, keys are never considered expired.
+ *
+ * On the other hand, if you just want expiration check, but need to avoid
+ * the actual key deletion and propagation of the deletion, use the
+ * EXPIRE_AVOID_DELETE_EXPIRED flag.
+ *
+ * The return value of the function is 0 if the key is still valid,
+ * otherwise the function returns 1 if the key is expired. */
+int expireIfNeeded(redisDb *db, robj *key, int flags) {
+ if (!keyIsExpired(db,key)) return 0;
+
+ /* If we are running in the context of a replica, instead of
+ * evicting the expired key from the database, we return ASAP:
+ * the replica key expiration is controlled by the master that will
+ * send us synthesized DEL operations for expired keys. The
+ * exception is when write operations are performed on writable
+ * replicas.
+ *
+ * Still we try to return the right information to the caller,
+ * that is, 0 if we think the key should be still valid, 1 if
+ * we think the key is expired at this time.
+ *
+ * When replicating commands from the master, keys are never considered
+ * expired. */
+ if (server.masterhost != NULL) {
+ if (server.current_client == server.master) return 0;
+ if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return 1;
+ }
+
+ /* In some cases we're explicitly instructed to return an indication of a
+ * missing key without actually deleting it, even on masters. */
+ if (flags & EXPIRE_AVOID_DELETE_EXPIRED)
+ return 1;
+
+ /* If clients are paused, we keep the current dataset constant,
+ * but return to the client what we believe is the right state. Typically,
+ * at the end of the pause we will properly expire the key OR we will
+ * have failed over and the new primary will send us the expire. */
+ if (checkClientPauseTimeoutAndReturnIfPaused()) return 1;
+
+ /* Delete the key */
+ deleteExpiredKeyAndPropagate(db,key);
+ return 1;
+}
+
+/* -----------------------------------------------------------------------------
+ * API to get key arguments from commands
+ * ---------------------------------------------------------------------------*/
+
+/* Prepare the getKeysResult struct to hold numkeys, either by using the
+ * pre-allocated keysbuf or by allocating a new array on the heap.
+ *
+ * This function must be called at least once before starting to populate
+ * the result, and can be called repeatedly to enlarge the result array.
+ */
+keyReference *getKeysPrepareResult(getKeysResult *result, int numkeys) {
+ /* GETKEYS_RESULT_INIT initializes keys to NULL, point it to the pre-allocated stack
+ * buffer here. */
+ if (!result->keys) {
+ serverAssert(!result->numkeys);
+ result->keys = result->keysbuf;
+ }
+
+ /* Resize if necessary */
+ if (numkeys > result->size) {
+ if (result->keys != result->keysbuf) {
+ /* We're not using a static buffer, just (re)alloc */
+ result->keys = zrealloc(result->keys, numkeys * sizeof(keyReference));
+ } else {
+ /* We are using a static buffer, copy its contents */
+ result->keys = zmalloc(numkeys * sizeof(keyReference));
+ if (result->numkeys)
+ memcpy(result->keys, result->keysbuf, result->numkeys * sizeof(keyReference));
+ }
+ result->size = numkeys;
+ }
+
+ return result->keys;
+}
+
+/* Returns a bitmask with all the flags found in any of the key specs of the command.
+ * The 'inv' argument means we'll return a mask with all flags that are missing in at least one spec. */
+int64_t getAllKeySpecsFlags(struct redisCommand *cmd, int inv) {
+ int64_t flags = 0;
+ for (int j = 0; j < cmd->key_specs_num; j++) {
+ keySpec *spec = cmd->key_specs + j;
+ flags |= inv? ~spec->flags : spec->flags;
+ }
+ return flags;
+}
+
+/* Fetch the keys based of the provided key specs. Returns the number of keys found, or -1 on error.
+ * There are several flags that can be used to modify how this function finds keys in a command.
+ *
+ * GET_KEYSPEC_INCLUDE_NOT_KEYS: Return 'fake' keys as if they were keys.
+ * GET_KEYSPEC_RETURN_PARTIAL: Skips invalid and incomplete keyspecs but returns the keys
+ * found in other valid keyspecs.
+ */
+int getKeysUsingKeySpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result) {
+ int j, i, last, first, step;
+ keyReference *keys;
+ result->numkeys = 0;
+
+ for (j = 0; j < cmd->key_specs_num; j++) {
+ keySpec *spec = cmd->key_specs + j;
+ serverAssert(spec->begin_search_type != KSPEC_BS_INVALID);
+ /* Skip specs that represent 'fake' keys */
+ if ((spec->flags & CMD_KEY_NOT_KEY) && !(search_flags & GET_KEYSPEC_INCLUDE_NOT_KEYS)) {
+ continue;
+ }
+
+ first = 0;
+ if (spec->begin_search_type == KSPEC_BS_INDEX) {
+ first = spec->bs.index.pos;
+ } else if (spec->begin_search_type == KSPEC_BS_KEYWORD) {
+ int start_index = spec->bs.keyword.startfrom > 0 ? spec->bs.keyword.startfrom : argc+spec->bs.keyword.startfrom;
+ int end_index = spec->bs.keyword.startfrom > 0 ? argc-1: 1;
+ for (i = start_index; i != end_index; i = start_index <= end_index ? i + 1 : i - 1) {
+ if (i >= argc || i < 1)
+ break;
+ if (!strcasecmp((char*)argv[i]->ptr,spec->bs.keyword.keyword)) {
+ first = i+1;
+ break;
+ }
+ }
+ /* keyword not found */
+ if (!first) {
+ continue;
+ }
+ } else {
+ /* unknown spec */
+ goto invalid_spec;
+ }
+
+ if (spec->find_keys_type == KSPEC_FK_RANGE) {
+ step = spec->fk.range.keystep;
+ if (spec->fk.range.lastkey >= 0) {
+ last = first + spec->fk.range.lastkey;
+ } else {
+ if (!spec->fk.range.limit) {
+ last = argc + spec->fk.range.lastkey;
+ } else {
+ serverAssert(spec->fk.range.lastkey == -1);
+ last = first + ((argc-first)/spec->fk.range.limit + spec->fk.range.lastkey);
+ }
+ }
+ } else if (spec->find_keys_type == KSPEC_FK_KEYNUM) {
+ step = spec->fk.keynum.keystep;
+ long long numkeys;
+ if (spec->fk.keynum.keynumidx >= argc)
+ goto invalid_spec;
+
+ sds keynum_str = argv[first + spec->fk.keynum.keynumidx]->ptr;
+ if (!string2ll(keynum_str,sdslen(keynum_str),&numkeys) || numkeys < 0) {
+ /* Unable to parse the numkeys argument or it was invalid */
+ goto invalid_spec;
+ }
+
+ first += spec->fk.keynum.firstkey;
+ last = first + (int)numkeys-1;
+ } else {
+ /* unknown spec */
+ goto invalid_spec;
+ }
+
+ int count = ((last - first)+1);
+ keys = getKeysPrepareResult(result, result->numkeys + count);
+
+ /* First or last is out of bounds, which indicates a syntax error */
+ if (last >= argc || last < first || first >= argc) {
+ goto invalid_spec;
+ }
+
+ for (i = first; i <= last; i += step) {
+ if (i >= argc || i < first) {
+ /* Modules commands, and standard commands with a not fixed number
+ * of arguments (negative arity parameter) do not have dispatch
+ * time arity checks, so we need to handle the case where the user
+ * passed an invalid number of arguments here. In this case we
+ * return no keys and expect the command implementation to report
+ * an arity or syntax error. */
+ if (cmd->flags & CMD_MODULE || cmd->arity < 0) {
+ continue;
+ } else {
+ serverPanic("Redis built-in command declared keys positions not matching the arity requirements.");
+ }
+ }
+ keys[result->numkeys].pos = i;
+ keys[result->numkeys].flags = spec->flags;
+ result->numkeys++;
+ }
+
+ /* Handle incomplete specs (only after we added the current spec
+ * to `keys`, just in case GET_KEYSPEC_RETURN_PARTIAL was given) */
+ if (spec->flags & CMD_KEY_INCOMPLETE) {
+ goto invalid_spec;
+ }
+
+ /* Done with this spec */
+ continue;
+
+invalid_spec:
+ if (search_flags & GET_KEYSPEC_RETURN_PARTIAL) {
+ continue;
+ } else {
+ result->numkeys = 0;
+ return -1;
+ }
+ }
+
+ return result->numkeys;
+}
+
+/* Return all the arguments that are keys in the command passed via argc / argv.
+ * This function will eventually replace getKeysFromCommand.
+ *
+ * The command returns the positions of all the key arguments inside the array,
+ * so the actual return value is a heap allocated array of integers. The
+ * length of the array is returned by reference into *numkeys.
+ *
+ * Along with the position, this command also returns the flags that are
+ * associated with how Redis will access the key.
+ *
+ * 'cmd' must be point to the corresponding entry into the redisCommand
+ * table, according to the command name in argv[0]. */
+int getKeysFromCommandWithSpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result) {
+ /* The command has at least one key-spec not marked as NOT_KEY */
+ int has_keyspec = (getAllKeySpecsFlags(cmd, 1) & CMD_KEY_NOT_KEY);
+ /* The command has at least one key-spec marked as VARIABLE_FLAGS */
+ int has_varflags = (getAllKeySpecsFlags(cmd, 0) & CMD_KEY_VARIABLE_FLAGS);
+
+ /* We prefer key-specs if there are any, and their flags are reliable. */
+ if (has_keyspec && !has_varflags) {
+ int ret = getKeysUsingKeySpecs(cmd,argv,argc,search_flags,result);
+ if (ret >= 0)
+ return ret;
+ /* If the specs returned with an error (probably an INVALID or INCOMPLETE spec),
+ * fallback to the callback method. */
+ }
+
+ /* Resort to getkeys callback methods. */
+ if (cmd->flags & CMD_MODULE_GETKEYS)
+ return moduleGetCommandKeysViaAPI(cmd,argv,argc,result);
+
+ /* We use native getkeys as a last resort, since not all these native getkeys provide
+ * flags properly (only the ones that correspond to INVALID, INCOMPLETE or VARIABLE_FLAGS do.*/
+ if (cmd->getkeys_proc)
+ return cmd->getkeys_proc(cmd,argv,argc,result);
+ return 0;
+}
+
+/* This function returns a sanity check if the command may have keys. */
+int doesCommandHaveKeys(struct redisCommand *cmd) {
+ return cmd->getkeys_proc || /* has getkeys_proc (non modules) */
+ (cmd->flags & CMD_MODULE_GETKEYS) || /* module with GETKEYS */
+ (getAllKeySpecsFlags(cmd, 1) & CMD_KEY_NOT_KEY); /* has at least one key-spec not marked as NOT_KEY */
+}
+
+/* A simplified channel spec table that contains all of the redis commands
+ * and which channels they have and how they are accessed. */
+typedef struct ChannelSpecs {
+ redisCommandProc *proc; /* Command procedure to match against */
+ uint64_t flags; /* CMD_CHANNEL_* flags for this command */
+ int start; /* The initial position of the first channel */
+ int count; /* The number of channels, or -1 if all remaining
+ * arguments are channels. */
+} ChannelSpecs;
+
+ChannelSpecs commands_with_channels[] = {
+ {subscribeCommand, CMD_CHANNEL_SUBSCRIBE, 1, -1},
+ {ssubscribeCommand, CMD_CHANNEL_SUBSCRIBE, 1, -1},
+ {unsubscribeCommand, CMD_CHANNEL_UNSUBSCRIBE, 1, -1},
+ {sunsubscribeCommand, CMD_CHANNEL_UNSUBSCRIBE, 1, -1},
+ {psubscribeCommand, CMD_CHANNEL_PATTERN | CMD_CHANNEL_SUBSCRIBE, 1, -1},
+ {punsubscribeCommand, CMD_CHANNEL_PATTERN | CMD_CHANNEL_UNSUBSCRIBE, 1, -1},
+ {publishCommand, CMD_CHANNEL_PUBLISH, 1, 1},
+ {spublishCommand, CMD_CHANNEL_PUBLISH, 1, 1},
+ {NULL,0} /* Terminator. */
+};
+
+/* Returns 1 if the command may access any channels matched by the flags
+ * argument. */
+int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags) {
+ /* If a module declares get channels, we are just going to assume
+ * has channels. This API is allowed to return false positives. */
+ if (cmd->flags & CMD_MODULE_GETCHANNELS) {
+ return 1;
+ }
+ for (ChannelSpecs *spec = commands_with_channels; spec->proc != NULL; spec += 1) {
+ if (cmd->proc == spec->proc) {
+ return !!(spec->flags & flags);
+ }
+ }
+ return 0;
+}
+
+/* Return all the arguments that are channels in the command passed via argc / argv.
+ * This function behaves similar to getKeysFromCommandWithSpecs, but with channels
+ * instead of keys.
+ *
+ * The command returns the positions of all the channel arguments inside the array,
+ * so the actual return value is a heap allocated array of integers. The
+ * length of the array is returned by reference into *numkeys.
+ *
+ * Along with the position, this command also returns the flags that are
+ * associated with how Redis will access the channel.
+ *
+ * 'cmd' must be point to the corresponding entry into the redisCommand
+ * table, according to the command name in argv[0]. */
+int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ keyReference *keys;
+ /* If a module declares get channels, use that. */
+ if (cmd->flags & CMD_MODULE_GETCHANNELS) {
+ return moduleGetCommandChannelsViaAPI(cmd, argv, argc, result);
+ }
+ /* Otherwise check the channel spec table */
+ for (ChannelSpecs *spec = commands_with_channels; spec != NULL; spec += 1) {
+ if (cmd->proc == spec->proc) {
+ int start = spec->start;
+ int stop = (spec->count == -1) ? argc : start + spec->count;
+ if (stop > argc) stop = argc;
+ int count = 0;
+ keys = getKeysPrepareResult(result, stop - start);
+ for (int i = start; i < stop; i++ ) {
+ keys[count].pos = i;
+ keys[count++].flags = spec->flags;
+ }
+ result->numkeys = count;
+ return count;
+ }
+ }
+ return 0;
+}
+
+/* The base case is to use the keys position as given in the command table
+ * (firstkey, lastkey, step).
+ * This function works only on command with the legacy_range_key_spec,
+ * all other commands should be handled by getkeys_proc.
+ *
+ * If the commands keyspec is incomplete, no keys will be returned, and the provided
+ * keys function should be called instead.
+ *
+ * NOTE: This function does not guarantee populating the flags for
+ * the keys, in order to get flags you should use getKeysUsingKeySpecs. */
+int getKeysUsingLegacyRangeSpec(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ int j, i = 0, last, first, step;
+ keyReference *keys;
+ UNUSED(argv);
+
+ if (cmd->legacy_range_key_spec.begin_search_type == KSPEC_BS_INVALID) {
+ result->numkeys = 0;
+ return 0;
+ }
+
+ first = cmd->legacy_range_key_spec.bs.index.pos;
+ last = cmd->legacy_range_key_spec.fk.range.lastkey;
+ if (last >= 0)
+ last += first;
+ step = cmd->legacy_range_key_spec.fk.range.keystep;
+
+ if (last < 0) last = argc+last;
+
+ int count = ((last - first)+1);
+ keys = getKeysPrepareResult(result, count);
+
+ for (j = first; j <= last; j += step) {
+ if (j >= argc || j < first) {
+ /* Modules commands, and standard commands with a not fixed number
+ * of arguments (negative arity parameter) do not have dispatch
+ * time arity checks, so we need to handle the case where the user
+ * passed an invalid number of arguments here. In this case we
+ * return no keys and expect the command implementation to report
+ * an arity or syntax error. */
+ if (cmd->flags & CMD_MODULE || cmd->arity < 0) {
+ result->numkeys = 0;
+ return 0;
+ } else {
+ serverPanic("Redis built-in command declared keys positions not matching the arity requirements.");
+ }
+ }
+ keys[i].pos = j;
+ /* Flags are omitted from legacy key specs */
+ keys[i++].flags = 0;
+ }
+ result->numkeys = i;
+ return i;
+}
+
+/* Return all the arguments that are keys in the command passed via argc / argv.
+ *
+ * The command returns the positions of all the key arguments inside the array,
+ * so the actual return value is a heap allocated array of integers. The
+ * length of the array is returned by reference into *numkeys.
+ *
+ * 'cmd' must be point to the corresponding entry into the redisCommand
+ * table, according to the command name in argv[0].
+ *
+ * This function uses the command table if a command-specific helper function
+ * is not required, otherwise it calls the command-specific function. */
+int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ if (cmd->flags & CMD_MODULE_GETKEYS) {
+ return moduleGetCommandKeysViaAPI(cmd,argv,argc,result);
+ } else if (cmd->getkeys_proc) {
+ return cmd->getkeys_proc(cmd,argv,argc,result);
+ } else {
+ return getKeysUsingLegacyRangeSpec(cmd,argv,argc,result);
+ }
+}
+
+/* Free the result of getKeysFromCommand. */
+void getKeysFreeResult(getKeysResult *result) {
+ if (result && result->keys != result->keysbuf)
+ zfree(result->keys);
+}
+
+/* Helper function to extract keys from following commands:
+ * COMMAND [destkey] <num-keys> <key> [...] <key> [...] ... <options>
+ *
+ * eg:
+ * ZUNION <num-keys> <key> <key> ... <key> <options>
+ * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
+ *
+ * 'storeKeyOfs': destkey index, 0 means destkey not exists.
+ * 'keyCountOfs': num-keys index.
+ * 'firstKeyOfs': firstkey index.
+ * 'keyStep': the interval of each key, usually this value is 1.
+ *
+ * The commands using this functoin have a fully defined keyspec, so returning flags isn't needed. */
+int genericGetKeys(int storeKeyOfs, int keyCountOfs, int firstKeyOfs, int keyStep,
+ robj **argv, int argc, getKeysResult *result) {
+ int i, num;
+ keyReference *keys;
+
+ num = atoi(argv[keyCountOfs]->ptr);
+ /* Sanity check. Don't return any key if the command is going to
+ * reply with syntax error. (no input keys). */
+ if (num < 1 || num > (argc - firstKeyOfs)/keyStep) {
+ result->numkeys = 0;
+ return 0;
+ }
+
+ int numkeys = storeKeyOfs ? num + 1 : num;
+ keys = getKeysPrepareResult(result, numkeys);
+ result->numkeys = numkeys;
+
+ /* Add all key positions for argv[firstKeyOfs...n] to keys[] */
+ for (i = 0; i < num; i++) {
+ keys[i].pos = firstKeyOfs+(i*keyStep);
+ keys[i].flags = 0;
+ }
+
+ if (storeKeyOfs) {
+ keys[num].pos = storeKeyOfs;
+ keys[num].flags = 0;
+ }
+ return result->numkeys;
+}
+
+int sintercardGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ UNUSED(cmd);
+ return genericGetKeys(0, 1, 2, 1, argv, argc, result);
+}
+
+int zunionInterDiffStoreGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ UNUSED(cmd);
+ return genericGetKeys(1, 2, 3, 1, argv, argc, result);
+}
+
+int zunionInterDiffGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ UNUSED(cmd);
+ return genericGetKeys(0, 1, 2, 1, argv, argc, result);
+}
+
+int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ UNUSED(cmd);
+ return genericGetKeys(0, 2, 3, 1, argv, argc, result);
+}
+
+int functionGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ UNUSED(cmd);
+ return genericGetKeys(0, 2, 3, 1, argv, argc, result);
+}
+
+int lmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ UNUSED(cmd);
+ return genericGetKeys(0, 1, 2, 1, argv, argc, result);
+}
+
+int blmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ UNUSED(cmd);
+ return genericGetKeys(0, 2, 3, 1, argv, argc, result);
+}
+
+int zmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ UNUSED(cmd);
+ return genericGetKeys(0, 1, 2, 1, argv, argc, result);
+}
+
+int bzmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ UNUSED(cmd);
+ return genericGetKeys(0, 2, 3, 1, argv, argc, result);
+}
+
+/* Helper function to extract keys from the SORT RO command.
+ *
+ * SORT <sort-key>
+ *
+ * The second argument of SORT is always a key, however an arbitrary number of
+ * keys may be accessed while doing the sort (the BY and GET args), so the
+ * key-spec declares incomplete keys which is why we have to provide a concrete
+ * implementation to fetch the keys.
+ *
+ * This command declares incomplete keys, so the flags are correctly set for this function */
+int sortROGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ keyReference *keys;
+ UNUSED(cmd);
+ UNUSED(argv);
+ UNUSED(argc);
+
+ keys = getKeysPrepareResult(result, 1);
+ keys[0].pos = 1; /* <sort-key> is always present. */
+ keys[0].flags = CMD_KEY_RO | CMD_KEY_ACCESS;
+ result->numkeys = 1;
+ return result->numkeys;
+}
+
+/* Helper function to extract keys from the SORT command.
+ *
+ * SORT <sort-key> ... STORE <store-key> ...
+ *
+ * The first argument of SORT is always a key, however a list of options
+ * follow in SQL-alike style. Here we parse just the minimum in order to
+ * correctly identify keys in the "STORE" option.
+ *
+ * This command declares incomplete keys, so the flags are correctly set for this function */
+int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ int i, j, num, found_store = 0;
+ keyReference *keys;
+ UNUSED(cmd);
+
+ num = 0;
+ keys = getKeysPrepareResult(result, 2); /* Alloc 2 places for the worst case. */
+ keys[num].pos = 1; /* <sort-key> is always present. */
+ keys[num++].flags = CMD_KEY_RO | CMD_KEY_ACCESS;
+
+ /* Search for STORE option. By default we consider options to don't
+ * have arguments, so if we find an unknown option name we scan the
+ * next. However there are options with 1 or 2 arguments, so we
+ * provide a list here in order to skip the right number of args. */
+ struct {
+ char *name;
+ int skip;
+ } skiplist[] = {
+ {"limit", 2},
+ {"get", 1},
+ {"by", 1},
+ {NULL, 0} /* End of elements. */
+ };
+
+ for (i = 2; i < argc; i++) {
+ for (j = 0; skiplist[j].name != NULL; j++) {
+ if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) {
+ i += skiplist[j].skip;
+ break;
+ } else if (!strcasecmp(argv[i]->ptr,"store") && i+1 < argc) {
+ /* Note: we don't increment "num" here and continue the loop
+ * to be sure to process the *last* "STORE" option if multiple
+ * ones are provided. This is same behavior as SORT. */
+ found_store = 1;
+ keys[num].pos = i+1; /* <store-key> */
+ keys[num].flags = CMD_KEY_OW | CMD_KEY_UPDATE;
+ break;
+ }
+ }
+ }
+ result->numkeys = num + found_store;
+ return result->numkeys;
+}
+
+/* This command declares incomplete keys, so the flags are correctly set for this function */
+int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ int i, j, num, first;
+ keyReference *keys;
+ UNUSED(cmd);
+
+ /* Assume the obvious form. */
+ first = 3;
+ num = 1;
+
+ /* But check for the extended one with the KEYS option. */
+ struct {
+ char* name;
+ int skip;
+ } skip_keywords[] = {
+ {"copy", 0},
+ {"replace", 0},
+ {"auth", 1},
+ {"auth2", 2},
+ {NULL, 0}
+ };
+ if (argc > 6) {
+ for (i = 6; i < argc; i++) {
+ if (!strcasecmp(argv[i]->ptr, "keys")) {
+ if (sdslen(argv[3]->ptr) > 0) {
+ /* This is a syntax error. So ignore the keys and leave
+ * the syntax error to be handled by migrateCommand. */
+ num = 0;
+ } else {
+ first = i + 1;
+ num = argc - first;
+ }
+ break;
+ }
+ for (j = 0; skip_keywords[j].name != NULL; j++) {
+ if (!strcasecmp(argv[i]->ptr, skip_keywords[j].name)) {
+ i += skip_keywords[j].skip;
+ break;
+ }
+ }
+ }
+ }
+
+ keys = getKeysPrepareResult(result, num);
+ for (i = 0; i < num; i++) {
+ keys[i].pos = first+i;
+ keys[i].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_DELETE;
+ }
+ result->numkeys = num;
+ return num;
+}
+
+/* Helper function to extract keys from following commands:
+ * GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC]
+ * [COUNT count] [STORE key] [STOREDIST key]
+ * GEORADIUSBYMEMBER key member radius unit ... options ...
+ *
+ * This command has a fully defined keyspec, so returning flags isn't needed. */
+int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ int i, num;
+ keyReference *keys;
+ UNUSED(cmd);
+
+ /* Check for the presence of the stored key in the command */
+ int stored_key = -1;
+ for (i = 5; i < argc; i++) {
+ char *arg = argv[i]->ptr;
+ /* For the case when user specifies both "store" and "storedist" options, the
+ * second key specified would override the first key. This behavior is kept
+ * the same as in georadiusCommand method.
+ */
+ if ((!strcasecmp(arg, "store") || !strcasecmp(arg, "storedist")) && ((i+1) < argc)) {
+ stored_key = i+1;
+ i++;
+ }
+ }
+ num = 1 + (stored_key == -1 ? 0 : 1);
+
+ /* Keys in the command come from two places:
+ * argv[1] = key,
+ * argv[5...n] = stored key if present
+ */
+ keys = getKeysPrepareResult(result, num);
+
+ /* Add all key positions to keys[] */
+ keys[0].pos = 1;
+ keys[0].flags = 0;
+ if(num > 1) {
+ keys[1].pos = stored_key;
+ keys[1].flags = 0;
+ }
+ result->numkeys = num;
+ return num;
+}
+
+/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
+ * STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N
+ *
+ * This command has a fully defined keyspec, so returning flags isn't needed. */
+int xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ int i, num = 0;
+ keyReference *keys;
+ UNUSED(cmd);
+
+ /* We need to parse the options of the command in order to seek the first
+ * "STREAMS" string which is actually the option. This is needed because
+ * "STREAMS" could also be the name of the consumer group and even the
+ * name of the stream key. */
+ int streams_pos = -1;
+ for (i = 1; i < argc; i++) {
+ char *arg = argv[i]->ptr;
+ if (!strcasecmp(arg, "block")) {
+ i++; /* Skip option argument. */
+ } else if (!strcasecmp(arg, "count")) {
+ i++; /* Skip option argument. */
+ } else if (!strcasecmp(arg, "group")) {
+ i += 2; /* Skip option argument. */
+ } else if (!strcasecmp(arg, "noack")) {
+ /* Nothing to do. */
+ } else if (!strcasecmp(arg, "streams")) {
+ streams_pos = i;
+ break;
+ } else {
+ break; /* Syntax error. */
+ }
+ }
+ if (streams_pos != -1) num = argc - streams_pos - 1;
+
+ /* Syntax error. */
+ if (streams_pos == -1 || num == 0 || num % 2 != 0) {
+ result->numkeys = 0;
+ return 0;
+ }
+ num /= 2; /* We have half the keys as there are arguments because
+ there are also the IDs, one per key. */
+
+ keys = getKeysPrepareResult(result, num);
+ for (i = streams_pos+1; i < argc-num; i++) {
+ keys[i-streams_pos-1].pos = i;
+ keys[i-streams_pos-1].flags = 0;
+ }
+ result->numkeys = num;
+ return num;
+}
+
+/* Helper function to extract keys from the SET command, which may have
+ * a read flag if the GET argument is passed in. */
+int setGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ keyReference *keys;
+ UNUSED(cmd);
+
+ keys = getKeysPrepareResult(result, 1);
+ keys[0].pos = 1; /* We always know the position */
+ result->numkeys = 1;
+
+ for (int i = 3; i < argc; i++) {
+ char *arg = argv[i]->ptr;
+ if ((arg[0] == 'g' || arg[0] == 'G') &&
+ (arg[1] == 'e' || arg[1] == 'E') &&
+ (arg[2] == 't' || arg[2] == 'T') && arg[3] == '\0')
+ {
+ keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_UPDATE;
+ return 1;
+ }
+ }
+
+ keys[0].flags = CMD_KEY_OW | CMD_KEY_UPDATE;
+ return 1;
+}
+
+/* Helper function to extract keys from the BITFIELD command, which may be
+ * read-only if the BITFIELD GET subcommand is used. */
+int bitfieldGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
+ keyReference *keys;
+ int readonly = 1;
+ UNUSED(cmd);
+
+ keys = getKeysPrepareResult(result, 1);
+ keys[0].pos = 1; /* We always know the position */
+ result->numkeys = 1;
+
+ for (int i = 2; i < argc; i++) {
+ int remargs = argc - i - 1; /* Remaining args other than current. */
+ char *arg = argv[i]->ptr;
+ if (!strcasecmp(arg, "get") && remargs >= 2) {
+ i += 2;
+ } else if ((!strcasecmp(arg, "set") || !strcasecmp(arg, "incrby")) && remargs >= 3) {
+ readonly = 0;
+ i += 3;
+ break;
+ } else if (!strcasecmp(arg, "overflow") && remargs >= 1) {
+ i += 1;
+ } else {
+ readonly = 0; /* Syntax error. safer to assume non-RO. */
+ break;
+ }
+ }
+
+ if (readonly) {
+ keys[0].flags = CMD_KEY_RO | CMD_KEY_ACCESS;
+ } else {
+ keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_UPDATE;
+ }
+ return 1;
+}