diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/aof.c | 4 | ||||
-rw-r--r-- | src/blocked.c | 8 | ||||
-rw-r--r-- | src/cluster.c | 9 | ||||
-rw-r--r-- | src/module.c | 9 | ||||
-rw-r--r-- | src/quicklist.c | 82 | ||||
-rw-r--r-- | src/rdb.h | 5 | ||||
-rw-r--r-- | src/redis-check-aof.c | 10 | ||||
-rw-r--r-- | src/redis-check-rdb.c | 2 | ||||
-rw-r--r-- | src/redis-cli.c | 3 | ||||
-rw-r--r-- | src/script_lua.c | 13 | ||||
-rw-r--r-- | src/server.c | 10 | ||||
-rw-r--r-- | src/server.h | 1 | ||||
-rw-r--r-- | src/t_zset.c | 5 | ||||
-rw-r--r-- | src/version.h | 4 |
14 files changed, 124 insertions, 41 deletions
@@ -117,7 +117,9 @@ aofInfo *aofInfoDup(aofInfo *orig) { return ai; } -/* Format aofInfo as a string and it will be a line in the manifest. */ +/* Format aofInfo as a string and it will be a line in the manifest. + * + * When update this format, make sure to update redis-check-aof as well. */ sds aofInfoFormat(sds buf, aofInfo *ai) { sds filename_repr = NULL; diff --git a/src/blocked.c b/src/blocked.c index 6ad4667..7b48fca 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -370,7 +370,12 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo list *l; int j; - c->bstate.timeout = timeout; + if (!(c->flags & CLIENT_REPROCESSING_COMMAND)) { + /* If the client is re-processing the command, we do not set the timeout + * because we need to retain the client's original timeout. */ + c->bstate.timeout = timeout; + } + for (j = 0; j < numkeys; j++) { /* If the key already exists in the dictionary ignore it. */ if (!(client_blocked_entry = dictAddRaw(c->bstate.keys,keys[j],NULL))) { @@ -392,7 +397,6 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo listAddNodeTail(l,c); dictSetVal(c->bstate.keys,client_blocked_entry,listLast(l)); - /* We need to add the key to blocking_keys_unblock_on_nokey, if the client * wants to be awakened if key is deleted (like XREADGROUP) */ if (unblock_on_nokey) { diff --git a/src/cluster.c b/src/cluster.c index c985d0b..637837c 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -988,7 +988,7 @@ void clusterInit(void) { server.cluster->myself = NULL; server.cluster->currentEpoch = 0; server.cluster->state = CLUSTER_FAIL; - server.cluster->size = 1; + server.cluster->size = 0; server.cluster->todo_before_sleep = 0; server.cluster->nodes = dictCreate(&clusterNodesDictType); server.cluster->shards = dictCreate(&clusterSdsToListType); @@ -4771,10 +4771,13 @@ void clusterCron(void) { /* Timeout reached. Set the node as possibly failing if it is * not already in this state. */ if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) { - serverLog(LL_DEBUG,"*** NODE %.40s possibly failing", - node->name); node->flags |= CLUSTER_NODE_PFAIL; update_state = 1; + if (myself->flags & CLUSTER_NODE_MASTER && server.cluster->size == 1) { + markNodeAsFailingIfNeeded(node); + } else { + serverLog(LL_DEBUG,"*** NODE %.40s possibly failing", node->name); + } } } } diff --git a/src/module.c b/src/module.c index ac6cbbb..1cb418c 100644 --- a/src/module.c +++ b/src/module.c @@ -7706,15 +7706,15 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF bc->background_timer = 0; bc->background_duration = 0; - c->bstate.timeout = 0; + mstime_t timeout = 0; if (timeout_ms) { mstime_t now = mstime(); - if (timeout_ms > LLONG_MAX - now) { + if (timeout_ms > LLONG_MAX - now) { c->bstate.module_blocked_handle = NULL; addReplyError(c, "timeout is out of range"); /* 'timeout_ms+now' would overflow */ return bc; } - c->bstate.timeout = timeout_ms + now; + timeout = timeout_ms + now; } if (islua || ismulti) { @@ -7730,8 +7730,9 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF addReplyError(c, "Clients undergoing module based authentication can only be blocked on auth"); } else { if (keys) { - blockForKeys(c,BLOCKED_MODULE,keys,numkeys,c->bstate.timeout,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED); + blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED); } else { + c->bstate.timeout = timeout; blockClient(c,BLOCKED_MODULE); } } diff --git a/src/quicklist.c b/src/quicklist.c index 301a216..3b11878 100644 --- a/src/quicklist.c +++ b/src/quicklist.c @@ -104,6 +104,9 @@ quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name) quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node); void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm); +quicklistNode *_quicklistSplitNode(quicklistNode *node, int offset, int after); +quicklistNode *_quicklistMergeNodes(quicklist *quicklist, quicklistNode *center); + /* Simple way to give quicklistEntry structs default values with one call. */ #define initEntry(e) \ do { \ @@ -378,6 +381,15 @@ REDIS_STATIC void __quicklistCompress(const quicklist *quicklist, quicklistCompressNode(reverse); } +/* This macro is used to compress a node. + * + * If the 'recompress' flag of the node is true, we compress it directly without + * checking whether it is within the range of compress depth. + * However, it's important to ensure that the 'recompress' flag of head and tail + * is always false, as we always assume that head and tail are not compressed. + * + * If the 'recompress' flag of the node is false, we check whether the node is + * within the range of compress depth before compressing it. */ #define quicklistCompress(_ql, _node) \ do { \ if ((_node)->recompress) \ @@ -529,19 +541,25 @@ REDIS_STATIC int _quicklistNodeAllowMerge(const quicklistNode *a, (node)->sz = lpBytes((node)->entry); \ } while (0) -static quicklistNode* __quicklistCreatePlainNode(void *value, size_t sz) { +static quicklistNode* __quicklistCreateNode(int container, void *value, size_t sz) { quicklistNode *new_node = quicklistCreateNode(); - new_node->entry = zmalloc(sz); - new_node->container = QUICKLIST_NODE_CONTAINER_PLAIN; - memcpy(new_node->entry, value, sz); + new_node->container = container; + if (container == QUICKLIST_NODE_CONTAINER_PLAIN) { + new_node->entry = zmalloc(sz); + memcpy(new_node->entry, value, sz); + } else { + new_node->entry = lpPrepend(lpNew(0), value, sz); + } new_node->sz = sz; new_node->count++; return new_node; } static void __quicklistInsertPlainNode(quicklist *quicklist, quicklistNode *old_node, - void *value, size_t sz, int after) { - __quicklistInsertNode(quicklist, old_node, __quicklistCreatePlainNode(value, sz), after); + void *value, size_t sz, int after) +{ + quicklistNode *new_node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, value, sz); + __quicklistInsertNode(quicklist, old_node, new_node, after); quicklist->count++; } @@ -741,9 +759,13 @@ void quicklistReplaceEntry(quicklistIter *iter, quicklistEntry *entry, void *data, size_t sz) { quicklist* quicklist = iter->quicklist; + quicklistNode *node = entry->node; + unsigned char *newentry; - if (likely(!QL_NODE_IS_PLAIN(entry->node) && !isLargeElement(sz))) { - entry->node->entry = lpReplace(entry->node->entry, &entry->zi, data, sz); + if (likely(!QL_NODE_IS_PLAIN(entry->node) && !isLargeElement(sz) && + (newentry = lpReplace(entry->node->entry, &entry->zi, data, sz)) != NULL)) + { + entry->node->entry = newentry; quicklistNodeUpdateSz(entry->node); /* quicklistNext() and quicklistGetIteratorEntryAtIdx() provide an uncompressed node */ quicklistCompress(quicklist, entry->node); @@ -758,17 +780,37 @@ void quicklistReplaceEntry(quicklistIter *iter, quicklistEntry *entry, quicklistInsertAfter(iter, entry, data, sz); __quicklistDelNode(quicklist, entry->node); } - } else { - entry->node->dont_compress = 1; /* Prevent compression in quicklistInsertAfter() */ - quicklistInsertAfter(iter, entry, data, sz); + } else { /* The node is full or data is a large element */ + quicklistNode *split_node = NULL, *new_node; + node->dont_compress = 1; /* Prevent compression in __quicklistInsertNode() */ + + /* If the entry is not at the tail, split the node at the entry's offset. */ + if (entry->offset != node->count - 1 && entry->offset != -1) + split_node = _quicklistSplitNode(node, entry->offset, 1); + + /* Create a new node and insert it after the original node. + * If the original node was split, insert the split node after the new node. */ + new_node = __quicklistCreateNode(isLargeElement(sz) ? + QUICKLIST_NODE_CONTAINER_PLAIN : QUICKLIST_NODE_CONTAINER_PACKED, data, sz); + __quicklistInsertNode(quicklist, node, new_node, 1); + if (split_node) __quicklistInsertNode(quicklist, new_node, split_node, 1); + quicklist->count++; + + /* Delete the replaced element. */ if (entry->node->count == 1) { __quicklistDelNode(quicklist, entry->node); } else { unsigned char *p = lpSeek(entry->node->entry, -1); quicklistDelIndex(quicklist, entry->node, &p); entry->node->dont_compress = 0; /* Re-enable compression */ - quicklistCompress(quicklist, entry->node); - quicklistCompress(quicklist, entry->node->next); + new_node = _quicklistMergeNodes(quicklist, new_node); + /* We can't know if the current node and its sibling nodes are correctly compressed, + * and we don't know if they are within the range of compress depth, so we need to + * use quicklistCompress() for compression, which checks if node is within compress + * depth before compressing. */ + quicklistCompress(quicklist, new_node); + quicklistCompress(quicklist, new_node->prev); + if (new_node->next) quicklistCompress(quicklist, new_node->next); } } @@ -826,6 +868,8 @@ REDIS_STATIC quicklistNode *_quicklistListpackMerge(quicklist *quicklist, } keep->count = lpLength(keep->entry); quicklistNodeUpdateSz(keep); + keep->recompress = 0; /* Prevent 'keep' from being recompressed if + * it becomes head or tail after merging. */ nokeep->count = 0; __quicklistDelNode(quicklist, nokeep); @@ -844,9 +888,10 @@ REDIS_STATIC quicklistNode *_quicklistListpackMerge(quicklist *quicklist, * - (center->next, center->next->next) * - (center->prev, center) * - (center, center->next) + * + * Returns the new 'center' after merging. */ -REDIS_STATIC void _quicklistMergeNodes(quicklist *quicklist, - quicklistNode *center) { +REDIS_STATIC quicklistNode *_quicklistMergeNodes(quicklist *quicklist, quicklistNode *center) { int fill = quicklist->fill; quicklistNode *prev, *prev_prev, *next, *next_next, *target; prev = prev_prev = next = next_next = target = NULL; @@ -886,8 +931,9 @@ REDIS_STATIC void _quicklistMergeNodes(quicklist *quicklist, /* Use result of center merge (or original) to merge with next node. */ if (_quicklistNodeAllowMerge(target, target->next, fill)) { - _quicklistListpackMerge(quicklist, target, target->next); + target = _quicklistListpackMerge(quicklist, target, target->next); } + return target; } /* Split 'node' into two parts, parameterized by 'offset' and 'after'. @@ -1002,7 +1048,7 @@ REDIS_STATIC void _quicklistInsert(quicklistIter *iter, quicklistEntry *entry, } else { quicklistDecompressNodeForUse(node); new_node = _quicklistSplitNode(node, entry->offset, after); - quicklistNode *entry_node = __quicklistCreatePlainNode(value, sz); + quicklistNode *entry_node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, value, sz); __quicklistInsertNode(quicklist, node, entry_node, after); __quicklistInsertNode(quicklist, entry_node, new_node, after); quicklist->count++; @@ -3224,7 +3270,7 @@ int quicklistTest(int argc, char *argv[], int flags) { memcpy(s, "helloworld", 10); memcpy(s + sz - 10, "1234567890", 10); - quicklistNode *node = __quicklistCreatePlainNode(s, sz); + quicklistNode *node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, s, sz); /* Just to avoid triggering the assertion in __quicklistCompressNode(), * it disables the passing of quicklist head or tail node. */ @@ -81,9 +81,6 @@ #define RDB_TYPE_MODULE_PRE_GA 6 /* Used in 4.0 release candidates */ #define RDB_TYPE_MODULE_2 7 /* Module value with annotations for parsing without the generating module being loaded. */ -/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */ - -/* Object types for encoded objects. */ #define RDB_TYPE_HASH_ZIPMAP 9 #define RDB_TYPE_LIST_ZIPLIST 10 #define RDB_TYPE_SET_INTSET 11 @@ -97,7 +94,7 @@ #define RDB_TYPE_STREAM_LISTPACKS_2 19 #define RDB_TYPE_SET_LISTPACK 20 #define RDB_TYPE_STREAM_LISTPACKS_3 21 -/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */ +/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType(), and rdb_type_string[] */ /* Test if a type is an object type. */ #define rdbIsObjectType(t) (((t) >= 0 && (t) <= 7) || ((t) >= 9 && (t) <= 21)) diff --git a/src/redis-check-aof.c b/src/redis-check-aof.c index 616177a..e28126d 100644 --- a/src/redis-check-aof.c +++ b/src/redis-check-aof.c @@ -233,6 +233,7 @@ int checkSingleAof(char *aof_filename, char *aof_filepath, int last_file, int fi struct redis_stat sb; if (redis_fstat(fileno(fp),&sb) == -1) { printf("Cannot stat file: %s, aborting...\n", aof_filename); + fclose(fp); exit(1); } @@ -343,6 +344,7 @@ int fileIsRDB(char *filepath) { struct redis_stat sb; if (redis_fstat(fileno(fp), &sb) == -1) { printf("Cannot stat file: %s\n", filepath); + fclose(fp); exit(1); } @@ -379,6 +381,7 @@ int fileIsManifest(char *filepath) { struct redis_stat sb; if (redis_fstat(fileno(fp), &sb) == -1) { printf("Cannot stat file: %s\n", filepath); + fclose(fp); exit(1); } @@ -395,15 +398,20 @@ int fileIsManifest(char *filepath) { break; } else { printf("Cannot read file: %s\n", filepath); + fclose(fp); exit(1); } } - /* Skip comments lines */ + /* We will skip comments lines. + * At present, the manifest format is fixed, see aofInfoFormat. + * We will break directly as long as it encounters other items. */ if (buf[0] == '#') { continue; } else if (!memcmp(buf, "file", strlen("file"))) { is_manifest = 1; + } else { + break; } } diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index 682135e..ffa05e8 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -98,7 +98,9 @@ char *rdb_type_string[] = { "hash-listpack", "zset-listpack", "quicklist-v2", + "stream-v2", "set-listpack", + "stream-v3", }; /* Show a few stats collected into 'rdbstate' */ diff --git a/src/redis-cli.c b/src/redis-cli.c index 3854701..b5c2736 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -8839,7 +8839,8 @@ static redisReply *sendScan(unsigned long long *it) { reply = redisCommand(context, "SCAN %llu MATCH %b COUNT %d", *it, config.pattern, sdslen(config.pattern), config.count); else - reply = redisCommand(context,"SCAN %llu",*it); + reply = redisCommand(context, "SCAN %llu COUNT %d", + *it, config.count); /* Handle any error conditions */ if(reply == NULL) { diff --git a/src/script_lua.c b/src/script_lua.c index 8cdd805..2c92638 100644 --- a/src/script_lua.c +++ b/src/script_lua.c @@ -818,8 +818,17 @@ static robj **luaArgsToRedisArgv(lua_State *lua, int *argc, int *argv_len) { /* We can't use lua_tolstring() for number -> string conversion * since Lua uses a format specifier that loses precision. */ lua_Number num = lua_tonumber(lua,j+1); - obj_len = fpconv_dtoa((double)num, dbuf); - dbuf[obj_len] = '\0'; + /* Integer printing function is much faster, check if we can safely use it. + * Since lua_Number is not explicitly an integer or a double, we need to make an effort + * to convert it as an integer when that's possible, since the string could later be used + * in a context that doesn't support scientific notation (e.g. 1e9 instead of 100000000). */ + long long lvalue; + if (double2ll((double)num, &lvalue)) + obj_len = ll2string(dbuf, sizeof(dbuf), lvalue); + else { + obj_len = fpconv_dtoa((double)num, dbuf); + dbuf[obj_len] = '\0'; + } obj_s = dbuf; } else { obj_s = (char*)lua_tolstring(lua,j+1,&obj_len); diff --git a/src/server.c b/src/server.c index 438325f..4d47b5e 100644 --- a/src/server.c +++ b/src/server.c @@ -3512,12 +3512,20 @@ void call(client *c, int flags) { * re-processing and unblock the client.*/ c->flags |= CLIENT_EXECUTING_COMMAND; + /* Setting the CLIENT_REPROCESSING_COMMAND flag so that during the actual + * processing of the command proc, the client is aware that it is being + * re-processed. */ + if (reprocessing_command) c->flags |= CLIENT_REPROCESSING_COMMAND; + monotime monotonic_start = 0; if (monotonicGetType() == MONOTONIC_CLOCK_HW) monotonic_start = getMonotonicUs(); c->cmd->proc(c); + /* Clear the CLIENT_REPROCESSING_COMMAND flag after the proc is executed. */ + if (reprocessing_command) c->flags &= ~CLIENT_REPROCESSING_COMMAND; + exitExecutionUnit(); /* In case client is blocked after trying to execute the command, @@ -3575,7 +3583,7 @@ void call(client *c, int flags) { /* Send the command to clients in MONITOR mode if applicable, * since some administrative commands are considered too dangerous to be shown. - * Other exceptions is a client which is unblocked and retring to process the command + * Other exceptions is a client which is unblocked and retrying to process the command * or we are currently in the process of loading AOF. */ if (update_command_stats && !reprocessing_command && !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) { diff --git a/src/server.h b/src/server.h index cb55503..b1fa542 100644 --- a/src/server.h +++ b/src/server.h @@ -400,6 +400,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; auth had been authenticated from the Module. */ #define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */ #define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */ +#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ diff --git a/src/t_zset.c b/src/t_zset.c index 7717a4a..1b267e0 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -1172,7 +1172,8 @@ unsigned long zsetLength(const robj *zobj) { * and the value len hint indicates the approximate individual size of the added elements, * they are used to determine the initial representation. * - * If the hints are not known, and underestimation or 0 is suitable. */ + * If the hints are not known, and underestimation or 0 is suitable. + * We should never pass a negative value because it will convert to a very large unsigned number. */ robj *zsetTypeCreate(size_t size_hint, size_t val_len_hint) { if (size_hint <= server.zset_max_listpack_entries && val_len_hint <= server.zset_max_listpack_value) @@ -3001,7 +3002,7 @@ static void zrangeResultFinalizeClient(zrange_result_handler *handler, /* Result handler methods for storing the ZRANGESTORE to a zset. */ static void zrangeResultBeginStore(zrange_result_handler *handler, long length) { - handler->dstobj = zsetTypeCreate(length, 0); + handler->dstobj = zsetTypeCreate(length >= 0 ? length : 0, 0); } static void zrangeResultEmitCBufferForStore(zrange_result_handler *handler, diff --git a/src/version.h b/src/version.h index 7c6eea6..0b22cd0 100644 --- a/src/version.h +++ b/src/version.h @@ -1,2 +1,2 @@ -#define REDIS_VERSION "7.2.4" -#define REDIS_VERSION_NUM 0x00070204 +#define REDIS_VERSION "7.2.5" +#define REDIS_VERSION_NUM 0x00070205 |