summaryrefslogtreecommitdiffstats
path: root/src/blocked.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c763
1 files changed, 763 insertions, 0 deletions
diff --git a/src/blocked.c b/src/blocked.c
new file mode 100644
index 0000000..6ad4667
--- /dev/null
+++ b/src/blocked.c
@@ -0,0 +1,763 @@
+/* blocked.c - generic support for blocking operations like BLPOP & WAIT.
+ *
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * ---------------------------------------------------------------------------
+ *
+ * API:
+ *
+ * blockClient() set the CLIENT_BLOCKED flag in the client, and set the
+ * specified block type 'btype' filed to one of BLOCKED_* macros.
+ *
+ * unblockClient() unblocks the client doing the following:
+ * 1) It calls the btype-specific function to cleanup the state.
+ * 2) It unblocks the client by unsetting the CLIENT_BLOCKED flag.
+ * 3) It puts the client into a list of just unblocked clients that are
+ * processed ASAP in the beforeSleep() event loop callback, so that
+ * if there is some query buffer to process, we do it. This is also
+ * required because otherwise there is no 'readable' event fired, we
+ * already read the pending commands. We also set the CLIENT_UNBLOCKED
+ * flag to remember the client is in the unblocked_clients list.
+ *
+ * processUnblockedClients() is called inside the beforeSleep() function
+ * to process the query buffer from unblocked clients and remove the clients
+ * from the blocked_clients queue.
+ *
+ * replyToBlockedClientTimedOut() is called by the cron function when
+ * a client blocked reaches the specified timeout (if the timeout is set
+ * to 0, no timeout is processed).
+ * It usually just needs to send a reply to the client.
+ *
+ * When implementing a new type of blocking operation, the implementation
+ * should modify unblockClient() and replyToBlockedClientTimedOut() in order
+ * to handle the btype-specific behavior of this two functions.
+ * If the blocking operation waits for certain keys to change state, the
+ * clusterRedirectBlockedClientIfNeeded() function should also be updated.
+ */
+
+#include "server.h"
+#include "slowlog.h"
+#include "latency.h"
+#include "monotonic.h"
+
+/* forward declarations */
+static void unblockClientWaitingData(client *c);
+static void handleClientsBlockedOnKey(readyList *rl);
+static void unblockClientOnKey(client *c, robj *key);
+static void moduleUnblockClientOnKey(client *c, robj *key);
+static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key);
+
+void initClientBlockingState(client *c) {
+ c->bstate.btype = BLOCKED_NONE;
+ c->bstate.timeout = 0;
+ c->bstate.keys = dictCreate(&objectKeyHeapPointerValueDictType);
+ c->bstate.numreplicas = 0;
+ c->bstate.reploffset = 0;
+ c->bstate.unblock_on_nokey = 0;
+ c->bstate.async_rm_call_handle = NULL;
+}
+
+/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
+ * flag is set client query buffer is not longer processed, but accumulated,
+ * and will be processed when the client is unblocked. */
+void blockClient(client *c, int btype) {
+ /* Master client should never be blocked unless pause or module */
+ serverAssert(!(c->flags & CLIENT_MASTER &&
+ btype != BLOCKED_MODULE &&
+ btype != BLOCKED_POSTPONE));
+
+ c->flags |= CLIENT_BLOCKED;
+ c->bstate.btype = btype;
+ if (!(c->flags & CLIENT_MODULE)) server.blocked_clients++; /* We count blocked client stats on regular clients and not on module clients */
+ server.blocked_clients_by_type[btype]++;
+ addClientToTimeoutTable(c);
+}
+
+/* Usually when a client is unblocked due to being blocked while processing some command
+ * he will attempt to reprocess the command which will update the statistics.
+ * However in case the client was timed out or in case of module blocked client is being unblocked
+ * the command will not be reprocessed and we need to make stats update.
+ * This function will make updates to the commandstats, slowlog and monitors.*/
+void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors){
+ const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
+ c->lastcmd->microseconds += total_cmd_duration;
+ c->lastcmd->calls++;
+ server.stat_numcommands++;
+ if (had_errors)
+ c->lastcmd->failed_calls++;
+ if (server.latency_tracking_enabled)
+ updateCommandLatencyHistogram(&(c->lastcmd->latency_histogram), total_cmd_duration*1000);
+ /* Log the command into the Slow log if needed. */
+ slowlogPushCurrentCommand(c, c->lastcmd, total_cmd_duration);
+ c->duration = 0;
+ /* Log the reply duration event. */
+ latencyAddSampleIfNeeded("command-unblocking",reply_us/1000);
+}
+
+/* This function is called in the beforeSleep() function of the event loop
+ * in order to process the pending input buffer of clients that were
+ * unblocked after a blocking operation. */
+void processUnblockedClients(void) {
+ listNode *ln;
+ client *c;
+
+ while (listLength(server.unblocked_clients)) {
+ ln = listFirst(server.unblocked_clients);
+ serverAssert(ln != NULL);
+ c = ln->value;
+ listDelNode(server.unblocked_clients,ln);
+ c->flags &= ~CLIENT_UNBLOCKED;
+
+ if (c->flags & CLIENT_MODULE) {
+ if (!(c->flags & CLIENT_BLOCKED)) {
+ moduleCallCommandUnblockedHandler(c);
+ }
+ continue;
+ }
+
+ /* Process remaining data in the input buffer, unless the client
+ * is blocked again. Actually processInputBuffer() checks that the
+ * client is not blocked before to proceed, but things may change and
+ * the code is conceptually more correct this way. */
+ if (!(c->flags & CLIENT_BLOCKED)) {
+ /* If we have a queued command, execute it now. */
+ if (processPendingCommandAndInputBuffer(c) == C_ERR) {
+ c = NULL;
+ }
+ }
+ beforeNextClient(c);
+ }
+}
+
+/* This function will schedule the client for reprocessing at a safe time.
+ *
+ * This is useful when a client was blocked for some reason (blocking operation,
+ * CLIENT PAUSE, or whatever), because it may end with some accumulated query
+ * buffer that needs to be processed ASAP:
+ *
+ * 1. When a client is blocked, its readable handler is still active.
+ * 2. However in this case it only gets data into the query buffer, but the
+ * query is not parsed or executed once there is enough to proceed as
+ * usually (because the client is blocked... so we can't execute commands).
+ * 3. When the client is unblocked, without this function, the client would
+ * have to write some query in order for the readable handler to finally
+ * call processQueryBuffer*() on it.
+ * 4. With this function instead we can put the client in a queue that will
+ * process it for queries ready to be executed at a safe time.
+ */
+void queueClientForReprocessing(client *c) {
+ /* The client may already be into the unblocked list because of a previous
+ * blocking operation, don't add back it into the list multiple times. */
+ if (!(c->flags & CLIENT_UNBLOCKED)) {
+ c->flags |= CLIENT_UNBLOCKED;
+ listAddNodeTail(server.unblocked_clients,c);
+ }
+}
+
+/* Unblock a client calling the right function depending on the kind
+ * of operation the client is blocking for. */
+void unblockClient(client *c, int queue_for_reprocessing) {
+ if (c->bstate.btype == BLOCKED_LIST ||
+ c->bstate.btype == BLOCKED_ZSET ||
+ c->bstate.btype == BLOCKED_STREAM) {
+ unblockClientWaitingData(c);
+ } else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) {
+ unblockClientWaitingReplicas(c);
+ } else if (c->bstate.btype == BLOCKED_MODULE) {
+ if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
+ unblockClientFromModule(c);
+ } else if (c->bstate.btype == BLOCKED_POSTPONE) {
+ listDelNode(server.postponed_clients,c->postponed_list_node);
+ c->postponed_list_node = NULL;
+ } else if (c->bstate.btype == BLOCKED_SHUTDOWN) {
+ /* No special cleanup. */
+ } else {
+ serverPanic("Unknown btype in unblockClient().");
+ }
+
+ /* Reset the client for a new query, unless the client has pending command to process
+ * or in case a shutdown operation was canceled and we are still in the processCommand sequence */
+ if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) {
+ freeClientOriginalArgv(c);
+ /* Clients that are not blocked on keys are not reprocessed so we must
+ * call reqresAppendResponse here (for clients blocked on key,
+ * unblockClientOnKey is called, which eventually calls processCommand,
+ * which calls reqresAppendResponse) */
+ reqresAppendResponse(c);
+ resetClient(c);
+ }
+
+ /* Clear the flags, and put the client in the unblocked list so that
+ * we'll process new commands in its query buffer ASAP. */
+ if (!(c->flags & CLIENT_MODULE)) server.blocked_clients--; /* We count blocked client stats on regular clients and not on module clients */
+ server.blocked_clients_by_type[c->bstate.btype]--;
+ c->flags &= ~CLIENT_BLOCKED;
+ c->bstate.btype = BLOCKED_NONE;
+ c->bstate.unblock_on_nokey = 0;
+ removeClientFromTimeoutTable(c);
+ if (queue_for_reprocessing) queueClientForReprocessing(c);
+}
+
+/* This function gets called when a blocked client timed out in order to
+ * send it a reply of some kind. After this function is called,
+ * unblockClient() will be called with the same client as argument. */
+void replyToBlockedClientTimedOut(client *c) {
+ if (c->bstate.btype == BLOCKED_LIST ||
+ c->bstate.btype == BLOCKED_ZSET ||
+ c->bstate.btype == BLOCKED_STREAM) {
+ addReplyNullArray(c);
+ updateStatsOnUnblock(c, 0, 0, 0);
+ } else if (c->bstate.btype == BLOCKED_WAIT) {
+ addReplyLongLong(c,replicationCountAcksByOffset(c->bstate.reploffset));
+ } else if (c->bstate.btype == BLOCKED_WAITAOF) {
+ addReplyArrayLen(c,2);
+ addReplyLongLong(c,server.fsynced_reploff >= c->bstate.reploffset);
+ addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset));
+ } else if (c->bstate.btype == BLOCKED_MODULE) {
+ moduleBlockedClientTimedOut(c);
+ } else {
+ serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
+ }
+}
+
+/* If one or more clients are blocked on the SHUTDOWN command, this function
+ * sends them an error reply and unblocks them. */
+void replyToClientsBlockedOnShutdown(void) {
+ if (server.blocked_clients_by_type[BLOCKED_SHUTDOWN] == 0) return;
+ listNode *ln;
+ listIter li;
+ listRewind(server.clients, &li);
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+ if (c->flags & CLIENT_BLOCKED && c->bstate.btype == BLOCKED_SHUTDOWN) {
+ addReplyError(c, "Errors trying to SHUTDOWN. Check logs.");
+ unblockClient(c, 1);
+ }
+ }
+}
+
+/* Mass-unblock clients because something changed in the instance that makes
+ * blocking no longer safe. For example clients blocked in list operations
+ * in an instance which turns from master to slave is unsafe, so this function
+ * is called when a master turns into a slave.
+ *
+ * The semantics is to send an -UNBLOCKED error to the client, disconnecting
+ * it at the same time. */
+void disconnectAllBlockedClients(void) {
+ listNode *ln;
+ listIter li;
+
+ listRewind(server.clients,&li);
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+
+ if (c->flags & CLIENT_BLOCKED) {
+ /* POSTPONEd clients are an exception, when they'll be unblocked, the
+ * command processing will start from scratch, and the command will
+ * be either executed or rejected. (unlike LIST blocked clients for
+ * which the command is already in progress in a way. */
+ if (c->bstate.btype == BLOCKED_POSTPONE)
+ continue;
+
+ unblockClientOnError(c,
+ "-UNBLOCKED force unblock from blocking operation, "
+ "instance state changed (master -> replica?)");
+ c->flags |= CLIENT_CLOSE_AFTER_REPLY;
+ }
+ }
+}
+
+/* This function should be called by Redis every time a single command,
+ * a MULTI/EXEC block, or a Lua script, terminated its execution after
+ * being called by a client. It handles serving clients blocked in all scenarios
+ * where a specific key access requires to block until that key is available.
+ *
+ * All the keys with at least one client blocked that are signaled as ready
+ * are accumulated into the server.ready_keys list. This function will run
+ * the list and will serve clients accordingly.
+ * Note that the function will iterate again and again (for example as a result of serving BLMOVE
+ * we can have new blocking clients to serve because of the PUSH side of BLMOVE.)
+ *
+ * This function is normally "fair", that is, it will serve clients
+ * using a FIFO behavior. However this fairness is violated in certain
+ * edge cases, that is, when we have clients blocked at the same time
+ * in a sorted set and in a list, for the same key (a very odd thing to
+ * do client side, indeed!). Because mismatching clients (blocking for
+ * a different type compared to the current key type) are moved in the
+ * other side of the linked list. However as long as the key starts to
+ * be used only for a single type, like virtually any Redis application will
+ * do, the function is already fair. */
+void handleClientsBlockedOnKeys(void) {
+
+ /* In case we are already in the process of unblocking clients we should
+ * not make a recursive call, in order to prevent breaking fairness. */
+ static int in_handling_blocked_clients = 0;
+ if (in_handling_blocked_clients)
+ return;
+ in_handling_blocked_clients = 1;
+
+ /* This function is called only when also_propagate is in its basic state
+ * (i.e. not from call(), module context, etc.) */
+ serverAssert(server.also_propagate.numops == 0);
+
+ /* If a command being unblocked causes another command to get unblocked,
+ * like a BLMOVE would do, then the new unblocked command will get processed
+ * right away rather than wait for later. */
+ while(listLength(server.ready_keys) != 0) {
+ list *l;
+
+ /* Point server.ready_keys to a fresh list and save the current one
+ * locally. This way as we run the old list we are free to call
+ * signalKeyAsReady() that may push new elements in server.ready_keys
+ * when handling clients blocked into BLMOVE. */
+ l = server.ready_keys;
+ server.ready_keys = listCreate();
+
+ while(listLength(l) != 0) {
+ listNode *ln = listFirst(l);
+ readyList *rl = ln->value;
+
+ /* First of all remove this key from db->ready_keys so that
+ * we can safely call signalKeyAsReady() against this key. */
+ dictDelete(rl->db->ready_keys,rl->key);
+
+ handleClientsBlockedOnKey(rl);
+
+ /* Free this item. */
+ decrRefCount(rl->key);
+ zfree(rl);
+ listDelNode(l,ln);
+ }
+ listRelease(l); /* We have the new list on place at this point. */
+ }
+ in_handling_blocked_clients = 0;
+}
+
+/* Set a client in blocking mode for the specified key, with the specified timeout.
+ * The 'type' argument is BLOCKED_LIST,BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are
+ * waiting for an empty key in order to awake the client. The client is blocked
+ * for all the 'numkeys' keys as in the 'keys' argument.
+ * The client will unblocked as soon as one of the keys in 'keys' value was updated.
+ * the parameter unblock_on_nokey can be used to force client to be unblocked even in the case the key
+ * is updated to become unavailable, either by type change (override), deletion or swapdb */
+void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey) {
+ dictEntry *db_blocked_entry, *db_blocked_existing_entry, *client_blocked_entry;
+ list *l;
+ int j;
+
+ c->bstate.timeout = timeout;
+ for (j = 0; j < numkeys; j++) {
+ /* If the key already exists in the dictionary ignore it. */
+ if (!(client_blocked_entry = dictAddRaw(c->bstate.keys,keys[j],NULL))) {
+ continue;
+ }
+ incrRefCount(keys[j]);
+
+ /* And in the other "side", to map keys -> clients */
+ db_blocked_entry = dictAddRaw(c->db->blocking_keys,keys[j], &db_blocked_existing_entry);
+
+ /* In case key[j] did not have blocking clients yet, we need to create a new list */
+ if (db_blocked_entry != NULL) {
+ l = listCreate();
+ dictSetVal(c->db->blocking_keys, db_blocked_entry, l);
+ incrRefCount(keys[j]);
+ } else {
+ l = dictGetVal(db_blocked_existing_entry);
+ }
+ listAddNodeTail(l,c);
+ dictSetVal(c->bstate.keys,client_blocked_entry,listLast(l));
+
+
+ /* We need to add the key to blocking_keys_unblock_on_nokey, if the client
+ * wants to be awakened if key is deleted (like XREADGROUP) */
+ if (unblock_on_nokey) {
+ db_blocked_entry = dictAddRaw(c->db->blocking_keys_unblock_on_nokey, keys[j], &db_blocked_existing_entry);
+ if (db_blocked_entry) {
+ incrRefCount(keys[j]);
+ dictSetUnsignedIntegerVal(db_blocked_entry, 1);
+ } else {
+ dictIncrUnsignedIntegerVal(db_blocked_existing_entry, 1);
+ }
+ }
+ }
+ c->bstate.unblock_on_nokey = unblock_on_nokey;
+ /* Currently we assume key blocking will require reprocessing the command.
+ * However in case of modules, they have a different way to handle the reprocessing
+ * which does not require setting the pending command flag */
+ if (btype != BLOCKED_MODULE)
+ c->flags |= CLIENT_PENDING_COMMAND;
+ blockClient(c,btype);
+}
+
+/* Helper function to unblock a client that's waiting in a blocking operation such as BLPOP.
+ * Internal function for unblockClient() */
+static void unblockClientWaitingData(client *c) {
+ dictEntry *de;
+ dictIterator *di;
+
+ if (dictSize(c->bstate.keys) == 0)
+ return;
+
+ di = dictGetIterator(c->bstate.keys);
+ /* The client may wait for multiple keys, so unblock it for every key. */
+ while((de = dictNext(di)) != NULL) {
+ releaseBlockedEntry(c, de, 0);
+ }
+ dictReleaseIterator(di);
+ dictEmpty(c->bstate.keys, NULL);
+}
+
+static blocking_type getBlockedTypeByType(int type) {
+ switch (type) {
+ case OBJ_LIST: return BLOCKED_LIST;
+ case OBJ_ZSET: return BLOCKED_ZSET;
+ case OBJ_MODULE: return BLOCKED_MODULE;
+ case OBJ_STREAM: return BLOCKED_STREAM;
+ default: return BLOCKED_NONE;
+ }
+}
+
+/* If the specified key has clients blocked waiting for list pushes, this
+ * function will put the key reference into the server.ready_keys list.
+ * Note that db->ready_keys is a hash table that allows us to avoid putting
+ * the same key again and again in the list in case of multiple pushes
+ * made by a script or in the context of MULTI/EXEC.
+ *
+ * The list will be finally processed by handleClientsBlockedOnKeys() */
+static void signalKeyAsReadyLogic(redisDb *db, robj *key, int type, int deleted) {
+ readyList *rl;
+
+ /* Quick returns. */
+ int btype = getBlockedTypeByType(type);
+ if (btype == BLOCKED_NONE) {
+ /* The type can never block. */
+ return;
+ }
+ if (!server.blocked_clients_by_type[btype] &&
+ !server.blocked_clients_by_type[BLOCKED_MODULE]) {
+ /* No clients block on this type. Note: Blocked modules are represented
+ * by BLOCKED_MODULE, even if the intention is to wake up by normal
+ * types (list, zset, stream), so we need to check that there are no
+ * blocked modules before we do a quick return here. */
+ return;
+ }
+
+ if (deleted) {
+ /* Key deleted and no clients blocking for this key? No need to queue it. */
+ if (dictFind(db->blocking_keys_unblock_on_nokey,key) == NULL)
+ return;
+ /* Note: if we made it here it means the key is also present in db->blocking_keys */
+ } else {
+ /* No clients blocking for this key? No need to queue it. */
+ if (dictFind(db->blocking_keys,key) == NULL)
+ return;
+ }
+
+ dictEntry *de, *existing;
+ de = dictAddRaw(db->ready_keys, key, &existing);
+ if (de) {
+ /* We add the key in the db->ready_keys dictionary in order
+ * to avoid adding it multiple times into a list with a simple O(1)
+ * check. */
+ incrRefCount(key);
+ } else {
+ /* Key was already signaled? No need to queue it again. */
+ return;
+ }
+
+ /* Ok, we need to queue this key into server.ready_keys. */
+ rl = zmalloc(sizeof(*rl));
+ rl->key = key;
+ rl->db = db;
+ incrRefCount(key);
+ listAddNodeTail(server.ready_keys,rl);
+}
+
+/* Helper function to wrap the logic of removing a client blocked key entry
+ * In this case we would like to do the following:
+ * 1. unlink the client from the global DB locked client list
+ * 2. remove the entry from the global db blocking list in case the list is empty
+ * 3. in case the global list is empty, also remove the key from the global dict of keys
+ * which should trigger unblock on key deletion
+ * 4. remove key from the client blocking keys list - NOTE, since client can be blocked on lots of keys,
+ * but unblocked when only one of them is triggered, we would like to avoid deleting each key separately
+ * and instead clear the dictionary in one-shot. this is why the remove_key argument is provided
+ * to support this logic in unblockClientWaitingData
+ */
+static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) {
+ list *l;
+ listNode *pos;
+ void *key;
+ dictEntry *unblock_on_nokey_entry;
+
+ key = dictGetKey(de);
+ pos = dictGetVal(de);
+ /* Remove this client from the list of clients waiting for this key. */
+ l = dictFetchValue(c->db->blocking_keys, key);
+ serverAssertWithInfo(c,key,l != NULL);
+ listUnlinkNode(l,pos);
+ /* If the list is empty we need to remove it to avoid wasting memory
+ * We will also remove the key (if exists) from the blocking_keys_unblock_on_nokey dict.
+ * However, in case the list is not empty, we will have to still perform reference accounting
+ * on the blocking_keys_unblock_on_nokey and delete the entry in case of zero reference.
+ * Why? because it is possible that some more clients are blocked on the same key but without
+ * require to be triggered on key deletion, we do not want these to be later triggered by the
+ * signalDeletedKeyAsReady. */
+ if (listLength(l) == 0) {
+ dictDelete(c->db->blocking_keys, key);
+ dictDelete(c->db->blocking_keys_unblock_on_nokey,key);
+ } else if (c->bstate.unblock_on_nokey) {
+ unblock_on_nokey_entry = dictFind(c->db->blocking_keys_unblock_on_nokey,key);
+ /* it is not possible to have a client blocked on nokey with no matching entry */
+ serverAssertWithInfo(c,key,unblock_on_nokey_entry != NULL);
+ if (!dictIncrUnsignedIntegerVal(unblock_on_nokey_entry, -1)) {
+ /* in case the count is zero, we can delete the entry */
+ dictDelete(c->db->blocking_keys_unblock_on_nokey,key);
+ }
+ }
+ if (remove_key)
+ dictDelete(c->bstate.keys, key);
+}
+
+void signalKeyAsReady(redisDb *db, robj *key, int type) {
+ signalKeyAsReadyLogic(db, key, type, 0);
+}
+
+void signalDeletedKeyAsReady(redisDb *db, robj *key, int type) {
+ signalKeyAsReadyLogic(db, key, type, 1);
+}
+
+/* Helper function for handleClientsBlockedOnKeys(). This function is called
+ * whenever a key is ready. we iterate over all the clients blocked on this key
+ * and try to re-execute the command (in case the key is still available). */
+static void handleClientsBlockedOnKey(readyList *rl) {
+
+ /* We serve clients in the same order they blocked for
+ * this key, from the first blocked to the last. */
+ dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
+
+ if (de) {
+ list *clients = dictGetVal(de);
+ listNode *ln;
+ listIter li;
+ listRewind(clients,&li);
+
+ /* Avoid processing more than the initial count so that we're not stuck
+ * in an endless loop in case the reprocessing of the command blocks again. */
+ long count = listLength(clients);
+ while ((ln = listNext(&li)) && count--) {
+ client *receiver = listNodeValue(ln);
+ robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NOEFFECTS);
+ /* 1. In case new key was added/touched we need to verify it satisfy the
+ * blocked type, since we might process the wrong key type.
+ * 2. We want to serve clients blocked on module keys
+ * regardless of the object type: we don't know what the
+ * module is trying to accomplish right now.
+ * 3. In case of XREADGROUP call we will want to unblock on any change in object type
+ * or in case the key was deleted, since the group is no longer valid. */
+ if ((o != NULL && (receiver->bstate.btype == getBlockedTypeByType(o->type))) ||
+ (o != NULL && (receiver->bstate.btype == BLOCKED_MODULE)) ||
+ (receiver->bstate.unblock_on_nokey))
+ {
+ if (receiver->bstate.btype != BLOCKED_MODULE)
+ unblockClientOnKey(receiver, rl->key);
+ else
+ moduleUnblockClientOnKey(receiver, rl->key);
+ }
+ }
+ }
+}
+
+/* block a client due to wait command */
+void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
+ c->bstate.timeout = timeout;
+ c->bstate.reploffset = offset;
+ c->bstate.numreplicas = numreplicas;
+ listAddNodeHead(server.clients_waiting_acks,c);
+ blockClient(c,BLOCKED_WAIT);
+}
+
+/* block a client due to waitaof command */
+void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) {
+ c->bstate.timeout = timeout;
+ c->bstate.reploffset = offset;
+ c->bstate.numreplicas = numreplicas;
+ c->bstate.numlocal = numlocal;
+ listAddNodeHead(server.clients_waiting_acks,c);
+ blockClient(c,BLOCKED_WAITAOF);
+}
+
+/* Postpone client from executing a command. For example the server might be busy
+ * requesting to avoid processing clients commands which will be processed later
+ * when the it is ready to accept them. */
+void blockPostponeClient(client *c) {
+ c->bstate.timeout = 0;
+ blockClient(c,BLOCKED_POSTPONE);
+ listAddNodeTail(server.postponed_clients, c);
+ c->postponed_list_node = listLast(server.postponed_clients);
+ /* Mark this client to execute its command */
+ c->flags |= CLIENT_PENDING_COMMAND;
+}
+
+/* Block client due to shutdown command */
+void blockClientShutdown(client *c) {
+ blockClient(c, BLOCKED_SHUTDOWN);
+}
+
+/* Unblock a client once a specific key became available for it.
+ * This function will remove the client from the list of clients blocked on this key
+ * and also remove the key from the dictionary of keys this client is blocked on.
+ * in case the client has a command pending it will process it immediately. */
+static void unblockClientOnKey(client *c, robj *key) {
+ dictEntry *de;
+
+ de = dictFind(c->bstate.keys, key);
+ releaseBlockedEntry(c, de, 1);
+
+ /* Only in case of blocking API calls, we might be blocked on several keys.
+ however we should force unblock the entire blocking keys */
+ serverAssert(c->bstate.btype == BLOCKED_STREAM ||
+ c->bstate.btype == BLOCKED_LIST ||
+ c->bstate.btype == BLOCKED_ZSET);
+
+ /* We need to unblock the client before calling processCommandAndResetClient
+ * because it checks the CLIENT_BLOCKED flag */
+ unblockClient(c, 0);
+ /* In case this client was blocked on keys during command
+ * we need to re process the command again */
+ if (c->flags & CLIENT_PENDING_COMMAND) {
+ c->flags &= ~CLIENT_PENDING_COMMAND;
+ /* We want the command processing and the unblock handler (see RM_Call 'K' option)
+ * to run atomically, this is why we must enter the execution unit here before
+ * running the command, and exit the execution unit after calling the unblock handler (if exists).
+ * Notice that we also must set the current client so it will be available
+ * when we will try to send the client side caching notification (done on 'afterCommand'). */
+ client *old_client = server.current_client;
+ server.current_client = c;
+ enterExecutionUnit(1, 0);
+ processCommandAndResetClient(c);
+ if (!(c->flags & CLIENT_BLOCKED)) {
+ if (c->flags & CLIENT_MODULE) {
+ moduleCallCommandUnblockedHandler(c);
+ } else {
+ queueClientForReprocessing(c);
+ }
+ }
+ exitExecutionUnit();
+ afterCommand(c);
+ server.current_client = old_client;
+ }
+}
+
+/* Unblock a client blocked on the specific key from module context.
+ * This function will try to serve the module call, and in case it succeeds,
+ * it will add the client to the list of module unblocked clients which will
+ * be processed in moduleHandleBlockedClients. */
+static void moduleUnblockClientOnKey(client *c, robj *key) {
+ long long prev_error_replies = server.stat_total_error_replies;
+ client *old_client = server.current_client;
+ server.current_client = c;
+ monotime replyTimer;
+ elapsedStart(&replyTimer);
+
+ if (moduleTryServeClientBlockedOnKey(c, key)) {
+ updateStatsOnUnblock(c, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
+ moduleUnblockClient(c);
+ }
+ /* We need to call afterCommand even if the client was not unblocked
+ * in order to propagate any changes that could have been done inside
+ * moduleTryServeClientBlockedOnKey */
+ afterCommand(c);
+ server.current_client = old_client;
+}
+
+/* Unblock a client which is currently Blocked on and provided a timeout.
+ * The implementation will first reply to the blocked client with null response
+ * or, in case of module blocked client the timeout callback will be used.
+ * In this case since we might have a command pending
+ * we want to remove the pending flag to indicate we already responded to the
+ * command with timeout reply. */
+void unblockClientOnTimeout(client *c) {
+ replyToBlockedClientTimedOut(c);
+ if (c->flags & CLIENT_PENDING_COMMAND)
+ c->flags &= ~CLIENT_PENDING_COMMAND;
+ unblockClient(c, 1);
+}
+
+/* Unblock a client which is currently Blocked with error.
+ * If err_str is provided it will be used to reply to the blocked client */
+void unblockClientOnError(client *c, const char *err_str) {
+ if (err_str)
+ addReplyError(c, err_str);
+ updateStatsOnUnblock(c, 0, 0, 1);
+ if (c->flags & CLIENT_PENDING_COMMAND)
+ c->flags &= ~CLIENT_PENDING_COMMAND;
+ unblockClient(c, 1);
+}
+
+/* sets blocking_keys to the total number of keys which has at least one client blocked on them
+ * sets blocking_keys_on_nokey to the total number of keys which has at least one client
+ * blocked on them to be written or deleted */
+void totalNumberOfBlockingKeys(unsigned long *blocking_keys, unsigned long *bloking_keys_on_nokey) {
+ unsigned long bkeys=0, bkeys_on_nokey=0;
+ for (int j = 0; j < server.dbnum; j++) {
+ bkeys += dictSize(server.db[j].blocking_keys);
+ bkeys_on_nokey += dictSize(server.db[j].blocking_keys_unblock_on_nokey);
+ }
+ if (blocking_keys)
+ *blocking_keys = bkeys;
+ if (bloking_keys_on_nokey)
+ *bloking_keys_on_nokey = bkeys_on_nokey;
+}
+
+void blockedBeforeSleep(void) {
+ /* Handle precise timeouts of blocked clients. */
+ handleBlockedClientsTimeout();
+
+ /* Unblock all the clients blocked for synchronous replication
+ * in WAIT or WAITAOF. */
+ if (listLength(server.clients_waiting_acks))
+ processClientsWaitingReplicas();
+
+ /* Try to process blocked clients every once in while.
+ *
+ * Example: A module calls RM_SignalKeyAsReady from within a timer callback
+ * (So we don't visit processCommand() at all).
+ *
+ * This may unblock clients, so must be done before processUnblockedClients */
+ handleClientsBlockedOnKeys();
+
+ /* Check if there are clients unblocked by modules that implement
+ * blocking commands. */
+ if (moduleCount())
+ moduleHandleBlockedClients();
+
+ /* Try to process pending commands for clients that were just unblocked. */
+ if (listLength(server.unblocked_clients))
+ processUnblockedClients();
+}