summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/aof.c4
-rw-r--r--src/blocked.c8
-rw-r--r--src/cluster.c9
-rw-r--r--src/module.c9
-rw-r--r--src/quicklist.c82
-rw-r--r--src/rdb.h5
-rw-r--r--src/redis-check-aof.c10
-rw-r--r--src/redis-check-rdb.c2
-rw-r--r--src/redis-cli.c3
-rw-r--r--src/script_lua.c13
-rw-r--r--src/server.c10
-rw-r--r--src/server.h1
-rw-r--r--src/t_zset.c5
-rw-r--r--src/version.h4
14 files changed, 124 insertions, 41 deletions
diff --git a/src/aof.c b/src/aof.c
index a89142b..6da6e1a 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -117,7 +117,9 @@ aofInfo *aofInfoDup(aofInfo *orig) {
return ai;
}
-/* Format aofInfo as a string and it will be a line in the manifest. */
+/* Format aofInfo as a string and it will be a line in the manifest.
+ *
+ * When update this format, make sure to update redis-check-aof as well. */
sds aofInfoFormat(sds buf, aofInfo *ai) {
sds filename_repr = NULL;
diff --git a/src/blocked.c b/src/blocked.c
index 6ad4667..7b48fca 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -370,7 +370,12 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
list *l;
int j;
- c->bstate.timeout = timeout;
+ if (!(c->flags & CLIENT_REPROCESSING_COMMAND)) {
+ /* If the client is re-processing the command, we do not set the timeout
+ * because we need to retain the client's original timeout. */
+ c->bstate.timeout = timeout;
+ }
+
for (j = 0; j < numkeys; j++) {
/* If the key already exists in the dictionary ignore it. */
if (!(client_blocked_entry = dictAddRaw(c->bstate.keys,keys[j],NULL))) {
@@ -392,7 +397,6 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
listAddNodeTail(l,c);
dictSetVal(c->bstate.keys,client_blocked_entry,listLast(l));
-
/* We need to add the key to blocking_keys_unblock_on_nokey, if the client
* wants to be awakened if key is deleted (like XREADGROUP) */
if (unblock_on_nokey) {
diff --git a/src/cluster.c b/src/cluster.c
index c985d0b..637837c 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -988,7 +988,7 @@ void clusterInit(void) {
server.cluster->myself = NULL;
server.cluster->currentEpoch = 0;
server.cluster->state = CLUSTER_FAIL;
- server.cluster->size = 1;
+ server.cluster->size = 0;
server.cluster->todo_before_sleep = 0;
server.cluster->nodes = dictCreate(&clusterNodesDictType);
server.cluster->shards = dictCreate(&clusterSdsToListType);
@@ -4771,10 +4771,13 @@ void clusterCron(void) {
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
- serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
- node->name);
node->flags |= CLUSTER_NODE_PFAIL;
update_state = 1;
+ if (myself->flags & CLUSTER_NODE_MASTER && server.cluster->size == 1) {
+ markNodeAsFailingIfNeeded(node);
+ } else {
+ serverLog(LL_DEBUG,"*** NODE %.40s possibly failing", node->name);
+ }
}
}
}
diff --git a/src/module.c b/src/module.c
index ac6cbbb..1cb418c 100644
--- a/src/module.c
+++ b/src/module.c
@@ -7706,15 +7706,15 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
bc->background_timer = 0;
bc->background_duration = 0;
- c->bstate.timeout = 0;
+ mstime_t timeout = 0;
if (timeout_ms) {
mstime_t now = mstime();
- if (timeout_ms > LLONG_MAX - now) {
+ if (timeout_ms > LLONG_MAX - now) {
c->bstate.module_blocked_handle = NULL;
addReplyError(c, "timeout is out of range"); /* 'timeout_ms+now' would overflow */
return bc;
}
- c->bstate.timeout = timeout_ms + now;
+ timeout = timeout_ms + now;
}
if (islua || ismulti) {
@@ -7730,8 +7730,9 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
addReplyError(c, "Clients undergoing module based authentication can only be blocked on auth");
} else {
if (keys) {
- blockForKeys(c,BLOCKED_MODULE,keys,numkeys,c->bstate.timeout,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED);
+ blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED);
} else {
+ c->bstate.timeout = timeout;
blockClient(c,BLOCKED_MODULE);
}
}
diff --git a/src/quicklist.c b/src/quicklist.c
index 301a216..3b11878 100644
--- a/src/quicklist.c
+++ b/src/quicklist.c
@@ -104,6 +104,9 @@ quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name)
quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node);
void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm);
+quicklistNode *_quicklistSplitNode(quicklistNode *node, int offset, int after);
+quicklistNode *_quicklistMergeNodes(quicklist *quicklist, quicklistNode *center);
+
/* Simple way to give quicklistEntry structs default values with one call. */
#define initEntry(e) \
do { \
@@ -378,6 +381,15 @@ REDIS_STATIC void __quicklistCompress(const quicklist *quicklist,
quicklistCompressNode(reverse);
}
+/* This macro is used to compress a node.
+ *
+ * If the 'recompress' flag of the node is true, we compress it directly without
+ * checking whether it is within the range of compress depth.
+ * However, it's important to ensure that the 'recompress' flag of head and tail
+ * is always false, as we always assume that head and tail are not compressed.
+ *
+ * If the 'recompress' flag of the node is false, we check whether the node is
+ * within the range of compress depth before compressing it. */
#define quicklistCompress(_ql, _node) \
do { \
if ((_node)->recompress) \
@@ -529,19 +541,25 @@ REDIS_STATIC int _quicklistNodeAllowMerge(const quicklistNode *a,
(node)->sz = lpBytes((node)->entry); \
} while (0)
-static quicklistNode* __quicklistCreatePlainNode(void *value, size_t sz) {
+static quicklistNode* __quicklistCreateNode(int container, void *value, size_t sz) {
quicklistNode *new_node = quicklistCreateNode();
- new_node->entry = zmalloc(sz);
- new_node->container = QUICKLIST_NODE_CONTAINER_PLAIN;
- memcpy(new_node->entry, value, sz);
+ new_node->container = container;
+ if (container == QUICKLIST_NODE_CONTAINER_PLAIN) {
+ new_node->entry = zmalloc(sz);
+ memcpy(new_node->entry, value, sz);
+ } else {
+ new_node->entry = lpPrepend(lpNew(0), value, sz);
+ }
new_node->sz = sz;
new_node->count++;
return new_node;
}
static void __quicklistInsertPlainNode(quicklist *quicklist, quicklistNode *old_node,
- void *value, size_t sz, int after) {
- __quicklistInsertNode(quicklist, old_node, __quicklistCreatePlainNode(value, sz), after);
+ void *value, size_t sz, int after)
+{
+ quicklistNode *new_node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, value, sz);
+ __quicklistInsertNode(quicklist, old_node, new_node, after);
quicklist->count++;
}
@@ -741,9 +759,13 @@ void quicklistReplaceEntry(quicklistIter *iter, quicklistEntry *entry,
void *data, size_t sz)
{
quicklist* quicklist = iter->quicklist;
+ quicklistNode *node = entry->node;
+ unsigned char *newentry;
- if (likely(!QL_NODE_IS_PLAIN(entry->node) && !isLargeElement(sz))) {
- entry->node->entry = lpReplace(entry->node->entry, &entry->zi, data, sz);
+ if (likely(!QL_NODE_IS_PLAIN(entry->node) && !isLargeElement(sz) &&
+ (newentry = lpReplace(entry->node->entry, &entry->zi, data, sz)) != NULL))
+ {
+ entry->node->entry = newentry;
quicklistNodeUpdateSz(entry->node);
/* quicklistNext() and quicklistGetIteratorEntryAtIdx() provide an uncompressed node */
quicklistCompress(quicklist, entry->node);
@@ -758,17 +780,37 @@ void quicklistReplaceEntry(quicklistIter *iter, quicklistEntry *entry,
quicklistInsertAfter(iter, entry, data, sz);
__quicklistDelNode(quicklist, entry->node);
}
- } else {
- entry->node->dont_compress = 1; /* Prevent compression in quicklistInsertAfter() */
- quicklistInsertAfter(iter, entry, data, sz);
+ } else { /* The node is full or data is a large element */
+ quicklistNode *split_node = NULL, *new_node;
+ node->dont_compress = 1; /* Prevent compression in __quicklistInsertNode() */
+
+ /* If the entry is not at the tail, split the node at the entry's offset. */
+ if (entry->offset != node->count - 1 && entry->offset != -1)
+ split_node = _quicklistSplitNode(node, entry->offset, 1);
+
+ /* Create a new node and insert it after the original node.
+ * If the original node was split, insert the split node after the new node. */
+ new_node = __quicklistCreateNode(isLargeElement(sz) ?
+ QUICKLIST_NODE_CONTAINER_PLAIN : QUICKLIST_NODE_CONTAINER_PACKED, data, sz);
+ __quicklistInsertNode(quicklist, node, new_node, 1);
+ if (split_node) __quicklistInsertNode(quicklist, new_node, split_node, 1);
+ quicklist->count++;
+
+ /* Delete the replaced element. */
if (entry->node->count == 1) {
__quicklistDelNode(quicklist, entry->node);
} else {
unsigned char *p = lpSeek(entry->node->entry, -1);
quicklistDelIndex(quicklist, entry->node, &p);
entry->node->dont_compress = 0; /* Re-enable compression */
- quicklistCompress(quicklist, entry->node);
- quicklistCompress(quicklist, entry->node->next);
+ new_node = _quicklistMergeNodes(quicklist, new_node);
+ /* We can't know if the current node and its sibling nodes are correctly compressed,
+ * and we don't know if they are within the range of compress depth, so we need to
+ * use quicklistCompress() for compression, which checks if node is within compress
+ * depth before compressing. */
+ quicklistCompress(quicklist, new_node);
+ quicklistCompress(quicklist, new_node->prev);
+ if (new_node->next) quicklistCompress(quicklist, new_node->next);
}
}
@@ -826,6 +868,8 @@ REDIS_STATIC quicklistNode *_quicklistListpackMerge(quicklist *quicklist,
}
keep->count = lpLength(keep->entry);
quicklistNodeUpdateSz(keep);
+ keep->recompress = 0; /* Prevent 'keep' from being recompressed if
+ * it becomes head or tail after merging. */
nokeep->count = 0;
__quicklistDelNode(quicklist, nokeep);
@@ -844,9 +888,10 @@ REDIS_STATIC quicklistNode *_quicklistListpackMerge(quicklist *quicklist,
* - (center->next, center->next->next)
* - (center->prev, center)
* - (center, center->next)
+ *
+ * Returns the new 'center' after merging.
*/
-REDIS_STATIC void _quicklistMergeNodes(quicklist *quicklist,
- quicklistNode *center) {
+REDIS_STATIC quicklistNode *_quicklistMergeNodes(quicklist *quicklist, quicklistNode *center) {
int fill = quicklist->fill;
quicklistNode *prev, *prev_prev, *next, *next_next, *target;
prev = prev_prev = next = next_next = target = NULL;
@@ -886,8 +931,9 @@ REDIS_STATIC void _quicklistMergeNodes(quicklist *quicklist,
/* Use result of center merge (or original) to merge with next node. */
if (_quicklistNodeAllowMerge(target, target->next, fill)) {
- _quicklistListpackMerge(quicklist, target, target->next);
+ target = _quicklistListpackMerge(quicklist, target, target->next);
}
+ return target;
}
/* Split 'node' into two parts, parameterized by 'offset' and 'after'.
@@ -1002,7 +1048,7 @@ REDIS_STATIC void _quicklistInsert(quicklistIter *iter, quicklistEntry *entry,
} else {
quicklistDecompressNodeForUse(node);
new_node = _quicklistSplitNode(node, entry->offset, after);
- quicklistNode *entry_node = __quicklistCreatePlainNode(value, sz);
+ quicklistNode *entry_node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, value, sz);
__quicklistInsertNode(quicklist, node, entry_node, after);
__quicklistInsertNode(quicklist, entry_node, new_node, after);
quicklist->count++;
@@ -3224,7 +3270,7 @@ int quicklistTest(int argc, char *argv[], int flags) {
memcpy(s, "helloworld", 10);
memcpy(s + sz - 10, "1234567890", 10);
- quicklistNode *node = __quicklistCreatePlainNode(s, sz);
+ quicklistNode *node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, s, sz);
/* Just to avoid triggering the assertion in __quicklistCompressNode(),
* it disables the passing of quicklist head or tail node. */
diff --git a/src/rdb.h b/src/rdb.h
index 234bde2..a47ea2b 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -81,9 +81,6 @@
#define RDB_TYPE_MODULE_PRE_GA 6 /* Used in 4.0 release candidates */
#define RDB_TYPE_MODULE_2 7 /* Module value with annotations for parsing without
the generating module being loaded. */
-/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
-
-/* Object types for encoded objects. */
#define RDB_TYPE_HASH_ZIPMAP 9
#define RDB_TYPE_LIST_ZIPLIST 10
#define RDB_TYPE_SET_INTSET 11
@@ -97,7 +94,7 @@
#define RDB_TYPE_STREAM_LISTPACKS_2 19
#define RDB_TYPE_SET_LISTPACK 20
#define RDB_TYPE_STREAM_LISTPACKS_3 21
-/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
+/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType(), and rdb_type_string[] */
/* Test if a type is an object type. */
#define rdbIsObjectType(t) (((t) >= 0 && (t) <= 7) || ((t) >= 9 && (t) <= 21))
diff --git a/src/redis-check-aof.c b/src/redis-check-aof.c
index 616177a..e28126d 100644
--- a/src/redis-check-aof.c
+++ b/src/redis-check-aof.c
@@ -233,6 +233,7 @@ int checkSingleAof(char *aof_filename, char *aof_filepath, int last_file, int fi
struct redis_stat sb;
if (redis_fstat(fileno(fp),&sb) == -1) {
printf("Cannot stat file: %s, aborting...\n", aof_filename);
+ fclose(fp);
exit(1);
}
@@ -343,6 +344,7 @@ int fileIsRDB(char *filepath) {
struct redis_stat sb;
if (redis_fstat(fileno(fp), &sb) == -1) {
printf("Cannot stat file: %s\n", filepath);
+ fclose(fp);
exit(1);
}
@@ -379,6 +381,7 @@ int fileIsManifest(char *filepath) {
struct redis_stat sb;
if (redis_fstat(fileno(fp), &sb) == -1) {
printf("Cannot stat file: %s\n", filepath);
+ fclose(fp);
exit(1);
}
@@ -395,15 +398,20 @@ int fileIsManifest(char *filepath) {
break;
} else {
printf("Cannot read file: %s\n", filepath);
+ fclose(fp);
exit(1);
}
}
- /* Skip comments lines */
+ /* We will skip comments lines.
+ * At present, the manifest format is fixed, see aofInfoFormat.
+ * We will break directly as long as it encounters other items. */
if (buf[0] == '#') {
continue;
} else if (!memcmp(buf, "file", strlen("file"))) {
is_manifest = 1;
+ } else {
+ break;
}
}
diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c
index 682135e..ffa05e8 100644
--- a/src/redis-check-rdb.c
+++ b/src/redis-check-rdb.c
@@ -98,7 +98,9 @@ char *rdb_type_string[] = {
"hash-listpack",
"zset-listpack",
"quicklist-v2",
+ "stream-v2",
"set-listpack",
+ "stream-v3",
};
/* Show a few stats collected into 'rdbstate' */
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 3854701..b5c2736 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -8839,7 +8839,8 @@ static redisReply *sendScan(unsigned long long *it) {
reply = redisCommand(context, "SCAN %llu MATCH %b COUNT %d",
*it, config.pattern, sdslen(config.pattern), config.count);
else
- reply = redisCommand(context,"SCAN %llu",*it);
+ reply = redisCommand(context, "SCAN %llu COUNT %d",
+ *it, config.count);
/* Handle any error conditions */
if(reply == NULL) {
diff --git a/src/script_lua.c b/src/script_lua.c
index 8cdd805..2c92638 100644
--- a/src/script_lua.c
+++ b/src/script_lua.c
@@ -818,8 +818,17 @@ static robj **luaArgsToRedisArgv(lua_State *lua, int *argc, int *argv_len) {
/* We can't use lua_tolstring() for number -> string conversion
* since Lua uses a format specifier that loses precision. */
lua_Number num = lua_tonumber(lua,j+1);
- obj_len = fpconv_dtoa((double)num, dbuf);
- dbuf[obj_len] = '\0';
+ /* Integer printing function is much faster, check if we can safely use it.
+ * Since lua_Number is not explicitly an integer or a double, we need to make an effort
+ * to convert it as an integer when that's possible, since the string could later be used
+ * in a context that doesn't support scientific notation (e.g. 1e9 instead of 100000000). */
+ long long lvalue;
+ if (double2ll((double)num, &lvalue))
+ obj_len = ll2string(dbuf, sizeof(dbuf), lvalue);
+ else {
+ obj_len = fpconv_dtoa((double)num, dbuf);
+ dbuf[obj_len] = '\0';
+ }
obj_s = dbuf;
} else {
obj_s = (char*)lua_tolstring(lua,j+1,&obj_len);
diff --git a/src/server.c b/src/server.c
index 438325f..4d47b5e 100644
--- a/src/server.c
+++ b/src/server.c
@@ -3512,12 +3512,20 @@ void call(client *c, int flags) {
* re-processing and unblock the client.*/
c->flags |= CLIENT_EXECUTING_COMMAND;
+ /* Setting the CLIENT_REPROCESSING_COMMAND flag so that during the actual
+ * processing of the command proc, the client is aware that it is being
+ * re-processed. */
+ if (reprocessing_command) c->flags |= CLIENT_REPROCESSING_COMMAND;
+
monotime monotonic_start = 0;
if (monotonicGetType() == MONOTONIC_CLOCK_HW)
monotonic_start = getMonotonicUs();
c->cmd->proc(c);
+ /* Clear the CLIENT_REPROCESSING_COMMAND flag after the proc is executed. */
+ if (reprocessing_command) c->flags &= ~CLIENT_REPROCESSING_COMMAND;
+
exitExecutionUnit();
/* In case client is blocked after trying to execute the command,
@@ -3575,7 +3583,7 @@ void call(client *c, int flags) {
/* Send the command to clients in MONITOR mode if applicable,
* since some administrative commands are considered too dangerous to be shown.
- * Other exceptions is a client which is unblocked and retring to process the command
+ * Other exceptions is a client which is unblocked and retrying to process the command
* or we are currently in the process of loading AOF. */
if (update_command_stats && !reprocessing_command &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) {
diff --git a/src/server.h b/src/server.h
index cb55503..b1fa542 100644
--- a/src/server.h
+++ b/src/server.h
@@ -400,6 +400,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
auth had been authenticated from the Module. */
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */
+#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
diff --git a/src/t_zset.c b/src/t_zset.c
index 7717a4a..1b267e0 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -1172,7 +1172,8 @@ unsigned long zsetLength(const robj *zobj) {
* and the value len hint indicates the approximate individual size of the added elements,
* they are used to determine the initial representation.
*
- * If the hints are not known, and underestimation or 0 is suitable. */
+ * If the hints are not known, and underestimation or 0 is suitable.
+ * We should never pass a negative value because it will convert to a very large unsigned number. */
robj *zsetTypeCreate(size_t size_hint, size_t val_len_hint) {
if (size_hint <= server.zset_max_listpack_entries &&
val_len_hint <= server.zset_max_listpack_value)
@@ -3001,7 +3002,7 @@ static void zrangeResultFinalizeClient(zrange_result_handler *handler,
/* Result handler methods for storing the ZRANGESTORE to a zset. */
static void zrangeResultBeginStore(zrange_result_handler *handler, long length)
{
- handler->dstobj = zsetTypeCreate(length, 0);
+ handler->dstobj = zsetTypeCreate(length >= 0 ? length : 0, 0);
}
static void zrangeResultEmitCBufferForStore(zrange_result_handler *handler,
diff --git a/src/version.h b/src/version.h
index 7c6eea6..0b22cd0 100644
--- a/src/version.h
+++ b/src/version.h
@@ -1,2 +1,2 @@
-#define REDIS_VERSION "7.2.4"
-#define REDIS_VERSION_NUM 0x00070204
+#define REDIS_VERSION "7.2.5"
+#define REDIS_VERSION_NUM 0x00070205