diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-14 13:40:54 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-14 13:40:54 +0000 |
commit | 317c0644ccf108aa23ef3fd8358bd66c2840bfc0 (patch) | |
tree | c417b3d25c86b775989cb5ac042f37611b626c8a /src/tracking.c | |
parent | Initial commit. (diff) | |
download | redis-317c0644ccf108aa23ef3fd8358bd66c2840bfc0.tar.xz redis-317c0644ccf108aa23ef3fd8358bd66c2840bfc0.zip |
Adding upstream version 5:7.2.4.upstream/5%7.2.4
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tracking.c')
-rw-r--r-- | src/tracking.c | 660 |
1 files changed, 660 insertions, 0 deletions
diff --git a/src/tracking.c b/src/tracking.c new file mode 100644 index 0000000..5a9b114 --- /dev/null +++ b/src/tracking.c @@ -0,0 +1,660 @@ +/* tracking.c - Client side caching: keys tracking and invalidation + * + * Copyright (c) 2019, Salvatore Sanfilippo <antirez at gmail dot com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "server.h" + +/* The tracking table is constituted by a radix tree of keys, each pointing + * to a radix tree of client IDs, used to track the clients that may have + * certain keys in their local, client side, cache. + * + * When a client enables tracking with "CLIENT TRACKING on", each key served to + * the client is remembered in the table mapping the keys to the client IDs. + * Later, when a key is modified, all the clients that may have local copy + * of such key will receive an invalidation message. + * + * Clients will normally take frequently requested objects in memory, removing + * them when invalidation messages are received. */ +rax *TrackingTable = NULL; +rax *PrefixTable = NULL; +uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across + the whole tracking table. This gives + an hint about the total memory we + are using server side for CSC. */ +robj *TrackingChannelName; + +/* This is the structure that we have as value of the PrefixTable, and + * represents the list of keys modified, and the list of clients that need + * to be notified, for a given prefix. */ +typedef struct bcastState { + rax *keys; /* Keys modified in the current event loop cycle. */ + rax *clients; /* Clients subscribed to the notification events for this + prefix. */ +} bcastState; + +/* Remove the tracking state from the client 'c'. Note that there is not much + * to do for us here, if not to decrement the counter of the clients in + * tracking mode, because we just store the ID of the client in the tracking + * table, so we'll remove the ID reference in a lazy way. Otherwise when a + * client with many entries in the table is removed, it would cost a lot of + * time to do the cleanup. */ +void disableTracking(client *c) { + /* If this client is in broadcasting mode, we need to unsubscribe it + * from all the prefixes it is registered to. */ + if (c->flags & CLIENT_TRACKING_BCAST) { + raxIterator ri; + raxStart(&ri,c->client_tracking_prefixes); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + bcastState *bs = raxFind(PrefixTable,ri.key,ri.key_len); + serverAssert(bs != raxNotFound); + raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL); + /* Was it the last client? Remove the prefix from the + * table. */ + if (raxSize(bs->clients) == 0) { + raxFree(bs->clients); + raxFree(bs->keys); + zfree(bs); + raxRemove(PrefixTable,ri.key,ri.key_len,NULL); + } + } + raxStop(&ri); + raxFree(c->client_tracking_prefixes); + c->client_tracking_prefixes = NULL; + } + + /* Clear flags and adjust the count. */ + if (c->flags & CLIENT_TRACKING) { + server.tracking_clients--; + c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR| + CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN| + CLIENT_TRACKING_OPTOUT|CLIENT_TRACKING_CACHING| + CLIENT_TRACKING_NOLOOP); + } +} + +static int stringCheckPrefix(unsigned char *s1, size_t s1_len, unsigned char *s2, size_t s2_len) { + size_t min_length = s1_len < s2_len ? s1_len : s2_len; + return memcmp(s1,s2,min_length) == 0; +} + +/* Check if any of the provided prefixes collide with one another or + * with an existing prefix for the client. A collision is defined as two + * prefixes that will emit an invalidation for the same key. If no prefix + * collision is found, 1 is return, otherwise 0 is returned and the client + * has an error emitted describing the error. */ +int checkPrefixCollisionsOrReply(client *c, robj **prefixes, size_t numprefix) { + for (size_t i = 0; i < numprefix; i++) { + /* Check input list has no overlap with existing prefixes. */ + if (c->client_tracking_prefixes) { + raxIterator ri; + raxStart(&ri,c->client_tracking_prefixes); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + if (stringCheckPrefix(ri.key,ri.key_len, + prefixes[i]->ptr,sdslen(prefixes[i]->ptr))) + { + sds collision = sdsnewlen(ri.key,ri.key_len); + addReplyErrorFormat(c, + "Prefix '%s' overlaps with an existing prefix '%s'. " + "Prefixes for a single client must not overlap.", + (unsigned char *)prefixes[i]->ptr, + (unsigned char *)collision); + sdsfree(collision); + raxStop(&ri); + return 0; + } + } + raxStop(&ri); + } + /* Check input has no overlap with itself. */ + for (size_t j = i + 1; j < numprefix; j++) { + if (stringCheckPrefix(prefixes[i]->ptr,sdslen(prefixes[i]->ptr), + prefixes[j]->ptr,sdslen(prefixes[j]->ptr))) + { + addReplyErrorFormat(c, + "Prefix '%s' overlaps with another provided prefix '%s'. " + "Prefixes for a single client must not overlap.", + (unsigned char *)prefixes[i]->ptr, + (unsigned char *)prefixes[j]->ptr); + return i; + } + } + } + return 1; +} + +/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is + * already registered for the specified prefix, no operation is performed. */ +void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { + bcastState *bs = raxFind(PrefixTable,(unsigned char*)prefix,plen); + /* If this is the first client subscribing to such prefix, create + * the prefix in the table. */ + if (bs == raxNotFound) { + bs = zmalloc(sizeof(*bs)); + bs->keys = raxNew(); + bs->clients = raxNew(); + raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL); + } + if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) { + if (c->client_tracking_prefixes == NULL) + c->client_tracking_prefixes = raxNew(); + raxInsert(c->client_tracking_prefixes, + (unsigned char*)prefix,plen,NULL,NULL); + } +} + +/* Enable the tracking state for the client 'c', and as a side effect allocates + * the tracking table if needed. If the 'redirect_to' argument is non zero, the + * invalidation messages for this client will be sent to the client ID + * specified by the 'redirect_to' argument. Note that if such client will + * eventually get freed, we'll send a message to the original client to + * inform it of the condition. Multiple clients can redirect the invalidation + * messages to the same client ID. */ +void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix) { + if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++; + c->flags |= CLIENT_TRACKING; + c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST| + CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT| + CLIENT_TRACKING_NOLOOP); + c->client_tracking_redirection = redirect_to; + + /* This may be the first client we ever enable. Create the tracking + * table if it does not exist. */ + if (TrackingTable == NULL) { + TrackingTable = raxNew(); + PrefixTable = raxNew(); + TrackingChannelName = createStringObject("__redis__:invalidate",20); + } + + /* For broadcasting, set the list of prefixes in the client. */ + if (options & CLIENT_TRACKING_BCAST) { + c->flags |= CLIENT_TRACKING_BCAST; + if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0); + for (size_t j = 0; j < numprefix; j++) { + sds sdsprefix = prefix[j]->ptr; + enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix)); + } + } + + /* Set the remaining flags that don't need any special handling. */ + c->flags |= options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT| + CLIENT_TRACKING_NOLOOP); +} + +/* This function is called after the execution of a readonly command in the + * case the client 'c' has keys tracking enabled and the tracking is not + * in BCAST mode. It will populate the tracking invalidation table according + * to the keys the user fetched, so that Redis will know what are the clients + * that should receive an invalidation message with certain groups of keys + * are modified. */ +void trackingRememberKeys(client *tracking, client *executing) { + /* Return if we are in optin/out mode and the right CACHING command + * was/wasn't given in order to modify the default behavior. */ + uint64_t optin = tracking->flags & CLIENT_TRACKING_OPTIN; + uint64_t optout = tracking->flags & CLIENT_TRACKING_OPTOUT; + uint64_t caching_given = tracking->flags & CLIENT_TRACKING_CACHING; + if ((optin && !caching_given) || (optout && caching_given)) return; + + getKeysResult result = GETKEYS_RESULT_INIT; + int numkeys = getKeysFromCommand(executing->cmd,executing->argv,executing->argc,&result); + if (!numkeys) { + getKeysFreeResult(&result); + return; + } + /* Shard channels are treated as special keys for client + * library to rely on `COMMAND` command to discover the node + * to connect to. These channels doesn't need to be tracked. */ + if (executing->cmd->flags & CMD_PUBSUB) { + return; + } + + keyReference *keys = result.keys; + + for(int j = 0; j < numkeys; j++) { + int idx = keys[j].pos; + sds sdskey = executing->argv[idx]->ptr; + rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); + if (ids == raxNotFound) { + ids = raxNew(); + int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey, + sdslen(sdskey),ids, NULL); + serverAssert(inserted == 1); + } + if (raxTryInsert(ids,(unsigned char*)&tracking->id,sizeof(tracking->id),NULL,NULL)) + TrackingTableTotalItems++; + } + getKeysFreeResult(&result); +} + +/* Given a key name, this function sends an invalidation message in the + * proper channel (depending on RESP version: PubSub or Push message) and + * to the proper client (in case of redirection), in the context of the + * client 'c' with tracking enabled. + * + * In case the 'proto' argument is non zero, the function will assume that + * 'keyname' points to a buffer of 'keylen' bytes already expressed in the + * form of Redis RESP protocol. This is used for: + * - In BCAST mode, to send an array of invalidated keys to all + * applicable clients + * - Following a flush command, to send a single RESP NULL to indicate + * that all keys are now invalid. */ +void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { + uint64_t old_flags = c->flags; + c->flags |= CLIENT_PUSHING; + + int using_redirection = 0; + if (c->client_tracking_redirection) { + client *redir = lookupClientByID(c->client_tracking_redirection); + if (!redir) { + c->flags |= CLIENT_TRACKING_BROKEN_REDIR; + /* We need to signal to the original connection that we + * are unable to send invalidation messages to the redirected + * connection, because the client no longer exist. */ + if (c->resp > 2) { + addReplyPushLen(c,2); + addReplyBulkCBuffer(c,"tracking-redir-broken",21); + addReplyLongLong(c,c->client_tracking_redirection); + } + if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; + return; + } + if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; + c = redir; + using_redirection = 1; + old_flags = c->flags; + c->flags |= CLIENT_PUSHING; + } + + /* Only send such info for clients in RESP version 3 or more. However + * if redirection is active, and the connection we redirect to is + * in Pub/Sub mode, we can support the feature with RESP 2 as well, + * by sending Pub/Sub messages in the __redis__:invalidate channel. */ + if (c->resp > 2) { + addReplyPushLen(c,2); + addReplyBulkCBuffer(c,"invalidate",10); + } else if (using_redirection && c->flags & CLIENT_PUBSUB) { + /* We use a static object to speedup things, however we assume + * that addReplyPubsubMessage() will not take a reference. */ + addReplyPubsubMessage(c,TrackingChannelName,NULL,shared.messagebulk); + } else { + /* If are here, the client is not using RESP3, nor is + * redirecting to another client. We can't send anything to + * it since RESP2 does not support push messages in the same + * connection. */ + if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; + return; + } + + /* Send the "value" part, which is the array of keys. */ + if (proto) { + addReplyProto(c,keyname,keylen); + } else { + addReplyArrayLen(c,1); + addReplyBulkCBuffer(c,keyname,keylen); + } + updateClientMemUsageAndBucket(c); + if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; +} + +/* This function is called when a key is modified in Redis and in the case + * we have at least one client with the BCAST mode enabled. + * Its goal is to set the key in the right broadcast state if the key + * matches one or more prefixes in the prefix table. Later when we + * return to the event loop, we'll send invalidation messages to the + * clients subscribed to each prefix. */ +void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) { + raxIterator ri; + raxStart(&ri,PrefixTable); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + if (ri.key_len > keylen) continue; + if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0) + continue; + bcastState *bs = ri.data; + /* We insert the client pointer as associated value in the radix + * tree. This way we know who was the client that did the last + * change to the key, and can avoid sending the notification in the + * case the client is in NOLOOP mode. */ + raxInsert(bs->keys,(unsigned char*)keyname,keylen,c,NULL); + } + raxStop(&ri); +} + +/* This function is called from signalModifiedKey() or other places in Redis + * when a key changes value. In the context of keys tracking, our task here is + * to send a notification to every client that may have keys about such caching + * slot. + * + * Note that 'c' may be NULL in case the operation was performed outside the + * context of a client modifying the database (for instance when we delete a + * key because of expire). + * + * The last argument 'bcast' tells the function if it should also schedule + * the key for broadcasting to clients in BCAST mode. This is the case when + * the function is called from the Redis core once a key is modified, however + * we also call the function in order to evict keys in the key table in case + * of memory pressure: in that case the key didn't really change, so we want + * just to notify the clients that are in the table for this key, that would + * otherwise miss the fact we are no longer tracking the key for them. */ +void trackingInvalidateKey(client *c, robj *keyobj, int bcast) { + if (TrackingTable == NULL) return; + + unsigned char *key = (unsigned char*)keyobj->ptr; + size_t keylen = sdslen(keyobj->ptr); + + if (bcast && raxSize(PrefixTable) > 0) + trackingRememberKeyToBroadcast(c,(char *)key,keylen); + + rax *ids = raxFind(TrackingTable,key,keylen); + if (ids == raxNotFound) return; + + raxIterator ri; + raxStart(&ri,ids); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + uint64_t id; + memcpy(&id,ri.key,sizeof(id)); + client *target = lookupClientByID(id); + /* Note that if the client is in BCAST mode, we don't want to + * send invalidation messages that were pending in the case + * previously the client was not in BCAST mode. This can happen if + * TRACKING is enabled normally, and then the client switches to + * BCAST mode. */ + if (target == NULL || + !(target->flags & CLIENT_TRACKING)|| + target->flags & CLIENT_TRACKING_BCAST) + { + continue; + } + + /* If the client enabled the NOLOOP mode, don't send notifications + * about keys changed by the client itself. */ + if (target->flags & CLIENT_TRACKING_NOLOOP && + target == server.current_client) + { + continue; + } + + /* If target is current client and it's executing a command, we need schedule key invalidation. + * As the invalidation messages may be interleaved with command + * response and should after command response. */ + if (target == server.current_client && (server.current_client->flags & CLIENT_EXECUTING_COMMAND)) { + incrRefCount(keyobj); + listAddNodeTail(server.tracking_pending_keys, keyobj); + } else { + sendTrackingMessage(target,(char *)keyobj->ptr,sdslen(keyobj->ptr),0); + } + } + raxStop(&ri); + + /* Free the tracking table: we'll create the radix tree and populate it + * again if more keys will be modified in this caching slot. */ + TrackingTableTotalItems -= raxSize(ids); + raxFree(ids); + raxRemove(TrackingTable,(unsigned char*)key,keylen,NULL); +} + +void trackingHandlePendingKeyInvalidations(void) { + if (!listLength(server.tracking_pending_keys)) return; + + /* Flush pending invalidation messages only when we are not in nested call. + * So the messages are not interleaved with transaction response. */ + if (server.execution_nesting) return; + + listNode *ln; + listIter li; + + listRewind(server.tracking_pending_keys,&li); + while ((ln = listNext(&li)) != NULL) { + robj *key = listNodeValue(ln); + /* current_client maybe freed, so we need to send invalidation + * message only when current_client is still alive */ + if (server.current_client != NULL) { + if (key != NULL) { + sendTrackingMessage(server.current_client,(char *)key->ptr,sdslen(key->ptr),0); + } else { + sendTrackingMessage(server.current_client,shared.null[server.current_client->resp]->ptr, + sdslen(shared.null[server.current_client->resp]->ptr),1); + } + } + if (key != NULL) decrRefCount(key); + } + listEmpty(server.tracking_pending_keys); +} + +/* This function is called when one or all the Redis databases are + * flushed. Caching keys are not specific for each DB but are global: + * currently what we do is send a special notification to clients with + * tracking enabled, sending a RESP NULL, which means, "all the keys", + * in order to avoid flooding clients with many invalidation messages + * for all the keys they may hold. + */ +void freeTrackingRadixTreeCallback(void *rt) { + raxFree(rt); +} + +void freeTrackingRadixTree(rax *rt) { + raxFreeWithCallback(rt,freeTrackingRadixTreeCallback); +} + +/* A RESP NULL is sent to indicate that all keys are invalid */ +void trackingInvalidateKeysOnFlush(int async) { + if (server.tracking_clients) { + listNode *ln; + listIter li; + listRewind(server.clients,&li); + while ((ln = listNext(&li)) != NULL) { + client *c = listNodeValue(ln); + if (c->flags & CLIENT_TRACKING) { + if (c == server.current_client) { + /* We use a special NULL to indicate that we should send null */ + listAddNodeTail(server.tracking_pending_keys,NULL); + } else { + sendTrackingMessage(c,shared.null[c->resp]->ptr,sdslen(shared.null[c->resp]->ptr),1); + } + } + } + } + + /* In case of FLUSHALL, reclaim all the memory used by tracking. */ + if (TrackingTable) { + if (async) { + freeTrackingRadixTreeAsync(TrackingTable); + } else { + freeTrackingRadixTree(TrackingTable); + } + TrackingTable = raxNew(); + TrackingTableTotalItems = 0; + } +} + +/* Tracking forces Redis to remember information about which client may have + * certain keys. In workloads where there are a lot of reads, but keys are + * hardly modified, the amount of information we have to remember server side + * could be a lot, with the number of keys being totally not bound. + * + * So Redis allows the user to configure a maximum number of keys for the + * invalidation table. This function makes sure that we don't go over the + * specified fill rate: if we are over, we can just evict information about + * a random key, and send invalidation messages to clients like if the key was + * modified. */ +void trackingLimitUsedSlots(void) { + static unsigned int timeout_counter = 0; + if (TrackingTable == NULL) return; + if (server.tracking_table_max_keys == 0) return; /* No limits set. */ + size_t max_keys = server.tracking_table_max_keys; + if (raxSize(TrackingTable) <= max_keys) { + timeout_counter = 0; + return; /* Limit not reached. */ + } + + /* We have to invalidate a few keys to reach the limit again. The effort + * we do here is proportional to the number of times we entered this + * function and found that we are still over the limit. */ + int effort = 100 * (timeout_counter+1); + + /* We just remove one key after another by using a random walk. */ + raxIterator ri; + raxStart(&ri,TrackingTable); + while(effort > 0) { + effort--; + raxSeek(&ri,"^",NULL,0); + raxRandomWalk(&ri,0); + if (raxEOF(&ri)) break; + robj *keyobj = createStringObject((char*)ri.key,ri.key_len); + trackingInvalidateKey(NULL,keyobj,0); + decrRefCount(keyobj); + if (raxSize(TrackingTable) <= max_keys) { + timeout_counter = 0; + raxStop(&ri); + return; /* Return ASAP: we are again under the limit. */ + } + } + + /* If we reach this point, we were not able to go under the configured + * limit using the maximum effort we had for this run. */ + raxStop(&ri); + timeout_counter++; +} + +/* Generate Redis protocol for an array containing all the key names + * in the 'keys' radix tree. If the client is not NULL, the list will not + * include keys that were modified the last time by this client, in order + * to implement the NOLOOP option. + * + * If the resulting array would be empty, NULL is returned instead. */ +sds trackingBuildBroadcastReply(client *c, rax *keys) { + raxIterator ri; + uint64_t count; + + if (c == NULL) { + count = raxSize(keys); + } else { + count = 0; + raxStart(&ri,keys); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + if (ri.data != c) count++; + } + raxStop(&ri); + + if (count == 0) return NULL; + } + + /* Create the array reply with the list of keys once, then send + * it to all the clients subscribed to this prefix. */ + char buf[32]; + size_t len = ll2string(buf,sizeof(buf),count); + sds proto = sdsempty(); + proto = sdsMakeRoomFor(proto,count*15); + proto = sdscatlen(proto,"*",1); + proto = sdscatlen(proto,buf,len); + proto = sdscatlen(proto,"\r\n",2); + raxStart(&ri,keys); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + if (c && ri.data == c) continue; + len = ll2string(buf,sizeof(buf),ri.key_len); + proto = sdscatlen(proto,"$",1); + proto = sdscatlen(proto,buf,len); + proto = sdscatlen(proto,"\r\n",2); + proto = sdscatlen(proto,ri.key,ri.key_len); + proto = sdscatlen(proto,"\r\n",2); + } + raxStop(&ri); + return proto; +} + +/* This function will run the prefixes of clients in BCAST mode and + * keys that were modified about each prefix, and will send the + * notifications to each client in each prefix. */ +void trackingBroadcastInvalidationMessages(void) { + raxIterator ri, ri2; + + /* Return ASAP if there is nothing to do here. */ + if (TrackingTable == NULL || !server.tracking_clients) return; + + raxStart(&ri,PrefixTable); + raxSeek(&ri,"^",NULL,0); + + /* For each prefix... */ + while(raxNext(&ri)) { + bcastState *bs = ri.data; + + if (raxSize(bs->keys)) { + /* Generate the common protocol for all the clients that are + * not using the NOLOOP option. */ + sds proto = trackingBuildBroadcastReply(NULL,bs->keys); + + /* Send this array of keys to every client in the list. */ + raxStart(&ri2,bs->clients); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + client *c; + memcpy(&c,ri2.key,sizeof(c)); + if (c->flags & CLIENT_TRACKING_NOLOOP) { + /* This client may have certain keys excluded. */ + sds adhoc = trackingBuildBroadcastReply(c,bs->keys); + if (adhoc) { + sendTrackingMessage(c,adhoc,sdslen(adhoc),1); + sdsfree(adhoc); + } + } else { + sendTrackingMessage(c,proto,sdslen(proto),1); + } + } + raxStop(&ri2); + + /* Clean up: we can remove everything from this state, because we + * want to only track the new keys that will be accumulated starting + * from now. */ + sdsfree(proto); + } + raxFree(bs->keys); + bs->keys = raxNew(); + } + raxStop(&ri); +} + +/* This is just used in order to access the amount of used slots in the + * tracking table. */ +uint64_t trackingGetTotalItems(void) { + return TrackingTableTotalItems; +} + +uint64_t trackingGetTotalKeys(void) { + if (TrackingTable == NULL) return 0; + return raxSize(TrackingTable); +} + +uint64_t trackingGetTotalPrefixes(void) { + if (PrefixTable == NULL) return 0; + return raxSize(PrefixTable); +} |