summaryrefslogtreecommitdiffstats
path: root/src/tracking.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 17:31:02 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 17:31:02 +0000
commitbb12c1fd00eb51118749bbbc69c5596835fcbd3b (patch)
tree88038a98bd31c1b765f3390767a2ec12e37c79ec /src/tracking.c
parentInitial commit. (diff)
downloadredis-upstream.tar.xz
redis-upstream.zip
Adding upstream version 5:7.0.15.upstream/5%7.0.15upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tracking.c')
-rw-r--r--src/tracking.c656
1 files changed, 656 insertions, 0 deletions
diff --git a/src/tracking.c b/src/tracking.c
new file mode 100644
index 0000000..475c437
--- /dev/null
+++ b/src/tracking.c
@@ -0,0 +1,656 @@
+/* tracking.c - Client side caching: keys tracking and invalidation
+ *
+ * Copyright (c) 2019, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "server.h"
+
+/* The tracking table is constituted by a radix tree of keys, each pointing
+ * to a radix tree of client IDs, used to track the clients that may have
+ * certain keys in their local, client side, cache.
+ *
+ * When a client enables tracking with "CLIENT TRACKING on", each key served to
+ * the client is remembered in the table mapping the keys to the client IDs.
+ * Later, when a key is modified, all the clients that may have local copy
+ * of such key will receive an invalidation message.
+ *
+ * Clients will normally take frequently requested objects in memory, removing
+ * them when invalidation messages are received. */
+rax *TrackingTable = NULL;
+rax *PrefixTable = NULL;
+uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
+ the whole tracking table. This gives
+ an hint about the total memory we
+ are using server side for CSC. */
+robj *TrackingChannelName;
+
+/* This is the structure that we have as value of the PrefixTable, and
+ * represents the list of keys modified, and the list of clients that need
+ * to be notified, for a given prefix. */
+typedef struct bcastState {
+ rax *keys; /* Keys modified in the current event loop cycle. */
+ rax *clients; /* Clients subscribed to the notification events for this
+ prefix. */
+} bcastState;
+
+/* Remove the tracking state from the client 'c'. Note that there is not much
+ * to do for us here, if not to decrement the counter of the clients in
+ * tracking mode, because we just store the ID of the client in the tracking
+ * table, so we'll remove the ID reference in a lazy way. Otherwise when a
+ * client with many entries in the table is removed, it would cost a lot of
+ * time to do the cleanup. */
+void disableTracking(client *c) {
+ /* If this client is in broadcasting mode, we need to unsubscribe it
+ * from all the prefixes it is registered to. */
+ if (c->flags & CLIENT_TRACKING_BCAST) {
+ raxIterator ri;
+ raxStart(&ri,c->client_tracking_prefixes);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ bcastState *bs = raxFind(PrefixTable,ri.key,ri.key_len);
+ serverAssert(bs != raxNotFound);
+ raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL);
+ /* Was it the last client? Remove the prefix from the
+ * table. */
+ if (raxSize(bs->clients) == 0) {
+ raxFree(bs->clients);
+ raxFree(bs->keys);
+ zfree(bs);
+ raxRemove(PrefixTable,ri.key,ri.key_len,NULL);
+ }
+ }
+ raxStop(&ri);
+ raxFree(c->client_tracking_prefixes);
+ c->client_tracking_prefixes = NULL;
+ }
+
+ /* Clear flags and adjust the count. */
+ if (c->flags & CLIENT_TRACKING) {
+ server.tracking_clients--;
+ c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
+ CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN|
+ CLIENT_TRACKING_OPTOUT|CLIENT_TRACKING_CACHING|
+ CLIENT_TRACKING_NOLOOP);
+ }
+}
+
+static int stringCheckPrefix(unsigned char *s1, size_t s1_len, unsigned char *s2, size_t s2_len) {
+ size_t min_length = s1_len < s2_len ? s1_len : s2_len;
+ return memcmp(s1,s2,min_length) == 0;
+}
+
+/* Check if any of the provided prefixes collide with one another or
+ * with an existing prefix for the client. A collision is defined as two
+ * prefixes that will emit an invalidation for the same key. If no prefix
+ * collision is found, 1 is return, otherwise 0 is returned and the client
+ * has an error emitted describing the error. */
+int checkPrefixCollisionsOrReply(client *c, robj **prefixes, size_t numprefix) {
+ for (size_t i = 0; i < numprefix; i++) {
+ /* Check input list has no overlap with existing prefixes. */
+ if (c->client_tracking_prefixes) {
+ raxIterator ri;
+ raxStart(&ri,c->client_tracking_prefixes);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ if (stringCheckPrefix(ri.key,ri.key_len,
+ prefixes[i]->ptr,sdslen(prefixes[i]->ptr)))
+ {
+ sds collision = sdsnewlen(ri.key,ri.key_len);
+ addReplyErrorFormat(c,
+ "Prefix '%s' overlaps with an existing prefix '%s'. "
+ "Prefixes for a single client must not overlap.",
+ (unsigned char *)prefixes[i]->ptr,
+ (unsigned char *)collision);
+ sdsfree(collision);
+ raxStop(&ri);
+ return 0;
+ }
+ }
+ raxStop(&ri);
+ }
+ /* Check input has no overlap with itself. */
+ for (size_t j = i + 1; j < numprefix; j++) {
+ if (stringCheckPrefix(prefixes[i]->ptr,sdslen(prefixes[i]->ptr),
+ prefixes[j]->ptr,sdslen(prefixes[j]->ptr)))
+ {
+ addReplyErrorFormat(c,
+ "Prefix '%s' overlaps with another provided prefix '%s'. "
+ "Prefixes for a single client must not overlap.",
+ (unsigned char *)prefixes[i]->ptr,
+ (unsigned char *)prefixes[j]->ptr);
+ return i;
+ }
+ }
+ }
+ return 1;
+}
+
+/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is
+ * already registered for the specified prefix, no operation is performed. */
+void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
+ bcastState *bs = raxFind(PrefixTable,(unsigned char*)prefix,plen);
+ /* If this is the first client subscribing to such prefix, create
+ * the prefix in the table. */
+ if (bs == raxNotFound) {
+ bs = zmalloc(sizeof(*bs));
+ bs->keys = raxNew();
+ bs->clients = raxNew();
+ raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL);
+ }
+ if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) {
+ if (c->client_tracking_prefixes == NULL)
+ c->client_tracking_prefixes = raxNew();
+ raxInsert(c->client_tracking_prefixes,
+ (unsigned char*)prefix,plen,NULL,NULL);
+ }
+}
+
+/* Enable the tracking state for the client 'c', and as a side effect allocates
+ * the tracking table if needed. If the 'redirect_to' argument is non zero, the
+ * invalidation messages for this client will be sent to the client ID
+ * specified by the 'redirect_to' argument. Note that if such client will
+ * eventually get freed, we'll send a message to the original client to
+ * inform it of the condition. Multiple clients can redirect the invalidation
+ * messages to the same client ID. */
+void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix) {
+ if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
+ c->flags |= CLIENT_TRACKING;
+ c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST|
+ CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT|
+ CLIENT_TRACKING_NOLOOP);
+ c->client_tracking_redirection = redirect_to;
+
+ /* This may be the first client we ever enable. Create the tracking
+ * table if it does not exist. */
+ if (TrackingTable == NULL) {
+ TrackingTable = raxNew();
+ PrefixTable = raxNew();
+ TrackingChannelName = createStringObject("__redis__:invalidate",20);
+ }
+
+ /* For broadcasting, set the list of prefixes in the client. */
+ if (options & CLIENT_TRACKING_BCAST) {
+ c->flags |= CLIENT_TRACKING_BCAST;
+ if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
+ for (size_t j = 0; j < numprefix; j++) {
+ sds sdsprefix = prefix[j]->ptr;
+ enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix));
+ }
+ }
+
+ /* Set the remaining flags that don't need any special handling. */
+ c->flags |= options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT|
+ CLIENT_TRACKING_NOLOOP);
+}
+
+/* This function is called after the execution of a readonly command in the
+ * case the client 'c' has keys tracking enabled and the tracking is not
+ * in BCAST mode. It will populate the tracking invalidation table according
+ * to the keys the user fetched, so that Redis will know what are the clients
+ * that should receive an invalidation message with certain groups of keys
+ * are modified. */
+void trackingRememberKeys(client *c) {
+ /* Return if we are in optin/out mode and the right CACHING command
+ * was/wasn't given in order to modify the default behavior. */
+ uint64_t optin = c->flags & CLIENT_TRACKING_OPTIN;
+ uint64_t optout = c->flags & CLIENT_TRACKING_OPTOUT;
+ uint64_t caching_given = c->flags & CLIENT_TRACKING_CACHING;
+ if ((optin && !caching_given) || (optout && caching_given)) return;
+
+ getKeysResult result = GETKEYS_RESULT_INIT;
+ int numkeys = getKeysFromCommand(c->cmd,c->argv,c->argc,&result);
+ if (!numkeys) {
+ getKeysFreeResult(&result);
+ return;
+ }
+ /* Shard channels are treated as special keys for client
+ * library to rely on `COMMAND` command to discover the node
+ * to connect to. These channels doesn't need to be tracked. */
+ if (c->cmd->flags & CMD_PUBSUB) {
+ return;
+ }
+
+ keyReference *keys = result.keys;
+
+ for(int j = 0; j < numkeys; j++) {
+ int idx = keys[j].pos;
+ sds sdskey = c->argv[idx]->ptr;
+ rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
+ if (ids == raxNotFound) {
+ ids = raxNew();
+ int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey,
+ sdslen(sdskey),ids, NULL);
+ serverAssert(inserted == 1);
+ }
+ if (raxTryInsert(ids,(unsigned char*)&c->id,sizeof(c->id),NULL,NULL))
+ TrackingTableTotalItems++;
+ }
+ getKeysFreeResult(&result);
+}
+
+/* Given a key name, this function sends an invalidation message in the
+ * proper channel (depending on RESP version: PubSub or Push message) and
+ * to the proper client (in case of redirection), in the context of the
+ * client 'c' with tracking enabled.
+ *
+ * In case the 'proto' argument is non zero, the function will assume that
+ * 'keyname' points to a buffer of 'keylen' bytes already expressed in the
+ * form of Redis RESP protocol. This is used for:
+ * - In BCAST mode, to send an array of invalidated keys to all
+ * applicable clients
+ * - Following a flush command, to send a single RESP NULL to indicate
+ * that all keys are now invalid. */
+void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
+
+ int using_redirection = 0;
+ if (c->client_tracking_redirection) {
+ client *redir = lookupClientByID(c->client_tracking_redirection);
+ if (!redir) {
+ c->flags |= CLIENT_TRACKING_BROKEN_REDIR;
+ /* We need to signal to the original connection that we
+ * are unable to send invalidation messages to the redirected
+ * connection, because the client no longer exist. */
+ if (c->resp > 2) {
+ addReplyPushLen(c,2);
+ addReplyBulkCBuffer(c,"tracking-redir-broken",21);
+ addReplyLongLong(c,c->client_tracking_redirection);
+ }
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
+ return;
+ }
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
+ c = redir;
+ using_redirection = 1;
+ old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
+ }
+
+ /* Only send such info for clients in RESP version 3 or more. However
+ * if redirection is active, and the connection we redirect to is
+ * in Pub/Sub mode, we can support the feature with RESP 2 as well,
+ * by sending Pub/Sub messages in the __redis__:invalidate channel. */
+ if (c->resp > 2) {
+ addReplyPushLen(c,2);
+ addReplyBulkCBuffer(c,"invalidate",10);
+ } else if (using_redirection && c->flags & CLIENT_PUBSUB) {
+ /* We use a static object to speedup things, however we assume
+ * that addReplyPubsubMessage() will not take a reference. */
+ addReplyPubsubMessage(c,TrackingChannelName,NULL,shared.messagebulk);
+ } else {
+ /* If are here, the client is not using RESP3, nor is
+ * redirecting to another client. We can't send anything to
+ * it since RESP2 does not support push messages in the same
+ * connection. */
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
+ return;
+ }
+
+ /* Send the "value" part, which is the array of keys. */
+ if (proto) {
+ addReplyProto(c,keyname,keylen);
+ } else {
+ addReplyArrayLen(c,1);
+ addReplyBulkCBuffer(c,keyname,keylen);
+ }
+ updateClientMemUsageAndBucket(c);
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
+}
+
+/* This function is called when a key is modified in Redis and in the case
+ * we have at least one client with the BCAST mode enabled.
+ * Its goal is to set the key in the right broadcast state if the key
+ * matches one or more prefixes in the prefix table. Later when we
+ * return to the event loop, we'll send invalidation messages to the
+ * clients subscribed to each prefix. */
+void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) {
+ raxIterator ri;
+ raxStart(&ri,PrefixTable);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ if (ri.key_len > keylen) continue;
+ if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0)
+ continue;
+ bcastState *bs = ri.data;
+ /* We insert the client pointer as associated value in the radix
+ * tree. This way we know who was the client that did the last
+ * change to the key, and can avoid sending the notification in the
+ * case the client is in NOLOOP mode. */
+ raxInsert(bs->keys,(unsigned char*)keyname,keylen,c,NULL);
+ }
+ raxStop(&ri);
+}
+
+/* This function is called from signalModifiedKey() or other places in Redis
+ * when a key changes value. In the context of keys tracking, our task here is
+ * to send a notification to every client that may have keys about such caching
+ * slot.
+ *
+ * Note that 'c' may be NULL in case the operation was performed outside the
+ * context of a client modifying the database (for instance when we delete a
+ * key because of expire).
+ *
+ * The last argument 'bcast' tells the function if it should also schedule
+ * the key for broadcasting to clients in BCAST mode. This is the case when
+ * the function is called from the Redis core once a key is modified, however
+ * we also call the function in order to evict keys in the key table in case
+ * of memory pressure: in that case the key didn't really change, so we want
+ * just to notify the clients that are in the table for this key, that would
+ * otherwise miss the fact we are no longer tracking the key for them. */
+void trackingInvalidateKey(client *c, robj *keyobj, int bcast) {
+ if (TrackingTable == NULL) return;
+
+ unsigned char *key = (unsigned char*)keyobj->ptr;
+ size_t keylen = sdslen(keyobj->ptr);
+
+ if (bcast && raxSize(PrefixTable) > 0)
+ trackingRememberKeyToBroadcast(c,(char *)key,keylen);
+
+ rax *ids = raxFind(TrackingTable,key,keylen);
+ if (ids == raxNotFound) return;
+
+ raxIterator ri;
+ raxStart(&ri,ids);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ uint64_t id;
+ memcpy(&id,ri.key,sizeof(id));
+ client *target = lookupClientByID(id);
+ /* Note that if the client is in BCAST mode, we don't want to
+ * send invalidation messages that were pending in the case
+ * previously the client was not in BCAST mode. This can happen if
+ * TRACKING is enabled normally, and then the client switches to
+ * BCAST mode. */
+ if (target == NULL ||
+ !(target->flags & CLIENT_TRACKING)||
+ target->flags & CLIENT_TRACKING_BCAST)
+ {
+ continue;
+ }
+
+ /* If the client enabled the NOLOOP mode, don't send notifications
+ * about keys changed by the client itself. */
+ if (target->flags & CLIENT_TRACKING_NOLOOP &&
+ target == server.current_client)
+ {
+ continue;
+ }
+
+ /* If target is current client and it's executing a command, we need schedule key invalidation.
+ * As the invalidation messages may be interleaved with command
+ * response and should after command response. */
+ if (target == server.current_client && server.fixed_time_expire) {
+ incrRefCount(keyobj);
+ listAddNodeTail(server.tracking_pending_keys, keyobj);
+ } else {
+ sendTrackingMessage(target,(char *)keyobj->ptr,sdslen(keyobj->ptr),0);
+ }
+ }
+ raxStop(&ri);
+
+ /* Free the tracking table: we'll create the radix tree and populate it
+ * again if more keys will be modified in this caching slot. */
+ TrackingTableTotalItems -= raxSize(ids);
+ raxFree(ids);
+ raxRemove(TrackingTable,(unsigned char*)key,keylen,NULL);
+}
+
+void trackingHandlePendingKeyInvalidations() {
+ if (!listLength(server.tracking_pending_keys)) return;
+
+ listNode *ln;
+ listIter li;
+
+ listRewind(server.tracking_pending_keys,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ robj *key = listNodeValue(ln);
+ /* current_client maybe freed, so we need to send invalidation
+ * message only when current_client is still alive */
+ if (server.current_client != NULL) {
+ if (key != NULL) {
+ sendTrackingMessage(server.current_client,(char *)key->ptr,sdslen(key->ptr),0);
+ } else {
+ sendTrackingMessage(server.current_client,shared.null[server.current_client->resp]->ptr,
+ sdslen(shared.null[server.current_client->resp]->ptr),1);
+ }
+ }
+ if (key != NULL) decrRefCount(key);
+ }
+ listEmpty(server.tracking_pending_keys);
+}
+
+/* This function is called when one or all the Redis databases are
+ * flushed. Caching keys are not specific for each DB but are global:
+ * currently what we do is send a special notification to clients with
+ * tracking enabled, sending a RESP NULL, which means, "all the keys",
+ * in order to avoid flooding clients with many invalidation messages
+ * for all the keys they may hold.
+ */
+void freeTrackingRadixTreeCallback(void *rt) {
+ raxFree(rt);
+}
+
+void freeTrackingRadixTree(rax *rt) {
+ raxFreeWithCallback(rt,freeTrackingRadixTreeCallback);
+}
+
+/* A RESP NULL is sent to indicate that all keys are invalid */
+void trackingInvalidateKeysOnFlush(int async) {
+ if (server.tracking_clients) {
+ listNode *ln;
+ listIter li;
+ listRewind(server.clients,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ client *c = listNodeValue(ln);
+ if (c->flags & CLIENT_TRACKING) {
+ if (c == server.current_client) {
+ /* We use a special NULL to indicate that we should send null */
+ listAddNodeTail(server.tracking_pending_keys,NULL);
+ } else {
+ sendTrackingMessage(c,shared.null[c->resp]->ptr,sdslen(shared.null[c->resp]->ptr),1);
+ }
+ }
+ }
+ }
+
+ /* In case of FLUSHALL, reclaim all the memory used by tracking. */
+ if (TrackingTable) {
+ if (async) {
+ freeTrackingRadixTreeAsync(TrackingTable);
+ } else {
+ freeTrackingRadixTree(TrackingTable);
+ }
+ TrackingTable = raxNew();
+ TrackingTableTotalItems = 0;
+ }
+}
+
+/* Tracking forces Redis to remember information about which client may have
+ * certain keys. In workloads where there are a lot of reads, but keys are
+ * hardly modified, the amount of information we have to remember server side
+ * could be a lot, with the number of keys being totally not bound.
+ *
+ * So Redis allows the user to configure a maximum number of keys for the
+ * invalidation table. This function makes sure that we don't go over the
+ * specified fill rate: if we are over, we can just evict information about
+ * a random key, and send invalidation messages to clients like if the key was
+ * modified. */
+void trackingLimitUsedSlots(void) {
+ static unsigned int timeout_counter = 0;
+ if (TrackingTable == NULL) return;
+ if (server.tracking_table_max_keys == 0) return; /* No limits set. */
+ size_t max_keys = server.tracking_table_max_keys;
+ if (raxSize(TrackingTable) <= max_keys) {
+ timeout_counter = 0;
+ return; /* Limit not reached. */
+ }
+
+ /* We have to invalidate a few keys to reach the limit again. The effort
+ * we do here is proportional to the number of times we entered this
+ * function and found that we are still over the limit. */
+ int effort = 100 * (timeout_counter+1);
+
+ /* We just remove one key after another by using a random walk. */
+ raxIterator ri;
+ raxStart(&ri,TrackingTable);
+ while(effort > 0) {
+ effort--;
+ raxSeek(&ri,"^",NULL,0);
+ raxRandomWalk(&ri,0);
+ if (raxEOF(&ri)) break;
+ robj *keyobj = createStringObject((char*)ri.key,ri.key_len);
+ trackingInvalidateKey(NULL,keyobj,0);
+ decrRefCount(keyobj);
+ if (raxSize(TrackingTable) <= max_keys) {
+ timeout_counter = 0;
+ raxStop(&ri);
+ return; /* Return ASAP: we are again under the limit. */
+ }
+ }
+
+ /* If we reach this point, we were not able to go under the configured
+ * limit using the maximum effort we had for this run. */
+ raxStop(&ri);
+ timeout_counter++;
+}
+
+/* Generate Redis protocol for an array containing all the key names
+ * in the 'keys' radix tree. If the client is not NULL, the list will not
+ * include keys that were modified the last time by this client, in order
+ * to implement the NOLOOP option.
+ *
+ * If the resulting array would be empty, NULL is returned instead. */
+sds trackingBuildBroadcastReply(client *c, rax *keys) {
+ raxIterator ri;
+ uint64_t count;
+
+ if (c == NULL) {
+ count = raxSize(keys);
+ } else {
+ count = 0;
+ raxStart(&ri,keys);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ if (ri.data != c) count++;
+ }
+ raxStop(&ri);
+
+ if (count == 0) return NULL;
+ }
+
+ /* Create the array reply with the list of keys once, then send
+ * it to all the clients subscribed to this prefix. */
+ char buf[32];
+ size_t len = ll2string(buf,sizeof(buf),count);
+ sds proto = sdsempty();
+ proto = sdsMakeRoomFor(proto,count*15);
+ proto = sdscatlen(proto,"*",1);
+ proto = sdscatlen(proto,buf,len);
+ proto = sdscatlen(proto,"\r\n",2);
+ raxStart(&ri,keys);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ if (c && ri.data == c) continue;
+ len = ll2string(buf,sizeof(buf),ri.key_len);
+ proto = sdscatlen(proto,"$",1);
+ proto = sdscatlen(proto,buf,len);
+ proto = sdscatlen(proto,"\r\n",2);
+ proto = sdscatlen(proto,ri.key,ri.key_len);
+ proto = sdscatlen(proto,"\r\n",2);
+ }
+ raxStop(&ri);
+ return proto;
+}
+
+/* This function will run the prefixes of clients in BCAST mode and
+ * keys that were modified about each prefix, and will send the
+ * notifications to each client in each prefix. */
+void trackingBroadcastInvalidationMessages(void) {
+ raxIterator ri, ri2;
+
+ /* Return ASAP if there is nothing to do here. */
+ if (TrackingTable == NULL || !server.tracking_clients) return;
+
+ raxStart(&ri,PrefixTable);
+ raxSeek(&ri,"^",NULL,0);
+
+ /* For each prefix... */
+ while(raxNext(&ri)) {
+ bcastState *bs = ri.data;
+
+ if (raxSize(bs->keys)) {
+ /* Generate the common protocol for all the clients that are
+ * not using the NOLOOP option. */
+ sds proto = trackingBuildBroadcastReply(NULL,bs->keys);
+
+ /* Send this array of keys to every client in the list. */
+ raxStart(&ri2,bs->clients);
+ raxSeek(&ri2,"^",NULL,0);
+ while(raxNext(&ri2)) {
+ client *c;
+ memcpy(&c,ri2.key,sizeof(c));
+ if (c->flags & CLIENT_TRACKING_NOLOOP) {
+ /* This client may have certain keys excluded. */
+ sds adhoc = trackingBuildBroadcastReply(c,bs->keys);
+ if (adhoc) {
+ sendTrackingMessage(c,adhoc,sdslen(adhoc),1);
+ sdsfree(adhoc);
+ }
+ } else {
+ sendTrackingMessage(c,proto,sdslen(proto),1);
+ }
+ }
+ raxStop(&ri2);
+
+ /* Clean up: we can remove everything from this state, because we
+ * want to only track the new keys that will be accumulated starting
+ * from now. */
+ sdsfree(proto);
+ }
+ raxFree(bs->keys);
+ bs->keys = raxNew();
+ }
+ raxStop(&ri);
+}
+
+/* This is just used in order to access the amount of used slots in the
+ * tracking table. */
+uint64_t trackingGetTotalItems(void) {
+ return TrackingTableTotalItems;
+}
+
+uint64_t trackingGetTotalKeys(void) {
+ if (TrackingTable == NULL) return 0;
+ return raxSize(TrackingTable);
+}
+
+uint64_t trackingGetTotalPrefixes(void) {
+ if (PrefixTable == NULL) return 0;
+ return raxSize(PrefixTable);
+}