/* * Copyright (c) 2009-2012, Salvatore Sanfilippo * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * * Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of Redis nor the names of its contributors may be used * to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include "server.h" #include "cluster.h" /* Structure to hold the pubsub related metadata. Currently used * for pubsub and pubsubshard feature. */ typedef struct pubsubtype { int shard; dict *(*clientPubSubChannels)(client*); int (*subscriptionCount)(client*); dict **serverPubSubChannels; robj **subscribeMsg; robj **unsubscribeMsg; robj **messageBulk; }pubsubtype; /* * Get client's global Pub/Sub channels subscription count. */ int clientSubscriptionsCount(client *c); /* * Get client's shard level Pub/Sub channels subscription count. */ int clientShardSubscriptionsCount(client *c); /* * Get client's global Pub/Sub channels dict. */ dict* getClientPubSubChannels(client *c); /* * Get client's shard level Pub/Sub channels dict. */ dict* getClientPubSubShardChannels(client *c); /* * Get list of channels client is subscribed to. * If a pattern is provided, the subset of channels is returned * matching the pattern. */ void channelList(client *c, sds pat, dict* pubsub_channels); /* * Pub/Sub type for global channels. */ pubsubtype pubSubType = { .shard = 0, .clientPubSubChannels = getClientPubSubChannels, .subscriptionCount = clientSubscriptionsCount, .serverPubSubChannels = &server.pubsub_channels, .subscribeMsg = &shared.subscribebulk, .unsubscribeMsg = &shared.unsubscribebulk, .messageBulk = &shared.messagebulk, }; /* * Pub/Sub type for shard level channels bounded to a slot. */ pubsubtype pubSubShardType = { .shard = 1, .clientPubSubChannels = getClientPubSubShardChannels, .subscriptionCount = clientShardSubscriptionsCount, .serverPubSubChannels = &server.pubsubshard_channels, .subscribeMsg = &shared.ssubscribebulk, .unsubscribeMsg = &shared.sunsubscribebulk, .messageBulk = &shared.smessagebulk, }; /*----------------------------------------------------------------------------- * Pubsub client replies API *----------------------------------------------------------------------------*/ /* Send a pubsub message of type "message" to the client. * Normally 'msg' is a Redis object containing the string to send as * message. However if the caller sets 'msg' as NULL, it will be able * to send a special message (for instance an Array type) by using the * addReply*() API family. */ void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) { uint64_t old_flags = c->flags; c->flags |= CLIENT_PUSHING; if (c->resp == 2) addReply(c,shared.mbulkhdr[3]); else addReplyPushLen(c,3); addReply(c,message_bulk); addReplyBulk(c,channel); if (msg) addReplyBulk(c,msg); if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; } /* Send a pubsub message of type "pmessage" to the client. The difference * with the "message" type delivered by addReplyPubsubMessage() is that * this message format also includes the pattern that matched the message. */ void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) { uint64_t old_flags = c->flags; c->flags |= CLIENT_PUSHING; if (c->resp == 2) addReply(c,shared.mbulkhdr[4]); else addReplyPushLen(c,4); addReply(c,shared.pmessagebulk); addReplyBulk(c,pat); addReplyBulk(c,channel); addReplyBulk(c,msg); if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; } /* Send the pubsub subscription notification to the client. */ void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) { uint64_t old_flags = c->flags; c->flags |= CLIENT_PUSHING; if (c->resp == 2) addReply(c,shared.mbulkhdr[3]); else addReplyPushLen(c,3); addReply(c,*type.subscribeMsg); addReplyBulk(c,channel); addReplyLongLong(c,type.subscriptionCount(c)); if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; } /* Send the pubsub unsubscription notification to the client. * Channel can be NULL: this is useful when the client sends a mass * unsubscribe command but there are no channels to unsubscribe from: we * still send a notification. */ void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) { uint64_t old_flags = c->flags; c->flags |= CLIENT_PUSHING; if (c->resp == 2) addReply(c,shared.mbulkhdr[3]); else addReplyPushLen(c,3); addReply(c, *type.unsubscribeMsg); if (channel) addReplyBulk(c,channel); else addReplyNull(c); addReplyLongLong(c,type.subscriptionCount(c)); if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; } /* Send the pubsub pattern subscription notification to the client. */ void addReplyPubsubPatSubscribed(client *c, robj *pattern) { uint64_t old_flags = c->flags; c->flags |= CLIENT_PUSHING; if (c->resp == 2) addReply(c,shared.mbulkhdr[3]); else addReplyPushLen(c,3); addReply(c,shared.psubscribebulk); addReplyBulk(c,pattern); addReplyLongLong(c,clientSubscriptionsCount(c)); if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; } /* Send the pubsub pattern unsubscription notification to the client. * Pattern can be NULL: this is useful when the client sends a mass * punsubscribe command but there are no pattern to unsubscribe from: we * still send a notification. */ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) { uint64_t old_flags = c->flags; c->flags |= CLIENT_PUSHING; if (c->resp == 2) addReply(c,shared.mbulkhdr[3]); else addReplyPushLen(c,3); addReply(c,shared.punsubscribebulk); if (pattern) addReplyBulk(c,pattern); else addReplyNull(c); addReplyLongLong(c,clientSubscriptionsCount(c)); if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING; } /*----------------------------------------------------------------------------- * Pubsub low level API *----------------------------------------------------------------------------*/ /* Return the number of pubsub channels + patterns is handled. */ int serverPubsubSubscriptionCount(void) { return dictSize(server.pubsub_channels) + dictSize(server.pubsub_patterns); } /* Return the number of pubsub shard level channels is handled. */ int serverPubsubShardSubscriptionCount(void) { return dictSize(server.pubsubshard_channels); } /* Return the number of channels + patterns a client is subscribed to. */ int clientSubscriptionsCount(client *c) { return dictSize(c->pubsub_channels) + dictSize(c->pubsub_patterns); } /* Return the number of shard level channels a client is subscribed to. */ int clientShardSubscriptionsCount(client *c) { return dictSize(c->pubsubshard_channels); } dict* getClientPubSubChannels(client *c) { return c->pubsub_channels; } dict* getClientPubSubShardChannels(client *c) { return c->pubsubshard_channels; } /* Return the number of pubsub + pubsub shard level channels * a client is subscribed to. */ int clientTotalPubSubSubscriptionCount(client *c) { return clientSubscriptionsCount(c) + clientShardSubscriptionsCount(c); } /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or * 0 if the client was already subscribed to that channel. */ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) { dictEntry *de; list *clients = NULL; int retval = 0; /* Add the channel to the client -> channels hash table */ if (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK) { retval = 1; incrRefCount(channel); /* Add the client to the channel -> list of clients hash table */ de = dictFind(*type.serverPubSubChannels, channel); if (de == NULL) { clients = listCreate(); dictAdd(*type.serverPubSubChannels, channel, clients); incrRefCount(channel); } else { clients = dictGetVal(de); } listAddNodeTail(clients,c); } /* Notify the client */ addReplyPubsubSubscribed(c,channel,type); return retval; } /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or * 0 if the client was not subscribed to the specified channel. */ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype type) { dictEntry *de; list *clients; listNode *ln; int retval = 0; /* Remove the channel from the client -> channels hash table */ incrRefCount(channel); /* channel may be just a pointer to the same object we have in the hash tables. Protect it... */ if (dictDelete(type.clientPubSubChannels(c),channel) == DICT_OK) { retval = 1; /* Remove the client from the channel -> clients list hash table */ de = dictFind(*type.serverPubSubChannels, channel); serverAssertWithInfo(c,NULL,de != NULL); clients = dictGetVal(de); ln = listSearchKey(clients,c); serverAssertWithInfo(c,NULL,ln != NULL); listDelNode(clients,ln); if (listLength(clients) == 0) { /* Free the list and associated hash entry at all if this was * the latest client, so that it will be possible to abuse * Redis PUBSUB creating millions of channels. */ dictDelete(*type.serverPubSubChannels, channel); /* As this channel isn't subscribed by anyone, it's safe * to remove the channel from the slot. */ if (server.cluster_enabled & type.shard) { slotToChannelDel(channel->ptr); } } } /* Notify the client */ if (notify) { addReplyPubsubUnsubscribed(c,channel,type); } decrRefCount(channel); /* it is finally safe to release it */ return retval; } void pubsubShardUnsubscribeAllClients(robj *channel) { int retval; dictEntry *de = dictFind(server.pubsubshard_channels, channel); serverAssertWithInfo(NULL,channel,de != NULL); list *clients = dictGetVal(de); if (listLength(clients) > 0) { /* For each client subscribed to the channel, unsubscribe it. */ listIter li; listNode *ln; listRewind(clients, &li); while ((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); retval = dictDelete(c->pubsubshard_channels, channel); serverAssertWithInfo(c,channel,retval == DICT_OK); addReplyPubsubUnsubscribed(c, channel, pubSubShardType); /* If the client has no other pubsub subscription, * move out of pubsub mode. */ if (clientTotalPubSubSubscriptionCount(c) == 0) { c->flags &= ~CLIENT_PUBSUB; } } } /* Delete the channel from server pubsubshard channels hash table. */ retval = dictDelete(server.pubsubshard_channels, channel); /* Delete the channel from slots_to_channel mapping. */ slotToChannelDel(channel->ptr); serverAssertWithInfo(NULL,channel,retval == DICT_OK); decrRefCount(channel); /* it is finally safe to release it */ } /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */ int pubsubSubscribePattern(client *c, robj *pattern) { dictEntry *de; list *clients; int retval = 0; if (dictAdd(c->pubsub_patterns, pattern, NULL) == DICT_OK) { retval = 1; incrRefCount(pattern); /* Add the client to the pattern -> list of clients hash table */ de = dictFind(server.pubsub_patterns,pattern); if (de == NULL) { clients = listCreate(); dictAdd(server.pubsub_patterns,pattern,clients); incrRefCount(pattern); } else { clients = dictGetVal(de); } listAddNodeTail(clients,c); } /* Notify the client */ addReplyPubsubPatSubscribed(c,pattern); return retval; } /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or * 0 if the client was not subscribed to the specified channel. */ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { dictEntry *de; list *clients; listNode *ln; int retval = 0; incrRefCount(pattern); /* Protect the object. May be the same we remove */ if (dictDelete(c->pubsub_patterns, pattern) == DICT_OK) { retval = 1; /* Remove the client from the pattern -> clients list hash table */ de = dictFind(server.pubsub_patterns,pattern); serverAssertWithInfo(c,NULL,de != NULL); clients = dictGetVal(de); ln = listSearchKey(clients,c); serverAssertWithInfo(c,NULL,ln != NULL); listDelNode(clients,ln); if (listLength(clients) == 0) { /* Free the list and associated hash entry at all if this was * the latest client. */ dictDelete(server.pubsub_patterns,pattern); } } /* Notify the client */ if (notify) addReplyPubsubPatUnsubscribed(c,pattern); decrRefCount(pattern); return retval; } /* Unsubscribe from all the channels. Return the number of channels the * client was subscribed to. */ int pubsubUnsubscribeAllChannelsInternal(client *c, int notify, pubsubtype type) { int count = 0; if (dictSize(type.clientPubSubChannels(c)) > 0) { dictIterator *di = dictGetSafeIterator(type.clientPubSubChannels(c)); dictEntry *de; while((de = dictNext(di)) != NULL) { robj *channel = dictGetKey(de); count += pubsubUnsubscribeChannel(c,channel,notify,type); } dictReleaseIterator(di); } /* We were subscribed to nothing? Still reply to the client. */ if (notify && count == 0) { addReplyPubsubUnsubscribed(c,NULL,type); } return count; } /* * Unsubscribe a client from all global channels. */ int pubsubUnsubscribeAllChannels(client *c, int notify) { int count = pubsubUnsubscribeAllChannelsInternal(c,notify,pubSubType); return count; } /* * Unsubscribe a client from all shard subscribed channels. */ int pubsubUnsubscribeShardAllChannels(client *c, int notify) { int count = pubsubUnsubscribeAllChannelsInternal(c, notify, pubSubShardType); return count; } /* * Unsubscribe a client from provided shard subscribed channel(s). */ void pubsubUnsubscribeShardChannels(robj **channels, unsigned int count) { for (unsigned int j = 0; j < count; j++) { /* Remove the channel from server and from the clients * subscribed to it as well as notify them. */ pubsubShardUnsubscribeAllClients(channels[j]); } } /* Unsubscribe from all the patterns. Return the number of patterns the * client was subscribed from. */ int pubsubUnsubscribeAllPatterns(client *c, int notify) { int count = 0; if (dictSize(c->pubsub_patterns) > 0) { dictIterator *di = dictGetSafeIterator(c->pubsub_patterns); dictEntry *de; while ((de = dictNext(di)) != NULL) { robj *pattern = dictGetKey(de); count += pubsubUnsubscribePattern(c, pattern, notify); } dictReleaseIterator(di); } /* We were subscribed to nothing? Still reply to the client. */ if (notify && count == 0) addReplyPubsubPatUnsubscribed(c,NULL); return count; } /* * Publish a message to all the subscribers. */ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) { int receivers = 0; dictEntry *de; dictIterator *di; listNode *ln; listIter li; /* Send to clients listening for that channel */ de = dictFind(*type.serverPubSubChannels, channel); if (de) { list *list = dictGetVal(de); listNode *ln; listIter li; listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { client *c = ln->value; addReplyPubsubMessage(c,channel,message,*type.messageBulk); updateClientMemUsageAndBucket(c); receivers++; } } if (type.shard) { /* Shard pubsub ignores patterns. */ return receivers; } /* Send to clients listening to matching channels */ di = dictGetIterator(server.pubsub_patterns); if (di) { channel = getDecodedObject(channel); while((de = dictNext(di)) != NULL) { robj *pattern = dictGetKey(de); list *clients = dictGetVal(de); if (!stringmatchlen((char*)pattern->ptr, sdslen(pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) continue; listRewind(clients,&li); while ((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); addReplyPubsubPatMessage(c,pattern,channel,message); updateClientMemUsageAndBucket(c); receivers++; } } decrRefCount(channel); dictReleaseIterator(di); } return receivers; } /* Publish a message to all the subscribers. */ int pubsubPublishMessage(robj *channel, robj *message, int sharded) { return pubsubPublishMessageInternal(channel, message, sharded? pubSubShardType : pubSubType); } /*----------------------------------------------------------------------------- * Pubsub commands implementation *----------------------------------------------------------------------------*/ /* SUBSCRIBE channel [channel ...] */ void subscribeCommand(client *c) { int j; if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) { /** * A client that has CLIENT_DENY_BLOCKING flag on * expect a reply per command and so can not execute subscribe. * * Notice that we have a special treatment for multi because of * backward compatibility */ addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client"); return; } for (j = 1; j < c->argc; j++) pubsubSubscribeChannel(c,c->argv[j],pubSubType); c->flags |= CLIENT_PUBSUB; } /* UNSUBSCRIBE [channel ...] */ void unsubscribeCommand(client *c) { if (c->argc == 1) { pubsubUnsubscribeAllChannels(c,1); } else { int j; for (j = 1; j < c->argc; j++) pubsubUnsubscribeChannel(c,c->argv[j],1,pubSubType); } if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; } /* PSUBSCRIBE pattern [pattern ...] */ void psubscribeCommand(client *c) { int j; if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) { /** * A client that has CLIENT_DENY_BLOCKING flag on * expect a reply per command and so can not execute subscribe. * * Notice that we have a special treatment for multi because of * backward compatibility */ addReplyError(c, "PSUBSCRIBE isn't allowed for a DENY BLOCKING client"); return; } for (j = 1; j < c->argc; j++) pubsubSubscribePattern(c,c->argv[j]); c->flags |= CLIENT_PUBSUB; } /* PUNSUBSCRIBE [pattern [pattern ...]] */ void punsubscribeCommand(client *c) { if (c->argc == 1) { pubsubUnsubscribeAllPatterns(c,1); } else { int j; for (j = 1; j < c->argc; j++) pubsubUnsubscribePattern(c,c->argv[j],1); } if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; } /* This function wraps pubsubPublishMessage and also propagates the message to cluster. * Used by the commands PUBLISH/SPUBLISH and their respective module APIs.*/ int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded) { int receivers = pubsubPublishMessage(channel, message, sharded); if (server.cluster_enabled) clusterPropagatePublish(channel, message, sharded); return receivers; } /* PUBLISH */ void publishCommand(client *c) { if (server.sentinel_mode) { sentinelPublishCommand(c); return; } int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],0); if (!server.cluster_enabled) forceCommandPropagation(c,PROPAGATE_REPL); addReplyLongLong(c,receivers); } /* PUBSUB command for Pub/Sub introspection. */ void pubsubCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { "CHANNELS []", " Return the currently active channels matching a (default: '*').", "NUMPAT", " Return number of subscriptions to patterns.", "NUMSUB [ ...]", " Return the number of subscribers for the specified channels, excluding", " pattern subscriptions(default: no channels).", "SHARDCHANNELS []", " Return the currently active shard level channels matching a (default: '*').", "SHARDNUMSUB [ ...]", " Return the number of subscribers for the specified shard level channel(s)", NULL }; addReplyHelp(c, help); } else if (!strcasecmp(c->argv[1]->ptr,"channels") && (c->argc == 2 || c->argc == 3)) { /* PUBSUB CHANNELS [] */ sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr; channelList(c, pat, server.pubsub_channels); } else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) { /* PUBSUB NUMSUB [Channel_1 ... Channel_N] */ int j; addReplyArrayLen(c,(c->argc-2)*2); for (j = 2; j < c->argc; j++) { list *l = dictFetchValue(server.pubsub_channels,c->argv[j]); addReplyBulk(c,c->argv[j]); addReplyLongLong(c,l ? listLength(l) : 0); } } else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) { /* PUBSUB NUMPAT */ addReplyLongLong(c,dictSize(server.pubsub_patterns)); } else if (!strcasecmp(c->argv[1]->ptr,"shardchannels") && (c->argc == 2 || c->argc == 3)) { /* PUBSUB SHARDCHANNELS */ sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr; channelList(c,pat,server.pubsubshard_channels); } else if (!strcasecmp(c->argv[1]->ptr,"shardnumsub") && c->argc >= 2) { /* PUBSUB SHARDNUMSUB [ShardChannel_1 ... ShardChannel_N] */ int j; addReplyArrayLen(c, (c->argc-2)*2); for (j = 2; j < c->argc; j++) { list *l = dictFetchValue(server.pubsubshard_channels, c->argv[j]); addReplyBulk(c,c->argv[j]); addReplyLongLong(c,l ? listLength(l) : 0); } } else { addReplySubcommandSyntaxError(c); } } void channelList(client *c, sds pat, dict *pubsub_channels) { dictIterator *di = dictGetIterator(pubsub_channels); dictEntry *de; long mblen = 0; void *replylen; replylen = addReplyDeferredLen(c); while((de = dictNext(di)) != NULL) { robj *cobj = dictGetKey(de); sds channel = cobj->ptr; if (!pat || stringmatchlen(pat, sdslen(pat), channel, sdslen(channel),0)) { addReplyBulk(c,cobj); mblen++; } } dictReleaseIterator(di); setDeferredArrayLen(c,replylen,mblen); } /* SPUBLISH */ void spublishCommand(client *c) { int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],1); if (!server.cluster_enabled) forceCommandPropagation(c,PROPAGATE_REPL); addReplyLongLong(c,receivers); } /* SSUBSCRIBE shardchannel [shardchannel ...] */ void ssubscribeCommand(client *c) { if (c->flags & CLIENT_DENY_BLOCKING) { /* A client that has CLIENT_DENY_BLOCKING flag on * expect a reply per command and so can not execute subscribe. */ addReplyError(c, "SSUBSCRIBE isn't allowed for a DENY BLOCKING client"); return; } for (int j = 1; j < c->argc; j++) { /* A channel is only considered to be added, if a * subscriber exists for it. And if a subscriber * already exists the slotToChannel doesn't needs * to be incremented. */ if (server.cluster_enabled & (dictFind(*pubSubShardType.serverPubSubChannels, c->argv[j]) == NULL)) { slotToChannelAdd(c->argv[j]->ptr); } pubsubSubscribeChannel(c, c->argv[j], pubSubShardType); } c->flags |= CLIENT_PUBSUB; } /* SUNSUBSCRIBE [shardchannel [shardchannel ...]] */ void sunsubscribeCommand(client *c) { if (c->argc == 1) { pubsubUnsubscribeShardAllChannels(c, 1); } else { for (int j = 1; j < c->argc; j++) { pubsubUnsubscribeChannel(c, c->argv[j], 1, pubSubShardType); } } if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; } size_t pubsubMemOverhead(client *c) { /* PubSub patterns */ size_t mem = dictMemUsage(c->pubsub_patterns); /* Global PubSub channels */ mem += dictMemUsage(c->pubsub_channels); /* Sharded PubSub channels */ mem += dictMemUsage(c->pubsubshard_channels); return mem; }