summaryrefslogtreecommitdiffstats
path: root/src/pubsub.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pubsub.c')
-rw-r--r--src/pubsub.c754
1 files changed, 754 insertions, 0 deletions
diff --git a/src/pubsub.c b/src/pubsub.c
new file mode 100644
index 0000000..a13c5a6
--- /dev/null
+++ b/src/pubsub.c
@@ -0,0 +1,754 @@
+/*
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "server.h"
+#include "cluster.h"
+
+/* Structure to hold the pubsub related metadata. Currently used
+ * for pubsub and pubsubshard feature. */
+typedef struct pubsubtype {
+ int shard;
+ dict *(*clientPubSubChannels)(client*);
+ int (*subscriptionCount)(client*);
+ dict **serverPubSubChannels;
+ robj **subscribeMsg;
+ robj **unsubscribeMsg;
+ robj **messageBulk;
+}pubsubtype;
+
+/*
+ * Get client's global Pub/Sub channels subscription count.
+ */
+int clientSubscriptionsCount(client *c);
+
+/*
+ * Get client's shard level Pub/Sub channels subscription count.
+ */
+int clientShardSubscriptionsCount(client *c);
+
+/*
+ * Get client's global Pub/Sub channels dict.
+ */
+dict* getClientPubSubChannels(client *c);
+
+/*
+ * Get client's shard level Pub/Sub channels dict.
+ */
+dict* getClientPubSubShardChannels(client *c);
+
+/*
+ * Get list of channels client is subscribed to.
+ * If a pattern is provided, the subset of channels is returned
+ * matching the pattern.
+ */
+void channelList(client *c, sds pat, dict* pubsub_channels);
+
+/*
+ * Pub/Sub type for global channels.
+ */
+pubsubtype pubSubType = {
+ .shard = 0,
+ .clientPubSubChannels = getClientPubSubChannels,
+ .subscriptionCount = clientSubscriptionsCount,
+ .serverPubSubChannels = &server.pubsub_channels,
+ .subscribeMsg = &shared.subscribebulk,
+ .unsubscribeMsg = &shared.unsubscribebulk,
+ .messageBulk = &shared.messagebulk,
+};
+
+/*
+ * Pub/Sub type for shard level channels bounded to a slot.
+ */
+pubsubtype pubSubShardType = {
+ .shard = 1,
+ .clientPubSubChannels = getClientPubSubShardChannels,
+ .subscriptionCount = clientShardSubscriptionsCount,
+ .serverPubSubChannels = &server.pubsubshard_channels,
+ .subscribeMsg = &shared.ssubscribebulk,
+ .unsubscribeMsg = &shared.sunsubscribebulk,
+ .messageBulk = &shared.smessagebulk,
+};
+
+/*-----------------------------------------------------------------------------
+ * Pubsub client replies API
+ *----------------------------------------------------------------------------*/
+
+/* Send a pubsub message of type "message" to the client.
+ * Normally 'msg' is a Redis object containing the string to send as
+ * message. However if the caller sets 'msg' as NULL, it will be able
+ * to send a special message (for instance an Array type) by using the
+ * addReply*() API family. */
+void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[3]);
+ else
+ addReplyPushLen(c,3);
+ addReply(c,message_bulk);
+ addReplyBulk(c,channel);
+ if (msg) addReplyBulk(c,msg);
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
+}
+
+/* Send a pubsub message of type "pmessage" to the client. The difference
+ * with the "message" type delivered by addReplyPubsubMessage() is that
+ * this message format also includes the pattern that matched the message. */
+void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[4]);
+ else
+ addReplyPushLen(c,4);
+ addReply(c,shared.pmessagebulk);
+ addReplyBulk(c,pat);
+ addReplyBulk(c,channel);
+ addReplyBulk(c,msg);
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
+}
+
+/* Send the pubsub subscription notification to the client. */
+void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[3]);
+ else
+ addReplyPushLen(c,3);
+ addReply(c,*type.subscribeMsg);
+ addReplyBulk(c,channel);
+ addReplyLongLong(c,type.subscriptionCount(c));
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
+}
+
+/* Send the pubsub unsubscription notification to the client.
+ * Channel can be NULL: this is useful when the client sends a mass
+ * unsubscribe command but there are no channels to unsubscribe from: we
+ * still send a notification. */
+void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[3]);
+ else
+ addReplyPushLen(c,3);
+ addReply(c, *type.unsubscribeMsg);
+ if (channel)
+ addReplyBulk(c,channel);
+ else
+ addReplyNull(c);
+ addReplyLongLong(c,type.subscriptionCount(c));
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
+}
+
+/* Send the pubsub pattern subscription notification to the client. */
+void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[3]);
+ else
+ addReplyPushLen(c,3);
+ addReply(c,shared.psubscribebulk);
+ addReplyBulk(c,pattern);
+ addReplyLongLong(c,clientSubscriptionsCount(c));
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
+}
+
+/* Send the pubsub pattern unsubscription notification to the client.
+ * Pattern can be NULL: this is useful when the client sends a mass
+ * punsubscribe command but there are no pattern to unsubscribe from: we
+ * still send a notification. */
+void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[3]);
+ else
+ addReplyPushLen(c,3);
+ addReply(c,shared.punsubscribebulk);
+ if (pattern)
+ addReplyBulk(c,pattern);
+ else
+ addReplyNull(c);
+ addReplyLongLong(c,clientSubscriptionsCount(c));
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
+}
+
+/*-----------------------------------------------------------------------------
+ * Pubsub low level API
+ *----------------------------------------------------------------------------*/
+
+/* Return the number of pubsub channels + patterns is handled. */
+int serverPubsubSubscriptionCount(void) {
+ return dictSize(server.pubsub_channels) + dictSize(server.pubsub_patterns);
+}
+
+/* Return the number of pubsub shard level channels is handled. */
+int serverPubsubShardSubscriptionCount(void) {
+ return dictSize(server.pubsubshard_channels);
+}
+
+
+/* Return the number of channels + patterns a client is subscribed to. */
+int clientSubscriptionsCount(client *c) {
+ return dictSize(c->pubsub_channels) + dictSize(c->pubsub_patterns);
+}
+
+/* Return the number of shard level channels a client is subscribed to. */
+int clientShardSubscriptionsCount(client *c) {
+ return dictSize(c->pubsubshard_channels);
+}
+
+dict* getClientPubSubChannels(client *c) {
+ return c->pubsub_channels;
+}
+
+dict* getClientPubSubShardChannels(client *c) {
+ return c->pubsubshard_channels;
+}
+
+/* Return the number of pubsub + pubsub shard level channels
+ * a client is subscribed to. */
+int clientTotalPubSubSubscriptionCount(client *c) {
+ return clientSubscriptionsCount(c) + clientShardSubscriptionsCount(c);
+}
+
+/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
+ * 0 if the client was already subscribed to that channel. */
+int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
+ dictEntry *de;
+ list *clients = NULL;
+ int retval = 0;
+
+ /* Add the channel to the client -> channels hash table */
+ if (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK) {
+ retval = 1;
+ incrRefCount(channel);
+ /* Add the client to the channel -> list of clients hash table */
+ de = dictFind(*type.serverPubSubChannels, channel);
+ if (de == NULL) {
+ clients = listCreate();
+ dictAdd(*type.serverPubSubChannels, channel, clients);
+ incrRefCount(channel);
+ } else {
+ clients = dictGetVal(de);
+ }
+ listAddNodeTail(clients,c);
+ }
+ /* Notify the client */
+ addReplyPubsubSubscribed(c,channel,type);
+ return retval;
+}
+
+/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
+ * 0 if the client was not subscribed to the specified channel. */
+int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype type) {
+ dictEntry *de;
+ list *clients;
+ listNode *ln;
+ int retval = 0;
+
+ /* Remove the channel from the client -> channels hash table */
+ incrRefCount(channel); /* channel may be just a pointer to the same object
+ we have in the hash tables. Protect it... */
+ if (dictDelete(type.clientPubSubChannels(c),channel) == DICT_OK) {
+ retval = 1;
+ /* Remove the client from the channel -> clients list hash table */
+ de = dictFind(*type.serverPubSubChannels, channel);
+ serverAssertWithInfo(c,NULL,de != NULL);
+ clients = dictGetVal(de);
+ ln = listSearchKey(clients,c);
+ serverAssertWithInfo(c,NULL,ln != NULL);
+ listDelNode(clients,ln);
+ if (listLength(clients) == 0) {
+ /* Free the list and associated hash entry at all if this was
+ * the latest client, so that it will be possible to abuse
+ * Redis PUBSUB creating millions of channels. */
+ dictDelete(*type.serverPubSubChannels, channel);
+ /* As this channel isn't subscribed by anyone, it's safe
+ * to remove the channel from the slot. */
+ if (server.cluster_enabled & type.shard) {
+ slotToChannelDel(channel->ptr);
+ }
+ }
+ }
+ /* Notify the client */
+ if (notify) {
+ addReplyPubsubUnsubscribed(c,channel,type);
+ }
+ decrRefCount(channel); /* it is finally safe to release it */
+ return retval;
+}
+
+void pubsubShardUnsubscribeAllClients(robj *channel) {
+ int retval;
+ dictEntry *de = dictFind(server.pubsubshard_channels, channel);
+ serverAssertWithInfo(NULL,channel,de != NULL);
+ list *clients = dictGetVal(de);
+ if (listLength(clients) > 0) {
+ /* For each client subscribed to the channel, unsubscribe it. */
+ listIter li;
+ listNode *ln;
+ listRewind(clients, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ client *c = listNodeValue(ln);
+ retval = dictDelete(c->pubsubshard_channels, channel);
+ serverAssertWithInfo(c,channel,retval == DICT_OK);
+ addReplyPubsubUnsubscribed(c, channel, pubSubShardType);
+ /* If the client has no other pubsub subscription,
+ * move out of pubsub mode. */
+ if (clientTotalPubSubSubscriptionCount(c) == 0) {
+ c->flags &= ~CLIENT_PUBSUB;
+ }
+ }
+ }
+ /* Delete the channel from server pubsubshard channels hash table. */
+ retval = dictDelete(server.pubsubshard_channels, channel);
+ /* Delete the channel from slots_to_channel mapping. */
+ slotToChannelDel(channel->ptr);
+ serverAssertWithInfo(NULL,channel,retval == DICT_OK);
+ decrRefCount(channel); /* it is finally safe to release it */
+}
+
+
+/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
+int pubsubSubscribePattern(client *c, robj *pattern) {
+ dictEntry *de;
+ list *clients;
+ int retval = 0;
+
+ if (dictAdd(c->pubsub_patterns, pattern, NULL) == DICT_OK) {
+ retval = 1;
+ incrRefCount(pattern);
+ /* Add the client to the pattern -> list of clients hash table */
+ de = dictFind(server.pubsub_patterns,pattern);
+ if (de == NULL) {
+ clients = listCreate();
+ dictAdd(server.pubsub_patterns,pattern,clients);
+ incrRefCount(pattern);
+ } else {
+ clients = dictGetVal(de);
+ }
+ listAddNodeTail(clients,c);
+ }
+ /* Notify the client */
+ addReplyPubsubPatSubscribed(c,pattern);
+ return retval;
+}
+
+/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
+ * 0 if the client was not subscribed to the specified channel. */
+int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
+ dictEntry *de;
+ list *clients;
+ listNode *ln;
+ int retval = 0;
+
+ incrRefCount(pattern); /* Protect the object. May be the same we remove */
+ if (dictDelete(c->pubsub_patterns, pattern) == DICT_OK) {
+ retval = 1;
+ /* Remove the client from the pattern -> clients list hash table */
+ de = dictFind(server.pubsub_patterns,pattern);
+ serverAssertWithInfo(c,NULL,de != NULL);
+ clients = dictGetVal(de);
+ ln = listSearchKey(clients,c);
+ serverAssertWithInfo(c,NULL,ln != NULL);
+ listDelNode(clients,ln);
+ if (listLength(clients) == 0) {
+ /* Free the list and associated hash entry at all if this was
+ * the latest client. */
+ dictDelete(server.pubsub_patterns,pattern);
+ }
+ }
+ /* Notify the client */
+ if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
+ decrRefCount(pattern);
+ return retval;
+}
+
+/* Unsubscribe from all the channels. Return the number of channels the
+ * client was subscribed to. */
+int pubsubUnsubscribeAllChannelsInternal(client *c, int notify, pubsubtype type) {
+ int count = 0;
+ if (dictSize(type.clientPubSubChannels(c)) > 0) {
+ dictIterator *di = dictGetSafeIterator(type.clientPubSubChannels(c));
+ dictEntry *de;
+
+ while((de = dictNext(di)) != NULL) {
+ robj *channel = dictGetKey(de);
+
+ count += pubsubUnsubscribeChannel(c,channel,notify,type);
+ }
+ dictReleaseIterator(di);
+ }
+ /* We were subscribed to nothing? Still reply to the client. */
+ if (notify && count == 0) {
+ addReplyPubsubUnsubscribed(c,NULL,type);
+ }
+ return count;
+}
+
+/*
+ * Unsubscribe a client from all global channels.
+ */
+int pubsubUnsubscribeAllChannels(client *c, int notify) {
+ int count = pubsubUnsubscribeAllChannelsInternal(c,notify,pubSubType);
+ return count;
+}
+
+/*
+ * Unsubscribe a client from all shard subscribed channels.
+ */
+int pubsubUnsubscribeShardAllChannels(client *c, int notify) {
+ int count = pubsubUnsubscribeAllChannelsInternal(c, notify, pubSubShardType);
+ return count;
+}
+
+/*
+ * Unsubscribe a client from provided shard subscribed channel(s).
+ */
+void pubsubUnsubscribeShardChannels(robj **channels, unsigned int count) {
+ for (unsigned int j = 0; j < count; j++) {
+ /* Remove the channel from server and from the clients
+ * subscribed to it as well as notify them. */
+ pubsubShardUnsubscribeAllClients(channels[j]);
+ }
+}
+
+/* Unsubscribe from all the patterns. Return the number of patterns the
+ * client was subscribed from. */
+int pubsubUnsubscribeAllPatterns(client *c, int notify) {
+ int count = 0;
+
+ if (dictSize(c->pubsub_patterns) > 0) {
+ dictIterator *di = dictGetSafeIterator(c->pubsub_patterns);
+ dictEntry *de;
+
+ while ((de = dictNext(di)) != NULL) {
+ robj *pattern = dictGetKey(de);
+ count += pubsubUnsubscribePattern(c, pattern, notify);
+ }
+ dictReleaseIterator(di);
+ }
+
+ /* We were subscribed to nothing? Still reply to the client. */
+ if (notify && count == 0) addReplyPubsubPatUnsubscribed(c,NULL);
+ return count;
+}
+
+/*
+ * Publish a message to all the subscribers.
+ */
+int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) {
+ int receivers = 0;
+ dictEntry *de;
+ dictIterator *di;
+ listNode *ln;
+ listIter li;
+
+ /* Send to clients listening for that channel */
+ de = dictFind(*type.serverPubSubChannels, channel);
+ if (de) {
+ list *list = dictGetVal(de);
+ listNode *ln;
+ listIter li;
+
+ listRewind(list,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ client *c = ln->value;
+ addReplyPubsubMessage(c,channel,message,*type.messageBulk);
+ updateClientMemUsageAndBucket(c);
+ receivers++;
+ }
+ }
+
+ if (type.shard) {
+ /* Shard pubsub ignores patterns. */
+ return receivers;
+ }
+
+ /* Send to clients listening to matching channels */
+ di = dictGetIterator(server.pubsub_patterns);
+ if (di) {
+ channel = getDecodedObject(channel);
+ while((de = dictNext(di)) != NULL) {
+ robj *pattern = dictGetKey(de);
+ list *clients = dictGetVal(de);
+ if (!stringmatchlen((char*)pattern->ptr,
+ sdslen(pattern->ptr),
+ (char*)channel->ptr,
+ sdslen(channel->ptr),0)) continue;
+
+ listRewind(clients,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ client *c = listNodeValue(ln);
+ addReplyPubsubPatMessage(c,pattern,channel,message);
+ updateClientMemUsageAndBucket(c);
+ receivers++;
+ }
+ }
+ decrRefCount(channel);
+ dictReleaseIterator(di);
+ }
+ return receivers;
+}
+
+/* Publish a message to all the subscribers. */
+int pubsubPublishMessage(robj *channel, robj *message, int sharded) {
+ return pubsubPublishMessageInternal(channel, message, sharded? pubSubShardType : pubSubType);
+}
+
+/*-----------------------------------------------------------------------------
+ * Pubsub commands implementation
+ *----------------------------------------------------------------------------*/
+
+/* SUBSCRIBE channel [channel ...] */
+void subscribeCommand(client *c) {
+ int j;
+ if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
+ /**
+ * A client that has CLIENT_DENY_BLOCKING flag on
+ * expect a reply per command and so can not execute subscribe.
+ *
+ * Notice that we have a special treatment for multi because of
+ * backward compatibility
+ */
+ addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
+ return;
+ }
+ for (j = 1; j < c->argc; j++)
+ pubsubSubscribeChannel(c,c->argv[j],pubSubType);
+ c->flags |= CLIENT_PUBSUB;
+}
+
+/* UNSUBSCRIBE [channel ...] */
+void unsubscribeCommand(client *c) {
+ if (c->argc == 1) {
+ pubsubUnsubscribeAllChannels(c,1);
+ } else {
+ int j;
+
+ for (j = 1; j < c->argc; j++)
+ pubsubUnsubscribeChannel(c,c->argv[j],1,pubSubType);
+ }
+ if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
+}
+
+/* PSUBSCRIBE pattern [pattern ...] */
+void psubscribeCommand(client *c) {
+ int j;
+ if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
+ /**
+ * A client that has CLIENT_DENY_BLOCKING flag on
+ * expect a reply per command and so can not execute subscribe.
+ *
+ * Notice that we have a special treatment for multi because of
+ * backward compatibility
+ */
+ addReplyError(c, "PSUBSCRIBE isn't allowed for a DENY BLOCKING client");
+ return;
+ }
+
+ for (j = 1; j < c->argc; j++)
+ pubsubSubscribePattern(c,c->argv[j]);
+ c->flags |= CLIENT_PUBSUB;
+}
+
+/* PUNSUBSCRIBE [pattern [pattern ...]] */
+void punsubscribeCommand(client *c) {
+ if (c->argc == 1) {
+ pubsubUnsubscribeAllPatterns(c,1);
+ } else {
+ int j;
+
+ for (j = 1; j < c->argc; j++)
+ pubsubUnsubscribePattern(c,c->argv[j],1);
+ }
+ if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
+}
+
+/* This function wraps pubsubPublishMessage and also propagates the message to cluster.
+ * Used by the commands PUBLISH/SPUBLISH and their respective module APIs.*/
+int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded) {
+ int receivers = pubsubPublishMessage(channel, message, sharded);
+ if (server.cluster_enabled)
+ clusterPropagatePublish(channel, message, sharded);
+ return receivers;
+}
+
+/* PUBLISH <channel> <message> */
+void publishCommand(client *c) {
+ if (server.sentinel_mode) {
+ sentinelPublishCommand(c);
+ return;
+ }
+
+ int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],0);
+ if (!server.cluster_enabled)
+ forceCommandPropagation(c,PROPAGATE_REPL);
+ addReplyLongLong(c,receivers);
+}
+
+/* PUBSUB command for Pub/Sub introspection. */
+void pubsubCommand(client *c) {
+ if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
+ const char *help[] = {
+"CHANNELS [<pattern>]",
+" Return the currently active channels matching a <pattern> (default: '*').",
+"NUMPAT",
+" Return number of subscriptions to patterns.",
+"NUMSUB [<channel> ...]",
+" Return the number of subscribers for the specified channels, excluding",
+" pattern subscriptions(default: no channels).",
+"SHARDCHANNELS [<pattern>]",
+" Return the currently active shard level channels matching a <pattern> (default: '*').",
+"SHARDNUMSUB [<shardchannel> ...]",
+" Return the number of subscribers for the specified shard level channel(s)",
+NULL
+ };
+ addReplyHelp(c, help);
+ } else if (!strcasecmp(c->argv[1]->ptr,"channels") &&
+ (c->argc == 2 || c->argc == 3))
+ {
+ /* PUBSUB CHANNELS [<pattern>] */
+ sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
+ channelList(c, pat, server.pubsub_channels);
+ } else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) {
+ /* PUBSUB NUMSUB [Channel_1 ... Channel_N] */
+ int j;
+
+ addReplyArrayLen(c,(c->argc-2)*2);
+ for (j = 2; j < c->argc; j++) {
+ list *l = dictFetchValue(server.pubsub_channels,c->argv[j]);
+
+ addReplyBulk(c,c->argv[j]);
+ addReplyLongLong(c,l ? listLength(l) : 0);
+ }
+ } else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) {
+ /* PUBSUB NUMPAT */
+ addReplyLongLong(c,dictSize(server.pubsub_patterns));
+ } else if (!strcasecmp(c->argv[1]->ptr,"shardchannels") &&
+ (c->argc == 2 || c->argc == 3))
+ {
+ /* PUBSUB SHARDCHANNELS */
+ sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
+ channelList(c,pat,server.pubsubshard_channels);
+ } else if (!strcasecmp(c->argv[1]->ptr,"shardnumsub") && c->argc >= 2) {
+ /* PUBSUB SHARDNUMSUB [ShardChannel_1 ... ShardChannel_N] */
+ int j;
+
+ addReplyArrayLen(c, (c->argc-2)*2);
+ for (j = 2; j < c->argc; j++) {
+ list *l = dictFetchValue(server.pubsubshard_channels, c->argv[j]);
+
+ addReplyBulk(c,c->argv[j]);
+ addReplyLongLong(c,l ? listLength(l) : 0);
+ }
+ } else {
+ addReplySubcommandSyntaxError(c);
+ }
+}
+
+void channelList(client *c, sds pat, dict *pubsub_channels) {
+ dictIterator *di = dictGetIterator(pubsub_channels);
+ dictEntry *de;
+ long mblen = 0;
+ void *replylen;
+
+ replylen = addReplyDeferredLen(c);
+ while((de = dictNext(di)) != NULL) {
+ robj *cobj = dictGetKey(de);
+ sds channel = cobj->ptr;
+
+ if (!pat || stringmatchlen(pat, sdslen(pat),
+ channel, sdslen(channel),0))
+ {
+ addReplyBulk(c,cobj);
+ mblen++;
+ }
+ }
+ dictReleaseIterator(di);
+ setDeferredArrayLen(c,replylen,mblen);
+}
+
+/* SPUBLISH <shardchannel> <message> */
+void spublishCommand(client *c) {
+ int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],1);
+ if (!server.cluster_enabled)
+ forceCommandPropagation(c,PROPAGATE_REPL);
+ addReplyLongLong(c,receivers);
+}
+
+/* SSUBSCRIBE shardchannel [shardchannel ...] */
+void ssubscribeCommand(client *c) {
+ if (c->flags & CLIENT_DENY_BLOCKING) {
+ /* A client that has CLIENT_DENY_BLOCKING flag on
+ * expect a reply per command and so can not execute subscribe. */
+ addReplyError(c, "SSUBSCRIBE isn't allowed for a DENY BLOCKING client");
+ return;
+ }
+
+ for (int j = 1; j < c->argc; j++) {
+ /* A channel is only considered to be added, if a
+ * subscriber exists for it. And if a subscriber
+ * already exists the slotToChannel doesn't needs
+ * to be incremented. */
+ if (server.cluster_enabled &
+ (dictFind(*pubSubShardType.serverPubSubChannels, c->argv[j]) == NULL)) {
+ slotToChannelAdd(c->argv[j]->ptr);
+ }
+ pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
+ }
+ c->flags |= CLIENT_PUBSUB;
+}
+
+
+/* SUNSUBSCRIBE [shardchannel [shardchannel ...]] */
+void sunsubscribeCommand(client *c) {
+ if (c->argc == 1) {
+ pubsubUnsubscribeShardAllChannels(c, 1);
+ } else {
+ for (int j = 1; j < c->argc; j++) {
+ pubsubUnsubscribeChannel(c, c->argv[j], 1, pubSubShardType);
+ }
+ }
+ if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
+}
+
+size_t pubsubMemOverhead(client *c) {
+ /* PubSub patterns */
+ size_t mem = dictMemUsage(c->pubsub_patterns);
+ /* Global PubSub channels */
+ mem += dictMemUsage(c->pubsub_channels);
+ /* Sharded PubSub channels */
+ mem += dictMemUsage(c->pubsubshard_channels);
+ return mem;
+}