diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-14 13:40:54 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-14 13:40:54 +0000 |
commit | 317c0644ccf108aa23ef3fd8358bd66c2840bfc0 (patch) | |
tree | c417b3d25c86b775989cb5ac042f37611b626c8a /src/blocked.c | |
parent | Initial commit. (diff) | |
download | redis-317c0644ccf108aa23ef3fd8358bd66c2840bfc0.tar.xz redis-317c0644ccf108aa23ef3fd8358bd66c2840bfc0.zip |
Adding upstream version 5:7.2.4.upstream/5%7.2.4
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/blocked.c')
-rw-r--r-- | src/blocked.c | 763 |
1 files changed, 763 insertions, 0 deletions
diff --git a/src/blocked.c b/src/blocked.c new file mode 100644 index 0000000..6ad4667 --- /dev/null +++ b/src/blocked.c @@ -0,0 +1,763 @@ +/* blocked.c - generic support for blocking operations like BLPOP & WAIT. + * + * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * --------------------------------------------------------------------------- + * + * API: + * + * blockClient() set the CLIENT_BLOCKED flag in the client, and set the + * specified block type 'btype' filed to one of BLOCKED_* macros. + * + * unblockClient() unblocks the client doing the following: + * 1) It calls the btype-specific function to cleanup the state. + * 2) It unblocks the client by unsetting the CLIENT_BLOCKED flag. + * 3) It puts the client into a list of just unblocked clients that are + * processed ASAP in the beforeSleep() event loop callback, so that + * if there is some query buffer to process, we do it. This is also + * required because otherwise there is no 'readable' event fired, we + * already read the pending commands. We also set the CLIENT_UNBLOCKED + * flag to remember the client is in the unblocked_clients list. + * + * processUnblockedClients() is called inside the beforeSleep() function + * to process the query buffer from unblocked clients and remove the clients + * from the blocked_clients queue. + * + * replyToBlockedClientTimedOut() is called by the cron function when + * a client blocked reaches the specified timeout (if the timeout is set + * to 0, no timeout is processed). + * It usually just needs to send a reply to the client. + * + * When implementing a new type of blocking operation, the implementation + * should modify unblockClient() and replyToBlockedClientTimedOut() in order + * to handle the btype-specific behavior of this two functions. + * If the blocking operation waits for certain keys to change state, the + * clusterRedirectBlockedClientIfNeeded() function should also be updated. + */ + +#include "server.h" +#include "slowlog.h" +#include "latency.h" +#include "monotonic.h" + +/* forward declarations */ +static void unblockClientWaitingData(client *c); +static void handleClientsBlockedOnKey(readyList *rl); +static void unblockClientOnKey(client *c, robj *key); +static void moduleUnblockClientOnKey(client *c, robj *key); +static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key); + +void initClientBlockingState(client *c) { + c->bstate.btype = BLOCKED_NONE; + c->bstate.timeout = 0; + c->bstate.keys = dictCreate(&objectKeyHeapPointerValueDictType); + c->bstate.numreplicas = 0; + c->bstate.reploffset = 0; + c->bstate.unblock_on_nokey = 0; + c->bstate.async_rm_call_handle = NULL; +} + +/* Block a client for the specific operation type. Once the CLIENT_BLOCKED + * flag is set client query buffer is not longer processed, but accumulated, + * and will be processed when the client is unblocked. */ +void blockClient(client *c, int btype) { + /* Master client should never be blocked unless pause or module */ + serverAssert(!(c->flags & CLIENT_MASTER && + btype != BLOCKED_MODULE && + btype != BLOCKED_POSTPONE)); + + c->flags |= CLIENT_BLOCKED; + c->bstate.btype = btype; + if (!(c->flags & CLIENT_MODULE)) server.blocked_clients++; /* We count blocked client stats on regular clients and not on module clients */ + server.blocked_clients_by_type[btype]++; + addClientToTimeoutTable(c); +} + +/* Usually when a client is unblocked due to being blocked while processing some command + * he will attempt to reprocess the command which will update the statistics. + * However in case the client was timed out or in case of module blocked client is being unblocked + * the command will not be reprocessed and we need to make stats update. + * This function will make updates to the commandstats, slowlog and monitors.*/ +void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors){ + const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us; + c->lastcmd->microseconds += total_cmd_duration; + c->lastcmd->calls++; + server.stat_numcommands++; + if (had_errors) + c->lastcmd->failed_calls++; + if (server.latency_tracking_enabled) + updateCommandLatencyHistogram(&(c->lastcmd->latency_histogram), total_cmd_duration*1000); + /* Log the command into the Slow log if needed. */ + slowlogPushCurrentCommand(c, c->lastcmd, total_cmd_duration); + c->duration = 0; + /* Log the reply duration event. */ + latencyAddSampleIfNeeded("command-unblocking",reply_us/1000); +} + +/* This function is called in the beforeSleep() function of the event loop + * in order to process the pending input buffer of clients that were + * unblocked after a blocking operation. */ +void processUnblockedClients(void) { + listNode *ln; + client *c; + + while (listLength(server.unblocked_clients)) { + ln = listFirst(server.unblocked_clients); + serverAssert(ln != NULL); + c = ln->value; + listDelNode(server.unblocked_clients,ln); + c->flags &= ~CLIENT_UNBLOCKED; + + if (c->flags & CLIENT_MODULE) { + if (!(c->flags & CLIENT_BLOCKED)) { + moduleCallCommandUnblockedHandler(c); + } + continue; + } + + /* Process remaining data in the input buffer, unless the client + * is blocked again. Actually processInputBuffer() checks that the + * client is not blocked before to proceed, but things may change and + * the code is conceptually more correct this way. */ + if (!(c->flags & CLIENT_BLOCKED)) { + /* If we have a queued command, execute it now. */ + if (processPendingCommandAndInputBuffer(c) == C_ERR) { + c = NULL; + } + } + beforeNextClient(c); + } +} + +/* This function will schedule the client for reprocessing at a safe time. + * + * This is useful when a client was blocked for some reason (blocking operation, + * CLIENT PAUSE, or whatever), because it may end with some accumulated query + * buffer that needs to be processed ASAP: + * + * 1. When a client is blocked, its readable handler is still active. + * 2. However in this case it only gets data into the query buffer, but the + * query is not parsed or executed once there is enough to proceed as + * usually (because the client is blocked... so we can't execute commands). + * 3. When the client is unblocked, without this function, the client would + * have to write some query in order for the readable handler to finally + * call processQueryBuffer*() on it. + * 4. With this function instead we can put the client in a queue that will + * process it for queries ready to be executed at a safe time. + */ +void queueClientForReprocessing(client *c) { + /* The client may already be into the unblocked list because of a previous + * blocking operation, don't add back it into the list multiple times. */ + if (!(c->flags & CLIENT_UNBLOCKED)) { + c->flags |= CLIENT_UNBLOCKED; + listAddNodeTail(server.unblocked_clients,c); + } +} + +/* Unblock a client calling the right function depending on the kind + * of operation the client is blocking for. */ +void unblockClient(client *c, int queue_for_reprocessing) { + if (c->bstate.btype == BLOCKED_LIST || + c->bstate.btype == BLOCKED_ZSET || + c->bstate.btype == BLOCKED_STREAM) { + unblockClientWaitingData(c); + } else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) { + unblockClientWaitingReplicas(c); + } else if (c->bstate.btype == BLOCKED_MODULE) { + if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); + unblockClientFromModule(c); + } else if (c->bstate.btype == BLOCKED_POSTPONE) { + listDelNode(server.postponed_clients,c->postponed_list_node); + c->postponed_list_node = NULL; + } else if (c->bstate.btype == BLOCKED_SHUTDOWN) { + /* No special cleanup. */ + } else { + serverPanic("Unknown btype in unblockClient()."); + } + + /* Reset the client for a new query, unless the client has pending command to process + * or in case a shutdown operation was canceled and we are still in the processCommand sequence */ + if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) { + freeClientOriginalArgv(c); + /* Clients that are not blocked on keys are not reprocessed so we must + * call reqresAppendResponse here (for clients blocked on key, + * unblockClientOnKey is called, which eventually calls processCommand, + * which calls reqresAppendResponse) */ + reqresAppendResponse(c); + resetClient(c); + } + + /* Clear the flags, and put the client in the unblocked list so that + * we'll process new commands in its query buffer ASAP. */ + if (!(c->flags & CLIENT_MODULE)) server.blocked_clients--; /* We count blocked client stats on regular clients and not on module clients */ + server.blocked_clients_by_type[c->bstate.btype]--; + c->flags &= ~CLIENT_BLOCKED; + c->bstate.btype = BLOCKED_NONE; + c->bstate.unblock_on_nokey = 0; + removeClientFromTimeoutTable(c); + if (queue_for_reprocessing) queueClientForReprocessing(c); +} + +/* This function gets called when a blocked client timed out in order to + * send it a reply of some kind. After this function is called, + * unblockClient() will be called with the same client as argument. */ +void replyToBlockedClientTimedOut(client *c) { + if (c->bstate.btype == BLOCKED_LIST || + c->bstate.btype == BLOCKED_ZSET || + c->bstate.btype == BLOCKED_STREAM) { + addReplyNullArray(c); + updateStatsOnUnblock(c, 0, 0, 0); + } else if (c->bstate.btype == BLOCKED_WAIT) { + addReplyLongLong(c,replicationCountAcksByOffset(c->bstate.reploffset)); + } else if (c->bstate.btype == BLOCKED_WAITAOF) { + addReplyArrayLen(c,2); + addReplyLongLong(c,server.fsynced_reploff >= c->bstate.reploffset); + addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset)); + } else if (c->bstate.btype == BLOCKED_MODULE) { + moduleBlockedClientTimedOut(c); + } else { + serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); + } +} + +/* If one or more clients are blocked on the SHUTDOWN command, this function + * sends them an error reply and unblocks them. */ +void replyToClientsBlockedOnShutdown(void) { + if (server.blocked_clients_by_type[BLOCKED_SHUTDOWN] == 0) return; + listNode *ln; + listIter li; + listRewind(server.clients, &li); + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + if (c->flags & CLIENT_BLOCKED && c->bstate.btype == BLOCKED_SHUTDOWN) { + addReplyError(c, "Errors trying to SHUTDOWN. Check logs."); + unblockClient(c, 1); + } + } +} + +/* Mass-unblock clients because something changed in the instance that makes + * blocking no longer safe. For example clients blocked in list operations + * in an instance which turns from master to slave is unsafe, so this function + * is called when a master turns into a slave. + * + * The semantics is to send an -UNBLOCKED error to the client, disconnecting + * it at the same time. */ +void disconnectAllBlockedClients(void) { + listNode *ln; + listIter li; + + listRewind(server.clients,&li); + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + + if (c->flags & CLIENT_BLOCKED) { + /* POSTPONEd clients are an exception, when they'll be unblocked, the + * command processing will start from scratch, and the command will + * be either executed or rejected. (unlike LIST blocked clients for + * which the command is already in progress in a way. */ + if (c->bstate.btype == BLOCKED_POSTPONE) + continue; + + unblockClientOnError(c, + "-UNBLOCKED force unblock from blocking operation, " + "instance state changed (master -> replica?)"); + c->flags |= CLIENT_CLOSE_AFTER_REPLY; + } + } +} + +/* This function should be called by Redis every time a single command, + * a MULTI/EXEC block, or a Lua script, terminated its execution after + * being called by a client. It handles serving clients blocked in all scenarios + * where a specific key access requires to block until that key is available. + * + * All the keys with at least one client blocked that are signaled as ready + * are accumulated into the server.ready_keys list. This function will run + * the list and will serve clients accordingly. + * Note that the function will iterate again and again (for example as a result of serving BLMOVE + * we can have new blocking clients to serve because of the PUSH side of BLMOVE.) + * + * This function is normally "fair", that is, it will serve clients + * using a FIFO behavior. However this fairness is violated in certain + * edge cases, that is, when we have clients blocked at the same time + * in a sorted set and in a list, for the same key (a very odd thing to + * do client side, indeed!). Because mismatching clients (blocking for + * a different type compared to the current key type) are moved in the + * other side of the linked list. However as long as the key starts to + * be used only for a single type, like virtually any Redis application will + * do, the function is already fair. */ +void handleClientsBlockedOnKeys(void) { + + /* In case we are already in the process of unblocking clients we should + * not make a recursive call, in order to prevent breaking fairness. */ + static int in_handling_blocked_clients = 0; + if (in_handling_blocked_clients) + return; + in_handling_blocked_clients = 1; + + /* This function is called only when also_propagate is in its basic state + * (i.e. not from call(), module context, etc.) */ + serverAssert(server.also_propagate.numops == 0); + + /* If a command being unblocked causes another command to get unblocked, + * like a BLMOVE would do, then the new unblocked command will get processed + * right away rather than wait for later. */ + while(listLength(server.ready_keys) != 0) { + list *l; + + /* Point server.ready_keys to a fresh list and save the current one + * locally. This way as we run the old list we are free to call + * signalKeyAsReady() that may push new elements in server.ready_keys + * when handling clients blocked into BLMOVE. */ + l = server.ready_keys; + server.ready_keys = listCreate(); + + while(listLength(l) != 0) { + listNode *ln = listFirst(l); + readyList *rl = ln->value; + + /* First of all remove this key from db->ready_keys so that + * we can safely call signalKeyAsReady() against this key. */ + dictDelete(rl->db->ready_keys,rl->key); + + handleClientsBlockedOnKey(rl); + + /* Free this item. */ + decrRefCount(rl->key); + zfree(rl); + listDelNode(l,ln); + } + listRelease(l); /* We have the new list on place at this point. */ + } + in_handling_blocked_clients = 0; +} + +/* Set a client in blocking mode for the specified key, with the specified timeout. + * The 'type' argument is BLOCKED_LIST,BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are + * waiting for an empty key in order to awake the client. The client is blocked + * for all the 'numkeys' keys as in the 'keys' argument. + * The client will unblocked as soon as one of the keys in 'keys' value was updated. + * the parameter unblock_on_nokey can be used to force client to be unblocked even in the case the key + * is updated to become unavailable, either by type change (override), deletion or swapdb */ +void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey) { + dictEntry *db_blocked_entry, *db_blocked_existing_entry, *client_blocked_entry; + list *l; + int j; + + c->bstate.timeout = timeout; + for (j = 0; j < numkeys; j++) { + /* If the key already exists in the dictionary ignore it. */ + if (!(client_blocked_entry = dictAddRaw(c->bstate.keys,keys[j],NULL))) { + continue; + } + incrRefCount(keys[j]); + + /* And in the other "side", to map keys -> clients */ + db_blocked_entry = dictAddRaw(c->db->blocking_keys,keys[j], &db_blocked_existing_entry); + + /* In case key[j] did not have blocking clients yet, we need to create a new list */ + if (db_blocked_entry != NULL) { + l = listCreate(); + dictSetVal(c->db->blocking_keys, db_blocked_entry, l); + incrRefCount(keys[j]); + } else { + l = dictGetVal(db_blocked_existing_entry); + } + listAddNodeTail(l,c); + dictSetVal(c->bstate.keys,client_blocked_entry,listLast(l)); + + + /* We need to add the key to blocking_keys_unblock_on_nokey, if the client + * wants to be awakened if key is deleted (like XREADGROUP) */ + if (unblock_on_nokey) { + db_blocked_entry = dictAddRaw(c->db->blocking_keys_unblock_on_nokey, keys[j], &db_blocked_existing_entry); + if (db_blocked_entry) { + incrRefCount(keys[j]); + dictSetUnsignedIntegerVal(db_blocked_entry, 1); + } else { + dictIncrUnsignedIntegerVal(db_blocked_existing_entry, 1); + } + } + } + c->bstate.unblock_on_nokey = unblock_on_nokey; + /* Currently we assume key blocking will require reprocessing the command. + * However in case of modules, they have a different way to handle the reprocessing + * which does not require setting the pending command flag */ + if (btype != BLOCKED_MODULE) + c->flags |= CLIENT_PENDING_COMMAND; + blockClient(c,btype); +} + +/* Helper function to unblock a client that's waiting in a blocking operation such as BLPOP. + * Internal function for unblockClient() */ +static void unblockClientWaitingData(client *c) { + dictEntry *de; + dictIterator *di; + + if (dictSize(c->bstate.keys) == 0) + return; + + di = dictGetIterator(c->bstate.keys); + /* The client may wait for multiple keys, so unblock it for every key. */ + while((de = dictNext(di)) != NULL) { + releaseBlockedEntry(c, de, 0); + } + dictReleaseIterator(di); + dictEmpty(c->bstate.keys, NULL); +} + +static blocking_type getBlockedTypeByType(int type) { + switch (type) { + case OBJ_LIST: return BLOCKED_LIST; + case OBJ_ZSET: return BLOCKED_ZSET; + case OBJ_MODULE: return BLOCKED_MODULE; + case OBJ_STREAM: return BLOCKED_STREAM; + default: return BLOCKED_NONE; + } +} + +/* If the specified key has clients blocked waiting for list pushes, this + * function will put the key reference into the server.ready_keys list. + * Note that db->ready_keys is a hash table that allows us to avoid putting + * the same key again and again in the list in case of multiple pushes + * made by a script or in the context of MULTI/EXEC. + * + * The list will be finally processed by handleClientsBlockedOnKeys() */ +static void signalKeyAsReadyLogic(redisDb *db, robj *key, int type, int deleted) { + readyList *rl; + + /* Quick returns. */ + int btype = getBlockedTypeByType(type); + if (btype == BLOCKED_NONE) { + /* The type can never block. */ + return; + } + if (!server.blocked_clients_by_type[btype] && + !server.blocked_clients_by_type[BLOCKED_MODULE]) { + /* No clients block on this type. Note: Blocked modules are represented + * by BLOCKED_MODULE, even if the intention is to wake up by normal + * types (list, zset, stream), so we need to check that there are no + * blocked modules before we do a quick return here. */ + return; + } + + if (deleted) { + /* Key deleted and no clients blocking for this key? No need to queue it. */ + if (dictFind(db->blocking_keys_unblock_on_nokey,key) == NULL) + return; + /* Note: if we made it here it means the key is also present in db->blocking_keys */ + } else { + /* No clients blocking for this key? No need to queue it. */ + if (dictFind(db->blocking_keys,key) == NULL) + return; + } + + dictEntry *de, *existing; + de = dictAddRaw(db->ready_keys, key, &existing); + if (de) { + /* We add the key in the db->ready_keys dictionary in order + * to avoid adding it multiple times into a list with a simple O(1) + * check. */ + incrRefCount(key); + } else { + /* Key was already signaled? No need to queue it again. */ + return; + } + + /* Ok, we need to queue this key into server.ready_keys. */ + rl = zmalloc(sizeof(*rl)); + rl->key = key; + rl->db = db; + incrRefCount(key); + listAddNodeTail(server.ready_keys,rl); +} + +/* Helper function to wrap the logic of removing a client blocked key entry + * In this case we would like to do the following: + * 1. unlink the client from the global DB locked client list + * 2. remove the entry from the global db blocking list in case the list is empty + * 3. in case the global list is empty, also remove the key from the global dict of keys + * which should trigger unblock on key deletion + * 4. remove key from the client blocking keys list - NOTE, since client can be blocked on lots of keys, + * but unblocked when only one of them is triggered, we would like to avoid deleting each key separately + * and instead clear the dictionary in one-shot. this is why the remove_key argument is provided + * to support this logic in unblockClientWaitingData + */ +static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) { + list *l; + listNode *pos; + void *key; + dictEntry *unblock_on_nokey_entry; + + key = dictGetKey(de); + pos = dictGetVal(de); + /* Remove this client from the list of clients waiting for this key. */ + l = dictFetchValue(c->db->blocking_keys, key); + serverAssertWithInfo(c,key,l != NULL); + listUnlinkNode(l,pos); + /* If the list is empty we need to remove it to avoid wasting memory + * We will also remove the key (if exists) from the blocking_keys_unblock_on_nokey dict. + * However, in case the list is not empty, we will have to still perform reference accounting + * on the blocking_keys_unblock_on_nokey and delete the entry in case of zero reference. + * Why? because it is possible that some more clients are blocked on the same key but without + * require to be triggered on key deletion, we do not want these to be later triggered by the + * signalDeletedKeyAsReady. */ + if (listLength(l) == 0) { + dictDelete(c->db->blocking_keys, key); + dictDelete(c->db->blocking_keys_unblock_on_nokey,key); + } else if (c->bstate.unblock_on_nokey) { + unblock_on_nokey_entry = dictFind(c->db->blocking_keys_unblock_on_nokey,key); + /* it is not possible to have a client blocked on nokey with no matching entry */ + serverAssertWithInfo(c,key,unblock_on_nokey_entry != NULL); + if (!dictIncrUnsignedIntegerVal(unblock_on_nokey_entry, -1)) { + /* in case the count is zero, we can delete the entry */ + dictDelete(c->db->blocking_keys_unblock_on_nokey,key); + } + } + if (remove_key) + dictDelete(c->bstate.keys, key); +} + +void signalKeyAsReady(redisDb *db, robj *key, int type) { + signalKeyAsReadyLogic(db, key, type, 0); +} + +void signalDeletedKeyAsReady(redisDb *db, robj *key, int type) { + signalKeyAsReadyLogic(db, key, type, 1); +} + +/* Helper function for handleClientsBlockedOnKeys(). This function is called + * whenever a key is ready. we iterate over all the clients blocked on this key + * and try to re-execute the command (in case the key is still available). */ +static void handleClientsBlockedOnKey(readyList *rl) { + + /* We serve clients in the same order they blocked for + * this key, from the first blocked to the last. */ + dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); + + if (de) { + list *clients = dictGetVal(de); + listNode *ln; + listIter li; + listRewind(clients,&li); + + /* Avoid processing more than the initial count so that we're not stuck + * in an endless loop in case the reprocessing of the command blocks again. */ + long count = listLength(clients); + while ((ln = listNext(&li)) && count--) { + client *receiver = listNodeValue(ln); + robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NOEFFECTS); + /* 1. In case new key was added/touched we need to verify it satisfy the + * blocked type, since we might process the wrong key type. + * 2. We want to serve clients blocked on module keys + * regardless of the object type: we don't know what the + * module is trying to accomplish right now. + * 3. In case of XREADGROUP call we will want to unblock on any change in object type + * or in case the key was deleted, since the group is no longer valid. */ + if ((o != NULL && (receiver->bstate.btype == getBlockedTypeByType(o->type))) || + (o != NULL && (receiver->bstate.btype == BLOCKED_MODULE)) || + (receiver->bstate.unblock_on_nokey)) + { + if (receiver->bstate.btype != BLOCKED_MODULE) + unblockClientOnKey(receiver, rl->key); + else + moduleUnblockClientOnKey(receiver, rl->key); + } + } + } +} + +/* block a client due to wait command */ +void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) { + c->bstate.timeout = timeout; + c->bstate.reploffset = offset; + c->bstate.numreplicas = numreplicas; + listAddNodeHead(server.clients_waiting_acks,c); + blockClient(c,BLOCKED_WAIT); +} + +/* block a client due to waitaof command */ +void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) { + c->bstate.timeout = timeout; + c->bstate.reploffset = offset; + c->bstate.numreplicas = numreplicas; + c->bstate.numlocal = numlocal; + listAddNodeHead(server.clients_waiting_acks,c); + blockClient(c,BLOCKED_WAITAOF); +} + +/* Postpone client from executing a command. For example the server might be busy + * requesting to avoid processing clients commands which will be processed later + * when the it is ready to accept them. */ +void blockPostponeClient(client *c) { + c->bstate.timeout = 0; + blockClient(c,BLOCKED_POSTPONE); + listAddNodeTail(server.postponed_clients, c); + c->postponed_list_node = listLast(server.postponed_clients); + /* Mark this client to execute its command */ + c->flags |= CLIENT_PENDING_COMMAND; +} + +/* Block client due to shutdown command */ +void blockClientShutdown(client *c) { + blockClient(c, BLOCKED_SHUTDOWN); +} + +/* Unblock a client once a specific key became available for it. + * This function will remove the client from the list of clients blocked on this key + * and also remove the key from the dictionary of keys this client is blocked on. + * in case the client has a command pending it will process it immediately. */ +static void unblockClientOnKey(client *c, robj *key) { + dictEntry *de; + + de = dictFind(c->bstate.keys, key); + releaseBlockedEntry(c, de, 1); + + /* Only in case of blocking API calls, we might be blocked on several keys. + however we should force unblock the entire blocking keys */ + serverAssert(c->bstate.btype == BLOCKED_STREAM || + c->bstate.btype == BLOCKED_LIST || + c->bstate.btype == BLOCKED_ZSET); + + /* We need to unblock the client before calling processCommandAndResetClient + * because it checks the CLIENT_BLOCKED flag */ + unblockClient(c, 0); + /* In case this client was blocked on keys during command + * we need to re process the command again */ + if (c->flags & CLIENT_PENDING_COMMAND) { + c->flags &= ~CLIENT_PENDING_COMMAND; + /* We want the command processing and the unblock handler (see RM_Call 'K' option) + * to run atomically, this is why we must enter the execution unit here before + * running the command, and exit the execution unit after calling the unblock handler (if exists). + * Notice that we also must set the current client so it will be available + * when we will try to send the client side caching notification (done on 'afterCommand'). */ + client *old_client = server.current_client; + server.current_client = c; + enterExecutionUnit(1, 0); + processCommandAndResetClient(c); + if (!(c->flags & CLIENT_BLOCKED)) { + if (c->flags & CLIENT_MODULE) { + moduleCallCommandUnblockedHandler(c); + } else { + queueClientForReprocessing(c); + } + } + exitExecutionUnit(); + afterCommand(c); + server.current_client = old_client; + } +} + +/* Unblock a client blocked on the specific key from module context. + * This function will try to serve the module call, and in case it succeeds, + * it will add the client to the list of module unblocked clients which will + * be processed in moduleHandleBlockedClients. */ +static void moduleUnblockClientOnKey(client *c, robj *key) { + long long prev_error_replies = server.stat_total_error_replies; + client *old_client = server.current_client; + server.current_client = c; + monotime replyTimer; + elapsedStart(&replyTimer); + + if (moduleTryServeClientBlockedOnKey(c, key)) { + updateStatsOnUnblock(c, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); + moduleUnblockClient(c); + } + /* We need to call afterCommand even if the client was not unblocked + * in order to propagate any changes that could have been done inside + * moduleTryServeClientBlockedOnKey */ + afterCommand(c); + server.current_client = old_client; +} + +/* Unblock a client which is currently Blocked on and provided a timeout. + * The implementation will first reply to the blocked client with null response + * or, in case of module blocked client the timeout callback will be used. + * In this case since we might have a command pending + * we want to remove the pending flag to indicate we already responded to the + * command with timeout reply. */ +void unblockClientOnTimeout(client *c) { + replyToBlockedClientTimedOut(c); + if (c->flags & CLIENT_PENDING_COMMAND) + c->flags &= ~CLIENT_PENDING_COMMAND; + unblockClient(c, 1); +} + +/* Unblock a client which is currently Blocked with error. + * If err_str is provided it will be used to reply to the blocked client */ +void unblockClientOnError(client *c, const char *err_str) { + if (err_str) + addReplyError(c, err_str); + updateStatsOnUnblock(c, 0, 0, 1); + if (c->flags & CLIENT_PENDING_COMMAND) + c->flags &= ~CLIENT_PENDING_COMMAND; + unblockClient(c, 1); +} + +/* sets blocking_keys to the total number of keys which has at least one client blocked on them + * sets blocking_keys_on_nokey to the total number of keys which has at least one client + * blocked on them to be written or deleted */ +void totalNumberOfBlockingKeys(unsigned long *blocking_keys, unsigned long *bloking_keys_on_nokey) { + unsigned long bkeys=0, bkeys_on_nokey=0; + for (int j = 0; j < server.dbnum; j++) { + bkeys += dictSize(server.db[j].blocking_keys); + bkeys_on_nokey += dictSize(server.db[j].blocking_keys_unblock_on_nokey); + } + if (blocking_keys) + *blocking_keys = bkeys; + if (bloking_keys_on_nokey) + *bloking_keys_on_nokey = bkeys_on_nokey; +} + +void blockedBeforeSleep(void) { + /* Handle precise timeouts of blocked clients. */ + handleBlockedClientsTimeout(); + + /* Unblock all the clients blocked for synchronous replication + * in WAIT or WAITAOF. */ + if (listLength(server.clients_waiting_acks)) + processClientsWaitingReplicas(); + + /* Try to process blocked clients every once in while. + * + * Example: A module calls RM_SignalKeyAsReady from within a timer callback + * (So we don't visit processCommand() at all). + * + * This may unblock clients, so must be done before processUnblockedClients */ + handleClientsBlockedOnKeys(); + + /* Check if there are clients unblocked by modules that implement + * blocking commands. */ + if (moduleCount()) + moduleHandleBlockedClients(); + + /* Try to process pending commands for clients that were just unblocked. */ + if (listLength(server.unblocked_clients)) + processUnblockedClients(); +} |